1use super::*;
2
/// Maximum rows a `StagedTableWriter` buffers in memory before flushing a batch to disk.
const MERGE_STAGE_BATCH_ROWS: usize = 8192;
/// Env var that, when set, overrides the parent directory for merge staging temp dirs.
const MERGE_STAGE_DIR_ENV: &str = "OMNIGRAPH_MERGE_STAGING_DIR";
5
/// How a table that differs between branches will be published during a merge.
#[derive(Debug)]
enum CandidateTableState {
    /// The target can adopt the source branch's state for this table as-is.
    AdoptSourceState,
    /// The table must be rewritten from a staged three-way merge result.
    RewriteMerged(StagedMergeResult),
}
11
/// A temporary Lance dataset staged on local disk.
#[derive(Debug)]
struct StagedTable {
    // Held only for its Drop impl: dropping it removes the staged files,
    // so it must live as long as `dataset` is in use.
    _dir: TempDir,
    dataset: Dataset,
}
17
/// Output of a staged table merge.
#[derive(Debug)]
struct StagedMergeResult {
    /// The complete merged table contents.
    full_staged: StagedTable,
    /// Only the rows that differ from the target (used for merge-insert);
    /// `None` when no rows changed.
    delta_staged: Option<StagedTable>,
    /// Ids of rows that must be deleted from the target.
    deleted_ids: Vec<String>,
}
24
/// One row yielded by an `OrderedTableCursor`, pinned to the batch it came from.
#[derive(Debug, Clone)]
struct CursorRow {
    /// Value of the row's `id` column.
    id: String,
    /// Concatenated rendering of all column values (see `row_signature`),
    /// used to detect whether a row changed between snapshots.
    signature: String,
    /// The batch containing this row; `row_index` indexes into it.
    batch: RecordBatch,
    row_index: usize,
}
32
/// Streams a table's rows ordered by `id` ascending, with one-row lookahead.
struct OrderedTableCursor {
    /// `None` once the scan is exhausted (or the table did not exist).
    stream: Option<std::pin::Pin<Box<DatasetRecordBatchStream>>>,
    current_batch: Option<RecordBatch>,
    /// Next row index to consume within `current_batch`.
    current_row: usize,
    /// Row fetched by `peek_cloned` but not yet consumed by `pop`.
    peeked: Option<CursorRow>,
}
39
impl OrderedTableCursor {
    /// Build a cursor over `table_key` in `snapshot`; a table absent from the
    /// snapshot yields an always-empty cursor.
    async fn from_snapshot(snapshot: &Snapshot, table_key: &str) -> Result<Self> {
        let dataset = match snapshot.entry(table_key) {
            Some(_) => Some(snapshot.open(table_key).await?),
            None => None,
        };
        Self::from_dataset(dataset).await
    }

    /// Start an id-ascending scan over `dataset` (`None` scans nothing).
    async fn from_dataset(dataset: Option<Dataset>) -> Result<Self> {
        let stream = if let Some(ds) = dataset {
            Some(Box::pin(
                crate::table_store::TableStore::scan_stream(
                    &ds,
                    None,
                    None,
                    // Ordering by `id` is what lets the merge walk several
                    // tables in lockstep with a single pass.
                    Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]),
                    false,
                )
                .await?,
            ))
        } else {
            None
        };

        Ok(Self {
            stream,
            current_batch: None,
            current_row: 0,
            peeked: None,
        })
    }

    /// Return a clone of the next row without consuming it.
    async fn peek_cloned(&mut self) -> Result<Option<CursorRow>> {
        if self.peeked.is_none() {
            self.peeked = self.next_row().await?;
        }
        Ok(self.peeked.clone())
    }

    /// Consume and return the next row (the peeked one, if present).
    async fn pop(&mut self) -> Result<Option<CursorRow>> {
        if self.peeked.is_some() {
            return Ok(self.peeked.take());
        }
        self.next_row().await
    }

    /// Advance to the next row, pulling fresh batches from the stream as the
    /// current one is exhausted. Returns `Ok(None)` at end of stream.
    async fn next_row(&mut self) -> Result<Option<CursorRow>> {
        loop {
            // Serve from the current batch while rows remain.
            if let Some(batch) = &self.current_batch {
                if self.current_row < batch.num_rows() {
                    let row_index = self.current_row;
                    self.current_row += 1;
                    return Ok(Some(CursorRow {
                        id: row_id_at(batch, row_index)?,
                        signature: row_signature(batch, row_index)?,
                        batch: batch.clone(),
                        row_index,
                    }));
                }
            }

            // Current batch exhausted: fetch the next one (or finish).
            let Some(stream) = self.stream.as_mut() else {
                return Ok(None);
            };
            match stream.try_next().await {
                Ok(Some(batch)) => {
                    self.current_batch = Some(batch);
                    self.current_row = 0;
                }
                Ok(None) => {
                    self.stream = None;
                    self.current_batch = None;
                    return Ok(None);
                }
                Err(err) => return Err(OmniError::Lance(err.to_string())),
            }
        }
    }
}
120
/// Accumulates rows into batches and writes them to a temp-dir Lance dataset.
struct StagedTableWriter {
    schema: SchemaRef,
    dataset_uri: String,
    /// Owns the staging directory; transferred into the resulting `StagedTable`.
    dir: TempDir,
    /// Created lazily on first flush (or explicitly empty in `finish`).
    dataset: Option<Dataset>,
    /// Rows currently buffered in `batches`; reset on flush.
    buffered_rows: usize,
    /// Total rows ever pushed (never reset).
    row_count: u64,
    batches: Vec<RecordBatch>,
}
130
impl StagedTableWriter {
    /// Create a writer whose staged dataset lives in a fresh temp directory.
    fn new(table_key: &str, schema: SchemaRef) -> Result<Self> {
        let dir = merge_stage_tempdir(table_key)?;
        let dataset_uri = dir.path().join("table.lance").to_string_lossy().to_string();
        Ok(Self {
            schema,
            dataset_uri,
            dir,
            dataset: None,
            buffered_rows: 0,
            row_count: 0,
            batches: Vec::new(),
        })
    }

    /// Buffer a single row (as a one-row slice of its source batch), flushing
    /// once `MERGE_STAGE_BATCH_ROWS` rows have accumulated.
    async fn push_row(&mut self, row: &CursorRow) -> Result<()> {
        self.row_count += 1;
        self.buffered_rows += 1;
        self.batches.push(row.batch.slice(row.row_index, 1));
        if self.buffered_rows >= MERGE_STAGE_BATCH_ROWS {
            self.flush().await?;
        }
        Ok(())
    }

    /// Flush remaining rows and return the staged table. A writer that never
    /// received rows still materializes an empty dataset so callers can open it.
    async fn finish(mut self) -> Result<StagedTable> {
        self.flush().await?;
        if self.dataset.is_none() {
            self.dataset = Some(
                crate::table_store::TableStore::create_empty_dataset(
                    &self.dataset_uri,
                    &self.schema,
                )
                .await?,
            );
        }
        Ok(StagedTable {
            _dir: self.dir,
            dataset: self.dataset.unwrap(),
        })
    }

    /// Concatenate buffered row slices into one batch and append it to the
    /// staged dataset (creating the dataset on first flush).
    async fn flush(&mut self) -> Result<()> {
        if self.batches.is_empty() {
            return Ok(());
        }

        // Avoid the concat cost when only one slice is buffered.
        let batch = if self.batches.len() == 1 {
            self.batches.pop().unwrap()
        } else {
            let batches = std::mem::take(&mut self.batches);
            arrow_select::concat::concat_batches(&self.schema, &batches)
                .map_err(|e| OmniError::Lance(e.to_string()))?
        };
        self.buffered_rows = 0;

        let ds = crate::table_store::TableStore::append_or_create_batch(
            &self.dataset_uri,
            self.dataset.take(),
            batch,
        )
        .await?;
        self.dataset = Some(ds);
        Ok(())
    }
}
197
198fn merge_stage_tempdir(table_key: &str) -> Result<TempDir> {
199 if let Ok(root) = env::var(MERGE_STAGE_DIR_ENV) {
200 return TempDirBuilder::new()
201 .prefix(&format!(
202 "omnigraph-merge-{}-",
203 sanitize_table_key(table_key)
204 ))
205 .tempdir_in(PathBuf::from(root))
206 .map_err(OmniError::from);
207 }
208 TempDirBuilder::new()
209 .prefix(&format!(
210 "omnigraph-merge-{}-",
211 sanitize_table_key(table_key)
212 ))
213 .tempdir()
214 .map_err(OmniError::from)
215}
216
/// Make a table key safe for use as a file-system path component by
/// replacing path-significant characters (':', '/', '\') with '-'.
fn sanitize_table_key(table_key: &str) -> String {
    let mut sanitized = String::with_capacity(table_key.len());
    for ch in table_key.chars() {
        let safe = match ch {
            ':' | '/' | '\\' => '-',
            other => other,
        };
        sanitized.push(safe);
    }
    sanitized
}
226
/// Two-way diff of one table between `base_snapshot` and `source_snapshot`.
///
/// Walks both tables in `id` order and stages:
/// - a full copy of the source-side result (`full_staged`),
/// - only the rows added or changed on the source (`delta_staged`),
/// - the ids of rows the source deleted (`deleted_ids`).
///
/// Returns `Ok(None)` when source and base are row-identical.
async fn compute_source_delta(
    table_key: &str,
    catalog: &Catalog,
    base_snapshot: &Snapshot,
    source_snapshot: &Snapshot,
) -> Result<Option<StagedMergeResult>> {
    let schema = schema_for_table_key(catalog, table_key)?;
    let mut full_writer =
        StagedTableWriter::new(&format!("{}_adopt_full", table_key), schema.clone())?;
    let mut delta_writer = StagedTableWriter::new(&format!("{}_adopt_delta", table_key), schema)?;
    let mut deleted_ids: Vec<String> = Vec::new();
    let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
    let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;

    let mut needs_update = false;

    loop {
        let base_row = base.peek_cloned().await?;
        let source_row = source.peek_cloned().await?;

        // Smallest id visible on either cursor; both exhausted => done.
        let next_id = [base_row.as_ref(), source_row.as_ref()]
            .into_iter()
            .flatten()
            .map(|row| row.id.clone())
            .min();
        let Some(next_id) = next_id else { break };

        // Consume only the cursor(s) currently positioned at `next_id`.
        let base_row = if base_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
            base.pop().await?
        } else {
            None
        };
        let source_row = if source_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
            source.pop().await?
        } else {
            None
        };

        let base_sig = base_row.as_ref().map(|r| r.signature.as_str());
        let source_sig = source_row.as_ref().map(|r| r.signature.as_str());

        match (&base_row, &source_row) {
            // Row deleted on the source.
            (Some(_), None) => {
                deleted_ids.push(next_id);
                needs_update = true;
            }
            // Row inserted on the source.
            (None, Some(src)) => {
                full_writer.push_row(src).await?;
                delta_writer.push_row(src).await?;
                needs_update = true;
            }
            // Row updated on the source (signatures differ).
            (Some(_), Some(src)) if source_sig != base_sig => {
                full_writer.push_row(src).await?;
                delta_writer.push_row(src).await?;
                needs_update = true;
            }
            // Row unchanged: keep the base copy, no delta entry.
            (Some(base), Some(_)) => {
                full_writer.push_row(base).await?;
            }
            // `next_id` came from one of the two cursors, so both can't be None.
            (None, None) => unreachable!(),
        }
    }

    if !needs_update {
        return Ok(None);
    }

    let delta_staged = if delta_writer.row_count > 0 {
        Some(delta_writer.finish().await?)
    } else {
        None
    };

    Ok(Some(StagedMergeResult {
        full_staged: full_writer.finish().await?,
        delta_staged,
        deleted_ids,
    }))
}
312
313fn min_cursor_id(
314 base_row: &Option<CursorRow>,
315 source_row: &Option<CursorRow>,
316 target_row: &Option<CursorRow>,
317) -> Option<String> {
318 [base_row.as_ref(), source_row.as_ref(), target_row.as_ref()]
319 .into_iter()
320 .flatten()
321 .map(|row| row.id.clone())
322 .min()
323}
324
325async fn stage_streaming_table_merge(
326 table_key: &str,
327 catalog: &Catalog,
328 base_snapshot: &Snapshot,
329 source_snapshot: &Snapshot,
330 target_snapshot: &Snapshot,
331 conflicts: &mut Vec<MergeConflict>,
332) -> Result<Option<StagedMergeResult>> {
333 let schema = schema_for_table_key(catalog, table_key)?;
334 let mut full_writer = StagedTableWriter::new(&format!("{}_full", table_key), schema.clone())?;
335 let mut delta_writer = StagedTableWriter::new(&format!("{}_delta", table_key), schema)?;
336 let mut deleted_ids: Vec<String> = Vec::new();
337 let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
338 let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;
339 let mut target = OrderedTableCursor::from_snapshot(target_snapshot, table_key).await?;
340
341 let prior_conflict_count = conflicts.len();
342 let mut needs_update = false;
343
344 loop {
345 let base_row = base.peek_cloned().await?;
346 let source_row = source.peek_cloned().await?;
347 let target_row = target.peek_cloned().await?;
348 let Some(next_id) = min_cursor_id(&base_row, &source_row, &target_row) else {
349 break;
350 };
351
352 let base_row = if base_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str()) {
353 base.pop().await?
354 } else {
355 None
356 };
357 let source_row = if source_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
358 {
359 source.pop().await?
360 } else {
361 None
362 };
363 let target_row = if target_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
364 {
365 target.pop().await?
366 } else {
367 None
368 };
369
370 let base_sig = base_row.as_ref().map(|row| row.signature.as_str());
371 let source_sig = source_row.as_ref().map(|row| row.signature.as_str());
372 let target_sig = target_row.as_ref().map(|row| row.signature.as_str());
373
374 let source_changed = source_sig != base_sig;
375 let target_changed = target_sig != base_sig;
376
377 let selection = if !source_changed {
378 target_row.as_ref()
379 } else if !target_changed {
380 source_row.as_ref()
381 } else if source_sig == target_sig {
382 target_row.as_ref()
383 } else {
384 conflicts.push(classify_merge_conflict(
385 table_key, &next_id, base_sig, source_sig, target_sig,
386 ));
387 None
388 };
389
390 if conflicts.len() > prior_conflict_count {
391 continue;
392 }
393
394 if selection.is_none() && target_row.is_some() {
396 deleted_ids.push(next_id.clone());
397 needs_update = true;
398 continue;
399 }
400
401 if let Some(selection) = selection {
402 full_writer.push_row(selection).await?;
404 if selection.signature.as_str() != target_sig.unwrap_or("") {
406 delta_writer.push_row(selection).await?;
407 needs_update = true;
408 }
409 }
410 }
411
412 if conflicts.len() > prior_conflict_count {
413 return Ok(None);
414 }
415 if !needs_update {
416 return Ok(None);
417 }
418
419 let delta_staged = if delta_writer.row_count > 0 {
420 Some(delta_writer.finish().await?)
421 } else {
422 None
423 };
424
425 Ok(Some(StagedMergeResult {
426 full_staged: full_writer.finish().await?,
427 delta_staged,
428 deleted_ids,
429 }))
430}
431
432fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<SchemaRef> {
433 if let Some(name) = table_key.strip_prefix("node:") {
434 return catalog
435 .node_types
436 .get(name)
437 .map(|t| t.arrow_schema.clone())
438 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", name)));
439 }
440 if let Some(name) = table_key.strip_prefix("edge:") {
441 return catalog
442 .edge_types
443 .get(name)
444 .map(|t| t.arrow_schema.clone())
445 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", name)));
446 }
447 Err(OmniError::manifest(format!(
448 "invalid table key '{}'",
449 table_key
450 )))
451}
452
453fn same_manifest_state(
454 left: Option<&crate::db::SubTableEntry>,
455 right: Option<&crate::db::SubTableEntry>,
456) -> bool {
457 match (left, right) {
458 (Some(left), Some(right)) => {
459 left.table_version == right.table_version && left.table_branch == right.table_branch
460 }
461 (None, None) => true,
462 _ => false,
463 }
464}
465
466fn classify_merge_conflict(
467 table_key: &str,
468 row_id: &str,
469 base_sig: Option<&str>,
470 source_sig: Option<&str>,
471 target_sig: Option<&str>,
472) -> MergeConflict {
473 let (kind, message) = match (base_sig, source_sig, target_sig) {
474 (None, Some(_), Some(_)) => (
475 MergeConflictKind::DivergentInsert,
476 format!("divergent insert for id '{}'", row_id),
477 ),
478 (Some(_), None, Some(_)) | (Some(_), Some(_), None) => (
479 MergeConflictKind::DeleteVsUpdate,
480 format!("delete/update conflict for id '{}'", row_id),
481 ),
482 _ => (
483 MergeConflictKind::DivergentUpdate,
484 format!("divergent update for id '{}'", row_id),
485 ),
486 };
487 MergeConflict {
488 table_key: table_key.to_string(),
489 row_id: Some(row_id.to_string()),
490 kind,
491 message,
492 }
493}
494
495fn row_signature(batch: &RecordBatch, row: usize) -> Result<String> {
496 let mut values = Vec::with_capacity(batch.num_columns());
497 for column in batch.columns() {
498 values.push(
499 array_value_to_string(column.as_ref(), row)
500 .map_err(|e| OmniError::Lance(e.to_string()))?,
501 );
502 }
503 Ok(values.join("\u{1f}"))
504}
505
/// Validate the post-merge candidate state of every table before publishing.
///
/// First pass (node tables): value constraints, unique constraints, and a
/// collected set of every node id per type. Second pass (edge tables):
/// unique constraints, per-source cardinality bounds, and orphan-edge checks
/// against the collected node ids. Any violation is accumulated and returned
/// as a single `OmniError::MergeConflicts`.
async fn validate_merge_candidates(
    db: &Omnigraph,
    source_snapshot: &Snapshot,
    target_snapshot: &Snapshot,
    candidates: &HashMap<String, CandidateTableState>,
) -> Result<()> {
    let mut conflicts = Vec::new();
    // type name -> set of all candidate node ids, used for orphan checks below.
    let mut node_ids: HashMap<String, HashSet<String>> = HashMap::new();

    for (type_name, node_type) in &db.catalog().node_types {
        let table_key = format!("node:{}", type_name);
        let mut values = HashSet::new();
        // One seen-map per unique constraint on this type.
        let mut unique_seen = vec![HashMap::new(); node_type.unique_constraints.len()];

        if let Some(ds) =
            candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
        {
            let mut stream =
                crate::table_store::TableStore::scan_stream(&ds, None, None, None, false).await?;
            while let Some(batch) = stream
                .try_next()
                .await
                .map_err(|e| OmniError::Lance(e.to_string()))?
            {
                if let Err(err) = crate::loader::validate_value_constraints(&batch, node_type) {
                    conflicts.push(MergeConflict {
                        table_key: table_key.clone(),
                        row_id: None,
                        kind: MergeConflictKind::ValueConstraintViolation,
                        message: err.to_string(),
                    });
                }
                update_unique_constraints(
                    &table_key,
                    &batch,
                    &node_type.unique_constraints,
                    &mut unique_seen,
                    &mut conflicts,
                )?;
                let ids = batch
                    .column_by_name("id")
                    .ok_or_else(|| {
                        OmniError::manifest(format!("table {} missing id column", table_key))
                    })?
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| {
                        OmniError::manifest(format!("table {} id column is not Utf8", table_key))
                    })?;
                for row in 0..ids.len() {
                    values.insert(ids.value(row).to_string());
                }
            }
        }
        // Inserted even when empty/missing so edge validation can look it up.
        node_ids.insert(type_name.clone(), values);
    }

    for (edge_name, edge_type) in &db.catalog().edge_types {
        let table_key = format!("edge:{}", edge_name);
        let mut unique_seen = vec![HashMap::new(); edge_type.unique_constraints.len()];
        // src id -> number of outgoing edges, for cardinality checks.
        let mut src_counts = HashMap::new();

        if let Some(ds) =
            candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
        {
            let mut stream =
                crate::table_store::TableStore::scan_stream(&ds, None, None, None, false).await?;
            while let Some(batch) = stream
                .try_next()
                .await
                .map_err(|e| OmniError::Lance(e.to_string()))?
            {
                update_unique_constraints(
                    &table_key,
                    &batch,
                    &edge_type.unique_constraints,
                    &mut unique_seen,
                    &mut conflicts,
                )?;
                accumulate_edge_cardinality(&batch, &mut src_counts, &table_key)?;
                conflicts.extend(validate_orphan_edges_batch(
                    &table_key, edge_type, &batch, &node_ids,
                )?);
            }
        }

        conflicts.extend(finalize_edge_cardinality_conflicts(
            &table_key,
            edge_name,
            edge_type.cardinality.min,
            edge_type.cardinality.max,
            src_counts,
        ));
    }

    if conflicts.is_empty() {
        Ok(())
    } else {
        Err(OmniError::MergeConflicts(conflicts))
    }
}
607
608async fn candidate_dataset(
609 source_snapshot: &Snapshot,
610 target_snapshot: &Snapshot,
611 candidates: &HashMap<String, CandidateTableState>,
612 table_key: &str,
613) -> Result<Option<Dataset>> {
614 if let Some(candidate) = candidates.get(table_key) {
615 return match candidate {
616 CandidateTableState::AdoptSourceState => match source_snapshot.entry(table_key) {
617 Some(_) => Ok(Some(source_snapshot.open(table_key).await?)),
618 None => Ok(None),
619 },
620 CandidateTableState::RewriteMerged(staged) => {
621 Ok(Some(staged.full_staged.dataset.clone()))
622 }
623 };
624 }
625 match target_snapshot.entry(table_key) {
626 Some(_) => Ok(Some(target_snapshot.open(table_key).await?)),
627 None => Ok(None),
628 }
629}
630
/// Fold one batch into the running unique-constraint state for a table.
///
/// `seen` holds, per constraint, a map from joined key value to the first
/// row id that produced it; a repeat produces a `UniqueViolation` conflict.
/// Rows with a NULL in any constrained column are skipped (SQL-style: NULLs
/// do not participate in uniqueness here).
fn update_unique_constraints(
    table_key: &str,
    batch: &RecordBatch,
    constraints: &[Vec<String>],
    seen: &mut [HashMap<String, String>],
    conflicts: &mut Vec<MergeConflict>,
) -> Result<()> {
    for (constraint_idx, columns) in constraints.iter().enumerate() {
        let seen = &mut seen[constraint_idx];
        for row in 0..batch.num_rows() {
            let mut parts = Vec::with_capacity(columns.len());
            let mut any_null = false;
            for column_name in columns {
                let column = batch.column_by_name(column_name).ok_or_else(|| {
                    OmniError::manifest(format!(
                        "table {} missing unique column '{}'",
                        table_key, column_name
                    ))
                })?;
                if column.is_null(row) {
                    any_null = true;
                    break;
                }
                parts.push(
                    array_value_to_string(column.as_ref(), row)
                        .map_err(|e| OmniError::Lance(e.to_string()))?,
                );
            }
            if any_null {
                continue;
            }
            // Composite key: rendered column values joined with '|'.
            let value = parts.join("|");
            let row_id = row_id_at(batch, row)?;
            if let Some(first_row_id) = seen.insert(value.clone(), row_id.clone()) {
                conflicts.push(MergeConflict {
                    table_key: table_key.to_string(),
                    row_id: Some(row_id.clone()),
                    kind: MergeConflictKind::UniqueViolation,
                    message: format!(
                        "unique constraint {:?} violated by '{}' and '{}'",
                        columns, first_row_id, row_id
                    ),
                });
            }
        }
    }
    Ok(())
}
679
680fn accumulate_edge_cardinality(
681 batch: &RecordBatch,
682 counts: &mut HashMap<String, u32>,
683 table_key: &str,
684) -> Result<()> {
685 let srcs = batch
686 .column_by_name("src")
687 .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
688 .as_any()
689 .downcast_ref::<StringArray>()
690 .ok_or_else(|| {
691 OmniError::manifest(format!("table {} src column is not Utf8", table_key))
692 })?;
693 for row in 0..srcs.len() {
694 *counts.entry(srcs.value(row).to_string()).or_insert(0_u32) += 1;
695 }
696 Ok(())
697}
698
699fn finalize_edge_cardinality_conflicts(
700 table_key: &str,
701 edge_name: &str,
702 min: u32,
703 max: Option<u32>,
704 counts: HashMap<String, u32>,
705) -> Vec<MergeConflict> {
706 let mut conflicts = Vec::new();
707 for (src, count) in counts {
708 if let Some(max) = max {
709 if count > max {
710 conflicts.push(MergeConflict {
711 table_key: table_key.to_string(),
712 row_id: None,
713 kind: MergeConflictKind::CardinalityViolation,
714 message: format!(
715 "@card violation on edge {}: source '{}' has {} edges (max {})",
716 edge_name, src, count, max
717 ),
718 });
719 }
720 }
721 if count < min {
722 conflicts.push(MergeConflict {
723 table_key: table_key.to_string(),
724 row_id: None,
725 kind: MergeConflictKind::CardinalityViolation,
726 message: format!(
727 "@card violation on edge {}: source '{}' has {} edges (min {})",
728 edge_name, src, count, min
729 ),
730 });
731 }
732 }
733 conflicts
734}
735
736fn validate_orphan_edges_batch(
737 table_key: &str,
738 edge_type: &omnigraph_compiler::catalog::EdgeType,
739 batch: &RecordBatch,
740 node_ids: &HashMap<String, HashSet<String>>,
741) -> Result<Vec<MergeConflict>> {
742 let srcs = batch
743 .column_by_name("src")
744 .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
745 .as_any()
746 .downcast_ref::<StringArray>()
747 .ok_or_else(|| {
748 OmniError::manifest(format!("table {} src column is not Utf8", table_key))
749 })?;
750 let dsts = batch
751 .column_by_name("dst")
752 .ok_or_else(|| OmniError::manifest(format!("table {} missing dst column", table_key)))?
753 .as_any()
754 .downcast_ref::<StringArray>()
755 .ok_or_else(|| {
756 OmniError::manifest(format!("table {} dst column is not Utf8", table_key))
757 })?;
758
759 let from_ids = node_ids.get(&edge_type.from_type).ok_or_else(|| {
760 OmniError::manifest(format!(
761 "missing candidate node ids for {}",
762 edge_type.from_type
763 ))
764 })?;
765 let to_ids = node_ids.get(&edge_type.to_type).ok_or_else(|| {
766 OmniError::manifest(format!(
767 "missing candidate node ids for {}",
768 edge_type.to_type
769 ))
770 })?;
771
772 let mut conflicts = Vec::new();
773 for row in 0..batch.num_rows() {
774 let row_id = row_id_at(batch, row)?;
775 let src = srcs.value(row);
776 let dst = dsts.value(row);
777 if !from_ids.contains(src) {
778 conflicts.push(MergeConflict {
779 table_key: table_key.to_string(),
780 row_id: Some(row_id.clone()),
781 kind: MergeConflictKind::OrphanEdge,
782 message: format!("src '{}' not found in {}", src, edge_type.from_type),
783 });
784 }
785 if !to_ids.contains(dst) {
786 conflicts.push(MergeConflict {
787 table_key: table_key.to_string(),
788 row_id: Some(row_id),
789 kind: MergeConflictKind::OrphanEdge,
790 message: format!("dst '{}' not found in {}", dst, edge_type.to_type),
791 });
792 }
793 }
794 Ok(conflicts)
795}
796
797fn row_id_at(batch: &RecordBatch, row: usize) -> Result<String> {
798 let ids = batch
799 .column_by_name("id")
800 .ok_or_else(|| OmniError::manifest("batch missing id column".to_string()))?
801 .as_any()
802 .downcast_ref::<StringArray>()
803 .ok_or_else(|| OmniError::manifest("id column is not Utf8".to_string()))?;
804 Ok(ids.value(row).to_string())
805}
806
/// Publish a table whose merge decision was "adopt the source state",
/// choosing a strategy from the (target active branch, source entry branch)
/// combination:
/// - neither branched, or only the target branched: the source's manifest
///   entry can be referenced directly;
/// - source branched, target not (or target entry already on the target
///   branch): rows must be copied via a computed base→source delta;
/// - both branched with differing entry branch: fork the dataset from the
///   source branch state into the target branch.
async fn publish_adopted_source_state(
    target_db: &Omnigraph,
    catalog: &Catalog,
    base_snapshot: &Snapshot,
    source_snapshot: &Snapshot,
    target_snapshot: &Snapshot,
    table_key: &str,
) -> Result<crate::db::SubTableUpdate> {
    let source_entry = source_snapshot
        .entry(table_key)
        .ok_or_else(|| OmniError::manifest(format!("missing source entry for {}", table_key)))?;
    let target_entry = target_snapshot.entry(table_key);

    let target_active = target_db.active_branch().await;
    match (
        target_active.as_deref(),
        source_entry.table_branch.as_deref(),
    ) {
        // Unbranched source entry: point the target manifest at it directly.
        (None, None) => Ok(crate::db::SubTableUpdate {
            table_key: table_key.to_string(),
            table_version: source_entry.table_version,
            table_branch: None,
            row_count: source_entry.row_count,
            version_metadata: source_entry.version_metadata.clone(),
        }),
        (Some(_target_branch), None) => Ok(crate::db::SubTableUpdate {
            table_key: table_key.to_string(),
            table_version: source_entry.table_version,
            table_branch: None,
            row_count: source_entry.row_count,
            version_metadata: source_entry.version_metadata.clone(),
        }),
        // Source rows live on a branch the unbranched target can't reference:
        // rewrite them into the target's dataset via a base→source delta.
        (None, Some(_source_branch)) => {
            let delta =
                compute_source_delta(table_key, catalog, base_snapshot, source_snapshot).await?;
            match delta {
                Some(staged) => publish_rewritten_merge_table(target_db, table_key, &staged).await,
                // No row-level difference: keep the target's existing entry
                // (falling back to the source entry when the target has none).
                None => Ok(crate::db::SubTableUpdate {
                    table_key: table_key.to_string(),
                    table_version: target_entry
                        .map(|e| e.table_version)
                        .unwrap_or(source_entry.table_version),
                    table_branch: None,
                    row_count: source_entry.row_count,
                    version_metadata: target_entry
                        .map(|entry| entry.version_metadata.clone())
                        .unwrap_or_else(|| source_entry.version_metadata.clone()),
                }),
            }
        }
        (Some(target_branch), Some(source_branch)) => {
            if target_entry.and_then(|entry| entry.table_branch.as_deref()) == Some(target_branch) {
                // Target already has its own branch state: merge the source
                // delta into it rather than replacing it wholesale.
                let delta =
                    compute_source_delta(table_key, catalog, base_snapshot, source_snapshot)
                        .await?;
                match delta {
                    Some(staged) => {
                        publish_rewritten_merge_table(target_db, table_key, &staged).await
                    }
                    None => Ok(crate::db::SubTableUpdate {
                        table_key: table_key.to_string(),
                        table_version: target_entry.unwrap().table_version,
                        table_branch: Some(target_branch.to_string()),
                        row_count: source_entry.row_count,
                        version_metadata: target_entry.unwrap().version_metadata.clone(),
                    }),
                }
            } else {
                // Fork the dataset from the source branch's pinned version
                // onto the target branch.
                let full_path = format!("{}/{}", target_db.uri(), source_entry.table_path);
                let ds = target_db
                    .fork_dataset_from_entry_state(
                        table_key,
                        &full_path,
                        Some(source_branch),
                        source_entry.table_version,
                        target_branch,
                    )
                    .await?;
                let state = target_db.table_store().table_state(&full_path, &ds).await?;
                Ok(crate::db::SubTableUpdate {
                    table_key: table_key.to_string(),
                    table_version: state.version,
                    table_branch: Some(target_branch.to_string()),
                    row_count: state.row_count,
                    version_metadata: state.version_metadata,
                })
            }
        }
    }
}
905
/// Apply a staged merge result to the target's live table and return the
/// resulting manifest update.
///
/// Steps, in order: merge-insert the staged delta (update-on-match /
/// insert-on-miss keyed by `id`), delete rows listed in `deleted_ids`,
/// rebuild indices if any rows remain, then read back the final table state.
async fn publish_rewritten_merge_table(
    target_db: &Omnigraph,
    table_key: &str,
    staged: &StagedMergeResult,
) -> Result<crate::db::SubTableUpdate> {
    let (ds, full_path, table_branch) = target_db
        .open_for_mutation(table_key, crate::db::MutationOpKind::Merge)
        .await?;
    let mut current_ds = ds;

    if let Some(delta) = &staged.delta_staged {
        let batches: Vec<RecordBatch> = target_db
            .table_store()
            .scan_batches(&delta.dataset)
            .await?
            .into_iter()
            .filter(|batch| batch.num_rows() > 0)
            .collect();
        if !batches.is_empty() {
            let combined = if batches.len() == 1 {
                batches.into_iter().next().unwrap()
            } else {
                let schema = batches[0].schema();
                arrow_select::concat::concat_batches(&schema, &batches)
                    .map_err(|e| OmniError::Lance(e.to_string()))?
            };
            // Upsert by id: existing rows are replaced, new rows inserted.
            let staged_merge = target_db
                .table_store()
                .stage_merge_insert(
                    current_ds.clone(),
                    combined,
                    vec!["id".to_string()],
                    lance::dataset::WhenMatched::UpdateAll,
                    lance::dataset::WhenNotMatched::InsertAll,
                )
                .await?;
            current_ds = target_db
                .table_store()
                .commit_staged(Arc::new(current_ds), staged_merge.transaction)
                .await?;
        }
    }

    if !staged.deleted_ids.is_empty() {
        // Build an `id IN (...)` filter; single quotes in ids are doubled
        // to keep the filter expression well-formed.
        let escaped: Vec<String> = staged
            .deleted_ids
            .iter()
            .map(|id| format!("'{}'", id.replace('\'', "''")))
            .collect();
        let filter = format!("id IN ({})", escaped.join(", "));
        target_db
            .table_store()
            .delete_where(&full_path, &mut current_ds, &filter)
            .await?;
    }

    let row_count = target_db
        .table_store()
        .table_state(&full_path, &current_ds)
        .await?
        .row_count;
    // Index building is skipped for empty tables.
    if row_count > 0 {
        target_db
            .build_indices_on_dataset(table_key, &mut current_ds)
            .await?;
    }
    // Re-read state after index builds so the published version is final.
    let final_state = target_db
        .table_store()
        .table_state(&full_path, &current_ds)
        .await?;

    Ok(crate::db::SubTableUpdate {
        table_key: table_key.to_string(),
        table_version: final_state.version,
        table_branch,
        row_count: final_state.row_count,
        version_metadata: final_state.version_metadata,
    })
}
1015
1016impl Omnigraph {
    /// Merge branch `source` into branch `target` with no explicit actor.
    pub async fn branch_merge(&self, source: &str, target: &str) -> Result<MergeOutcome> {
        self.branch_merge_as(source, target, None).await
    }
1020
    /// Merge branch `source` into branch `target`, attributing the merge to
    /// `actor_id` when given. Refuses to run while a schema apply is active.
    pub async fn branch_merge_as(
        &self,
        source: &str,
        target: &str,
        actor_id: Option<&str>,
    ) -> Result<MergeOutcome> {
        self.ensure_schema_apply_idle("branch_merge").await?;
        self.branch_merge_impl(source, target, actor_id).await
    }
1030
    /// Validate branch names, locate the merge base, then run the merge with
    /// the coordinator temporarily pointed at the target branch.
    ///
    /// Early-outs: internal run refs and identical branches are rejected;
    /// a source already contained in the target returns `AlreadyUpToDate`.
    async fn branch_merge_impl(
        &self,
        source: &str,
        target: &str,
        actor_id: Option<&str>,
    ) -> Result<MergeOutcome> {
        if is_internal_run_branch(source) || is_internal_run_branch(target) {
            return Err(OmniError::manifest(format!(
                "branch_merge does not allow internal run refs ('{}' -> '{}')",
                source, target
            )));
        }
        let source_branch = Omnigraph::normalize_branch_name(source)?;
        let target_branch = Omnigraph::normalize_branch_name(target)?;
        if source_branch == target_branch {
            return Err(OmniError::manifest(
                "branch_merge requires distinct source and target branches".to_string(),
            ));
        }

        let source_head_commit_id = self
            .head_commit_id_for_branch(source_branch.as_deref())
            .await?
            .ok_or_else(|| OmniError::manifest("source branch has no head commit".to_string()))?;
        let target_head_commit_id = self
            .head_commit_id_for_branch(target_branch.as_deref())
            .await?
            .ok_or_else(|| OmniError::manifest("target branch has no head commit".to_string()))?;
        let base_commit = CommitGraph::merge_base(
            self.uri(),
            source_branch.as_deref(),
            target_branch.as_deref(),
        )
        .await?
        .ok_or_else(|| OmniError::manifest("branches have no common ancestor".to_string()))?;

        // Source head already reachable from target: nothing to merge.
        if source_head_commit_id == target_head_commit_id
            || base_commit.graph_commit_id == source_head_commit_id
        {
            return Ok(MergeOutcome::AlreadyUpToDate);
        }
        // Target head *is* the base: the merge is a fast-forward.
        let is_fast_forward = base_commit.graph_commit_id == target_head_commit_id;

        let base_snapshot = ManifestCoordinator::snapshot_at(
            self.uri(),
            base_commit.manifest_branch.as_deref(),
            base_commit.manifest_version,
        )
        .await?;
        let source_snapshot = self
            .resolved_target(ReadTarget::Branch(
                source_branch.clone().unwrap_or_else(|| "main".to_string()),
            ))
            .await?
            .snapshot;
        // NOTE(review): heads and snapshots above are resolved before this
        // exclusive lock is taken — confirm concurrent writers are caught by
        // the downstream expected-version checks.
        let merge_exclusive = self.merge_exclusive();
        let _merge_guard = merge_exclusive.lock().await;

        // Point the coordinator at the target branch for the duration of the
        // merge, restoring the previous coordinator afterwards.
        let previous_branch = self.active_branch().await;
        let previous = self
            .swap_coordinator_for_branch(target_branch.as_deref())
            .await?;
        let merge_result = self
            .branch_merge_on_current_target(
                &base_snapshot,
                &source_snapshot,
                &target_head_commit_id,
                &source_head_commit_id,
                is_fast_forward,
                actor_id,
            )
            .await;
        self.restore_coordinator(previous).await;

        // If the merge landed on the branch we were already on, refresh our
        // own view to pick up the new state.
        if merge_result.is_ok() && previous_branch == target_branch {
            self.refresh().await?;
        }

        merge_result
    }
1118
    /// Executes the table-level branch merge against the currently active
    /// target branch: classifies every table across the base, source, and
    /// target snapshots, stages row-level merges, and publishes the results
    /// under write-queue locks with crash recovery via a sidecar.
    ///
    /// # Arguments
    /// * `base_snapshot` - manifest snapshot at the merge base.
    /// * `source_snapshot` - manifest snapshot at the source branch head.
    /// * `target_head_commit_id` / `source_head_commit_id` - commit ids
    ///   recorded in the resulting merge commit.
    /// * `is_fast_forward` - only selects the returned `MergeOutcome`
    ///   variant; the merge work performed is the same either way.
    /// * `actor_id` - optional actor recorded in the recovery sidecar and the
    ///   merge commit.
    ///
    /// # Errors
    /// * `OmniError::MergeConflicts` when row-level staging detects conflicts.
    /// * A manifest version-mismatch error when a candidate table changed
    ///   version between snapshotting and acquiring the write-queue locks.
    async fn branch_merge_on_current_target(
        &self,
        base_snapshot: &Snapshot,
        source_snapshot: &Snapshot,
        target_head_commit_id: &str,
        source_head_commit_id: &str,
        is_fast_forward: bool,
        actor_id: Option<&str>,
    ) -> Result<MergeOutcome> {
        self.ensure_commit_graph_initialized().await?;
        let target_snapshot = self.snapshot().await;

        // Union of every table key present in any of the three snapshots.
        let mut table_keys = HashSet::new();
        for entry in base_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }
        for entry in source_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }
        for entry in target_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }

        // Sorted for deterministic processing and lock-acquisition order.
        let mut ordered_table_keys: Vec<String> = table_keys.into_iter().collect();
        ordered_table_keys.sort();

        let mut conflicts = Vec::new();
        let mut candidates: HashMap<String, CandidateTableState> = HashMap::new();

        // Three-way classification of each table's manifest state.
        for table_key in &ordered_table_keys {
            let base_entry = base_snapshot.entry(table_key);
            let source_entry = source_snapshot.entry(table_key);
            let target_entry = target_snapshot.entry(table_key);
            // Source and target already agree: nothing to merge.
            if same_manifest_state(source_entry, target_entry) {
                continue;
            }
            // Source did not diverge from base: target's state stands.
            if same_manifest_state(base_entry, source_entry) {
                continue;
            }
            // Only the source diverged: adopt its state wholesale.
            if same_manifest_state(base_entry, target_entry) {
                candidates.insert(table_key.clone(), CandidateTableState::AdoptSourceState);
                continue;
            }

            // Both sides diverged: stage a streaming row-level merge.
            // Conflicts are appended to `conflicts`; a `Some` result becomes
            // a rewrite candidate. NOTE(review): `None` presumably means no
            // rewrite is required — confirm in `stage_streaming_table_merge`.
            if let Some(staged) = stage_streaming_table_merge(
                table_key,
                &self.catalog(),
                base_snapshot,
                source_snapshot,
                &target_snapshot,
                &mut conflicts,
            )
            .await?
            {
                candidates.insert(
                    table_key.clone(),
                    CandidateTableState::RewriteMerged(staged),
                );
            }
        }

        // Abort before taking any locks or mutating anything.
        if !conflicts.is_empty() {
            return Err(OmniError::MergeConflicts(conflicts));
        }

        validate_merge_candidates(self, source_snapshot, &target_snapshot, &candidates).await?;

        // Acquire write-queue guards for every candidate table (both adopt
        // and rewrite variants), keyed by the active branch, in sorted order.
        let active_branch_for_keys = self.active_branch().await;
        let merge_queue_keys: Vec<(String, Option<String>)> = ordered_table_keys
            .iter()
            .filter(|table_key| {
                matches!(
                    candidates.get(*table_key),
                    Some(CandidateTableState::RewriteMerged(_)) | Some(CandidateTableState::AdoptSourceState)
                )
            })
            .map(|table_key| (table_key.clone(), active_branch_for_keys.clone()))
            .collect();
        let _merge_queue_guards = self.write_queue().acquire_many(&merge_queue_keys).await;

        // Re-snapshot now that the guards are held and verify no candidate
        // table moved to a new version in the meantime; the staged data was
        // built against `target_snapshot` and would be stale otherwise.
        let post_queue_snapshot = self.snapshot().await;
        for table_key in &ordered_table_keys {
            let Some(candidate) = candidates.get(table_key) else {
                continue;
            };
            if !matches!(
                candidate,
                CandidateTableState::RewriteMerged(_) | CandidateTableState::AdoptSourceState
            ) {
                continue;
            }
            let expected = target_snapshot.entry(table_key).map(|e| e.table_version);
            let current = post_queue_snapshot
                .entry(table_key)
                .map(|e| e.table_version);
            if expected != current {
                return Err(OmniError::manifest_expected_version_mismatch(
                    table_key.clone(),
                    expected.unwrap_or(0),
                    current.unwrap_or(0),
                ));
            }
        }

        // Pin every table that will be rewritten so that a crash between the
        // table rewrite (phase B) and the manifest commit can be repaired by
        // the next open's recovery sweep.
        let recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = ordered_table_keys
            .iter()
            .filter_map(|table_key| {
                let candidate = candidates.get(table_key)?;
                if !matches!(candidate, CandidateTableState::RewriteMerged(_)) {
                    return None;
                }
                let entry = target_snapshot.entry(table_key)?;
                Some(crate::db::manifest::SidecarTablePin {
                    table_key: table_key.clone(),
                    table_path: self.table_store().dataset_uri(&entry.table_path),
                    expected_version: entry.table_version,
                    // Version the table is expected to reach once the
                    // rewrite commits (current + 1).
                    post_commit_pin: entry.table_version + 1,
                    table_branch: active_branch_for_keys.clone(),
                })
            })
            .collect();
        // Persist a BranchMerge recovery sidecar before mutating any tables;
        // it is deleted again after the manifest commit succeeds (below).
        let recovery_handle = if recovery_pins.is_empty() {
            None
        } else {
            let target_branch = active_branch_for_keys.clone();
            let mut sidecar = crate::db::manifest::new_sidecar(
                crate::db::manifest::SidecarKind::BranchMerge,
                target_branch,
                actor_id.map(str::to_string),
                recovery_pins,
            );
            sidecar.merge_source_commit_id = Some(source_head_commit_id.to_string());
            Some(
                crate::db::manifest::write_sidecar(
                    self.root_uri(),
                    self.storage_adapter(),
                    &sidecar,
                )
                .await?,
            )
        };

        // Phase B: publish each candidate's table data, collecting the
        // manifest updates so they can be committed in a single step below.
        let mut updates = Vec::new();
        let mut changed_edge_tables = false;
        for table_key in &ordered_table_keys {
            let Some(candidate_state) = candidates.get(table_key) else {
                continue;
            };
            let update = match candidate_state {
                CandidateTableState::AdoptSourceState => {
                    publish_adopted_source_state(
                        self,
                        &self.catalog(),
                        base_snapshot,
                        source_snapshot,
                        &target_snapshot,
                        table_key,
                    )
                    .await?
                }
                CandidateTableState::RewriteMerged(staged) => {
                    publish_rewritten_merge_table(self, table_key, staged).await?
                }
            };
            // Edge-table changes trigger graph-index invalidation below.
            if table_key.starts_with("edge:") {
                changed_edge_tables = true;
            }
            updates.push(update);
        }

        crate::failpoints::maybe_fail("branch_merge.post_phase_b_pre_manifest_commit")?;

        // Commit all manifest updates in one step; with nothing to publish
        // the current manifest version is reused as-is.
        let manifest_version = if updates.is_empty() {
            self.version().await
        } else {
            self.commit_manifest_updates(&updates).await?
        };

        // Best-effort sidecar cleanup: a failure here is only logged because
        // the next open's recovery sweep resolves leftover sidecars.
        if let Some(handle) = recovery_handle {
            if let Err(err) =
                crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await
            {
                tracing::warn!(
                    error = %err,
                    operation_id = handle.operation_id.as_str(),
                    "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
                );
            }
        }
        // Record the merge edge in the commit graph.
        self.record_merge_commit(
            manifest_version,
            target_head_commit_id,
            source_head_commit_id,
            actor_id,
        )
        .await?;

        if changed_edge_tables {
            self.invalidate_graph_index().await;
        }

        Ok(if is_fast_forward {
            MergeOutcome::FastForward
        } else {
            MergeOutcome::Merged
        })
    }
1396}