1use super::*;
2
/// Number of buffered rows at which a `StagedTableWriter` flushes its
/// pending batches to the staged dataset.
const MERGE_STAGE_BATCH_ROWS: usize = 8 * 1024;
/// Name of the environment variable that, when set, supplies the root
/// directory under which merge staging directories are created.
const MERGE_STAGE_DIR_ENV: &str = "OMNIGRAPH_MERGE_STAGING_DIR";
5
/// Per-table outcome chosen for a branch merge.
///
/// `candidate_dataset` reads the source snapshot for the two adopt variants
/// and the staged full dataset for `RewriteMerged`.
#[derive(Debug)]
enum CandidateTableState {
    /// The table is taken from the source snapshot without a computed delta.
    AdoptSourceState,
    /// The table is taken from the source snapshot, with the staged
    /// appends / upserts / deleted ids needed to reproduce it.
    AdoptWithDelta(AdoptDelta),
    /// The table was merged row by row; carries the staged merge output.
    RewriteMerged(StagedMergeResult),
}
18
/// A dataset written into a temporary staging directory.
#[derive(Debug)]
struct StagedTable {
    /// Temporary directory that contains the dataset files.
    /// NOTE(review): presumably held only so the directory outlives
    /// `dataset` — confirm against `TempDir`'s drop behaviour.
    _dir: TempDir,
    /// Handle to the staged dataset.
    dataset: Dataset,
}
24
/// Output of `stage_streaming_table_merge` for one table.
#[derive(Debug)]
struct StagedMergeResult {
    /// Every row selected by the merge.
    full_staged: StagedTable,
    /// Only the selected rows whose signature differs from the target's row;
    /// `None` when no such row was written.
    delta_staged: Option<StagedTable>,
    /// Ids present in the target for which the merge selected no row.
    deleted_ids: Vec<String>,
}
31
/// Row-level difference between a base snapshot and a source snapshot, as
/// produced by `compute_adopt_delta`.
#[derive(Debug)]
struct AdoptDelta {
    /// Rows present in the source but not in the base; `None` when empty.
    appends: Option<StagedTable>,
    /// Rows present in both whose signatures differ (source version);
    /// `None` when empty.
    upserts: Option<StagedTable>,
    /// Ids present in the base but not in the source.
    deleted_ids: Vec<String>,
}
63
/// One row yielded by an `OrderedTableCursor`, along with the batch and
/// dataset it was read from.
#[derive(Debug, Clone)]
struct CursorRow {
    /// Value of the row's `id` column.
    id: String,
    /// Row signature; an empty string when the cursor was opened without
    /// eager signatures.
    signature: String,
    /// Dataset the row was scanned from.
    dataset: Dataset,
    /// Batch that contains the row.
    batch: RecordBatch,
    /// Position of the row inside `batch`.
    row_index: usize,
}

impl CursorRow {
    /// Computes the signature of this row from `batch` and `row_index`.
    ///
    /// The stored `signature` field is not consulted, so this works for rows
    /// produced by a cursor that skipped eager signature computation.
    fn compute_signature(&self) -> Result<String> {
        row_signature(&self.batch, self.row_index)
    }
}
81
/// Row-at-a-time cursor over a table scanned in ascending `id` order.
struct OrderedTableCursor {
    /// Underlying batch stream; `None` when the table is absent or once the
    /// stream has been exhausted.
    stream: Option<std::pin::Pin<Box<DatasetRecordBatchStream>>>,
    /// Dataset being scanned; `None` when the table is absent.
    dataset: Option<Dataset>,
    /// Batch currently being walked.
    current_batch: Option<RecordBatch>,
    /// Index of the next row to yield from `current_batch`.
    current_row: usize,
    /// Row fetched by `peek_cloned` that has not been popped yet.
    peeked: Option<CursorRow>,
    /// When true, `next_row` computes each row's signature as it is yielded.
    eager_signatures: bool,
}
94
95impl OrderedTableCursor {
96 async fn from_snapshot(snapshot: &Snapshot, table_key: &str) -> Result<Self> {
97 Self::open(snapshot, table_key, true).await
98 }
99
100 async fn from_snapshot_lazy(snapshot: &Snapshot, table_key: &str) -> Result<Self> {
103 Self::open(snapshot, table_key, false).await
104 }
105
106 async fn open(snapshot: &Snapshot, table_key: &str, eager_signatures: bool) -> Result<Self> {
107 let dataset = match snapshot.entry(table_key) {
108 Some(_) => Some(snapshot.open(table_key).await?),
109 None => None,
110 };
111 Self::from_dataset(dataset, eager_signatures).await
112 }
113
114 async fn from_dataset(dataset: Option<Dataset>, eager_signatures: bool) -> Result<Self> {
115 let stream = if let Some(ds) = &dataset {
116 Some(Box::pin(
117 crate::table_store::TableStore::scan_stream_with(
118 ds,
119 None,
120 None,
121 Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]),
122 true,
123 |_| Ok(()),
124 )
125 .await?,
126 ))
127 } else {
128 None
129 };
130
131 Ok(Self {
132 stream,
133 dataset,
134 current_batch: None,
135 current_row: 0,
136 peeked: None,
137 eager_signatures,
138 })
139 }
140
141 async fn peek_cloned(&mut self) -> Result<Option<CursorRow>> {
142 if self.peeked.is_none() {
143 self.peeked = self.next_row().await?;
144 }
145 Ok(self.peeked.clone())
146 }
147
148 async fn pop(&mut self) -> Result<Option<CursorRow>> {
149 if self.peeked.is_some() {
150 return Ok(self.peeked.take());
151 }
152 self.next_row().await
153 }
154
155 async fn next_row(&mut self) -> Result<Option<CursorRow>> {
156 loop {
157 if let Some(batch) = &self.current_batch {
158 if self.current_row < batch.num_rows() {
159 let row_index = self.current_row;
160 self.current_row += 1;
161 let dataset = self.dataset.clone().ok_or_else(|| {
162 OmniError::manifest("cursor row missing source dataset".to_string())
163 })?;
164 let signature = if self.eager_signatures {
165 row_signature(batch, row_index)?
166 } else {
167 String::new()
168 };
169 return Ok(Some(CursorRow {
170 id: row_id_at(batch, row_index)?,
171 signature,
172 dataset,
173 batch: batch.clone(),
174 row_index,
175 }));
176 }
177 }
178
179 let Some(stream) = self.stream.as_mut() else {
180 return Ok(None);
181 };
182 match stream.try_next().await {
183 Ok(Some(batch)) => {
184 self.current_batch = Some(batch);
185 self.current_row = 0;
186 }
187 Ok(None) => {
188 self.stream = None;
189 self.current_batch = None;
190 return Ok(None);
191 }
192 Err(err) => return Err(OmniError::Lance(err.to_string())),
193 }
194 }
195 }
196}
197
/// Buffers rows and writes them in batches to a dataset inside a temporary
/// staging directory.
struct StagedTableWriter {
    /// Schema used for the buffered batches and for an empty dataset.
    schema: SchemaRef,
    /// Location of the staged dataset (`table.lance` inside `dir`).
    dataset_uri: String,
    /// Temporary directory that holds the staged dataset.
    dir: TempDir,
    /// Dataset handle; `None` until the first flush has written data.
    dataset: Option<Dataset>,
    /// Rows pushed since the last flush.
    buffered_rows: usize,
    /// Total rows pushed over the writer's lifetime.
    row_count: u64,
    /// Single-row batches awaiting the next flush.
    batches: Vec<RecordBatch>,
}
207
208impl StagedTableWriter {
209 fn new(table_key: &str, schema: SchemaRef) -> Result<Self> {
210 let dir = merge_stage_tempdir(table_key)?;
211 let dataset_uri = dir.path().join("table.lance").to_string_lossy().to_string();
212 Ok(Self {
213 schema,
214 dataset_uri,
215 dir,
216 dataset: None,
217 buffered_rows: 0,
218 row_count: 0,
219 batches: Vec::new(),
220 })
221 }
222
223 async fn push_row(&mut self, row: &CursorRow) -> Result<()> {
224 self.row_count += 1;
225 self.buffered_rows += 1;
226 self.batches.push(self.row_batch(row).await?);
227 if self.buffered_rows >= MERGE_STAGE_BATCH_ROWS {
228 self.flush().await?;
229 }
230 Ok(())
231 }
232
233 async fn row_batch(&self, row: &CursorRow) -> Result<RecordBatch> {
234 let batch = row.batch.slice(row.row_index, 1);
235 let has_blob_columns = row
236 .dataset
237 .schema()
238 .fields_pre_order()
239 .any(|field| field.is_blob());
240 if has_blob_columns {
241 return crate::table_store::TableStore::materialize_blob_batch(&row.dataset, batch)
242 .await;
243 }
244 let columns = self
245 .schema
246 .fields()
247 .iter()
248 .map(|field| {
249 batch.column_by_name(field.name()).cloned().ok_or_else(|| {
250 OmniError::Lance(format!("batch missing column '{}'", field.name()))
251 })
252 })
253 .collect::<Result<Vec<_>>>()?;
254 RecordBatch::try_new(self.schema.clone(), columns)
255 .map_err(|e| OmniError::Lance(e.to_string()))
256 }
257
258 async fn finish(mut self) -> Result<StagedTable> {
259 self.flush().await?;
260 if self.dataset.is_none() {
261 self.dataset = Some(
262 crate::table_store::TableStore::create_empty_dataset(
263 &self.dataset_uri,
264 &self.schema,
265 )
266 .await?,
267 );
268 }
269 Ok(StagedTable {
270 _dir: self.dir,
271 dataset: self.dataset.unwrap(),
272 })
273 }
274
275 async fn flush(&mut self) -> Result<()> {
276 if self.batches.is_empty() {
277 return Ok(());
278 }
279
280 let batch = if self.batches.len() == 1 {
281 self.batches.pop().unwrap()
282 } else {
283 let batches = std::mem::take(&mut self.batches);
284 arrow_select::concat::concat_batches(&self.schema, &batches)
285 .map_err(|e| OmniError::Lance(e.to_string()))?
286 };
287 self.buffered_rows = 0;
288
289 let ds = crate::table_store::TableStore::append_or_create_batch(
290 &self.dataset_uri,
291 self.dataset.take(),
292 batch,
293 )
294 .await?;
295 self.dataset = Some(ds);
296 Ok(())
297 }
298}
299
300fn merge_stage_tempdir(table_key: &str) -> Result<TempDir> {
301 if let Ok(root) = env::var(MERGE_STAGE_DIR_ENV) {
302 return TempDirBuilder::new()
303 .prefix(&format!(
304 "omnigraph-merge-{}-",
305 sanitize_table_key(table_key)
306 ))
307 .tempdir_in(PathBuf::from(root))
308 .map_err(OmniError::from);
309 }
310 TempDirBuilder::new()
311 .prefix(&format!(
312 "omnigraph-merge-{}-",
313 sanitize_table_key(table_key)
314 ))
315 .tempdir()
316 .map_err(OmniError::from)
317}
318
/// Returns `table_key` with every `:`, `/` and `\` replaced by `-`, so the
/// key can be embedded in a directory name.
fn sanitize_table_key(table_key: &str) -> String {
    table_key.replace(|ch: char| matches!(ch, ':' | '/' | '\\'), "-")
}
328
329async fn compute_adopt_delta(
341 table_key: &str,
342 catalog: &Catalog,
343 base_snapshot: &Snapshot,
344 source_snapshot: &Snapshot,
345) -> Result<Option<AdoptDelta>> {
346 let schema = schema_for_table_key(catalog, table_key)?;
347 let mut append_writer =
348 StagedTableWriter::new(&format!("{}_adopt_append", table_key), schema.clone())?;
349 let mut upsert_writer =
350 StagedTableWriter::new(&format!("{}_adopt_upsert", table_key), schema)?;
351 let mut deleted_ids: Vec<String> = Vec::new();
352 let mut base = OrderedTableCursor::from_snapshot_lazy(base_snapshot, table_key).await?;
353 let mut source = OrderedTableCursor::from_snapshot_lazy(source_snapshot, table_key).await?;
354
355 let mut needs_update = false;
356
357 loop {
358 let base_row = base.peek_cloned().await?;
359 let source_row = source.peek_cloned().await?;
360
361 let next_id = [base_row.as_ref(), source_row.as_ref()]
362 .into_iter()
363 .flatten()
364 .map(|row| row.id.clone())
365 .min();
366 let Some(next_id) = next_id else { break };
367
368 let base_row = if base_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
369 base.pop().await?
370 } else {
371 None
372 };
373 let source_row = if source_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
374 source.pop().await?
375 } else {
376 None
377 };
378
379 match (&base_row, &source_row) {
380 (Some(_), None) => {
381 deleted_ids.push(next_id);
383 needs_update = true;
384 }
385 (None, Some(src)) => {
386 append_writer.push_row(src).await?;
389 needs_update = true;
390 }
391 (Some(base), Some(src)) => {
392 if src.compute_signature()? != base.compute_signature()? {
396 upsert_writer.push_row(src).await?;
398 needs_update = true;
399 }
400 }
402 (None, None) => unreachable!(),
403 }
404 }
405
406 if !needs_update {
407 return Ok(None);
408 }
409
410 let appends = if append_writer.row_count > 0 {
411 Some(append_writer.finish().await?)
412 } else {
413 None
414 };
415 let upserts = if upsert_writer.row_count > 0 {
416 Some(upsert_writer.finish().await?)
417 } else {
418 None
419 };
420
421 Ok(Some(AdoptDelta {
422 appends,
423 upserts,
424 deleted_ids,
425 }))
426}
427
428fn min_cursor_id(
429 base_row: &Option<CursorRow>,
430 source_row: &Option<CursorRow>,
431 target_row: &Option<CursorRow>,
432) -> Option<String> {
433 [base_row.as_ref(), source_row.as_ref(), target_row.as_ref()]
434 .into_iter()
435 .flatten()
436 .map(|row| row.id.clone())
437 .min()
438}
439
440async fn stage_streaming_table_merge(
441 table_key: &str,
442 catalog: &Catalog,
443 base_snapshot: &Snapshot,
444 source_snapshot: &Snapshot,
445 target_snapshot: &Snapshot,
446 conflicts: &mut Vec<MergeConflict>,
447) -> Result<Option<StagedMergeResult>> {
448 let schema = schema_for_table_key(catalog, table_key)?;
449 let mut full_writer = StagedTableWriter::new(&format!("{}_full", table_key), schema.clone())?;
450 let mut delta_writer = StagedTableWriter::new(&format!("{}_delta", table_key), schema)?;
451 let mut deleted_ids: Vec<String> = Vec::new();
452 let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
453 let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;
454 let mut target = OrderedTableCursor::from_snapshot(target_snapshot, table_key).await?;
455
456 let prior_conflict_count = conflicts.len();
457 let mut needs_update = false;
458
459 loop {
460 let base_row = base.peek_cloned().await?;
461 let source_row = source.peek_cloned().await?;
462 let target_row = target.peek_cloned().await?;
463 let Some(next_id) = min_cursor_id(&base_row, &source_row, &target_row) else {
464 break;
465 };
466
467 let base_row = if base_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str()) {
468 base.pop().await?
469 } else {
470 None
471 };
472 let source_row = if source_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
473 {
474 source.pop().await?
475 } else {
476 None
477 };
478 let target_row = if target_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
479 {
480 target.pop().await?
481 } else {
482 None
483 };
484
485 let base_sig = base_row.as_ref().map(|row| row.signature.as_str());
486 let source_sig = source_row.as_ref().map(|row| row.signature.as_str());
487 let target_sig = target_row.as_ref().map(|row| row.signature.as_str());
488
489 let source_changed = source_sig != base_sig;
490 let target_changed = target_sig != base_sig;
491
492 let selection = if !source_changed {
493 target_row.as_ref()
494 } else if !target_changed {
495 source_row.as_ref()
496 } else if source_sig == target_sig {
497 target_row.as_ref()
498 } else {
499 conflicts.push(classify_merge_conflict(
500 table_key, &next_id, base_sig, source_sig, target_sig,
501 ));
502 None
503 };
504
505 if conflicts.len() > prior_conflict_count {
506 continue;
507 }
508
509 if selection.is_none() && target_row.is_some() {
511 deleted_ids.push(next_id.clone());
512 needs_update = true;
513 continue;
514 }
515
516 if let Some(selection) = selection {
517 full_writer.push_row(selection).await?;
519 if selection.signature.as_str() != target_sig.unwrap_or("") {
521 delta_writer.push_row(selection).await?;
522 needs_update = true;
523 }
524 }
525 }
526
527 if conflicts.len() > prior_conflict_count {
528 return Ok(None);
529 }
530 if !needs_update {
531 return Ok(None);
532 }
533
534 let delta_staged = if delta_writer.row_count > 0 {
535 Some(delta_writer.finish().await?)
536 } else {
537 None
538 };
539
540 Ok(Some(StagedMergeResult {
541 full_staged: full_writer.finish().await?,
542 delta_staged,
543 deleted_ids,
544 }))
545}
546
547fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<SchemaRef> {
548 if let Some(name) = table_key.strip_prefix("node:") {
549 return catalog
550 .node_types
551 .get(name)
552 .map(|t| t.arrow_schema.clone())
553 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", name)));
554 }
555 if let Some(name) = table_key.strip_prefix("edge:") {
556 return catalog
557 .edge_types
558 .get(name)
559 .map(|t| t.arrow_schema.clone())
560 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", name)));
561 }
562 Err(OmniError::manifest(format!(
563 "invalid table key '{}'",
564 table_key
565 )))
566}
567
568fn same_manifest_state(
569 left: Option<&crate::db::SubTableEntry>,
570 right: Option<&crate::db::SubTableEntry>,
571) -> bool {
572 match (left, right) {
573 (Some(left), Some(right)) => {
574 left.table_version == right.table_version && left.table_branch == right.table_branch
575 }
576 (None, None) => true,
577 _ => false,
578 }
579}
580
581fn classify_merge_conflict(
582 table_key: &str,
583 row_id: &str,
584 base_sig: Option<&str>,
585 source_sig: Option<&str>,
586 target_sig: Option<&str>,
587) -> MergeConflict {
588 let (kind, message) = match (base_sig, source_sig, target_sig) {
589 (None, Some(_), Some(_)) => (
590 MergeConflictKind::DivergentInsert,
591 format!("divergent insert for id '{}'", row_id),
592 ),
593 (Some(_), None, Some(_)) | (Some(_), Some(_), None) => (
594 MergeConflictKind::DeleteVsUpdate,
595 format!("delete/update conflict for id '{}'", row_id),
596 ),
597 _ => (
598 MergeConflictKind::DivergentUpdate,
599 format!("divergent update for id '{}'", row_id),
600 ),
601 };
602 MergeConflict {
603 table_key: table_key.to_string(),
604 row_id: Some(row_id.to_string()),
605 kind,
606 message,
607 }
608}
609
610fn row_signature(batch: &RecordBatch, row: usize) -> Result<String> {
611 let mut values = Vec::with_capacity(batch.num_columns());
612 for (field, column) in batch.schema().fields().iter().zip(batch.columns()) {
613 if field.name().starts_with("_row") {
614 continue;
615 }
616 values.push(
617 array_value_to_string(column.as_ref(), row)
618 .map_err(|e| OmniError::Lance(e.to_string()))?,
619 );
620 }
621 Ok(values.join("\u{1f}"))
622}
623
624async fn scan_validation_stream(ds: &Dataset) -> Result<DatasetRecordBatchStream> {
625 crate::table_store::TableStore::scan_stream_with(ds, None, None, None, false, |_| Ok(())).await
626}
627
628async fn validate_merge_candidates(
629 db: &Omnigraph,
630 source_snapshot: &Snapshot,
631 target_snapshot: &Snapshot,
632 candidates: &HashMap<String, CandidateTableState>,
633) -> Result<()> {
634 let mut conflicts = Vec::new();
635 let mut node_ids: HashMap<String, HashSet<String>> = HashMap::new();
636
637 for (type_name, node_type) in &db.catalog().node_types {
638 let table_key = format!("node:{}", type_name);
639 let mut values = HashSet::new();
640 let mut unique_seen = vec![HashMap::new(); node_type.unique_constraints.len()];
641
642 if let Some(ds) =
643 candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
644 {
645 let mut stream = scan_validation_stream(&ds).await?;
646 while let Some(batch) = stream
647 .try_next()
648 .await
649 .map_err(|e| OmniError::Lance(e.to_string()))?
650 {
651 if let Err(err) = crate::loader::validate_value_constraints(&batch, node_type) {
652 conflicts.push(MergeConflict {
653 table_key: table_key.clone(),
654 row_id: None,
655 kind: MergeConflictKind::ValueConstraintViolation,
656 message: err.to_string(),
657 });
658 }
659 update_unique_constraints(
660 &table_key,
661 &batch,
662 &node_type.unique_constraints,
663 &mut unique_seen,
664 &mut conflicts,
665 )?;
666 let ids = batch
667 .column_by_name("id")
668 .ok_or_else(|| {
669 OmniError::manifest(format!("table {} missing id column", table_key))
670 })?
671 .as_any()
672 .downcast_ref::<StringArray>()
673 .ok_or_else(|| {
674 OmniError::manifest(format!("table {} id column is not Utf8", table_key))
675 })?;
676 for row in 0..ids.len() {
677 values.insert(ids.value(row).to_string());
678 }
679 }
680 }
681 node_ids.insert(type_name.clone(), values);
682 }
683
684 for (edge_name, edge_type) in &db.catalog().edge_types {
685 let table_key = format!("edge:{}", edge_name);
686 let mut unique_seen = vec![HashMap::new(); edge_type.unique_constraints.len()];
687 let mut src_counts = HashMap::new();
688
689 if let Some(ds) =
690 candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
691 {
692 let mut stream = scan_validation_stream(&ds).await?;
693 while let Some(batch) = stream
694 .try_next()
695 .await
696 .map_err(|e| OmniError::Lance(e.to_string()))?
697 {
698 update_unique_constraints(
699 &table_key,
700 &batch,
701 &edge_type.unique_constraints,
702 &mut unique_seen,
703 &mut conflicts,
704 )?;
705 accumulate_edge_cardinality(&batch, &mut src_counts, &table_key)?;
706 conflicts.extend(validate_orphan_edges_batch(
707 &table_key, edge_type, &batch, &node_ids,
708 )?);
709 }
710 }
711
712 conflicts.extend(finalize_edge_cardinality_conflicts(
713 &table_key,
714 edge_name,
715 edge_type.cardinality.min,
716 edge_type.cardinality.max,
717 src_counts,
718 ));
719 }
720
721 if conflicts.is_empty() {
722 Ok(())
723 } else {
724 Err(OmniError::MergeConflicts(conflicts))
725 }
726}
727
728async fn candidate_dataset(
729 source_snapshot: &Snapshot,
730 target_snapshot: &Snapshot,
731 candidates: &HashMap<String, CandidateTableState>,
732 table_key: &str,
733) -> Result<Option<Dataset>> {
734 if let Some(candidate) = candidates.get(table_key) {
735 return match candidate {
736 CandidateTableState::AdoptSourceState | CandidateTableState::AdoptWithDelta(_) => {
737 match source_snapshot.entry(table_key) {
738 Some(_) => Ok(Some(source_snapshot.open(table_key).await?)),
739 None => Ok(None),
740 }
741 }
742 CandidateTableState::RewriteMerged(staged) => {
743 Ok(Some(staged.full_staged.dataset.clone()))
744 }
745 };
746 }
747 match target_snapshot.entry(table_key) {
748 Some(_) => Ok(Some(target_snapshot.open(table_key).await?)),
749 None => Ok(None),
750 }
751}
752
753fn update_unique_constraints(
754 table_key: &str,
755 batch: &RecordBatch,
756 constraints: &[Vec<String>],
757 seen: &mut [HashMap<Vec<String>, String>],
758 conflicts: &mut Vec<MergeConflict>,
759) -> Result<()> {
760 for (constraint_idx, columns) in constraints.iter().enumerate() {
761 let seen = &mut seen[constraint_idx];
762 let group_columns = columns
766 .iter()
767 .map(|column_name| {
768 batch.column_by_name(column_name).cloned().ok_or_else(|| {
769 OmniError::manifest(format!(
770 "table {} missing unique column '{}'",
771 table_key, column_name
772 ))
773 })
774 })
775 .collect::<Result<Vec<_>>>()?;
776 for row in 0..batch.num_rows() {
777 let Some(key) = crate::loader::composite_unique_key(&group_columns, row)? else {
781 continue;
782 };
783 let row_id = row_id_at(batch, row)?;
784 if let Some(first_row_id) = seen.insert(key, row_id.clone()) {
785 conflicts.push(MergeConflict {
786 table_key: table_key.to_string(),
787 row_id: Some(row_id.clone()),
788 kind: MergeConflictKind::UniqueViolation,
789 message: format!(
790 "unique constraint {:?} violated by '{}' and '{}'",
791 columns, first_row_id, row_id
792 ),
793 });
794 }
795 }
796 }
797 Ok(())
798}
799
800fn accumulate_edge_cardinality(
801 batch: &RecordBatch,
802 counts: &mut HashMap<String, u32>,
803 table_key: &str,
804) -> Result<()> {
805 let srcs = batch
806 .column_by_name("src")
807 .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
808 .as_any()
809 .downcast_ref::<StringArray>()
810 .ok_or_else(|| {
811 OmniError::manifest(format!("table {} src column is not Utf8", table_key))
812 })?;
813 for row in 0..srcs.len() {
814 *counts.entry(srcs.value(row).to_string()).or_insert(0_u32) += 1;
815 }
816 Ok(())
817}
818
819fn finalize_edge_cardinality_conflicts(
820 table_key: &str,
821 edge_name: &str,
822 min: u32,
823 max: Option<u32>,
824 counts: HashMap<String, u32>,
825) -> Vec<MergeConflict> {
826 let mut conflicts = Vec::new();
827 for (src, count) in counts {
828 if let Some(max) = max {
829 if count > max {
830 conflicts.push(MergeConflict {
831 table_key: table_key.to_string(),
832 row_id: None,
833 kind: MergeConflictKind::CardinalityViolation,
834 message: format!(
835 "@card violation on edge {}: source '{}' has {} edges (max {})",
836 edge_name, src, count, max
837 ),
838 });
839 }
840 }
841 if count < min {
842 conflicts.push(MergeConflict {
843 table_key: table_key.to_string(),
844 row_id: None,
845 kind: MergeConflictKind::CardinalityViolation,
846 message: format!(
847 "@card violation on edge {}: source '{}' has {} edges (min {})",
848 edge_name, src, count, min
849 ),
850 });
851 }
852 }
853 conflicts
854}
855
856fn validate_orphan_edges_batch(
857 table_key: &str,
858 edge_type: &omnigraph_compiler::catalog::EdgeType,
859 batch: &RecordBatch,
860 node_ids: &HashMap<String, HashSet<String>>,
861) -> Result<Vec<MergeConflict>> {
862 let srcs = batch
863 .column_by_name("src")
864 .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
865 .as_any()
866 .downcast_ref::<StringArray>()
867 .ok_or_else(|| {
868 OmniError::manifest(format!("table {} src column is not Utf8", table_key))
869 })?;
870 let dsts = batch
871 .column_by_name("dst")
872 .ok_or_else(|| OmniError::manifest(format!("table {} missing dst column", table_key)))?
873 .as_any()
874 .downcast_ref::<StringArray>()
875 .ok_or_else(|| {
876 OmniError::manifest(format!("table {} dst column is not Utf8", table_key))
877 })?;
878
879 let from_ids = node_ids.get(&edge_type.from_type).ok_or_else(|| {
880 OmniError::manifest(format!(
881 "missing candidate node ids for {}",
882 edge_type.from_type
883 ))
884 })?;
885 let to_ids = node_ids.get(&edge_type.to_type).ok_or_else(|| {
886 OmniError::manifest(format!(
887 "missing candidate node ids for {}",
888 edge_type.to_type
889 ))
890 })?;
891
892 let mut conflicts = Vec::new();
893 for row in 0..batch.num_rows() {
894 let row_id = row_id_at(batch, row)?;
895 let src = srcs.value(row);
896 let dst = dsts.value(row);
897 if !from_ids.contains(src) {
898 conflicts.push(MergeConflict {
899 table_key: table_key.to_string(),
900 row_id: Some(row_id.clone()),
901 kind: MergeConflictKind::OrphanEdge,
902 message: format!("src '{}' not found in {}", src, edge_type.from_type),
903 });
904 }
905 if !to_ids.contains(dst) {
906 conflicts.push(MergeConflict {
907 table_key: table_key.to_string(),
908 row_id: Some(row_id),
909 kind: MergeConflictKind::OrphanEdge,
910 message: format!("dst '{}' not found in {}", dst, edge_type.to_type),
911 });
912 }
913 }
914 Ok(conflicts)
915}
916
917fn row_id_at(batch: &RecordBatch, row: usize) -> Result<String> {
918 let ids = batch
919 .column_by_name("id")
920 .ok_or_else(|| OmniError::manifest("batch missing id column".to_string()))?
921 .as_any()
922 .downcast_ref::<StringArray>()
923 .ok_or_else(|| OmniError::manifest("id column is not Utf8".to_string()))?;
924 Ok(ids.value(row).to_string())
925}
926
927async fn classify_adopt(
939 target_db: &Omnigraph,
940 catalog: &Catalog,
941 base_snapshot: &Snapshot,
942 source_snapshot: &Snapshot,
943 target_snapshot: &Snapshot,
944 table_key: &str,
945) -> Result<CandidateTableState> {
946 let Some(source_entry) = source_snapshot.entry(table_key) else {
947 return Ok(CandidateTableState::AdoptSourceState);
948 };
949 let target_entry = target_snapshot.entry(table_key);
950 let target_active = target_db.active_branch().await;
951 let advances_head = match (
952 target_active.as_deref(),
953 source_entry.table_branch.as_deref(),
954 ) {
955 (None, Some(_)) => true,
957 (Some(target_branch), Some(_)) => {
959 target_entry.and_then(|e| e.table_branch.as_deref()) == Some(target_branch)
960 }
961 _ => false,
963 };
964 if !advances_head {
965 return Ok(CandidateTableState::AdoptSourceState);
966 }
967 match compute_adopt_delta(table_key, catalog, base_snapshot, source_snapshot).await? {
968 Some(delta) => Ok(CandidateTableState::AdoptWithDelta(delta)),
969 None => Ok(CandidateTableState::AdoptSourceState),
970 }
971}
972
973async fn publish_adopted_source_state(
979 target_db: &Omnigraph,
980 source_snapshot: &Snapshot,
981 target_snapshot: &Snapshot,
982 table_key: &str,
983) -> Result<crate::db::SubTableUpdate> {
984 let source_entry = source_snapshot
985 .entry(table_key)
986 .ok_or_else(|| OmniError::manifest(format!("missing source entry for {}", table_key)))?;
987 let target_entry = target_snapshot.entry(table_key);
988
989 let target_active = target_db.active_branch().await;
990 match (
991 target_active.as_deref(),
992 source_entry.table_branch.as_deref(),
993 ) {
994 (None, None) => Ok(crate::db::SubTableUpdate {
996 table_key: table_key.to_string(),
997 table_version: source_entry.table_version,
998 table_branch: None,
999 row_count: source_entry.row_count,
1000 version_metadata: source_entry.version_metadata.clone(),
1001 }),
1002 (Some(_target_branch), None) => Ok(crate::db::SubTableUpdate {
1005 table_key: table_key.to_string(),
1006 table_version: source_entry.table_version,
1007 table_branch: None,
1008 row_count: source_entry.row_count,
1009 version_metadata: source_entry.version_metadata.clone(),
1010 }),
1011 (None, Some(_source_branch)) => Ok(crate::db::SubTableUpdate {
1014 table_key: table_key.to_string(),
1015 table_version: target_entry
1016 .map(|e| e.table_version)
1017 .unwrap_or(source_entry.table_version),
1018 table_branch: None,
1019 row_count: source_entry.row_count,
1020 version_metadata: target_entry
1021 .map(|entry| entry.version_metadata.clone())
1022 .unwrap_or_else(|| source_entry.version_metadata.clone()),
1023 }),
1024 (Some(target_branch), Some(source_branch)) => {
1026 if target_entry.and_then(|entry| entry.table_branch.as_deref()) == Some(target_branch) {
1027 Ok(crate::db::SubTableUpdate {
1030 table_key: table_key.to_string(),
1031 table_version: target_entry.unwrap().table_version,
1032 table_branch: Some(target_branch.to_string()),
1033 row_count: source_entry.row_count,
1034 version_metadata: target_entry.unwrap().version_metadata.clone(),
1035 })
1036 } else {
1037 let full_path = format!("{}/{}", target_db.uri(), source_entry.table_path);
1040 let ds = target_db
1041 .fork_dataset_from_entry_state(
1042 table_key,
1043 &full_path,
1044 Some(source_branch),
1045 source_entry.table_version,
1046 target_branch,
1047 )
1048 .await?;
1049 let state = target_db.storage().table_state(&full_path, &ds).await?;
1050 Ok(crate::db::SubTableUpdate {
1051 table_key: table_key.to_string(),
1052 table_version: state.version,
1053 table_branch: Some(target_branch.to_string()),
1054 row_count: state.row_count,
1055 version_metadata: state.version_metadata,
1056 })
1057 }
1058 }
1059 }
1060}
1061
1062async fn publish_rewritten_merge_table(
1063 target_db: &Omnigraph,
1064 table_key: &str,
1065 staged: &StagedMergeResult,
1066) -> Result<crate::db::SubTableUpdate> {
1067 let (mut current_ds, full_path, table_branch) = target_db
1075 .open_for_mutation(table_key, crate::db::MutationOpKind::Merge)
1076 .await?
1077 .require_handle("branch merge");
1078
1079 if let Some(delta) = &staged.delta_staged {
1089 let delta_snapshot = SnapshotHandle::new(delta.dataset.clone());
1093 let batches: Vec<RecordBatch> = target_db
1094 .storage()
1095 .scan_batches_for_rewrite(&delta_snapshot)
1096 .await?
1097 .into_iter()
1098 .filter(|batch| batch.num_rows() > 0)
1099 .collect();
1100 if !batches.is_empty() {
1101 let combined = if batches.len() == 1 {
1103 batches.into_iter().next().unwrap()
1104 } else {
1105 let schema = batches[0].schema();
1106 arrow_select::concat::concat_batches(&schema, &batches)
1107 .map_err(|e| OmniError::Lance(e.to_string()))?
1108 };
1109 let staged_merge = target_db
1110 .storage()
1111 .stage_merge_insert(
1112 current_ds.clone(),
1113 combined,
1114 vec!["id".to_string()],
1115 lance::dataset::WhenMatched::UpdateAll,
1116 lance::dataset::WhenNotMatched::InsertAll,
1117 )
1118 .await?;
1119 current_ds = target_db
1120 .storage()
1121 .commit_staged(current_ds, staged_merge)
1122 .await?;
1123 }
1124 }
1125
1126 crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_REWRITE_AFTER_MERGE_PRE_DELETE)?;
1132
1133 if !staged.deleted_ids.is_empty() {
1143 let escaped: Vec<String> = staged
1144 .deleted_ids
1145 .iter()
1146 .map(|id| format!("'{}'", id.replace('\'', "''")))
1147 .collect();
1148 let filter = format!("id IN ({})", escaped.join(", "));
1149 let (new_ds, _) = target_db
1150 .storage_inline_residual()
1151 .delete_where(&full_path, current_ds, &filter)
1152 .await?;
1153 current_ds = new_ds;
1154 }
1155
1156 crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_REWRITE_AFTER_DELETE_PRE_INDEX)?;
1163
1164 let row_count = target_db
1172 .storage()
1173 .table_state(&full_path, ¤t_ds)
1174 .await?
1175 .row_count;
1176 if row_count > 0 {
1177 target_db
1178 .build_indices_on_dataset(table_key, &mut current_ds)
1179 .await?;
1180 }
1181 let final_state = target_db
1182 .storage()
1183 .table_state(&full_path, ¤t_ds)
1184 .await?;
1185
1186 Ok(crate::db::SubTableUpdate {
1187 table_key: table_key.to_string(),
1188 table_version: final_state.version,
1189 table_branch,
1190 row_count: final_state.row_count,
1191 version_metadata: final_state.version_metadata,
1192 })
1193}
1194
1195async fn scan_staged_combined(
1199 target_db: &Omnigraph,
1200 table: &StagedTable,
1201) -> Result<Option<RecordBatch>> {
1202 crate::instrumentation::record_scan_staged_combined();
1203 let snapshot = SnapshotHandle::new(table.dataset.clone());
1204 let batches: Vec<RecordBatch> = target_db
1205 .storage()
1206 .scan_batches_for_rewrite(&snapshot)
1207 .await?
1208 .into_iter()
1209 .filter(|batch| batch.num_rows() > 0)
1210 .collect();
1211 if batches.is_empty() {
1212 return Ok(None);
1213 }
1214 let combined = if batches.len() == 1 {
1215 batches.into_iter().next().unwrap()
1216 } else {
1217 let schema = batches[0].schema();
1218 arrow_select::concat::concat_batches(&schema, &batches)
1219 .map_err(|e| OmniError::Lance(e.to_string()))?
1220 };
1221 Ok(Some(combined))
1222}
1223
1224async fn publish_adopted_delta(
1239 target_db: &Omnigraph,
1240 table_key: &str,
1241 delta: &AdoptDelta,
1242) -> Result<crate::db::SubTableUpdate> {
1243 let (mut current_ds, full_path, table_branch) = target_db
1247 .open_for_mutation(table_key, crate::db::MutationOpKind::Merge)
1248 .await?
1249 .require_handle("branch merge");
1250
1251 if let Some(append_table) = &delta.appends {
1263 let source = SnapshotHandle::new(append_table.dataset.clone());
1264 let staged = target_db
1265 .storage()
1266 .stage_append_stream(¤t_ds, &source, &[])
1267 .await?;
1268 current_ds = target_db
1269 .storage()
1270 .commit_staged(current_ds, staged)
1271 .await?;
1272 }
1273
1274 crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_ADOPT_AFTER_APPEND_PRE_UPSERT)?;
1280
1281 if let Some(upsert_table) = &delta.upserts {
1288 if let Some(combined) = scan_staged_combined(target_db, upsert_table).await? {
1289 let staged_merge = target_db
1290 .storage()
1291 .stage_merge_insert(
1292 current_ds.clone(),
1293 combined,
1294 vec!["id".to_string()],
1295 lance::dataset::WhenMatched::UpdateAll,
1296 lance::dataset::WhenNotMatched::InsertAll,
1297 )
1298 .await?;
1299 current_ds = target_db
1300 .storage()
1301 .commit_staged(current_ds, staged_merge)
1302 .await?;
1303 }
1304 }
1305
1306 crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_ADOPT_AFTER_UPSERT_PRE_DELETE)?;
1312
1313 if !delta.deleted_ids.is_empty() {
1316 let escaped: Vec<String> = delta
1317 .deleted_ids
1318 .iter()
1319 .map(|id| format!("'{}'", id.replace('\'', "''")))
1320 .collect();
1321 let filter = format!("id IN ({})", escaped.join(", "));
1322 let (new_ds, _) = target_db
1323 .storage_inline_residual()
1324 .delete_where(&full_path, current_ds, &filter)
1325 .await?;
1326 current_ds = new_ds;
1327 }
1328
1329 let final_state = target_db
1339 .storage()
1340 .table_state(&full_path, ¤t_ds)
1341 .await?;
1342
1343 Ok(crate::db::SubTableUpdate {
1344 table_key: table_key.to_string(),
1345 table_version: final_state.version,
1346 table_branch,
1347 row_count: final_state.row_count,
1348 version_metadata: final_state.version_metadata,
1349 })
1350}
1351
1352impl Omnigraph {
1353 pub async fn branch_merge(&self, source: &str, target: &str) -> Result<MergeOutcome> {
1354 self.branch_merge_as(source, target, None).await
1355 }
1356
1357 pub async fn branch_merge_as(
1358 &self,
1359 source: &str,
1360 target: &str,
1361 actor_id: Option<&str>,
1362 ) -> Result<MergeOutcome> {
1363 self.enforce(
1371 omnigraph_policy::PolicyAction::BranchMerge,
1372 &omnigraph_policy::ResourceScope::BranchTransition {
1373 source: source.to_string(),
1374 target: target.to_string(),
1375 },
1376 actor_id,
1377 )?;
1378 self.ensure_schema_apply_idle("branch_merge").await?;
1379 self.heal_pending_recovery_sidecars().await?;
1386 self.branch_merge_impl(source, target, actor_id).await
1387 }
1388
1389 async fn branch_merge_impl(
1390 &self,
1391 source: &str,
1392 target: &str,
1393 actor_id: Option<&str>,
1394 ) -> Result<MergeOutcome> {
1395 if is_internal_system_branch(source) || is_internal_system_branch(target) {
1396 return Err(OmniError::manifest(format!(
1397 "branch_merge does not allow internal system refs ('{}' -> '{}')",
1398 source, target
1399 )));
1400 }
1401 let source_branch = Omnigraph::normalize_branch_name(source)?;
1402 let target_branch = Omnigraph::normalize_branch_name(target)?;
1403 if source_branch == target_branch {
1404 return Err(OmniError::manifest(
1405 "branch_merge requires distinct source and target branches".to_string(),
1406 ));
1407 }
1408
1409 let source_head_commit_id = self
1410 .head_commit_id_for_branch(source_branch.as_deref())
1411 .await?
1412 .ok_or_else(|| OmniError::manifest("source branch has no head commit".to_string()))?;
1413 let target_head_commit_id = self
1414 .head_commit_id_for_branch(target_branch.as_deref())
1415 .await?
1416 .ok_or_else(|| OmniError::manifest("target branch has no head commit".to_string()))?;
1417 let base_commit = CommitGraph::merge_base(
1418 self.uri(),
1419 source_branch.as_deref(),
1420 target_branch.as_deref(),
1421 )
1422 .await?
1423 .ok_or_else(|| OmniError::manifest("branches have no common ancestor".to_string()))?;
1424
1425 if source_head_commit_id == target_head_commit_id
1426 || base_commit.graph_commit_id == source_head_commit_id
1427 {
1428 return Ok(MergeOutcome::AlreadyUpToDate);
1429 }
1430 let is_fast_forward = base_commit.graph_commit_id == target_head_commit_id;
1431
1432 let base_snapshot = ManifestCoordinator::snapshot_at(
1433 self.uri(),
1434 base_commit.manifest_branch.as_deref(),
1435 base_commit.manifest_version,
1436 )
1437 .await?;
1438 let source_snapshot = self
1439 .resolved_target(ReadTarget::Branch(
1440 source_branch.clone().unwrap_or_else(|| "main".to_string()),
1441 ))
1442 .await?
1443 .snapshot;
1444 let merge_exclusive = self.merge_exclusive();
1452 let _merge_guard = merge_exclusive.lock().await;
1453
1454 let previous_branch = self.active_branch().await;
1455 let previous = self
1456 .swap_coordinator_for_branch(target_branch.as_deref())
1457 .await?;
1458 let merge_result = self
1459 .branch_merge_on_current_target(
1460 &base_snapshot,
1461 &source_snapshot,
1462 &target_head_commit_id,
1463 &source_head_commit_id,
1464 is_fast_forward,
1465 actor_id,
1466 )
1467 .await;
1468 self.restore_coordinator(previous).await;
1469
1470 if previous_branch == target_branch {
1505 if let Err(refresh_err) = self.refresh_coordinator_only().await {
1506 if merge_result.is_ok() {
1507 return Err(refresh_err);
1508 }
1509 tracing::warn!(
1510 error = %refresh_err,
1511 "post-merge coordinator refresh failed on the error path; \
1512 the next op or open will re-sync"
1513 );
1514 }
1515 }
1516
1517 merge_result
1518 }
1519
    /// Runs the merge against the branch the coordinator currently points at,
    /// using the snapshots and head commit ids supplied by the caller.
    ///
    /// Phases, in order:
    ///
    /// 1. Classify every table key present in the base, source, or target
    ///    snapshot: skip it, adopt it from the source (`classify_adopt`), or
    ///    stage a merged rewrite (`stage_streaming_table_merge`, which also
    ///    appends to `conflicts`).
    /// 2. Fail with `OmniError::MergeConflicts` if any conflict was recorded,
    ///    then run `validate_merge_candidates`.
    /// 3. Acquire write-queue guards for the candidate tables and verify that
    ///    the target's table versions did not change since phase 1.
    /// 4. If any table will be rewritten or delta-adopted, write a recovery
    ///    sidecar pinning those tables.
    /// 5. Publish each candidate, confirm the sidecar with the published
    ///    versions, commit the manifest updates, delete the sidecar
    ///    (best-effort), and record the merge commit.
    ///
    /// Returns `MergeOutcome::FastForward` when `is_fast_forward` is `true`,
    /// otherwise `MergeOutcome::Merged`.
    async fn branch_merge_on_current_target(
        &self,
        base_snapshot: &Snapshot,
        source_snapshot: &Snapshot,
        target_head_commit_id: &str,
        source_head_commit_id: &str,
        is_fast_forward: bool,
        actor_id: Option<&str>,
    ) -> Result<MergeOutcome> {
        self.ensure_commit_graph_initialized().await?;
        let target_snapshot = self.snapshot().await;

        // Union of table keys known to any of the three snapshots.
        let mut table_keys = HashSet::new();
        for entry in base_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }
        for entry in source_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }
        for entry in target_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }

        // Sorted so every later pass visits tables in the same deterministic
        // order (a `HashSet` has no stable iteration order).
        let mut ordered_table_keys: Vec<String> = table_keys.into_iter().collect();
        ordered_table_keys.sort();

        let mut conflicts = Vec::new();
        let mut candidates: HashMap<String, CandidateTableState> = HashMap::new();

        // Phase 1: three-way classification per table.
        for table_key in &ordered_table_keys {
            let base_entry = base_snapshot.entry(table_key);
            let source_entry = source_snapshot.entry(table_key);
            let target_entry = target_snapshot.entry(table_key);
            // Source and target already agree: nothing to do.
            if same_manifest_state(source_entry, target_entry) {
                continue;
            }
            // Source is unchanged since the base: keep the target as it is.
            if same_manifest_state(base_entry, source_entry) {
                continue;
            }
            // Only the source changed: adopt from it; `classify_adopt` picks
            // the concrete candidate state.
            if same_manifest_state(base_entry, target_entry) {
                let candidate = classify_adopt(
                    self,
                    &self.catalog(),
                    base_snapshot,
                    source_snapshot,
                    &target_snapshot,
                    table_key,
                )
                .await?;
                candidates.insert(table_key.clone(), candidate);
                continue;
            }

            // Both sides changed: stage a merged table. A `None` result leaves
            // the table without a candidate.
            if let Some(staged) = stage_streaming_table_merge(
                table_key,
                &self.catalog(),
                base_snapshot,
                source_snapshot,
                &target_snapshot,
                &mut conflicts,
            )
            .await?
            {
                candidates.insert(
                    table_key.clone(),
                    CandidateTableState::RewriteMerged(staged),
                );
            }
        }

        // Phase 2: conflicts abort the merge before anything is published.
        if !conflicts.is_empty() {
            return Err(OmniError::MergeConflicts(conflicts));
        }

        validate_merge_candidates(self, source_snapshot, &target_snapshot, &candidates).await?;

        // Phase 3: take write-queue guards for every candidate table, keyed by
        // (table key, active branch). The guards live until this function
        // returns.
        // NOTE(review): the `matches!` below lists every `CandidateTableState`
        // variant visible at the top of this file, so it currently selects all
        // candidates — confirm that is intended.
        let active_branch_for_keys = self.active_branch().await;
        let merge_queue_keys: Vec<(String, Option<String>)> = ordered_table_keys
            .iter()
            .filter(|table_key| {
                matches!(
                    candidates.get(*table_key),
                    Some(CandidateTableState::RewriteMerged(_))
                        | Some(CandidateTableState::AdoptSourceState)
                        | Some(CandidateTableState::AdoptWithDelta(_))
                )
            })
            .map(|table_key| (table_key.clone(), active_branch_for_keys.clone()))
            .collect();
        let _merge_queue_guards = self.write_queue().acquire_many(&merge_queue_keys).await;

        // Re-read the snapshot now that the guards are held and make sure no
        // candidate table's version moved since `target_snapshot` was taken.
        let post_queue_snapshot = self.snapshot().await;
        for table_key in &ordered_table_keys {
            let Some(candidate) = candidates.get(table_key) else {
                continue;
            };
            if !matches!(
                candidate,
                CandidateTableState::RewriteMerged(_)
                    | CandidateTableState::AdoptSourceState
                    | CandidateTableState::AdoptWithDelta(_)
            ) {
                continue;
            }
            let expected = target_snapshot.entry(table_key).map(|e| e.table_version);
            let current = post_queue_snapshot
                .entry(table_key)
                .map(|e| e.table_version);
            if expected != current {
                // A table missing from either snapshot is reported as
                // version 0 in the error.
                return Err(OmniError::manifest_expected_version_mismatch(
                    table_key.clone(),
                    expected.unwrap_or(0),
                    current.unwrap_or(0),
                ));
            }
        }

        // Phase 4: build recovery pins for `RewriteMerged` and
        // `AdoptWithDelta` candidates only; `AdoptSourceState` is not pinned.
        // Tables with no entry in the target snapshot are skipped by the `?`.
        let recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = ordered_table_keys
            .iter()
            .filter_map(|table_key| {
                let candidate = candidates.get(table_key)?;
                if !matches!(
                    candidate,
                    CandidateTableState::RewriteMerged(_) | CandidateTableState::AdoptWithDelta(_)
                ) {
                    return None;
                }
                let entry = target_snapshot.entry(table_key)?;
                Some(crate::db::manifest::SidecarTablePin {
                    table_key: table_key.clone(),
                    table_path: self.storage().dataset_uri(&entry.table_path),
                    expected_version: entry.table_version,
                    // Initial pin is one past the current version;
                    // `confirmed_version` starts unset and the sidecar is
                    // confirmed with the published versions further down.
                    post_commit_pin: entry.table_version + 1,
                    confirmed_version: None,
                    table_branch: active_branch_for_keys.clone(),
                })
            })
            .collect();
        // The sidecar is written only when there is at least one pin, and it
        // is written before any table is published.
        let mut recovery: Option<(
            crate::db::manifest::RecoverySidecar,
            crate::db::manifest::RecoverySidecarHandle,
        )> = if recovery_pins.is_empty() {
            None
        } else {
            let target_branch = active_branch_for_keys.clone();
            let mut sidecar = crate::db::manifest::new_sidecar(
                crate::db::manifest::SidecarKind::BranchMerge,
                target_branch,
                actor_id.map(str::to_string),
                recovery_pins,
            );
            // Record which source commit this merge is bringing in.
            sidecar.merge_source_commit_id = Some(source_head_commit_id.to_string());
            let handle = crate::db::manifest::write_sidecar(
                self.root_uri(),
                self.storage_adapter(),
                &sidecar,
            )
            .await?;
            Some((sidecar, handle))
        };

        // Phase 5: publish each candidate in sorted table order. An error here
        // returns early and leaves any written sidecar in place.
        let mut updates = Vec::new();
        let mut changed_edge_tables = false;
        for table_key in &ordered_table_keys {
            let Some(candidate_state) = candidates.get(table_key) else {
                continue;
            };
            let update = match candidate_state {
                CandidateTableState::AdoptSourceState => {
                    publish_adopted_source_state(self, source_snapshot, &target_snapshot, table_key)
                        .await?
                }
                CandidateTableState::AdoptWithDelta(delta) => {
                    publish_adopted_delta(self, table_key, delta).await?
                }
                CandidateTableState::RewriteMerged(staged) => {
                    publish_rewritten_merge_table(self, table_key, staged).await?
                }
            };
            // Remember whether any `edge:`-prefixed table was published so the
            // graph index can be invalidated at the end.
            if table_key.starts_with("edge:") {
                changed_edge_tables = true;
            }
            updates.push(update);
        }

        // Confirm the sidecar with the versions the publishes actually
        // produced (table key -> published table version).
        if let Some((sidecar, _)) = recovery.as_mut() {
            let confirmed_versions: std::collections::HashMap<String, u64> = updates
                .iter()
                .map(|u| (u.table_key.clone(), u.table_version))
                .collect();
            crate::db::manifest::confirm_sidecar_phase_b(
                self.root_uri(),
                self.storage_adapter(),
                sidecar,
                &confirmed_versions,
            )
            .await?;
        }

        crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_POST_PHASE_B_PRE_MANIFEST_COMMIT)?;

        // With nothing published there is no manifest commit; the current
        // version is reused for the merge commit record.
        let manifest_version = if updates.is_empty() {
            self.version().await
        } else {
            self.commit_manifest_updates(&updates).await?
        };

        // Sidecar cleanup is best-effort: a failure is logged, not returned.
        if let Some((_, handle)) = recovery {
            if let Err(err) =
                crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await
            {
                tracing::warn!(
                    error = %err,
                    operation_id = handle.operation_id.as_str(),
                    "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
                );
            }
        }
        self.record_merge_commit(
            manifest_version,
            target_head_commit_id,
            source_head_commit_id,
            actor_id,
        )
        .await?;

        if changed_edge_tables {
            self.invalidate_graph_index().await;
        }

        Ok(if is_fast_forward {
            MergeOutcome::FastForward
        } else {
            MergeOutcome::Merged
        })
    }
1836}