Skip to main content

omnigraph/exec/
merge.rs

1use super::*;
2
/// Number of rows buffered in memory by a `StagedTableWriter` before a flush.
const MERGE_STAGE_BATCH_ROWS: usize = 8192;
/// Env var that overrides where merge staging temp directories are created.
const MERGE_STAGE_DIR_ENV: &str = "OMNIGRAPH_MERGE_STAGING_DIR";
5
/// How a table's candidate (post-merge) state will be published.
#[derive(Debug)]
enum CandidateTableState {
    /// The source snapshot's table state is taken as-is.
    AdoptSourceState,
    /// The table was rewritten into staged datasets that must be applied.
    RewriteMerged(StagedMergeResult),
}
11
/// A dataset staged in a temporary directory during a merge.
#[derive(Debug)]
struct StagedTable {
    /// Held only to keep the staging directory alive as long as `dataset`.
    _dir: TempDir,
    dataset: Dataset,
}
17
/// Output of staging a table merge.
#[derive(Debug)]
struct StagedMergeResult {
    /// Every surviving row of the merged table (used for validation).
    full_staged: StagedTable,
    /// Only changed/new rows (used for publish); None when none changed.
    delta_staged: Option<StagedTable>,
    /// Ids of rows that must be deleted from the target table.
    deleted_ids: Vec<String>,
}
24
/// One row read from an ordered scan, retaining a handle to the batch it came
/// from so the row can later be sliced out by `StagedTableWriter::push_row`.
#[derive(Debug, Clone)]
struct CursorRow {
    id: String,
    /// Rendering of all column values joined by a separator; compared across
    /// snapshots to detect whether a row changed.
    signature: String,
    batch: RecordBatch,
    row_index: usize,
}
32
/// Streaming cursor over a dataset scan ordered by `id` ascending (nulls
/// last), with one-row lookahead via `peeked`.
struct OrderedTableCursor {
    /// None when the table is absent or the stream has been exhausted.
    stream: Option<std::pin::Pin<Box<DatasetRecordBatchStream>>>,
    current_batch: Option<RecordBatch>,
    current_row: usize,
    /// Lookahead slot filled by `peek_cloned`, drained by `pop`.
    peeked: Option<CursorRow>,
}
39
impl OrderedTableCursor {
    /// Opens a cursor over `table_key` in `snapshot`. If the snapshot has no
    /// entry for that table, the cursor yields nothing.
    async fn from_snapshot(snapshot: &Snapshot, table_key: &str) -> Result<Self> {
        let dataset = match snapshot.entry(table_key) {
            Some(_) => Some(snapshot.open(table_key).await?),
            None => None,
        };
        Self::from_dataset(dataset).await
    }

    /// Builds a cursor from an optional dataset, starting an id-ordered scan
    /// when a dataset is present.
    async fn from_dataset(dataset: Option<Dataset>) -> Result<Self> {
        let stream = if let Some(ds) = dataset {
            Some(Box::pin(
                crate::table_store::TableStore::scan_stream(
                    &ds,
                    None,
                    None,
                    // Ascending id order is what lets the merge walk multiple
                    // cursors in lockstep.
                    Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]),
                    false,
                )
                .await?,
            ))
        } else {
            None
        };

        Ok(Self {
            stream,
            current_batch: None,
            current_row: 0,
            peeked: None,
        })
    }

    /// Returns a clone of the next row without consuming it.
    async fn peek_cloned(&mut self) -> Result<Option<CursorRow>> {
        if self.peeked.is_none() {
            self.peeked = self.next_row().await?;
        }
        Ok(self.peeked.clone())
    }

    /// Consumes and returns the next row (the peeked one, if present).
    async fn pop(&mut self) -> Result<Option<CursorRow>> {
        if self.peeked.is_some() {
            return Ok(self.peeked.take());
        }
        self.next_row().await
    }

    /// Advances through the current batch, pulling the next batch from the
    /// stream when the current one is exhausted. Returns None at end of data.
    async fn next_row(&mut self) -> Result<Option<CursorRow>> {
        loop {
            if let Some(batch) = &self.current_batch {
                if self.current_row < batch.num_rows() {
                    let row_index = self.current_row;
                    self.current_row += 1;
                    return Ok(Some(CursorRow {
                        id: row_id_at(batch, row_index)?,
                        signature: row_signature(batch, row_index)?,
                        batch: batch.clone(),
                        row_index,
                    }));
                }
            }

            let Some(stream) = self.stream.as_mut() else {
                return Ok(None);
            };
            match stream.try_next().await {
                Ok(Some(batch)) => {
                    self.current_batch = Some(batch);
                    self.current_row = 0;
                }
                Ok(None) => {
                    // Drop the stream so subsequent calls return None fast.
                    self.stream = None;
                    self.current_batch = None;
                    return Ok(None);
                }
                Err(err) => return Err(OmniError::Lance(err.to_string())),
            }
        }
    }
}
120
/// Buffered writer that stages rows into a temporary Lance dataset, flushing
/// in chunks of MERGE_STAGE_BATCH_ROWS.
struct StagedTableWriter {
    schema: SchemaRef,
    /// URI of the staged dataset inside `dir`.
    dataset_uri: String,
    /// Owns the staging directory for the writer's lifetime.
    dir: TempDir,
    /// Created lazily by the first flush (or empty at finish).
    dataset: Option<Dataset>,
    /// Rows buffered since the last flush.
    buffered_rows: usize,
    /// Total rows pushed over the writer's lifetime.
    row_count: u64,
    /// Pending single-row batch slices awaiting the next flush.
    batches: Vec<RecordBatch>,
}
130
impl StagedTableWriter {
    /// Creates a writer staging into a fresh temp directory. The backing
    /// dataset is created lazily by the first flush.
    fn new(table_key: &str, schema: SchemaRef) -> Result<Self> {
        let dir = merge_stage_tempdir(table_key)?;
        let dataset_uri = dir.path().join("table.lance").to_string_lossy().to_string();
        Ok(Self {
            schema,
            dataset_uri,
            dir,
            dataset: None,
            buffered_rows: 0,
            row_count: 0,
            batches: Vec::new(),
        })
    }

    /// Buffers one row (a single-row slice of its source batch) and flushes
    /// once MERGE_STAGE_BATCH_ROWS rows have accumulated.
    async fn push_row(&mut self, row: &CursorRow) -> Result<()> {
        self.row_count += 1;
        self.buffered_rows += 1;
        self.batches.push(row.batch.slice(row.row_index, 1));
        if self.buffered_rows >= MERGE_STAGE_BATCH_ROWS {
            self.flush().await?;
        }
        Ok(())
    }

    /// Flushes remaining rows and returns the staged table. An empty dataset
    /// is created when nothing was ever written, so callers always receive a
    /// usable dataset.
    async fn finish(mut self) -> Result<StagedTable> {
        self.flush().await?;
        if self.dataset.is_none() {
            self.dataset = Some(
                crate::table_store::TableStore::create_empty_dataset(
                    &self.dataset_uri,
                    &self.schema,
                )
                .await?,
            );
        }
        Ok(StagedTable {
            _dir: self.dir,
            dataset: self.dataset.unwrap(),
        })
    }

    /// Writes buffered slices out, concatenating them into one batch first
    /// when more than one is pending. No-op on an empty buffer.
    async fn flush(&mut self) -> Result<()> {
        if self.batches.is_empty() {
            return Ok(());
        }

        let batch = if self.batches.len() == 1 {
            // Single pending slice: skip the concat.
            self.batches.pop().unwrap()
        } else {
            let batches = std::mem::take(&mut self.batches);
            arrow_select::concat::concat_batches(&self.schema, &batches)
                .map_err(|e| OmniError::Lance(e.to_string()))?
        };
        self.buffered_rows = 0;

        let ds = crate::table_store::TableStore::append_or_create_batch(
            &self.dataset_uri,
            self.dataset.take(),
            batch,
        )
        .await?;
        self.dataset = Some(ds);
        Ok(())
    }
}
197
198fn merge_stage_tempdir(table_key: &str) -> Result<TempDir> {
199    if let Ok(root) = env::var(MERGE_STAGE_DIR_ENV) {
200        return TempDirBuilder::new()
201            .prefix(&format!(
202                "omnigraph-merge-{}-",
203                sanitize_table_key(table_key)
204            ))
205            .tempdir_in(PathBuf::from(root))
206            .map_err(OmniError::from);
207    }
208    TempDirBuilder::new()
209        .prefix(&format!(
210            "omnigraph-merge-{}-",
211            sanitize_table_key(table_key)
212        ))
213        .tempdir()
214        .map_err(OmniError::from)
215}
216
/// Replaces separator characters (':', '/', '\\') with '-' so the table key
/// can be embedded safely in a directory name.
fn sanitize_table_key(table_key: &str) -> String {
    let mut sanitized = String::with_capacity(table_key.len());
    for ch in table_key.chars() {
        sanitized.push(if matches!(ch, ':' | '/' | '\\') { '-' } else { ch });
    }
    sanitized
}
226
/// Computes the delta between base and source for an adopted-source merge.
/// Returns the changed/new rows (for merge_insert) and deleted IDs (for delete).
///
/// Walks base and source in ascending-id order with one cursor each, and
/// classifies each id as deleted, new, changed, or unchanged by comparing row
/// signatures. Returns Ok(None) when source matches base row-for-row.
async fn compute_source_delta(
    table_key: &str,
    catalog: &Catalog,
    base_snapshot: &Snapshot,
    source_snapshot: &Snapshot,
) -> Result<Option<StagedMergeResult>> {
    let schema = schema_for_table_key(catalog, table_key)?;
    let mut full_writer =
        StagedTableWriter::new(&format!("{}_adopt_full", table_key), schema.clone())?;
    let mut delta_writer = StagedTableWriter::new(&format!("{}_adopt_delta", table_key), schema)?;
    let mut deleted_ids: Vec<String> = Vec::new();
    let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
    let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;

    let mut needs_update = false;

    loop {
        let base_row = base.peek_cloned().await?;
        let source_row = source.peek_cloned().await?;

        // The smallest id visible on either cursor is the next to reconcile.
        let next_id = [base_row.as_ref(), source_row.as_ref()]
            .into_iter()
            .flatten()
            .map(|row| row.id.clone())
            .min();
        let Some(next_id) = next_id else { break };

        // Consume a cursor only when its head matches next_id; otherwise its
        // peeked row stays queued for a later iteration.
        let base_row = if base_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
            base.pop().await?
        } else {
            None
        };
        let source_row = if source_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
            source.pop().await?
        } else {
            None
        };

        let base_sig = base_row.as_ref().map(|r| r.signature.as_str());
        let source_sig = source_row.as_ref().map(|r| r.signature.as_str());

        match (&base_row, &source_row) {
            (Some(_), None) => {
                // Deleted on source
                deleted_ids.push(next_id);
                needs_update = true;
            }
            (None, Some(src)) => {
                // New on source
                full_writer.push_row(src).await?;
                delta_writer.push_row(src).await?;
                needs_update = true;
            }
            (Some(_), Some(src)) if source_sig != base_sig => {
                // Changed on source
                full_writer.push_row(src).await?;
                delta_writer.push_row(src).await?;
                needs_update = true;
            }
            (Some(base), Some(_)) => {
                // Unchanged — write to full (for validation), skip delta
                full_writer.push_row(base).await?;
            }
            // next_id was taken from one of the two cursors, so at least one
            // side must have matched it.
            (None, None) => unreachable!(),
        }
    }

    if !needs_update {
        return Ok(None);
    }

    let delta_staged = if delta_writer.row_count > 0 {
        Some(delta_writer.finish().await?)
    } else {
        None
    };

    Ok(Some(StagedMergeResult {
        full_staged: full_writer.finish().await?,
        delta_staged,
        deleted_ids,
    }))
}
312
313fn min_cursor_id(
314    base_row: &Option<CursorRow>,
315    source_row: &Option<CursorRow>,
316    target_row: &Option<CursorRow>,
317) -> Option<String> {
318    [base_row.as_ref(), source_row.as_ref(), target_row.as_ref()]
319        .into_iter()
320        .flatten()
321        .map(|row| row.id.clone())
322        .min()
323}
324
/// Stages a three-way merge (base vs. source vs. target) of one table.
///
/// Walks all three tables in ascending-id order. For each id, the side that
/// changed relative to base wins; when both sides changed to different values
/// a MergeConflict is recorded. Returns Ok(None) when any conflict was
/// recorded or when the merge requires no update.
async fn stage_streaming_table_merge(
    table_key: &str,
    catalog: &Catalog,
    base_snapshot: &Snapshot,
    source_snapshot: &Snapshot,
    target_snapshot: &Snapshot,
    conflicts: &mut Vec<MergeConflict>,
) -> Result<Option<StagedMergeResult>> {
    let schema = schema_for_table_key(catalog, table_key)?;
    let mut full_writer = StagedTableWriter::new(&format!("{}_full", table_key), schema.clone())?;
    let mut delta_writer = StagedTableWriter::new(&format!("{}_delta", table_key), schema)?;
    let mut deleted_ids: Vec<String> = Vec::new();
    let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
    let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;
    let mut target = OrderedTableCursor::from_snapshot(target_snapshot, table_key).await?;

    // Conflicts pushed before this call belong to other tables; only count
    // new ones as belonging to this merge.
    let prior_conflict_count = conflicts.len();
    let mut needs_update = false;

    loop {
        let base_row = base.peek_cloned().await?;
        let source_row = source.peek_cloned().await?;
        let target_row = target.peek_cloned().await?;
        let Some(next_id) = min_cursor_id(&base_row, &source_row, &target_row) else {
            break;
        };

        // Consume only the cursors whose head matches next_id; the others
        // stay peeked for a later iteration.
        let base_row = if base_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str()) {
            base.pop().await?
        } else {
            None
        };
        let source_row = if source_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
        {
            source.pop().await?
        } else {
            None
        };
        let target_row = if target_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
        {
            target.pop().await?
        } else {
            None
        };

        let base_sig = base_row.as_ref().map(|row| row.signature.as_str());
        let source_sig = source_row.as_ref().map(|row| row.signature.as_str());
        let target_sig = target_row.as_ref().map(|row| row.signature.as_str());

        let source_changed = source_sig != base_sig;
        let target_changed = target_sig != base_sig;

        // Pick the winning row: an unchanged side defers to the other; equal
        // changes agree; two different changes are a conflict.
        let selection = if !source_changed {
            target_row.as_ref()
        } else if !target_changed {
            source_row.as_ref()
        } else if source_sig == target_sig {
            target_row.as_ref()
        } else {
            conflicts.push(classify_merge_conflict(
                table_key, &next_id, base_sig, source_sig, target_sig,
            ));
            None
        };

        // After the first conflict, keep scanning so every conflicting id is
        // reported, but stop staging rows/deletes.
        if conflicts.len() > prior_conflict_count {
            continue;
        }

        // Row existed in target but not in merge result → delete
        if selection.is_none() && target_row.is_some() {
            deleted_ids.push(next_id.clone());
            needs_update = true;
            continue;
        }

        if let Some(selection) = selection {
            // Always write to full (for validation)
            full_writer.push_row(selection).await?;
            // Only write changed rows to delta (for publish)
            if selection.signature.as_str() != target_sig.unwrap_or("") {
                delta_writer.push_row(selection).await?;
                needs_update = true;
            }
        }
    }

    if conflicts.len() > prior_conflict_count {
        return Ok(None);
    }
    if !needs_update {
        return Ok(None);
    }

    let delta_staged = if delta_writer.row_count > 0 {
        Some(delta_writer.finish().await?)
    } else {
        None
    };

    Ok(Some(StagedMergeResult {
        full_staged: full_writer.finish().await?,
        delta_staged,
        deleted_ids,
    }))
}
431
432fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<SchemaRef> {
433    if let Some(name) = table_key.strip_prefix("node:") {
434        return catalog
435            .node_types
436            .get(name)
437            .map(|t| t.arrow_schema.clone())
438            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", name)));
439    }
440    if let Some(name) = table_key.strip_prefix("edge:") {
441        return catalog
442            .edge_types
443            .get(name)
444            .map(|t| t.arrow_schema.clone())
445            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", name)));
446    }
447    Err(OmniError::manifest(format!(
448        "invalid table key '{}'",
449        table_key
450    )))
451}
452
453fn same_manifest_state(
454    left: Option<&crate::db::SubTableEntry>,
455    right: Option<&crate::db::SubTableEntry>,
456) -> bool {
457    match (left, right) {
458        (Some(left), Some(right)) => {
459            left.table_version == right.table_version && left.table_branch == right.table_branch
460        }
461        (None, None) => true,
462        _ => false,
463    }
464}
465
466fn classify_merge_conflict(
467    table_key: &str,
468    row_id: &str,
469    base_sig: Option<&str>,
470    source_sig: Option<&str>,
471    target_sig: Option<&str>,
472) -> MergeConflict {
473    let (kind, message) = match (base_sig, source_sig, target_sig) {
474        (None, Some(_), Some(_)) => (
475            MergeConflictKind::DivergentInsert,
476            format!("divergent insert for id '{}'", row_id),
477        ),
478        (Some(_), None, Some(_)) | (Some(_), Some(_), None) => (
479            MergeConflictKind::DeleteVsUpdate,
480            format!("delete/update conflict for id '{}'", row_id),
481        ),
482        _ => (
483            MergeConflictKind::DivergentUpdate,
484            format!("divergent update for id '{}'", row_id),
485        ),
486    };
487    MergeConflict {
488        table_key: table_key.to_string(),
489        row_id: Some(row_id.to_string()),
490        kind,
491        message,
492    }
493}
494
495fn row_signature(batch: &RecordBatch, row: usize) -> Result<String> {
496    let mut values = Vec::with_capacity(batch.num_columns());
497    for column in batch.columns() {
498        values.push(
499            array_value_to_string(column.as_ref(), row)
500                .map_err(|e| OmniError::Lance(e.to_string()))?,
501        );
502    }
503    Ok(values.join("\u{1f}"))
504}
505
/// Validates the candidate (post-merge) state of every node and edge table
/// before publishing: value constraints, unique constraints, edge cardinality
/// (@card min/max), and orphan edges. All conflicts are collected and, when
/// any were found, returned together as OmniError::MergeConflicts.
async fn validate_merge_candidates(
    db: &Omnigraph,
    source_snapshot: &Snapshot,
    target_snapshot: &Snapshot,
    candidates: &HashMap<String, CandidateTableState>,
) -> Result<()> {
    let mut conflicts = Vec::new();
    // Candidate id set per node type, consumed by the orphan-edge checks.
    let mut node_ids: HashMap<String, HashSet<String>> = HashMap::new();

    for (type_name, node_type) in &db.catalog().node_types {
        let table_key = format!("node:{}", type_name);
        let mut values = HashSet::new();
        // One first-seen map per unique constraint, shared across batches.
        let mut unique_seen = vec![HashMap::new(); node_type.unique_constraints.len()];

        if let Some(ds) =
            candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
        {
            let mut stream =
                crate::table_store::TableStore::scan_stream(&ds, None, None, None, false).await?;
            while let Some(batch) = stream
                .try_next()
                .await
                .map_err(|e| OmniError::Lance(e.to_string()))?
            {
                if let Err(err) = crate::loader::validate_value_constraints(&batch, node_type) {
                    conflicts.push(MergeConflict {
                        table_key: table_key.clone(),
                        row_id: None,
                        kind: MergeConflictKind::ValueConstraintViolation,
                        message: err.to_string(),
                    });
                }
                update_unique_constraints(
                    &table_key,
                    &batch,
                    &node_type.unique_constraints,
                    &mut unique_seen,
                    &mut conflicts,
                )?;
                let ids = batch
                    .column_by_name("id")
                    .ok_or_else(|| {
                        OmniError::manifest(format!("table {} missing id column", table_key))
                    })?
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| {
                        OmniError::manifest(format!("table {} id column is not Utf8", table_key))
                    })?;
                for row in 0..ids.len() {
                    values.insert(ids.value(row).to_string());
                }
            }
        }
        node_ids.insert(type_name.clone(), values);
    }

    for (edge_name, edge_type) in &db.catalog().edge_types {
        let table_key = format!("edge:{}", edge_name);
        let mut unique_seen = vec![HashMap::new(); edge_type.unique_constraints.len()];
        let mut src_counts = HashMap::new();

        if let Some(ds) =
            candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
        {
            let mut stream =
                crate::table_store::TableStore::scan_stream(&ds, None, None, None, false).await?;
            while let Some(batch) = stream
                .try_next()
                .await
                .map_err(|e| OmniError::Lance(e.to_string()))?
            {
                update_unique_constraints(
                    &table_key,
                    &batch,
                    &edge_type.unique_constraints,
                    &mut unique_seen,
                    &mut conflicts,
                )?;
                accumulate_edge_cardinality(&batch, &mut src_counts, &table_key)?;
                conflicts.extend(validate_orphan_edges_batch(
                    &table_key, edge_type, &batch, &node_ids,
                )?);
            }
        }

        // Cardinality can only be judged after every batch has been counted.
        conflicts.extend(finalize_edge_cardinality_conflicts(
            &table_key,
            edge_name,
            edge_type.cardinality.min,
            edge_type.cardinality.max,
            src_counts,
        ));
    }

    if conflicts.is_empty() {
        Ok(())
    } else {
        Err(OmniError::MergeConflicts(conflicts))
    }
}
607
608async fn candidate_dataset(
609    source_snapshot: &Snapshot,
610    target_snapshot: &Snapshot,
611    candidates: &HashMap<String, CandidateTableState>,
612    table_key: &str,
613) -> Result<Option<Dataset>> {
614    if let Some(candidate) = candidates.get(table_key) {
615        return match candidate {
616            CandidateTableState::AdoptSourceState => match source_snapshot.entry(table_key) {
617                Some(_) => Ok(Some(source_snapshot.open(table_key).await?)),
618                None => Ok(None),
619            },
620            CandidateTableState::RewriteMerged(staged) => {
621                Ok(Some(staged.full_staged.dataset.clone()))
622            }
623        };
624    }
625    match target_snapshot.entry(table_key) {
626        Some(_) => Ok(Some(target_snapshot.open(table_key).await?)),
627        None => Ok(None),
628    }
629}
630
631fn update_unique_constraints(
632    table_key: &str,
633    batch: &RecordBatch,
634    constraints: &[Vec<String>],
635    seen: &mut [HashMap<String, String>],
636    conflicts: &mut Vec<MergeConflict>,
637) -> Result<()> {
638    for (constraint_idx, columns) in constraints.iter().enumerate() {
639        let seen = &mut seen[constraint_idx];
640        for row in 0..batch.num_rows() {
641            let mut parts = Vec::with_capacity(columns.len());
642            let mut any_null = false;
643            for column_name in columns {
644                let column = batch.column_by_name(column_name).ok_or_else(|| {
645                    OmniError::manifest(format!(
646                        "table {} missing unique column '{}'",
647                        table_key, column_name
648                    ))
649                })?;
650                if column.is_null(row) {
651                    any_null = true;
652                    break;
653                }
654                parts.push(
655                    array_value_to_string(column.as_ref(), row)
656                        .map_err(|e| OmniError::Lance(e.to_string()))?,
657                );
658            }
659            if any_null {
660                continue;
661            }
662            let value = parts.join("|");
663            let row_id = row_id_at(batch, row)?;
664            if let Some(first_row_id) = seen.insert(value.clone(), row_id.clone()) {
665                conflicts.push(MergeConflict {
666                    table_key: table_key.to_string(),
667                    row_id: Some(row_id.clone()),
668                    kind: MergeConflictKind::UniqueViolation,
669                    message: format!(
670                        "unique constraint {:?} violated by '{}' and '{}'",
671                        columns, first_row_id, row_id
672                    ),
673                });
674            }
675        }
676    }
677    Ok(())
678}
679
680fn accumulate_edge_cardinality(
681    batch: &RecordBatch,
682    counts: &mut HashMap<String, u32>,
683    table_key: &str,
684) -> Result<()> {
685    let srcs = batch
686        .column_by_name("src")
687        .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
688        .as_any()
689        .downcast_ref::<StringArray>()
690        .ok_or_else(|| {
691            OmniError::manifest(format!("table {} src column is not Utf8", table_key))
692        })?;
693    for row in 0..srcs.len() {
694        *counts.entry(srcs.value(row).to_string()).or_insert(0_u32) += 1;
695    }
696    Ok(())
697}
698
699fn finalize_edge_cardinality_conflicts(
700    table_key: &str,
701    edge_name: &str,
702    min: u32,
703    max: Option<u32>,
704    counts: HashMap<String, u32>,
705) -> Vec<MergeConflict> {
706    let mut conflicts = Vec::new();
707    for (src, count) in counts {
708        if let Some(max) = max {
709            if count > max {
710                conflicts.push(MergeConflict {
711                    table_key: table_key.to_string(),
712                    row_id: None,
713                    kind: MergeConflictKind::CardinalityViolation,
714                    message: format!(
715                        "@card violation on edge {}: source '{}' has {} edges (max {})",
716                        edge_name, src, count, max
717                    ),
718                });
719            }
720        }
721        if count < min {
722            conflicts.push(MergeConflict {
723                table_key: table_key.to_string(),
724                row_id: None,
725                kind: MergeConflictKind::CardinalityViolation,
726                message: format!(
727                    "@card violation on edge {}: source '{}' has {} edges (min {})",
728                    edge_name, src, count, min
729                ),
730            });
731        }
732    }
733    conflicts
734}
735
736fn validate_orphan_edges_batch(
737    table_key: &str,
738    edge_type: &omnigraph_compiler::catalog::EdgeType,
739    batch: &RecordBatch,
740    node_ids: &HashMap<String, HashSet<String>>,
741) -> Result<Vec<MergeConflict>> {
742    let srcs = batch
743        .column_by_name("src")
744        .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
745        .as_any()
746        .downcast_ref::<StringArray>()
747        .ok_or_else(|| {
748            OmniError::manifest(format!("table {} src column is not Utf8", table_key))
749        })?;
750    let dsts = batch
751        .column_by_name("dst")
752        .ok_or_else(|| OmniError::manifest(format!("table {} missing dst column", table_key)))?
753        .as_any()
754        .downcast_ref::<StringArray>()
755        .ok_or_else(|| {
756            OmniError::manifest(format!("table {} dst column is not Utf8", table_key))
757        })?;
758
759    let from_ids = node_ids.get(&edge_type.from_type).ok_or_else(|| {
760        OmniError::manifest(format!(
761            "missing candidate node ids for {}",
762            edge_type.from_type
763        ))
764    })?;
765    let to_ids = node_ids.get(&edge_type.to_type).ok_or_else(|| {
766        OmniError::manifest(format!(
767            "missing candidate node ids for {}",
768            edge_type.to_type
769        ))
770    })?;
771
772    let mut conflicts = Vec::new();
773    for row in 0..batch.num_rows() {
774        let row_id = row_id_at(batch, row)?;
775        let src = srcs.value(row);
776        let dst = dsts.value(row);
777        if !from_ids.contains(src) {
778            conflicts.push(MergeConflict {
779                table_key: table_key.to_string(),
780                row_id: Some(row_id.clone()),
781                kind: MergeConflictKind::OrphanEdge,
782                message: format!("src '{}' not found in {}", src, edge_type.from_type),
783            });
784        }
785        if !to_ids.contains(dst) {
786            conflicts.push(MergeConflict {
787                table_key: table_key.to_string(),
788                row_id: Some(row_id),
789                kind: MergeConflictKind::OrphanEdge,
790                message: format!("dst '{}' not found in {}", dst, edge_type.to_type),
791            });
792        }
793    }
794    Ok(conflicts)
795}
796
797fn row_id_at(batch: &RecordBatch, row: usize) -> Result<String> {
798    let ids = batch
799        .column_by_name("id")
800        .ok_or_else(|| OmniError::manifest("batch missing id column".to_string()))?
801        .as_any()
802        .downcast_ref::<StringArray>()
803        .ok_or_else(|| OmniError::manifest("id column is not Utf8".to_string()))?;
804    Ok(ids.value(row).to_string())
805}
806
/// Publishes an adopt-source merge result for `table_key`, choosing between a
/// manifest pointer switch and a staged delta rewrite depending on which
/// branches (main vs. named) the source and target table states live on.
async fn publish_adopted_source_state(
    target_db: &Omnigraph,
    catalog: &Catalog,
    base_snapshot: &Snapshot,
    source_snapshot: &Snapshot,
    target_snapshot: &Snapshot,
    table_key: &str,
) -> Result<crate::db::SubTableUpdate> {
    let source_entry = source_snapshot
        .entry(table_key)
        .ok_or_else(|| OmniError::manifest(format!("missing source entry for {}", table_key)))?;
    let target_entry = target_snapshot.entry(table_key);

    match (
        target_db.active_branch(),
        source_entry.table_branch.as_deref(),
    ) {
        // Both on main — pointer switch is safe (same lineage, version columns valid)
        (None, None) => Ok(crate::db::SubTableUpdate {
            table_key: table_key.to_string(),
            table_version: source_entry.table_version,
            table_branch: None,
            row_count: source_entry.row_count,
            version_metadata: source_entry.version_metadata.clone(),
        }),
        // Source on main, target on branch — pointer switch to main version
        // (target reads from main, same lineage)
        (Some(_target_branch), None) => Ok(crate::db::SubTableUpdate {
            table_key: table_key.to_string(),
            table_version: source_entry.table_version,
            table_branch: None,
            row_count: source_entry.row_count,
            version_metadata: source_entry.version_metadata.clone(),
        }),
        // Source on branch, target on main — apply delta to preserve version metadata
        (None, Some(_source_branch)) => {
            let delta =
                compute_source_delta(table_key, catalog, base_snapshot, source_snapshot).await?;
            match delta {
                Some(staged) => publish_rewritten_merge_table(target_db, table_key, &staged).await,
                // No delta: keep the target's existing state, falling back to
                // the source entry's values when the target has no entry.
                None => Ok(crate::db::SubTableUpdate {
                    table_key: table_key.to_string(),
                    table_version: target_entry
                        .map(|e| e.table_version)
                        .unwrap_or(source_entry.table_version),
                    table_branch: None,
                    row_count: source_entry.row_count,
                    version_metadata: target_entry
                        .map(|entry| entry.version_metadata.clone())
                        .unwrap_or_else(|| source_entry.version_metadata.clone()),
                }),
            }
        }
        // Both on branches
        (Some(target_branch), Some(source_branch)) => {
            if target_entry.and_then(|entry| entry.table_branch.as_deref()) == Some(target_branch) {
                // Target already owns this table — apply delta onto its lineage
                let delta =
                    compute_source_delta(table_key, catalog, base_snapshot, source_snapshot)
                        .await?;
                match delta {
                    Some(staged) => {
                        publish_rewritten_merge_table(target_db, table_key, &staged).await
                    }
                    None => Ok(crate::db::SubTableUpdate {
                        table_key: table_key.to_string(),
                        table_version: target_entry.unwrap().table_version,
                        table_branch: Some(target_branch.to_string()),
                        row_count: source_entry.row_count,
                        version_metadata: target_entry.unwrap().version_metadata.clone(),
                    }),
                }
            } else {
                // Target doesn't own this table yet — fork from source state.
                // This creates the target branch on the sub-table dataset.
                let full_path = format!("{}/{}", target_db.uri(), source_entry.table_path);
                let ds = target_db
                    .fork_dataset_from_entry_state(
                        table_key,
                        &full_path,
                        Some(source_branch),
                        source_entry.table_version,
                        target_branch,
                    )
                    .await?;
                let state = target_db.table_store().table_state(&full_path, &ds).await?;
                Ok(crate::db::SubTableUpdate {
                    table_key: table_key.to_string(),
                    table_version: state.version,
                    table_branch: Some(target_branch.to_string()),
                    row_count: state.row_count,
                    version_metadata: state.version_metadata,
                })
            }
        }
    }
}
904
905async fn publish_rewritten_merge_table(
906    target_db: &Omnigraph,
907    table_key: &str,
908    staged: &StagedMergeResult,
909) -> Result<crate::db::SubTableUpdate> {
910    let (ds, full_path, table_branch) = target_db.open_for_mutation(table_key).await?;
911    let mut current_ds = ds;
912
913    // Phase 1: merge_insert changed/new rows (preserves _row_created_at_version for
914    // existing rows, bumps _row_last_updated_at_version only for actually-changed rows).
915    //
916    // MR-793 Phase 5: routed through the staged primitive so a failure
917    // between writing fragments and committing leaves no Lance-HEAD
918    // drift. The commit_staged here is per-table per-call (Lance has no
919    // multi-dataset atomic commit); the residual sits at this single
920    // commit point, narrowed from the previous "merge_insert + delete +
921    // index" multi-step inline-commit chain.
922    if let Some(delta) = &staged.delta_staged {
923        let batches: Vec<RecordBatch> = target_db
924            .table_store()
925            .scan_batches(&delta.dataset)
926            .await?
927            .into_iter()
928            .filter(|batch| batch.num_rows() > 0)
929            .collect();
930        if !batches.is_empty() {
931            // Concat into one batch — stage_merge_insert takes a single batch.
932            let combined = if batches.len() == 1 {
933                batches.into_iter().next().unwrap()
934            } else {
935                let schema = batches[0].schema();
936                arrow_select::concat::concat_batches(&schema, &batches)
937                    .map_err(|e| OmniError::Lance(e.to_string()))?
938            };
939            let staged_merge = target_db
940                .table_store()
941                .stage_merge_insert(
942                    current_ds.clone(),
943                    combined,
944                    vec!["id".to_string()],
945                    lance::dataset::WhenMatched::UpdateAll,
946                    lance::dataset::WhenNotMatched::InsertAll,
947                )
948                .await?;
949            current_ds = target_db
950                .table_store()
951                .commit_staged(Arc::new(current_ds), staged_merge.transaction)
952                .await?;
953        }
954    }
955
956    // Phase 2: delete removed rows via deletion vectors.
957    //
958    // INLINE-COMMIT RESIDUAL: lance-4.0.0 does not expose a public
959    // two-phase delete API (DeleteJob is `pub(crate)` —
960    // lance-format/lance#6658 is open with no PRs). MR-793 deliberately
961    // does NOT introduce a `stage_delete` wrapper that would secretly
962    // inline-commit (a side-channel — see design doc §3.2). When the
963    // upstream API ships, swap this `delete_where` call for
964    // `stage_delete` + `commit_staged`.
965    if !staged.deleted_ids.is_empty() {
966        let escaped: Vec<String> = staged
967            .deleted_ids
968            .iter()
969            .map(|id| format!("'{}'", id.replace('\'', "''")))
970            .collect();
971        let filter = format!("id IN ({})", escaped.join(", "));
972        target_db
973            .table_store()
974            .delete_where(&full_path, &mut current_ds, &filter)
975            .await?;
976    }
977
978    // Phase 3: rebuild indices.
979    //
980    // `build_indices_on_dataset` was migrated in MR-793 Phase 4 to use
981    // `stage_create_btree_index` / `stage_create_inverted_index` +
982    // `commit_staged` for scalar indices. Vector indices remain inline
983    // (residual — `build_index_metadata_from_segments` is `pub(crate)`
984    // in lance-4.0.0; companion ticket to lance-format/lance#6658).
985    let row_count = target_db
986        .table_store()
987        .table_state(&full_path, &current_ds)
988        .await?
989        .row_count;
990    if row_count > 0 {
991        target_db
992            .build_indices_on_dataset(table_key, &mut current_ds)
993            .await?;
994    }
995    let final_state = target_db
996        .table_store()
997        .table_state(&full_path, &current_ds)
998        .await?;
999
1000    Ok(crate::db::SubTableUpdate {
1001        table_key: table_key.to_string(),
1002        table_version: final_state.version,
1003        table_branch,
1004        row_count: final_state.row_count,
1005        version_metadata: final_state.version_metadata,
1006    })
1007}
1008
impl Omnigraph {
    /// Merge branch `source` into branch `target` using the current audit
    /// actor (no override). Thin wrapper over [`Self::branch_merge_as`].
    pub async fn branch_merge(&mut self, source: &str, target: &str) -> Result<MergeOutcome> {
        self.branch_merge_as(source, target, None).await
    }

    /// Merge `source` into `target`, attributing the merge to `actor_id`
    /// (when `Some`) by temporarily overriding `audit_actor_id`.
    ///
    /// The previous actor is restored on both the success and error paths:
    /// the merge result is captured into `result` before the restore.
    pub async fn branch_merge_as(
        &mut self,
        source: &str,
        target: &str,
        actor_id: Option<&str>,
    ) -> Result<MergeOutcome> {
        // Refuse to merge while a schema apply is in progress.
        self.ensure_schema_apply_idle("branch_merge").await?;
        let previous_actor = self.audit_actor_id.clone();
        self.audit_actor_id = actor_id.map(str::to_string);
        let result = self.branch_merge_impl(source, target).await;
        // Restore unconditionally — `result` may be an Err at this point.
        self.audit_actor_id = previous_actor;
        result
    }

    /// Merge driver: validates the branch pair, resolves both heads and the
    /// merge base, then performs the table-level merge with the manifest
    /// coordinator temporarily swapped onto the target branch.
    ///
    /// Short-circuits to `MergeOutcome::AlreadyUpToDate` when the heads are
    /// identical or the merge base equals the source head (i.e. the source is
    /// already contained in the target's history).
    async fn branch_merge_impl(
        &mut self,
        source: &str,
        target: &str,
    ) -> Result<MergeOutcome> {
        // Internal run refs are never valid merge endpoints.
        if is_internal_run_branch(source) || is_internal_run_branch(target) {
            return Err(OmniError::manifest(format!(
                "branch_merge does not allow internal run refs ('{}' -> '{}')",
                source, target
            )));
        }
        let source_branch = Omnigraph::normalize_branch_name(source)?;
        let target_branch = Omnigraph::normalize_branch_name(target)?;
        if source_branch == target_branch {
            return Err(OmniError::manifest(
                "branch_merge requires distinct source and target branches".to_string(),
            ));
        }

        // Resolve both branch heads and the common ancestor commit; all three
        // are required for a three-way merge.
        let source_head_commit_id = self
            .head_commit_id_for_branch(source_branch.as_deref())
            .await?
            .ok_or_else(|| OmniError::manifest("source branch has no head commit".to_string()))?;
        let target_head_commit_id = self
            .head_commit_id_for_branch(target_branch.as_deref())
            .await?
            .ok_or_else(|| OmniError::manifest("target branch has no head commit".to_string()))?;
        let base_commit = CommitGraph::merge_base(
            self.uri(),
            source_branch.as_deref(),
            target_branch.as_deref(),
        )
        .await?
        .ok_or_else(|| OmniError::manifest("branches have no common ancestor".to_string()))?;

        // Nothing to do if the source head is already part of the target's
        // history (identical heads, or the merge base *is* the source head).
        if source_head_commit_id == target_head_commit_id
            || base_commit.graph_commit_id == source_head_commit_id
        {
            return Ok(MergeOutcome::AlreadyUpToDate);
        }
        // Fast-forward: the target head is the merge base, so the target has
        // no divergent work of its own.
        let is_fast_forward = base_commit.graph_commit_id == target_head_commit_id;

        // Materialize the base and source snapshots for the three-way merge
        // (the target snapshot is taken later, after the coordinator swap).
        let base_snapshot = ManifestCoordinator::snapshot_at(
            self.uri(),
            base_commit.manifest_branch.as_deref(),
            base_commit.manifest_version,
        )
        .await?;
        let source_snapshot = self
            .resolved_target(ReadTarget::Branch(
                source_branch.clone().unwrap_or_else(|| "main".to_string()),
            ))
            .await?
            .snapshot;
        // Run the merge with the coordinator pointed at the target branch;
        // the previous coordinator is restored afterwards even if the merge
        // fails (the result is captured first, not propagated with `?`).
        let previous_branch = self.active_branch().map(str::to_string);
        let previous = self
            .swap_coordinator_for_branch(target_branch.as_deref())
            .await?;
        let merge_result = self
            .branch_merge_on_current_target(
                &base_snapshot,
                &source_snapshot,
                &target_head_commit_id,
                &source_head_commit_id,
                is_fast_forward,
            )
            .await;
        self.restore_coordinator(previous);

        // If the caller was already on the target branch, pick up the freshly
        // merged manifest state in place.
        if merge_result.is_ok() && previous_branch == target_branch {
            self.refresh().await?;
        }

        merge_result
    }

    /// Per-table three-way merge against the coordinator's current (target)
    /// branch, followed by a single manifest commit and a merge-commit record.
    ///
    /// Phases:
    ///   1. collect the union of table keys across base/source/target,
    ///   2. classify each table in sorted key order (skip unchanged tables,
    ///      adopt source state when only the source changed, otherwise stage
    ///      a streaming row-level merge — which may record conflicts),
    ///   3. fail with `MergeConflicts` if any conflicts were collected,
    ///   4. validate candidates, publish them, and commit one manifest
    ///      update covering all changed tables.
    async fn branch_merge_on_current_target(
        &mut self,
        base_snapshot: &Snapshot,
        source_snapshot: &Snapshot,
        target_head_commit_id: &str,
        source_head_commit_id: &str,
        is_fast_forward: bool,
    ) -> Result<MergeOutcome> {
        self.ensure_commit_graph_initialized().await?;
        let target_snapshot = self.snapshot();

        // Union of every table key visible to any of the three snapshots.
        let mut table_keys = HashSet::new();
        for entry in base_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }
        for entry in source_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }
        for entry in target_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }

        // Sort for deterministic classification and publish order.
        let mut ordered_table_keys: Vec<String> = table_keys.into_iter().collect();
        ordered_table_keys.sort();

        let mut conflicts = Vec::new();
        let mut candidates: HashMap<String, CandidateTableState> = HashMap::new();

        for table_key in &ordered_table_keys {
            let base_entry = base_snapshot.entry(table_key);
            let source_entry = source_snapshot.entry(table_key);
            let target_entry = target_snapshot.entry(table_key);
            // Source and target already agree — nothing to merge.
            if same_manifest_state(source_entry, target_entry) {
                continue;
            }
            // Source didn't change this table relative to base — keep target.
            if same_manifest_state(base_entry, source_entry) {
                continue;
            }
            // Only the source changed — adopt its state wholesale.
            if same_manifest_state(base_entry, target_entry) {
                candidates.insert(table_key.clone(), CandidateTableState::AdoptSourceState);
                continue;
            }

            // Both sides changed: stage a streaming row-level merge. A `None`
            // result with appended `conflicts` means the table is unmergeable.
            if let Some(staged) = stage_streaming_table_merge(
                table_key,
                self.catalog(),
                base_snapshot,
                source_snapshot,
                &target_snapshot,
                &mut conflicts,
            )
            .await?
            {
                candidates.insert(
                    table_key.clone(),
                    CandidateTableState::RewriteMerged(staged),
                );
            }
        }

        // Abort before publishing anything if any table conflicted.
        if !conflicts.is_empty() {
            return Err(OmniError::MergeConflicts(conflicts));
        }

        validate_merge_candidates(self, source_snapshot, &target_snapshot, &candidates).await?;

        // Publish phase: apply each candidate in the same sorted key order.
        let mut updates = Vec::new();
        let mut changed_edge_tables = false;
        for table_key in &ordered_table_keys {
            let Some(candidate_state) = candidates.get(table_key) else {
                continue;
            };
            let update = match candidate_state {
                CandidateTableState::AdoptSourceState => {
                    publish_adopted_source_state(
                        self,
                        self.catalog(),
                        base_snapshot,
                        source_snapshot,
                        &target_snapshot,
                        table_key,
                    )
                    .await?
                }
                CandidateTableState::RewriteMerged(staged) => {
                    publish_rewritten_merge_table(self, table_key, staged).await?
                }
            };
            // Edge-table changes invalidate the in-memory graph index below.
            if table_key.starts_with("edge:") {
                changed_edge_tables = true;
            }
            updates.push(update);
        }

        // With no table updates, keep the current manifest version but still
        // record the merge in the commit graph.
        let manifest_version = if updates.is_empty() {
            self.version()
        } else {
            self.commit_manifest_updates(&updates).await?
        };
        self.record_merge_commit(
            manifest_version,
            target_head_commit_id,
            source_head_commit_id,
        )
        .await?;

        if changed_edge_tables {
            self.invalidate_graph_index().await;
        }

        Ok(if is_fast_forward {
            MergeOutcome::FastForward
        } else {
            MergeOutcome::Merged
        })
    }
}