Skip to main content

omnigraph/exec/
merge.rs

1use super::*;
2
/// Number of buffered rows at which `StagedTableWriter::push_row` flushes to
/// the staging dataset.
const MERGE_STAGE_BATCH_ROWS: usize = 8192;
/// Environment variable naming the root directory under which merge staging
/// directories are created.
const MERGE_STAGE_DIR_ENV: &str = "OMNIGRAPH_MERGE_STAGING_DIR";
5
6#[derive(Debug)]
7enum CandidateTableState {
8    AdoptSourceState,
9    RewriteMerged(StagedMergeResult),
10}
11
12#[derive(Debug)]
13struct StagedTable {
14    _dir: TempDir,
15    dataset: Dataset,
16}
17
18#[derive(Debug)]
19struct StagedMergeResult {
20    full_staged: StagedTable,
21    delta_staged: Option<StagedTable>,
22    deleted_ids: Vec<String>,
23}
24
25#[derive(Debug, Clone)]
26struct CursorRow {
27    id: String,
28    signature: String,
29    dataset: Dataset,
30    batch: RecordBatch,
31    row_index: usize,
32}
33
34struct OrderedTableCursor {
35    stream: Option<std::pin::Pin<Box<DatasetRecordBatchStream>>>,
36    dataset: Option<Dataset>,
37    current_batch: Option<RecordBatch>,
38    current_row: usize,
39    peeked: Option<CursorRow>,
40}
41
42impl OrderedTableCursor {
43    async fn from_snapshot(snapshot: &Snapshot, table_key: &str) -> Result<Self> {
44        let dataset = match snapshot.entry(table_key) {
45            Some(_) => Some(snapshot.open(table_key).await?),
46            None => None,
47        };
48        Self::from_dataset(dataset).await
49    }
50
51    async fn from_dataset(dataset: Option<Dataset>) -> Result<Self> {
52        let stream = if let Some(ds) = &dataset {
53            Some(Box::pin(
54                crate::table_store::TableStore::scan_stream_with(
55                    ds,
56                    None,
57                    None,
58                    Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]),
59                    true,
60                    |_| Ok(()),
61                )
62                .await?,
63            ))
64        } else {
65            None
66        };
67
68        Ok(Self {
69            stream,
70            dataset,
71            current_batch: None,
72            current_row: 0,
73            peeked: None,
74        })
75    }
76
77    async fn peek_cloned(&mut self) -> Result<Option<CursorRow>> {
78        if self.peeked.is_none() {
79            self.peeked = self.next_row().await?;
80        }
81        Ok(self.peeked.clone())
82    }
83
84    async fn pop(&mut self) -> Result<Option<CursorRow>> {
85        if self.peeked.is_some() {
86            return Ok(self.peeked.take());
87        }
88        self.next_row().await
89    }
90
91    async fn next_row(&mut self) -> Result<Option<CursorRow>> {
92        loop {
93            if let Some(batch) = &self.current_batch {
94                if self.current_row < batch.num_rows() {
95                    let row_index = self.current_row;
96                    self.current_row += 1;
97                    let dataset = self.dataset.clone().ok_or_else(|| {
98                        OmniError::manifest("cursor row missing source dataset".to_string())
99                    })?;
100                    return Ok(Some(CursorRow {
101                        id: row_id_at(batch, row_index)?,
102                        signature: row_signature(batch, row_index)?,
103                        dataset,
104                        batch: batch.clone(),
105                        row_index,
106                    }));
107                }
108            }
109
110            let Some(stream) = self.stream.as_mut() else {
111                return Ok(None);
112            };
113            match stream.try_next().await {
114                Ok(Some(batch)) => {
115                    self.current_batch = Some(batch);
116                    self.current_row = 0;
117                }
118                Ok(None) => {
119                    self.stream = None;
120                    self.current_batch = None;
121                    return Ok(None);
122                }
123                Err(err) => return Err(OmniError::Lance(err.to_string())),
124            }
125        }
126    }
127}
128
129struct StagedTableWriter {
130    schema: SchemaRef,
131    dataset_uri: String,
132    dir: TempDir,
133    dataset: Option<Dataset>,
134    buffered_rows: usize,
135    row_count: u64,
136    batches: Vec<RecordBatch>,
137}
138
139impl StagedTableWriter {
140    fn new(table_key: &str, schema: SchemaRef) -> Result<Self> {
141        let dir = merge_stage_tempdir(table_key)?;
142        let dataset_uri = dir.path().join("table.lance").to_string_lossy().to_string();
143        Ok(Self {
144            schema,
145            dataset_uri,
146            dir,
147            dataset: None,
148            buffered_rows: 0,
149            row_count: 0,
150            batches: Vec::new(),
151        })
152    }
153
154    async fn push_row(&mut self, row: &CursorRow) -> Result<()> {
155        self.row_count += 1;
156        self.buffered_rows += 1;
157        self.batches.push(self.row_batch(row).await?);
158        if self.buffered_rows >= MERGE_STAGE_BATCH_ROWS {
159            self.flush().await?;
160        }
161        Ok(())
162    }
163
164    async fn row_batch(&self, row: &CursorRow) -> Result<RecordBatch> {
165        let batch = row.batch.slice(row.row_index, 1);
166        let has_blob_columns = row
167            .dataset
168            .schema()
169            .fields_pre_order()
170            .any(|field| field.is_blob());
171        if has_blob_columns {
172            return crate::table_store::TableStore::materialize_blob_batch(&row.dataset, batch)
173                .await;
174        }
175        let columns = self
176            .schema
177            .fields()
178            .iter()
179            .map(|field| {
180                batch.column_by_name(field.name()).cloned().ok_or_else(|| {
181                    OmniError::Lance(format!("batch missing column '{}'", field.name()))
182                })
183            })
184            .collect::<Result<Vec<_>>>()?;
185        RecordBatch::try_new(self.schema.clone(), columns)
186            .map_err(|e| OmniError::Lance(e.to_string()))
187    }
188
189    async fn finish(mut self) -> Result<StagedTable> {
190        self.flush().await?;
191        if self.dataset.is_none() {
192            self.dataset = Some(
193                crate::table_store::TableStore::create_empty_dataset(
194                    &self.dataset_uri,
195                    &self.schema,
196                )
197                .await?,
198            );
199        }
200        Ok(StagedTable {
201            _dir: self.dir,
202            dataset: self.dataset.unwrap(),
203        })
204    }
205
206    async fn flush(&mut self) -> Result<()> {
207        if self.batches.is_empty() {
208            return Ok(());
209        }
210
211        let batch = if self.batches.len() == 1 {
212            self.batches.pop().unwrap()
213        } else {
214            let batches = std::mem::take(&mut self.batches);
215            arrow_select::concat::concat_batches(&self.schema, &batches)
216                .map_err(|e| OmniError::Lance(e.to_string()))?
217        };
218        self.buffered_rows = 0;
219
220        let ds = crate::table_store::TableStore::append_or_create_batch(
221            &self.dataset_uri,
222            self.dataset.take(),
223            batch,
224        )
225        .await?;
226        self.dataset = Some(ds);
227        Ok(())
228    }
229}
230
231fn merge_stage_tempdir(table_key: &str) -> Result<TempDir> {
232    if let Ok(root) = env::var(MERGE_STAGE_DIR_ENV) {
233        return TempDirBuilder::new()
234            .prefix(&format!(
235                "omnigraph-merge-{}-",
236                sanitize_table_key(table_key)
237            ))
238            .tempdir_in(PathBuf::from(root))
239            .map_err(OmniError::from);
240    }
241    TempDirBuilder::new()
242        .prefix(&format!(
243            "omnigraph-merge-{}-",
244            sanitize_table_key(table_key)
245        ))
246        .tempdir()
247        .map_err(OmniError::from)
248}
249
/// Returns `table_key` with every `:`, `/` and `\` replaced by `-`, so the key
/// can be embedded in a directory name.
fn sanitize_table_key(table_key: &str) -> String {
    table_key.replace([':', '/', '\\'], "-")
}
259
260/// Computes the delta between base and source for an adopted-source merge.
261/// Returns the changed/new rows (for merge_insert) and deleted IDs (for delete).
262async fn compute_source_delta(
263    table_key: &str,
264    catalog: &Catalog,
265    base_snapshot: &Snapshot,
266    source_snapshot: &Snapshot,
267) -> Result<Option<StagedMergeResult>> {
268    let schema = schema_for_table_key(catalog, table_key)?;
269    let mut full_writer =
270        StagedTableWriter::new(&format!("{}_adopt_full", table_key), schema.clone())?;
271    let mut delta_writer = StagedTableWriter::new(&format!("{}_adopt_delta", table_key), schema)?;
272    let mut deleted_ids: Vec<String> = Vec::new();
273    let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
274    let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;
275
276    let mut needs_update = false;
277
278    loop {
279        let base_row = base.peek_cloned().await?;
280        let source_row = source.peek_cloned().await?;
281
282        let next_id = [base_row.as_ref(), source_row.as_ref()]
283            .into_iter()
284            .flatten()
285            .map(|row| row.id.clone())
286            .min();
287        let Some(next_id) = next_id else { break };
288
289        let base_row = if base_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
290            base.pop().await?
291        } else {
292            None
293        };
294        let source_row = if source_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
295            source.pop().await?
296        } else {
297            None
298        };
299
300        let base_sig = base_row.as_ref().map(|r| r.signature.as_str());
301        let source_sig = source_row.as_ref().map(|r| r.signature.as_str());
302
303        match (&base_row, &source_row) {
304            (Some(_), None) => {
305                // Deleted on source
306                deleted_ids.push(next_id);
307                needs_update = true;
308            }
309            (None, Some(src)) => {
310                // New on source
311                full_writer.push_row(src).await?;
312                delta_writer.push_row(src).await?;
313                needs_update = true;
314            }
315            (Some(_), Some(src)) if source_sig != base_sig => {
316                // Changed on source
317                full_writer.push_row(src).await?;
318                delta_writer.push_row(src).await?;
319                needs_update = true;
320            }
321            (Some(base), Some(_)) => {
322                // Unchanged — write to full (for validation), skip delta
323                full_writer.push_row(base).await?;
324            }
325            (None, None) => unreachable!(),
326        }
327    }
328
329    if !needs_update {
330        return Ok(None);
331    }
332
333    let delta_staged = if delta_writer.row_count > 0 {
334        Some(delta_writer.finish().await?)
335    } else {
336        None
337    };
338
339    Ok(Some(StagedMergeResult {
340        full_staged: full_writer.finish().await?,
341        delta_staged,
342        deleted_ids,
343    }))
344}
345
346fn min_cursor_id(
347    base_row: &Option<CursorRow>,
348    source_row: &Option<CursorRow>,
349    target_row: &Option<CursorRow>,
350) -> Option<String> {
351    [base_row.as_ref(), source_row.as_ref(), target_row.as_ref()]
352        .into_iter()
353        .flatten()
354        .map(|row| row.id.clone())
355        .min()
356}
357
358async fn stage_streaming_table_merge(
359    table_key: &str,
360    catalog: &Catalog,
361    base_snapshot: &Snapshot,
362    source_snapshot: &Snapshot,
363    target_snapshot: &Snapshot,
364    conflicts: &mut Vec<MergeConflict>,
365) -> Result<Option<StagedMergeResult>> {
366    let schema = schema_for_table_key(catalog, table_key)?;
367    let mut full_writer = StagedTableWriter::new(&format!("{}_full", table_key), schema.clone())?;
368    let mut delta_writer = StagedTableWriter::new(&format!("{}_delta", table_key), schema)?;
369    let mut deleted_ids: Vec<String> = Vec::new();
370    let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
371    let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;
372    let mut target = OrderedTableCursor::from_snapshot(target_snapshot, table_key).await?;
373
374    let prior_conflict_count = conflicts.len();
375    let mut needs_update = false;
376
377    loop {
378        let base_row = base.peek_cloned().await?;
379        let source_row = source.peek_cloned().await?;
380        let target_row = target.peek_cloned().await?;
381        let Some(next_id) = min_cursor_id(&base_row, &source_row, &target_row) else {
382            break;
383        };
384
385        let base_row = if base_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str()) {
386            base.pop().await?
387        } else {
388            None
389        };
390        let source_row = if source_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
391        {
392            source.pop().await?
393        } else {
394            None
395        };
396        let target_row = if target_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
397        {
398            target.pop().await?
399        } else {
400            None
401        };
402
403        let base_sig = base_row.as_ref().map(|row| row.signature.as_str());
404        let source_sig = source_row.as_ref().map(|row| row.signature.as_str());
405        let target_sig = target_row.as_ref().map(|row| row.signature.as_str());
406
407        let source_changed = source_sig != base_sig;
408        let target_changed = target_sig != base_sig;
409
410        let selection = if !source_changed {
411            target_row.as_ref()
412        } else if !target_changed {
413            source_row.as_ref()
414        } else if source_sig == target_sig {
415            target_row.as_ref()
416        } else {
417            conflicts.push(classify_merge_conflict(
418                table_key, &next_id, base_sig, source_sig, target_sig,
419            ));
420            None
421        };
422
423        if conflicts.len() > prior_conflict_count {
424            continue;
425        }
426
427        // Row existed in target but not in merge result → delete
428        if selection.is_none() && target_row.is_some() {
429            deleted_ids.push(next_id.clone());
430            needs_update = true;
431            continue;
432        }
433
434        if let Some(selection) = selection {
435            // Always write to full (for validation)
436            full_writer.push_row(selection).await?;
437            // Only write changed rows to delta (for publish)
438            if selection.signature.as_str() != target_sig.unwrap_or("") {
439                delta_writer.push_row(selection).await?;
440                needs_update = true;
441            }
442        }
443    }
444
445    if conflicts.len() > prior_conflict_count {
446        return Ok(None);
447    }
448    if !needs_update {
449        return Ok(None);
450    }
451
452    let delta_staged = if delta_writer.row_count > 0 {
453        Some(delta_writer.finish().await?)
454    } else {
455        None
456    };
457
458    Ok(Some(StagedMergeResult {
459        full_staged: full_writer.finish().await?,
460        delta_staged,
461        deleted_ids,
462    }))
463}
464
465fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<SchemaRef> {
466    if let Some(name) = table_key.strip_prefix("node:") {
467        return catalog
468            .node_types
469            .get(name)
470            .map(|t| t.arrow_schema.clone())
471            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", name)));
472    }
473    if let Some(name) = table_key.strip_prefix("edge:") {
474        return catalog
475            .edge_types
476            .get(name)
477            .map(|t| t.arrow_schema.clone())
478            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", name)));
479    }
480    Err(OmniError::manifest(format!(
481        "invalid table key '{}'",
482        table_key
483    )))
484}
485
486fn same_manifest_state(
487    left: Option<&crate::db::SubTableEntry>,
488    right: Option<&crate::db::SubTableEntry>,
489) -> bool {
490    match (left, right) {
491        (Some(left), Some(right)) => {
492            left.table_version == right.table_version && left.table_branch == right.table_branch
493        }
494        (None, None) => true,
495        _ => false,
496    }
497}
498
499fn classify_merge_conflict(
500    table_key: &str,
501    row_id: &str,
502    base_sig: Option<&str>,
503    source_sig: Option<&str>,
504    target_sig: Option<&str>,
505) -> MergeConflict {
506    let (kind, message) = match (base_sig, source_sig, target_sig) {
507        (None, Some(_), Some(_)) => (
508            MergeConflictKind::DivergentInsert,
509            format!("divergent insert for id '{}'", row_id),
510        ),
511        (Some(_), None, Some(_)) | (Some(_), Some(_), None) => (
512            MergeConflictKind::DeleteVsUpdate,
513            format!("delete/update conflict for id '{}'", row_id),
514        ),
515        _ => (
516            MergeConflictKind::DivergentUpdate,
517            format!("divergent update for id '{}'", row_id),
518        ),
519    };
520    MergeConflict {
521        table_key: table_key.to_string(),
522        row_id: Some(row_id.to_string()),
523        kind,
524        message,
525    }
526}
527
528fn row_signature(batch: &RecordBatch, row: usize) -> Result<String> {
529    let mut values = Vec::with_capacity(batch.num_columns());
530    for (field, column) in batch.schema().fields().iter().zip(batch.columns()) {
531        if field.name().starts_with("_row") {
532            continue;
533        }
534        values.push(
535            array_value_to_string(column.as_ref(), row)
536                .map_err(|e| OmniError::Lance(e.to_string()))?,
537        );
538    }
539    Ok(values.join("\u{1f}"))
540}
541
542async fn scan_validation_stream(ds: &Dataset) -> Result<DatasetRecordBatchStream> {
543    crate::table_store::TableStore::scan_stream_with(ds, None, None, None, false, |_| Ok(())).await
544}
545
546async fn validate_merge_candidates(
547    db: &Omnigraph,
548    source_snapshot: &Snapshot,
549    target_snapshot: &Snapshot,
550    candidates: &HashMap<String, CandidateTableState>,
551) -> Result<()> {
552    let mut conflicts = Vec::new();
553    let mut node_ids: HashMap<String, HashSet<String>> = HashMap::new();
554
555    for (type_name, node_type) in &db.catalog().node_types {
556        let table_key = format!("node:{}", type_name);
557        let mut values = HashSet::new();
558        let mut unique_seen = vec![HashMap::new(); node_type.unique_constraints.len()];
559
560        if let Some(ds) =
561            candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
562        {
563            let mut stream = scan_validation_stream(&ds).await?;
564            while let Some(batch) = stream
565                .try_next()
566                .await
567                .map_err(|e| OmniError::Lance(e.to_string()))?
568            {
569                if let Err(err) = crate::loader::validate_value_constraints(&batch, node_type) {
570                    conflicts.push(MergeConflict {
571                        table_key: table_key.clone(),
572                        row_id: None,
573                        kind: MergeConflictKind::ValueConstraintViolation,
574                        message: err.to_string(),
575                    });
576                }
577                update_unique_constraints(
578                    &table_key,
579                    &batch,
580                    &node_type.unique_constraints,
581                    &mut unique_seen,
582                    &mut conflicts,
583                )?;
584                let ids = batch
585                    .column_by_name("id")
586                    .ok_or_else(|| {
587                        OmniError::manifest(format!("table {} missing id column", table_key))
588                    })?
589                    .as_any()
590                    .downcast_ref::<StringArray>()
591                    .ok_or_else(|| {
592                        OmniError::manifest(format!("table {} id column is not Utf8", table_key))
593                    })?;
594                for row in 0..ids.len() {
595                    values.insert(ids.value(row).to_string());
596                }
597            }
598        }
599        node_ids.insert(type_name.clone(), values);
600    }
601
602    for (edge_name, edge_type) in &db.catalog().edge_types {
603        let table_key = format!("edge:{}", edge_name);
604        let mut unique_seen = vec![HashMap::new(); edge_type.unique_constraints.len()];
605        let mut src_counts = HashMap::new();
606
607        if let Some(ds) =
608            candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
609        {
610            let mut stream = scan_validation_stream(&ds).await?;
611            while let Some(batch) = stream
612                .try_next()
613                .await
614                .map_err(|e| OmniError::Lance(e.to_string()))?
615            {
616                update_unique_constraints(
617                    &table_key,
618                    &batch,
619                    &edge_type.unique_constraints,
620                    &mut unique_seen,
621                    &mut conflicts,
622                )?;
623                accumulate_edge_cardinality(&batch, &mut src_counts, &table_key)?;
624                conflicts.extend(validate_orphan_edges_batch(
625                    &table_key, edge_type, &batch, &node_ids,
626                )?);
627            }
628        }
629
630        conflicts.extend(finalize_edge_cardinality_conflicts(
631            &table_key,
632            edge_name,
633            edge_type.cardinality.min,
634            edge_type.cardinality.max,
635            src_counts,
636        ));
637    }
638
639    if conflicts.is_empty() {
640        Ok(())
641    } else {
642        Err(OmniError::MergeConflicts(conflicts))
643    }
644}
645
646async fn candidate_dataset(
647    source_snapshot: &Snapshot,
648    target_snapshot: &Snapshot,
649    candidates: &HashMap<String, CandidateTableState>,
650    table_key: &str,
651) -> Result<Option<Dataset>> {
652    if let Some(candidate) = candidates.get(table_key) {
653        return match candidate {
654            CandidateTableState::AdoptSourceState => match source_snapshot.entry(table_key) {
655                Some(_) => Ok(Some(source_snapshot.open(table_key).await?)),
656                None => Ok(None),
657            },
658            CandidateTableState::RewriteMerged(staged) => {
659                Ok(Some(staged.full_staged.dataset.clone()))
660            }
661        };
662    }
663    match target_snapshot.entry(table_key) {
664        Some(_) => Ok(Some(target_snapshot.open(table_key).await?)),
665        None => Ok(None),
666    }
667}
668
669fn update_unique_constraints(
670    table_key: &str,
671    batch: &RecordBatch,
672    constraints: &[Vec<String>],
673    seen: &mut [HashMap<String, String>],
674    conflicts: &mut Vec<MergeConflict>,
675) -> Result<()> {
676    for (constraint_idx, columns) in constraints.iter().enumerate() {
677        let seen = &mut seen[constraint_idx];
678        for row in 0..batch.num_rows() {
679            let mut parts = Vec::with_capacity(columns.len());
680            let mut any_null = false;
681            for column_name in columns {
682                let column = batch.column_by_name(column_name).ok_or_else(|| {
683                    OmniError::manifest(format!(
684                        "table {} missing unique column '{}'",
685                        table_key, column_name
686                    ))
687                })?;
688                if column.is_null(row) {
689                    any_null = true;
690                    break;
691                }
692                parts.push(
693                    array_value_to_string(column.as_ref(), row)
694                        .map_err(|e| OmniError::Lance(e.to_string()))?,
695                );
696            }
697            if any_null {
698                continue;
699            }
700            let value = parts.join("|");
701            let row_id = row_id_at(batch, row)?;
702            if let Some(first_row_id) = seen.insert(value.clone(), row_id.clone()) {
703                conflicts.push(MergeConflict {
704                    table_key: table_key.to_string(),
705                    row_id: Some(row_id.clone()),
706                    kind: MergeConflictKind::UniqueViolation,
707                    message: format!(
708                        "unique constraint {:?} violated by '{}' and '{}'",
709                        columns, first_row_id, row_id
710                    ),
711                });
712            }
713        }
714    }
715    Ok(())
716}
717
718fn accumulate_edge_cardinality(
719    batch: &RecordBatch,
720    counts: &mut HashMap<String, u32>,
721    table_key: &str,
722) -> Result<()> {
723    let srcs = batch
724        .column_by_name("src")
725        .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
726        .as_any()
727        .downcast_ref::<StringArray>()
728        .ok_or_else(|| {
729            OmniError::manifest(format!("table {} src column is not Utf8", table_key))
730        })?;
731    for row in 0..srcs.len() {
732        *counts.entry(srcs.value(row).to_string()).or_insert(0_u32) += 1;
733    }
734    Ok(())
735}
736
737fn finalize_edge_cardinality_conflicts(
738    table_key: &str,
739    edge_name: &str,
740    min: u32,
741    max: Option<u32>,
742    counts: HashMap<String, u32>,
743) -> Vec<MergeConflict> {
744    let mut conflicts = Vec::new();
745    for (src, count) in counts {
746        if let Some(max) = max {
747            if count > max {
748                conflicts.push(MergeConflict {
749                    table_key: table_key.to_string(),
750                    row_id: None,
751                    kind: MergeConflictKind::CardinalityViolation,
752                    message: format!(
753                        "@card violation on edge {}: source '{}' has {} edges (max {})",
754                        edge_name, src, count, max
755                    ),
756                });
757            }
758        }
759        if count < min {
760            conflicts.push(MergeConflict {
761                table_key: table_key.to_string(),
762                row_id: None,
763                kind: MergeConflictKind::CardinalityViolation,
764                message: format!(
765                    "@card violation on edge {}: source '{}' has {} edges (min {})",
766                    edge_name, src, count, min
767                ),
768            });
769        }
770    }
771    conflicts
772}
773
774fn validate_orphan_edges_batch(
775    table_key: &str,
776    edge_type: &omnigraph_compiler::catalog::EdgeType,
777    batch: &RecordBatch,
778    node_ids: &HashMap<String, HashSet<String>>,
779) -> Result<Vec<MergeConflict>> {
780    let srcs = batch
781        .column_by_name("src")
782        .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
783        .as_any()
784        .downcast_ref::<StringArray>()
785        .ok_or_else(|| {
786            OmniError::manifest(format!("table {} src column is not Utf8", table_key))
787        })?;
788    let dsts = batch
789        .column_by_name("dst")
790        .ok_or_else(|| OmniError::manifest(format!("table {} missing dst column", table_key)))?
791        .as_any()
792        .downcast_ref::<StringArray>()
793        .ok_or_else(|| {
794            OmniError::manifest(format!("table {} dst column is not Utf8", table_key))
795        })?;
796
797    let from_ids = node_ids.get(&edge_type.from_type).ok_or_else(|| {
798        OmniError::manifest(format!(
799            "missing candidate node ids for {}",
800            edge_type.from_type
801        ))
802    })?;
803    let to_ids = node_ids.get(&edge_type.to_type).ok_or_else(|| {
804        OmniError::manifest(format!(
805            "missing candidate node ids for {}",
806            edge_type.to_type
807        ))
808    })?;
809
810    let mut conflicts = Vec::new();
811    for row in 0..batch.num_rows() {
812        let row_id = row_id_at(batch, row)?;
813        let src = srcs.value(row);
814        let dst = dsts.value(row);
815        if !from_ids.contains(src) {
816            conflicts.push(MergeConflict {
817                table_key: table_key.to_string(),
818                row_id: Some(row_id.clone()),
819                kind: MergeConflictKind::OrphanEdge,
820                message: format!("src '{}' not found in {}", src, edge_type.from_type),
821            });
822        }
823        if !to_ids.contains(dst) {
824            conflicts.push(MergeConflict {
825                table_key: table_key.to_string(),
826                row_id: Some(row_id),
827                kind: MergeConflictKind::OrphanEdge,
828                message: format!("dst '{}' not found in {}", dst, edge_type.to_type),
829            });
830        }
831    }
832    Ok(conflicts)
833}
834
835fn row_id_at(batch: &RecordBatch, row: usize) -> Result<String> {
836    let ids = batch
837        .column_by_name("id")
838        .ok_or_else(|| OmniError::manifest("batch missing id column".to_string()))?
839        .as_any()
840        .downcast_ref::<StringArray>()
841        .ok_or_else(|| OmniError::manifest("id column is not Utf8".to_string()))?;
842    Ok(ids.value(row).to_string())
843}
844
845async fn publish_adopted_source_state(
846    target_db: &Omnigraph,
847    catalog: &Catalog,
848    base_snapshot: &Snapshot,
849    source_snapshot: &Snapshot,
850    target_snapshot: &Snapshot,
851    table_key: &str,
852) -> Result<crate::db::SubTableUpdate> {
853    let source_entry = source_snapshot
854        .entry(table_key)
855        .ok_or_else(|| OmniError::manifest(format!("missing source entry for {}", table_key)))?;
856    let target_entry = target_snapshot.entry(table_key);
857
858    let target_active = target_db.active_branch().await;
859    match (
860        target_active.as_deref(),
861        source_entry.table_branch.as_deref(),
862    ) {
863        // Both on main — pointer switch is safe (same lineage, version columns valid)
864        (None, None) => Ok(crate::db::SubTableUpdate {
865            table_key: table_key.to_string(),
866            table_version: source_entry.table_version,
867            table_branch: None,
868            row_count: source_entry.row_count,
869            version_metadata: source_entry.version_metadata.clone(),
870        }),
871        // Source on main, target on branch — pointer switch to main version
872        // (target reads from main, same lineage)
873        (Some(_target_branch), None) => Ok(crate::db::SubTableUpdate {
874            table_key: table_key.to_string(),
875            table_version: source_entry.table_version,
876            table_branch: None,
877            row_count: source_entry.row_count,
878            version_metadata: source_entry.version_metadata.clone(),
879        }),
880        // Source on branch, target on main — apply delta to preserve version metadata
881        (None, Some(_source_branch)) => {
882            let delta =
883                compute_source_delta(table_key, catalog, base_snapshot, source_snapshot).await?;
884            match delta {
885                Some(staged) => publish_rewritten_merge_table(target_db, table_key, &staged).await,
886                None => Ok(crate::db::SubTableUpdate {
887                    table_key: table_key.to_string(),
888                    table_version: target_entry
889                        .map(|e| e.table_version)
890                        .unwrap_or(source_entry.table_version),
891                    table_branch: None,
892                    row_count: source_entry.row_count,
893                    version_metadata: target_entry
894                        .map(|entry| entry.version_metadata.clone())
895                        .unwrap_or_else(|| source_entry.version_metadata.clone()),
896                }),
897            }
898        }
899        // Both on branches
900        (Some(target_branch), Some(source_branch)) => {
901            if target_entry.and_then(|entry| entry.table_branch.as_deref()) == Some(target_branch) {
902                // Target already owns this table — apply delta onto its lineage
903                let delta =
904                    compute_source_delta(table_key, catalog, base_snapshot, source_snapshot)
905                        .await?;
906                match delta {
907                    Some(staged) => {
908                        publish_rewritten_merge_table(target_db, table_key, &staged).await
909                    }
910                    None => Ok(crate::db::SubTableUpdate {
911                        table_key: table_key.to_string(),
912                        table_version: target_entry.unwrap().table_version,
913                        table_branch: Some(target_branch.to_string()),
914                        row_count: source_entry.row_count,
915                        version_metadata: target_entry.unwrap().version_metadata.clone(),
916                    }),
917                }
918            } else {
919                // Target doesn't own this table yet — fork from source state.
920                // This creates the target branch on the sub-table dataset.
921                let full_path = format!("{}/{}", target_db.uri(), source_entry.table_path);
922                let ds = target_db
923                    .fork_dataset_from_entry_state(
924                        table_key,
925                        &full_path,
926                        Some(source_branch),
927                        source_entry.table_version,
928                        target_branch,
929                    )
930                    .await?;
931                let state = target_db.table_store().table_state(&full_path, &ds).await?;
932                Ok(crate::db::SubTableUpdate {
933                    table_key: table_key.to_string(),
934                    table_version: state.version,
935                    table_branch: Some(target_branch.to_string()),
936                    row_count: state.row_count,
937                    version_metadata: state.version_metadata,
938                })
939            }
940        }
941    }
942}
943
944async fn publish_rewritten_merge_table(
945    target_db: &Omnigraph,
946    table_key: &str,
947    staged: &StagedMergeResult,
948) -> Result<crate::db::SubTableUpdate> {
949    // Branch merge's source-rewrite path is Merge-shaped (upsert from
950    // source onto target). The inline `delete_where` later in this
951    // function operates on rows the rewrite chose to remove, not
952    // user-facing predicates, so Merge is the correct policy here.
953    let (ds, full_path, table_branch) = target_db
954        .open_for_mutation(table_key, crate::db::MutationOpKind::Merge)
955        .await?;
956    let mut current_ds = ds;
957
958    // Phase 1: merge_insert changed/new rows (preserves _row_created_at_version for
959    // existing rows, bumps _row_last_updated_at_version only for actually-changed rows).
960    //
961    // Routed through the staged primitive so a failure between writing
962    // fragments and committing leaves no Lance-HEAD drift. The
963    // commit_staged here is per-table per-call (Lance has no
964    // multi-dataset atomic commit); the residual sits at this single
965    // commit point, narrowed from the previous "merge_insert + delete +
966    // index" multi-step inline-commit chain.
967    if let Some(delta) = &staged.delta_staged {
968        let batches: Vec<RecordBatch> = target_db
969            .table_store()
970            .scan_batches_for_rewrite(&delta.dataset)
971            .await?
972            .into_iter()
973            .filter(|batch| batch.num_rows() > 0)
974            .collect();
975        if !batches.is_empty() {
976            // Concat into one batch — stage_merge_insert takes a single batch.
977            let combined = if batches.len() == 1 {
978                batches.into_iter().next().unwrap()
979            } else {
980                let schema = batches[0].schema();
981                arrow_select::concat::concat_batches(&schema, &batches)
982                    .map_err(|e| OmniError::Lance(e.to_string()))?
983            };
984            let staged_merge = target_db
985                .table_store()
986                .stage_merge_insert(
987                    current_ds.clone(),
988                    combined,
989                    vec!["id".to_string()],
990                    lance::dataset::WhenMatched::UpdateAll,
991                    lance::dataset::WhenNotMatched::InsertAll,
992                )
993                .await?;
994            current_ds = target_db
995                .table_store()
996                .commit_staged(Arc::new(current_ds), staged_merge.transaction)
997                .await?;
998        }
999    }
1000
1001    // Phase 2: delete removed rows via deletion vectors.
1002    //
1003    // INLINE-COMMIT RESIDUAL: lance-4.0.0 does not expose a public
1004    // two-phase delete API (DeleteJob is `pub(crate)` —
1005    // lance-format/lance#6658 is open with no PRs). We deliberately do
1006    // NOT introduce a `stage_delete` wrapper that would secretly
1007    // inline-commit (it would create a side-channel between the staged
1008    // and inline write paths). When the upstream API ships, swap this
1009    // `delete_where` call for `stage_delete` + `commit_staged`.
1010    if !staged.deleted_ids.is_empty() {
1011        let escaped: Vec<String> = staged
1012            .deleted_ids
1013            .iter()
1014            .map(|id| format!("'{}'", id.replace('\'', "''")))
1015            .collect();
1016        let filter = format!("id IN ({})", escaped.join(", "));
1017        target_db
1018            .table_store()
1019            .delete_where(&full_path, &mut current_ds, &filter)
1020            .await?;
1021    }
1022
1023    // Phase 3: rebuild indices.
1024    //
1025    // `build_indices_on_dataset` uses `stage_create_btree_index` /
1026    // `stage_create_inverted_index` + `commit_staged` for scalar
1027    // indices. Vector indices remain inline-commit
1028    // (`build_index_metadata_from_segments` is `pub(crate)` in lance-
1029    // 4.0.0 — companion ticket to lance-format/lance#6658).
1030    let row_count = target_db
1031        .table_store()
1032        .table_state(&full_path, &current_ds)
1033        .await?
1034        .row_count;
1035    if row_count > 0 {
1036        target_db
1037            .build_indices_on_dataset(table_key, &mut current_ds)
1038            .await?;
1039    }
1040    let final_state = target_db
1041        .table_store()
1042        .table_state(&full_path, &current_ds)
1043        .await?;
1044
1045    Ok(crate::db::SubTableUpdate {
1046        table_key: table_key.to_string(),
1047        table_version: final_state.version,
1048        table_branch,
1049        row_count: final_state.row_count,
1050        version_metadata: final_state.version_metadata,
1051    })
1052}
1053
1054impl Omnigraph {
1055    pub async fn branch_merge(&self, source: &str, target: &str) -> Result<MergeOutcome> {
1056        self.branch_merge_as(source, target, None).await
1057    }
1058
1059    pub async fn branch_merge_as(
1060        &self,
1061        source: &str,
1062        target: &str,
1063        actor_id: Option<&str>,
1064    ) -> Result<MergeOutcome> {
1065        // Engine-layer policy gate (MR-722 fan-out / PR #3). Scope is
1066        // `BranchTransition { source, target }` — matches the HTTP-layer
1067        // convention at `server_branch_merge` (branch=Some(source),
1068        // target_branch=Some(target)). Cedar rules using
1069        // `target_branch_scope: protected` therefore correctly gate
1070        // merges INTO protected branches without forbidding the
1071        // (symmetric) source-side reference.
1072        self.enforce(
1073            omnigraph_policy::PolicyAction::BranchMerge,
1074            &omnigraph_policy::ResourceScope::BranchTransition {
1075                source: source.to_string(),
1076                target: target.to_string(),
1077            },
1078            actor_id,
1079        )?;
1080        self.ensure_schema_apply_idle("branch_merge").await?;
1081        self.branch_merge_impl(source, target, actor_id).await
1082    }
1083
1084    async fn branch_merge_impl(
1085        &self,
1086        source: &str,
1087        target: &str,
1088        actor_id: Option<&str>,
1089    ) -> Result<MergeOutcome> {
1090        if is_internal_system_branch(source) || is_internal_system_branch(target) {
1091            return Err(OmniError::manifest(format!(
1092                "branch_merge does not allow internal system refs ('{}' -> '{}')",
1093                source, target
1094            )));
1095        }
1096        let source_branch = Omnigraph::normalize_branch_name(source)?;
1097        let target_branch = Omnigraph::normalize_branch_name(target)?;
1098        if source_branch == target_branch {
1099            return Err(OmniError::manifest(
1100                "branch_merge requires distinct source and target branches".to_string(),
1101            ));
1102        }
1103
1104        let source_head_commit_id = self
1105            .head_commit_id_for_branch(source_branch.as_deref())
1106            .await?
1107            .ok_or_else(|| OmniError::manifest("source branch has no head commit".to_string()))?;
1108        let target_head_commit_id = self
1109            .head_commit_id_for_branch(target_branch.as_deref())
1110            .await?
1111            .ok_or_else(|| OmniError::manifest("target branch has no head commit".to_string()))?;
1112        let base_commit = CommitGraph::merge_base(
1113            self.uri(),
1114            source_branch.as_deref(),
1115            target_branch.as_deref(),
1116        )
1117        .await?
1118        .ok_or_else(|| OmniError::manifest("branches have no common ancestor".to_string()))?;
1119
1120        if source_head_commit_id == target_head_commit_id
1121            || base_commit.graph_commit_id == source_head_commit_id
1122        {
1123            return Ok(MergeOutcome::AlreadyUpToDate);
1124        }
1125        let is_fast_forward = base_commit.graph_commit_id == target_head_commit_id;
1126
1127        let base_snapshot = ManifestCoordinator::snapshot_at(
1128            self.uri(),
1129            base_commit.manifest_branch.as_deref(),
1130            base_commit.manifest_version,
1131        )
1132        .await?;
1133        let source_snapshot = self
1134            .resolved_target(ReadTarget::Branch(
1135                source_branch.clone().unwrap_or_else(|| "main".to_string()),
1136            ))
1137            .await?
1138            .snapshot;
1139        // Hold the merge-exclusive mutex across the full swap → operate
1140        // → restore window. Two concurrent branch_merge calls would
1141        // otherwise interleave their three separate `coordinator.write()`
1142        // acquisitions, leaving each merge's body running against the
1143        // other's swapped coord. Pinned by
1144        // `concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other`
1145        // in `crates/omnigraph-server/tests/server.rs`.
1146        let merge_exclusive = self.merge_exclusive();
1147        let _merge_guard = merge_exclusive.lock().await;
1148
1149        let previous_branch = self.active_branch().await;
1150        let previous = self
1151            .swap_coordinator_for_branch(target_branch.as_deref())
1152            .await?;
1153        let merge_result = self
1154            .branch_merge_on_current_target(
1155                &base_snapshot,
1156                &source_snapshot,
1157                &target_head_commit_id,
1158                &source_head_commit_id,
1159                is_fast_forward,
1160                actor_id,
1161            )
1162            .await;
1163        self.restore_coordinator(previous).await;
1164
1165        // Sync the restored coordinator's cached manifest snapshot with
1166        // disk on both Ok and Err paths. During the swap window above,
1167        // `self.coordinator` was a freshly opened coord for the merge
1168        // target; any concurrent writer on that target (e.g. a `/change`
1169        // on `main` racing a `merge into=main`) publishes against the
1170        // swapped coord and never touches the original. Without this
1171        // sync, the restored coord's cached manifest snapshot would
1172        // diverge from disk and seed a stale `expected_versions` into
1173        // the next op's publisher CAS fence — a non-retryable
1174        // `ExpectedVersionMismatch` for a user with no concurrent
1175        // writer of their own. Pinned by the
1176        // `[d:merge×change:into-target]` cell of
1177        // `concurrent_branch_ops_morphological_matrix` in
1178        // `crates/omnigraph-server/tests/server.rs`, which flakes
1179        // pre-fix and stabilises post-fix.
1180        //
1181        // Use `refresh_coordinator_only` rather than `refresh` so the
1182        // recovery sweep doesn't race the merge's own in-flight
1183        // sidecar: when the merge body returns Err between Phase B
1184        // (per-table `commit_staged` + sidecar write) and Phase C
1185        // (manifest publish + sidecar delete), the sidecar is still on
1186        // disk. `refresh`'s `RollForwardOnly` sweep would observe it
1187        // and close it here — masking the failure from the next
1188        // `Omnigraph::open` sweep and from the audit row that the open
1189        // sweep emits. Pinned by
1190        // `branch_merge_phase_b_failure_recovered_on_next_open` in
1191        // `crates/omnigraph/tests/failpoints.rs`.
1192        //
1193        // Err-path refresh is best-effort: the merge body's error
1194        // (typically the structured `manifest_conflict` from the
1195        // post_queue_snapshot drift check) is the value the caller
1196        // needs to see. A refresh-time storage error would replace
1197        // that with a less informative error; the next op or the next
1198        // `Omnigraph::open` will re-sync the coord anyway.
1199        if previous_branch == target_branch {
1200            if let Err(refresh_err) = self.refresh_coordinator_only().await {
1201                if merge_result.is_ok() {
1202                    return Err(refresh_err);
1203                }
1204                tracing::warn!(
1205                    error = %refresh_err,
1206                    "post-merge coordinator refresh failed on the error path; \
1207                     the next op or open will re-sync"
1208                );
1209            }
1210        }
1211
1212        merge_result
1213    }
1214
1215    async fn branch_merge_on_current_target(
1216        &self,
1217        base_snapshot: &Snapshot,
1218        source_snapshot: &Snapshot,
1219        target_head_commit_id: &str,
1220        source_head_commit_id: &str,
1221        is_fast_forward: bool,
1222        actor_id: Option<&str>,
1223    ) -> Result<MergeOutcome> {
1224        self.ensure_commit_graph_initialized().await?;
1225        let target_snapshot = self.snapshot().await;
1226
1227        let mut table_keys = HashSet::new();
1228        for entry in base_snapshot.entries() {
1229            table_keys.insert(entry.table_key.clone());
1230        }
1231        for entry in source_snapshot.entries() {
1232            table_keys.insert(entry.table_key.clone());
1233        }
1234        for entry in target_snapshot.entries() {
1235            table_keys.insert(entry.table_key.clone());
1236        }
1237
1238        let mut ordered_table_keys: Vec<String> = table_keys.into_iter().collect();
1239        ordered_table_keys.sort();
1240
1241        let mut conflicts = Vec::new();
1242        let mut candidates: HashMap<String, CandidateTableState> = HashMap::new();
1243
1244        for table_key in &ordered_table_keys {
1245            let base_entry = base_snapshot.entry(table_key);
1246            let source_entry = source_snapshot.entry(table_key);
1247            let target_entry = target_snapshot.entry(table_key);
1248            if same_manifest_state(source_entry, target_entry) {
1249                continue;
1250            }
1251            if same_manifest_state(base_entry, source_entry) {
1252                continue;
1253            }
1254            if same_manifest_state(base_entry, target_entry) {
1255                candidates.insert(table_key.clone(), CandidateTableState::AdoptSourceState);
1256                continue;
1257            }
1258
1259            if let Some(staged) = stage_streaming_table_merge(
1260                table_key,
1261                &self.catalog(),
1262                base_snapshot,
1263                source_snapshot,
1264                &target_snapshot,
1265                &mut conflicts,
1266            )
1267            .await?
1268            {
1269                candidates.insert(
1270                    table_key.clone(),
1271                    CandidateTableState::RewriteMerged(staged),
1272                );
1273            }
1274        }
1275
1276        if !conflicts.is_empty() {
1277            return Err(OmniError::MergeConflicts(conflicts));
1278        }
1279
1280        validate_merge_candidates(self, source_snapshot, &target_snapshot, &candidates).await?;
1281
1282        // Recovery sidecar: protect the per-table commit_staged loop.
1283        // Pin only `RewriteMerged` candidates because they always
1284        // advance Lance HEAD through `publish_rewritten_merge_table`
1285        // (which runs stage_merge_insert + delete_where + index
1286        // rebuilds — multiple commit_staged calls per table; loose
1287        // classification handles the multi-step drift).
1288        //
1289        // `AdoptSourceState` candidates are NOT pinned: their publish
1290        // path is `publish_adopted_source_state`, whose subcases mostly
1291        // don't advance Lance HEAD (pure manifest pointer switch, or
1292        // fork via `fork_dataset_from_entry_state` which only adds a
1293        // Lance branch ref). If those subcases were pinned, recovery
1294        // would classify them as NoMovement and the all-or-nothing
1295        // decision would force a rollback that destroys legitimately-
1296        // committed work on sibling RewriteMerged tables.
1297        //
1298        // Residual: two `AdoptSourceState` subcases (when source has a
1299        // table_branch AND the source delta is non-empty) internally
1300        // call `publish_rewritten_merge_table` and DO advance HEAD.
1301        // Those are not covered by this sidecar — if they fail mid-
1302        // commit, the residual persists until the next ReadWrite open
1303        // detects it via a subsequent ExpectedVersionMismatch from a
1304        // later writer that touches the same table. Closing this gap
1305        // requires pre-computing source deltas during candidate
1306        // classification (a structural change to `CandidateTableState`)
1307        // and is left as follow-up work.
1308        // Acquire per-(table_key, target_branch) queues for every table
1309        // touched by the merge plan. Sorted-order acquisition prevents
1310        // lock-order inversion against concurrent multi-table writers.
1311        // The active branch (set by the caller's `swap_coordinator_for_branch`)
1312        // is the merge target; queue keys are scoped to it because a
1313        // branch_merge writes only to the target branch.
1314        //
1315        // Held across the per-table publish loop and the manifest
1316        // commit + record_merge_commit calls below. Under PR 1b's
1317        // intermediate state (global server RwLock still in place),
1318        // this acquisition is uncontended.
1319        let active_branch_for_keys = self.active_branch().await;
1320        let merge_queue_keys: Vec<(String, Option<String>)> = ordered_table_keys
1321            .iter()
1322            .filter(|table_key| {
1323                matches!(
1324                    candidates.get(*table_key),
1325                    Some(CandidateTableState::RewriteMerged(_))
1326                        | Some(CandidateTableState::AdoptSourceState)
1327                )
1328            })
1329            .map(|table_key| (table_key.clone(), active_branch_for_keys.clone()))
1330            .collect();
1331        let _merge_queue_guards = self.write_queue().acquire_many(&merge_queue_keys).await;
1332
1333        let post_queue_snapshot = self.snapshot().await;
1334        for table_key in &ordered_table_keys {
1335            let Some(candidate) = candidates.get(table_key) else {
1336                continue;
1337            };
1338            if !matches!(
1339                candidate,
1340                CandidateTableState::RewriteMerged(_) | CandidateTableState::AdoptSourceState
1341            ) {
1342                continue;
1343            }
1344            let expected = target_snapshot.entry(table_key).map(|e| e.table_version);
1345            let current = post_queue_snapshot
1346                .entry(table_key)
1347                .map(|e| e.table_version);
1348            if expected != current {
1349                return Err(OmniError::manifest_expected_version_mismatch(
1350                    table_key.clone(),
1351                    expected.unwrap_or(0),
1352                    current.unwrap_or(0),
1353                ));
1354            }
1355        }
1356
1357        let recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = ordered_table_keys
1358            .iter()
1359            .filter_map(|table_key| {
1360                let candidate = candidates.get(table_key)?;
1361                if !matches!(candidate, CandidateTableState::RewriteMerged(_)) {
1362                    return None;
1363                }
1364                let entry = target_snapshot.entry(table_key)?;
1365                Some(crate::db::manifest::SidecarTablePin {
1366                    table_key: table_key.clone(),
1367                    table_path: self.table_store().dataset_uri(&entry.table_path),
1368                    expected_version: entry.table_version,
1369                    post_commit_pin: entry.table_version + 1,
1370                    // Use the merge target branch (where commits actually
1371                    // land), NOT entry.table_branch (where the table
1372                    // currently lives). publish_rewritten_merge_table calls
1373                    // open_for_mutation, which forks an inherited-from-main
1374                    // table to active_branch on first write — the resulting
1375                    // Lance commit lands on active_branch. Recovery's
1376                    // open_lance_head must check the same branch, otherwise
1377                    // an inherited-table feature-to-feature merge classifies
1378                    // as NoMovement and the all-or-nothing rollback skips
1379                    // the orphaned post-Phase-B HEAD on the target ref.
1380                    // Same rationale as table_ops.rs:115-120 in
1381                    // ensure_indices_for_branch.
1382                    table_branch: active_branch_for_keys.clone(),
1383                })
1384            })
1385            .collect();
1386        let recovery_handle = if recovery_pins.is_empty() {
1387            None
1388        } else {
1389            // Use the merge target branch directly, NOT a heuristic
1390            // derived from `ordered_table_keys.first()`. The first
1391            // sorted table key may not be in the target snapshot at all
1392            // (its `entry()` returns None → branch becomes None == main),
1393            // and the SubTableEntry's `table_branch` field isn't
1394            // necessarily the merge target branch. The caller
1395            // `branch_merge` calls `swap_coordinator_for_branch(target_branch)`
1396            // before invoking this function, so `self.active_branch()`
1397            // is the target.
1398            let target_branch = active_branch_for_keys.clone();
1399            let mut sidecar = crate::db::manifest::new_sidecar(
1400                crate::db::manifest::SidecarKind::BranchMerge,
1401                target_branch,
1402                actor_id.map(str::to_string),
1403                recovery_pins,
1404            );
1405            // Carry the source branch's HEAD commit id so the recovery
1406            // sweep's audit step can record this as a MERGE commit
1407            // (linked to the source) instead of a plain commit. Without
1408            // this, future merges between the same pair lose
1409            // already-up-to-date detection and merge-base correctness.
1410            sidecar.merge_source_commit_id = Some(source_head_commit_id.to_string());
1411            Some(
1412                crate::db::manifest::write_sidecar(
1413                    self.root_uri(),
1414                    self.storage_adapter(),
1415                    &sidecar,
1416                )
1417                .await?,
1418            )
1419        };
1420
1421        let mut updates = Vec::new();
1422        let mut changed_edge_tables = false;
1423        for table_key in &ordered_table_keys {
1424            let Some(candidate_state) = candidates.get(table_key) else {
1425                continue;
1426            };
1427            let update = match candidate_state {
1428                CandidateTableState::AdoptSourceState => {
1429                    publish_adopted_source_state(
1430                        self,
1431                        &self.catalog(),
1432                        base_snapshot,
1433                        source_snapshot,
1434                        &target_snapshot,
1435                        table_key,
1436                    )
1437                    .await?
1438                }
1439                CandidateTableState::RewriteMerged(staged) => {
1440                    publish_rewritten_merge_table(self, table_key, staged).await?
1441                }
1442            };
1443            if table_key.starts_with("edge:") {
1444                changed_edge_tables = true;
1445            }
1446            updates.push(update);
1447        }
1448
1449        // Failpoint: pin the per-writer Phase B → Phase C residual for
1450        // branch_merge. Lance HEAD has advanced on every touched table
1451        // (publish_*) but the manifest publish below hasn't run. Used
1452        // by `tests/failpoints.rs::branch_merge_phase_b_failure_recovered_on_next_open`.
1453        crate::failpoints::maybe_fail("branch_merge.post_phase_b_pre_manifest_commit")?;
1454
1455        let manifest_version = if updates.is_empty() {
1456            self.version().await
1457        } else {
1458            self.commit_manifest_updates(&updates).await?
1459        };
1460
1461        // Recovery sidecar lifecycle: delete after manifest publish.
1462        // Best-effort cleanup; the merge already landed durably so
1463        // failing the user here is undesirable.
1464        if let Some(handle) = recovery_handle {
1465            if let Err(err) =
1466                crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await
1467            {
1468                tracing::warn!(
1469                    error = %err,
1470                    operation_id = handle.operation_id.as_str(),
1471                    "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
1472                );
1473            }
1474        }
1475        self.record_merge_commit(
1476            manifest_version,
1477            target_head_commit_id,
1478            source_head_commit_id,
1479            actor_id,
1480        )
1481        .await?;
1482
1483        if changed_edge_tables {
1484            self.invalidate_graph_index().await;
1485        }
1486
1487        Ok(if is_fast_forward {
1488            MergeOutcome::FastForward
1489        } else {
1490            MergeOutcome::Merged
1491        })
1492    }
1493}