1use super::*;
2
/// Number of buffered rows at which a `StagedTableWriter` flushes to its
/// staging dataset.
const MERGE_STAGE_BATCH_ROWS: usize = 8 * 1024;
/// Environment variable that, when set, names the root directory in which
/// merge staging directories are created instead of the system default.
const MERGE_STAGE_DIR_ENV: &str = "OMNIGRAPH_MERGE_STAGING_DIR";
5
6#[derive(Debug)]
7enum CandidateTableState {
8 AdoptSourceState,
9 RewriteMerged(StagedMergeResult),
10}
11
12#[derive(Debug)]
13struct StagedTable {
14 _dir: TempDir,
15 dataset: Dataset,
16}
17
18#[derive(Debug)]
19struct StagedMergeResult {
20 full_staged: StagedTable,
21 delta_staged: Option<StagedTable>,
22 deleted_ids: Vec<String>,
23}
24
25#[derive(Debug, Clone)]
26struct CursorRow {
27 id: String,
28 signature: String,
29 dataset: Dataset,
30 batch: RecordBatch,
31 row_index: usize,
32}
33
34struct OrderedTableCursor {
35 stream: Option<std::pin::Pin<Box<DatasetRecordBatchStream>>>,
36 dataset: Option<Dataset>,
37 current_batch: Option<RecordBatch>,
38 current_row: usize,
39 peeked: Option<CursorRow>,
40}
41
42impl OrderedTableCursor {
43 async fn from_snapshot(snapshot: &Snapshot, table_key: &str) -> Result<Self> {
44 let dataset = match snapshot.entry(table_key) {
45 Some(_) => Some(snapshot.open(table_key).await?),
46 None => None,
47 };
48 Self::from_dataset(dataset).await
49 }
50
51 async fn from_dataset(dataset: Option<Dataset>) -> Result<Self> {
52 let stream = if let Some(ds) = &dataset {
53 Some(Box::pin(
54 crate::table_store::TableStore::scan_stream_with(
55 ds,
56 None,
57 None,
58 Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]),
59 true,
60 |_| Ok(()),
61 )
62 .await?,
63 ))
64 } else {
65 None
66 };
67
68 Ok(Self {
69 stream,
70 dataset,
71 current_batch: None,
72 current_row: 0,
73 peeked: None,
74 })
75 }
76
77 async fn peek_cloned(&mut self) -> Result<Option<CursorRow>> {
78 if self.peeked.is_none() {
79 self.peeked = self.next_row().await?;
80 }
81 Ok(self.peeked.clone())
82 }
83
84 async fn pop(&mut self) -> Result<Option<CursorRow>> {
85 if self.peeked.is_some() {
86 return Ok(self.peeked.take());
87 }
88 self.next_row().await
89 }
90
91 async fn next_row(&mut self) -> Result<Option<CursorRow>> {
92 loop {
93 if let Some(batch) = &self.current_batch {
94 if self.current_row < batch.num_rows() {
95 let row_index = self.current_row;
96 self.current_row += 1;
97 let dataset = self.dataset.clone().ok_or_else(|| {
98 OmniError::manifest("cursor row missing source dataset".to_string())
99 })?;
100 return Ok(Some(CursorRow {
101 id: row_id_at(batch, row_index)?,
102 signature: row_signature(batch, row_index)?,
103 dataset,
104 batch: batch.clone(),
105 row_index,
106 }));
107 }
108 }
109
110 let Some(stream) = self.stream.as_mut() else {
111 return Ok(None);
112 };
113 match stream.try_next().await {
114 Ok(Some(batch)) => {
115 self.current_batch = Some(batch);
116 self.current_row = 0;
117 }
118 Ok(None) => {
119 self.stream = None;
120 self.current_batch = None;
121 return Ok(None);
122 }
123 Err(err) => return Err(OmniError::Lance(err.to_string())),
124 }
125 }
126 }
127}
128
129struct StagedTableWriter {
130 schema: SchemaRef,
131 dataset_uri: String,
132 dir: TempDir,
133 dataset: Option<Dataset>,
134 buffered_rows: usize,
135 row_count: u64,
136 batches: Vec<RecordBatch>,
137}
138
139impl StagedTableWriter {
140 fn new(table_key: &str, schema: SchemaRef) -> Result<Self> {
141 let dir = merge_stage_tempdir(table_key)?;
142 let dataset_uri = dir.path().join("table.lance").to_string_lossy().to_string();
143 Ok(Self {
144 schema,
145 dataset_uri,
146 dir,
147 dataset: None,
148 buffered_rows: 0,
149 row_count: 0,
150 batches: Vec::new(),
151 })
152 }
153
154 async fn push_row(&mut self, row: &CursorRow) -> Result<()> {
155 self.row_count += 1;
156 self.buffered_rows += 1;
157 self.batches.push(self.row_batch(row).await?);
158 if self.buffered_rows >= MERGE_STAGE_BATCH_ROWS {
159 self.flush().await?;
160 }
161 Ok(())
162 }
163
164 async fn row_batch(&self, row: &CursorRow) -> Result<RecordBatch> {
165 let batch = row.batch.slice(row.row_index, 1);
166 let has_blob_columns = row
167 .dataset
168 .schema()
169 .fields_pre_order()
170 .any(|field| field.is_blob());
171 if has_blob_columns {
172 return crate::table_store::TableStore::materialize_blob_batch(&row.dataset, batch)
173 .await;
174 }
175 let columns = self
176 .schema
177 .fields()
178 .iter()
179 .map(|field| {
180 batch.column_by_name(field.name()).cloned().ok_or_else(|| {
181 OmniError::Lance(format!("batch missing column '{}'", field.name()))
182 })
183 })
184 .collect::<Result<Vec<_>>>()?;
185 RecordBatch::try_new(self.schema.clone(), columns)
186 .map_err(|e| OmniError::Lance(e.to_string()))
187 }
188
189 async fn finish(mut self) -> Result<StagedTable> {
190 self.flush().await?;
191 if self.dataset.is_none() {
192 self.dataset = Some(
193 crate::table_store::TableStore::create_empty_dataset(
194 &self.dataset_uri,
195 &self.schema,
196 )
197 .await?,
198 );
199 }
200 Ok(StagedTable {
201 _dir: self.dir,
202 dataset: self.dataset.unwrap(),
203 })
204 }
205
206 async fn flush(&mut self) -> Result<()> {
207 if self.batches.is_empty() {
208 return Ok(());
209 }
210
211 let batch = if self.batches.len() == 1 {
212 self.batches.pop().unwrap()
213 } else {
214 let batches = std::mem::take(&mut self.batches);
215 arrow_select::concat::concat_batches(&self.schema, &batches)
216 .map_err(|e| OmniError::Lance(e.to_string()))?
217 };
218 self.buffered_rows = 0;
219
220 let ds = crate::table_store::TableStore::append_or_create_batch(
221 &self.dataset_uri,
222 self.dataset.take(),
223 batch,
224 )
225 .await?;
226 self.dataset = Some(ds);
227 Ok(())
228 }
229}
230
231fn merge_stage_tempdir(table_key: &str) -> Result<TempDir> {
232 if let Ok(root) = env::var(MERGE_STAGE_DIR_ENV) {
233 return TempDirBuilder::new()
234 .prefix(&format!(
235 "omnigraph-merge-{}-",
236 sanitize_table_key(table_key)
237 ))
238 .tempdir_in(PathBuf::from(root))
239 .map_err(OmniError::from);
240 }
241 TempDirBuilder::new()
242 .prefix(&format!(
243 "omnigraph-merge-{}-",
244 sanitize_table_key(table_key)
245 ))
246 .tempdir()
247 .map_err(OmniError::from)
248}
249
/// Returns `table_key` with every `:`, `/` and `\` replaced by `-`, so the
/// key can be embedded in a directory name.
fn sanitize_table_key(table_key: &str) -> String {
    let mut sanitized = String::with_capacity(table_key.len());
    for ch in table_key.chars() {
        sanitized.push(if matches!(ch, ':' | '/' | '\\') { '-' } else { ch });
    }
    sanitized
}
259
260async fn compute_source_delta(
263 table_key: &str,
264 catalog: &Catalog,
265 base_snapshot: &Snapshot,
266 source_snapshot: &Snapshot,
267) -> Result<Option<StagedMergeResult>> {
268 let schema = schema_for_table_key(catalog, table_key)?;
269 let mut full_writer =
270 StagedTableWriter::new(&format!("{}_adopt_full", table_key), schema.clone())?;
271 let mut delta_writer = StagedTableWriter::new(&format!("{}_adopt_delta", table_key), schema)?;
272 let mut deleted_ids: Vec<String> = Vec::new();
273 let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
274 let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;
275
276 let mut needs_update = false;
277
278 loop {
279 let base_row = base.peek_cloned().await?;
280 let source_row = source.peek_cloned().await?;
281
282 let next_id = [base_row.as_ref(), source_row.as_ref()]
283 .into_iter()
284 .flatten()
285 .map(|row| row.id.clone())
286 .min();
287 let Some(next_id) = next_id else { break };
288
289 let base_row = if base_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
290 base.pop().await?
291 } else {
292 None
293 };
294 let source_row = if source_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
295 source.pop().await?
296 } else {
297 None
298 };
299
300 let base_sig = base_row.as_ref().map(|r| r.signature.as_str());
301 let source_sig = source_row.as_ref().map(|r| r.signature.as_str());
302
303 match (&base_row, &source_row) {
304 (Some(_), None) => {
305 deleted_ids.push(next_id);
307 needs_update = true;
308 }
309 (None, Some(src)) => {
310 full_writer.push_row(src).await?;
312 delta_writer.push_row(src).await?;
313 needs_update = true;
314 }
315 (Some(_), Some(src)) if source_sig != base_sig => {
316 full_writer.push_row(src).await?;
318 delta_writer.push_row(src).await?;
319 needs_update = true;
320 }
321 (Some(base), Some(_)) => {
322 full_writer.push_row(base).await?;
324 }
325 (None, None) => unreachable!(),
326 }
327 }
328
329 if !needs_update {
330 return Ok(None);
331 }
332
333 let delta_staged = if delta_writer.row_count > 0 {
334 Some(delta_writer.finish().await?)
335 } else {
336 None
337 };
338
339 Ok(Some(StagedMergeResult {
340 full_staged: full_writer.finish().await?,
341 delta_staged,
342 deleted_ids,
343 }))
344}
345
346fn min_cursor_id(
347 base_row: &Option<CursorRow>,
348 source_row: &Option<CursorRow>,
349 target_row: &Option<CursorRow>,
350) -> Option<String> {
351 [base_row.as_ref(), source_row.as_ref(), target_row.as_ref()]
352 .into_iter()
353 .flatten()
354 .map(|row| row.id.clone())
355 .min()
356}
357
358async fn stage_streaming_table_merge(
359 table_key: &str,
360 catalog: &Catalog,
361 base_snapshot: &Snapshot,
362 source_snapshot: &Snapshot,
363 target_snapshot: &Snapshot,
364 conflicts: &mut Vec<MergeConflict>,
365) -> Result<Option<StagedMergeResult>> {
366 let schema = schema_for_table_key(catalog, table_key)?;
367 let mut full_writer = StagedTableWriter::new(&format!("{}_full", table_key), schema.clone())?;
368 let mut delta_writer = StagedTableWriter::new(&format!("{}_delta", table_key), schema)?;
369 let mut deleted_ids: Vec<String> = Vec::new();
370 let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
371 let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;
372 let mut target = OrderedTableCursor::from_snapshot(target_snapshot, table_key).await?;
373
374 let prior_conflict_count = conflicts.len();
375 let mut needs_update = false;
376
377 loop {
378 let base_row = base.peek_cloned().await?;
379 let source_row = source.peek_cloned().await?;
380 let target_row = target.peek_cloned().await?;
381 let Some(next_id) = min_cursor_id(&base_row, &source_row, &target_row) else {
382 break;
383 };
384
385 let base_row = if base_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str()) {
386 base.pop().await?
387 } else {
388 None
389 };
390 let source_row = if source_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
391 {
392 source.pop().await?
393 } else {
394 None
395 };
396 let target_row = if target_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
397 {
398 target.pop().await?
399 } else {
400 None
401 };
402
403 let base_sig = base_row.as_ref().map(|row| row.signature.as_str());
404 let source_sig = source_row.as_ref().map(|row| row.signature.as_str());
405 let target_sig = target_row.as_ref().map(|row| row.signature.as_str());
406
407 let source_changed = source_sig != base_sig;
408 let target_changed = target_sig != base_sig;
409
410 let selection = if !source_changed {
411 target_row.as_ref()
412 } else if !target_changed {
413 source_row.as_ref()
414 } else if source_sig == target_sig {
415 target_row.as_ref()
416 } else {
417 conflicts.push(classify_merge_conflict(
418 table_key, &next_id, base_sig, source_sig, target_sig,
419 ));
420 None
421 };
422
423 if conflicts.len() > prior_conflict_count {
424 continue;
425 }
426
427 if selection.is_none() && target_row.is_some() {
429 deleted_ids.push(next_id.clone());
430 needs_update = true;
431 continue;
432 }
433
434 if let Some(selection) = selection {
435 full_writer.push_row(selection).await?;
437 if selection.signature.as_str() != target_sig.unwrap_or("") {
439 delta_writer.push_row(selection).await?;
440 needs_update = true;
441 }
442 }
443 }
444
445 if conflicts.len() > prior_conflict_count {
446 return Ok(None);
447 }
448 if !needs_update {
449 return Ok(None);
450 }
451
452 let delta_staged = if delta_writer.row_count > 0 {
453 Some(delta_writer.finish().await?)
454 } else {
455 None
456 };
457
458 Ok(Some(StagedMergeResult {
459 full_staged: full_writer.finish().await?,
460 delta_staged,
461 deleted_ids,
462 }))
463}
464
465fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<SchemaRef> {
466 if let Some(name) = table_key.strip_prefix("node:") {
467 return catalog
468 .node_types
469 .get(name)
470 .map(|t| t.arrow_schema.clone())
471 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", name)));
472 }
473 if let Some(name) = table_key.strip_prefix("edge:") {
474 return catalog
475 .edge_types
476 .get(name)
477 .map(|t| t.arrow_schema.clone())
478 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", name)));
479 }
480 Err(OmniError::manifest(format!(
481 "invalid table key '{}'",
482 table_key
483 )))
484}
485
486fn same_manifest_state(
487 left: Option<&crate::db::SubTableEntry>,
488 right: Option<&crate::db::SubTableEntry>,
489) -> bool {
490 match (left, right) {
491 (Some(left), Some(right)) => {
492 left.table_version == right.table_version && left.table_branch == right.table_branch
493 }
494 (None, None) => true,
495 _ => false,
496 }
497}
498
499fn classify_merge_conflict(
500 table_key: &str,
501 row_id: &str,
502 base_sig: Option<&str>,
503 source_sig: Option<&str>,
504 target_sig: Option<&str>,
505) -> MergeConflict {
506 let (kind, message) = match (base_sig, source_sig, target_sig) {
507 (None, Some(_), Some(_)) => (
508 MergeConflictKind::DivergentInsert,
509 format!("divergent insert for id '{}'", row_id),
510 ),
511 (Some(_), None, Some(_)) | (Some(_), Some(_), None) => (
512 MergeConflictKind::DeleteVsUpdate,
513 format!("delete/update conflict for id '{}'", row_id),
514 ),
515 _ => (
516 MergeConflictKind::DivergentUpdate,
517 format!("divergent update for id '{}'", row_id),
518 ),
519 };
520 MergeConflict {
521 table_key: table_key.to_string(),
522 row_id: Some(row_id.to_string()),
523 kind,
524 message,
525 }
526}
527
528fn row_signature(batch: &RecordBatch, row: usize) -> Result<String> {
529 let mut values = Vec::with_capacity(batch.num_columns());
530 for (field, column) in batch.schema().fields().iter().zip(batch.columns()) {
531 if field.name().starts_with("_row") {
532 continue;
533 }
534 values.push(
535 array_value_to_string(column.as_ref(), row)
536 .map_err(|e| OmniError::Lance(e.to_string()))?,
537 );
538 }
539 Ok(values.join("\u{1f}"))
540}
541
542async fn scan_validation_stream(ds: &Dataset) -> Result<DatasetRecordBatchStream> {
543 crate::table_store::TableStore::scan_stream_with(ds, None, None, None, false, |_| Ok(())).await
544}
545
546async fn validate_merge_candidates(
547 db: &Omnigraph,
548 source_snapshot: &Snapshot,
549 target_snapshot: &Snapshot,
550 candidates: &HashMap<String, CandidateTableState>,
551) -> Result<()> {
552 let mut conflicts = Vec::new();
553 let mut node_ids: HashMap<String, HashSet<String>> = HashMap::new();
554
555 for (type_name, node_type) in &db.catalog().node_types {
556 let table_key = format!("node:{}", type_name);
557 let mut values = HashSet::new();
558 let mut unique_seen = vec![HashMap::new(); node_type.unique_constraints.len()];
559
560 if let Some(ds) =
561 candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
562 {
563 let mut stream = scan_validation_stream(&ds).await?;
564 while let Some(batch) = stream
565 .try_next()
566 .await
567 .map_err(|e| OmniError::Lance(e.to_string()))?
568 {
569 if let Err(err) = crate::loader::validate_value_constraints(&batch, node_type) {
570 conflicts.push(MergeConflict {
571 table_key: table_key.clone(),
572 row_id: None,
573 kind: MergeConflictKind::ValueConstraintViolation,
574 message: err.to_string(),
575 });
576 }
577 update_unique_constraints(
578 &table_key,
579 &batch,
580 &node_type.unique_constraints,
581 &mut unique_seen,
582 &mut conflicts,
583 )?;
584 let ids = batch
585 .column_by_name("id")
586 .ok_or_else(|| {
587 OmniError::manifest(format!("table {} missing id column", table_key))
588 })?
589 .as_any()
590 .downcast_ref::<StringArray>()
591 .ok_or_else(|| {
592 OmniError::manifest(format!("table {} id column is not Utf8", table_key))
593 })?;
594 for row in 0..ids.len() {
595 values.insert(ids.value(row).to_string());
596 }
597 }
598 }
599 node_ids.insert(type_name.clone(), values);
600 }
601
602 for (edge_name, edge_type) in &db.catalog().edge_types {
603 let table_key = format!("edge:{}", edge_name);
604 let mut unique_seen = vec![HashMap::new(); edge_type.unique_constraints.len()];
605 let mut src_counts = HashMap::new();
606
607 if let Some(ds) =
608 candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
609 {
610 let mut stream = scan_validation_stream(&ds).await?;
611 while let Some(batch) = stream
612 .try_next()
613 .await
614 .map_err(|e| OmniError::Lance(e.to_string()))?
615 {
616 update_unique_constraints(
617 &table_key,
618 &batch,
619 &edge_type.unique_constraints,
620 &mut unique_seen,
621 &mut conflicts,
622 )?;
623 accumulate_edge_cardinality(&batch, &mut src_counts, &table_key)?;
624 conflicts.extend(validate_orphan_edges_batch(
625 &table_key, edge_type, &batch, &node_ids,
626 )?);
627 }
628 }
629
630 conflicts.extend(finalize_edge_cardinality_conflicts(
631 &table_key,
632 edge_name,
633 edge_type.cardinality.min,
634 edge_type.cardinality.max,
635 src_counts,
636 ));
637 }
638
639 if conflicts.is_empty() {
640 Ok(())
641 } else {
642 Err(OmniError::MergeConflicts(conflicts))
643 }
644}
645
646async fn candidate_dataset(
647 source_snapshot: &Snapshot,
648 target_snapshot: &Snapshot,
649 candidates: &HashMap<String, CandidateTableState>,
650 table_key: &str,
651) -> Result<Option<Dataset>> {
652 if let Some(candidate) = candidates.get(table_key) {
653 return match candidate {
654 CandidateTableState::AdoptSourceState => match source_snapshot.entry(table_key) {
655 Some(_) => Ok(Some(source_snapshot.open(table_key).await?)),
656 None => Ok(None),
657 },
658 CandidateTableState::RewriteMerged(staged) => {
659 Ok(Some(staged.full_staged.dataset.clone()))
660 }
661 };
662 }
663 match target_snapshot.entry(table_key) {
664 Some(_) => Ok(Some(target_snapshot.open(table_key).await?)),
665 None => Ok(None),
666 }
667}
668
669fn update_unique_constraints(
670 table_key: &str,
671 batch: &RecordBatch,
672 constraints: &[Vec<String>],
673 seen: &mut [HashMap<Vec<String>, String>],
674 conflicts: &mut Vec<MergeConflict>,
675) -> Result<()> {
676 for (constraint_idx, columns) in constraints.iter().enumerate() {
677 let seen = &mut seen[constraint_idx];
678 let group_columns = columns
682 .iter()
683 .map(|column_name| {
684 batch.column_by_name(column_name).cloned().ok_or_else(|| {
685 OmniError::manifest(format!(
686 "table {} missing unique column '{}'",
687 table_key, column_name
688 ))
689 })
690 })
691 .collect::<Result<Vec<_>>>()?;
692 for row in 0..batch.num_rows() {
693 let Some(key) = crate::loader::composite_unique_key(&group_columns, row)? else {
697 continue;
698 };
699 let row_id = row_id_at(batch, row)?;
700 if let Some(first_row_id) = seen.insert(key, row_id.clone()) {
701 conflicts.push(MergeConflict {
702 table_key: table_key.to_string(),
703 row_id: Some(row_id.clone()),
704 kind: MergeConflictKind::UniqueViolation,
705 message: format!(
706 "unique constraint {:?} violated by '{}' and '{}'",
707 columns, first_row_id, row_id
708 ),
709 });
710 }
711 }
712 }
713 Ok(())
714}
715
716fn accumulate_edge_cardinality(
717 batch: &RecordBatch,
718 counts: &mut HashMap<String, u32>,
719 table_key: &str,
720) -> Result<()> {
721 let srcs = batch
722 .column_by_name("src")
723 .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
724 .as_any()
725 .downcast_ref::<StringArray>()
726 .ok_or_else(|| {
727 OmniError::manifest(format!("table {} src column is not Utf8", table_key))
728 })?;
729 for row in 0..srcs.len() {
730 *counts.entry(srcs.value(row).to_string()).or_insert(0_u32) += 1;
731 }
732 Ok(())
733}
734
735fn finalize_edge_cardinality_conflicts(
736 table_key: &str,
737 edge_name: &str,
738 min: u32,
739 max: Option<u32>,
740 counts: HashMap<String, u32>,
741) -> Vec<MergeConflict> {
742 let mut conflicts = Vec::new();
743 for (src, count) in counts {
744 if let Some(max) = max {
745 if count > max {
746 conflicts.push(MergeConflict {
747 table_key: table_key.to_string(),
748 row_id: None,
749 kind: MergeConflictKind::CardinalityViolation,
750 message: format!(
751 "@card violation on edge {}: source '{}' has {} edges (max {})",
752 edge_name, src, count, max
753 ),
754 });
755 }
756 }
757 if count < min {
758 conflicts.push(MergeConflict {
759 table_key: table_key.to_string(),
760 row_id: None,
761 kind: MergeConflictKind::CardinalityViolation,
762 message: format!(
763 "@card violation on edge {}: source '{}' has {} edges (min {})",
764 edge_name, src, count, min
765 ),
766 });
767 }
768 }
769 conflicts
770}
771
772fn validate_orphan_edges_batch(
773 table_key: &str,
774 edge_type: &omnigraph_compiler::catalog::EdgeType,
775 batch: &RecordBatch,
776 node_ids: &HashMap<String, HashSet<String>>,
777) -> Result<Vec<MergeConflict>> {
778 let srcs = batch
779 .column_by_name("src")
780 .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
781 .as_any()
782 .downcast_ref::<StringArray>()
783 .ok_or_else(|| {
784 OmniError::manifest(format!("table {} src column is not Utf8", table_key))
785 })?;
786 let dsts = batch
787 .column_by_name("dst")
788 .ok_or_else(|| OmniError::manifest(format!("table {} missing dst column", table_key)))?
789 .as_any()
790 .downcast_ref::<StringArray>()
791 .ok_or_else(|| {
792 OmniError::manifest(format!("table {} dst column is not Utf8", table_key))
793 })?;
794
795 let from_ids = node_ids.get(&edge_type.from_type).ok_or_else(|| {
796 OmniError::manifest(format!(
797 "missing candidate node ids for {}",
798 edge_type.from_type
799 ))
800 })?;
801 let to_ids = node_ids.get(&edge_type.to_type).ok_or_else(|| {
802 OmniError::manifest(format!(
803 "missing candidate node ids for {}",
804 edge_type.to_type
805 ))
806 })?;
807
808 let mut conflicts = Vec::new();
809 for row in 0..batch.num_rows() {
810 let row_id = row_id_at(batch, row)?;
811 let src = srcs.value(row);
812 let dst = dsts.value(row);
813 if !from_ids.contains(src) {
814 conflicts.push(MergeConflict {
815 table_key: table_key.to_string(),
816 row_id: Some(row_id.clone()),
817 kind: MergeConflictKind::OrphanEdge,
818 message: format!("src '{}' not found in {}", src, edge_type.from_type),
819 });
820 }
821 if !to_ids.contains(dst) {
822 conflicts.push(MergeConflict {
823 table_key: table_key.to_string(),
824 row_id: Some(row_id),
825 kind: MergeConflictKind::OrphanEdge,
826 message: format!("dst '{}' not found in {}", dst, edge_type.to_type),
827 });
828 }
829 }
830 Ok(conflicts)
831}
832
833fn row_id_at(batch: &RecordBatch, row: usize) -> Result<String> {
834 let ids = batch
835 .column_by_name("id")
836 .ok_or_else(|| OmniError::manifest("batch missing id column".to_string()))?
837 .as_any()
838 .downcast_ref::<StringArray>()
839 .ok_or_else(|| OmniError::manifest("id column is not Utf8".to_string()))?;
840 Ok(ids.value(row).to_string())
841}
842
843async fn publish_adopted_source_state(
844 target_db: &Omnigraph,
845 catalog: &Catalog,
846 base_snapshot: &Snapshot,
847 source_snapshot: &Snapshot,
848 target_snapshot: &Snapshot,
849 table_key: &str,
850) -> Result<crate::db::SubTableUpdate> {
851 let source_entry = source_snapshot
852 .entry(table_key)
853 .ok_or_else(|| OmniError::manifest(format!("missing source entry for {}", table_key)))?;
854 let target_entry = target_snapshot.entry(table_key);
855
856 let target_active = target_db.active_branch().await;
857 match (
858 target_active.as_deref(),
859 source_entry.table_branch.as_deref(),
860 ) {
861 (None, None) => Ok(crate::db::SubTableUpdate {
863 table_key: table_key.to_string(),
864 table_version: source_entry.table_version,
865 table_branch: None,
866 row_count: source_entry.row_count,
867 version_metadata: source_entry.version_metadata.clone(),
868 }),
869 (Some(_target_branch), None) => Ok(crate::db::SubTableUpdate {
872 table_key: table_key.to_string(),
873 table_version: source_entry.table_version,
874 table_branch: None,
875 row_count: source_entry.row_count,
876 version_metadata: source_entry.version_metadata.clone(),
877 }),
878 (None, Some(_source_branch)) => {
880 let delta =
881 compute_source_delta(table_key, catalog, base_snapshot, source_snapshot).await?;
882 match delta {
883 Some(staged) => publish_rewritten_merge_table(target_db, table_key, &staged).await,
884 None => Ok(crate::db::SubTableUpdate {
885 table_key: table_key.to_string(),
886 table_version: target_entry
887 .map(|e| e.table_version)
888 .unwrap_or(source_entry.table_version),
889 table_branch: None,
890 row_count: source_entry.row_count,
891 version_metadata: target_entry
892 .map(|entry| entry.version_metadata.clone())
893 .unwrap_or_else(|| source_entry.version_metadata.clone()),
894 }),
895 }
896 }
897 (Some(target_branch), Some(source_branch)) => {
899 if target_entry.and_then(|entry| entry.table_branch.as_deref()) == Some(target_branch) {
900 let delta =
902 compute_source_delta(table_key, catalog, base_snapshot, source_snapshot)
903 .await?;
904 match delta {
905 Some(staged) => {
906 publish_rewritten_merge_table(target_db, table_key, &staged).await
907 }
908 None => Ok(crate::db::SubTableUpdate {
909 table_key: table_key.to_string(),
910 table_version: target_entry.unwrap().table_version,
911 table_branch: Some(target_branch.to_string()),
912 row_count: source_entry.row_count,
913 version_metadata: target_entry.unwrap().version_metadata.clone(),
914 }),
915 }
916 } else {
917 let full_path = format!("{}/{}", target_db.uri(), source_entry.table_path);
920 let ds = target_db
921 .fork_dataset_from_entry_state(
922 table_key,
923 &full_path,
924 Some(source_branch),
925 source_entry.table_version,
926 target_branch,
927 )
928 .await?;
929 let state = target_db.storage().table_state(&full_path, &ds).await?;
930 Ok(crate::db::SubTableUpdate {
931 table_key: table_key.to_string(),
932 table_version: state.version,
933 table_branch: Some(target_branch.to_string()),
934 row_count: state.row_count,
935 version_metadata: state.version_metadata,
936 })
937 }
938 }
939 }
940}
941
942async fn publish_rewritten_merge_table(
943 target_db: &Omnigraph,
944 table_key: &str,
945 staged: &StagedMergeResult,
946) -> Result<crate::db::SubTableUpdate> {
947 let (ds, full_path, table_branch) = target_db
952 .open_for_mutation(table_key, crate::db::MutationOpKind::Merge)
953 .await?;
954 let mut current_ds = ds;
955
956 if let Some(delta) = &staged.delta_staged {
966 let delta_snapshot = SnapshotHandle::new(delta.dataset.clone());
970 let batches: Vec<RecordBatch> = target_db
971 .storage()
972 .scan_batches_for_rewrite(&delta_snapshot)
973 .await?
974 .into_iter()
975 .filter(|batch| batch.num_rows() > 0)
976 .collect();
977 if !batches.is_empty() {
978 let combined = if batches.len() == 1 {
980 batches.into_iter().next().unwrap()
981 } else {
982 let schema = batches[0].schema();
983 arrow_select::concat::concat_batches(&schema, &batches)
984 .map_err(|e| OmniError::Lance(e.to_string()))?
985 };
986 let staged_merge = target_db
987 .storage()
988 .stage_merge_insert(
989 current_ds.clone(),
990 combined,
991 vec!["id".to_string()],
992 lance::dataset::WhenMatched::UpdateAll,
993 lance::dataset::WhenNotMatched::InsertAll,
994 )
995 .await?;
996 current_ds = target_db
997 .storage()
998 .commit_staged(current_ds, staged_merge)
999 .await?;
1000 }
1001 }
1002
1003 if !staged.deleted_ids.is_empty() {
1013 let escaped: Vec<String> = staged
1014 .deleted_ids
1015 .iter()
1016 .map(|id| format!("'{}'", id.replace('\'', "''")))
1017 .collect();
1018 let filter = format!("id IN ({})", escaped.join(", "));
1019 let (new_ds, _) = target_db
1020 .storage_inline_residual()
1021 .delete_where(&full_path, current_ds, &filter)
1022 .await?;
1023 current_ds = new_ds;
1024 }
1025
1026 let row_count = target_db
1034 .storage()
1035 .table_state(&full_path, ¤t_ds)
1036 .await?
1037 .row_count;
1038 if row_count > 0 {
1039 target_db
1040 .build_indices_on_dataset(table_key, &mut current_ds)
1041 .await?;
1042 }
1043 let final_state = target_db
1044 .storage()
1045 .table_state(&full_path, ¤t_ds)
1046 .await?;
1047
1048 Ok(crate::db::SubTableUpdate {
1049 table_key: table_key.to_string(),
1050 table_version: final_state.version,
1051 table_branch,
1052 row_count: final_state.row_count,
1053 version_metadata: final_state.version_metadata,
1054 })
1055}
1056
1057impl Omnigraph {
1058 pub async fn branch_merge(&self, source: &str, target: &str) -> Result<MergeOutcome> {
1059 self.branch_merge_as(source, target, None).await
1060 }
1061
1062 pub async fn branch_merge_as(
1063 &self,
1064 source: &str,
1065 target: &str,
1066 actor_id: Option<&str>,
1067 ) -> Result<MergeOutcome> {
1068 self.enforce(
1076 omnigraph_policy::PolicyAction::BranchMerge,
1077 &omnigraph_policy::ResourceScope::BranchTransition {
1078 source: source.to_string(),
1079 target: target.to_string(),
1080 },
1081 actor_id,
1082 )?;
1083 self.ensure_schema_apply_idle("branch_merge").await?;
1084 self.heal_pending_recovery_sidecars().await?;
1091 self.branch_merge_impl(source, target, actor_id).await
1092 }
1093
1094 async fn branch_merge_impl(
1095 &self,
1096 source: &str,
1097 target: &str,
1098 actor_id: Option<&str>,
1099 ) -> Result<MergeOutcome> {
1100 if is_internal_system_branch(source) || is_internal_system_branch(target) {
1101 return Err(OmniError::manifest(format!(
1102 "branch_merge does not allow internal system refs ('{}' -> '{}')",
1103 source, target
1104 )));
1105 }
1106 let source_branch = Omnigraph::normalize_branch_name(source)?;
1107 let target_branch = Omnigraph::normalize_branch_name(target)?;
1108 if source_branch == target_branch {
1109 return Err(OmniError::manifest(
1110 "branch_merge requires distinct source and target branches".to_string(),
1111 ));
1112 }
1113
1114 let source_head_commit_id = self
1115 .head_commit_id_for_branch(source_branch.as_deref())
1116 .await?
1117 .ok_or_else(|| OmniError::manifest("source branch has no head commit".to_string()))?;
1118 let target_head_commit_id = self
1119 .head_commit_id_for_branch(target_branch.as_deref())
1120 .await?
1121 .ok_or_else(|| OmniError::manifest("target branch has no head commit".to_string()))?;
1122 let base_commit = CommitGraph::merge_base(
1123 self.uri(),
1124 source_branch.as_deref(),
1125 target_branch.as_deref(),
1126 )
1127 .await?
1128 .ok_or_else(|| OmniError::manifest("branches have no common ancestor".to_string()))?;
1129
1130 if source_head_commit_id == target_head_commit_id
1131 || base_commit.graph_commit_id == source_head_commit_id
1132 {
1133 return Ok(MergeOutcome::AlreadyUpToDate);
1134 }
1135 let is_fast_forward = base_commit.graph_commit_id == target_head_commit_id;
1136
1137 let base_snapshot = ManifestCoordinator::snapshot_at(
1138 self.uri(),
1139 base_commit.manifest_branch.as_deref(),
1140 base_commit.manifest_version,
1141 )
1142 .await?;
1143 let source_snapshot = self
1144 .resolved_target(ReadTarget::Branch(
1145 source_branch.clone().unwrap_or_else(|| "main".to_string()),
1146 ))
1147 .await?
1148 .snapshot;
1149 let merge_exclusive = self.merge_exclusive();
1157 let _merge_guard = merge_exclusive.lock().await;
1158
1159 let previous_branch = self.active_branch().await;
1160 let previous = self
1161 .swap_coordinator_for_branch(target_branch.as_deref())
1162 .await?;
1163 let merge_result = self
1164 .branch_merge_on_current_target(
1165 &base_snapshot,
1166 &source_snapshot,
1167 &target_head_commit_id,
1168 &source_head_commit_id,
1169 is_fast_forward,
1170 actor_id,
1171 )
1172 .await;
1173 self.restore_coordinator(previous).await;
1174
1175 if previous_branch == target_branch {
1210 if let Err(refresh_err) = self.refresh_coordinator_only().await {
1211 if merge_result.is_ok() {
1212 return Err(refresh_err);
1213 }
1214 tracing::warn!(
1215 error = %refresh_err,
1216 "post-merge coordinator refresh failed on the error path; \
1217 the next op or open will re-sync"
1218 );
1219 }
1220 }
1221
1222 merge_result
1223 }
1224
1225 async fn branch_merge_on_current_target(
1226 &self,
1227 base_snapshot: &Snapshot,
1228 source_snapshot: &Snapshot,
1229 target_head_commit_id: &str,
1230 source_head_commit_id: &str,
1231 is_fast_forward: bool,
1232 actor_id: Option<&str>,
1233 ) -> Result<MergeOutcome> {
1234 self.ensure_commit_graph_initialized().await?;
1235 let target_snapshot = self.snapshot().await;
1236
1237 let mut table_keys = HashSet::new();
1238 for entry in base_snapshot.entries() {
1239 table_keys.insert(entry.table_key.clone());
1240 }
1241 for entry in source_snapshot.entries() {
1242 table_keys.insert(entry.table_key.clone());
1243 }
1244 for entry in target_snapshot.entries() {
1245 table_keys.insert(entry.table_key.clone());
1246 }
1247
1248 let mut ordered_table_keys: Vec<String> = table_keys.into_iter().collect();
1249 ordered_table_keys.sort();
1250
1251 let mut conflicts = Vec::new();
1252 let mut candidates: HashMap<String, CandidateTableState> = HashMap::new();
1253
1254 for table_key in &ordered_table_keys {
1255 let base_entry = base_snapshot.entry(table_key);
1256 let source_entry = source_snapshot.entry(table_key);
1257 let target_entry = target_snapshot.entry(table_key);
1258 if same_manifest_state(source_entry, target_entry) {
1259 continue;
1260 }
1261 if same_manifest_state(base_entry, source_entry) {
1262 continue;
1263 }
1264 if same_manifest_state(base_entry, target_entry) {
1265 candidates.insert(table_key.clone(), CandidateTableState::AdoptSourceState);
1266 continue;
1267 }
1268
1269 if let Some(staged) = stage_streaming_table_merge(
1270 table_key,
1271 &self.catalog(),
1272 base_snapshot,
1273 source_snapshot,
1274 &target_snapshot,
1275 &mut conflicts,
1276 )
1277 .await?
1278 {
1279 candidates.insert(
1280 table_key.clone(),
1281 CandidateTableState::RewriteMerged(staged),
1282 );
1283 }
1284 }
1285
1286 if !conflicts.is_empty() {
1287 return Err(OmniError::MergeConflicts(conflicts));
1288 }
1289
1290 validate_merge_candidates(self, source_snapshot, &target_snapshot, &candidates).await?;
1291
1292 let active_branch_for_keys = self.active_branch().await;
1330 let merge_queue_keys: Vec<(String, Option<String>)> = ordered_table_keys
1331 .iter()
1332 .filter(|table_key| {
1333 matches!(
1334 candidates.get(*table_key),
1335 Some(CandidateTableState::RewriteMerged(_))
1336 | Some(CandidateTableState::AdoptSourceState)
1337 )
1338 })
1339 .map(|table_key| (table_key.clone(), active_branch_for_keys.clone()))
1340 .collect();
1341 let _merge_queue_guards = self.write_queue().acquire_many(&merge_queue_keys).await;
1342
1343 let post_queue_snapshot = self.snapshot().await;
1344 for table_key in &ordered_table_keys {
1345 let Some(candidate) = candidates.get(table_key) else {
1346 continue;
1347 };
1348 if !matches!(
1349 candidate,
1350 CandidateTableState::RewriteMerged(_) | CandidateTableState::AdoptSourceState
1351 ) {
1352 continue;
1353 }
1354 let expected = target_snapshot.entry(table_key).map(|e| e.table_version);
1355 let current = post_queue_snapshot
1356 .entry(table_key)
1357 .map(|e| e.table_version);
1358 if expected != current {
1359 return Err(OmniError::manifest_expected_version_mismatch(
1360 table_key.clone(),
1361 expected.unwrap_or(0),
1362 current.unwrap_or(0),
1363 ));
1364 }
1365 }
1366
1367 let recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = ordered_table_keys
1368 .iter()
1369 .filter_map(|table_key| {
1370 let candidate = candidates.get(table_key)?;
1371 if !matches!(candidate, CandidateTableState::RewriteMerged(_)) {
1372 return None;
1373 }
1374 let entry = target_snapshot.entry(table_key)?;
1375 Some(crate::db::manifest::SidecarTablePin {
1376 table_key: table_key.clone(),
1377 table_path: self.storage().dataset_uri(&entry.table_path),
1378 expected_version: entry.table_version,
1379 post_commit_pin: entry.table_version + 1,
1380 table_branch: active_branch_for_keys.clone(),
1393 })
1394 })
1395 .collect();
1396 let recovery_handle = if recovery_pins.is_empty() {
1397 None
1398 } else {
1399 let target_branch = active_branch_for_keys.clone();
1409 let mut sidecar = crate::db::manifest::new_sidecar(
1410 crate::db::manifest::SidecarKind::BranchMerge,
1411 target_branch,
1412 actor_id.map(str::to_string),
1413 recovery_pins,
1414 );
1415 sidecar.merge_source_commit_id = Some(source_head_commit_id.to_string());
1421 Some(
1422 crate::db::manifest::write_sidecar(
1423 self.root_uri(),
1424 self.storage_adapter(),
1425 &sidecar,
1426 )
1427 .await?,
1428 )
1429 };
1430
1431 let mut updates = Vec::new();
1432 let mut changed_edge_tables = false;
1433 for table_key in &ordered_table_keys {
1434 let Some(candidate_state) = candidates.get(table_key) else {
1435 continue;
1436 };
1437 let update = match candidate_state {
1438 CandidateTableState::AdoptSourceState => {
1439 publish_adopted_source_state(
1440 self,
1441 &self.catalog(),
1442 base_snapshot,
1443 source_snapshot,
1444 &target_snapshot,
1445 table_key,
1446 )
1447 .await?
1448 }
1449 CandidateTableState::RewriteMerged(staged) => {
1450 publish_rewritten_merge_table(self, table_key, staged).await?
1451 }
1452 };
1453 if table_key.starts_with("edge:") {
1454 changed_edge_tables = true;
1455 }
1456 updates.push(update);
1457 }
1458
1459 crate::failpoints::maybe_fail("branch_merge.post_phase_b_pre_manifest_commit")?;
1464
1465 let manifest_version = if updates.is_empty() {
1466 self.version().await
1467 } else {
1468 self.commit_manifest_updates(&updates).await?
1469 };
1470
1471 if let Some(handle) = recovery_handle {
1475 if let Err(err) =
1476 crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await
1477 {
1478 tracing::warn!(
1479 error = %err,
1480 operation_id = handle.operation_id.as_str(),
1481 "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
1482 );
1483 }
1484 }
1485 self.record_merge_commit(
1486 manifest_version,
1487 target_head_commit_id,
1488 source_head_commit_id,
1489 actor_id,
1490 )
1491 .await?;
1492
1493 if changed_edge_tables {
1494 self.invalidate_graph_index().await;
1495 }
1496
1497 Ok(if is_fast_forward {
1498 MergeOutcome::FastForward
1499 } else {
1500 MergeOutcome::Merged
1501 })
1502 }
1503}