1use super::*;
2
/// Buffered-row threshold: `StagedTableWriter::push_row` flushes its pending
/// batches to the staging dataset once at least this many rows have accumulated.
const MERGE_STAGE_BATCH_ROWS: usize = 8192;
/// Name of the environment variable that, when set, names the root directory
/// in which `merge_stage_tempdir` creates merge staging directories.
const MERGE_STAGE_DIR_ENV: &str = "OMNIGRAPH_MERGE_STAGING_DIR";
5
/// How a table that needs updating is carried into the merge target.
#[derive(Debug)]
enum CandidateTableState {
    /// The table is taken from the source snapshot; during validation the
    /// candidate dataset is opened from the source snapshot.
    AdoptSourceState,
    /// The table was merged row by row; carries the staged merge output, whose
    /// full staged dataset is what validation scans.
    RewriteMerged(StagedMergeResult),
}
11
/// A dataset written into a temporary staging directory.
#[derive(Debug)]
struct StagedTable {
    // Owns the staging directory the dataset lives in; held only so it is
    // dropped together with the dataset handle.
    // NOTE(review): presumably `tempfile::TempDir`, which removes the
    // directory on drop — confirm against the imports.
    _dir: TempDir,
    // Handle to the staged dataset (`table.lance` inside the directory).
    dataset: Dataset,
}
17
/// Output of a row-level merge of one table.
#[derive(Debug)]
struct StagedMergeResult {
    // Every row of the merged table.
    full_staged: StagedTable,
    // Only the rows that must be upserted into the target; `None` when no
    // such row was produced.
    delta_staged: Option<StagedTable>,
    // Ids of rows that must be deleted from the target.
    deleted_ids: Vec<String>,
}
24
/// One row produced by an `OrderedTableCursor`.
#[derive(Debug, Clone)]
struct CursorRow {
    // Value of the row's `id` column.
    id: String,
    // String fingerprint of the row produced by `row_signature`; compared
    // for equality to decide whether two versions of a row differ.
    signature: String,
    // Dataset the row was read from (needed to materialize blob columns).
    dataset: Dataset,
    // Batch that contains the row.
    batch: RecordBatch,
    // Position of the row inside `batch`.
    row_index: usize,
}
33
/// Row-at-a-time cursor over a table scanned in ascending `id` order, with
/// one row of lookahead.
struct OrderedTableCursor {
    // Remaining batch stream; `None` when the table does not exist or the
    // stream has been exhausted.
    stream: Option<std::pin::Pin<Box<DatasetRecordBatchStream>>>,
    // Dataset being scanned; `None` when the table does not exist.
    dataset: Option<Dataset>,
    // Batch currently being iterated.
    current_batch: Option<RecordBatch>,
    // Index of the next row to hand out from `current_batch`.
    current_row: usize,
    // Row fetched by `peek_cloned` but not yet consumed by `pop`.
    peeked: Option<CursorRow>,
}
41
42impl OrderedTableCursor {
43 async fn from_snapshot(snapshot: &Snapshot, table_key: &str) -> Result<Self> {
44 let dataset = match snapshot.entry(table_key) {
45 Some(_) => Some(snapshot.open(table_key).await?),
46 None => None,
47 };
48 Self::from_dataset(dataset).await
49 }
50
51 async fn from_dataset(dataset: Option<Dataset>) -> Result<Self> {
52 let stream = if let Some(ds) = &dataset {
53 Some(Box::pin(
54 crate::table_store::TableStore::scan_stream_with(
55 ds,
56 None,
57 None,
58 Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]),
59 true,
60 |_| Ok(()),
61 )
62 .await?,
63 ))
64 } else {
65 None
66 };
67
68 Ok(Self {
69 stream,
70 dataset,
71 current_batch: None,
72 current_row: 0,
73 peeked: None,
74 })
75 }
76
77 async fn peek_cloned(&mut self) -> Result<Option<CursorRow>> {
78 if self.peeked.is_none() {
79 self.peeked = self.next_row().await?;
80 }
81 Ok(self.peeked.clone())
82 }
83
84 async fn pop(&mut self) -> Result<Option<CursorRow>> {
85 if self.peeked.is_some() {
86 return Ok(self.peeked.take());
87 }
88 self.next_row().await
89 }
90
91 async fn next_row(&mut self) -> Result<Option<CursorRow>> {
92 loop {
93 if let Some(batch) = &self.current_batch {
94 if self.current_row < batch.num_rows() {
95 let row_index = self.current_row;
96 self.current_row += 1;
97 let dataset = self.dataset.clone().ok_or_else(|| {
98 OmniError::manifest("cursor row missing source dataset".to_string())
99 })?;
100 return Ok(Some(CursorRow {
101 id: row_id_at(batch, row_index)?,
102 signature: row_signature(batch, row_index)?,
103 dataset,
104 batch: batch.clone(),
105 row_index,
106 }));
107 }
108 }
109
110 let Some(stream) = self.stream.as_mut() else {
111 return Ok(None);
112 };
113 match stream.try_next().await {
114 Ok(Some(batch)) => {
115 self.current_batch = Some(batch);
116 self.current_row = 0;
117 }
118 Ok(None) => {
119 self.stream = None;
120 self.current_batch = None;
121 return Ok(None);
122 }
123 Err(err) => return Err(OmniError::Lance(err.to_string())),
124 }
125 }
126 }
127}
128
/// Buffers rows and writes them in batches to a dataset inside a temporary
/// staging directory.
struct StagedTableWriter {
    // Schema used when concatenating buffered batches and when creating an
    // empty dataset.
    schema: SchemaRef,
    // Location of the staged dataset (`table.lance` inside `dir`).
    dataset_uri: String,
    // Staging directory that contains the dataset.
    dir: TempDir,
    // Dataset handle; `None` until the first flush (or `finish`) creates it.
    dataset: Option<Dataset>,
    // Rows buffered since the last flush.
    buffered_rows: usize,
    // Total rows pushed over the writer's lifetime.
    row_count: u64,
    // Single-row batches waiting to be flushed.
    batches: Vec<RecordBatch>,
}
138
139impl StagedTableWriter {
140 fn new(table_key: &str, schema: SchemaRef) -> Result<Self> {
141 let dir = merge_stage_tempdir(table_key)?;
142 let dataset_uri = dir.path().join("table.lance").to_string_lossy().to_string();
143 Ok(Self {
144 schema,
145 dataset_uri,
146 dir,
147 dataset: None,
148 buffered_rows: 0,
149 row_count: 0,
150 batches: Vec::new(),
151 })
152 }
153
154 async fn push_row(&mut self, row: &CursorRow) -> Result<()> {
155 self.row_count += 1;
156 self.buffered_rows += 1;
157 self.batches.push(self.row_batch(row).await?);
158 if self.buffered_rows >= MERGE_STAGE_BATCH_ROWS {
159 self.flush().await?;
160 }
161 Ok(())
162 }
163
164 async fn row_batch(&self, row: &CursorRow) -> Result<RecordBatch> {
165 let batch = row.batch.slice(row.row_index, 1);
166 let has_blob_columns = row
167 .dataset
168 .schema()
169 .fields_pre_order()
170 .any(|field| field.is_blob());
171 if has_blob_columns {
172 return crate::table_store::TableStore::materialize_blob_batch(&row.dataset, batch)
173 .await;
174 }
175 let columns = self
176 .schema
177 .fields()
178 .iter()
179 .map(|field| {
180 batch.column_by_name(field.name()).cloned().ok_or_else(|| {
181 OmniError::Lance(format!("batch missing column '{}'", field.name()))
182 })
183 })
184 .collect::<Result<Vec<_>>>()?;
185 RecordBatch::try_new(self.schema.clone(), columns)
186 .map_err(|e| OmniError::Lance(e.to_string()))
187 }
188
189 async fn finish(mut self) -> Result<StagedTable> {
190 self.flush().await?;
191 if self.dataset.is_none() {
192 self.dataset = Some(
193 crate::table_store::TableStore::create_empty_dataset(
194 &self.dataset_uri,
195 &self.schema,
196 )
197 .await?,
198 );
199 }
200 Ok(StagedTable {
201 _dir: self.dir,
202 dataset: self.dataset.unwrap(),
203 })
204 }
205
206 async fn flush(&mut self) -> Result<()> {
207 if self.batches.is_empty() {
208 return Ok(());
209 }
210
211 let batch = if self.batches.len() == 1 {
212 self.batches.pop().unwrap()
213 } else {
214 let batches = std::mem::take(&mut self.batches);
215 arrow_select::concat::concat_batches(&self.schema, &batches)
216 .map_err(|e| OmniError::Lance(e.to_string()))?
217 };
218 self.buffered_rows = 0;
219
220 let ds = crate::table_store::TableStore::append_or_create_batch(
221 &self.dataset_uri,
222 self.dataset.take(),
223 batch,
224 )
225 .await?;
226 self.dataset = Some(ds);
227 Ok(())
228 }
229}
230
231fn merge_stage_tempdir(table_key: &str) -> Result<TempDir> {
232 if let Ok(root) = env::var(MERGE_STAGE_DIR_ENV) {
233 return TempDirBuilder::new()
234 .prefix(&format!(
235 "omnigraph-merge-{}-",
236 sanitize_table_key(table_key)
237 ))
238 .tempdir_in(PathBuf::from(root))
239 .map_err(OmniError::from);
240 }
241 TempDirBuilder::new()
242 .prefix(&format!(
243 "omnigraph-merge-{}-",
244 sanitize_table_key(table_key)
245 ))
246 .tempdir()
247 .map_err(OmniError::from)
248}
249
/// Returns `table_key` with every `:`, `/` and `\` replaced by `-`, so the
/// key can be embedded in a directory name.
fn sanitize_table_key(table_key: &str) -> String {
    let mut sanitized = String::with_capacity(table_key.len());
    for ch in table_key.chars() {
        let safe = if matches!(ch, ':' | '/' | '\\') { '-' } else { ch };
        sanitized.push(safe);
    }
    sanitized
}
259
260async fn compute_source_delta(
263 table_key: &str,
264 catalog: &Catalog,
265 base_snapshot: &Snapshot,
266 source_snapshot: &Snapshot,
267) -> Result<Option<StagedMergeResult>> {
268 let schema = schema_for_table_key(catalog, table_key)?;
269 let mut full_writer =
270 StagedTableWriter::new(&format!("{}_adopt_full", table_key), schema.clone())?;
271 let mut delta_writer = StagedTableWriter::new(&format!("{}_adopt_delta", table_key), schema)?;
272 let mut deleted_ids: Vec<String> = Vec::new();
273 let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
274 let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;
275
276 let mut needs_update = false;
277
278 loop {
279 let base_row = base.peek_cloned().await?;
280 let source_row = source.peek_cloned().await?;
281
282 let next_id = [base_row.as_ref(), source_row.as_ref()]
283 .into_iter()
284 .flatten()
285 .map(|row| row.id.clone())
286 .min();
287 let Some(next_id) = next_id else { break };
288
289 let base_row = if base_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
290 base.pop().await?
291 } else {
292 None
293 };
294 let source_row = if source_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
295 source.pop().await?
296 } else {
297 None
298 };
299
300 let base_sig = base_row.as_ref().map(|r| r.signature.as_str());
301 let source_sig = source_row.as_ref().map(|r| r.signature.as_str());
302
303 match (&base_row, &source_row) {
304 (Some(_), None) => {
305 deleted_ids.push(next_id);
307 needs_update = true;
308 }
309 (None, Some(src)) => {
310 full_writer.push_row(src).await?;
312 delta_writer.push_row(src).await?;
313 needs_update = true;
314 }
315 (Some(_), Some(src)) if source_sig != base_sig => {
316 full_writer.push_row(src).await?;
318 delta_writer.push_row(src).await?;
319 needs_update = true;
320 }
321 (Some(base), Some(_)) => {
322 full_writer.push_row(base).await?;
324 }
325 (None, None) => unreachable!(),
326 }
327 }
328
329 if !needs_update {
330 return Ok(None);
331 }
332
333 let delta_staged = if delta_writer.row_count > 0 {
334 Some(delta_writer.finish().await?)
335 } else {
336 None
337 };
338
339 Ok(Some(StagedMergeResult {
340 full_staged: full_writer.finish().await?,
341 delta_staged,
342 deleted_ids,
343 }))
344}
345
346fn min_cursor_id(
347 base_row: &Option<CursorRow>,
348 source_row: &Option<CursorRow>,
349 target_row: &Option<CursorRow>,
350) -> Option<String> {
351 [base_row.as_ref(), source_row.as_ref(), target_row.as_ref()]
352 .into_iter()
353 .flatten()
354 .map(|row| row.id.clone())
355 .min()
356}
357
358async fn stage_streaming_table_merge(
359 table_key: &str,
360 catalog: &Catalog,
361 base_snapshot: &Snapshot,
362 source_snapshot: &Snapshot,
363 target_snapshot: &Snapshot,
364 conflicts: &mut Vec<MergeConflict>,
365) -> Result<Option<StagedMergeResult>> {
366 let schema = schema_for_table_key(catalog, table_key)?;
367 let mut full_writer = StagedTableWriter::new(&format!("{}_full", table_key), schema.clone())?;
368 let mut delta_writer = StagedTableWriter::new(&format!("{}_delta", table_key), schema)?;
369 let mut deleted_ids: Vec<String> = Vec::new();
370 let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
371 let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;
372 let mut target = OrderedTableCursor::from_snapshot(target_snapshot, table_key).await?;
373
374 let prior_conflict_count = conflicts.len();
375 let mut needs_update = false;
376
377 loop {
378 let base_row = base.peek_cloned().await?;
379 let source_row = source.peek_cloned().await?;
380 let target_row = target.peek_cloned().await?;
381 let Some(next_id) = min_cursor_id(&base_row, &source_row, &target_row) else {
382 break;
383 };
384
385 let base_row = if base_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str()) {
386 base.pop().await?
387 } else {
388 None
389 };
390 let source_row = if source_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
391 {
392 source.pop().await?
393 } else {
394 None
395 };
396 let target_row = if target_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
397 {
398 target.pop().await?
399 } else {
400 None
401 };
402
403 let base_sig = base_row.as_ref().map(|row| row.signature.as_str());
404 let source_sig = source_row.as_ref().map(|row| row.signature.as_str());
405 let target_sig = target_row.as_ref().map(|row| row.signature.as_str());
406
407 let source_changed = source_sig != base_sig;
408 let target_changed = target_sig != base_sig;
409
410 let selection = if !source_changed {
411 target_row.as_ref()
412 } else if !target_changed {
413 source_row.as_ref()
414 } else if source_sig == target_sig {
415 target_row.as_ref()
416 } else {
417 conflicts.push(classify_merge_conflict(
418 table_key, &next_id, base_sig, source_sig, target_sig,
419 ));
420 None
421 };
422
423 if conflicts.len() > prior_conflict_count {
424 continue;
425 }
426
427 if selection.is_none() && target_row.is_some() {
429 deleted_ids.push(next_id.clone());
430 needs_update = true;
431 continue;
432 }
433
434 if let Some(selection) = selection {
435 full_writer.push_row(selection).await?;
437 if selection.signature.as_str() != target_sig.unwrap_or("") {
439 delta_writer.push_row(selection).await?;
440 needs_update = true;
441 }
442 }
443 }
444
445 if conflicts.len() > prior_conflict_count {
446 return Ok(None);
447 }
448 if !needs_update {
449 return Ok(None);
450 }
451
452 let delta_staged = if delta_writer.row_count > 0 {
453 Some(delta_writer.finish().await?)
454 } else {
455 None
456 };
457
458 Ok(Some(StagedMergeResult {
459 full_staged: full_writer.finish().await?,
460 delta_staged,
461 deleted_ids,
462 }))
463}
464
465fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<SchemaRef> {
466 if let Some(name) = table_key.strip_prefix("node:") {
467 return catalog
468 .node_types
469 .get(name)
470 .map(|t| t.arrow_schema.clone())
471 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", name)));
472 }
473 if let Some(name) = table_key.strip_prefix("edge:") {
474 return catalog
475 .edge_types
476 .get(name)
477 .map(|t| t.arrow_schema.clone())
478 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", name)));
479 }
480 Err(OmniError::manifest(format!(
481 "invalid table key '{}'",
482 table_key
483 )))
484}
485
486fn same_manifest_state(
487 left: Option<&crate::db::SubTableEntry>,
488 right: Option<&crate::db::SubTableEntry>,
489) -> bool {
490 match (left, right) {
491 (Some(left), Some(right)) => {
492 left.table_version == right.table_version && left.table_branch == right.table_branch
493 }
494 (None, None) => true,
495 _ => false,
496 }
497}
498
499fn classify_merge_conflict(
500 table_key: &str,
501 row_id: &str,
502 base_sig: Option<&str>,
503 source_sig: Option<&str>,
504 target_sig: Option<&str>,
505) -> MergeConflict {
506 let (kind, message) = match (base_sig, source_sig, target_sig) {
507 (None, Some(_), Some(_)) => (
508 MergeConflictKind::DivergentInsert,
509 format!("divergent insert for id '{}'", row_id),
510 ),
511 (Some(_), None, Some(_)) | (Some(_), Some(_), None) => (
512 MergeConflictKind::DeleteVsUpdate,
513 format!("delete/update conflict for id '{}'", row_id),
514 ),
515 _ => (
516 MergeConflictKind::DivergentUpdate,
517 format!("divergent update for id '{}'", row_id),
518 ),
519 };
520 MergeConflict {
521 table_key: table_key.to_string(),
522 row_id: Some(row_id.to_string()),
523 kind,
524 message,
525 }
526}
527
528fn row_signature(batch: &RecordBatch, row: usize) -> Result<String> {
529 let mut values = Vec::with_capacity(batch.num_columns());
530 for (field, column) in batch.schema().fields().iter().zip(batch.columns()) {
531 if field.name().starts_with("_row") {
532 continue;
533 }
534 values.push(
535 array_value_to_string(column.as_ref(), row)
536 .map_err(|e| OmniError::Lance(e.to_string()))?,
537 );
538 }
539 Ok(values.join("\u{1f}"))
540}
541
542async fn scan_validation_stream(ds: &Dataset) -> Result<DatasetRecordBatchStream> {
543 crate::table_store::TableStore::scan_stream_with(ds, None, None, None, false, |_| Ok(())).await
544}
545
/// Validates the would-be post-merge state of every node and edge table.
///
/// For each table the dataset returned by `candidate_dataset` is scanned
/// (staged merge output, source state, or current target state). Node tables
/// are checked for value-constraint and unique-constraint violations and
/// their ids are collected; edge tables are then checked for unique
/// constraints, cardinality bounds and endpoints missing from the collected
/// node ids.
///
/// # Errors
/// Returns `OmniError::MergeConflicts` carrying every violation found, or the
/// underlying error when a scan or column lookup fails.
async fn validate_merge_candidates(
    db: &Omnigraph,
    source_snapshot: &Snapshot,
    target_snapshot: &Snapshot,
    candidates: &HashMap<String, CandidateTableState>,
) -> Result<()> {
    let mut conflicts = Vec::new();
    // Node type name -> ids present in the candidate state of that node table.
    let mut node_ids: HashMap<String, HashSet<String>> = HashMap::new();

    // Pass 1: node tables. Must complete before edges so that orphan checks
    // see every node type's ids.
    for (type_name, node_type) in &db.catalog().node_types {
        let table_key = format!("node:{}", type_name);
        let mut values = HashSet::new();
        // One "seen" map per unique constraint, carried across batches.
        let mut unique_seen = vec![HashMap::new(); node_type.unique_constraints.len()];

        if let Some(ds) =
            candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
        {
            let mut stream = scan_validation_stream(&ds).await?;
            while let Some(batch) = stream
                .try_next()
                .await
                .map_err(|e| OmniError::Lance(e.to_string()))?
            {
                // A value-constraint failure is reported as a table-level
                // conflict (no row id) and scanning continues.
                if let Err(err) = crate::loader::validate_value_constraints(&batch, node_type) {
                    conflicts.push(MergeConflict {
                        table_key: table_key.clone(),
                        row_id: None,
                        kind: MergeConflictKind::ValueConstraintViolation,
                        message: err.to_string(),
                    });
                }
                update_unique_constraints(
                    &table_key,
                    &batch,
                    &node_type.unique_constraints,
                    &mut unique_seen,
                    &mut conflicts,
                )?;
                let ids = batch
                    .column_by_name("id")
                    .ok_or_else(|| {
                        OmniError::manifest(format!("table {} missing id column", table_key))
                    })?
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| {
                        OmniError::manifest(format!("table {} id column is not Utf8", table_key))
                    })?;
                for row in 0..ids.len() {
                    values.insert(ids.value(row).to_string());
                }
            }
        }
        // Recorded even when the table is absent, so edge checks find an
        // (empty) id set for every node type.
        node_ids.insert(type_name.clone(), values);
    }

    // Pass 2: edge tables.
    for (edge_name, edge_type) in &db.catalog().edge_types {
        let table_key = format!("edge:{}", edge_name);
        let mut unique_seen = vec![HashMap::new(); edge_type.unique_constraints.len()];
        // `src` id -> number of edges of this type leaving it.
        let mut src_counts = HashMap::new();

        if let Some(ds) =
            candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
        {
            let mut stream = scan_validation_stream(&ds).await?;
            while let Some(batch) = stream
                .try_next()
                .await
                .map_err(|e| OmniError::Lance(e.to_string()))?
            {
                update_unique_constraints(
                    &table_key,
                    &batch,
                    &edge_type.unique_constraints,
                    &mut unique_seen,
                    &mut conflicts,
                )?;
                accumulate_edge_cardinality(&batch, &mut src_counts, &table_key)?;
                conflicts.extend(validate_orphan_edges_batch(
                    &table_key, edge_type, &batch, &node_ids,
                )?);
            }
        }

        // Cardinality can only be judged once all batches have been counted.
        conflicts.extend(finalize_edge_cardinality_conflicts(
            &table_key,
            edge_name,
            edge_type.cardinality.min,
            edge_type.cardinality.max,
            src_counts,
        ));
    }

    if conflicts.is_empty() {
        Ok(())
    } else {
        Err(OmniError::MergeConflicts(conflicts))
    }
}
645
646async fn candidate_dataset(
647 source_snapshot: &Snapshot,
648 target_snapshot: &Snapshot,
649 candidates: &HashMap<String, CandidateTableState>,
650 table_key: &str,
651) -> Result<Option<Dataset>> {
652 if let Some(candidate) = candidates.get(table_key) {
653 return match candidate {
654 CandidateTableState::AdoptSourceState => match source_snapshot.entry(table_key) {
655 Some(_) => Ok(Some(source_snapshot.open(table_key).await?)),
656 None => Ok(None),
657 },
658 CandidateTableState::RewriteMerged(staged) => {
659 Ok(Some(staged.full_staged.dataset.clone()))
660 }
661 };
662 }
663 match target_snapshot.entry(table_key) {
664 Some(_) => Ok(Some(target_snapshot.open(table_key).await?)),
665 None => Ok(None),
666 }
667}
668
669fn update_unique_constraints(
670 table_key: &str,
671 batch: &RecordBatch,
672 constraints: &[Vec<String>],
673 seen: &mut [HashMap<String, String>],
674 conflicts: &mut Vec<MergeConflict>,
675) -> Result<()> {
676 for (constraint_idx, columns) in constraints.iter().enumerate() {
677 let seen = &mut seen[constraint_idx];
678 for row in 0..batch.num_rows() {
679 let mut parts = Vec::with_capacity(columns.len());
680 let mut any_null = false;
681 for column_name in columns {
682 let column = batch.column_by_name(column_name).ok_or_else(|| {
683 OmniError::manifest(format!(
684 "table {} missing unique column '{}'",
685 table_key, column_name
686 ))
687 })?;
688 if column.is_null(row) {
689 any_null = true;
690 break;
691 }
692 parts.push(
693 array_value_to_string(column.as_ref(), row)
694 .map_err(|e| OmniError::Lance(e.to_string()))?,
695 );
696 }
697 if any_null {
698 continue;
699 }
700 let value = parts.join("|");
701 let row_id = row_id_at(batch, row)?;
702 if let Some(first_row_id) = seen.insert(value.clone(), row_id.clone()) {
703 conflicts.push(MergeConflict {
704 table_key: table_key.to_string(),
705 row_id: Some(row_id.clone()),
706 kind: MergeConflictKind::UniqueViolation,
707 message: format!(
708 "unique constraint {:?} violated by '{}' and '{}'",
709 columns, first_row_id, row_id
710 ),
711 });
712 }
713 }
714 }
715 Ok(())
716}
717
718fn accumulate_edge_cardinality(
719 batch: &RecordBatch,
720 counts: &mut HashMap<String, u32>,
721 table_key: &str,
722) -> Result<()> {
723 let srcs = batch
724 .column_by_name("src")
725 .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
726 .as_any()
727 .downcast_ref::<StringArray>()
728 .ok_or_else(|| {
729 OmniError::manifest(format!("table {} src column is not Utf8", table_key))
730 })?;
731 for row in 0..srcs.len() {
732 *counts.entry(srcs.value(row).to_string()).or_insert(0_u32) += 1;
733 }
734 Ok(())
735}
736
737fn finalize_edge_cardinality_conflicts(
738 table_key: &str,
739 edge_name: &str,
740 min: u32,
741 max: Option<u32>,
742 counts: HashMap<String, u32>,
743) -> Vec<MergeConflict> {
744 let mut conflicts = Vec::new();
745 for (src, count) in counts {
746 if let Some(max) = max {
747 if count > max {
748 conflicts.push(MergeConflict {
749 table_key: table_key.to_string(),
750 row_id: None,
751 kind: MergeConflictKind::CardinalityViolation,
752 message: format!(
753 "@card violation on edge {}: source '{}' has {} edges (max {})",
754 edge_name, src, count, max
755 ),
756 });
757 }
758 }
759 if count < min {
760 conflicts.push(MergeConflict {
761 table_key: table_key.to_string(),
762 row_id: None,
763 kind: MergeConflictKind::CardinalityViolation,
764 message: format!(
765 "@card violation on edge {}: source '{}' has {} edges (min {})",
766 edge_name, src, count, min
767 ),
768 });
769 }
770 }
771 conflicts
772}
773
774fn validate_orphan_edges_batch(
775 table_key: &str,
776 edge_type: &omnigraph_compiler::catalog::EdgeType,
777 batch: &RecordBatch,
778 node_ids: &HashMap<String, HashSet<String>>,
779) -> Result<Vec<MergeConflict>> {
780 let srcs = batch
781 .column_by_name("src")
782 .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
783 .as_any()
784 .downcast_ref::<StringArray>()
785 .ok_or_else(|| {
786 OmniError::manifest(format!("table {} src column is not Utf8", table_key))
787 })?;
788 let dsts = batch
789 .column_by_name("dst")
790 .ok_or_else(|| OmniError::manifest(format!("table {} missing dst column", table_key)))?
791 .as_any()
792 .downcast_ref::<StringArray>()
793 .ok_or_else(|| {
794 OmniError::manifest(format!("table {} dst column is not Utf8", table_key))
795 })?;
796
797 let from_ids = node_ids.get(&edge_type.from_type).ok_or_else(|| {
798 OmniError::manifest(format!(
799 "missing candidate node ids for {}",
800 edge_type.from_type
801 ))
802 })?;
803 let to_ids = node_ids.get(&edge_type.to_type).ok_or_else(|| {
804 OmniError::manifest(format!(
805 "missing candidate node ids for {}",
806 edge_type.to_type
807 ))
808 })?;
809
810 let mut conflicts = Vec::new();
811 for row in 0..batch.num_rows() {
812 let row_id = row_id_at(batch, row)?;
813 let src = srcs.value(row);
814 let dst = dsts.value(row);
815 if !from_ids.contains(src) {
816 conflicts.push(MergeConflict {
817 table_key: table_key.to_string(),
818 row_id: Some(row_id.clone()),
819 kind: MergeConflictKind::OrphanEdge,
820 message: format!("src '{}' not found in {}", src, edge_type.from_type),
821 });
822 }
823 if !to_ids.contains(dst) {
824 conflicts.push(MergeConflict {
825 table_key: table_key.to_string(),
826 row_id: Some(row_id),
827 kind: MergeConflictKind::OrphanEdge,
828 message: format!("dst '{}' not found in {}", dst, edge_type.to_type),
829 });
830 }
831 }
832 Ok(conflicts)
833}
834
835fn row_id_at(batch: &RecordBatch, row: usize) -> Result<String> {
836 let ids = batch
837 .column_by_name("id")
838 .ok_or_else(|| OmniError::manifest("batch missing id column".to_string()))?
839 .as_any()
840 .downcast_ref::<StringArray>()
841 .ok_or_else(|| OmniError::manifest("id column is not Utf8".to_string()))?;
842 Ok(ids.value(row).to_string())
843}
844
/// Publishes the source snapshot's state of `table_key` into the target and
/// returns the manifest update describing the result.
///
/// The strategy depends on the target's active branch and on the branch the
/// source entry's table lives on:
/// - source table on no branch: the update simply points at the source
///   entry's version;
/// - source table on a branch and the target can be written in place (no
///   active target branch, or the target entry already lives on the target
///   branch): the base-to-source delta is computed and applied via
///   `publish_rewritten_merge_table`; when there is no delta the target's
///   current version is kept;
/// - otherwise: the source table state is forked onto the target branch.
///
/// # Errors
/// Returns a manifest error when the source snapshot has no entry for the
/// table, and propagates failures from delta computation, publishing and
/// forking.
async fn publish_adopted_source_state(
    target_db: &Omnigraph,
    catalog: &Catalog,
    base_snapshot: &Snapshot,
    source_snapshot: &Snapshot,
    target_snapshot: &Snapshot,
    table_key: &str,
) -> Result<crate::db::SubTableUpdate> {
    let source_entry = source_snapshot
        .entry(table_key)
        .ok_or_else(|| OmniError::manifest(format!("missing source entry for {}", table_key)))?;
    let target_entry = target_snapshot.entry(table_key);

    let target_active = target_db.active_branch().await;
    match (
        target_active.as_deref(),
        source_entry.table_branch.as_deref(),
    ) {
        // Neither side is on a branch: reference the source version directly.
        (None, None) => Ok(crate::db::SubTableUpdate {
            table_key: table_key.to_string(),
            table_version: source_entry.table_version,
            table_branch: None,
            row_count: source_entry.row_count,
            version_metadata: source_entry.version_metadata.clone(),
        }),
        // Target is on a branch but the source table is not: the update still
        // references the unbranched source version.
        (Some(_target_branch), None) => Ok(crate::db::SubTableUpdate {
            table_key: table_key.to_string(),
            table_version: source_entry.table_version,
            table_branch: None,
            row_count: source_entry.row_count,
            version_metadata: source_entry.version_metadata.clone(),
        }),
        // Source table lives on a branch, target has no active branch: replay
        // the source's changes onto the target table.
        (None, Some(_source_branch)) => {
            let delta =
                compute_source_delta(table_key, catalog, base_snapshot, source_snapshot).await?;
            match delta {
                Some(staged) => publish_rewritten_merge_table(target_db, table_key, &staged).await,
                // No row-level difference: keep the target's version when it
                // has one, falling back to the source's.
                None => Ok(crate::db::SubTableUpdate {
                    table_key: table_key.to_string(),
                    table_version: target_entry
                        .map(|e| e.table_version)
                        .unwrap_or(source_entry.table_version),
                    table_branch: None,
                    row_count: source_entry.row_count,
                    version_metadata: target_entry
                        .map(|entry| entry.version_metadata.clone())
                        .unwrap_or_else(|| source_entry.version_metadata.clone()),
                }),
            }
        }
        (Some(target_branch), Some(source_branch)) => {
            // The target table already lives on the target branch: apply the
            // source's changes in place.
            if target_entry.and_then(|entry| entry.table_branch.as_deref()) == Some(target_branch) {
                let delta =
                    compute_source_delta(table_key, catalog, base_snapshot, source_snapshot)
                        .await?;
                match delta {
                    Some(staged) => {
                        publish_rewritten_merge_table(target_db, table_key, &staged).await
                    }
                    // `target_entry` is `Some` here: the branch comparison
                    // above can only succeed for a present entry.
                    None => Ok(crate::db::SubTableUpdate {
                        table_key: table_key.to_string(),
                        table_version: target_entry.unwrap().table_version,
                        table_branch: Some(target_branch.to_string()),
                        row_count: source_entry.row_count,
                        version_metadata: target_entry.unwrap().version_metadata.clone(),
                    }),
                }
            } else {
                // The target has no table on its own branch yet: fork the
                // source table's state onto the target branch.
                let full_path = format!("{}/{}", target_db.uri(), source_entry.table_path);
                let ds = target_db
                    .fork_dataset_from_entry_state(
                        table_key,
                        &full_path,
                        Some(source_branch),
                        source_entry.table_version,
                        target_branch,
                    )
                    .await?;
                let state = target_db.table_store().table_state(&full_path, &ds).await?;
                Ok(crate::db::SubTableUpdate {
                    table_key: table_key.to_string(),
                    table_version: state.version,
                    table_branch: Some(target_branch.to_string()),
                    row_count: state.row_count,
                    version_metadata: state.version_metadata,
                })
            }
        }
    }
}
943
944async fn publish_rewritten_merge_table(
945 target_db: &Omnigraph,
946 table_key: &str,
947 staged: &StagedMergeResult,
948) -> Result<crate::db::SubTableUpdate> {
949 let (ds, full_path, table_branch) = target_db
954 .open_for_mutation(table_key, crate::db::MutationOpKind::Merge)
955 .await?;
956 let mut current_ds = ds;
957
958 if let Some(delta) = &staged.delta_staged {
968 let batches: Vec<RecordBatch> = target_db
969 .table_store()
970 .scan_batches_for_rewrite(&delta.dataset)
971 .await?
972 .into_iter()
973 .filter(|batch| batch.num_rows() > 0)
974 .collect();
975 if !batches.is_empty() {
976 let combined = if batches.len() == 1 {
978 batches.into_iter().next().unwrap()
979 } else {
980 let schema = batches[0].schema();
981 arrow_select::concat::concat_batches(&schema, &batches)
982 .map_err(|e| OmniError::Lance(e.to_string()))?
983 };
984 let staged_merge = target_db
985 .table_store()
986 .stage_merge_insert(
987 current_ds.clone(),
988 combined,
989 vec!["id".to_string()],
990 lance::dataset::WhenMatched::UpdateAll,
991 lance::dataset::WhenNotMatched::InsertAll,
992 )
993 .await?;
994 current_ds = target_db
995 .table_store()
996 .commit_staged(Arc::new(current_ds), staged_merge.transaction)
997 .await?;
998 }
999 }
1000
1001 if !staged.deleted_ids.is_empty() {
1011 let escaped: Vec<String> = staged
1012 .deleted_ids
1013 .iter()
1014 .map(|id| format!("'{}'", id.replace('\'', "''")))
1015 .collect();
1016 let filter = format!("id IN ({})", escaped.join(", "));
1017 target_db
1018 .table_store()
1019 .delete_where(&full_path, &mut current_ds, &filter)
1020 .await?;
1021 }
1022
1023 let row_count = target_db
1031 .table_store()
1032 .table_state(&full_path, ¤t_ds)
1033 .await?
1034 .row_count;
1035 if row_count > 0 {
1036 target_db
1037 .build_indices_on_dataset(table_key, &mut current_ds)
1038 .await?;
1039 }
1040 let final_state = target_db
1041 .table_store()
1042 .table_state(&full_path, ¤t_ds)
1043 .await?;
1044
1045 Ok(crate::db::SubTableUpdate {
1046 table_key: table_key.to_string(),
1047 table_version: final_state.version,
1048 table_branch,
1049 row_count: final_state.row_count,
1050 version_metadata: final_state.version_metadata,
1051 })
1052}
1053
1054impl Omnigraph {
1055 pub async fn branch_merge(&self, source: &str, target: &str) -> Result<MergeOutcome> {
1056 self.branch_merge_as(source, target, None).await
1057 }
1058
1059 pub async fn branch_merge_as(
1060 &self,
1061 source: &str,
1062 target: &str,
1063 actor_id: Option<&str>,
1064 ) -> Result<MergeOutcome> {
1065 self.enforce(
1073 omnigraph_policy::PolicyAction::BranchMerge,
1074 &omnigraph_policy::ResourceScope::BranchTransition {
1075 source: source.to_string(),
1076 target: target.to_string(),
1077 },
1078 actor_id,
1079 )?;
1080 self.ensure_schema_apply_idle("branch_merge").await?;
1081 self.branch_merge_impl(source, target, actor_id).await
1082 }
1083
/// Performs the merge of `source` into `target` after policy checks have passed.
///
/// Resolves both branch heads and their merge base, short-circuits when there
/// is nothing to merge, then runs the merge against the target branch's
/// coordinator while holding the merge-exclusive lock.
///
/// # Errors
/// Returns a manifest error for internal system refs, identical branches,
/// a branch without a head commit, or branches without a common ancestor;
/// otherwise propagates the merge (or post-merge refresh) error.
async fn branch_merge_impl(
    &self,
    source: &str,
    target: &str,
    actor_id: Option<&str>,
) -> Result<MergeOutcome> {
    if is_internal_system_branch(source) || is_internal_system_branch(target) {
        return Err(OmniError::manifest(format!(
            "branch_merge does not allow internal system refs ('{}' -> '{}')",
            source, target
        )));
    }
    let source_branch = Omnigraph::normalize_branch_name(source)?;
    let target_branch = Omnigraph::normalize_branch_name(target)?;
    if source_branch == target_branch {
        return Err(OmniError::manifest(
            "branch_merge requires distinct source and target branches".to_string(),
        ));
    }

    let source_head_commit_id = self
        .head_commit_id_for_branch(source_branch.as_deref())
        .await?
        .ok_or_else(|| OmniError::manifest("source branch has no head commit".to_string()))?;
    let target_head_commit_id = self
        .head_commit_id_for_branch(target_branch.as_deref())
        .await?
        .ok_or_else(|| OmniError::manifest("target branch has no head commit".to_string()))?;
    let base_commit = CommitGraph::merge_base(
        self.uri(),
        source_branch.as_deref(),
        target_branch.as_deref(),
    )
    .await?
    .ok_or_else(|| OmniError::manifest("branches have no common ancestor".to_string()))?;

    // Nothing to do when both heads coincide or the source head is the merge
    // base (the source has no commits the target lacks).
    if source_head_commit_id == target_head_commit_id
        || base_commit.graph_commit_id == source_head_commit_id
    {
        return Ok(MergeOutcome::AlreadyUpToDate);
    }
    // The target head being the merge base means the target has not advanced
    // since the branches diverged.
    let is_fast_forward = base_commit.graph_commit_id == target_head_commit_id;

    let base_snapshot = ManifestCoordinator::snapshot_at(
        self.uri(),
        base_commit.manifest_branch.as_deref(),
        base_commit.manifest_version,
    )
    .await?;
    // A normalized branch of `None` is read as "main".
    let source_snapshot = self
        .resolved_target(ReadTarget::Branch(
            source_branch.clone().unwrap_or_else(|| "main".to_string()),
        ))
        .await?
        .snapshot;
    // NOTE(review): the head commit ids and snapshots above are read before
    // this lock is taken — confirm that `branch_merge_on_current_target`
    // re-validates the target head against concurrent writers.
    let merge_exclusive = self.merge_exclusive();
    let _merge_guard = merge_exclusive.lock().await;

    // Temporarily point the coordinator at the target branch for the merge;
    // it is restored below regardless of the merge outcome.
    let previous_branch = self.active_branch().await;
    let previous = self
        .swap_coordinator_for_branch(target_branch.as_deref())
        .await?;
    let merge_result = self
        .branch_merge_on_current_target(
            &base_snapshot,
            &source_snapshot,
            &target_head_commit_id,
            &source_head_commit_id,
            is_fast_forward,
            actor_id,
        )
        .await;
    self.restore_coordinator(previous).await;

    // When the caller was already on the target branch, the restored
    // coordinator is refreshed after the merge.
    if previous_branch == target_branch {
        if let Err(refresh_err) = self.refresh_coordinator_only().await {
            // A refresh failure only surfaces when the merge itself succeeded;
            // otherwise the merge error takes precedence and the refresh
            // failure is logged.
            if merge_result.is_ok() {
                return Err(refresh_err);
            }
            tracing::warn!(
                error = %refresh_err,
                "post-merge coordinator refresh failed on the error path; \
                 the next op or open will re-sync"
            );
        }
    }

    merge_result
}
1214
1215 async fn branch_merge_on_current_target(
1216 &self,
1217 base_snapshot: &Snapshot,
1218 source_snapshot: &Snapshot,
1219 target_head_commit_id: &str,
1220 source_head_commit_id: &str,
1221 is_fast_forward: bool,
1222 actor_id: Option<&str>,
1223 ) -> Result<MergeOutcome> {
1224 self.ensure_commit_graph_initialized().await?;
1225 let target_snapshot = self.snapshot().await;
1226
1227 let mut table_keys = HashSet::new();
1228 for entry in base_snapshot.entries() {
1229 table_keys.insert(entry.table_key.clone());
1230 }
1231 for entry in source_snapshot.entries() {
1232 table_keys.insert(entry.table_key.clone());
1233 }
1234 for entry in target_snapshot.entries() {
1235 table_keys.insert(entry.table_key.clone());
1236 }
1237
1238 let mut ordered_table_keys: Vec<String> = table_keys.into_iter().collect();
1239 ordered_table_keys.sort();
1240
1241 let mut conflicts = Vec::new();
1242 let mut candidates: HashMap<String, CandidateTableState> = HashMap::new();
1243
1244 for table_key in &ordered_table_keys {
1245 let base_entry = base_snapshot.entry(table_key);
1246 let source_entry = source_snapshot.entry(table_key);
1247 let target_entry = target_snapshot.entry(table_key);
1248 if same_manifest_state(source_entry, target_entry) {
1249 continue;
1250 }
1251 if same_manifest_state(base_entry, source_entry) {
1252 continue;
1253 }
1254 if same_manifest_state(base_entry, target_entry) {
1255 candidates.insert(table_key.clone(), CandidateTableState::AdoptSourceState);
1256 continue;
1257 }
1258
1259 if let Some(staged) = stage_streaming_table_merge(
1260 table_key,
1261 &self.catalog(),
1262 base_snapshot,
1263 source_snapshot,
1264 &target_snapshot,
1265 &mut conflicts,
1266 )
1267 .await?
1268 {
1269 candidates.insert(
1270 table_key.clone(),
1271 CandidateTableState::RewriteMerged(staged),
1272 );
1273 }
1274 }
1275
1276 if !conflicts.is_empty() {
1277 return Err(OmniError::MergeConflicts(conflicts));
1278 }
1279
1280 validate_merge_candidates(self, source_snapshot, &target_snapshot, &candidates).await?;
1281
1282 let active_branch_for_keys = self.active_branch().await;
1320 let merge_queue_keys: Vec<(String, Option<String>)> = ordered_table_keys
1321 .iter()
1322 .filter(|table_key| {
1323 matches!(
1324 candidates.get(*table_key),
1325 Some(CandidateTableState::RewriteMerged(_))
1326 | Some(CandidateTableState::AdoptSourceState)
1327 )
1328 })
1329 .map(|table_key| (table_key.clone(), active_branch_for_keys.clone()))
1330 .collect();
1331 let _merge_queue_guards = self.write_queue().acquire_many(&merge_queue_keys).await;
1332
1333 let post_queue_snapshot = self.snapshot().await;
1334 for table_key in &ordered_table_keys {
1335 let Some(candidate) = candidates.get(table_key) else {
1336 continue;
1337 };
1338 if !matches!(
1339 candidate,
1340 CandidateTableState::RewriteMerged(_) | CandidateTableState::AdoptSourceState
1341 ) {
1342 continue;
1343 }
1344 let expected = target_snapshot.entry(table_key).map(|e| e.table_version);
1345 let current = post_queue_snapshot
1346 .entry(table_key)
1347 .map(|e| e.table_version);
1348 if expected != current {
1349 return Err(OmniError::manifest_expected_version_mismatch(
1350 table_key.clone(),
1351 expected.unwrap_or(0),
1352 current.unwrap_or(0),
1353 ));
1354 }
1355 }
1356
1357 let recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = ordered_table_keys
1358 .iter()
1359 .filter_map(|table_key| {
1360 let candidate = candidates.get(table_key)?;
1361 if !matches!(candidate, CandidateTableState::RewriteMerged(_)) {
1362 return None;
1363 }
1364 let entry = target_snapshot.entry(table_key)?;
1365 Some(crate::db::manifest::SidecarTablePin {
1366 table_key: table_key.clone(),
1367 table_path: self.table_store().dataset_uri(&entry.table_path),
1368 expected_version: entry.table_version,
1369 post_commit_pin: entry.table_version + 1,
1370 table_branch: active_branch_for_keys.clone(),
1383 })
1384 })
1385 .collect();
1386 let recovery_handle = if recovery_pins.is_empty() {
1387 None
1388 } else {
1389 let target_branch = active_branch_for_keys.clone();
1399 let mut sidecar = crate::db::manifest::new_sidecar(
1400 crate::db::manifest::SidecarKind::BranchMerge,
1401 target_branch,
1402 actor_id.map(str::to_string),
1403 recovery_pins,
1404 );
1405 sidecar.merge_source_commit_id = Some(source_head_commit_id.to_string());
1411 Some(
1412 crate::db::manifest::write_sidecar(
1413 self.root_uri(),
1414 self.storage_adapter(),
1415 &sidecar,
1416 )
1417 .await?,
1418 )
1419 };
1420
1421 let mut updates = Vec::new();
1422 let mut changed_edge_tables = false;
1423 for table_key in &ordered_table_keys {
1424 let Some(candidate_state) = candidates.get(table_key) else {
1425 continue;
1426 };
1427 let update = match candidate_state {
1428 CandidateTableState::AdoptSourceState => {
1429 publish_adopted_source_state(
1430 self,
1431 &self.catalog(),
1432 base_snapshot,
1433 source_snapshot,
1434 &target_snapshot,
1435 table_key,
1436 )
1437 .await?
1438 }
1439 CandidateTableState::RewriteMerged(staged) => {
1440 publish_rewritten_merge_table(self, table_key, staged).await?
1441 }
1442 };
1443 if table_key.starts_with("edge:") {
1444 changed_edge_tables = true;
1445 }
1446 updates.push(update);
1447 }
1448
1449 crate::failpoints::maybe_fail("branch_merge.post_phase_b_pre_manifest_commit")?;
1454
1455 let manifest_version = if updates.is_empty() {
1456 self.version().await
1457 } else {
1458 self.commit_manifest_updates(&updates).await?
1459 };
1460
1461 if let Some(handle) = recovery_handle {
1465 if let Err(err) =
1466 crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await
1467 {
1468 tracing::warn!(
1469 error = %err,
1470 operation_id = handle.operation_id.as_str(),
1471 "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
1472 );
1473 }
1474 }
1475 self.record_merge_commit(
1476 manifest_version,
1477 target_head_commit_id,
1478 source_head_commit_id,
1479 actor_id,
1480 )
1481 .await?;
1482
1483 if changed_edge_tables {
1484 self.invalidate_graph_index().await;
1485 }
1486
1487 Ok(if is_fast_forward {
1488 MergeOutcome::FastForward
1489 } else {
1490 MergeOutcome::Merged
1491 })
1492 }
1493}