1use super::*;
2
/// Number of staged rows buffered in memory before being flushed to the staging dataset.
const MERGE_STAGE_BATCH_ROWS: usize = 8192;
/// Env var that overrides the parent directory used for merge staging temp dirs.
const MERGE_STAGE_DIR_ENV: &str = "OMNIGRAPH_MERGE_STAGING_DIR";
5
/// How a table that differs between source and target is brought into the
/// merged target state.
#[derive(Debug)]
enum CandidateTableState {
    /// Target can take the source's manifest entry as-is (only source diverged).
    AdoptSourceState,
    /// A row-level merge was staged; target must be rewritten from it.
    RewriteMerged(StagedMergeResult),
}
11
/// A dataset written into a temporary staging directory; holding the `TempDir`
/// keeps the directory alive until the staged table is dropped.
#[derive(Debug)]
struct StagedTable {
    // Held only so the temp directory outlives `dataset`; never read.
    _dir: TempDir,
    dataset: Dataset,
}
17
/// Output of staging a merge for one table.
#[derive(Debug)]
struct StagedMergeResult {
    /// Complete post-merge contents of the table.
    full_staged: StagedTable,
    /// Only the rows that changed relative to the target; `None` when no row changed.
    delta_staged: Option<StagedTable>,
    /// Row ids the target still holds that must be deleted.
    deleted_ids: Vec<String>,
}
24
/// One row yielded by an `OrderedTableCursor`: its id, a change-detection
/// signature, and a handle back to the batch/row it came from (so the row can
/// later be sliced out without copying the data eagerly).
#[derive(Debug, Clone)]
struct CursorRow {
    id: String,
    signature: String,
    batch: RecordBatch,
    row_index: usize,
}
32
/// Row-at-a-time cursor over a dataset scanned in ascending `id` order, with
/// single-row lookahead (`peeked`). A `None` stream means the cursor is empty
/// or exhausted.
struct OrderedTableCursor {
    stream: Option<std::pin::Pin<Box<DatasetRecordBatchStream>>>,
    current_batch: Option<RecordBatch>,
    current_row: usize,
    peeked: Option<CursorRow>,
}
39
impl OrderedTableCursor {
    /// Open a cursor over `table_key` in `snapshot`; yields an empty cursor
    /// when the snapshot has no entry for that table.
    async fn from_snapshot(snapshot: &Snapshot, table_key: &str) -> Result<Self> {
        let dataset = match snapshot.entry(table_key) {
            Some(_) => Some(snapshot.open(table_key).await?),
            None => None,
        };
        Self::from_dataset(dataset).await
    }

    /// Build a cursor scanning `dataset` ordered by `id` ascending (nulls
    /// last); `None` produces an always-empty cursor.
    async fn from_dataset(dataset: Option<Dataset>) -> Result<Self> {
        let stream = if let Some(ds) = dataset {
            Some(Box::pin(
                crate::table_store::TableStore::scan_stream(
                    &ds,
                    None,
                    None,
                    Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]),
                    false,
                )
                .await?,
            ))
        } else {
            None
        };

        Ok(Self {
            stream,
            current_batch: None,
            current_row: 0,
            peeked: None,
        })
    }

    /// Return a clone of the next row without consuming it.
    async fn peek_cloned(&mut self) -> Result<Option<CursorRow>> {
        if self.peeked.is_none() {
            self.peeked = self.next_row().await?;
        }
        Ok(self.peeked.clone())
    }

    /// Consume and return the next row, honoring a pending peek first.
    async fn pop(&mut self) -> Result<Option<CursorRow>> {
        if self.peeked.is_some() {
            return Ok(self.peeked.take());
        }
        self.next_row().await
    }

    /// Advance within the current batch, pulling the next batch from the
    /// stream when exhausted; `Ok(None)` once the stream is drained.
    async fn next_row(&mut self) -> Result<Option<CursorRow>> {
        loop {
            if let Some(batch) = &self.current_batch {
                if self.current_row < batch.num_rows() {
                    let row_index = self.current_row;
                    self.current_row += 1;
                    return Ok(Some(CursorRow {
                        id: row_id_at(batch, row_index)?,
                        signature: row_signature(batch, row_index)?,
                        batch: batch.clone(),
                        row_index,
                    }));
                }
            }

            let Some(stream) = self.stream.as_mut() else {
                return Ok(None);
            };
            match stream.try_next().await {
                Ok(Some(batch)) => {
                    self.current_batch = Some(batch);
                    self.current_row = 0;
                }
                Ok(None) => {
                    // Drop the stream so later calls short-circuit to None.
                    self.stream = None;
                    self.current_batch = None;
                    return Ok(None);
                }
                Err(err) => return Err(OmniError::Lance(err.to_string())),
            }
        }
    }
}
120
/// Buffered writer that accumulates single-row batch slices and periodically
/// appends them to a lance dataset living in a temp staging directory.
struct StagedTableWriter {
    schema: SchemaRef,
    dataset_uri: String,
    // Owns the staging directory; transferred into the resulting StagedTable.
    dir: TempDir,
    // Created lazily on first flush (or as an empty dataset in `finish`).
    dataset: Option<Dataset>,
    // Rows currently held in `batches`, reset on flush.
    buffered_rows: usize,
    // Total rows ever pushed (not reset on flush).
    row_count: u64,
    batches: Vec<RecordBatch>,
}
130
impl StagedTableWriter {
    /// Create a writer whose staging dataset lives in a fresh temp directory.
    fn new(table_key: &str, schema: SchemaRef) -> Result<Self> {
        let dir = merge_stage_tempdir(table_key)?;
        let dataset_uri = dir.path().join("table.lance").to_string_lossy().to_string();
        Ok(Self {
            schema,
            dataset_uri,
            dir,
            dataset: None,
            buffered_rows: 0,
            row_count: 0,
            batches: Vec::new(),
        })
    }

    /// Buffer one row (as a 1-row batch slice); flushes to the staging dataset
    /// once `MERGE_STAGE_BATCH_ROWS` rows have accumulated.
    async fn push_row(&mut self, row: &CursorRow) -> Result<()> {
        self.row_count += 1;
        self.buffered_rows += 1;
        self.batches.push(row.batch.slice(row.row_index, 1));
        if self.buffered_rows >= MERGE_STAGE_BATCH_ROWS {
            self.flush().await?;
        }
        Ok(())
    }

    /// Flush remaining rows and return the staged table. Materializes an empty
    /// dataset when nothing was ever written so callers always get a dataset.
    async fn finish(mut self) -> Result<StagedTable> {
        self.flush().await?;
        if self.dataset.is_none() {
            self.dataset = Some(
                crate::table_store::TableStore::create_empty_dataset(
                    &self.dataset_uri,
                    &self.schema,
                )
                .await?,
            );
        }
        Ok(StagedTable {
            _dir: self.dir,
            dataset: self.dataset.unwrap(),
        })
    }

    /// Concatenate buffered slices into one batch and append it to the staging
    /// dataset, creating the dataset on the first write.
    async fn flush(&mut self) -> Result<()> {
        if self.batches.is_empty() {
            return Ok(());
        }

        // Avoid a needless concat when a single batch is buffered.
        let batch = if self.batches.len() == 1 {
            self.batches.pop().unwrap()
        } else {
            let batches = std::mem::take(&mut self.batches);
            arrow_select::concat::concat_batches(&self.schema, &batches)
                .map_err(|e| OmniError::Lance(e.to_string()))?
        };
        self.buffered_rows = 0;

        let ds = crate::table_store::TableStore::append_or_create_batch(
            &self.dataset_uri,
            self.dataset.take(),
            batch,
        )
        .await?;
        self.dataset = Some(ds);
        Ok(())
    }
}
197
198fn merge_stage_tempdir(table_key: &str) -> Result<TempDir> {
199 if let Ok(root) = env::var(MERGE_STAGE_DIR_ENV) {
200 return TempDirBuilder::new()
201 .prefix(&format!(
202 "omnigraph-merge-{}-",
203 sanitize_table_key(table_key)
204 ))
205 .tempdir_in(PathBuf::from(root))
206 .map_err(OmniError::from);
207 }
208 TempDirBuilder::new()
209 .prefix(&format!(
210 "omnigraph-merge-{}-",
211 sanitize_table_key(table_key)
212 ))
213 .tempdir()
214 .map_err(OmniError::from)
215}
216
/// Make a table key (e.g. "node:Person") safe for use in a directory name by
/// replacing namespace/path separators with '-'.
fn sanitize_table_key(table_key: &str) -> String {
    let mut sanitized = String::with_capacity(table_key.len());
    for ch in table_key.chars() {
        if matches!(ch, ':' | '/' | '\\') {
            sanitized.push('-');
        } else {
            sanitized.push(ch);
        }
    }
    sanitized
}
226
/// Stream base and source snapshots of `table_key` in id order and stage the
/// rows the target must adopt when taking over the source's state.
///
/// Returns `None` when base and source are row-identical (no update needed);
/// otherwise the staged full post-adoption table, the delta rows that differ
/// from base, and the ids that were deleted in source.
async fn compute_source_delta(
    table_key: &str,
    catalog: &Catalog,
    base_snapshot: &Snapshot,
    source_snapshot: &Snapshot,
) -> Result<Option<StagedMergeResult>> {
    let schema = schema_for_table_key(catalog, table_key)?;
    let mut full_writer =
        StagedTableWriter::new(&format!("{}_adopt_full", table_key), schema.clone())?;
    let mut delta_writer = StagedTableWriter::new(&format!("{}_adopt_delta", table_key), schema)?;
    let mut deleted_ids: Vec<String> = Vec::new();
    let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
    let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;

    let mut needs_update = false;

    loop {
        let base_row = base.peek_cloned().await?;
        let source_row = source.peek_cloned().await?;

        // The smallest id visible on either cursor drives the merge step.
        let next_id = [base_row.as_ref(), source_row.as_ref()]
            .into_iter()
            .flatten()
            .map(|row| row.id.clone())
            .min();
        let Some(next_id) = next_id else { break };

        // Only consume a cursor whose head row matches the id being processed.
        let base_row = if base_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
            base.pop().await?
        } else {
            None
        };
        let source_row = if source_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
            source.pop().await?
        } else {
            None
        };

        let base_sig = base_row.as_ref().map(|r| r.signature.as_str());
        let source_sig = source_row.as_ref().map(|r| r.signature.as_str());

        match (&base_row, &source_row) {
            // Row only in base: source deleted it.
            (Some(_), None) => {
                deleted_ids.push(next_id);
                needs_update = true;
            }
            // Row only in source: new insert.
            (None, Some(src)) => {
                full_writer.push_row(src).await?;
                delta_writer.push_row(src).await?;
                needs_update = true;
            }
            // Row changed between base and source: take source's version.
            (Some(_), Some(src)) if source_sig != base_sig => {
                full_writer.push_row(src).await?;
                delta_writer.push_row(src).await?;
                needs_update = true;
            }
            // Unchanged row: keep base's copy, no delta entry.
            (Some(base), Some(_)) => {
                full_writer.push_row(base).await?;
            }
            // `next_id` came from one of the two cursors, so both can't be None.
            (None, None) => unreachable!(),
        }
    }

    if !needs_update {
        return Ok(None);
    }

    let delta_staged = if delta_writer.row_count > 0 {
        Some(delta_writer.finish().await?)
    } else {
        None
    };

    Ok(Some(StagedMergeResult {
        full_staged: full_writer.finish().await?,
        delta_staged,
        deleted_ids,
    }))
}
312
313fn min_cursor_id(
314 base_row: &Option<CursorRow>,
315 source_row: &Option<CursorRow>,
316 target_row: &Option<CursorRow>,
317) -> Option<String> {
318 [base_row.as_ref(), source_row.as_ref(), target_row.as_ref()]
319 .into_iter()
320 .flatten()
321 .map(|row| row.id.clone())
322 .min()
323}
324
/// Three-way (base/source/target) streaming merge of one table, walking all
/// three snapshots in ascending id order and staging the merged result.
///
/// Appends to `conflicts` when both sides changed a row differently. Returns
/// `None` when conflicts were found, or when the target already matches the
/// merged state; otherwise the staged full table, delta rows, and deleted ids.
async fn stage_streaming_table_merge(
    table_key: &str,
    catalog: &Catalog,
    base_snapshot: &Snapshot,
    source_snapshot: &Snapshot,
    target_snapshot: &Snapshot,
    conflicts: &mut Vec<MergeConflict>,
) -> Result<Option<StagedMergeResult>> {
    let schema = schema_for_table_key(catalog, table_key)?;
    let mut full_writer = StagedTableWriter::new(&format!("{}_full", table_key), schema.clone())?;
    let mut delta_writer = StagedTableWriter::new(&format!("{}_delta", table_key), schema)?;
    let mut deleted_ids: Vec<String> = Vec::new();
    let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
    let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;
    let mut target = OrderedTableCursor::from_snapshot(target_snapshot, table_key).await?;

    // Conflicts may already contain entries from other tables.
    let prior_conflict_count = conflicts.len();
    let mut needs_update = false;

    loop {
        let base_row = base.peek_cloned().await?;
        let source_row = source.peek_cloned().await?;
        let target_row = target.peek_cloned().await?;
        let Some(next_id) = min_cursor_id(&base_row, &source_row, &target_row) else {
            break;
        };

        // Consume only the cursors whose head row is `next_id`.
        let base_row = if base_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str()) {
            base.pop().await?
        } else {
            None
        };
        let source_row = if source_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
        {
            source.pop().await?
        } else {
            None
        };
        let target_row = if target_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
        {
            target.pop().await?
        } else {
            None
        };

        let base_sig = base_row.as_ref().map(|row| row.signature.as_str());
        let source_sig = source_row.as_ref().map(|row| row.signature.as_str());
        let target_sig = target_row.as_ref().map(|row| row.signature.as_str());

        let source_changed = source_sig != base_sig;
        let target_changed = target_sig != base_sig;

        // Classic three-way rule: prefer whichever side changed; identical
        // changes agree; divergent changes are a conflict (selection = None).
        let selection = if !source_changed {
            target_row.as_ref()
        } else if !target_changed {
            source_row.as_ref()
        } else if source_sig == target_sig {
            target_row.as_ref()
        } else {
            conflicts.push(classify_merge_conflict(
                table_key, &next_id, base_sig, source_sig, target_sig,
            ));
            None
        };

        // Once any conflict was recorded the result will be discarded, so the
        // remaining rows skip all staging work (but cursors keep advancing).
        if conflicts.len() > prior_conflict_count {
            continue;
        }

        // Merged state drops a row the target still has: stage a delete.
        if selection.is_none() && target_row.is_some() {
            deleted_ids.push(next_id.clone());
            needs_update = true;
            continue;
        }

        if let Some(selection) = selection {
            full_writer.push_row(selection).await?;
            // Delta only carries rows differing from the target's version.
            if selection.signature.as_str() != target_sig.unwrap_or("") {
                delta_writer.push_row(selection).await?;
                needs_update = true;
            }
        }
    }

    if conflicts.len() > prior_conflict_count {
        return Ok(None);
    }
    if !needs_update {
        return Ok(None);
    }

    let delta_staged = if delta_writer.row_count > 0 {
        Some(delta_writer.finish().await?)
    } else {
        None
    };

    Ok(Some(StagedMergeResult {
        full_staged: full_writer.finish().await?,
        delta_staged,
        deleted_ids,
    }))
}
431
432fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<SchemaRef> {
433 if let Some(name) = table_key.strip_prefix("node:") {
434 return catalog
435 .node_types
436 .get(name)
437 .map(|t| t.arrow_schema.clone())
438 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", name)));
439 }
440 if let Some(name) = table_key.strip_prefix("edge:") {
441 return catalog
442 .edge_types
443 .get(name)
444 .map(|t| t.arrow_schema.clone())
445 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", name)));
446 }
447 Err(OmniError::manifest(format!(
448 "invalid table key '{}'",
449 table_key
450 )))
451}
452
453fn same_manifest_state(
454 left: Option<&crate::db::SubTableEntry>,
455 right: Option<&crate::db::SubTableEntry>,
456) -> bool {
457 match (left, right) {
458 (Some(left), Some(right)) => {
459 left.table_version == right.table_version && left.table_branch == right.table_branch
460 }
461 (None, None) => true,
462 _ => false,
463 }
464}
465
466fn classify_merge_conflict(
467 table_key: &str,
468 row_id: &str,
469 base_sig: Option<&str>,
470 source_sig: Option<&str>,
471 target_sig: Option<&str>,
472) -> MergeConflict {
473 let (kind, message) = match (base_sig, source_sig, target_sig) {
474 (None, Some(_), Some(_)) => (
475 MergeConflictKind::DivergentInsert,
476 format!("divergent insert for id '{}'", row_id),
477 ),
478 (Some(_), None, Some(_)) | (Some(_), Some(_), None) => (
479 MergeConflictKind::DeleteVsUpdate,
480 format!("delete/update conflict for id '{}'", row_id),
481 ),
482 _ => (
483 MergeConflictKind::DivergentUpdate,
484 format!("divergent update for id '{}'", row_id),
485 ),
486 };
487 MergeConflict {
488 table_key: table_key.to_string(),
489 row_id: Some(row_id.to_string()),
490 kind,
491 message,
492 }
493}
494
495fn row_signature(batch: &RecordBatch, row: usize) -> Result<String> {
496 let mut values = Vec::with_capacity(batch.num_columns());
497 for column in batch.columns() {
498 values.push(
499 array_value_to_string(column.as_ref(), row)
500 .map_err(|e| OmniError::Lance(e.to_string()))?,
501 );
502 }
503 Ok(values.join("\u{1f}"))
504}
505
/// Run schema-level validation (value constraints, unique constraints, edge
/// cardinality, orphan edges) over the would-be merged state of every table.
///
/// Each table is read via `candidate_dataset` (staged merge output, adopted
/// source state, or the target's current table as appropriate). Returns
/// `Err(MergeConflicts)` listing every violation found.
async fn validate_merge_candidates(
    db: &Omnigraph,
    source_snapshot: &Snapshot,
    target_snapshot: &Snapshot,
    candidates: &HashMap<String, CandidateTableState>,
) -> Result<()> {
    let mut conflicts = Vec::new();
    // node type name -> set of candidate-state node ids, consumed later by
    // the orphan-edge check.
    let mut node_ids: HashMap<String, HashSet<String>> = HashMap::new();

    for (type_name, node_type) in &db.catalog().node_types {
        let table_key = format!("node:{}", type_name);
        let mut values = HashSet::new();
        // One key -> first-row-id map per unique constraint, accumulated
        // across all batches of this table.
        let mut unique_seen = vec![HashMap::new(); node_type.unique_constraints.len()];

        if let Some(ds) =
            candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
        {
            let mut stream =
                crate::table_store::TableStore::scan_stream(&ds, None, None, None, false).await?;
            while let Some(batch) = stream
                .try_next()
                .await
                .map_err(|e| OmniError::Lance(e.to_string()))?
            {
                if let Err(err) = crate::loader::validate_value_constraints(&batch, node_type) {
                    conflicts.push(MergeConflict {
                        table_key: table_key.clone(),
                        row_id: None,
                        kind: MergeConflictKind::ValueConstraintViolation,
                        message: err.to_string(),
                    });
                }
                update_unique_constraints(
                    &table_key,
                    &batch,
                    &node_type.unique_constraints,
                    &mut unique_seen,
                    &mut conflicts,
                )?;
                let ids = batch
                    .column_by_name("id")
                    .ok_or_else(|| {
                        OmniError::manifest(format!("table {} missing id column", table_key))
                    })?
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| {
                        OmniError::manifest(format!("table {} id column is not Utf8", table_key))
                    })?;
                for row in 0..ids.len() {
                    values.insert(ids.value(row).to_string());
                }
            }
        }
        // Insert even when the table is absent so edge validation always
        // finds an (empty) id set for every node type.
        node_ids.insert(type_name.clone(), values);
    }

    for (edge_name, edge_type) in &db.catalog().edge_types {
        let table_key = format!("edge:{}", edge_name);
        let mut unique_seen = vec![HashMap::new(); edge_type.unique_constraints.len()];
        let mut src_counts = HashMap::new();

        if let Some(ds) =
            candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
        {
            let mut stream =
                crate::table_store::TableStore::scan_stream(&ds, None, None, None, false).await?;
            while let Some(batch) = stream
                .try_next()
                .await
                .map_err(|e| OmniError::Lance(e.to_string()))?
            {
                update_unique_constraints(
                    &table_key,
                    &batch,
                    &edge_type.unique_constraints,
                    &mut unique_seen,
                    &mut conflicts,
                )?;
                accumulate_edge_cardinality(&batch, &mut src_counts, &table_key)?;
                conflicts.extend(validate_orphan_edges_batch(
                    &table_key, edge_type, &batch, &node_ids,
                )?);
            }
        }

        // Cardinality can only be judged after every batch is counted.
        conflicts.extend(finalize_edge_cardinality_conflicts(
            &table_key,
            edge_name,
            edge_type.cardinality.min,
            edge_type.cardinality.max,
            src_counts,
        ));
    }

    if conflicts.is_empty() {
        Ok(())
    } else {
        Err(OmniError::MergeConflicts(conflicts))
    }
}
607
/// Resolve the dataset validation should read for `table_key`: the staged
/// merge output or the adopted source table when the table is a merge
/// candidate, otherwise the target's current table (`None` when absent).
async fn candidate_dataset(
    source_snapshot: &Snapshot,
    target_snapshot: &Snapshot,
    candidates: &HashMap<String, CandidateTableState>,
    table_key: &str,
) -> Result<Option<Dataset>> {
    if let Some(candidate) = candidates.get(table_key) {
        return match candidate {
            CandidateTableState::AdoptSourceState => match source_snapshot.entry(table_key) {
                Some(_) => Ok(Some(source_snapshot.open(table_key).await?)),
                None => Ok(None),
            },
            CandidateTableState::RewriteMerged(staged) => {
                Ok(Some(staged.full_staged.dataset.clone()))
            }
        };
    }
    // Table is not part of the merge: validate the target's existing state.
    match target_snapshot.entry(table_key) {
        Some(_) => Ok(Some(target_snapshot.open(table_key).await?)),
        None => Ok(None),
    }
}
630
631fn update_unique_constraints(
632 table_key: &str,
633 batch: &RecordBatch,
634 constraints: &[Vec<String>],
635 seen: &mut [HashMap<String, String>],
636 conflicts: &mut Vec<MergeConflict>,
637) -> Result<()> {
638 for (constraint_idx, columns) in constraints.iter().enumerate() {
639 let seen = &mut seen[constraint_idx];
640 for row in 0..batch.num_rows() {
641 let mut parts = Vec::with_capacity(columns.len());
642 let mut any_null = false;
643 for column_name in columns {
644 let column = batch.column_by_name(column_name).ok_or_else(|| {
645 OmniError::manifest(format!(
646 "table {} missing unique column '{}'",
647 table_key, column_name
648 ))
649 })?;
650 if column.is_null(row) {
651 any_null = true;
652 break;
653 }
654 parts.push(
655 array_value_to_string(column.as_ref(), row)
656 .map_err(|e| OmniError::Lance(e.to_string()))?,
657 );
658 }
659 if any_null {
660 continue;
661 }
662 let value = parts.join("|");
663 let row_id = row_id_at(batch, row)?;
664 if let Some(first_row_id) = seen.insert(value.clone(), row_id.clone()) {
665 conflicts.push(MergeConflict {
666 table_key: table_key.to_string(),
667 row_id: Some(row_id.clone()),
668 kind: MergeConflictKind::UniqueViolation,
669 message: format!(
670 "unique constraint {:?} violated by '{}' and '{}'",
671 columns, first_row_id, row_id
672 ),
673 });
674 }
675 }
676 }
677 Ok(())
678}
679
680fn accumulate_edge_cardinality(
681 batch: &RecordBatch,
682 counts: &mut HashMap<String, u32>,
683 table_key: &str,
684) -> Result<()> {
685 let srcs = batch
686 .column_by_name("src")
687 .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
688 .as_any()
689 .downcast_ref::<StringArray>()
690 .ok_or_else(|| {
691 OmniError::manifest(format!("table {} src column is not Utf8", table_key))
692 })?;
693 for row in 0..srcs.len() {
694 *counts.entry(srcs.value(row).to_string()).or_insert(0_u32) += 1;
695 }
696 Ok(())
697}
698
699fn finalize_edge_cardinality_conflicts(
700 table_key: &str,
701 edge_name: &str,
702 min: u32,
703 max: Option<u32>,
704 counts: HashMap<String, u32>,
705) -> Vec<MergeConflict> {
706 let mut conflicts = Vec::new();
707 for (src, count) in counts {
708 if let Some(max) = max {
709 if count > max {
710 conflicts.push(MergeConflict {
711 table_key: table_key.to_string(),
712 row_id: None,
713 kind: MergeConflictKind::CardinalityViolation,
714 message: format!(
715 "@card violation on edge {}: source '{}' has {} edges (max {})",
716 edge_name, src, count, max
717 ),
718 });
719 }
720 }
721 if count < min {
722 conflicts.push(MergeConflict {
723 table_key: table_key.to_string(),
724 row_id: None,
725 kind: MergeConflictKind::CardinalityViolation,
726 message: format!(
727 "@card violation on edge {}: source '{}' has {} edges (min {})",
728 edge_name, src, count, min
729 ),
730 });
731 }
732 }
733 conflicts
734}
735
/// Check every edge row's `src`/`dst` against the candidate node-id sets and
/// report an `OrphanEdge` conflict per missing endpoint.
fn validate_orphan_edges_batch(
    table_key: &str,
    edge_type: &omnigraph_compiler::catalog::EdgeType,
    batch: &RecordBatch,
    node_ids: &HashMap<String, HashSet<String>>,
) -> Result<Vec<MergeConflict>> {
    let srcs = batch
        .column_by_name("src")
        .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| {
            OmniError::manifest(format!("table {} src column is not Utf8", table_key))
        })?;
    let dsts = batch
        .column_by_name("dst")
        .ok_or_else(|| OmniError::manifest(format!("table {} missing dst column", table_key)))?
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| {
            OmniError::manifest(format!("table {} dst column is not Utf8", table_key))
        })?;

    // `node_ids` carries an entry for every node type (populated before edges
    // are validated), so a missing entry is an internal error, not user data.
    let from_ids = node_ids.get(&edge_type.from_type).ok_or_else(|| {
        OmniError::manifest(format!(
            "missing candidate node ids for {}",
            edge_type.from_type
        ))
    })?;
    let to_ids = node_ids.get(&edge_type.to_type).ok_or_else(|| {
        OmniError::manifest(format!(
            "missing candidate node ids for {}",
            edge_type.to_type
        ))
    })?;

    let mut conflicts = Vec::new();
    for row in 0..batch.num_rows() {
        let row_id = row_id_at(batch, row)?;
        let src = srcs.value(row);
        let dst = dsts.value(row);
        if !from_ids.contains(src) {
            conflicts.push(MergeConflict {
                table_key: table_key.to_string(),
                row_id: Some(row_id.clone()),
                kind: MergeConflictKind::OrphanEdge,
                message: format!("src '{}' not found in {}", src, edge_type.from_type),
            });
        }
        if !to_ids.contains(dst) {
            conflicts.push(MergeConflict {
                table_key: table_key.to_string(),
                row_id: Some(row_id),
                kind: MergeConflictKind::OrphanEdge,
                message: format!("dst '{}' not found in {}", dst, edge_type.to_type),
            });
        }
    }
    Ok(conflicts)
}
796
797fn row_id_at(batch: &RecordBatch, row: usize) -> Result<String> {
798 let ids = batch
799 .column_by_name("id")
800 .ok_or_else(|| OmniError::manifest("batch missing id column".to_string()))?
801 .as_any()
802 .downcast_ref::<StringArray>()
803 .ok_or_else(|| OmniError::manifest("id column is not Utf8".to_string()))?;
804 Ok(ids.value(row).to_string())
805}
806
807async fn publish_adopted_source_state(
808 target_db: &Omnigraph,
809 catalog: &Catalog,
810 base_snapshot: &Snapshot,
811 source_snapshot: &Snapshot,
812 target_snapshot: &Snapshot,
813 table_key: &str,
814) -> Result<crate::db::SubTableUpdate> {
815 let source_entry = source_snapshot
816 .entry(table_key)
817 .ok_or_else(|| OmniError::manifest(format!("missing source entry for {}", table_key)))?;
818 let target_entry = target_snapshot.entry(table_key);
819
820 match (
821 target_db.active_branch(),
822 source_entry.table_branch.as_deref(),
823 ) {
824 (None, None) => Ok(crate::db::SubTableUpdate {
826 table_key: table_key.to_string(),
827 table_version: source_entry.table_version,
828 table_branch: None,
829 row_count: source_entry.row_count,
830 version_metadata: source_entry.version_metadata.clone(),
831 }),
832 (Some(_target_branch), None) => Ok(crate::db::SubTableUpdate {
835 table_key: table_key.to_string(),
836 table_version: source_entry.table_version,
837 table_branch: None,
838 row_count: source_entry.row_count,
839 version_metadata: source_entry.version_metadata.clone(),
840 }),
841 (None, Some(_source_branch)) => {
843 let delta =
844 compute_source_delta(table_key, catalog, base_snapshot, source_snapshot).await?;
845 match delta {
846 Some(staged) => publish_rewritten_merge_table(target_db, table_key, &staged).await,
847 None => Ok(crate::db::SubTableUpdate {
848 table_key: table_key.to_string(),
849 table_version: target_entry
850 .map(|e| e.table_version)
851 .unwrap_or(source_entry.table_version),
852 table_branch: None,
853 row_count: source_entry.row_count,
854 version_metadata: target_entry
855 .map(|entry| entry.version_metadata.clone())
856 .unwrap_or_else(|| source_entry.version_metadata.clone()),
857 }),
858 }
859 }
860 (Some(target_branch), Some(source_branch)) => {
862 if target_entry.and_then(|entry| entry.table_branch.as_deref()) == Some(target_branch) {
863 let delta =
865 compute_source_delta(table_key, catalog, base_snapshot, source_snapshot)
866 .await?;
867 match delta {
868 Some(staged) => {
869 publish_rewritten_merge_table(target_db, table_key, &staged).await
870 }
871 None => Ok(crate::db::SubTableUpdate {
872 table_key: table_key.to_string(),
873 table_version: target_entry.unwrap().table_version,
874 table_branch: Some(target_branch.to_string()),
875 row_count: source_entry.row_count,
876 version_metadata: target_entry.unwrap().version_metadata.clone(),
877 }),
878 }
879 } else {
880 let full_path = format!("{}/{}", target_db.uri(), source_entry.table_path);
883 let ds = target_db
884 .fork_dataset_from_entry_state(
885 table_key,
886 &full_path,
887 Some(source_branch),
888 source_entry.table_version,
889 target_branch,
890 )
891 .await?;
892 let state = target_db.table_store().table_state(&full_path, &ds).await?;
893 Ok(crate::db::SubTableUpdate {
894 table_key: table_key.to_string(),
895 table_version: state.version,
896 table_branch: Some(target_branch.to_string()),
897 row_count: state.row_count,
898 version_metadata: state.version_metadata,
899 })
900 }
901 }
902 }
903}
904
/// Apply a staged merge result to the target's live table: upsert the delta
/// rows, delete removed ids, rebuild indices, and report the resulting state.
async fn publish_rewritten_merge_table(
    target_db: &Omnigraph,
    table_key: &str,
    staged: &StagedMergeResult,
) -> Result<crate::db::SubTableUpdate> {
    let (ds, full_path, table_branch) = target_db.open_for_mutation(table_key).await?;
    let mut current_ds = ds;

    if let Some(delta) = &staged.delta_staged {
        let batches: Vec<RecordBatch> = target_db
            .table_store()
            .scan_batches(&delta.dataset)
            .await?
            .into_iter()
            .filter(|batch| batch.num_rows() > 0)
            .collect();
        if !batches.is_empty() {
            // Avoid a needless concat when a single batch remains.
            let combined = if batches.len() == 1 {
                batches.into_iter().next().unwrap()
            } else {
                let schema = batches[0].schema();
                arrow_select::concat::concat_batches(&schema, &batches)
                    .map_err(|e| OmniError::Lance(e.to_string()))?
            };
            // Upsert keyed on `id`: update matching rows, insert new ones.
            let staged_merge = target_db
                .table_store()
                .stage_merge_insert(
                    current_ds.clone(),
                    combined,
                    vec!["id".to_string()],
                    lance::dataset::WhenMatched::UpdateAll,
                    lance::dataset::WhenNotMatched::InsertAll,
                )
                .await?;
            current_ds = target_db
                .table_store()
                .commit_staged(Arc::new(current_ds), staged_merge.transaction)
                .await?;
        }
    }

    if !staged.deleted_ids.is_empty() {
        // SQL-escape the ids (double any single quotes) for the IN filter.
        let escaped: Vec<String> = staged
            .deleted_ids
            .iter()
            .map(|id| format!("'{}'", id.replace('\'', "''")))
            .collect();
        let filter = format!("id IN ({})", escaped.join(", "));
        target_db
            .table_store()
            .delete_where(&full_path, &mut current_ds, &filter)
            .await?;
    }

    // Index building is skipped for empty tables.
    let row_count = target_db
        .table_store()
        .table_state(&full_path, &current_ds)
        .await?
        .row_count;
    if row_count > 0 {
        target_db
            .build_indices_on_dataset(table_key, &mut current_ds)
            .await?;
    }
    // Re-read the state after index building so the reported version is final.
    let final_state = target_db
        .table_store()
        .table_state(&full_path, &current_ds)
        .await?;

    Ok(crate::db::SubTableUpdate {
        table_key: table_key.to_string(),
        table_version: final_state.version,
        table_branch,
        row_count: final_state.row_count,
        version_metadata: final_state.version_metadata,
    })
}
1008
impl Omnigraph {
    /// Merge branch `source` into branch `target` without audit attribution.
    pub async fn branch_merge(&mut self, source: &str, target: &str) -> Result<MergeOutcome> {
        self.branch_merge_as(source, target, None).await
    }

    /// Merge branch `source` into branch `target`, attributing audit records
    /// to `actor_id`. The previous audit actor is restored afterwards even if
    /// the merge fails.
    pub async fn branch_merge_as(
        &mut self,
        source: &str,
        target: &str,
        actor_id: Option<&str>,
    ) -> Result<MergeOutcome> {
        self.ensure_schema_apply_idle("branch_merge").await?;
        let previous_actor = self.audit_actor_id.clone();
        self.audit_actor_id = actor_id.map(str::to_string);
        let result = self.branch_merge_impl(source, target).await;
        self.audit_actor_id = previous_actor;
        result
    }

    /// Validate the branch pair, find the merge base, and run the merge with
    /// the coordinator temporarily swapped onto the target branch.
    async fn branch_merge_impl(
        &mut self,
        source: &str,
        target: &str,
    ) -> Result<MergeOutcome> {
        if is_internal_run_branch(source) || is_internal_run_branch(target) {
            return Err(OmniError::manifest(format!(
                "branch_merge does not allow internal run refs ('{}' -> '{}')",
                source, target
            )));
        }
        let source_branch = Omnigraph::normalize_branch_name(source)?;
        let target_branch = Omnigraph::normalize_branch_name(target)?;
        if source_branch == target_branch {
            return Err(OmniError::manifest(
                "branch_merge requires distinct source and target branches".to_string(),
            ));
        }

        let source_head_commit_id = self
            .head_commit_id_for_branch(source_branch.as_deref())
            .await?
            .ok_or_else(|| OmniError::manifest("source branch has no head commit".to_string()))?;
        let target_head_commit_id = self
            .head_commit_id_for_branch(target_branch.as_deref())
            .await?
            .ok_or_else(|| OmniError::manifest("target branch has no head commit".to_string()))?;
        let base_commit = CommitGraph::merge_base(
            self.uri(),
            source_branch.as_deref(),
            target_branch.as_deref(),
        )
        .await?
        .ok_or_else(|| OmniError::manifest("branches have no common ancestor".to_string()))?;

        // Nothing to do when the heads coincide or the target already
        // contains the source (base == source head).
        if source_head_commit_id == target_head_commit_id
            || base_commit.graph_commit_id == source_head_commit_id
        {
            return Ok(MergeOutcome::AlreadyUpToDate);
        }
        // Fast-forward when the base is the target's head (target never moved).
        let is_fast_forward = base_commit.graph_commit_id == target_head_commit_id;

        let base_snapshot = ManifestCoordinator::snapshot_at(
            self.uri(),
            base_commit.manifest_branch.as_deref(),
            base_commit.manifest_version,
        )
        .await?;
        let source_snapshot = self
            .resolved_target(ReadTarget::Branch(
                source_branch.clone().unwrap_or_else(|| "main".to_string()),
            ))
            .await?
            .snapshot;
        let previous_branch = self.active_branch().map(str::to_string);
        // Swap onto the target branch for the duration of the merge; the
        // coordinator is restored regardless of the merge outcome.
        let previous = self
            .swap_coordinator_for_branch(target_branch.as_deref())
            .await?;
        let merge_result = self
            .branch_merge_on_current_target(
                &base_snapshot,
                &source_snapshot,
                &target_head_commit_id,
                &source_head_commit_id,
                is_fast_forward,
            )
            .await;
        self.restore_coordinator(previous);

        // If we merged into the branch we were already on, reload to pick up
        // the merged state.
        if merge_result.is_ok() && previous_branch == target_branch {
            self.refresh().await?;
        }

        merge_result
    }

    /// Core merge routine; assumes the coordinator is already on the target
    /// branch. Stages per-table merges, validates the candidate state, then
    /// publishes the table updates and records the merge commit.
    async fn branch_merge_on_current_target(
        &mut self,
        base_snapshot: &Snapshot,
        source_snapshot: &Snapshot,
        target_head_commit_id: &str,
        source_head_commit_id: &str,
        is_fast_forward: bool,
    ) -> Result<MergeOutcome> {
        self.ensure_commit_graph_initialized().await?;
        let target_snapshot = self.snapshot();

        // Union of table keys across all three snapshots.
        let mut table_keys = HashSet::new();
        for entry in base_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }
        for entry in source_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }
        for entry in target_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }

        // Sort for deterministic processing order.
        let mut ordered_table_keys: Vec<String> = table_keys.into_iter().collect();
        ordered_table_keys.sort();

        let mut conflicts = Vec::new();
        let mut candidates: HashMap<String, CandidateTableState> = HashMap::new();

        for table_key in &ordered_table_keys {
            let base_entry = base_snapshot.entry(table_key);
            let source_entry = source_snapshot.entry(table_key);
            let target_entry = target_snapshot.entry(table_key);
            // Both sides at the same state: nothing to merge.
            if same_manifest_state(source_entry, target_entry) {
                continue;
            }
            // Source didn't change the table: target's state stands.
            if same_manifest_state(base_entry, source_entry) {
                continue;
            }
            // Target didn't change the table: adopt source's state wholesale.
            if same_manifest_state(base_entry, target_entry) {
                candidates.insert(table_key.clone(), CandidateTableState::AdoptSourceState);
                continue;
            }

            // Both sides changed: stage a row-level three-way merge.
            if let Some(staged) = stage_streaming_table_merge(
                table_key,
                self.catalog(),
                base_snapshot,
                source_snapshot,
                &target_snapshot,
                &mut conflicts,
            )
            .await?
            {
                candidates.insert(
                    table_key.clone(),
                    CandidateTableState::RewriteMerged(staged),
                );
            }
        }

        if !conflicts.is_empty() {
            return Err(OmniError::MergeConflicts(conflicts));
        }

        // Schema-level validation of the full candidate state before
        // anything is published.
        validate_merge_candidates(self, source_snapshot, &target_snapshot, &candidates).await?;

        let mut updates = Vec::new();
        let mut changed_edge_tables = false;
        for table_key in &ordered_table_keys {
            let Some(candidate_state) = candidates.get(table_key) else {
                continue;
            };
            let update = match candidate_state {
                CandidateTableState::AdoptSourceState => {
                    publish_adopted_source_state(
                        self,
                        self.catalog(),
                        base_snapshot,
                        source_snapshot,
                        &target_snapshot,
                        table_key,
                    )
                    .await?
                }
                CandidateTableState::RewriteMerged(staged) => {
                    publish_rewritten_merge_table(self, table_key, staged).await?
                }
            };
            if table_key.starts_with("edge:") {
                changed_edge_tables = true;
            }
            updates.push(update);
        }

        // The merge commit is recorded even when no table changed; the
        // manifest version simply stays put in that case.
        let manifest_version = if updates.is_empty() {
            self.version()
        } else {
            self.commit_manifest_updates(&updates).await?
        };
        self.record_merge_commit(
            manifest_version,
            target_head_commit_id,
            source_head_commit_id,
        )
        .await?;

        if changed_edge_tables {
            self.invalidate_graph_index().await;
        }

        Ok(if is_fast_forward {
            MergeOutcome::FastForward
        } else {
            MergeOutcome::Merged
        })
    }
}