Skip to main content

omnigraph/exec/
merge.rs

1use super::*;
2
/// How many rows a `StagedTableWriter` buffers (as one-row batches) before
/// concatenating them and appending the result to its staging dataset.
const MERGE_STAGE_BATCH_ROWS: usize = 8 * 1024;
/// Environment variable that, when set, names the directory in which merge
/// staging tempdirs are created (see `merge_stage_tempdir`).
const MERGE_STAGE_DIR_ENV: &str = "OMNIGRAPH_MERGE_STAGING_DIR";
5
6#[derive(Debug)]
7enum CandidateTableState {
8    /// Adopt the source's table state via a pointer switch or a branch fork —
9    /// no data HEAD advance, so nothing to pin for recovery.
10    AdoptSourceState,
11    /// Adopt the source's state by applying a non-empty delta onto the target's
12    /// lineage (append new + upsert changed + delete removed). The delta is
13    /// pre-computed at classification so this candidate can be recovery-pinned:
14    /// its publish advances Lance HEAD before the manifest commit.
15    AdoptWithDelta(AdoptDelta),
16    RewriteMerged(StagedMergeResult),
17}
18
19#[derive(Debug)]
20struct StagedTable {
21    _dir: TempDir,
22    dataset: Dataset,
23}
24
25#[derive(Debug)]
26struct StagedMergeResult {
27    full_staged: StagedTable,
28    delta_staged: Option<StagedTable>,
29    deleted_ids: Vec<String>,
30}
31
32/// Delta for an adopted-source merge (the fast-forward / target-owns path):
33/// the new + changed rows to apply onto the target's base lineage, plus the ids
34/// removed on source. Distinct from [`StagedMergeResult`] (the three-way path),
35/// which also carries a `full_staged` table for validation — the adopt path
36/// validates against the source snapshot directly (`candidate_dataset`), so it
37/// needs no `full_staged` and never builds it.
38///
39/// TRANSITIONAL — fragment-adopt excision point. This whole row-level adopt
40/// (`AdoptDelta`, [`compute_adopt_delta`], [`publish_adopted_delta`], and the
41/// streaming append it drives) re-derives the source branch row-by-row because
42/// today's Lance offers no fragment-level branch merge. When Lance ships
43/// branch-merge/rebase ([#7263]) + UUID branch paths ([#7185]), a fast-forward
44/// merge becomes a *fragment graft* — adopt the source table version's
45/// fragments (and their already-built indexes) by reference, no rows scanned,
46/// re-appended, upserted, or deleted. At that point this struct and its two
47/// functions are removed wholesale; the merge collapses to ~one ref/metadata
48/// op per table. Keep them self-contained so that excision stays a clean delete.
49///
50/// [#7263]: https://github.com/lance-format/lance/issues/7263
51/// [#7185]: https://github.com/lance-format/lance/issues/7185
52#[derive(Debug)]
53struct AdoptDelta {
54    /// New-on-source rows → `stage_append` (a streaming `Operation::Append`, no
55    /// hash join). The connector's dominant case and the OOM fix: appending new
56    /// rows never buffers the whole delta in a full-outer hash join.
57    appends: Option<StagedTable>,
58    /// Changed-on-source rows → `stage_merge_insert` (a hash join bounded to the
59    /// genuinely-changed set, not the whole delta).
60    upserts: Option<StagedTable>,
61    deleted_ids: Vec<String>,
62}
63
64#[derive(Debug, Clone)]
65struct CursorRow {
66    id: String,
67    signature: String,
68    dataset: Dataset,
69    batch: RecordBatch,
70    row_index: usize,
71}
72
73impl CursorRow {
74    /// Compute this row's signature on demand. Used by the lazy adopt cursor,
75    /// where `signature` is left empty; the value is identical to the eager
76    /// `signature` field the three-way cursor populates.
77    fn compute_signature(&self) -> Result<String> {
78        row_signature(&self.batch, self.row_index)
79    }
80}
81
82struct OrderedTableCursor {
83    stream: Option<std::pin::Pin<Box<DatasetRecordBatchStream>>>,
84    dataset: Option<Dataset>,
85    current_batch: Option<RecordBatch>,
86    current_row: usize,
87    peeked: Option<CursorRow>,
88    /// When false, `next_row` leaves `CursorRow::signature` empty and callers
89    /// compute it on demand via `CursorRow::compute_signature`. The adopt path
90    /// uses this: new/deleted rows never need a signature comparison and would
91    /// otherwise eagerly stringify their embedding for nothing.
92    eager_signatures: bool,
93}
94
95impl OrderedTableCursor {
96    async fn from_snapshot(snapshot: &Snapshot, table_key: &str) -> Result<Self> {
97        Self::open(snapshot, table_key, true).await
98    }
99
100    /// Like `from_snapshot` but leaves row signatures uncomputed (callers use
101    /// `CursorRow::compute_signature` on demand). See `eager_signatures`.
102    async fn from_snapshot_lazy(snapshot: &Snapshot, table_key: &str) -> Result<Self> {
103        Self::open(snapshot, table_key, false).await
104    }
105
106    async fn open(snapshot: &Snapshot, table_key: &str, eager_signatures: bool) -> Result<Self> {
107        let dataset = match snapshot.entry(table_key) {
108            Some(_) => Some(snapshot.open(table_key).await?),
109            None => None,
110        };
111        Self::from_dataset(dataset, eager_signatures).await
112    }
113
114    async fn from_dataset(dataset: Option<Dataset>, eager_signatures: bool) -> Result<Self> {
115        let stream = if let Some(ds) = &dataset {
116            Some(Box::pin(
117                crate::table_store::TableStore::scan_stream_with(
118                    ds,
119                    None,
120                    None,
121                    Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]),
122                    true,
123                    |_| Ok(()),
124                )
125                .await?,
126            ))
127        } else {
128            None
129        };
130
131        Ok(Self {
132            stream,
133            dataset,
134            current_batch: None,
135            current_row: 0,
136            peeked: None,
137            eager_signatures,
138        })
139    }
140
141    async fn peek_cloned(&mut self) -> Result<Option<CursorRow>> {
142        if self.peeked.is_none() {
143            self.peeked = self.next_row().await?;
144        }
145        Ok(self.peeked.clone())
146    }
147
148    async fn pop(&mut self) -> Result<Option<CursorRow>> {
149        if self.peeked.is_some() {
150            return Ok(self.peeked.take());
151        }
152        self.next_row().await
153    }
154
155    async fn next_row(&mut self) -> Result<Option<CursorRow>> {
156        loop {
157            if let Some(batch) = &self.current_batch {
158                if self.current_row < batch.num_rows() {
159                    let row_index = self.current_row;
160                    self.current_row += 1;
161                    let dataset = self.dataset.clone().ok_or_else(|| {
162                        OmniError::manifest("cursor row missing source dataset".to_string())
163                    })?;
164                    let signature = if self.eager_signatures {
165                        row_signature(batch, row_index)?
166                    } else {
167                        String::new()
168                    };
169                    return Ok(Some(CursorRow {
170                        id: row_id_at(batch, row_index)?,
171                        signature,
172                        dataset,
173                        batch: batch.clone(),
174                        row_index,
175                    }));
176                }
177            }
178
179            let Some(stream) = self.stream.as_mut() else {
180                return Ok(None);
181            };
182            match stream.try_next().await {
183                Ok(Some(batch)) => {
184                    self.current_batch = Some(batch);
185                    self.current_row = 0;
186                }
187                Ok(None) => {
188                    self.stream = None;
189                    self.current_batch = None;
190                    return Ok(None);
191                }
192                Err(err) => return Err(OmniError::Lance(err.to_string())),
193            }
194        }
195    }
196}
197
198struct StagedTableWriter {
199    schema: SchemaRef,
200    dataset_uri: String,
201    dir: TempDir,
202    dataset: Option<Dataset>,
203    buffered_rows: usize,
204    row_count: u64,
205    batches: Vec<RecordBatch>,
206}
207
208impl StagedTableWriter {
209    fn new(table_key: &str, schema: SchemaRef) -> Result<Self> {
210        let dir = merge_stage_tempdir(table_key)?;
211        let dataset_uri = dir.path().join("table.lance").to_string_lossy().to_string();
212        Ok(Self {
213            schema,
214            dataset_uri,
215            dir,
216            dataset: None,
217            buffered_rows: 0,
218            row_count: 0,
219            batches: Vec::new(),
220        })
221    }
222
223    async fn push_row(&mut self, row: &CursorRow) -> Result<()> {
224        self.row_count += 1;
225        self.buffered_rows += 1;
226        self.batches.push(self.row_batch(row).await?);
227        if self.buffered_rows >= MERGE_STAGE_BATCH_ROWS {
228            self.flush().await?;
229        }
230        Ok(())
231    }
232
233    async fn row_batch(&self, row: &CursorRow) -> Result<RecordBatch> {
234        let batch = row.batch.slice(row.row_index, 1);
235        let has_blob_columns = row
236            .dataset
237            .schema()
238            .fields_pre_order()
239            .any(|field| field.is_blob());
240        if has_blob_columns {
241            return crate::table_store::TableStore::materialize_blob_batch(&row.dataset, batch)
242                .await;
243        }
244        let columns = self
245            .schema
246            .fields()
247            .iter()
248            .map(|field| {
249                batch.column_by_name(field.name()).cloned().ok_or_else(|| {
250                    OmniError::Lance(format!("batch missing column '{}'", field.name()))
251                })
252            })
253            .collect::<Result<Vec<_>>>()?;
254        RecordBatch::try_new(self.schema.clone(), columns)
255            .map_err(|e| OmniError::Lance(e.to_string()))
256    }
257
258    async fn finish(mut self) -> Result<StagedTable> {
259        self.flush().await?;
260        if self.dataset.is_none() {
261            self.dataset = Some(
262                crate::table_store::TableStore::create_empty_dataset(
263                    &self.dataset_uri,
264                    &self.schema,
265                )
266                .await?,
267            );
268        }
269        Ok(StagedTable {
270            _dir: self.dir,
271            dataset: self.dataset.unwrap(),
272        })
273    }
274
275    async fn flush(&mut self) -> Result<()> {
276        if self.batches.is_empty() {
277            return Ok(());
278        }
279
280        let batch = if self.batches.len() == 1 {
281            self.batches.pop().unwrap()
282        } else {
283            let batches = std::mem::take(&mut self.batches);
284            arrow_select::concat::concat_batches(&self.schema, &batches)
285                .map_err(|e| OmniError::Lance(e.to_string()))?
286        };
287        self.buffered_rows = 0;
288
289        let ds = crate::table_store::TableStore::append_or_create_batch(
290            &self.dataset_uri,
291            self.dataset.take(),
292            batch,
293        )
294        .await?;
295        self.dataset = Some(ds);
296        Ok(())
297    }
298}
299
300fn merge_stage_tempdir(table_key: &str) -> Result<TempDir> {
301    if let Ok(root) = env::var(MERGE_STAGE_DIR_ENV) {
302        return TempDirBuilder::new()
303            .prefix(&format!(
304                "omnigraph-merge-{}-",
305                sanitize_table_key(table_key)
306            ))
307            .tempdir_in(PathBuf::from(root))
308            .map_err(OmniError::from);
309    }
310    TempDirBuilder::new()
311        .prefix(&format!(
312            "omnigraph-merge-{}-",
313            sanitize_table_key(table_key)
314        ))
315        .tempdir()
316        .map_err(OmniError::from)
317}
318
/// Makes a table key safe to embed in a directory name. Each `:`, `/`, and
/// `\` becomes `-`; all other characters are kept unchanged.
fn sanitize_table_key(table_key: &str) -> String {
    table_key.replace(|ch: char| matches!(ch, ':' | '/' | '\\'), "-")
}
328
329/// Computes the delta between base and source for an adopted-source merge.
330/// Returns the new + changed rows and the ids deleted on source.
331///
332/// Unchanged rows are dropped: the adopt path validates against the source
333/// snapshot directly (`candidate_dataset`), so no `full_staged` table is built
334/// — saving the O(rows) temp write that `compute_source_delta` used to produce
335/// and then discard.
336///
337/// TRANSITIONAL — removed by the fragment-adopt work (see [`AdoptDelta`]): a
338/// fragment graft adopts the source's fragments by reference, so there is no
339/// row-level delta to compute.
340async fn compute_adopt_delta(
341    table_key: &str,
342    catalog: &Catalog,
343    base_snapshot: &Snapshot,
344    source_snapshot: &Snapshot,
345) -> Result<Option<AdoptDelta>> {
346    let schema = schema_for_table_key(catalog, table_key)?;
347    let mut append_writer =
348        StagedTableWriter::new(&format!("{}_adopt_append", table_key), schema.clone())?;
349    let mut upsert_writer =
350        StagedTableWriter::new(&format!("{}_adopt_upsert", table_key), schema)?;
351    let mut deleted_ids: Vec<String> = Vec::new();
352    let mut base = OrderedTableCursor::from_snapshot_lazy(base_snapshot, table_key).await?;
353    let mut source = OrderedTableCursor::from_snapshot_lazy(source_snapshot, table_key).await?;
354
355    let mut needs_update = false;
356
357    loop {
358        let base_row = base.peek_cloned().await?;
359        let source_row = source.peek_cloned().await?;
360
361        let next_id = [base_row.as_ref(), source_row.as_ref()]
362            .into_iter()
363            .flatten()
364            .map(|row| row.id.clone())
365            .min();
366        let Some(next_id) = next_id else { break };
367
368        let base_row = if base_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
369            base.pop().await?
370        } else {
371            None
372        };
373        let source_row = if source_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
374            source.pop().await?
375        } else {
376            None
377        };
378
379        match (&base_row, &source_row) {
380            (Some(_), None) => {
381                // Deleted on source
382                deleted_ids.push(next_id);
383                needs_update = true;
384            }
385            (None, Some(src)) => {
386                // New on source → append (streaming, no hash join). No signature
387                // needed — a new id is absent from base by construction.
388                append_writer.push_row(src).await?;
389                needs_update = true;
390            }
391            (Some(base), Some(src)) => {
392                // Present on both — compute signatures lazily (the only case
393                // that needs them) to tell a changed row from an unchanged one.
394                // New/deleted rows above skip the embedding stringify entirely.
395                if src.compute_signature()? != base.compute_signature()? {
396                    // Changed on source → upsert.
397                    upsert_writer.push_row(src).await?;
398                    needs_update = true;
399                }
400                // else unchanged — already on the target's base lineage; drop.
401            }
402            (None, None) => unreachable!(),
403        }
404    }
405
406    if !needs_update {
407        return Ok(None);
408    }
409
410    let appends = if append_writer.row_count > 0 {
411        Some(append_writer.finish().await?)
412    } else {
413        None
414    };
415    let upserts = if upsert_writer.row_count > 0 {
416        Some(upsert_writer.finish().await?)
417    } else {
418        None
419    };
420
421    Ok(Some(AdoptDelta {
422        appends,
423        upserts,
424        deleted_ids,
425    }))
426}
427
428fn min_cursor_id(
429    base_row: &Option<CursorRow>,
430    source_row: &Option<CursorRow>,
431    target_row: &Option<CursorRow>,
432) -> Option<String> {
433    [base_row.as_ref(), source_row.as_ref(), target_row.as_ref()]
434        .into_iter()
435        .flatten()
436        .map(|row| row.id.clone())
437        .min()
438}
439
440async fn stage_streaming_table_merge(
441    table_key: &str,
442    catalog: &Catalog,
443    base_snapshot: &Snapshot,
444    source_snapshot: &Snapshot,
445    target_snapshot: &Snapshot,
446    conflicts: &mut Vec<MergeConflict>,
447) -> Result<Option<StagedMergeResult>> {
448    let schema = schema_for_table_key(catalog, table_key)?;
449    let mut full_writer = StagedTableWriter::new(&format!("{}_full", table_key), schema.clone())?;
450    let mut delta_writer = StagedTableWriter::new(&format!("{}_delta", table_key), schema)?;
451    let mut deleted_ids: Vec<String> = Vec::new();
452    let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
453    let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;
454    let mut target = OrderedTableCursor::from_snapshot(target_snapshot, table_key).await?;
455
456    let prior_conflict_count = conflicts.len();
457    let mut needs_update = false;
458
459    loop {
460        let base_row = base.peek_cloned().await?;
461        let source_row = source.peek_cloned().await?;
462        let target_row = target.peek_cloned().await?;
463        let Some(next_id) = min_cursor_id(&base_row, &source_row, &target_row) else {
464            break;
465        };
466
467        let base_row = if base_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str()) {
468            base.pop().await?
469        } else {
470            None
471        };
472        let source_row = if source_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
473        {
474            source.pop().await?
475        } else {
476            None
477        };
478        let target_row = if target_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
479        {
480            target.pop().await?
481        } else {
482            None
483        };
484
485        let base_sig = base_row.as_ref().map(|row| row.signature.as_str());
486        let source_sig = source_row.as_ref().map(|row| row.signature.as_str());
487        let target_sig = target_row.as_ref().map(|row| row.signature.as_str());
488
489        let source_changed = source_sig != base_sig;
490        let target_changed = target_sig != base_sig;
491
492        let selection = if !source_changed {
493            target_row.as_ref()
494        } else if !target_changed {
495            source_row.as_ref()
496        } else if source_sig == target_sig {
497            target_row.as_ref()
498        } else {
499            conflicts.push(classify_merge_conflict(
500                table_key, &next_id, base_sig, source_sig, target_sig,
501            ));
502            None
503        };
504
505        if conflicts.len() > prior_conflict_count {
506            continue;
507        }
508
509        // Row existed in target but not in merge result → delete
510        if selection.is_none() && target_row.is_some() {
511            deleted_ids.push(next_id.clone());
512            needs_update = true;
513            continue;
514        }
515
516        if let Some(selection) = selection {
517            // Always write to full (for validation)
518            full_writer.push_row(selection).await?;
519            // Only write changed rows to delta (for publish)
520            if selection.signature.as_str() != target_sig.unwrap_or("") {
521                delta_writer.push_row(selection).await?;
522                needs_update = true;
523            }
524        }
525    }
526
527    if conflicts.len() > prior_conflict_count {
528        return Ok(None);
529    }
530    if !needs_update {
531        return Ok(None);
532    }
533
534    let delta_staged = if delta_writer.row_count > 0 {
535        Some(delta_writer.finish().await?)
536    } else {
537        None
538    };
539
540    Ok(Some(StagedMergeResult {
541        full_staged: full_writer.finish().await?,
542        delta_staged,
543        deleted_ids,
544    }))
545}
546
547fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<SchemaRef> {
548    if let Some(name) = table_key.strip_prefix("node:") {
549        return catalog
550            .node_types
551            .get(name)
552            .map(|t| t.arrow_schema.clone())
553            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", name)));
554    }
555    if let Some(name) = table_key.strip_prefix("edge:") {
556        return catalog
557            .edge_types
558            .get(name)
559            .map(|t| t.arrow_schema.clone())
560            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", name)));
561    }
562    Err(OmniError::manifest(format!(
563        "invalid table key '{}'",
564        table_key
565    )))
566}
567
568fn same_manifest_state(
569    left: Option<&crate::db::SubTableEntry>,
570    right: Option<&crate::db::SubTableEntry>,
571) -> bool {
572    match (left, right) {
573        (Some(left), Some(right)) => {
574            left.table_version == right.table_version && left.table_branch == right.table_branch
575        }
576        (None, None) => true,
577        _ => false,
578    }
579}
580
581fn classify_merge_conflict(
582    table_key: &str,
583    row_id: &str,
584    base_sig: Option<&str>,
585    source_sig: Option<&str>,
586    target_sig: Option<&str>,
587) -> MergeConflict {
588    let (kind, message) = match (base_sig, source_sig, target_sig) {
589        (None, Some(_), Some(_)) => (
590            MergeConflictKind::DivergentInsert,
591            format!("divergent insert for id '{}'", row_id),
592        ),
593        (Some(_), None, Some(_)) | (Some(_), Some(_), None) => (
594            MergeConflictKind::DeleteVsUpdate,
595            format!("delete/update conflict for id '{}'", row_id),
596        ),
597        _ => (
598            MergeConflictKind::DivergentUpdate,
599            format!("divergent update for id '{}'", row_id),
600        ),
601    };
602    MergeConflict {
603        table_key: table_key.to_string(),
604        row_id: Some(row_id.to_string()),
605        kind,
606        message,
607    }
608}
609
610fn row_signature(batch: &RecordBatch, row: usize) -> Result<String> {
611    let mut values = Vec::with_capacity(batch.num_columns());
612    for (field, column) in batch.schema().fields().iter().zip(batch.columns()) {
613        if field.name().starts_with("_row") {
614            continue;
615        }
616        values.push(
617            array_value_to_string(column.as_ref(), row)
618                .map_err(|e| OmniError::Lance(e.to_string()))?,
619        );
620    }
621    Ok(values.join("\u{1f}"))
622}
623
624async fn scan_validation_stream(ds: &Dataset) -> Result<DatasetRecordBatchStream> {
625    crate::table_store::TableStore::scan_stream_with(ds, None, None, None, false, |_| Ok(())).await
626}
627
628async fn validate_merge_candidates(
629    db: &Omnigraph,
630    source_snapshot: &Snapshot,
631    target_snapshot: &Snapshot,
632    candidates: &HashMap<String, CandidateTableState>,
633) -> Result<()> {
634    let mut conflicts = Vec::new();
635    let mut node_ids: HashMap<String, HashSet<String>> = HashMap::new();
636
637    for (type_name, node_type) in &db.catalog().node_types {
638        let table_key = format!("node:{}", type_name);
639        let mut values = HashSet::new();
640        let mut unique_seen = vec![HashMap::new(); node_type.unique_constraints.len()];
641
642        if let Some(ds) =
643            candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
644        {
645            let mut stream = scan_validation_stream(&ds).await?;
646            while let Some(batch) = stream
647                .try_next()
648                .await
649                .map_err(|e| OmniError::Lance(e.to_string()))?
650            {
651                if let Err(err) = crate::loader::validate_value_constraints(&batch, node_type) {
652                    conflicts.push(MergeConflict {
653                        table_key: table_key.clone(),
654                        row_id: None,
655                        kind: MergeConflictKind::ValueConstraintViolation,
656                        message: err.to_string(),
657                    });
658                }
659                update_unique_constraints(
660                    &table_key,
661                    &batch,
662                    &node_type.unique_constraints,
663                    &mut unique_seen,
664                    &mut conflicts,
665                )?;
666                let ids = batch
667                    .column_by_name("id")
668                    .ok_or_else(|| {
669                        OmniError::manifest(format!("table {} missing id column", table_key))
670                    })?
671                    .as_any()
672                    .downcast_ref::<StringArray>()
673                    .ok_or_else(|| {
674                        OmniError::manifest(format!("table {} id column is not Utf8", table_key))
675                    })?;
676                for row in 0..ids.len() {
677                    values.insert(ids.value(row).to_string());
678                }
679            }
680        }
681        node_ids.insert(type_name.clone(), values);
682    }
683
684    for (edge_name, edge_type) in &db.catalog().edge_types {
685        let table_key = format!("edge:{}", edge_name);
686        let mut unique_seen = vec![HashMap::new(); edge_type.unique_constraints.len()];
687        let mut src_counts = HashMap::new();
688
689        if let Some(ds) =
690            candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
691        {
692            let mut stream = scan_validation_stream(&ds).await?;
693            while let Some(batch) = stream
694                .try_next()
695                .await
696                .map_err(|e| OmniError::Lance(e.to_string()))?
697            {
698                update_unique_constraints(
699                    &table_key,
700                    &batch,
701                    &edge_type.unique_constraints,
702                    &mut unique_seen,
703                    &mut conflicts,
704                )?;
705                accumulate_edge_cardinality(&batch, &mut src_counts, &table_key)?;
706                conflicts.extend(validate_orphan_edges_batch(
707                    &table_key, edge_type, &batch, &node_ids,
708                )?);
709            }
710        }
711
712        conflicts.extend(finalize_edge_cardinality_conflicts(
713            &table_key,
714            edge_name,
715            edge_type.cardinality.min,
716            edge_type.cardinality.max,
717            src_counts,
718        ));
719    }
720
721    if conflicts.is_empty() {
722        Ok(())
723    } else {
724        Err(OmniError::MergeConflicts(conflicts))
725    }
726}
727
728async fn candidate_dataset(
729    source_snapshot: &Snapshot,
730    target_snapshot: &Snapshot,
731    candidates: &HashMap<String, CandidateTableState>,
732    table_key: &str,
733) -> Result<Option<Dataset>> {
734    if let Some(candidate) = candidates.get(table_key) {
735        return match candidate {
736            CandidateTableState::AdoptSourceState | CandidateTableState::AdoptWithDelta(_) => {
737                match source_snapshot.entry(table_key) {
738                    Some(_) => Ok(Some(source_snapshot.open(table_key).await?)),
739                    None => Ok(None),
740                }
741            }
742            CandidateTableState::RewriteMerged(staged) => {
743                Ok(Some(staged.full_staged.dataset.clone()))
744            }
745        };
746    }
747    match target_snapshot.entry(table_key) {
748        Some(_) => Ok(Some(target_snapshot.open(table_key).await?)),
749        None => Ok(None),
750    }
751}
752
753fn update_unique_constraints(
754    table_key: &str,
755    batch: &RecordBatch,
756    constraints: &[Vec<String>],
757    seen: &mut [HashMap<Vec<String>, String>],
758    conflicts: &mut Vec<MergeConflict>,
759) -> Result<()> {
760    for (constraint_idx, columns) in constraints.iter().enumerate() {
761        let seen = &mut seen[constraint_idx];
762        // Resolve the group's columns once. The candidate dataset always
763        // carries the full table schema, so a missing column is an internal
764        // error rather than a skip.
765        let group_columns = columns
766            .iter()
767            .map(|column_name| {
768                batch.column_by_name(column_name).cloned().ok_or_else(|| {
769                    OmniError::manifest(format!(
770                        "table {} missing unique column '{}'",
771                        table_key, column_name
772                    ))
773                })
774            })
775            .collect::<Result<Vec<_>>>()?;
776        for row in 0..batch.num_rows() {
777            // Same tuple key as the intake path — one shared derivation in
778            // `crate::loader::composite_unique_key`, so the two cannot drift on
779            // separator or scalar conversion. Null rows are exempt.
780            let Some(key) = crate::loader::composite_unique_key(&group_columns, row)? else {
781                continue;
782            };
783            let row_id = row_id_at(batch, row)?;
784            if let Some(first_row_id) = seen.insert(key, row_id.clone()) {
785                conflicts.push(MergeConflict {
786                    table_key: table_key.to_string(),
787                    row_id: Some(row_id.clone()),
788                    kind: MergeConflictKind::UniqueViolation,
789                    message: format!(
790                        "unique constraint {:?} violated by '{}' and '{}'",
791                        columns, first_row_id, row_id
792                    ),
793                });
794            }
795        }
796    }
797    Ok(())
798}
799
800fn accumulate_edge_cardinality(
801    batch: &RecordBatch,
802    counts: &mut HashMap<String, u32>,
803    table_key: &str,
804) -> Result<()> {
805    let srcs = batch
806        .column_by_name("src")
807        .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
808        .as_any()
809        .downcast_ref::<StringArray>()
810        .ok_or_else(|| {
811            OmniError::manifest(format!("table {} src column is not Utf8", table_key))
812        })?;
813    for row in 0..srcs.len() {
814        *counts.entry(srcs.value(row).to_string()).or_insert(0_u32) += 1;
815    }
816    Ok(())
817}
818
819fn finalize_edge_cardinality_conflicts(
820    table_key: &str,
821    edge_name: &str,
822    min: u32,
823    max: Option<u32>,
824    counts: HashMap<String, u32>,
825) -> Vec<MergeConflict> {
826    let mut conflicts = Vec::new();
827    for (src, count) in counts {
828        if let Some(max) = max {
829            if count > max {
830                conflicts.push(MergeConflict {
831                    table_key: table_key.to_string(),
832                    row_id: None,
833                    kind: MergeConflictKind::CardinalityViolation,
834                    message: format!(
835                        "@card violation on edge {}: source '{}' has {} edges (max {})",
836                        edge_name, src, count, max
837                    ),
838                });
839            }
840        }
841        if count < min {
842            conflicts.push(MergeConflict {
843                table_key: table_key.to_string(),
844                row_id: None,
845                kind: MergeConflictKind::CardinalityViolation,
846                message: format!(
847                    "@card violation on edge {}: source '{}' has {} edges (min {})",
848                    edge_name, src, count, min
849                ),
850            });
851        }
852    }
853    conflicts
854}
855
856fn validate_orphan_edges_batch(
857    table_key: &str,
858    edge_type: &omnigraph_compiler::catalog::EdgeType,
859    batch: &RecordBatch,
860    node_ids: &HashMap<String, HashSet<String>>,
861) -> Result<Vec<MergeConflict>> {
862    let srcs = batch
863        .column_by_name("src")
864        .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
865        .as_any()
866        .downcast_ref::<StringArray>()
867        .ok_or_else(|| {
868            OmniError::manifest(format!("table {} src column is not Utf8", table_key))
869        })?;
870    let dsts = batch
871        .column_by_name("dst")
872        .ok_or_else(|| OmniError::manifest(format!("table {} missing dst column", table_key)))?
873        .as_any()
874        .downcast_ref::<StringArray>()
875        .ok_or_else(|| {
876            OmniError::manifest(format!("table {} dst column is not Utf8", table_key))
877        })?;
878
879    let from_ids = node_ids.get(&edge_type.from_type).ok_or_else(|| {
880        OmniError::manifest(format!(
881            "missing candidate node ids for {}",
882            edge_type.from_type
883        ))
884    })?;
885    let to_ids = node_ids.get(&edge_type.to_type).ok_or_else(|| {
886        OmniError::manifest(format!(
887            "missing candidate node ids for {}",
888            edge_type.to_type
889        ))
890    })?;
891
892    let mut conflicts = Vec::new();
893    for row in 0..batch.num_rows() {
894        let row_id = row_id_at(batch, row)?;
895        let src = srcs.value(row);
896        let dst = dsts.value(row);
897        if !from_ids.contains(src) {
898            conflicts.push(MergeConflict {
899                table_key: table_key.to_string(),
900                row_id: Some(row_id.clone()),
901                kind: MergeConflictKind::OrphanEdge,
902                message: format!("src '{}' not found in {}", src, edge_type.from_type),
903            });
904        }
905        if !to_ids.contains(dst) {
906            conflicts.push(MergeConflict {
907                table_key: table_key.to_string(),
908                row_id: Some(row_id),
909                kind: MergeConflictKind::OrphanEdge,
910                message: format!("dst '{}' not found in {}", dst, edge_type.to_type),
911            });
912        }
913    }
914    Ok(conflicts)
915}
916
917fn row_id_at(batch: &RecordBatch, row: usize) -> Result<String> {
918    let ids = batch
919        .column_by_name("id")
920        .ok_or_else(|| OmniError::manifest("batch missing id column".to_string()))?
921        .as_any()
922        .downcast_ref::<StringArray>()
923        .ok_or_else(|| OmniError::manifest("id column is not Utf8".to_string()))?;
924    Ok(ids.value(row).to_string())
925}
926
927/// Classify a table whose target state equals base (the adopt / fast-forward
928/// case). Returns [`CandidateTableState::AdoptWithDelta`] — with the delta
929/// pre-computed so it can be recovery-pinned — when the adopt applies a
930/// non-empty delta onto the target's lineage (a HEAD-advancing publish via
931/// [`publish_adopted_delta`]); otherwise [`CandidateTableState::AdoptSourceState`]
932/// (a pointer switch or fork, which does not advance the data HEAD).
933///
934/// The HEAD-advancing subcases mirror [`publish_adopted_source_state`]: source
935/// on a branch with the target either on main or owning the table. Computing the
936/// delta here (rather than inside the publish) is what closes the recovery gap —
937/// the classifier knows whether the publish will move Lance HEAD.
938async fn classify_adopt(
939    target_db: &Omnigraph,
940    catalog: &Catalog,
941    base_snapshot: &Snapshot,
942    source_snapshot: &Snapshot,
943    target_snapshot: &Snapshot,
944    table_key: &str,
945) -> Result<CandidateTableState> {
946    let Some(source_entry) = source_snapshot.entry(table_key) else {
947        return Ok(CandidateTableState::AdoptSourceState);
948    };
949    let target_entry = target_snapshot.entry(table_key);
950    let target_active = target_db.active_branch().await;
951    let advances_head = match (
952        target_active.as_deref(),
953        source_entry.table_branch.as_deref(),
954    ) {
955        // Source on a branch, target on main — delta applied onto main's lineage.
956        (None, Some(_)) => true,
957        // Both on branches, target owns this table — delta applied onto it.
958        (Some(target_branch), Some(_)) => {
959            target_entry.and_then(|e| e.table_branch.as_deref()) == Some(target_branch)
960        }
961        // Source on main (pointer switch) or target doesn't own (fork): no advance.
962        _ => false,
963    };
964    if !advances_head {
965        return Ok(CandidateTableState::AdoptSourceState);
966    }
967    match compute_adopt_delta(table_key, catalog, base_snapshot, source_snapshot).await? {
968        Some(delta) => Ok(CandidateTableState::AdoptWithDelta(delta)),
969        None => Ok(CandidateTableState::AdoptSourceState),
970    }
971}
972
973/// Adopt the source's table state without applying a row delta: a pointer
974/// switch (source/target share lineage) or a branch fork. The HEAD-advancing
975/// delta case is classified [`CandidateTableState::AdoptWithDelta`] and
976/// published by [`publish_adopted_delta`], so reaching the branch-bearing arms
977/// here means the delta was empty.
978async fn publish_adopted_source_state(
979    target_db: &Omnigraph,
980    source_snapshot: &Snapshot,
981    target_snapshot: &Snapshot,
982    table_key: &str,
983) -> Result<crate::db::SubTableUpdate> {
984    let source_entry = source_snapshot
985        .entry(table_key)
986        .ok_or_else(|| OmniError::manifest(format!("missing source entry for {}", table_key)))?;
987    let target_entry = target_snapshot.entry(table_key);
988
989    let target_active = target_db.active_branch().await;
990    match (
991        target_active.as_deref(),
992        source_entry.table_branch.as_deref(),
993    ) {
994        // Both on main — pointer switch is safe (same lineage, version columns valid)
995        (None, None) => Ok(crate::db::SubTableUpdate {
996            table_key: table_key.to_string(),
997            table_version: source_entry.table_version,
998            table_branch: None,
999            row_count: source_entry.row_count,
1000            version_metadata: source_entry.version_metadata.clone(),
1001        }),
1002        // Source on main, target on branch — pointer switch to main version
1003        // (target reads from main, same lineage)
1004        (Some(_target_branch), None) => Ok(crate::db::SubTableUpdate {
1005            table_key: table_key.to_string(),
1006            table_version: source_entry.table_version,
1007            table_branch: None,
1008            row_count: source_entry.row_count,
1009            version_metadata: source_entry.version_metadata.clone(),
1010        }),
1011        // Source on branch, target on main, empty delta — adopt source's
1012        // version by a pointer switch (the non-empty case is `AdoptWithDelta`).
1013        (None, Some(_source_branch)) => Ok(crate::db::SubTableUpdate {
1014            table_key: table_key.to_string(),
1015            table_version: target_entry
1016                .map(|e| e.table_version)
1017                .unwrap_or(source_entry.table_version),
1018            table_branch: None,
1019            row_count: source_entry.row_count,
1020            version_metadata: target_entry
1021                .map(|entry| entry.version_metadata.clone())
1022                .unwrap_or_else(|| source_entry.version_metadata.clone()),
1023        }),
1024        // Both on branches
1025        (Some(target_branch), Some(source_branch)) => {
1026            if target_entry.and_then(|entry| entry.table_branch.as_deref()) == Some(target_branch) {
1027                // Target already owns this table, empty delta — pointer switch
1028                // onto its own lineage (the non-empty case is `AdoptWithDelta`).
1029                Ok(crate::db::SubTableUpdate {
1030                    table_key: table_key.to_string(),
1031                    table_version: target_entry.unwrap().table_version,
1032                    table_branch: Some(target_branch.to_string()),
1033                    row_count: source_entry.row_count,
1034                    version_metadata: target_entry.unwrap().version_metadata.clone(),
1035                })
1036            } else {
1037                // Target doesn't own this table yet — fork from source state.
1038                // This creates the target branch on the sub-table dataset.
1039                let full_path = format!("{}/{}", target_db.uri(), source_entry.table_path);
1040                let ds = target_db
1041                    .fork_dataset_from_entry_state(
1042                        table_key,
1043                        &full_path,
1044                        Some(source_branch),
1045                        source_entry.table_version,
1046                        target_branch,
1047                    )
1048                    .await?;
1049                let state = target_db.storage().table_state(&full_path, &ds).await?;
1050                Ok(crate::db::SubTableUpdate {
1051                    table_key: table_key.to_string(),
1052                    table_version: state.version,
1053                    table_branch: Some(target_branch.to_string()),
1054                    row_count: state.row_count,
1055                    version_metadata: state.version_metadata,
1056                })
1057            }
1058        }
1059    }
1060}
1061
/// Publish a three-way merge result onto the merge target's own table lineage
/// and return the resulting `SubTableUpdate` for this table.
///
/// Only `staged.delta_staged` and `staged.deleted_ids` are read here;
/// `full_staged` is not. The steps run in order, each able to commit to Lance
/// HEAD, with failpoints between them:
/// 1. a staged `merge_insert` of the delta rows keyed on `id`, skipped when
///    there is no delta table or it has no rows;
/// 2. an inline-commit `delete_where` of `deleted_ids`, skipped when empty;
/// 3. an index rebuild via `build_indices_on_dataset`, skipped when the
///    table ends up empty.
///
/// # Errors
/// Propagates errors from:
/// - opening the table for mutation;
/// - scanning or concatenating the delta;
/// - the staged merge and its commit;
/// - the delete;
/// - failpoints;
/// - index building and the table-state reads.
1062async fn publish_rewritten_merge_table(
1063    target_db: &Omnigraph,
1064    table_key: &str,
1065    staged: &StagedMergeResult,
1066) -> Result<crate::db::SubTableUpdate> {
1067    // Branch merge's source-rewrite path is Merge-shaped (upsert from
1068    // source onto target). The inline `delete_where` later in this
1069    // function operates on rows the rewrite chose to remove, not
1070    // user-facing predicates, so Merge is the correct policy here.
1071    let (ds, full_path, table_branch) = target_db
1072        .open_for_mutation(table_key, crate::db::MutationOpKind::Merge)
1073        .await?;
1074    let mut current_ds = ds;
1075
1076    // Phase 1: merge_insert changed/new rows (preserves _row_created_at_version for
1077    // existing rows, bumps _row_last_updated_at_version only for actually-changed rows).
1078    //
1079    // Routed through the staged primitive so a failure between writing
1080    // fragments and committing leaves no Lance-HEAD drift. The
1081    // commit_staged here is per-table per-call (Lance has no
1082    // multi-dataset atomic commit); the residual sits at this single
1083    // commit point, narrowed from the previous "merge_insert + delete +
1084    // index" multi-step inline-commit chain.
1085    if let Some(delta) = &staged.delta_staged {
1086        // The staged delta dataset is a temp-dir Lance dataset used only
1087        // to collect the rewrite batches; wrap it in a `SnapshotHandle`
1088        // so we can route through the trait's `scan_batches_for_rewrite`.
1089        let delta_snapshot = SnapshotHandle::new(delta.dataset.clone());
1090        let batches: Vec<RecordBatch> = target_db
1091            .storage()
1092            .scan_batches_for_rewrite(&delta_snapshot)
1093            .await?
1094            .into_iter()
1095            .filter(|batch| batch.num_rows() > 0)
1096            .collect();
1097        if !batches.is_empty() {
1098            // Concat into one batch — stage_merge_insert takes a single batch.
1099            let combined = if batches.len() == 1 {
1100                batches.into_iter().next().unwrap()
1101            } else {
1102                let schema = batches[0].schema();
1103                arrow_select::concat::concat_batches(&schema, &batches)
1104                    .map_err(|e| OmniError::Lance(e.to_string()))?
1105            };
1106            let staged_merge = target_db
1107                .storage()
1108                .stage_merge_insert(
1109                    current_ds.clone(),
1110                    combined,
1111                    vec!["id".to_string()],
1112                    lance::dataset::WhenMatched::UpdateAll,
1113                    lance::dataset::WhenNotMatched::InsertAll,
1114                )
1115                .await?;
1116            current_ds = target_db
1117                .storage()
1118                .commit_staged(current_ds, staged_merge)
1119                .await?;
1120        }
1121    }
1122
1123    // Failpoint: crash after the Phase 1 merge_insert commit, before the delete.
1124    // Models a partial Phase B on the three-way path — the merged constructive
1125    // rows are on Lance HEAD but the delete has not committed and the
1126    // achieved-version intent has not been recorded, so recovery must roll BACK.
1127    // See tests/failpoints.rs::branch_merge_rewrite_partial_after_merge_rolls_back.
1128    crate::failpoints::maybe_fail("branch_merge.rewrite_after_merge_pre_delete")?;
1129
1130    // Phase 2: delete removed rows via deletion vectors.
1131    //
1132    // INLINE-COMMIT RESIDUAL: lance-6.0.1 does not expose a public
1133    // two-phase delete API (DeleteJob is `pub(crate)` —
1134    // lance-format/lance#6658 is open with no PRs). We deliberately do
1135    // NOT introduce a `stage_delete` wrapper that would secretly
1136    // inline-commit (it would create a side-channel between the staged
1137    // and inline write paths). When the upstream API ships, swap this
1138    // `delete_where` call for `stage_delete` + `commit_staged`.
    //
    // Each id becomes a single-quoted literal with embedded `'` doubled, and
    // all of them are joined into one `id IN (...)` filter (one delete commit).
1139    if !staged.deleted_ids.is_empty() {
1140        let escaped: Vec<String> = staged
1141            .deleted_ids
1142            .iter()
1143            .map(|id| format!("'{}'", id.replace('\'', "''")))
1144            .collect();
1145        let filter = format!("id IN ({})", escaped.join(", "));
1146        let (new_ds, _) = target_db
1147            .storage_inline_residual()
1148            .delete_where(&full_path, current_ds, &filter)
1149            .await?;
1150        current_ds = new_ds;
1151    }
1152
1153    // Failpoint: crash after the Phase 2 delete commit, before the index build.
1154    // Models a partial Phase B on the three-way path — constructive rows +
1155    // deletes are on Lance HEAD but the achieved-version intent has not been
1156    // recorded, so recovery must roll BACK (the index is reconciler-owned derived
1157    // state, but the merge itself never reached its commit boundary). See
1158    // tests/failpoints.rs::branch_merge_rewrite_partial_after_delete_rolls_back.
1159    crate::failpoints::maybe_fail("branch_merge.rewrite_after_delete_pre_index")?;
1160
1161    // Phase 3: rebuild indices.
1162    //
1163    // `build_indices_on_dataset` uses `stage_create_btree_index` /
1164    // `stage_create_inverted_index` + `commit_staged` for scalar
1165    // indices. Vector indices remain inline-commit
1166    // (`build_index_metadata_from_segments` is `pub(crate)` in lance-
1167    // 6.0.1 — companion ticket to lance-format/lance#6666).
    //
    // The index build is skipped when the table ended up with no rows; the
    // final state is re-read afterwards because the build may advance the version.
1168    let row_count = target_db
1169        .storage()
1170        .table_state(&full_path, &current_ds)
1171        .await?
1172        .row_count;
1173    if row_count > 0 {
1174        target_db
1175            .build_indices_on_dataset(table_key, &mut current_ds)
1176            .await?;
1177    }
1178    let final_state = target_db
1179        .storage()
1180        .table_state(&full_path, &current_ds)
1181        .await?;
1182
1183    Ok(crate::db::SubTableUpdate {
1184        table_key: table_key.to_string(),
1185        table_version: final_state.version,
1186        table_branch,
1187        row_count: final_state.row_count,
1188        version_metadata: final_state.version_metadata,
1189    })
1190}
1191
1192/// Scan a staged temp table and concat its non-empty batches into the single
1193/// batch that `stage_append` / `stage_merge_insert` consume. Returns `None` when
1194/// the table has no rows (both staged primitives reject an empty batch).
1195async fn scan_staged_combined(
1196    target_db: &Omnigraph,
1197    table: &StagedTable,
1198) -> Result<Option<RecordBatch>> {
1199    crate::instrumentation::record_scan_staged_combined();
1200    let snapshot = SnapshotHandle::new(table.dataset.clone());
1201    let batches: Vec<RecordBatch> = target_db
1202        .storage()
1203        .scan_batches_for_rewrite(&snapshot)
1204        .await?
1205        .into_iter()
1206        .filter(|batch| batch.num_rows() > 0)
1207        .collect();
1208    if batches.is_empty() {
1209        return Ok(None);
1210    }
1211    let combined = if batches.len() == 1 {
1212        batches.into_iter().next().unwrap()
1213    } else {
1214        let schema = batches[0].schema();
1215        arrow_select::concat::concat_batches(&schema, &batches)
1216            .map_err(|e| OmniError::Lance(e.to_string()))?
1217    };
1218    Ok(Some(combined))
1219}
1220
1221/// Apply an [`AdoptDelta`] onto the target's base lineage (the fast-forward /
1222/// target-owns path). Kept separate from `publish_rewritten_merge_table` (the
1223/// three-way path) because the two paths diverge: commit 3 splits this Phase 1
1224/// into append (new) + merge_insert (changed), and commit 6 makes its index
1225/// coverage incremental — neither of which the three-way path takes.
1226///
1227/// `open_for_mutation(Merge)` opens the target's own table lineage (active
1228/// branch is the merge target after the caller's swap), so every write lands on
1229/// the target and survives source-branch deletion — GC-safe.
1230///
1231/// TRANSITIONAL — removed by the fragment-adopt work (see [`AdoptDelta`]): the
1232/// multi-commit append → upsert → delete publish here (the source of the
1233/// partial-Phase-B recovery window the sidecar confirmation guards) collapses to
1234/// a single fragment-graft commit per table, so this whole function goes away.
///
/// The phases run in order, each skipped when its input is absent or empty:
/// 1. streamed append of `delta.appends`;
/// 2. staged upsert of `delta.upserts`;
/// 3. inline-commit delete of `delta.deleted_ids`.
///
/// No indices are built here. The returned update reflects the final table state.
///
/// # Errors
/// Propagates errors from:
/// - opening the table;
/// - the staged append/upsert and their commits;
/// - the delete;
/// - failpoints;
/// - the final table-state read.
1235async fn publish_adopted_delta(
1236    target_db: &Omnigraph,
1237    table_key: &str,
1238    delta: &AdoptDelta,
1239) -> Result<crate::db::SubTableUpdate> {
1240    let (ds, full_path, table_branch) = target_db
1241        .open_for_mutation(table_key, crate::db::MutationOpKind::Merge)
1242        .await?;
1243    let mut current_ds = ds;
1244
1245    // Phase 1a: append the NEW rows. `stage_append_stream` is a streaming
1246    // `Operation::Append` — no hash join — so it never buffers the delta and
1247    // cannot exhaust the DataFusion memory pool (the OOM fix). It streams the
1248    // staged rows straight into the target (Lance rolls fragments at
1249    // `max_rows_per_file`), so memory is bounded regardless of how many rows the
1250    // connector appended — never the whole set in one batch. New ids are absent
1251    // from base by construction (the ordered walk only classifies a row
1252    // `(None, Some)` when base lacks it), so they never collide on `id`. Routed
1253    // through the staged primitive so a failure between writing fragments and
1254    // committing leaves no Lance-HEAD drift. `appends` is `Some` only when the
1255    // staged table is non-empty (`compute_adopt_delta`).
1256    if let Some(append_table) = &delta.appends {
1257        let source = SnapshotHandle::new(append_table.dataset.clone());
1258        let staged = target_db
1259            .storage()
1260            .stage_append_stream(&current_ds, &source, &[])
1261            .await?;
1262        current_ds = target_db
1263            .storage()
1264            .commit_staged(current_ds, staged)
1265            .await?;
1266    }
1267
1268    // Failpoint: crash after the Phase 1a append commit, before the upsert.
1269    // Models a partial Phase B — appends are on Lance HEAD but the upserts/deletes
1270    // have not committed and the achieved-version intent has not been recorded, so
1271    // recovery must roll BACK (not publish the appends-only state). See
1272    // tests/failpoints.rs::branch_merge_adopt_partial_after_append_rolls_back.
1273    crate::failpoints::maybe_fail("branch_merge.adopt_after_append_pre_upsert")?;
1274
1275    // Phase 1b: upsert the CHANGED rows. The merge_insert hash join is now
1276    // bounded to the genuinely-changed set, not the whole delta. It runs against
1277    // the committed view that already includes the appends; the changed ids are
1278    // disjoint from the appended ids (each id is classified into exactly one of
1279    // new / changed / deleted / unchanged in the single ordered walk), so the
1280    // join never collides with an appended row.
    // `scan_staged_combined` returns `None` for an empty staged table, in which
    // case no merge_insert is staged or committed.
1281    if let Some(upsert_table) = &delta.upserts {
1282        if let Some(combined) = scan_staged_combined(target_db, upsert_table).await? {
1283            let staged_merge = target_db
1284                .storage()
1285                .stage_merge_insert(
1286                    current_ds.clone(),
1287                    combined,
1288                    vec!["id".to_string()],
1289                    lance::dataset::WhenMatched::UpdateAll,
1290                    lance::dataset::WhenNotMatched::InsertAll,
1291                )
1292                .await?;
1293            current_ds = target_db
1294                .storage()
1295                .commit_staged(current_ds, staged_merge)
1296                .await?;
1297        }
1298    }
1299
1300    // Failpoint: crash after the Phase 1b upsert commit, before the delete.
1301    // Models a partial Phase B — appends + upserts on Lance HEAD but the delete
1302    // has not committed and the achieved-version intent has not been recorded, so
1303    // recovery must roll BACK. See
1304    // tests/failpoints.rs::branch_merge_adopt_partial_after_upsert_rolls_back.
1305    crate::failpoints::maybe_fail("branch_merge.adopt_after_upsert_pre_delete")?;
1306
1307    // Phase 2: delete removed rows via deletion vectors (inline-commit residual,
1308    // same as the three-way path until Lance ships a public two-phase delete).
    // Each id becomes a single-quoted literal with embedded `'` doubled, and all
    // of them are joined into one `id IN (...)` filter (one delete commit).
1309    if !delta.deleted_ids.is_empty() {
1310        let escaped: Vec<String> = delta
1311            .deleted_ids
1312            .iter()
1313            .map(|id| format!("'{}'", id.replace('\'', "''")))
1314            .collect();
1315        let filter = format!("id IN ({})", escaped.join(", "));
1316        let (new_ds, _) = target_db
1317            .storage_inline_residual()
1318            .delete_where(&full_path, current_ds, &filter)
1319            .await?;
1320        current_ds = new_ds;
1321    }
1322
    // There is deliberately no Phase 3 on this path. The three-way path's
    // Phase 3 (inline index rebuild) is skipped here, as explained below.
1323    // Phase 4: index coverage is reconciler-owned on the adopt path. Unlike the
1324    // three-way `RewriteMerged` path, this does NOT build indices inline: the
1325    // appended/upserted rows are left uncovered (reads stay correct via
1326    // brute-force — indexes are derived state, invariant 7) and
1327    // `optimize` / `ensure_indices` folds them in. This keeps even the first
1328    // merge into a freshly schema-applied (unindexed) table fast — no inline IVF
1329    // retrain on the publish path — and is the row-level approximation of Layer
1330    // 2's fragment-adopt, where the source branch's already-built indices carry
1331    // over by reference. See docs/user/branching/merge.md.
1332    let final_state = target_db
1333        .storage()
1334        .table_state(&full_path, &current_ds)
1335        .await?;
1336
1337    Ok(crate::db::SubTableUpdate {
1338        table_key: table_key.to_string(),
1339        table_version: final_state.version,
1340        table_branch,
1341        row_count: final_state.row_count,
1342        version_metadata: final_state.version_metadata,
1343    })
1344}
1345
1346impl Omnigraph {
1347    pub async fn branch_merge(&self, source: &str, target: &str) -> Result<MergeOutcome> {
1348        self.branch_merge_as(source, target, None).await
1349    }
1350
1351    pub async fn branch_merge_as(
1352        &self,
1353        source: &str,
1354        target: &str,
1355        actor_id: Option<&str>,
1356    ) -> Result<MergeOutcome> {
1357        // Engine-layer policy gate (MR-722 fan-out / PR #3). Scope is
1358        // `BranchTransition { source, target }` — matches the HTTP-layer
1359        // convention at `server_branch_merge` (branch=Some(source),
1360        // target_branch=Some(target)). Cedar rules using
1361        // `target_branch_scope: protected` therefore correctly gate
1362        // merges INTO protected branches without forbidding the
1363        // (symmetric) source-side reference.
1364        self.enforce(
1365            omnigraph_policy::PolicyAction::BranchMerge,
1366            &omnigraph_policy::ResourceScope::BranchTransition {
1367                source: source.to_string(),
1368                target: target.to_string(),
1369            },
1370            actor_id,
1371        )?;
1372        self.ensure_schema_apply_idle("branch_merge").await?;
1373        // Converge any pending recovery sidecar before the merge
1374        // captures its target snapshot: the merge's publish would
1375        // otherwise make the drifted Phase-B commit visible as an
1376        // unattributed side effect (manifest catches up to HEAD with no
1377        // recovery audit row) and leave the stale sidecar behind. Runs
1378        // before the merge's own sidecar exists.
1379        self.heal_pending_recovery_sidecars().await?;
1380        self.branch_merge_impl(source, target, actor_id).await
1381    }
1382
1383    async fn branch_merge_impl(
1384        &self,
1385        source: &str,
1386        target: &str,
1387        actor_id: Option<&str>,
1388    ) -> Result<MergeOutcome> {
        // Merge `source` into `target`:
        // 1. reject internal system refs and identical (normalized) branches;
        // 2. resolve both head commits and their merge base;
        // 3. short-circuit when there is nothing to merge;
        // 4. run the merge body with the coordinator swapped to `target`,
        //    under the merge-exclusive lock;
        // 5. restore the coordinator and re-sync it.
        //
        // NOTE(review): the restore is a plain `.await`, not a drop guard. If
        // this future is dropped between the swap and `restore_coordinator`,
        // the restore never runs. Confirm callers never cancel a merge mid-flight.
1389        if is_internal_system_branch(source) || is_internal_system_branch(target) {
1390            return Err(OmniError::manifest(format!(
1391                "branch_merge does not allow internal system refs ('{}' -> '{}')",
1392                source, target
1393            )));
1394        }
1395        let source_branch = Omnigraph::normalize_branch_name(source)?;
1396        let target_branch = Omnigraph::normalize_branch_name(target)?;
1397        if source_branch == target_branch {
1398            return Err(OmniError::manifest(
1399                "branch_merge requires distinct source and target branches".to_string(),
1400            ));
1401        }
1402
1403        let source_head_commit_id = self
1404            .head_commit_id_for_branch(source_branch.as_deref())
1405            .await?
1406            .ok_or_else(|| OmniError::manifest("source branch has no head commit".to_string()))?;
1407        let target_head_commit_id = self
1408            .head_commit_id_for_branch(target_branch.as_deref())
1409            .await?
1410            .ok_or_else(|| OmniError::manifest("target branch has no head commit".to_string()))?;
1411        let base_commit = CommitGraph::merge_base(
1412            self.uri(),
1413            source_branch.as_deref(),
1414            target_branch.as_deref(),
1415        )
1416        .await?
1417        .ok_or_else(|| OmniError::manifest("branches have no common ancestor".to_string()))?;
1418
        // Nothing to merge when the heads coincide, or when the merge base is
        // the source head (the source is already contained in the target).
1419        if source_head_commit_id == target_head_commit_id
1420            || base_commit.graph_commit_id == source_head_commit_id
1421        {
1422            return Ok(MergeOutcome::AlreadyUpToDate);
1423        }
        // Fast-forward when the merge base is the target head, i.e. the target
        // has no commits of its own since the branches diverged.
1424        let is_fast_forward = base_commit.graph_commit_id == target_head_commit_id;
1425
1426        let base_snapshot = ManifestCoordinator::snapshot_at(
1427            self.uri(),
1428            base_commit.manifest_branch.as_deref(),
1429            base_commit.manifest_version,
1430        )
1431        .await?;
1432        let source_snapshot = self
1433            .resolved_target(ReadTarget::Branch(
1434                source_branch.clone().unwrap_or_else(|| "main".to_string()),
1435            ))
1436            .await?
1437            .snapshot;
1438        // Hold the merge-exclusive mutex across the full swap → operate
1439        // → restore window. Two concurrent branch_merge calls would
1440        // otherwise interleave their three separate `coordinator.write()`
1441        // acquisitions, leaving each merge's body running against the
1442        // other's swapped coord. Pinned by
1443        // `concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other`
1444        // in `crates/omnigraph-server/tests/server.rs`.
1445        let merge_exclusive = self.merge_exclusive();
1446        let _merge_guard = merge_exclusive.lock().await;
1447
1448        let previous_branch = self.active_branch().await;
1449        let previous = self
1450            .swap_coordinator_for_branch(target_branch.as_deref())
1451            .await?;
1452        let merge_result = self
1453            .branch_merge_on_current_target(
1454                &base_snapshot,
1455                &source_snapshot,
1456                &target_head_commit_id,
1457                &source_head_commit_id,
1458                is_fast_forward,
1459                actor_id,
1460            )
1461            .await;
1462        self.restore_coordinator(previous).await;
1463
1464        // Sync the restored coordinator's cached manifest snapshot with
1465        // disk on both Ok and Err paths. During the swap window above,
1466        // `self.coordinator` was a freshly opened coord for the merge
1467        // target; any concurrent writer on that target (e.g. a `/change`
1468        // on `main` racing a `merge into=main`) publishes against the
1469        // swapped coord and never touches the original. Without this
1470        // sync, the restored coord's cached manifest snapshot would
1471        // diverge from disk and seed a stale `expected_versions` into
1472        // the next op's publisher CAS fence — a non-retryable
1473        // `ExpectedVersionMismatch` for a user with no concurrent
1474        // writer of their own. Pinned by the
1475        // `[d:merge×change:into-target]` cell of
1476        // `concurrent_branch_ops_morphological_matrix` in
1477        // `crates/omnigraph-server/tests/server.rs`, which flakes
1478        // pre-fix and stabilises post-fix.
1479        //
1480        // Use `refresh_coordinator_only` rather than `refresh` so the
1481        // recovery sweep doesn't race the merge's own in-flight
1482        // sidecar: when the merge body returns Err between Phase B
1483        // (per-table `commit_staged` + sidecar write) and Phase C
1484        // (manifest publish + sidecar delete), the sidecar is still on
1485        // disk. `refresh`'s `RollForwardOnly` sweep would observe it
1486        // and close it here — masking the failure from the next
1487        // `Omnigraph::open` sweep and from the audit row that the open
1488        // sweep emits. Pinned by
1489        // `branch_merge_phase_b_failure_recovered_on_next_open` in
1490        // `crates/omnigraph/tests/failpoints.rs`.
1491        //
1492        // Err-path refresh is best-effort: the merge body's error
1493        // (typically the structured `manifest_conflict` from the
1494        // post_queue_snapshot drift check) is the value the caller
1495        // needs to see. A refresh-time storage error would replace
1496        // that with a less informative error; the next op or the next
1497        // `Omnigraph::open` will re-sync the coord anyway.
1498        if previous_branch == target_branch {
1499            if let Err(refresh_err) = self.refresh_coordinator_only().await {
1500                if merge_result.is_ok() {
1501                    return Err(refresh_err);
1502                }
1503                tracing::warn!(
1504                    error = %refresh_err,
1505                    "post-merge coordinator refresh failed on the error path; \
1506                     the next op or open will re-sync"
1507                );
1508            }
1509        }
1510
1511        merge_result
1512    }
1513
1514    async fn branch_merge_on_current_target(
1515        &self,
1516        base_snapshot: &Snapshot,
1517        source_snapshot: &Snapshot,
1518        target_head_commit_id: &str,
1519        source_head_commit_id: &str,
1520        is_fast_forward: bool,
1521        actor_id: Option<&str>,
1522    ) -> Result<MergeOutcome> {
1523        self.ensure_commit_graph_initialized().await?;
1524        let target_snapshot = self.snapshot().await;
1525
1526        let mut table_keys = HashSet::new();
1527        for entry in base_snapshot.entries() {
1528            table_keys.insert(entry.table_key.clone());
1529        }
1530        for entry in source_snapshot.entries() {
1531            table_keys.insert(entry.table_key.clone());
1532        }
1533        for entry in target_snapshot.entries() {
1534            table_keys.insert(entry.table_key.clone());
1535        }
1536
1537        let mut ordered_table_keys: Vec<String> = table_keys.into_iter().collect();
1538        ordered_table_keys.sort();
1539
1540        let mut conflicts = Vec::new();
1541        let mut candidates: HashMap<String, CandidateTableState> = HashMap::new();
1542
1543        for table_key in &ordered_table_keys {
1544            let base_entry = base_snapshot.entry(table_key);
1545            let source_entry = source_snapshot.entry(table_key);
1546            let target_entry = target_snapshot.entry(table_key);
1547            if same_manifest_state(source_entry, target_entry) {
1548                continue;
1549            }
1550            if same_manifest_state(base_entry, source_entry) {
1551                continue;
1552            }
1553            if same_manifest_state(base_entry, target_entry) {
1554                let candidate = classify_adopt(
1555                    self,
1556                    &self.catalog(),
1557                    base_snapshot,
1558                    source_snapshot,
1559                    &target_snapshot,
1560                    table_key,
1561                )
1562                .await?;
1563                candidates.insert(table_key.clone(), candidate);
1564                continue;
1565            }
1566
1567            if let Some(staged) = stage_streaming_table_merge(
1568                table_key,
1569                &self.catalog(),
1570                base_snapshot,
1571                source_snapshot,
1572                &target_snapshot,
1573                &mut conflicts,
1574            )
1575            .await?
1576            {
1577                candidates.insert(
1578                    table_key.clone(),
1579                    CandidateTableState::RewriteMerged(staged),
1580                );
1581            }
1582        }
1583
1584        if !conflicts.is_empty() {
1585            return Err(OmniError::MergeConflicts(conflicts));
1586        }
1587
1588        validate_merge_candidates(self, source_snapshot, &target_snapshot, &candidates).await?;
1589
1590        // Recovery sidecar: protect the per-table commit_staged loop.
1591        // Pin `RewriteMerged` and `AdoptWithDelta` candidates — both advance
1592        // Lance HEAD before the manifest publish (RewriteMerged via
1593        // publish_rewritten_merge_table; AdoptWithDelta via publish_adopted_delta:
1594        // stage_append + stage_merge_insert + delete_where + index — multiple
1595        // commit_staged calls per table, which the loose classification handles
1596        // as multi-step drift).
1597        //
1598        // `AdoptSourceState` candidates are NOT pinned: their publish
1599        // (`publish_adopted_source_state`) is a pure pointer switch or a fork
1600        // (`fork_dataset_from_entry_state` only adds a Lance branch ref), neither
1601        // of which advances the data HEAD. Pinning them would classify as
1602        // NoMovement and force an all-or-nothing rollback that destroys sibling
1603        // tables' committed work.
1604        //
1605        // The former gap — adopt subcases that applied a non-empty delta advanced
1606        // HEAD unpinned — is closed: `classify_adopt` pre-computes the delta, so a
1607        // HEAD-advancing adopt is `AdoptWithDelta` (pinned here) and an empty-delta
1608        // adopt stays `AdoptSourceState`.
1609        // Acquire per-(table_key, target_branch) queues for every table
1610        // touched by the merge plan. Sorted-order acquisition prevents
1611        // lock-order inversion against concurrent multi-table writers.
1612        // The active branch (set by the caller's `swap_coordinator_for_branch`)
1613        // is the merge target; queue keys are scoped to it because a
1614        // branch_merge writes only to the target branch.
1615        //
1616        // Held across the per-table publish loop and the manifest
1617        // commit + record_merge_commit calls below, so no concurrent
1618        // writer to a touched (table, target_branch) can interleave
1619        // between our commit_staged and our publish.
1620        let active_branch_for_keys = self.active_branch().await;
1621        let merge_queue_keys: Vec<(String, Option<String>)> = ordered_table_keys
1622            .iter()
1623            .filter(|table_key| {
1624                matches!(
1625                    candidates.get(*table_key),
1626                    Some(CandidateTableState::RewriteMerged(_))
1627                        | Some(CandidateTableState::AdoptSourceState)
1628                        | Some(CandidateTableState::AdoptWithDelta(_))
1629                )
1630            })
1631            .map(|table_key| (table_key.clone(), active_branch_for_keys.clone()))
1632            .collect();
1633        let _merge_queue_guards = self.write_queue().acquire_many(&merge_queue_keys).await;
1634
1635        let post_queue_snapshot = self.snapshot().await;
1636        for table_key in &ordered_table_keys {
1637            let Some(candidate) = candidates.get(table_key) else {
1638                continue;
1639            };
1640            if !matches!(
1641                candidate,
1642                CandidateTableState::RewriteMerged(_)
1643                    | CandidateTableState::AdoptSourceState
1644                    | CandidateTableState::AdoptWithDelta(_)
1645            ) {
1646                continue;
1647            }
1648            let expected = target_snapshot.entry(table_key).map(|e| e.table_version);
1649            let current = post_queue_snapshot
1650                .entry(table_key)
1651                .map(|e| e.table_version);
1652            if expected != current {
1653                return Err(OmniError::manifest_expected_version_mismatch(
1654                    table_key.clone(),
1655                    expected.unwrap_or(0),
1656                    current.unwrap_or(0),
1657                ));
1658            }
1659        }
1660
1661        let recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = ordered_table_keys
1662            .iter()
1663            .filter_map(|table_key| {
1664                let candidate = candidates.get(table_key)?;
1665                if !matches!(
1666                    candidate,
1667                    CandidateTableState::RewriteMerged(_) | CandidateTableState::AdoptWithDelta(_)
1668                ) {
1669                    return None;
1670                }
1671                let entry = target_snapshot.entry(table_key)?;
1672                Some(crate::db::manifest::SidecarTablePin {
1673                    table_key: table_key.clone(),
1674                    table_path: self.storage().dataset_uri(&entry.table_path),
1675                    expected_version: entry.table_version,
1676                    post_commit_pin: entry.table_version + 1,
1677                    // Stamped after the whole per-table publish completes
1678                    // (Phase-B confirmation, just before the manifest publish).
1679                    // Until then `None` marks an unfinished publish that
1680                    // recovery must roll back, not roll forward.
1681                    confirmed_version: None,
1682                    // Use the merge target branch (where commits actually
1683                    // land), NOT entry.table_branch (where the table
1684                    // currently lives). publish_rewritten_merge_table calls
1685                    // open_for_mutation, which forks an inherited-from-main
1686                    // table to active_branch on first write — the resulting
1687                    // Lance commit lands on active_branch. Recovery's
1688                    // open_lance_head must check the same branch, otherwise
1689                    // an inherited-table feature-to-feature merge classifies
1690                    // as NoMovement and the all-or-nothing rollback skips
1691                    // the orphaned post-Phase-B HEAD on the target ref.
1692                    // Same rationale as table_ops.rs:115-120 in
1693                    // ensure_indices_for_branch.
1694                    table_branch: active_branch_for_keys.clone(),
1695                })
1696            })
1697            .collect();
1698        // Keep the sidecar alongside its handle: after the per-table publish
1699        // loop completes (Phase B), we re-write it with each table's confirmed
1700        // version before the manifest publish, so recovery can tell a finished
1701        // publish (roll forward) from a partial one (roll back).
1702        let mut recovery: Option<(
1703            crate::db::manifest::RecoverySidecar,
1704            crate::db::manifest::RecoverySidecarHandle,
1705        )> = if recovery_pins.is_empty() {
1706            None
1707        } else {
1708            // Use the merge target branch directly, NOT a heuristic
1709            // derived from `ordered_table_keys.first()`. The first
1710            // sorted table key may not be in the target snapshot at all
1711            // (its `entry()` returns None → branch becomes None == main),
1712            // and the SubTableEntry's `table_branch` field isn't
1713            // necessarily the merge target branch. The caller
1714            // `branch_merge` calls `swap_coordinator_for_branch(target_branch)`
1715            // before invoking this function, so `self.active_branch()`
1716            // is the target.
1717            let target_branch = active_branch_for_keys.clone();
1718            let mut sidecar = crate::db::manifest::new_sidecar(
1719                crate::db::manifest::SidecarKind::BranchMerge,
1720                target_branch,
1721                actor_id.map(str::to_string),
1722                recovery_pins,
1723            );
1724            // Carry the source branch's HEAD commit id so the recovery
1725            // sweep's audit step can record this as a MERGE commit
1726            // (linked to the source) instead of a plain commit. Without
1727            // this, future merges between the same pair lose
1728            // already-up-to-date detection and merge-base correctness.
1729            sidecar.merge_source_commit_id = Some(source_head_commit_id.to_string());
1730            let handle = crate::db::manifest::write_sidecar(
1731                self.root_uri(),
1732                self.storage_adapter(),
1733                &sidecar,
1734            )
1735            .await?;
1736            Some((sidecar, handle))
1737        };
1738
1739        let mut updates = Vec::new();
1740        let mut changed_edge_tables = false;
1741        for table_key in &ordered_table_keys {
1742            let Some(candidate_state) = candidates.get(table_key) else {
1743                continue;
1744            };
1745            let update = match candidate_state {
1746                CandidateTableState::AdoptSourceState => {
1747                    publish_adopted_source_state(self, source_snapshot, &target_snapshot, table_key)
1748                        .await?
1749                }
1750                CandidateTableState::AdoptWithDelta(delta) => {
1751                    publish_adopted_delta(self, table_key, delta).await?
1752                }
1753                CandidateTableState::RewriteMerged(staged) => {
1754                    publish_rewritten_merge_table(self, table_key, staged).await?
1755                }
1756            };
1757            if table_key.starts_with("edge:") {
1758                changed_edge_tables = true;
1759            }
1760            updates.push(update);
1761        }
1762
1763        // Phase-B confirmation: every table's publish finished, so stamp the
1764        // sidecar with each table's exact achieved version before the manifest
1765        // publish. This is the commit point of the recovery WAL: a crash from
1766        // here on rolls FORWARD to these versions, while a crash anywhere in the
1767        // publish loop above left the sidecar unconfirmed and rolls BACK. The
1768        // `updates` carry the real per-table final versions (multiple
1769        // commit_staged calls per table, so not derivable from `post_commit_pin`
1770        // alone). A failure here leaves the unconfirmed sidecar → roll back.
1771        if let Some((sidecar, _)) = recovery.as_mut() {
1772            let confirmed_versions: std::collections::HashMap<String, u64> = updates
1773                .iter()
1774                .map(|u| (u.table_key.clone(), u.table_version))
1775                .collect();
1776            crate::db::manifest::confirm_sidecar_phase_b(
1777                self.root_uri(),
1778                self.storage_adapter(),
1779                sidecar,
1780                &confirmed_versions,
1781            )
1782            .await?;
1783        }
1784
1785        // Failpoint: pin the per-writer Phase B → Phase C residual for
1786        // branch_merge. Lance HEAD has advanced on every touched table
1787        // (publish_*) AND the sidecar is confirmed, but the manifest publish
1788        // below hasn't run — so recovery rolls FORWARD. Used by
1789        // `tests/failpoints.rs::branch_merge_phase_b_failure_recovered_on_next_open`.
1790        crate::failpoints::maybe_fail("branch_merge.post_phase_b_pre_manifest_commit")?;
1791
1792        let manifest_version = if updates.is_empty() {
1793            self.version().await
1794        } else {
1795            self.commit_manifest_updates(&updates).await?
1796        };
1797
1798        // Recovery sidecar lifecycle: delete after manifest publish.
1799        // Best-effort cleanup; the merge already landed durably so
1800        // failing the user here is undesirable.
1801        if let Some((_, handle)) = recovery {
1802            if let Err(err) =
1803                crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await
1804            {
1805                tracing::warn!(
1806                    error = %err,
1807                    operation_id = handle.operation_id.as_str(),
1808                    "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
1809                );
1810            }
1811        }
1812        self.record_merge_commit(
1813            manifest_version,
1814            target_head_commit_id,
1815            source_head_commit_id,
1816            actor_id,
1817        )
1818        .await?;
1819
1820        if changed_edge_tables {
1821            self.invalidate_graph_index().await;
1822        }
1823
1824        Ok(if is_fast_forward {
1825            MergeOutcome::FastForward
1826        } else {
1827            MergeOutcome::Merged
1828        })
1829    }
1830}