Skip to main content

omnigraph/exec/
merge.rs

1use super::*;
2
/// Number of staged rows buffered in memory before they are appended to the
/// on-disk staging dataset.
const MERGE_STAGE_BATCH_ROWS: usize = 8 * 1024;
/// Environment variable that, when set, names the parent directory used for
/// merge staging tables instead of the system temporary directory.
const MERGE_STAGE_DIR_ENV: &str = "OMNIGRAPH_MERGE_STAGING_DIR";
5
6#[derive(Debug)]
7enum CandidateTableState {
8    AdoptSourceState,
9    RewriteMerged(StagedMergeResult),
10}
11
12#[derive(Debug)]
13struct StagedTable {
14    _dir: TempDir,
15    dataset: Dataset,
16}
17
18#[derive(Debug)]
19struct StagedMergeResult {
20    full_staged: StagedTable,
21    delta_staged: Option<StagedTable>,
22    deleted_ids: Vec<String>,
23}
24
25#[derive(Debug, Clone)]
26struct CursorRow {
27    id: String,
28    signature: String,
29    dataset: Dataset,
30    batch: RecordBatch,
31    row_index: usize,
32}
33
34struct OrderedTableCursor {
35    stream: Option<std::pin::Pin<Box<DatasetRecordBatchStream>>>,
36    dataset: Option<Dataset>,
37    current_batch: Option<RecordBatch>,
38    current_row: usize,
39    peeked: Option<CursorRow>,
40}
41
42impl OrderedTableCursor {
43    async fn from_snapshot(snapshot: &Snapshot, table_key: &str) -> Result<Self> {
44        let dataset = match snapshot.entry(table_key) {
45            Some(_) => Some(snapshot.open(table_key).await?),
46            None => None,
47        };
48        Self::from_dataset(dataset).await
49    }
50
51    async fn from_dataset(dataset: Option<Dataset>) -> Result<Self> {
52        let stream = if let Some(ds) = &dataset {
53            Some(Box::pin(
54                crate::table_store::TableStore::scan_stream_with(
55                    ds,
56                    None,
57                    None,
58                    Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]),
59                    true,
60                    |_| Ok(()),
61                )
62                .await?,
63            ))
64        } else {
65            None
66        };
67
68        Ok(Self {
69            stream,
70            dataset,
71            current_batch: None,
72            current_row: 0,
73            peeked: None,
74        })
75    }
76
77    async fn peek_cloned(&mut self) -> Result<Option<CursorRow>> {
78        if self.peeked.is_none() {
79            self.peeked = self.next_row().await?;
80        }
81        Ok(self.peeked.clone())
82    }
83
84    async fn pop(&mut self) -> Result<Option<CursorRow>> {
85        if self.peeked.is_some() {
86            return Ok(self.peeked.take());
87        }
88        self.next_row().await
89    }
90
91    async fn next_row(&mut self) -> Result<Option<CursorRow>> {
92        loop {
93            if let Some(batch) = &self.current_batch {
94                if self.current_row < batch.num_rows() {
95                    let row_index = self.current_row;
96                    self.current_row += 1;
97                    let dataset = self.dataset.clone().ok_or_else(|| {
98                        OmniError::manifest("cursor row missing source dataset".to_string())
99                    })?;
100                    return Ok(Some(CursorRow {
101                        id: row_id_at(batch, row_index)?,
102                        signature: row_signature(batch, row_index)?,
103                        dataset,
104                        batch: batch.clone(),
105                        row_index,
106                    }));
107                }
108            }
109
110            let Some(stream) = self.stream.as_mut() else {
111                return Ok(None);
112            };
113            match stream.try_next().await {
114                Ok(Some(batch)) => {
115                    self.current_batch = Some(batch);
116                    self.current_row = 0;
117                }
118                Ok(None) => {
119                    self.stream = None;
120                    self.current_batch = None;
121                    return Ok(None);
122                }
123                Err(err) => return Err(OmniError::Lance(err.to_string())),
124            }
125        }
126    }
127}
128
129struct StagedTableWriter {
130    schema: SchemaRef,
131    dataset_uri: String,
132    dir: TempDir,
133    dataset: Option<Dataset>,
134    buffered_rows: usize,
135    row_count: u64,
136    batches: Vec<RecordBatch>,
137}
138
139impl StagedTableWriter {
140    fn new(table_key: &str, schema: SchemaRef) -> Result<Self> {
141        let dir = merge_stage_tempdir(table_key)?;
142        let dataset_uri = dir.path().join("table.lance").to_string_lossy().to_string();
143        Ok(Self {
144            schema,
145            dataset_uri,
146            dir,
147            dataset: None,
148            buffered_rows: 0,
149            row_count: 0,
150            batches: Vec::new(),
151        })
152    }
153
154    async fn push_row(&mut self, row: &CursorRow) -> Result<()> {
155        self.row_count += 1;
156        self.buffered_rows += 1;
157        self.batches.push(self.row_batch(row).await?);
158        if self.buffered_rows >= MERGE_STAGE_BATCH_ROWS {
159            self.flush().await?;
160        }
161        Ok(())
162    }
163
164    async fn row_batch(&self, row: &CursorRow) -> Result<RecordBatch> {
165        let batch = row.batch.slice(row.row_index, 1);
166        let has_blob_columns = row
167            .dataset
168            .schema()
169            .fields_pre_order()
170            .any(|field| field.is_blob());
171        if has_blob_columns {
172            return crate::table_store::TableStore::materialize_blob_batch(&row.dataset, batch)
173                .await;
174        }
175        let columns = self
176            .schema
177            .fields()
178            .iter()
179            .map(|field| {
180                batch.column_by_name(field.name()).cloned().ok_or_else(|| {
181                    OmniError::Lance(format!("batch missing column '{}'", field.name()))
182                })
183            })
184            .collect::<Result<Vec<_>>>()?;
185        RecordBatch::try_new(self.schema.clone(), columns)
186            .map_err(|e| OmniError::Lance(e.to_string()))
187    }
188
189    async fn finish(mut self) -> Result<StagedTable> {
190        self.flush().await?;
191        if self.dataset.is_none() {
192            self.dataset = Some(
193                crate::table_store::TableStore::create_empty_dataset(
194                    &self.dataset_uri,
195                    &self.schema,
196                )
197                .await?,
198            );
199        }
200        Ok(StagedTable {
201            _dir: self.dir,
202            dataset: self.dataset.unwrap(),
203        })
204    }
205
206    async fn flush(&mut self) -> Result<()> {
207        if self.batches.is_empty() {
208            return Ok(());
209        }
210
211        let batch = if self.batches.len() == 1 {
212            self.batches.pop().unwrap()
213        } else {
214            let batches = std::mem::take(&mut self.batches);
215            arrow_select::concat::concat_batches(&self.schema, &batches)
216                .map_err(|e| OmniError::Lance(e.to_string()))?
217        };
218        self.buffered_rows = 0;
219
220        let ds = crate::table_store::TableStore::append_or_create_batch(
221            &self.dataset_uri,
222            self.dataset.take(),
223            batch,
224        )
225        .await?;
226        self.dataset = Some(ds);
227        Ok(())
228    }
229}
230
231fn merge_stage_tempdir(table_key: &str) -> Result<TempDir> {
232    if let Ok(root) = env::var(MERGE_STAGE_DIR_ENV) {
233        return TempDirBuilder::new()
234            .prefix(&format!(
235                "omnigraph-merge-{}-",
236                sanitize_table_key(table_key)
237            ))
238            .tempdir_in(PathBuf::from(root))
239            .map_err(OmniError::from);
240    }
241    TempDirBuilder::new()
242        .prefix(&format!(
243            "omnigraph-merge-{}-",
244            sanitize_table_key(table_key)
245        ))
246        .tempdir()
247        .map_err(OmniError::from)
248}
249
/// Makes a table key safe to embed in a directory name by replacing the
/// separators `:`, `/` and `\` with `-`; all other characters are kept.
fn sanitize_table_key(table_key: &str) -> String {
    table_key.replace(|ch: char| matches!(ch, ':' | '/' | '\\'), "-")
}
259
260/// Computes the delta between base and source for an adopted-source merge.
261/// Returns the changed/new rows (for merge_insert) and deleted IDs (for delete).
262async fn compute_source_delta(
263    table_key: &str,
264    catalog: &Catalog,
265    base_snapshot: &Snapshot,
266    source_snapshot: &Snapshot,
267) -> Result<Option<StagedMergeResult>> {
268    let schema = schema_for_table_key(catalog, table_key)?;
269    let mut full_writer =
270        StagedTableWriter::new(&format!("{}_adopt_full", table_key), schema.clone())?;
271    let mut delta_writer = StagedTableWriter::new(&format!("{}_adopt_delta", table_key), schema)?;
272    let mut deleted_ids: Vec<String> = Vec::new();
273    let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
274    let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;
275
276    let mut needs_update = false;
277
278    loop {
279        let base_row = base.peek_cloned().await?;
280        let source_row = source.peek_cloned().await?;
281
282        let next_id = [base_row.as_ref(), source_row.as_ref()]
283            .into_iter()
284            .flatten()
285            .map(|row| row.id.clone())
286            .min();
287        let Some(next_id) = next_id else { break };
288
289        let base_row = if base_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
290            base.pop().await?
291        } else {
292            None
293        };
294        let source_row = if source_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
295            source.pop().await?
296        } else {
297            None
298        };
299
300        let base_sig = base_row.as_ref().map(|r| r.signature.as_str());
301        let source_sig = source_row.as_ref().map(|r| r.signature.as_str());
302
303        match (&base_row, &source_row) {
304            (Some(_), None) => {
305                // Deleted on source
306                deleted_ids.push(next_id);
307                needs_update = true;
308            }
309            (None, Some(src)) => {
310                // New on source
311                full_writer.push_row(src).await?;
312                delta_writer.push_row(src).await?;
313                needs_update = true;
314            }
315            (Some(_), Some(src)) if source_sig != base_sig => {
316                // Changed on source
317                full_writer.push_row(src).await?;
318                delta_writer.push_row(src).await?;
319                needs_update = true;
320            }
321            (Some(base), Some(_)) => {
322                // Unchanged — write to full (for validation), skip delta
323                full_writer.push_row(base).await?;
324            }
325            (None, None) => unreachable!(),
326        }
327    }
328
329    if !needs_update {
330        return Ok(None);
331    }
332
333    let delta_staged = if delta_writer.row_count > 0 {
334        Some(delta_writer.finish().await?)
335    } else {
336        None
337    };
338
339    Ok(Some(StagedMergeResult {
340        full_staged: full_writer.finish().await?,
341        delta_staged,
342        deleted_ids,
343    }))
344}
345
346fn min_cursor_id(
347    base_row: &Option<CursorRow>,
348    source_row: &Option<CursorRow>,
349    target_row: &Option<CursorRow>,
350) -> Option<String> {
351    [base_row.as_ref(), source_row.as_ref(), target_row.as_ref()]
352        .into_iter()
353        .flatten()
354        .map(|row| row.id.clone())
355        .min()
356}
357
358async fn stage_streaming_table_merge(
359    table_key: &str,
360    catalog: &Catalog,
361    base_snapshot: &Snapshot,
362    source_snapshot: &Snapshot,
363    target_snapshot: &Snapshot,
364    conflicts: &mut Vec<MergeConflict>,
365) -> Result<Option<StagedMergeResult>> {
366    let schema = schema_for_table_key(catalog, table_key)?;
367    let mut full_writer = StagedTableWriter::new(&format!("{}_full", table_key), schema.clone())?;
368    let mut delta_writer = StagedTableWriter::new(&format!("{}_delta", table_key), schema)?;
369    let mut deleted_ids: Vec<String> = Vec::new();
370    let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
371    let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;
372    let mut target = OrderedTableCursor::from_snapshot(target_snapshot, table_key).await?;
373
374    let prior_conflict_count = conflicts.len();
375    let mut needs_update = false;
376
377    loop {
378        let base_row = base.peek_cloned().await?;
379        let source_row = source.peek_cloned().await?;
380        let target_row = target.peek_cloned().await?;
381        let Some(next_id) = min_cursor_id(&base_row, &source_row, &target_row) else {
382            break;
383        };
384
385        let base_row = if base_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str()) {
386            base.pop().await?
387        } else {
388            None
389        };
390        let source_row = if source_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
391        {
392            source.pop().await?
393        } else {
394            None
395        };
396        let target_row = if target_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
397        {
398            target.pop().await?
399        } else {
400            None
401        };
402
403        let base_sig = base_row.as_ref().map(|row| row.signature.as_str());
404        let source_sig = source_row.as_ref().map(|row| row.signature.as_str());
405        let target_sig = target_row.as_ref().map(|row| row.signature.as_str());
406
407        let source_changed = source_sig != base_sig;
408        let target_changed = target_sig != base_sig;
409
410        let selection = if !source_changed {
411            target_row.as_ref()
412        } else if !target_changed {
413            source_row.as_ref()
414        } else if source_sig == target_sig {
415            target_row.as_ref()
416        } else {
417            conflicts.push(classify_merge_conflict(
418                table_key, &next_id, base_sig, source_sig, target_sig,
419            ));
420            None
421        };
422
423        if conflicts.len() > prior_conflict_count {
424            continue;
425        }
426
427        // Row existed in target but not in merge result → delete
428        if selection.is_none() && target_row.is_some() {
429            deleted_ids.push(next_id.clone());
430            needs_update = true;
431            continue;
432        }
433
434        if let Some(selection) = selection {
435            // Always write to full (for validation)
436            full_writer.push_row(selection).await?;
437            // Only write changed rows to delta (for publish)
438            if selection.signature.as_str() != target_sig.unwrap_or("") {
439                delta_writer.push_row(selection).await?;
440                needs_update = true;
441            }
442        }
443    }
444
445    if conflicts.len() > prior_conflict_count {
446        return Ok(None);
447    }
448    if !needs_update {
449        return Ok(None);
450    }
451
452    let delta_staged = if delta_writer.row_count > 0 {
453        Some(delta_writer.finish().await?)
454    } else {
455        None
456    };
457
458    Ok(Some(StagedMergeResult {
459        full_staged: full_writer.finish().await?,
460        delta_staged,
461        deleted_ids,
462    }))
463}
464
465fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<SchemaRef> {
466    if let Some(name) = table_key.strip_prefix("node:") {
467        return catalog
468            .node_types
469            .get(name)
470            .map(|t| t.arrow_schema.clone())
471            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", name)));
472    }
473    if let Some(name) = table_key.strip_prefix("edge:") {
474        return catalog
475            .edge_types
476            .get(name)
477            .map(|t| t.arrow_schema.clone())
478            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", name)));
479    }
480    Err(OmniError::manifest(format!(
481        "invalid table key '{}'",
482        table_key
483    )))
484}
485
486fn same_manifest_state(
487    left: Option<&crate::db::SubTableEntry>,
488    right: Option<&crate::db::SubTableEntry>,
489) -> bool {
490    match (left, right) {
491        (Some(left), Some(right)) => {
492            left.table_version == right.table_version && left.table_branch == right.table_branch
493        }
494        (None, None) => true,
495        _ => false,
496    }
497}
498
499fn classify_merge_conflict(
500    table_key: &str,
501    row_id: &str,
502    base_sig: Option<&str>,
503    source_sig: Option<&str>,
504    target_sig: Option<&str>,
505) -> MergeConflict {
506    let (kind, message) = match (base_sig, source_sig, target_sig) {
507        (None, Some(_), Some(_)) => (
508            MergeConflictKind::DivergentInsert,
509            format!("divergent insert for id '{}'", row_id),
510        ),
511        (Some(_), None, Some(_)) | (Some(_), Some(_), None) => (
512            MergeConflictKind::DeleteVsUpdate,
513            format!("delete/update conflict for id '{}'", row_id),
514        ),
515        _ => (
516            MergeConflictKind::DivergentUpdate,
517            format!("divergent update for id '{}'", row_id),
518        ),
519    };
520    MergeConflict {
521        table_key: table_key.to_string(),
522        row_id: Some(row_id.to_string()),
523        kind,
524        message,
525    }
526}
527
528fn row_signature(batch: &RecordBatch, row: usize) -> Result<String> {
529    let mut values = Vec::with_capacity(batch.num_columns());
530    for (field, column) in batch.schema().fields().iter().zip(batch.columns()) {
531        if field.name().starts_with("_row") {
532            continue;
533        }
534        values.push(
535            array_value_to_string(column.as_ref(), row)
536                .map_err(|e| OmniError::Lance(e.to_string()))?,
537        );
538    }
539    Ok(values.join("\u{1f}"))
540}
541
542async fn scan_validation_stream(ds: &Dataset) -> Result<DatasetRecordBatchStream> {
543    crate::table_store::TableStore::scan_stream_with(ds, None, None, None, false, |_| Ok(())).await
544}
545
546async fn validate_merge_candidates(
547    db: &Omnigraph,
548    source_snapshot: &Snapshot,
549    target_snapshot: &Snapshot,
550    candidates: &HashMap<String, CandidateTableState>,
551) -> Result<()> {
552    let mut conflicts = Vec::new();
553    let mut node_ids: HashMap<String, HashSet<String>> = HashMap::new();
554
555    for (type_name, node_type) in &db.catalog().node_types {
556        let table_key = format!("node:{}", type_name);
557        let mut values = HashSet::new();
558        let mut unique_seen = vec![HashMap::new(); node_type.unique_constraints.len()];
559
560        if let Some(ds) =
561            candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
562        {
563            let mut stream = scan_validation_stream(&ds).await?;
564            while let Some(batch) = stream
565                .try_next()
566                .await
567                .map_err(|e| OmniError::Lance(e.to_string()))?
568            {
569                if let Err(err) = crate::loader::validate_value_constraints(&batch, node_type) {
570                    conflicts.push(MergeConflict {
571                        table_key: table_key.clone(),
572                        row_id: None,
573                        kind: MergeConflictKind::ValueConstraintViolation,
574                        message: err.to_string(),
575                    });
576                }
577                update_unique_constraints(
578                    &table_key,
579                    &batch,
580                    &node_type.unique_constraints,
581                    &mut unique_seen,
582                    &mut conflicts,
583                )?;
584                let ids = batch
585                    .column_by_name("id")
586                    .ok_or_else(|| {
587                        OmniError::manifest(format!("table {} missing id column", table_key))
588                    })?
589                    .as_any()
590                    .downcast_ref::<StringArray>()
591                    .ok_or_else(|| {
592                        OmniError::manifest(format!("table {} id column is not Utf8", table_key))
593                    })?;
594                for row in 0..ids.len() {
595                    values.insert(ids.value(row).to_string());
596                }
597            }
598        }
599        node_ids.insert(type_name.clone(), values);
600    }
601
602    for (edge_name, edge_type) in &db.catalog().edge_types {
603        let table_key = format!("edge:{}", edge_name);
604        let mut unique_seen = vec![HashMap::new(); edge_type.unique_constraints.len()];
605        let mut src_counts = HashMap::new();
606
607        if let Some(ds) =
608            candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
609        {
610            let mut stream = scan_validation_stream(&ds).await?;
611            while let Some(batch) = stream
612                .try_next()
613                .await
614                .map_err(|e| OmniError::Lance(e.to_string()))?
615            {
616                update_unique_constraints(
617                    &table_key,
618                    &batch,
619                    &edge_type.unique_constraints,
620                    &mut unique_seen,
621                    &mut conflicts,
622                )?;
623                accumulate_edge_cardinality(&batch, &mut src_counts, &table_key)?;
624                conflicts.extend(validate_orphan_edges_batch(
625                    &table_key, edge_type, &batch, &node_ids,
626                )?);
627            }
628        }
629
630        conflicts.extend(finalize_edge_cardinality_conflicts(
631            &table_key,
632            edge_name,
633            edge_type.cardinality.min,
634            edge_type.cardinality.max,
635            src_counts,
636        ));
637    }
638
639    if conflicts.is_empty() {
640        Ok(())
641    } else {
642        Err(OmniError::MergeConflicts(conflicts))
643    }
644}
645
646async fn candidate_dataset(
647    source_snapshot: &Snapshot,
648    target_snapshot: &Snapshot,
649    candidates: &HashMap<String, CandidateTableState>,
650    table_key: &str,
651) -> Result<Option<Dataset>> {
652    if let Some(candidate) = candidates.get(table_key) {
653        return match candidate {
654            CandidateTableState::AdoptSourceState => match source_snapshot.entry(table_key) {
655                Some(_) => Ok(Some(source_snapshot.open(table_key).await?)),
656                None => Ok(None),
657            },
658            CandidateTableState::RewriteMerged(staged) => {
659                Ok(Some(staged.full_staged.dataset.clone()))
660            }
661        };
662    }
663    match target_snapshot.entry(table_key) {
664        Some(_) => Ok(Some(target_snapshot.open(table_key).await?)),
665        None => Ok(None),
666    }
667}
668
669fn update_unique_constraints(
670    table_key: &str,
671    batch: &RecordBatch,
672    constraints: &[Vec<String>],
673    seen: &mut [HashMap<Vec<String>, String>],
674    conflicts: &mut Vec<MergeConflict>,
675) -> Result<()> {
676    for (constraint_idx, columns) in constraints.iter().enumerate() {
677        let seen = &mut seen[constraint_idx];
678        // Resolve the group's columns once. The candidate dataset always
679        // carries the full table schema, so a missing column is an internal
680        // error rather than a skip.
681        let group_columns = columns
682            .iter()
683            .map(|column_name| {
684                batch.column_by_name(column_name).cloned().ok_or_else(|| {
685                    OmniError::manifest(format!(
686                        "table {} missing unique column '{}'",
687                        table_key, column_name
688                    ))
689                })
690            })
691            .collect::<Result<Vec<_>>>()?;
692        for row in 0..batch.num_rows() {
693            // Same tuple key as the intake path — one shared derivation in
694            // `crate::loader::composite_unique_key`, so the two cannot drift on
695            // separator or scalar conversion. Null rows are exempt.
696            let Some(key) = crate::loader::composite_unique_key(&group_columns, row)? else {
697                continue;
698            };
699            let row_id = row_id_at(batch, row)?;
700            if let Some(first_row_id) = seen.insert(key, row_id.clone()) {
701                conflicts.push(MergeConflict {
702                    table_key: table_key.to_string(),
703                    row_id: Some(row_id.clone()),
704                    kind: MergeConflictKind::UniqueViolation,
705                    message: format!(
706                        "unique constraint {:?} violated by '{}' and '{}'",
707                        columns, first_row_id, row_id
708                    ),
709                });
710            }
711        }
712    }
713    Ok(())
714}
715
716fn accumulate_edge_cardinality(
717    batch: &RecordBatch,
718    counts: &mut HashMap<String, u32>,
719    table_key: &str,
720) -> Result<()> {
721    let srcs = batch
722        .column_by_name("src")
723        .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
724        .as_any()
725        .downcast_ref::<StringArray>()
726        .ok_or_else(|| {
727            OmniError::manifest(format!("table {} src column is not Utf8", table_key))
728        })?;
729    for row in 0..srcs.len() {
730        *counts.entry(srcs.value(row).to_string()).or_insert(0_u32) += 1;
731    }
732    Ok(())
733}
734
735fn finalize_edge_cardinality_conflicts(
736    table_key: &str,
737    edge_name: &str,
738    min: u32,
739    max: Option<u32>,
740    counts: HashMap<String, u32>,
741) -> Vec<MergeConflict> {
742    let mut conflicts = Vec::new();
743    for (src, count) in counts {
744        if let Some(max) = max {
745            if count > max {
746                conflicts.push(MergeConflict {
747                    table_key: table_key.to_string(),
748                    row_id: None,
749                    kind: MergeConflictKind::CardinalityViolation,
750                    message: format!(
751                        "@card violation on edge {}: source '{}' has {} edges (max {})",
752                        edge_name, src, count, max
753                    ),
754                });
755            }
756        }
757        if count < min {
758            conflicts.push(MergeConflict {
759                table_key: table_key.to_string(),
760                row_id: None,
761                kind: MergeConflictKind::CardinalityViolation,
762                message: format!(
763                    "@card violation on edge {}: source '{}' has {} edges (min {})",
764                    edge_name, src, count, min
765                ),
766            });
767        }
768    }
769    conflicts
770}
771
772fn validate_orphan_edges_batch(
773    table_key: &str,
774    edge_type: &omnigraph_compiler::catalog::EdgeType,
775    batch: &RecordBatch,
776    node_ids: &HashMap<String, HashSet<String>>,
777) -> Result<Vec<MergeConflict>> {
778    let srcs = batch
779        .column_by_name("src")
780        .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
781        .as_any()
782        .downcast_ref::<StringArray>()
783        .ok_or_else(|| {
784            OmniError::manifest(format!("table {} src column is not Utf8", table_key))
785        })?;
786    let dsts = batch
787        .column_by_name("dst")
788        .ok_or_else(|| OmniError::manifest(format!("table {} missing dst column", table_key)))?
789        .as_any()
790        .downcast_ref::<StringArray>()
791        .ok_or_else(|| {
792            OmniError::manifest(format!("table {} dst column is not Utf8", table_key))
793        })?;
794
795    let from_ids = node_ids.get(&edge_type.from_type).ok_or_else(|| {
796        OmniError::manifest(format!(
797            "missing candidate node ids for {}",
798            edge_type.from_type
799        ))
800    })?;
801    let to_ids = node_ids.get(&edge_type.to_type).ok_or_else(|| {
802        OmniError::manifest(format!(
803            "missing candidate node ids for {}",
804            edge_type.to_type
805        ))
806    })?;
807
808    let mut conflicts = Vec::new();
809    for row in 0..batch.num_rows() {
810        let row_id = row_id_at(batch, row)?;
811        let src = srcs.value(row);
812        let dst = dsts.value(row);
813        if !from_ids.contains(src) {
814            conflicts.push(MergeConflict {
815                table_key: table_key.to_string(),
816                row_id: Some(row_id.clone()),
817                kind: MergeConflictKind::OrphanEdge,
818                message: format!("src '{}' not found in {}", src, edge_type.from_type),
819            });
820        }
821        if !to_ids.contains(dst) {
822            conflicts.push(MergeConflict {
823                table_key: table_key.to_string(),
824                row_id: Some(row_id),
825                kind: MergeConflictKind::OrphanEdge,
826                message: format!("dst '{}' not found in {}", dst, edge_type.to_type),
827            });
828        }
829    }
830    Ok(conflicts)
831}
832
833fn row_id_at(batch: &RecordBatch, row: usize) -> Result<String> {
834    let ids = batch
835        .column_by_name("id")
836        .ok_or_else(|| OmniError::manifest("batch missing id column".to_string()))?
837        .as_any()
838        .downcast_ref::<StringArray>()
839        .ok_or_else(|| OmniError::manifest("id column is not Utf8".to_string()))?;
840    Ok(ids.value(row).to_string())
841}
842
843async fn publish_adopted_source_state(
844    target_db: &Omnigraph,
845    catalog: &Catalog,
846    base_snapshot: &Snapshot,
847    source_snapshot: &Snapshot,
848    target_snapshot: &Snapshot,
849    table_key: &str,
850) -> Result<crate::db::SubTableUpdate> {
851    let source_entry = source_snapshot
852        .entry(table_key)
853        .ok_or_else(|| OmniError::manifest(format!("missing source entry for {}", table_key)))?;
854    let target_entry = target_snapshot.entry(table_key);
855
856    let target_active = target_db.active_branch().await;
857    match (
858        target_active.as_deref(),
859        source_entry.table_branch.as_deref(),
860    ) {
861        // Both on main — pointer switch is safe (same lineage, version columns valid)
862        (None, None) => Ok(crate::db::SubTableUpdate {
863            table_key: table_key.to_string(),
864            table_version: source_entry.table_version,
865            table_branch: None,
866            row_count: source_entry.row_count,
867            version_metadata: source_entry.version_metadata.clone(),
868        }),
869        // Source on main, target on branch — pointer switch to main version
870        // (target reads from main, same lineage)
871        (Some(_target_branch), None) => Ok(crate::db::SubTableUpdate {
872            table_key: table_key.to_string(),
873            table_version: source_entry.table_version,
874            table_branch: None,
875            row_count: source_entry.row_count,
876            version_metadata: source_entry.version_metadata.clone(),
877        }),
878        // Source on branch, target on main — apply delta to preserve version metadata
879        (None, Some(_source_branch)) => {
880            let delta =
881                compute_source_delta(table_key, catalog, base_snapshot, source_snapshot).await?;
882            match delta {
883                Some(staged) => publish_rewritten_merge_table(target_db, table_key, &staged).await,
884                None => Ok(crate::db::SubTableUpdate {
885                    table_key: table_key.to_string(),
886                    table_version: target_entry
887                        .map(|e| e.table_version)
888                        .unwrap_or(source_entry.table_version),
889                    table_branch: None,
890                    row_count: source_entry.row_count,
891                    version_metadata: target_entry
892                        .map(|entry| entry.version_metadata.clone())
893                        .unwrap_or_else(|| source_entry.version_metadata.clone()),
894                }),
895            }
896        }
897        // Both on branches
898        (Some(target_branch), Some(source_branch)) => {
899            if target_entry.and_then(|entry| entry.table_branch.as_deref()) == Some(target_branch) {
900                // Target already owns this table — apply delta onto its lineage
901                let delta =
902                    compute_source_delta(table_key, catalog, base_snapshot, source_snapshot)
903                        .await?;
904                match delta {
905                    Some(staged) => {
906                        publish_rewritten_merge_table(target_db, table_key, &staged).await
907                    }
908                    None => Ok(crate::db::SubTableUpdate {
909                        table_key: table_key.to_string(),
910                        table_version: target_entry.unwrap().table_version,
911                        table_branch: Some(target_branch.to_string()),
912                        row_count: source_entry.row_count,
913                        version_metadata: target_entry.unwrap().version_metadata.clone(),
914                    }),
915                }
916            } else {
917                // Target doesn't own this table yet — fork from source state.
918                // This creates the target branch on the sub-table dataset.
919                let full_path = format!("{}/{}", target_db.uri(), source_entry.table_path);
920                let ds = target_db
921                    .fork_dataset_from_entry_state(
922                        table_key,
923                        &full_path,
924                        Some(source_branch),
925                        source_entry.table_version,
926                        target_branch,
927                    )
928                    .await?;
929                let state = target_db.storage().table_state(&full_path, &ds).await?;
930                Ok(crate::db::SubTableUpdate {
931                    table_key: table_key.to_string(),
932                    table_version: state.version,
933                    table_branch: Some(target_branch.to_string()),
934                    row_count: state.row_count,
935                    version_metadata: state.version_metadata,
936                })
937            }
938        }
939    }
940}
941
942async fn publish_rewritten_merge_table(
943    target_db: &Omnigraph,
944    table_key: &str,
945    staged: &StagedMergeResult,
946) -> Result<crate::db::SubTableUpdate> {
947    // Branch merge's source-rewrite path is Merge-shaped (upsert from
948    // source onto target). The inline `delete_where` later in this
949    // function operates on rows the rewrite chose to remove, not
950    // user-facing predicates, so Merge is the correct policy here.
951    let (ds, full_path, table_branch) = target_db
952        .open_for_mutation(table_key, crate::db::MutationOpKind::Merge)
953        .await?;
954    let mut current_ds = ds;
955
956    // Phase 1: merge_insert changed/new rows (preserves _row_created_at_version for
957    // existing rows, bumps _row_last_updated_at_version only for actually-changed rows).
958    //
959    // Routed through the staged primitive so a failure between writing
960    // fragments and committing leaves no Lance-HEAD drift. The
961    // commit_staged here is per-table per-call (Lance has no
962    // multi-dataset atomic commit); the residual sits at this single
963    // commit point, narrowed from the previous "merge_insert + delete +
964    // index" multi-step inline-commit chain.
965    if let Some(delta) = &staged.delta_staged {
966        // The staged delta dataset is a temp-dir Lance dataset used only
967        // to collect the rewrite batches; wrap it in a `SnapshotHandle`
968        // so we can route through the trait's `scan_batches_for_rewrite`.
969        let delta_snapshot = SnapshotHandle::new(delta.dataset.clone());
970        let batches: Vec<RecordBatch> = target_db
971            .storage()
972            .scan_batches_for_rewrite(&delta_snapshot)
973            .await?
974            .into_iter()
975            .filter(|batch| batch.num_rows() > 0)
976            .collect();
977        if !batches.is_empty() {
978            // Concat into one batch — stage_merge_insert takes a single batch.
979            let combined = if batches.len() == 1 {
980                batches.into_iter().next().unwrap()
981            } else {
982                let schema = batches[0].schema();
983                arrow_select::concat::concat_batches(&schema, &batches)
984                    .map_err(|e| OmniError::Lance(e.to_string()))?
985            };
986            let staged_merge = target_db
987                .storage()
988                .stage_merge_insert(
989                    current_ds.clone(),
990                    combined,
991                    vec!["id".to_string()],
992                    lance::dataset::WhenMatched::UpdateAll,
993                    lance::dataset::WhenNotMatched::InsertAll,
994                )
995                .await?;
996            current_ds = target_db
997                .storage()
998                .commit_staged(current_ds, staged_merge)
999                .await?;
1000        }
1001    }
1002
1003    // Phase 2: delete removed rows via deletion vectors.
1004    //
1005    // INLINE-COMMIT RESIDUAL: lance-6.0.1 does not expose a public
1006    // two-phase delete API (DeleteJob is `pub(crate)` —
1007    // lance-format/lance#6658 is open with no PRs). We deliberately do
1008    // NOT introduce a `stage_delete` wrapper that would secretly
1009    // inline-commit (it would create a side-channel between the staged
1010    // and inline write paths). When the upstream API ships, swap this
1011    // `delete_where` call for `stage_delete` + `commit_staged`.
1012    if !staged.deleted_ids.is_empty() {
1013        let escaped: Vec<String> = staged
1014            .deleted_ids
1015            .iter()
1016            .map(|id| format!("'{}'", id.replace('\'', "''")))
1017            .collect();
1018        let filter = format!("id IN ({})", escaped.join(", "));
1019        let (new_ds, _) = target_db
1020            .storage_inline_residual()
1021            .delete_where(&full_path, current_ds, &filter)
1022            .await?;
1023        current_ds = new_ds;
1024    }
1025
1026    // Phase 3: rebuild indices.
1027    //
1028    // `build_indices_on_dataset` uses `stage_create_btree_index` /
1029    // `stage_create_inverted_index` + `commit_staged` for scalar
1030    // indices. Vector indices remain inline-commit
1031    // (`build_index_metadata_from_segments` is `pub(crate)` in lance-
1032    // 6.0.1 — companion ticket to lance-format/lance#6666).
1033    let row_count = target_db
1034        .storage()
1035        .table_state(&full_path, &current_ds)
1036        .await?
1037        .row_count;
1038    if row_count > 0 {
1039        target_db
1040            .build_indices_on_dataset(table_key, &mut current_ds)
1041            .await?;
1042    }
1043    let final_state = target_db
1044        .storage()
1045        .table_state(&full_path, &current_ds)
1046        .await?;
1047
1048    Ok(crate::db::SubTableUpdate {
1049        table_key: table_key.to_string(),
1050        table_version: final_state.version,
1051        table_branch,
1052        row_count: final_state.row_count,
1053        version_metadata: final_state.version_metadata,
1054    })
1055}
1056
impl Omnigraph {
    /// Merge branch `source` into branch `target` with no acting principal.
    ///
    /// Thin wrapper over [`Omnigraph::branch_merge_as`] with `actor_id = None`.
    pub async fn branch_merge(&self, source: &str, target: &str) -> Result<MergeOutcome> {
        self.branch_merge_as(source, target, None).await
    }

    /// Merge branch `source` into branch `target`, attributing the merge to
    /// `actor_id`. The actor is used for the policy check and is forwarded to
    /// the recovery sidecar and the recorded merge commit.
    ///
    /// Before merging, this method:
    /// - enforces the `BranchMerge` policy action on the source→target transition;
    /// - calls `ensure_schema_apply_idle`;
    /// - heals pending recovery sidecars.
    ///
    /// # Errors
    /// Propagates the policy-enforcement error and any error from the
    /// schema-apply check or sidecar healing. Also propagates merge errors,
    /// including `OmniError::MergeConflicts` when rows conflict.
    pub async fn branch_merge_as(
        &self,
        source: &str,
        target: &str,
        actor_id: Option<&str>,
    ) -> Result<MergeOutcome> {
        // Engine-layer policy gate (MR-722 fan-out / PR #3). Scope is
        // `BranchTransition { source, target }` — matches the HTTP-layer
        // convention at `server_branch_merge` (branch=Some(source),
        // target_branch=Some(target)). Cedar rules using
        // `target_branch_scope: protected` therefore correctly gate
        // merges INTO protected branches without forbidding the
        // (symmetric) source-side reference.
        self.enforce(
            omnigraph_policy::PolicyAction::BranchMerge,
            &omnigraph_policy::ResourceScope::BranchTransition {
                source: source.to_string(),
                target: target.to_string(),
            },
            actor_id,
        )?;
        self.ensure_schema_apply_idle("branch_merge").await?;
        // Converge any pending recovery sidecar before the merge
        // captures its target snapshot: the merge's publish would
        // otherwise make the drifted Phase-B commit visible as an
        // unattributed side effect (manifest catches up to HEAD with no
        // recovery audit row) and leave the stale sidecar behind. Runs
        // before the merge's own sidecar exists.
        self.heal_pending_recovery_sidecars().await?;
        self.branch_merge_impl(source, target, actor_id).await
    }

    /// Core of `branch_merge_as`, run after the policy, schema, and recovery gates.
    ///
    /// Steps:
    /// 1. Reject internal system refs and source == target after normalization.
    /// 2. Resolve both head commits and their merge base.
    /// 3. Return `MergeOutcome::AlreadyUpToDate` when the heads are equal or
    ///    the source head is the merge base.
    /// 4. Snapshot the base and source, and take the merge-exclusive lock.
    /// 5. Swap the coordinator to the target branch and run
    ///    `branch_merge_on_current_target`.
    /// 6. Restore the previous coordinator. If the caller's active branch is
    ///    the merge target, re-sync that restored coordinator with disk.
    async fn branch_merge_impl(
        &self,
        source: &str,
        target: &str,
        actor_id: Option<&str>,
    ) -> Result<MergeOutcome> {
        if is_internal_system_branch(source) || is_internal_system_branch(target) {
            return Err(OmniError::manifest(format!(
                "branch_merge does not allow internal system refs ('{}' -> '{}')",
                source, target
            )));
        }
        let source_branch = Omnigraph::normalize_branch_name(source)?;
        let target_branch = Omnigraph::normalize_branch_name(target)?;
        if source_branch == target_branch {
            return Err(OmniError::manifest(
                "branch_merge requires distinct source and target branches".to_string(),
            ));
        }

        let source_head_commit_id = self
            .head_commit_id_for_branch(source_branch.as_deref())
            .await?
            .ok_or_else(|| OmniError::manifest("source branch has no head commit".to_string()))?;
        let target_head_commit_id = self
            .head_commit_id_for_branch(target_branch.as_deref())
            .await?
            .ok_or_else(|| OmniError::manifest("target branch has no head commit".to_string()))?;
        let base_commit = CommitGraph::merge_base(
            self.uri(),
            source_branch.as_deref(),
            target_branch.as_deref(),
        )
        .await?
        .ok_or_else(|| OmniError::manifest("branches have no common ancestor".to_string()))?;

        // Nothing to merge if the heads match or the source head is already
        // an ancestor of the target (it is the merge base).
        if source_head_commit_id == target_head_commit_id
            || base_commit.graph_commit_id == source_head_commit_id
        {
            return Ok(MergeOutcome::AlreadyUpToDate);
        }
        // Target hasn't moved since the fork point: the merge is a fast-forward.
        let is_fast_forward = base_commit.graph_commit_id == target_head_commit_id;

        let base_snapshot = ManifestCoordinator::snapshot_at(
            self.uri(),
            base_commit.manifest_branch.as_deref(),
            base_commit.manifest_version,
        )
        .await?;
        let source_snapshot = self
            .resolved_target(ReadTarget::Branch(
                source_branch.clone().unwrap_or_else(|| "main".to_string()),
            ))
            .await?
            .snapshot;
        // Hold the merge-exclusive mutex across the full swap → operate
        // → restore window. Two concurrent branch_merge calls would
        // otherwise interleave their three separate `coordinator.write()`
        // acquisitions, leaving each merge's body running against the
        // other's swapped coord. Pinned by
        // `concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other`
        // in `crates/omnigraph-server/tests/server.rs`.
        let merge_exclusive = self.merge_exclusive();
        let _merge_guard = merge_exclusive.lock().await;

        let previous_branch = self.active_branch().await;
        let previous = self
            .swap_coordinator_for_branch(target_branch.as_deref())
            .await?;
        // The merge result is captured (not `?`-propagated) so the
        // coordinator is always restored below, on both Ok and Err.
        let merge_result = self
            .branch_merge_on_current_target(
                &base_snapshot,
                &source_snapshot,
                &target_head_commit_id,
                &source_head_commit_id,
                is_fast_forward,
                actor_id,
            )
            .await;
        self.restore_coordinator(previous).await;

        // Sync the restored coordinator's cached manifest snapshot with
        // disk on both Ok and Err paths. During the swap window above,
        // `self.coordinator` was a freshly opened coord for the merge
        // target; any concurrent writer on that target (e.g. a `/change`
        // on `main` racing a `merge into=main`) publishes against the
        // swapped coord and never touches the original. Without this
        // sync, the restored coord's cached manifest snapshot would
        // diverge from disk and seed a stale `expected_versions` into
        // the next op's publisher CAS fence — a non-retryable
        // `ExpectedVersionMismatch` for a user with no concurrent
        // writer of their own. Pinned by the
        // `[d:merge×change:into-target]` cell of
        // `concurrent_branch_ops_morphological_matrix` in
        // `crates/omnigraph-server/tests/server.rs`, which flakes
        // pre-fix and stabilises post-fix.
        //
        // Use `refresh_coordinator_only` rather than `refresh` so the
        // recovery sweep doesn't race the merge's own in-flight
        // sidecar: when the merge body returns Err between Phase B
        // (per-table `commit_staged` + sidecar write) and Phase C
        // (manifest publish + sidecar delete), the sidecar is still on
        // disk. `refresh`'s `RollForwardOnly` sweep would observe it
        // and close it here — masking the failure from the next
        // `Omnigraph::open` sweep and from the audit row that the open
        // sweep emits. Pinned by
        // `branch_merge_phase_b_failure_recovered_on_next_open` in
        // `crates/omnigraph/tests/failpoints.rs`.
        //
        // Err-path refresh is best-effort: the merge body's error
        // (typically the structured `manifest_conflict` from the
        // post_queue_snapshot drift check) is the value the caller
        // needs to see. A refresh-time storage error would replace
        // that with a less informative error; the next op or the next
        // `Omnigraph::open` will re-sync the coord anyway.
        if previous_branch == target_branch {
            if let Err(refresh_err) = self.refresh_coordinator_only().await {
                if merge_result.is_ok() {
                    return Err(refresh_err);
                }
                tracing::warn!(
                    error = %refresh_err,
                    "post-merge coordinator refresh failed on the error path; \
                     the next op or open will re-sync"
                );
            }
        }

        merge_result
    }

    /// Classify, publish, and record a merge whose target is the currently
    /// active branch.
    ///
    /// The caller (`branch_merge_impl`) must already have swapped the
    /// coordinator to the merge target. Each table key found in the base,
    /// source, or target snapshot is classified as follows:
    /// - source state equals target or base: the key is skipped;
    /// - target state equals base: the key becomes `AdoptSourceState`;
    /// - otherwise: a row-level merge via `stage_streaming_table_merge`, which
    ///   may yield a `RewriteMerged` candidate and/or push conflicts.
    ///
    /// Any conflicts abort with `OmniError::MergeConflicts` before anything is
    /// written. Otherwise the method runs, in order:
    /// 1. validate the candidates;
    /// 2. acquire the touched tables' write queues;
    /// 3. re-check the manifest for drift;
    /// 4. write a recovery sidecar for `RewriteMerged` tables;
    /// 5. publish each candidate;
    /// 6. commit the manifest;
    /// 7. record the merge commit.
    ///
    /// Returns `FastForward` when `is_fast_forward`, else `Merged`.
    async fn branch_merge_on_current_target(
        &self,
        base_snapshot: &Snapshot,
        source_snapshot: &Snapshot,
        target_head_commit_id: &str,
        source_head_commit_id: &str,
        is_fast_forward: bool,
        actor_id: Option<&str>,
    ) -> Result<MergeOutcome> {
        self.ensure_commit_graph_initialized().await?;
        let target_snapshot = self.snapshot().await;

        // Union of table keys across all three snapshots.
        let mut table_keys = HashSet::new();
        for entry in base_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }
        for entry in source_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }
        for entry in target_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }

        // Sorted so classification, queue acquisition, and publishing all
        // visit tables in a deterministic order.
        let mut ordered_table_keys: Vec<String> = table_keys.into_iter().collect();
        ordered_table_keys.sort();

        let mut conflicts = Vec::new();
        let mut candidates: HashMap<String, CandidateTableState> = HashMap::new();

        for table_key in &ordered_table_keys {
            let base_entry = base_snapshot.entry(table_key);
            let source_entry = source_snapshot.entry(table_key);
            let target_entry = target_snapshot.entry(table_key);
            // Target already matches source: nothing to do.
            if same_manifest_state(source_entry, target_entry) {
                continue;
            }
            // Source didn't change since the base: keep the target as-is.
            if same_manifest_state(base_entry, source_entry) {
                continue;
            }
            // Only the source changed: adopt its state.
            if same_manifest_state(base_entry, target_entry) {
                candidates.insert(table_key.clone(), CandidateTableState::AdoptSourceState);
                continue;
            }

            // Both sides changed: stage a row-level merge (conflicts are
            // appended to `conflicts`).
            if let Some(staged) = stage_streaming_table_merge(
                table_key,
                &self.catalog(),
                base_snapshot,
                source_snapshot,
                &target_snapshot,
                &mut conflicts,
            )
            .await?
            {
                candidates.insert(
                    table_key.clone(),
                    CandidateTableState::RewriteMerged(staged),
                );
            }
        }

        if !conflicts.is_empty() {
            return Err(OmniError::MergeConflicts(conflicts));
        }

        validate_merge_candidates(self, source_snapshot, &target_snapshot, &candidates).await?;

        // Acquire per-(table_key, target_branch) queues for every table
        // touched by the merge plan. Sorted-order acquisition prevents
        // lock-order inversion against concurrent multi-table writers.
        // The active branch (set by the caller's `swap_coordinator_for_branch`)
        // is the merge target; queue keys are scoped to it because a
        // branch_merge writes only to the target branch.
        //
        // Held across the per-table publish loop and the manifest
        // commit + record_merge_commit calls below, so no concurrent
        // writer to a touched (table, target_branch) can interleave
        // between our commit_staged and our publish.
        //
        // Every current `CandidateTableState` variant matches the filter
        // below, so it selects exactly the keys present in `candidates`.
        let active_branch_for_keys = self.active_branch().await;
        let merge_queue_keys: Vec<(String, Option<String>)> = ordered_table_keys
            .iter()
            .filter(|table_key| {
                matches!(
                    candidates.get(*table_key),
                    Some(CandidateTableState::RewriteMerged(_))
                        | Some(CandidateTableState::AdoptSourceState)
                )
            })
            .map(|table_key| (table_key.clone(), active_branch_for_keys.clone()))
            .collect();
        let _merge_queue_guards = self.write_queue().acquire_many(&merge_queue_keys).await;

        // Drift check: with the queues held, re-read the manifest. If any
        // candidate table's version moved since `target_snapshot` was
        // captured, abort with an expected-version mismatch instead of
        // publishing onto drifted state. A table missing from a snapshot is
        // reported as version 0.
        let post_queue_snapshot = self.snapshot().await;
        for table_key in &ordered_table_keys {
            let Some(candidate) = candidates.get(table_key) else {
                continue;
            };
            if !matches!(
                candidate,
                CandidateTableState::RewriteMerged(_) | CandidateTableState::AdoptSourceState
            ) {
                continue;
            }
            let expected = target_snapshot.entry(table_key).map(|e| e.table_version);
            let current = post_queue_snapshot
                .entry(table_key)
                .map(|e| e.table_version);
            if expected != current {
                return Err(OmniError::manifest_expected_version_mismatch(
                    table_key.clone(),
                    expected.unwrap_or(0),
                    current.unwrap_or(0),
                ));
            }
        }

        // Recovery sidecar: protect the per-table commit_staged loop.
        // Pin only `RewriteMerged` candidates because they always
        // advance Lance HEAD through `publish_rewritten_merge_table`.
        // That function makes multiple Lance commits per table: a staged
        // merge_insert, an inline delete_where, and index builds. Loose
        // classification handles the multi-step drift.
        //
        // `AdoptSourceState` candidates are NOT pinned: their publish
        // path is `publish_adopted_source_state`, whose subcases mostly
        // don't advance Lance HEAD (pure manifest pointer switch, or
        // fork via `fork_dataset_from_entry_state` which only adds a
        // Lance branch ref). If those subcases were pinned, recovery
        // would classify them as NoMovement and the all-or-nothing
        // decision would force a rollback that destroys legitimately-
        // committed work on sibling RewriteMerged tables.
        //
        // Residual: two `AdoptSourceState` subcases (when source has a
        // table_branch AND the source delta is non-empty) internally
        // call `publish_rewritten_merge_table` and DO advance HEAD.
        // Those are not covered by this sidecar — if they fail mid-
        // commit, the residual persists until the next ReadWrite open
        // detects it via a subsequent ExpectedVersionMismatch from a
        // later writer that touches the same table. Closing this gap
        // requires pre-computing source deltas during candidate
        // classification (a structural change to `CandidateTableState`)
        // and is left as follow-up work.
        //
        // RewriteMerged tables with no entry in `target_snapshot` get no pin
        // (the `?` on `target_snapshot.entry` skips them).
        let recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = ordered_table_keys
            .iter()
            .filter_map(|table_key| {
                let candidate = candidates.get(table_key)?;
                if !matches!(candidate, CandidateTableState::RewriteMerged(_)) {
                    return None;
                }
                let entry = target_snapshot.entry(table_key)?;
                Some(crate::db::manifest::SidecarTablePin {
                    table_key: table_key.clone(),
                    table_path: self.storage().dataset_uri(&entry.table_path),
                    expected_version: entry.table_version,
                    post_commit_pin: entry.table_version + 1,
                    // Use the merge target branch (where commits actually
                    // land), NOT entry.table_branch (where the table
                    // currently lives). publish_rewritten_merge_table calls
                    // open_for_mutation, which forks an inherited-from-main
                    // table to active_branch on first write — the resulting
                    // Lance commit lands on active_branch. Recovery's
                    // open_lance_head must check the same branch, otherwise
                    // an inherited-table feature-to-feature merge classifies
                    // as NoMovement and the all-or-nothing rollback skips
                    // the orphaned post-Phase-B HEAD on the target ref.
                    // Same rationale as table_ops.rs:115-120 in
                    // ensure_indices_for_branch.
                    table_branch: active_branch_for_keys.clone(),
                })
            })
            .collect();
        let recovery_handle = if recovery_pins.is_empty() {
            None
        } else {
            // Use the merge target branch directly, NOT a heuristic
            // derived from `ordered_table_keys.first()`. The first
            // sorted table key may not be in the target snapshot at all
            // (its `entry()` returns None → branch becomes None == main),
            // and the SubTableEntry's `table_branch` field isn't
            // necessarily the merge target branch. The caller
            // `branch_merge` calls `swap_coordinator_for_branch(target_branch)`
            // before invoking this function, so `self.active_branch()`
            // is the target.
            let target_branch = active_branch_for_keys.clone();
            let mut sidecar = crate::db::manifest::new_sidecar(
                crate::db::manifest::SidecarKind::BranchMerge,
                target_branch,
                actor_id.map(str::to_string),
                recovery_pins,
            );
            // Carry the source branch's HEAD commit id so the recovery
            // sweep's audit step can record this as a MERGE commit
            // (linked to the source) instead of a plain commit. Without
            // this, future merges between the same pair lose
            // already-up-to-date detection and merge-base correctness.
            sidecar.merge_source_commit_id = Some(source_head_commit_id.to_string());
            Some(
                crate::db::manifest::write_sidecar(
                    self.root_uri(),
                    self.storage_adapter(),
                    &sidecar,
                )
                .await?,
            )
        };

        // Phase B: publish every candidate table. An error here returns
        // immediately and leaves the sidecar (if any) on disk for recovery.
        let mut updates = Vec::new();
        let mut changed_edge_tables = false;
        for table_key in &ordered_table_keys {
            let Some(candidate_state) = candidates.get(table_key) else {
                continue;
            };
            let update = match candidate_state {
                CandidateTableState::AdoptSourceState => {
                    publish_adopted_source_state(
                        self,
                        &self.catalog(),
                        base_snapshot,
                        source_snapshot,
                        &target_snapshot,
                        table_key,
                    )
                    .await?
                }
                CandidateTableState::RewriteMerged(staged) => {
                    publish_rewritten_merge_table(self, table_key, staged).await?
                }
            };
            if table_key.starts_with("edge:") {
                changed_edge_tables = true;
            }
            updates.push(update);
        }

        // Failpoint: pin the per-writer Phase B → Phase C residual for
        // branch_merge. Lance HEAD has advanced on every touched table
        // (publish_*) but the manifest publish below hasn't run. Used
        // by `tests/failpoints.rs::branch_merge_phase_b_failure_recovered_on_next_open`.
        crate::failpoints::maybe_fail("branch_merge.post_phase_b_pre_manifest_commit")?;

        // Phase C: publish the manifest (or reuse the current version when
        // no table changed).
        let manifest_version = if updates.is_empty() {
            self.version().await
        } else {
            self.commit_manifest_updates(&updates).await?
        };

        // Recovery sidecar lifecycle: delete after manifest publish.
        // Best-effort cleanup; the merge already landed durably so
        // failing the user here is undesirable.
        //
        // NOTE(review): the sidecar (which carries `merge_source_commit_id`)
        // is deleted before `record_merge_commit` runs. If that call fails,
        // the manifest is already published and no sidecar remains for the
        // recovery sweep. Confirm the merge-commit audit row is not expected
        // to be reconstructed from the sidecar in that window.
        if let Some(handle) = recovery_handle {
            if let Err(err) =
                crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await
            {
                tracing::warn!(
                    error = %err,
                    operation_id = handle.operation_id.as_str(),
                    "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
                );
            }
        }
        self.record_merge_commit(
            manifest_version,
            target_head_commit_id,
            source_head_commit_id,
            actor_id,
        )
        .await?;

        if changed_edge_tables {
            self.invalidate_graph_index().await;
        }

        Ok(if is_fast_forward {
            MergeOutcome::FastForward
        } else {
            MergeOutcome::Merged
        })
    }
}