1use super::*;
2
/// Buffered-row threshold at which `StagedTableWriter::push_row` flushes its
/// pending batches into the staged dataset.
const MERGE_STAGE_BATCH_ROWS: usize = 8192;
/// Name of the environment variable that, when set, supplies the root
/// directory in which `merge_stage_tempdir` creates staging directories.
const MERGE_STAGE_DIR_ENV: &str = "OMNIGRAPH_MERGE_STAGING_DIR";
5
/// Planned treatment of one table during a branch merge.
#[derive(Debug)]
enum CandidateTableState {
    /// Adopt the source's table state as-is. `classify_adopt` returns this when
    /// the source has no entry for the table, when adoption does not advance
    /// the target head, or when no row-level delta was found.
    AdoptSourceState,
    /// Adopt the source state by applying the row-level delta produced by
    /// `compute_adopt_delta`.
    AdoptWithDelta(AdoptDelta),
    /// Publish the staged result of a streaming three-way merge.
    RewriteMerged(StagedMergeResult),
}
18
/// A dataset written into a merge staging directory.
#[derive(Debug)]
struct StagedTable {
    // Never read; held so the directory handle lives as long as `dataset`.
    // NOTE(review): presumably the directory is removed when this is dropped —
    // confirm against the `TempDir` type in use.
    _dir: TempDir,
    // The staged dataset itself.
    dataset: Dataset,
}
24
/// Output of `stage_streaming_table_merge` for a single table.
#[derive(Debug)]
struct StagedMergeResult {
    // Every row selected by the merge; `candidate_dataset` validates against it.
    full_staged: StagedTable,
    // Only the selected rows whose signature differs from the target's row;
    // `None` when no such row was written.
    delta_staged: Option<StagedTable>,
    // Ids present in the target for which the merge selected no row.
    deleted_ids: Vec<String>,
}
31
/// Row-level difference between the base and source states of one table, as
/// computed by `compute_adopt_delta`.
#[derive(Debug)]
struct AdoptDelta {
    // Rows present in source but absent from base; `None` when there are none.
    appends: Option<StagedTable>,
    // Rows present in both whose signatures differ; `None` when there are none.
    upserts: Option<StagedTable>,
    // Ids present in base but absent from source.
    deleted_ids: Vec<String>,
}
63
/// One row yielded by an `OrderedTableCursor`, addressed as an index into the
/// batch it came from.
#[derive(Debug, Clone)]
struct CursorRow {
    // Value of the row's `id` column.
    id: String,
    // Result of `row_signature` when the cursor was opened with eager
    // signatures; an empty string otherwise (see `compute_signature`).
    signature: String,
    // Dataset the row was scanned from.
    dataset: Dataset,
    // Batch containing the row.
    batch: RecordBatch,
    // Position of the row inside `batch`.
    row_index: usize,
}
72
73impl CursorRow {
74 fn compute_signature(&self) -> Result<String> {
78 row_signature(&self.batch, self.row_index)
79 }
80}
81
/// Streaming cursor over one table's rows, scanned in ascending `id` order,
/// with single-row lookahead.
struct OrderedTableCursor {
    // Remaining batch stream; `None` when there is no dataset or the stream
    // has been exhausted.
    stream: Option<std::pin::Pin<Box<DatasetRecordBatchStream>>>,
    // Dataset being scanned; cloned into every `CursorRow` produced.
    dataset: Option<Dataset>,
    // Batch currently being walked row by row.
    current_batch: Option<RecordBatch>,
    // Index of the next row to yield from `current_batch`.
    current_row: usize,
    // Row fetched by `peek_cloned` and not yet consumed by `pop`.
    peeked: Option<CursorRow>,
    // When true, each yielded row carries a precomputed signature.
    eager_signatures: bool,
}
94
95impl OrderedTableCursor {
96 async fn from_snapshot(snapshot: &Snapshot, table_key: &str) -> Result<Self> {
97 Self::open(snapshot, table_key, true).await
98 }
99
100 async fn from_snapshot_lazy(snapshot: &Snapshot, table_key: &str) -> Result<Self> {
103 Self::open(snapshot, table_key, false).await
104 }
105
106 async fn open(snapshot: &Snapshot, table_key: &str, eager_signatures: bool) -> Result<Self> {
107 let dataset = match snapshot.entry(table_key) {
108 Some(_) => Some(snapshot.open(table_key).await?),
109 None => None,
110 };
111 Self::from_dataset(dataset, eager_signatures).await
112 }
113
114 async fn from_dataset(dataset: Option<Dataset>, eager_signatures: bool) -> Result<Self> {
115 let stream = if let Some(ds) = &dataset {
116 Some(Box::pin(
117 crate::table_store::TableStore::scan_stream_with(
118 ds,
119 None,
120 None,
121 Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]),
122 true,
123 |_| Ok(()),
124 )
125 .await?,
126 ))
127 } else {
128 None
129 };
130
131 Ok(Self {
132 stream,
133 dataset,
134 current_batch: None,
135 current_row: 0,
136 peeked: None,
137 eager_signatures,
138 })
139 }
140
141 async fn peek_cloned(&mut self) -> Result<Option<CursorRow>> {
142 if self.peeked.is_none() {
143 self.peeked = self.next_row().await?;
144 }
145 Ok(self.peeked.clone())
146 }
147
148 async fn pop(&mut self) -> Result<Option<CursorRow>> {
149 if self.peeked.is_some() {
150 return Ok(self.peeked.take());
151 }
152 self.next_row().await
153 }
154
155 async fn next_row(&mut self) -> Result<Option<CursorRow>> {
156 loop {
157 if let Some(batch) = &self.current_batch {
158 if self.current_row < batch.num_rows() {
159 let row_index = self.current_row;
160 self.current_row += 1;
161 let dataset = self.dataset.clone().ok_or_else(|| {
162 OmniError::manifest("cursor row missing source dataset".to_string())
163 })?;
164 let signature = if self.eager_signatures {
165 row_signature(batch, row_index)?
166 } else {
167 String::new()
168 };
169 return Ok(Some(CursorRow {
170 id: row_id_at(batch, row_index)?,
171 signature,
172 dataset,
173 batch: batch.clone(),
174 row_index,
175 }));
176 }
177 }
178
179 let Some(stream) = self.stream.as_mut() else {
180 return Ok(None);
181 };
182 match stream.try_next().await {
183 Ok(Some(batch)) => {
184 self.current_batch = Some(batch);
185 self.current_row = 0;
186 }
187 Ok(None) => {
188 self.stream = None;
189 self.current_batch = None;
190 return Ok(None);
191 }
192 Err(err) => return Err(OmniError::Lance(err.to_string())),
193 }
194 }
195 }
196}
197
/// Incrementally writes rows into a dataset inside a staging directory.
struct StagedTableWriter {
    // Schema used to assemble row batches and to create an empty dataset.
    schema: SchemaRef,
    // Location of the staged dataset (`table.lance` inside `dir`).
    dataset_uri: String,
    // Staging directory; handed over to the resulting `StagedTable`.
    dir: TempDir,
    // Dataset handle once at least one flush has happened.
    dataset: Option<Dataset>,
    // Rows pushed since the last flush.
    buffered_rows: usize,
    // Total rows pushed over the writer's lifetime.
    row_count: u64,
    // Single-row batches awaiting the next flush.
    batches: Vec<RecordBatch>,
}
207
208impl StagedTableWriter {
209 fn new(table_key: &str, schema: SchemaRef) -> Result<Self> {
210 let dir = merge_stage_tempdir(table_key)?;
211 let dataset_uri = dir.path().join("table.lance").to_string_lossy().to_string();
212 Ok(Self {
213 schema,
214 dataset_uri,
215 dir,
216 dataset: None,
217 buffered_rows: 0,
218 row_count: 0,
219 batches: Vec::new(),
220 })
221 }
222
223 async fn push_row(&mut self, row: &CursorRow) -> Result<()> {
224 self.row_count += 1;
225 self.buffered_rows += 1;
226 self.batches.push(self.row_batch(row).await?);
227 if self.buffered_rows >= MERGE_STAGE_BATCH_ROWS {
228 self.flush().await?;
229 }
230 Ok(())
231 }
232
233 async fn row_batch(&self, row: &CursorRow) -> Result<RecordBatch> {
234 let batch = row.batch.slice(row.row_index, 1);
235 let has_blob_columns = row
236 .dataset
237 .schema()
238 .fields_pre_order()
239 .any(|field| field.is_blob());
240 if has_blob_columns {
241 return crate::table_store::TableStore::materialize_blob_batch(&row.dataset, batch)
242 .await;
243 }
244 let columns = self
245 .schema
246 .fields()
247 .iter()
248 .map(|field| {
249 batch.column_by_name(field.name()).cloned().ok_or_else(|| {
250 OmniError::Lance(format!("batch missing column '{}'", field.name()))
251 })
252 })
253 .collect::<Result<Vec<_>>>()?;
254 RecordBatch::try_new(self.schema.clone(), columns)
255 .map_err(|e| OmniError::Lance(e.to_string()))
256 }
257
258 async fn finish(mut self) -> Result<StagedTable> {
259 self.flush().await?;
260 if self.dataset.is_none() {
261 self.dataset = Some(
262 crate::table_store::TableStore::create_empty_dataset(
263 &self.dataset_uri,
264 &self.schema,
265 )
266 .await?,
267 );
268 }
269 Ok(StagedTable {
270 _dir: self.dir,
271 dataset: self.dataset.unwrap(),
272 })
273 }
274
275 async fn flush(&mut self) -> Result<()> {
276 if self.batches.is_empty() {
277 return Ok(());
278 }
279
280 let batch = if self.batches.len() == 1 {
281 self.batches.pop().unwrap()
282 } else {
283 let batches = std::mem::take(&mut self.batches);
284 arrow_select::concat::concat_batches(&self.schema, &batches)
285 .map_err(|e| OmniError::Lance(e.to_string()))?
286 };
287 self.buffered_rows = 0;
288
289 let ds = crate::table_store::TableStore::append_or_create_batch(
290 &self.dataset_uri,
291 self.dataset.take(),
292 batch,
293 )
294 .await?;
295 self.dataset = Some(ds);
296 Ok(())
297 }
298}
299
300fn merge_stage_tempdir(table_key: &str) -> Result<TempDir> {
301 if let Ok(root) = env::var(MERGE_STAGE_DIR_ENV) {
302 return TempDirBuilder::new()
303 .prefix(&format!(
304 "omnigraph-merge-{}-",
305 sanitize_table_key(table_key)
306 ))
307 .tempdir_in(PathBuf::from(root))
308 .map_err(OmniError::from);
309 }
310 TempDirBuilder::new()
311 .prefix(&format!(
312 "omnigraph-merge-{}-",
313 sanitize_table_key(table_key)
314 ))
315 .tempdir()
316 .map_err(OmniError::from)
317}
318
/// Returns `table_key` with every `:`, `/` and `\` replaced by `-`, leaving
/// all other characters untouched.
fn sanitize_table_key(table_key: &str) -> String {
    let mut sanitized = String::with_capacity(table_key.len());
    for ch in table_key.chars() {
        let is_separator = matches!(ch, ':' | '/' | '\\');
        sanitized.push(if is_separator { '-' } else { ch });
    }
    sanitized
}
328
329async fn compute_adopt_delta(
341 table_key: &str,
342 catalog: &Catalog,
343 base_snapshot: &Snapshot,
344 source_snapshot: &Snapshot,
345) -> Result<Option<AdoptDelta>> {
346 let schema = schema_for_table_key(catalog, table_key)?;
347 let mut append_writer =
348 StagedTableWriter::new(&format!("{}_adopt_append", table_key), schema.clone())?;
349 let mut upsert_writer =
350 StagedTableWriter::new(&format!("{}_adopt_upsert", table_key), schema)?;
351 let mut deleted_ids: Vec<String> = Vec::new();
352 let mut base = OrderedTableCursor::from_snapshot_lazy(base_snapshot, table_key).await?;
353 let mut source = OrderedTableCursor::from_snapshot_lazy(source_snapshot, table_key).await?;
354
355 let mut needs_update = false;
356
357 loop {
358 let base_row = base.peek_cloned().await?;
359 let source_row = source.peek_cloned().await?;
360
361 let next_id = [base_row.as_ref(), source_row.as_ref()]
362 .into_iter()
363 .flatten()
364 .map(|row| row.id.clone())
365 .min();
366 let Some(next_id) = next_id else { break };
367
368 let base_row = if base_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
369 base.pop().await?
370 } else {
371 None
372 };
373 let source_row = if source_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
374 source.pop().await?
375 } else {
376 None
377 };
378
379 match (&base_row, &source_row) {
380 (Some(_), None) => {
381 deleted_ids.push(next_id);
383 needs_update = true;
384 }
385 (None, Some(src)) => {
386 append_writer.push_row(src).await?;
389 needs_update = true;
390 }
391 (Some(base), Some(src)) => {
392 if src.compute_signature()? != base.compute_signature()? {
396 upsert_writer.push_row(src).await?;
398 needs_update = true;
399 }
400 }
402 (None, None) => unreachable!(),
403 }
404 }
405
406 if !needs_update {
407 return Ok(None);
408 }
409
410 let appends = if append_writer.row_count > 0 {
411 Some(append_writer.finish().await?)
412 } else {
413 None
414 };
415 let upserts = if upsert_writer.row_count > 0 {
416 Some(upsert_writer.finish().await?)
417 } else {
418 None
419 };
420
421 Ok(Some(AdoptDelta {
422 appends,
423 upserts,
424 deleted_ids,
425 }))
426}
427
428fn min_cursor_id(
429 base_row: &Option<CursorRow>,
430 source_row: &Option<CursorRow>,
431 target_row: &Option<CursorRow>,
432) -> Option<String> {
433 [base_row.as_ref(), source_row.as_ref(), target_row.as_ref()]
434 .into_iter()
435 .flatten()
436 .map(|row| row.id.clone())
437 .min()
438}
439
440async fn stage_streaming_table_merge(
441 table_key: &str,
442 catalog: &Catalog,
443 base_snapshot: &Snapshot,
444 source_snapshot: &Snapshot,
445 target_snapshot: &Snapshot,
446 conflicts: &mut Vec<MergeConflict>,
447) -> Result<Option<StagedMergeResult>> {
448 let schema = schema_for_table_key(catalog, table_key)?;
449 let mut full_writer = StagedTableWriter::new(&format!("{}_full", table_key), schema.clone())?;
450 let mut delta_writer = StagedTableWriter::new(&format!("{}_delta", table_key), schema)?;
451 let mut deleted_ids: Vec<String> = Vec::new();
452 let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
453 let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;
454 let mut target = OrderedTableCursor::from_snapshot(target_snapshot, table_key).await?;
455
456 let prior_conflict_count = conflicts.len();
457 let mut needs_update = false;
458
459 loop {
460 let base_row = base.peek_cloned().await?;
461 let source_row = source.peek_cloned().await?;
462 let target_row = target.peek_cloned().await?;
463 let Some(next_id) = min_cursor_id(&base_row, &source_row, &target_row) else {
464 break;
465 };
466
467 let base_row = if base_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str()) {
468 base.pop().await?
469 } else {
470 None
471 };
472 let source_row = if source_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
473 {
474 source.pop().await?
475 } else {
476 None
477 };
478 let target_row = if target_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
479 {
480 target.pop().await?
481 } else {
482 None
483 };
484
485 let base_sig = base_row.as_ref().map(|row| row.signature.as_str());
486 let source_sig = source_row.as_ref().map(|row| row.signature.as_str());
487 let target_sig = target_row.as_ref().map(|row| row.signature.as_str());
488
489 let source_changed = source_sig != base_sig;
490 let target_changed = target_sig != base_sig;
491
492 let selection = if !source_changed {
493 target_row.as_ref()
494 } else if !target_changed {
495 source_row.as_ref()
496 } else if source_sig == target_sig {
497 target_row.as_ref()
498 } else {
499 conflicts.push(classify_merge_conflict(
500 table_key, &next_id, base_sig, source_sig, target_sig,
501 ));
502 None
503 };
504
505 if conflicts.len() > prior_conflict_count {
506 continue;
507 }
508
509 if selection.is_none() && target_row.is_some() {
511 deleted_ids.push(next_id.clone());
512 needs_update = true;
513 continue;
514 }
515
516 if let Some(selection) = selection {
517 full_writer.push_row(selection).await?;
519 if selection.signature.as_str() != target_sig.unwrap_or("") {
521 delta_writer.push_row(selection).await?;
522 needs_update = true;
523 }
524 }
525 }
526
527 if conflicts.len() > prior_conflict_count {
528 return Ok(None);
529 }
530 if !needs_update {
531 return Ok(None);
532 }
533
534 let delta_staged = if delta_writer.row_count > 0 {
535 Some(delta_writer.finish().await?)
536 } else {
537 None
538 };
539
540 Ok(Some(StagedMergeResult {
541 full_staged: full_writer.finish().await?,
542 delta_staged,
543 deleted_ids,
544 }))
545}
546
547fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<SchemaRef> {
548 if let Some(name) = table_key.strip_prefix("node:") {
549 return catalog
550 .node_types
551 .get(name)
552 .map(|t| t.arrow_schema.clone())
553 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", name)));
554 }
555 if let Some(name) = table_key.strip_prefix("edge:") {
556 return catalog
557 .edge_types
558 .get(name)
559 .map(|t| t.arrow_schema.clone())
560 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", name)));
561 }
562 Err(OmniError::manifest(format!(
563 "invalid table key '{}'",
564 table_key
565 )))
566}
567
568fn same_manifest_state(
569 left: Option<&crate::db::SubTableEntry>,
570 right: Option<&crate::db::SubTableEntry>,
571) -> bool {
572 match (left, right) {
573 (Some(left), Some(right)) => {
574 left.table_version == right.table_version && left.table_branch == right.table_branch
575 }
576 (None, None) => true,
577 _ => false,
578 }
579}
580
581fn classify_merge_conflict(
582 table_key: &str,
583 row_id: &str,
584 base_sig: Option<&str>,
585 source_sig: Option<&str>,
586 target_sig: Option<&str>,
587) -> MergeConflict {
588 let (kind, message) = match (base_sig, source_sig, target_sig) {
589 (None, Some(_), Some(_)) => (
590 MergeConflictKind::DivergentInsert,
591 format!("divergent insert for id '{}'", row_id),
592 ),
593 (Some(_), None, Some(_)) | (Some(_), Some(_), None) => (
594 MergeConflictKind::DeleteVsUpdate,
595 format!("delete/update conflict for id '{}'", row_id),
596 ),
597 _ => (
598 MergeConflictKind::DivergentUpdate,
599 format!("divergent update for id '{}'", row_id),
600 ),
601 };
602 MergeConflict {
603 table_key: table_key.to_string(),
604 row_id: Some(row_id.to_string()),
605 kind,
606 message,
607 }
608}
609
610fn row_signature(batch: &RecordBatch, row: usize) -> Result<String> {
611 let mut values = Vec::with_capacity(batch.num_columns());
612 for (field, column) in batch.schema().fields().iter().zip(batch.columns()) {
613 if field.name().starts_with("_row") {
614 continue;
615 }
616 values.push(
617 array_value_to_string(column.as_ref(), row)
618 .map_err(|e| OmniError::Lance(e.to_string()))?,
619 );
620 }
621 Ok(values.join("\u{1f}"))
622}
623
624async fn scan_validation_stream(ds: &Dataset) -> Result<DatasetRecordBatchStream> {
625 crate::table_store::TableStore::scan_stream_with(ds, None, None, None, false, |_| Ok(())).await
626}
627
628async fn validate_merge_candidates(
629 db: &Omnigraph,
630 source_snapshot: &Snapshot,
631 target_snapshot: &Snapshot,
632 candidates: &HashMap<String, CandidateTableState>,
633) -> Result<()> {
634 let mut conflicts = Vec::new();
635 let mut node_ids: HashMap<String, HashSet<String>> = HashMap::new();
636
637 for (type_name, node_type) in &db.catalog().node_types {
638 let table_key = format!("node:{}", type_name);
639 let mut values = HashSet::new();
640 let mut unique_seen = vec![HashMap::new(); node_type.unique_constraints.len()];
641
642 if let Some(ds) =
643 candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
644 {
645 let mut stream = scan_validation_stream(&ds).await?;
646 while let Some(batch) = stream
647 .try_next()
648 .await
649 .map_err(|e| OmniError::Lance(e.to_string()))?
650 {
651 if let Err(err) = crate::loader::validate_value_constraints(&batch, node_type) {
652 conflicts.push(MergeConflict {
653 table_key: table_key.clone(),
654 row_id: None,
655 kind: MergeConflictKind::ValueConstraintViolation,
656 message: err.to_string(),
657 });
658 }
659 update_unique_constraints(
660 &table_key,
661 &batch,
662 &node_type.unique_constraints,
663 &mut unique_seen,
664 &mut conflicts,
665 )?;
666 let ids = batch
667 .column_by_name("id")
668 .ok_or_else(|| {
669 OmniError::manifest(format!("table {} missing id column", table_key))
670 })?
671 .as_any()
672 .downcast_ref::<StringArray>()
673 .ok_or_else(|| {
674 OmniError::manifest(format!("table {} id column is not Utf8", table_key))
675 })?;
676 for row in 0..ids.len() {
677 values.insert(ids.value(row).to_string());
678 }
679 }
680 }
681 node_ids.insert(type_name.clone(), values);
682 }
683
684 for (edge_name, edge_type) in &db.catalog().edge_types {
685 let table_key = format!("edge:{}", edge_name);
686 let mut unique_seen = vec![HashMap::new(); edge_type.unique_constraints.len()];
687 let mut src_counts = HashMap::new();
688
689 if let Some(ds) =
690 candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
691 {
692 let mut stream = scan_validation_stream(&ds).await?;
693 while let Some(batch) = stream
694 .try_next()
695 .await
696 .map_err(|e| OmniError::Lance(e.to_string()))?
697 {
698 update_unique_constraints(
699 &table_key,
700 &batch,
701 &edge_type.unique_constraints,
702 &mut unique_seen,
703 &mut conflicts,
704 )?;
705 accumulate_edge_cardinality(&batch, &mut src_counts, &table_key)?;
706 conflicts.extend(validate_orphan_edges_batch(
707 &table_key, edge_type, &batch, &node_ids,
708 )?);
709 }
710 }
711
712 conflicts.extend(finalize_edge_cardinality_conflicts(
713 &table_key,
714 edge_name,
715 edge_type.cardinality.min,
716 edge_type.cardinality.max,
717 src_counts,
718 ));
719 }
720
721 if conflicts.is_empty() {
722 Ok(())
723 } else {
724 Err(OmniError::MergeConflicts(conflicts))
725 }
726}
727
728async fn candidate_dataset(
729 source_snapshot: &Snapshot,
730 target_snapshot: &Snapshot,
731 candidates: &HashMap<String, CandidateTableState>,
732 table_key: &str,
733) -> Result<Option<Dataset>> {
734 if let Some(candidate) = candidates.get(table_key) {
735 return match candidate {
736 CandidateTableState::AdoptSourceState | CandidateTableState::AdoptWithDelta(_) => {
737 match source_snapshot.entry(table_key) {
738 Some(_) => Ok(Some(source_snapshot.open(table_key).await?)),
739 None => Ok(None),
740 }
741 }
742 CandidateTableState::RewriteMerged(staged) => {
743 Ok(Some(staged.full_staged.dataset.clone()))
744 }
745 };
746 }
747 match target_snapshot.entry(table_key) {
748 Some(_) => Ok(Some(target_snapshot.open(table_key).await?)),
749 None => Ok(None),
750 }
751}
752
753fn update_unique_constraints(
754 table_key: &str,
755 batch: &RecordBatch,
756 constraints: &[Vec<String>],
757 seen: &mut [HashMap<Vec<String>, String>],
758 conflicts: &mut Vec<MergeConflict>,
759) -> Result<()> {
760 for (constraint_idx, columns) in constraints.iter().enumerate() {
761 let seen = &mut seen[constraint_idx];
762 let group_columns = columns
766 .iter()
767 .map(|column_name| {
768 batch.column_by_name(column_name).cloned().ok_or_else(|| {
769 OmniError::manifest(format!(
770 "table {} missing unique column '{}'",
771 table_key, column_name
772 ))
773 })
774 })
775 .collect::<Result<Vec<_>>>()?;
776 for row in 0..batch.num_rows() {
777 let Some(key) = crate::loader::composite_unique_key(&group_columns, row)? else {
781 continue;
782 };
783 let row_id = row_id_at(batch, row)?;
784 if let Some(first_row_id) = seen.insert(key, row_id.clone()) {
785 conflicts.push(MergeConflict {
786 table_key: table_key.to_string(),
787 row_id: Some(row_id.clone()),
788 kind: MergeConflictKind::UniqueViolation,
789 message: format!(
790 "unique constraint {:?} violated by '{}' and '{}'",
791 columns, first_row_id, row_id
792 ),
793 });
794 }
795 }
796 }
797 Ok(())
798}
799
800fn accumulate_edge_cardinality(
801 batch: &RecordBatch,
802 counts: &mut HashMap<String, u32>,
803 table_key: &str,
804) -> Result<()> {
805 let srcs = batch
806 .column_by_name("src")
807 .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
808 .as_any()
809 .downcast_ref::<StringArray>()
810 .ok_or_else(|| {
811 OmniError::manifest(format!("table {} src column is not Utf8", table_key))
812 })?;
813 for row in 0..srcs.len() {
814 *counts.entry(srcs.value(row).to_string()).or_insert(0_u32) += 1;
815 }
816 Ok(())
817}
818
819fn finalize_edge_cardinality_conflicts(
820 table_key: &str,
821 edge_name: &str,
822 min: u32,
823 max: Option<u32>,
824 counts: HashMap<String, u32>,
825) -> Vec<MergeConflict> {
826 let mut conflicts = Vec::new();
827 for (src, count) in counts {
828 if let Some(max) = max {
829 if count > max {
830 conflicts.push(MergeConflict {
831 table_key: table_key.to_string(),
832 row_id: None,
833 kind: MergeConflictKind::CardinalityViolation,
834 message: format!(
835 "@card violation on edge {}: source '{}' has {} edges (max {})",
836 edge_name, src, count, max
837 ),
838 });
839 }
840 }
841 if count < min {
842 conflicts.push(MergeConflict {
843 table_key: table_key.to_string(),
844 row_id: None,
845 kind: MergeConflictKind::CardinalityViolation,
846 message: format!(
847 "@card violation on edge {}: source '{}' has {} edges (min {})",
848 edge_name, src, count, min
849 ),
850 });
851 }
852 }
853 conflicts
854}
855
856fn validate_orphan_edges_batch(
857 table_key: &str,
858 edge_type: &omnigraph_compiler::catalog::EdgeType,
859 batch: &RecordBatch,
860 node_ids: &HashMap<String, HashSet<String>>,
861) -> Result<Vec<MergeConflict>> {
862 let srcs = batch
863 .column_by_name("src")
864 .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
865 .as_any()
866 .downcast_ref::<StringArray>()
867 .ok_or_else(|| {
868 OmniError::manifest(format!("table {} src column is not Utf8", table_key))
869 })?;
870 let dsts = batch
871 .column_by_name("dst")
872 .ok_or_else(|| OmniError::manifest(format!("table {} missing dst column", table_key)))?
873 .as_any()
874 .downcast_ref::<StringArray>()
875 .ok_or_else(|| {
876 OmniError::manifest(format!("table {} dst column is not Utf8", table_key))
877 })?;
878
879 let from_ids = node_ids.get(&edge_type.from_type).ok_or_else(|| {
880 OmniError::manifest(format!(
881 "missing candidate node ids for {}",
882 edge_type.from_type
883 ))
884 })?;
885 let to_ids = node_ids.get(&edge_type.to_type).ok_or_else(|| {
886 OmniError::manifest(format!(
887 "missing candidate node ids for {}",
888 edge_type.to_type
889 ))
890 })?;
891
892 let mut conflicts = Vec::new();
893 for row in 0..batch.num_rows() {
894 let row_id = row_id_at(batch, row)?;
895 let src = srcs.value(row);
896 let dst = dsts.value(row);
897 if !from_ids.contains(src) {
898 conflicts.push(MergeConflict {
899 table_key: table_key.to_string(),
900 row_id: Some(row_id.clone()),
901 kind: MergeConflictKind::OrphanEdge,
902 message: format!("src '{}' not found in {}", src, edge_type.from_type),
903 });
904 }
905 if !to_ids.contains(dst) {
906 conflicts.push(MergeConflict {
907 table_key: table_key.to_string(),
908 row_id: Some(row_id),
909 kind: MergeConflictKind::OrphanEdge,
910 message: format!("dst '{}' not found in {}", dst, edge_type.to_type),
911 });
912 }
913 }
914 Ok(conflicts)
915}
916
917fn row_id_at(batch: &RecordBatch, row: usize) -> Result<String> {
918 let ids = batch
919 .column_by_name("id")
920 .ok_or_else(|| OmniError::manifest("batch missing id column".to_string()))?
921 .as_any()
922 .downcast_ref::<StringArray>()
923 .ok_or_else(|| OmniError::manifest("id column is not Utf8".to_string()))?;
924 Ok(ids.value(row).to_string())
925}
926
927async fn classify_adopt(
939 target_db: &Omnigraph,
940 catalog: &Catalog,
941 base_snapshot: &Snapshot,
942 source_snapshot: &Snapshot,
943 target_snapshot: &Snapshot,
944 table_key: &str,
945) -> Result<CandidateTableState> {
946 let Some(source_entry) = source_snapshot.entry(table_key) else {
947 return Ok(CandidateTableState::AdoptSourceState);
948 };
949 let target_entry = target_snapshot.entry(table_key);
950 let target_active = target_db.active_branch().await;
951 let advances_head = match (
952 target_active.as_deref(),
953 source_entry.table_branch.as_deref(),
954 ) {
955 (None, Some(_)) => true,
957 (Some(target_branch), Some(_)) => {
959 target_entry.and_then(|e| e.table_branch.as_deref()) == Some(target_branch)
960 }
961 _ => false,
963 };
964 if !advances_head {
965 return Ok(CandidateTableState::AdoptSourceState);
966 }
967 match compute_adopt_delta(table_key, catalog, base_snapshot, source_snapshot).await? {
968 Some(delta) => Ok(CandidateTableState::AdoptWithDelta(delta)),
969 None => Ok(CandidateTableState::AdoptSourceState),
970 }
971}
972
973async fn publish_adopted_source_state(
979 target_db: &Omnigraph,
980 source_snapshot: &Snapshot,
981 target_snapshot: &Snapshot,
982 table_key: &str,
983) -> Result<crate::db::SubTableUpdate> {
984 let source_entry = source_snapshot
985 .entry(table_key)
986 .ok_or_else(|| OmniError::manifest(format!("missing source entry for {}", table_key)))?;
987 let target_entry = target_snapshot.entry(table_key);
988
989 let target_active = target_db.active_branch().await;
990 match (
991 target_active.as_deref(),
992 source_entry.table_branch.as_deref(),
993 ) {
994 (None, None) => Ok(crate::db::SubTableUpdate {
996 table_key: table_key.to_string(),
997 table_version: source_entry.table_version,
998 table_branch: None,
999 row_count: source_entry.row_count,
1000 version_metadata: source_entry.version_metadata.clone(),
1001 }),
1002 (Some(_target_branch), None) => Ok(crate::db::SubTableUpdate {
1005 table_key: table_key.to_string(),
1006 table_version: source_entry.table_version,
1007 table_branch: None,
1008 row_count: source_entry.row_count,
1009 version_metadata: source_entry.version_metadata.clone(),
1010 }),
1011 (None, Some(_source_branch)) => Ok(crate::db::SubTableUpdate {
1014 table_key: table_key.to_string(),
1015 table_version: target_entry
1016 .map(|e| e.table_version)
1017 .unwrap_or(source_entry.table_version),
1018 table_branch: None,
1019 row_count: source_entry.row_count,
1020 version_metadata: target_entry
1021 .map(|entry| entry.version_metadata.clone())
1022 .unwrap_or_else(|| source_entry.version_metadata.clone()),
1023 }),
1024 (Some(target_branch), Some(source_branch)) => {
1026 if target_entry.and_then(|entry| entry.table_branch.as_deref()) == Some(target_branch) {
1027 Ok(crate::db::SubTableUpdate {
1030 table_key: table_key.to_string(),
1031 table_version: target_entry.unwrap().table_version,
1032 table_branch: Some(target_branch.to_string()),
1033 row_count: source_entry.row_count,
1034 version_metadata: target_entry.unwrap().version_metadata.clone(),
1035 })
1036 } else {
1037 let full_path = format!("{}/{}", target_db.uri(), source_entry.table_path);
1040 let ds = target_db
1041 .fork_dataset_from_entry_state(
1042 table_key,
1043 &full_path,
1044 Some(source_branch),
1045 source_entry.table_version,
1046 target_branch,
1047 )
1048 .await?;
1049 let state = target_db.storage().table_state(&full_path, &ds).await?;
1050 Ok(crate::db::SubTableUpdate {
1051 table_key: table_key.to_string(),
1052 table_version: state.version,
1053 table_branch: Some(target_branch.to_string()),
1054 row_count: state.row_count,
1055 version_metadata: state.version_metadata,
1056 })
1057 }
1058 }
1059 }
1060}
1061
1062async fn publish_rewritten_merge_table(
1063 target_db: &Omnigraph,
1064 table_key: &str,
1065 staged: &StagedMergeResult,
1066) -> Result<crate::db::SubTableUpdate> {
1067 let (ds, full_path, table_branch) = target_db
1072 .open_for_mutation(table_key, crate::db::MutationOpKind::Merge)
1073 .await?;
1074 let mut current_ds = ds;
1075
1076 if let Some(delta) = &staged.delta_staged {
1086 let delta_snapshot = SnapshotHandle::new(delta.dataset.clone());
1090 let batches: Vec<RecordBatch> = target_db
1091 .storage()
1092 .scan_batches_for_rewrite(&delta_snapshot)
1093 .await?
1094 .into_iter()
1095 .filter(|batch| batch.num_rows() > 0)
1096 .collect();
1097 if !batches.is_empty() {
1098 let combined = if batches.len() == 1 {
1100 batches.into_iter().next().unwrap()
1101 } else {
1102 let schema = batches[0].schema();
1103 arrow_select::concat::concat_batches(&schema, &batches)
1104 .map_err(|e| OmniError::Lance(e.to_string()))?
1105 };
1106 let staged_merge = target_db
1107 .storage()
1108 .stage_merge_insert(
1109 current_ds.clone(),
1110 combined,
1111 vec!["id".to_string()],
1112 lance::dataset::WhenMatched::UpdateAll,
1113 lance::dataset::WhenNotMatched::InsertAll,
1114 )
1115 .await?;
1116 current_ds = target_db
1117 .storage()
1118 .commit_staged(current_ds, staged_merge)
1119 .await?;
1120 }
1121 }
1122
1123 crate::failpoints::maybe_fail("branch_merge.rewrite_after_merge_pre_delete")?;
1129
1130 if !staged.deleted_ids.is_empty() {
1140 let escaped: Vec<String> = staged
1141 .deleted_ids
1142 .iter()
1143 .map(|id| format!("'{}'", id.replace('\'', "''")))
1144 .collect();
1145 let filter = format!("id IN ({})", escaped.join(", "));
1146 let (new_ds, _) = target_db
1147 .storage_inline_residual()
1148 .delete_where(&full_path, current_ds, &filter)
1149 .await?;
1150 current_ds = new_ds;
1151 }
1152
1153 crate::failpoints::maybe_fail("branch_merge.rewrite_after_delete_pre_index")?;
1160
1161 let row_count = target_db
1169 .storage()
1170 .table_state(&full_path, ¤t_ds)
1171 .await?
1172 .row_count;
1173 if row_count > 0 {
1174 target_db
1175 .build_indices_on_dataset(table_key, &mut current_ds)
1176 .await?;
1177 }
1178 let final_state = target_db
1179 .storage()
1180 .table_state(&full_path, ¤t_ds)
1181 .await?;
1182
1183 Ok(crate::db::SubTableUpdate {
1184 table_key: table_key.to_string(),
1185 table_version: final_state.version,
1186 table_branch,
1187 row_count: final_state.row_count,
1188 version_metadata: final_state.version_metadata,
1189 })
1190}
1191
1192async fn scan_staged_combined(
1196 target_db: &Omnigraph,
1197 table: &StagedTable,
1198) -> Result<Option<RecordBatch>> {
1199 crate::instrumentation::record_scan_staged_combined();
1200 let snapshot = SnapshotHandle::new(table.dataset.clone());
1201 let batches: Vec<RecordBatch> = target_db
1202 .storage()
1203 .scan_batches_for_rewrite(&snapshot)
1204 .await?
1205 .into_iter()
1206 .filter(|batch| batch.num_rows() > 0)
1207 .collect();
1208 if batches.is_empty() {
1209 return Ok(None);
1210 }
1211 let combined = if batches.len() == 1 {
1212 batches.into_iter().next().unwrap()
1213 } else {
1214 let schema = batches[0].schema();
1215 arrow_select::concat::concat_batches(&schema, &batches)
1216 .map_err(|e| OmniError::Lance(e.to_string()))?
1217 };
1218 Ok(Some(combined))
1219}
1220
1221async fn publish_adopted_delta(
1236 target_db: &Omnigraph,
1237 table_key: &str,
1238 delta: &AdoptDelta,
1239) -> Result<crate::db::SubTableUpdate> {
1240 let (ds, full_path, table_branch) = target_db
1241 .open_for_mutation(table_key, crate::db::MutationOpKind::Merge)
1242 .await?;
1243 let mut current_ds = ds;
1244
1245 if let Some(append_table) = &delta.appends {
1257 let source = SnapshotHandle::new(append_table.dataset.clone());
1258 let staged = target_db
1259 .storage()
1260 .stage_append_stream(¤t_ds, &source, &[])
1261 .await?;
1262 current_ds = target_db
1263 .storage()
1264 .commit_staged(current_ds, staged)
1265 .await?;
1266 }
1267
1268 crate::failpoints::maybe_fail("branch_merge.adopt_after_append_pre_upsert")?;
1274
1275 if let Some(upsert_table) = &delta.upserts {
1282 if let Some(combined) = scan_staged_combined(target_db, upsert_table).await? {
1283 let staged_merge = target_db
1284 .storage()
1285 .stage_merge_insert(
1286 current_ds.clone(),
1287 combined,
1288 vec!["id".to_string()],
1289 lance::dataset::WhenMatched::UpdateAll,
1290 lance::dataset::WhenNotMatched::InsertAll,
1291 )
1292 .await?;
1293 current_ds = target_db
1294 .storage()
1295 .commit_staged(current_ds, staged_merge)
1296 .await?;
1297 }
1298 }
1299
1300 crate::failpoints::maybe_fail("branch_merge.adopt_after_upsert_pre_delete")?;
1306
1307 if !delta.deleted_ids.is_empty() {
1310 let escaped: Vec<String> = delta
1311 .deleted_ids
1312 .iter()
1313 .map(|id| format!("'{}'", id.replace('\'', "''")))
1314 .collect();
1315 let filter = format!("id IN ({})", escaped.join(", "));
1316 let (new_ds, _) = target_db
1317 .storage_inline_residual()
1318 .delete_where(&full_path, current_ds, &filter)
1319 .await?;
1320 current_ds = new_ds;
1321 }
1322
1323 let final_state = target_db
1333 .storage()
1334 .table_state(&full_path, ¤t_ds)
1335 .await?;
1336
1337 Ok(crate::db::SubTableUpdate {
1338 table_key: table_key.to_string(),
1339 table_version: final_state.version,
1340 table_branch,
1341 row_count: final_state.row_count,
1342 version_metadata: final_state.version_metadata,
1343 })
1344}
1345
// Branch-merge entry points and their implementation on `Omnigraph`.
impl Omnigraph {
    /// Merges branch `source` into branch `target` without an actor id.
    ///
    /// Convenience wrapper: forwards to `branch_merge_as` with
    /// `actor_id = None`.
    pub async fn branch_merge(&self, source: &str, target: &str) -> Result<MergeOutcome> {
        self.branch_merge_as(source, target, None).await
    }

    /// Merges branch `source` into branch `target`, optionally on behalf of
    /// `actor_id`.
    ///
    /// Runs three preflight steps, in order, before handing off to
    /// `branch_merge_impl`:
    ///
    /// 1. `enforce` is called with `PolicyAction::BranchMerge` and a
    ///    `BranchTransition` scope built from `source` / `target`.
    /// 2. `ensure_schema_apply_idle("branch_merge")`.
    /// 3. `heal_pending_recovery_sidecars()`.
    ///
    /// # Errors
    /// Returns the first error produced by a preflight step, or any error
    /// from the merge itself.
    pub async fn branch_merge_as(
        &self,
        source: &str,
        target: &str,
        actor_id: Option<&str>,
    ) -> Result<MergeOutcome> {
        self.enforce(
            omnigraph_policy::PolicyAction::BranchMerge,
            &omnigraph_policy::ResourceScope::BranchTransition {
                source: source.to_string(),
                target: target.to_string(),
            },
            actor_id,
        )?;
        self.ensure_schema_apply_idle("branch_merge").await?;
        self.heal_pending_recovery_sidecars().await?;
        self.branch_merge_impl(source, target, actor_id).await
    }

    /// Resolves both branches, decides whether any merge work is needed, and
    /// runs the merge with the coordinator temporarily pointed at the target.
    ///
    /// Steps:
    /// * rejects internal system branches and identical (normalized)
    ///   source/target;
    /// * resolves both head commit ids and their merge base;
    /// * returns `MergeOutcome::AlreadyUpToDate` when the heads are equal or
    ///   the merge base is already the source head;
    /// * loads the base and source snapshots;
    /// * takes the exclusive merge lock, swaps the coordinator to the target
    ///   branch, runs `branch_merge_on_current_target`, then restores the
    ///   previous coordinator whether or not the merge succeeded.
    ///
    /// # Errors
    /// Manifest errors for the validation failures above, plus anything
    /// propagated from snapshot loading, the coordinator swap, the merge
    /// itself, or the post-merge coordinator refresh.
    async fn branch_merge_impl(
        &self,
        source: &str,
        target: &str,
        actor_id: Option<&str>,
    ) -> Result<MergeOutcome> {
        if is_internal_system_branch(source) || is_internal_system_branch(target) {
            return Err(OmniError::manifest(format!(
                "branch_merge does not allow internal system refs ('{}' -> '{}')",
                source, target
            )));
        }
        // Compare the normalized names so two spellings of the same branch
        // are still rejected as "not distinct".
        let source_branch = Omnigraph::normalize_branch_name(source)?;
        let target_branch = Omnigraph::normalize_branch_name(target)?;
        if source_branch == target_branch {
            return Err(OmniError::manifest(
                "branch_merge requires distinct source and target branches".to_string(),
            ));
        }

        let source_head_commit_id = self
            .head_commit_id_for_branch(source_branch.as_deref())
            .await?
            .ok_or_else(|| OmniError::manifest("source branch has no head commit".to_string()))?;
        let target_head_commit_id = self
            .head_commit_id_for_branch(target_branch.as_deref())
            .await?
            .ok_or_else(|| OmniError::manifest("target branch has no head commit".to_string()))?;
        let base_commit = CommitGraph::merge_base(
            self.uri(),
            source_branch.as_deref(),
            target_branch.as_deref(),
        )
        .await?
        .ok_or_else(|| OmniError::manifest("branches have no common ancestor".to_string()))?;

        // Nothing to merge: both branches sit on the same commit, or the
        // source head is itself the merge base.
        if source_head_commit_id == target_head_commit_id
            || base_commit.graph_commit_id == source_head_commit_id
        {
            return Ok(MergeOutcome::AlreadyUpToDate);
        }
        // The target head being the merge base means the target has not
        // moved since the branches diverged; this only selects the reported
        // outcome, the merge pipeline below is the same either way.
        let is_fast_forward = base_commit.graph_commit_id == target_head_commit_id;

        let base_snapshot = ManifestCoordinator::snapshot_at(
            self.uri(),
            base_commit.manifest_branch.as_deref(),
            base_commit.manifest_version,
        )
        .await?;
        // A `None` source branch is read as "main" here.
        let source_snapshot = self
            .resolved_target(ReadTarget::Branch(
                source_branch.clone().unwrap_or_else(|| "main".to_string()),
            ))
            .await?
            .snapshot;
        // NOTE(review): the heads and snapshots above are read before this
        // lock is taken. The table-version re-check inside
        // `branch_merge_on_current_target` looks like the guard against the
        // target changing in between; confirm.
        let merge_exclusive = self.merge_exclusive();
        // Bound to a named `_`-prefixed variable so the guard lives until
        // this function returns.
        let _merge_guard = merge_exclusive.lock().await;

        let previous_branch = self.active_branch().await;
        let previous = self
            .swap_coordinator_for_branch(target_branch.as_deref())
            .await?;
        // Deliberately not `?`-propagated: the coordinator must be restored
        // before any merge error is returned.
        let merge_result = self
            .branch_merge_on_current_target(
                &base_snapshot,
                &source_snapshot,
                &target_head_commit_id,
                &source_head_commit_id,
                is_fast_forward,
                actor_id,
            )
            .await;
        self.restore_coordinator(previous).await;

        // Only when the caller's active branch is the merge target is the
        // restored coordinator refreshed.
        if previous_branch == target_branch {
            if let Err(refresh_err) = self.refresh_coordinator_only().await {
                // A successful merge with a failed refresh is surfaced as an
                // error; on the failure path the merge error takes priority
                // and the refresh failure is only logged.
                if merge_result.is_ok() {
                    return Err(refresh_err);
                }
                tracing::warn!(
                    error = %refresh_err,
                    "post-merge coordinator refresh failed on the error path; \
                     the next op or open will re-sync"
                );
            }
        }

        merge_result
    }

    /// Performs the three-way merge against the branch the coordinator is
    /// currently pointed at (the merge target).
    ///
    /// Pipeline:
    /// 1. Classify every table key found in the base, source or target
    ///    snapshot into a `CandidateTableState` (or skip it).
    /// 2. Fail with `OmniError::MergeConflicts` if any conflicts were
    ///    collected; otherwise run `validate_merge_candidates`.
    /// 3. Acquire the write queue for all candidate tables and verify that
    ///    their versions have not changed since the target snapshot was
    ///    taken.
    /// 4. Write a recovery sidecar for tables that will be rewritten or
    ///    delta-adopted.
    /// 5. Publish each candidate, confirm the sidecar with the resulting
    ///    versions, commit the manifest, delete the sidecar (best effort),
    ///    and record the merge commit.
    ///
    /// # Parameters
    /// * `base_snapshot` / `source_snapshot` – snapshots of the merge base
    ///   and the source branch.
    /// * `target_head_commit_id` / `source_head_commit_id` – passed to
    ///   `record_merge_commit`; the source id is also stored on the sidecar.
    /// * `is_fast_forward` – selects `FastForward` vs `Merged` as the
    ///   returned outcome.
    /// * `actor_id` – recorded on the sidecar and the merge commit.
    ///
    /// # Errors
    /// `OmniError::MergeConflicts`, a manifest expected-version mismatch, an
    /// armed failpoint, or any error propagated from the helpers called.
    async fn branch_merge_on_current_target(
        &self,
        base_snapshot: &Snapshot,
        source_snapshot: &Snapshot,
        target_head_commit_id: &str,
        source_head_commit_id: &str,
        is_fast_forward: bool,
        actor_id: Option<&str>,
    ) -> Result<MergeOutcome> {
        self.ensure_commit_graph_initialized().await?;
        let target_snapshot = self.snapshot().await;

        // Union of table keys across all three snapshots, so tables present
        // in only one or two of them are still considered.
        let mut table_keys = HashSet::new();
        for entry in base_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }
        for entry in source_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }
        for entry in target_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }

        // Sorted so every later pass visits tables in the same order,
        // independent of hash-set iteration order.
        let mut ordered_table_keys: Vec<String> = table_keys.into_iter().collect();
        ordered_table_keys.sort();

        let mut conflicts = Vec::new();
        let mut candidates: HashMap<String, CandidateTableState> = HashMap::new();

        for table_key in &ordered_table_keys {
            let base_entry = base_snapshot.entry(table_key);
            let source_entry = source_snapshot.entry(table_key);
            let target_entry = target_snapshot.entry(table_key);
            // Source and target already agree: nothing to do.
            if same_manifest_state(source_entry, target_entry) {
                continue;
            }
            // Source is unchanged from base: keep whatever the target has.
            if same_manifest_state(base_entry, source_entry) {
                continue;
            }
            // Target is unchanged from base, so only the source changed;
            // `classify_adopt` decides how the source state is taken over.
            if same_manifest_state(base_entry, target_entry) {
                let candidate = classify_adopt(
                    self,
                    &self.catalog(),
                    base_snapshot,
                    source_snapshot,
                    &target_snapshot,
                    table_key,
                )
                .await?;
                candidates.insert(table_key.clone(), candidate);
                continue;
            }

            // Both sides differ from base: stage a merged table. Conflicts
            // are accumulated into `conflicts` rather than returned
            // immediately, so all tables are examined before failing.
            if let Some(staged) = stage_streaming_table_merge(
                table_key,
                &self.catalog(),
                base_snapshot,
                source_snapshot,
                &target_snapshot,
                &mut conflicts,
            )
            .await?
            {
                candidates.insert(
                    table_key.clone(),
                    CandidateTableState::RewriteMerged(staged),
                );
            }
        }

        if !conflicts.is_empty() {
            return Err(OmniError::MergeConflicts(conflicts));
        }

        validate_merge_candidates(self, source_snapshot, &target_snapshot, &candidates).await?;

        // Write-queue keys are (table key, active branch) pairs for every
        // table that has a candidate. The `matches!` lists every variant
        // visible in the `CandidateTableState` declaration.
        let active_branch_for_keys = self.active_branch().await;
        let merge_queue_keys: Vec<(String, Option<String>)> = ordered_table_keys
            .iter()
            .filter(|table_key| {
                matches!(
                    candidates.get(*table_key),
                    Some(CandidateTableState::RewriteMerged(_))
                        | Some(CandidateTableState::AdoptSourceState)
                        | Some(CandidateTableState::AdoptWithDelta(_))
                )
            })
            .map(|table_key| (table_key.clone(), active_branch_for_keys.clone()))
            .collect();
        // Kept alive until this function returns.
        let _merge_queue_guards = self.write_queue().acquire_many(&merge_queue_keys).await;

        // Re-read the snapshot now that the queue is held and make sure no
        // candidate table changed version since `target_snapshot` was taken;
        // the candidates were computed against that older snapshot.
        let post_queue_snapshot = self.snapshot().await;
        for table_key in &ordered_table_keys {
            let Some(candidate) = candidates.get(table_key) else {
                continue;
            };
            if !matches!(
                candidate,
                CandidateTableState::RewriteMerged(_)
                    | CandidateTableState::AdoptSourceState
                    | CandidateTableState::AdoptWithDelta(_)
            ) {
                continue;
            }
            let expected = target_snapshot.entry(table_key).map(|e| e.table_version);
            let current = post_queue_snapshot
                .entry(table_key)
                .map(|e| e.table_version);
            if expected != current {
                // A table missing from a snapshot is reported as version 0.
                return Err(OmniError::manifest_expected_version_mismatch(
                    table_key.clone(),
                    expected.unwrap_or(0),
                    current.unwrap_or(0),
                ));
            }
        }

        // Recovery pins cover only `RewriteMerged` and `AdoptWithDelta`
        // candidates whose table exists in the target snapshot;
        // `AdoptSourceState` candidates are not pinned.
        let recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = ordered_table_keys
            .iter()
            .filter_map(|table_key| {
                let candidate = candidates.get(table_key)?;
                if !matches!(
                    candidate,
                    CandidateTableState::RewriteMerged(_) | CandidateTableState::AdoptWithDelta(_)
                ) {
                    return None;
                }
                let entry = target_snapshot.entry(table_key)?;
                Some(crate::db::manifest::SidecarTablePin {
                    table_key: table_key.clone(),
                    table_path: self.storage().dataset_uri(&entry.table_path),
                    expected_version: entry.table_version,
                    // Provisional value; the versions actually produced are
                    // supplied later through `confirm_sidecar_phase_b`.
                    post_commit_pin: entry.table_version + 1,
                    confirmed_version: None,
                    table_branch: active_branch_for_keys.clone(),
                })
            })
            .collect();
        // The sidecar is written before any table is published and only when
        // there is at least one pin.
        let mut recovery: Option<(
            crate::db::manifest::RecoverySidecar,
            crate::db::manifest::RecoverySidecarHandle,
        )> = if recovery_pins.is_empty() {
            None
        } else {
            let target_branch = active_branch_for_keys.clone();
            let mut sidecar = crate::db::manifest::new_sidecar(
                crate::db::manifest::SidecarKind::BranchMerge,
                target_branch,
                actor_id.map(str::to_string),
                recovery_pins,
            );
            sidecar.merge_source_commit_id = Some(source_head_commit_id.to_string());
            let handle = crate::db::manifest::write_sidecar(
                self.root_uri(),
                self.storage_adapter(),
                &sidecar,
            )
            .await?;
            Some((sidecar, handle))
        };

        // Publish every candidate in sorted key order. An error here returns
        // early with the sidecar still in place.
        let mut updates = Vec::new();
        let mut changed_edge_tables = false;
        for table_key in &ordered_table_keys {
            let Some(candidate_state) = candidates.get(table_key) else {
                continue;
            };
            let update = match candidate_state {
                CandidateTableState::AdoptSourceState => {
                    publish_adopted_source_state(self, source_snapshot, &target_snapshot, table_key)
                        .await?
                }
                CandidateTableState::AdoptWithDelta(delta) => {
                    publish_adopted_delta(self, table_key, delta).await?
                }
                CandidateTableState::RewriteMerged(staged) => {
                    publish_rewritten_merge_table(self, table_key, staged).await?
                }
            };
            // Tables whose key starts with "edge:" trigger a graph-index
            // invalidation once the merge has been recorded.
            if table_key.starts_with("edge:") {
                changed_edge_tables = true;
            }
            updates.push(update);
        }

        // Hand the sidecar the table versions actually produced above.
        if let Some((sidecar, _)) = recovery.as_mut() {
            let confirmed_versions: std::collections::HashMap<String, u64> = updates
                .iter()
                .map(|u| (u.table_key.clone(), u.table_version))
                .collect();
            crate::db::manifest::confirm_sidecar_phase_b(
                self.root_uri(),
                self.storage_adapter(),
                sidecar,
                &confirmed_versions,
            )
            .await?;
        }

        crate::failpoints::maybe_fail("branch_merge.post_phase_b_pre_manifest_commit")?;

        // With no table updates there is nothing to commit; reuse the
        // current version for the merge commit record.
        let manifest_version = if updates.is_empty() {
            self.version().await
        } else {
            self.commit_manifest_updates(&updates).await?
        };

        // Sidecar removal is best effort: a failure is logged, not returned.
        if let Some((_, handle)) = recovery {
            if let Err(err) =
                crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await
            {
                tracing::warn!(
                    error = %err,
                    operation_id = handle.operation_id.as_str(),
                    "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
                );
            }
        }
        self.record_merge_commit(
            manifest_version,
            target_head_commit_id,
            source_head_commit_id,
            actor_id,
        )
        .await?;

        if changed_edge_tables {
            self.invalidate_graph_index().await;
        }

        Ok(if is_fast_forward {
            MergeOutcome::FastForward
        } else {
            MergeOutcome::Merged
        })
    }
}