// omnigraph/exec/merge.rs — streaming three-way table merge, staging, and validation.
1use super::*;
2
/// Number of single-row slices a `StagedTableWriter` buffers before flushing
/// one concatenated batch to its staging dataset.
const MERGE_STAGE_BATCH_ROWS: usize = 8192;
/// Environment variable that, when set, overrides the parent directory used
/// for merge-staging tempdirs (see `merge_stage_tempdir`).
const MERGE_STAGE_DIR_ENV: &str = "OMNIGRAPH_MERGE_STAGING_DIR";
5
/// How one table's post-merge state will be published.
#[derive(Debug)]
enum CandidateTableState {
    /// The source snapshot's state is adopted as-is (the publish strategy —
    /// pointer switch, delta, or fork — is chosen at publish time).
    AdoptSourceState,
    /// A three-way merge was staged; the merged rows are rewritten into the
    /// target table from the staged datasets.
    RewriteMerged(StagedMergeResult),
}
11
/// A temporary dataset holding staged merge output.
#[derive(Debug)]
struct StagedTable {
    // Held only for its Drop: dropping the TempDir deletes the dataset files,
    // so it must outlive `dataset`.
    _dir: TempDir,
    dataset: Dataset,
}
17
/// Output of staging one table's merge.
#[derive(Debug)]
struct StagedMergeResult {
    /// Every surviving row of the merged table (used for validation).
    full_staged: StagedTable,
    /// Only the rows that changed relative to the target (used for publish);
    /// `None` when the merge produced deletions only.
    delta_staged: Option<StagedTable>,
    /// Ids of rows that must be deleted from the target.
    deleted_ids: Vec<String>,
}
24
/// One row yielded by an `OrderedTableCursor`: its id, a signature over all
/// column values (see `row_signature`), and the batch/row it came from.
#[derive(Debug, Clone)]
struct CursorRow {
    id: String,
    signature: String,
    // The containing batch is kept so the row can later be sliced out
    // (`batch.slice(row_index, 1)`) without re-reading the stream.
    batch: RecordBatch,
    row_index: usize,
}
32
/// Streaming cursor over a table scanned in ascending-id order, with
/// single-row lookahead (`peek_cloned`) layered over the batch stream.
struct OrderedTableCursor {
    /// Underlying scan stream; `None` when the table is absent or exhausted.
    stream: Option<std::pin::Pin<Box<DatasetRecordBatchStream>>>,
    /// Batch currently being walked, if any.
    current_batch: Option<RecordBatch>,
    /// Next row index within `current_batch`.
    current_row: usize,
    /// Row fetched ahead by `peek_cloned` and not yet consumed by `pop`.
    peeked: Option<CursorRow>,
}
39
impl OrderedTableCursor {
    /// Builds a cursor over `table_key` in `snapshot`; a table absent from
    /// the snapshot yields an empty cursor rather than an error.
    async fn from_snapshot(snapshot: &Snapshot, table_key: &str) -> Result<Self> {
        let dataset = match snapshot.entry(table_key) {
            Some(_) => Some(snapshot.open(table_key).await?),
            None => None,
        };
        Self::from_dataset(dataset).await
    }

    /// Builds a cursor from an optional dataset, scanning ordered by `id`
    /// ascending (nulls last). `None` produces an already-exhausted cursor.
    async fn from_dataset(dataset: Option<Dataset>) -> Result<Self> {
        let stream = if let Some(ds) = dataset {
            Some(Box::pin(
                crate::table_store::TableStore::scan_stream(
                    &ds,
                    None,
                    None,
                    Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]),
                    false,
                )
                .await?,
            ))
        } else {
            None
        };

        Ok(Self {
            stream,
            current_batch: None,
            current_row: 0,
            peeked: None,
        })
    }

    /// Returns a clone of the next row without consuming it; repeated calls
    /// (and the next `pop`) observe the same row.
    async fn peek_cloned(&mut self) -> Result<Option<CursorRow>> {
        if self.peeked.is_none() {
            self.peeked = self.next_row().await?;
        }
        Ok(self.peeked.clone())
    }

    /// Consumes and returns the next row (the previously peeked row, if any).
    async fn pop(&mut self) -> Result<Option<CursorRow>> {
        if self.peeked.is_some() {
            return Ok(self.peeked.take());
        }
        self.next_row().await
    }

    /// Advances through the current batch, pulling the next batch from the
    /// stream when the batch is exhausted. `Ok(None)` once the stream ends.
    async fn next_row(&mut self) -> Result<Option<CursorRow>> {
        loop {
            if let Some(batch) = &self.current_batch {
                if self.current_row < batch.num_rows() {
                    let row_index = self.current_row;
                    self.current_row += 1;
                    return Ok(Some(CursorRow {
                        id: row_id_at(batch, row_index)?,
                        signature: row_signature(batch, row_index)?,
                        batch: batch.clone(),
                        row_index,
                    }));
                }
            }

            let Some(stream) = self.stream.as_mut() else {
                return Ok(None);
            };
            match stream.try_next().await {
                Ok(Some(batch)) => {
                    self.current_batch = Some(batch);
                    self.current_row = 0;
                }
                Ok(None) => {
                    // Stream exhausted: drop it so later calls return None
                    // without polling again.
                    self.stream = None;
                    self.current_batch = None;
                    return Ok(None);
                }
                Err(err) => return Err(OmniError::Lance(err.to_string())),
            }
        }
    }
}
120
/// Buffering writer that stages single-row slices into a temporary dataset,
/// flushing a concatenated batch every `MERGE_STAGE_BATCH_ROWS` rows.
struct StagedTableWriter {
    schema: SchemaRef,
    dataset_uri: String,
    // Keeps the staging directory alive for the writer's lifetime.
    dir: TempDir,
    // Created lazily on first flush (or created empty by `finish`).
    dataset: Option<Dataset>,
    // Rows currently buffered in `batches`; reset on flush.
    buffered_rows: usize,
    // Total rows ever pushed; never reset.
    row_count: u64,
    batches: Vec<RecordBatch>,
}
130
131impl StagedTableWriter {
132    fn new(table_key: &str, schema: SchemaRef) -> Result<Self> {
133        let dir = merge_stage_tempdir(table_key)?;
134        let dataset_uri = dir.path().join("table.lance").to_string_lossy().to_string();
135        Ok(Self {
136            schema,
137            dataset_uri,
138            dir,
139            dataset: None,
140            buffered_rows: 0,
141            row_count: 0,
142            batches: Vec::new(),
143        })
144    }
145
146    async fn push_row(&mut self, row: &CursorRow) -> Result<()> {
147        self.row_count += 1;
148        self.buffered_rows += 1;
149        self.batches.push(row.batch.slice(row.row_index, 1));
150        if self.buffered_rows >= MERGE_STAGE_BATCH_ROWS {
151            self.flush().await?;
152        }
153        Ok(())
154    }
155
156    async fn finish(mut self) -> Result<StagedTable> {
157        self.flush().await?;
158        if self.dataset.is_none() {
159            self.dataset = Some(
160                crate::table_store::TableStore::create_empty_dataset(
161                    &self.dataset_uri,
162                    &self.schema,
163                )
164                .await?,
165            );
166        }
167        Ok(StagedTable {
168            _dir: self.dir,
169            dataset: self.dataset.unwrap(),
170        })
171    }
172
173    async fn flush(&mut self) -> Result<()> {
174        if self.batches.is_empty() {
175            return Ok(());
176        }
177
178        let batch = if self.batches.len() == 1 {
179            self.batches.pop().unwrap()
180        } else {
181            let batches = std::mem::take(&mut self.batches);
182            arrow_select::concat::concat_batches(&self.schema, &batches)
183                .map_err(|e| OmniError::Lance(e.to_string()))?
184        };
185        self.buffered_rows = 0;
186
187        let ds = crate::table_store::TableStore::append_or_create_batch(
188            &self.dataset_uri,
189            self.dataset.take(),
190            batch,
191        )
192        .await?;
193        self.dataset = Some(ds);
194        Ok(())
195    }
196}
197
198fn merge_stage_tempdir(table_key: &str) -> Result<TempDir> {
199    if let Ok(root) = env::var(MERGE_STAGE_DIR_ENV) {
200        return TempDirBuilder::new()
201            .prefix(&format!(
202                "omnigraph-merge-{}-",
203                sanitize_table_key(table_key)
204            ))
205            .tempdir_in(PathBuf::from(root))
206            .map_err(OmniError::from);
207    }
208    TempDirBuilder::new()
209        .prefix(&format!(
210            "omnigraph-merge-{}-",
211            sanitize_table_key(table_key)
212        ))
213        .tempdir()
214        .map_err(OmniError::from)
215}
216
/// Replaces separator characters (':', '/', '\\') with '-' so a table key
/// such as "node:User" can safely appear in a tempdir name.
fn sanitize_table_key(table_key: &str) -> String {
    // `str::replace` accepts a char-slice pattern: each matching character
    // is replaced, which is equivalent to the former per-char map.
    table_key.replace([':', '/', '\\'], "-")
}
226
/// Computes the delta between base and source for an adopted-source merge.
/// Returns the changed/new rows (for merge_insert) and deleted IDs (for delete).
///
/// Walks both tables in ascending-id order with two cursors (a streaming
/// merge-join), comparing row signatures per id. Returns `Ok(None)` when
/// source and base are identical, i.e. nothing needs publishing.
async fn compute_source_delta(
    table_key: &str,
    catalog: &Catalog,
    base_snapshot: &Snapshot,
    source_snapshot: &Snapshot,
) -> Result<Option<StagedMergeResult>> {
    let schema = schema_for_table_key(catalog, table_key)?;
    // `full` receives every surviving row (validation input); `delta` only
    // new/changed rows (publish input).
    let mut full_writer =
        StagedTableWriter::new(&format!("{}_adopt_full", table_key), schema.clone())?;
    let mut delta_writer = StagedTableWriter::new(&format!("{}_adopt_delta", table_key), schema)?;
    let mut deleted_ids: Vec<String> = Vec::new();
    let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
    let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;

    let mut needs_update = false;

    loop {
        let base_row = base.peek_cloned().await?;
        let source_row = source.peek_cloned().await?;

        // Advance by the smallest id visible on either cursor.
        let next_id = [base_row.as_ref(), source_row.as_ref()]
            .into_iter()
            .flatten()
            .map(|row| row.id.clone())
            .min();
        let Some(next_id) = next_id else { break };

        // Pop only the cursor(s) actually positioned at `next_id`.
        let base_row = if base_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
            base.pop().await?
        } else {
            None
        };
        let source_row = if source_row.as_ref().map(|r| r.id.as_str()) == Some(next_id.as_str()) {
            source.pop().await?
        } else {
            None
        };

        let base_sig = base_row.as_ref().map(|r| r.signature.as_str());
        let source_sig = source_row.as_ref().map(|r| r.signature.as_str());

        match (&base_row, &source_row) {
            (Some(_), None) => {
                // Deleted on source
                deleted_ids.push(next_id);
                needs_update = true;
            }
            (None, Some(src)) => {
                // New on source
                full_writer.push_row(src).await?;
                delta_writer.push_row(src).await?;
                needs_update = true;
            }
            (Some(_), Some(src)) if source_sig != base_sig => {
                // Changed on source
                full_writer.push_row(src).await?;
                delta_writer.push_row(src).await?;
                needs_update = true;
            }
            (Some(base), Some(_)) => {
                // Unchanged — write to full (for validation), skip delta
                full_writer.push_row(base).await?;
            }
            // `next_id` came from one of the two cursors, so at least one is Some.
            (None, None) => unreachable!(),
        }
    }

    if !needs_update {
        return Ok(None);
    }

    // A zero-row delta means only deletions occurred.
    let delta_staged = if delta_writer.row_count > 0 {
        Some(delta_writer.finish().await?)
    } else {
        None
    };

    Ok(Some(StagedMergeResult {
        full_staged: full_writer.finish().await?,
        delta_staged,
        deleted_ids,
    }))
}
312
313fn min_cursor_id(
314    base_row: &Option<CursorRow>,
315    source_row: &Option<CursorRow>,
316    target_row: &Option<CursorRow>,
317) -> Option<String> {
318    [base_row.as_ref(), source_row.as_ref(), target_row.as_ref()]
319        .into_iter()
320        .flatten()
321        .map(|row| row.id.clone())
322        .min()
323}
324
/// Stages a three-way (base/source/target) streaming merge for one table.
///
/// All three tables are walked in ascending-id order; per id the winning row
/// is chosen by comparing signatures against base (classic three-way rules).
/// Divergent changes are recorded into `conflicts` instead of output.
/// Returns `Ok(None)` when conflicts were found or the merge leaves the
/// target unchanged.
async fn stage_streaming_table_merge(
    table_key: &str,
    catalog: &Catalog,
    base_snapshot: &Snapshot,
    source_snapshot: &Snapshot,
    target_snapshot: &Snapshot,
    conflicts: &mut Vec<MergeConflict>,
) -> Result<Option<StagedMergeResult>> {
    let schema = schema_for_table_key(catalog, table_key)?;
    // `full` gets every surviving row (validation); `delta` only rows that
    // differ from the target (publish).
    let mut full_writer = StagedTableWriter::new(&format!("{}_full", table_key), schema.clone())?;
    let mut delta_writer = StagedTableWriter::new(&format!("{}_delta", table_key), schema)?;
    let mut deleted_ids: Vec<String> = Vec::new();
    let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
    let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;
    let mut target = OrderedTableCursor::from_snapshot(target_snapshot, table_key).await?;

    // Conflicts found by *this* call are detected by comparing against the
    // length of the caller-supplied vec.
    let prior_conflict_count = conflicts.len();
    let mut needs_update = false;

    loop {
        let base_row = base.peek_cloned().await?;
        let source_row = source.peek_cloned().await?;
        let target_row = target.peek_cloned().await?;
        let Some(next_id) = min_cursor_id(&base_row, &source_row, &target_row) else {
            break;
        };

        // Pop only the cursors positioned at `next_id` (three-way merge-join).
        let base_row = if base_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str()) {
            base.pop().await?
        } else {
            None
        };
        let source_row = if source_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
        {
            source.pop().await?
        } else {
            None
        };
        let target_row = if target_row.as_ref().map(|row| row.id.as_str()) == Some(next_id.as_str())
        {
            target.pop().await?
        } else {
            None
        };

        let base_sig = base_row.as_ref().map(|row| row.signature.as_str());
        let source_sig = source_row.as_ref().map(|row| row.signature.as_str());
        let target_sig = target_row.as_ref().map(|row| row.signature.as_str());

        let source_changed = source_sig != base_sig;
        let target_changed = target_sig != base_sig;

        // Three-way rules: an unchanged side loses to a changed one;
        // identical changes agree; truly divergent changes conflict.
        let selection = if !source_changed {
            target_row.as_ref()
        } else if !target_changed {
            source_row.as_ref()
        } else if source_sig == target_sig {
            target_row.as_ref()
        } else {
            conflicts.push(classify_merge_conflict(
                table_key, &next_id, base_sig, source_sig, target_sig,
            ));
            None
        };

        // Once any conflict exists the staged output will be discarded, so
        // keep scanning only to collect the remaining conflicts.
        if conflicts.len() > prior_conflict_count {
            continue;
        }

        // Row existed in target but not in merge result → delete
        if selection.is_none() && target_row.is_some() {
            deleted_ids.push(next_id.clone());
            needs_update = true;
            continue;
        }

        if let Some(selection) = selection {
            // Always write to full (for validation)
            full_writer.push_row(selection).await?;
            // Only write changed rows to delta (for publish)
            if selection.signature.as_str() != target_sig.unwrap_or("") {
                delta_writer.push_row(selection).await?;
                needs_update = true;
            }
        }
    }

    if conflicts.len() > prior_conflict_count {
        return Ok(None);
    }
    if !needs_update {
        return Ok(None);
    }

    // A zero-row delta means only deletions occurred.
    let delta_staged = if delta_writer.row_count > 0 {
        Some(delta_writer.finish().await?)
    } else {
        None
    };

    Ok(Some(StagedMergeResult {
        full_staged: full_writer.finish().await?,
        delta_staged,
        deleted_ids,
    }))
}
431
432fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<SchemaRef> {
433    if let Some(name) = table_key.strip_prefix("node:") {
434        return catalog
435            .node_types
436            .get(name)
437            .map(|t| t.arrow_schema.clone())
438            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", name)));
439    }
440    if let Some(name) = table_key.strip_prefix("edge:") {
441        return catalog
442            .edge_types
443            .get(name)
444            .map(|t| t.arrow_schema.clone())
445            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", name)));
446    }
447    Err(OmniError::manifest(format!(
448        "invalid table key '{}'",
449        table_key
450    )))
451}
452
453fn same_manifest_state(
454    left: Option<&crate::db::SubTableEntry>,
455    right: Option<&crate::db::SubTableEntry>,
456) -> bool {
457    match (left, right) {
458        (Some(left), Some(right)) => {
459            left.table_version == right.table_version && left.table_branch == right.table_branch
460        }
461        (None, None) => true,
462        _ => false,
463    }
464}
465
466fn classify_merge_conflict(
467    table_key: &str,
468    row_id: &str,
469    base_sig: Option<&str>,
470    source_sig: Option<&str>,
471    target_sig: Option<&str>,
472) -> MergeConflict {
473    let (kind, message) = match (base_sig, source_sig, target_sig) {
474        (None, Some(_), Some(_)) => (
475            MergeConflictKind::DivergentInsert,
476            format!("divergent insert for id '{}'", row_id),
477        ),
478        (Some(_), None, Some(_)) | (Some(_), Some(_), None) => (
479            MergeConflictKind::DeleteVsUpdate,
480            format!("delete/update conflict for id '{}'", row_id),
481        ),
482        _ => (
483            MergeConflictKind::DivergentUpdate,
484            format!("divergent update for id '{}'", row_id),
485        ),
486    };
487    MergeConflict {
488        table_key: table_key.to_string(),
489        row_id: Some(row_id.to_string()),
490        kind,
491        message,
492    }
493}
494
495fn row_signature(batch: &RecordBatch, row: usize) -> Result<String> {
496    let mut values = Vec::with_capacity(batch.num_columns());
497    for column in batch.columns() {
498        values.push(
499            array_value_to_string(column.as_ref(), row)
500                .map_err(|e| OmniError::Lance(e.to_string()))?,
501        );
502    }
503    Ok(values.join("\u{1f}"))
504}
505
/// Validates the post-merge candidate state of every table against schema
/// constraints: value + unique constraints on node tables, and unique
/// constraints, cardinality bounds, and orphan-edge checks on edge tables.
///
/// Returns `Err(OmniError::MergeConflicts)` listing every violation found,
/// or `Ok(())` when the candidate state is fully consistent.
async fn validate_merge_candidates(
    db: &Omnigraph,
    source_snapshot: &Snapshot,
    target_snapshot: &Snapshot,
    candidates: &HashMap<String, CandidateTableState>,
) -> Result<()> {
    let mut conflicts = Vec::new();
    // node type name -> all candidate row ids, consumed by the edge checks.
    let mut node_ids: HashMap<String, HashSet<String>> = HashMap::new();

    for (type_name, node_type) in &db.catalog().node_types {
        let table_key = format!("node:{}", type_name);
        let mut values = HashSet::new();
        // One seen-map per unique constraint, persisted across batches.
        let mut unique_seen = vec![HashMap::new(); node_type.unique_constraints.len()];

        if let Some(ds) =
            candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
        {
            let mut stream =
                crate::table_store::TableStore::scan_stream(&ds, None, None, None, false).await?;
            while let Some(batch) = stream
                .try_next()
                .await
                .map_err(|e| OmniError::Lance(e.to_string()))?
            {
                if let Err(err) = crate::loader::validate_value_constraints(&batch, node_type) {
                    conflicts.push(MergeConflict {
                        table_key: table_key.clone(),
                        row_id: None,
                        kind: MergeConflictKind::ValueConstraintViolation,
                        message: err.to_string(),
                    });
                }
                update_unique_constraints(
                    &table_key,
                    &batch,
                    &node_type.unique_constraints,
                    &mut unique_seen,
                    &mut conflicts,
                )?;
                // Collect every node id for the orphan-edge checks below.
                let ids = batch
                    .column_by_name("id")
                    .ok_or_else(|| {
                        OmniError::manifest(format!("table {} missing id column", table_key))
                    })?
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .ok_or_else(|| {
                        OmniError::manifest(format!("table {} id column is not Utf8", table_key))
                    })?;
                for row in 0..ids.len() {
                    values.insert(ids.value(row).to_string());
                }
            }
        }
        node_ids.insert(type_name.clone(), values);
    }

    for (edge_name, edge_type) in &db.catalog().edge_types {
        let table_key = format!("edge:{}", edge_name);
        let mut unique_seen = vec![HashMap::new(); edge_type.unique_constraints.len()];
        // src id -> outgoing edge count, for the cardinality check.
        let mut src_counts = HashMap::new();

        if let Some(ds) =
            candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
        {
            let mut stream =
                crate::table_store::TableStore::scan_stream(&ds, None, None, None, false).await?;
            while let Some(batch) = stream
                .try_next()
                .await
                .map_err(|e| OmniError::Lance(e.to_string()))?
            {
                update_unique_constraints(
                    &table_key,
                    &batch,
                    &edge_type.unique_constraints,
                    &mut unique_seen,
                    &mut conflicts,
                )?;
                accumulate_edge_cardinality(&batch, &mut src_counts, &table_key)?;
                conflicts.extend(validate_orphan_edges_batch(
                    &table_key, edge_type, &batch, &node_ids,
                )?);
            }
        }

        // Cardinality can only be judged once all batches are counted.
        conflicts.extend(finalize_edge_cardinality_conflicts(
            &table_key,
            edge_name,
            edge_type.cardinality.min,
            edge_type.cardinality.max,
            src_counts,
        ));
    }

    if conflicts.is_empty() {
        Ok(())
    } else {
        Err(OmniError::MergeConflicts(conflicts))
    }
}
607
608async fn candidate_dataset(
609    source_snapshot: &Snapshot,
610    target_snapshot: &Snapshot,
611    candidates: &HashMap<String, CandidateTableState>,
612    table_key: &str,
613) -> Result<Option<Dataset>> {
614    if let Some(candidate) = candidates.get(table_key) {
615        return match candidate {
616            CandidateTableState::AdoptSourceState => match source_snapshot.entry(table_key) {
617                Some(_) => Ok(Some(source_snapshot.open(table_key).await?)),
618                None => Ok(None),
619            },
620            CandidateTableState::RewriteMerged(staged) => {
621                Ok(Some(staged.full_staged.dataset.clone()))
622            }
623        };
624    }
625    match target_snapshot.entry(table_key) {
626        Some(_) => Ok(Some(target_snapshot.open(table_key).await?)),
627        None => Ok(None),
628    }
629}
630
631fn update_unique_constraints(
632    table_key: &str,
633    batch: &RecordBatch,
634    constraints: &[Vec<String>],
635    seen: &mut [HashMap<String, String>],
636    conflicts: &mut Vec<MergeConflict>,
637) -> Result<()> {
638    for (constraint_idx, columns) in constraints.iter().enumerate() {
639        let seen = &mut seen[constraint_idx];
640        for row in 0..batch.num_rows() {
641            let mut parts = Vec::with_capacity(columns.len());
642            let mut any_null = false;
643            for column_name in columns {
644                let column = batch.column_by_name(column_name).ok_or_else(|| {
645                    OmniError::manifest(format!(
646                        "table {} missing unique column '{}'",
647                        table_key, column_name
648                    ))
649                })?;
650                if column.is_null(row) {
651                    any_null = true;
652                    break;
653                }
654                parts.push(
655                    array_value_to_string(column.as_ref(), row)
656                        .map_err(|e| OmniError::Lance(e.to_string()))?,
657                );
658            }
659            if any_null {
660                continue;
661            }
662            let value = parts.join("|");
663            let row_id = row_id_at(batch, row)?;
664            if let Some(first_row_id) = seen.insert(value.clone(), row_id.clone()) {
665                conflicts.push(MergeConflict {
666                    table_key: table_key.to_string(),
667                    row_id: Some(row_id.clone()),
668                    kind: MergeConflictKind::UniqueViolation,
669                    message: format!(
670                        "unique constraint {:?} violated by '{}' and '{}'",
671                        columns, first_row_id, row_id
672                    ),
673                });
674            }
675        }
676    }
677    Ok(())
678}
679
680fn accumulate_edge_cardinality(
681    batch: &RecordBatch,
682    counts: &mut HashMap<String, u32>,
683    table_key: &str,
684) -> Result<()> {
685    let srcs = batch
686        .column_by_name("src")
687        .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
688        .as_any()
689        .downcast_ref::<StringArray>()
690        .ok_or_else(|| {
691            OmniError::manifest(format!("table {} src column is not Utf8", table_key))
692        })?;
693    for row in 0..srcs.len() {
694        *counts.entry(srcs.value(row).to_string()).or_insert(0_u32) += 1;
695    }
696    Ok(())
697}
698
699fn finalize_edge_cardinality_conflicts(
700    table_key: &str,
701    edge_name: &str,
702    min: u32,
703    max: Option<u32>,
704    counts: HashMap<String, u32>,
705) -> Vec<MergeConflict> {
706    let mut conflicts = Vec::new();
707    for (src, count) in counts {
708        if let Some(max) = max {
709            if count > max {
710                conflicts.push(MergeConflict {
711                    table_key: table_key.to_string(),
712                    row_id: None,
713                    kind: MergeConflictKind::CardinalityViolation,
714                    message: format!(
715                        "@card violation on edge {}: source '{}' has {} edges (max {})",
716                        edge_name, src, count, max
717                    ),
718                });
719            }
720        }
721        if count < min {
722            conflicts.push(MergeConflict {
723                table_key: table_key.to_string(),
724                row_id: None,
725                kind: MergeConflictKind::CardinalityViolation,
726                message: format!(
727                    "@card violation on edge {}: source '{}' has {} edges (min {})",
728                    edge_name, src, count, min
729                ),
730            });
731        }
732    }
733    conflicts
734}
735
736fn validate_orphan_edges_batch(
737    table_key: &str,
738    edge_type: &omnigraph_compiler::catalog::EdgeType,
739    batch: &RecordBatch,
740    node_ids: &HashMap<String, HashSet<String>>,
741) -> Result<Vec<MergeConflict>> {
742    let srcs = batch
743        .column_by_name("src")
744        .ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
745        .as_any()
746        .downcast_ref::<StringArray>()
747        .ok_or_else(|| {
748            OmniError::manifest(format!("table {} src column is not Utf8", table_key))
749        })?;
750    let dsts = batch
751        .column_by_name("dst")
752        .ok_or_else(|| OmniError::manifest(format!("table {} missing dst column", table_key)))?
753        .as_any()
754        .downcast_ref::<StringArray>()
755        .ok_or_else(|| {
756            OmniError::manifest(format!("table {} dst column is not Utf8", table_key))
757        })?;
758
759    let from_ids = node_ids.get(&edge_type.from_type).ok_or_else(|| {
760        OmniError::manifest(format!(
761            "missing candidate node ids for {}",
762            edge_type.from_type
763        ))
764    })?;
765    let to_ids = node_ids.get(&edge_type.to_type).ok_or_else(|| {
766        OmniError::manifest(format!(
767            "missing candidate node ids for {}",
768            edge_type.to_type
769        ))
770    })?;
771
772    let mut conflicts = Vec::new();
773    for row in 0..batch.num_rows() {
774        let row_id = row_id_at(batch, row)?;
775        let src = srcs.value(row);
776        let dst = dsts.value(row);
777        if !from_ids.contains(src) {
778            conflicts.push(MergeConflict {
779                table_key: table_key.to_string(),
780                row_id: Some(row_id.clone()),
781                kind: MergeConflictKind::OrphanEdge,
782                message: format!("src '{}' not found in {}", src, edge_type.from_type),
783            });
784        }
785        if !to_ids.contains(dst) {
786            conflicts.push(MergeConflict {
787                table_key: table_key.to_string(),
788                row_id: Some(row_id),
789                kind: MergeConflictKind::OrphanEdge,
790                message: format!("dst '{}' not found in {}", dst, edge_type.to_type),
791            });
792        }
793    }
794    Ok(conflicts)
795}
796
797fn row_id_at(batch: &RecordBatch, row: usize) -> Result<String> {
798    let ids = batch
799        .column_by_name("id")
800        .ok_or_else(|| OmniError::manifest("batch missing id column".to_string()))?
801        .as_any()
802        .downcast_ref::<StringArray>()
803        .ok_or_else(|| OmniError::manifest("id column is not Utf8".to_string()))?;
804    Ok(ids.value(row).to_string())
805}
806
/// Publishes an adopt-source merge for one table, choosing a strategy from
/// which branch (main vs. a named branch) each side's data lives on.
///
/// Pointer switches are used when source and target share a lineage;
/// otherwise the source delta is re-applied (or the dataset forked) so the
/// target's version metadata stays consistent with its own lineage.
async fn publish_adopted_source_state(
    target_db: &Omnigraph,
    catalog: &Catalog,
    base_snapshot: &Snapshot,
    source_snapshot: &Snapshot,
    target_snapshot: &Snapshot,
    table_key: &str,
) -> Result<crate::db::SubTableUpdate> {
    let source_entry = source_snapshot
        .entry(table_key)
        .ok_or_else(|| OmniError::manifest(format!("missing source entry for {}", table_key)))?;
    let target_entry = target_snapshot.entry(table_key);

    // Dispatch on (target's active branch, source entry's branch).
    match (
        target_db.active_branch(),
        source_entry.table_branch.as_deref(),
    ) {
        // Both on main — pointer switch is safe (same lineage, version columns valid)
        (None, None) => Ok(crate::db::SubTableUpdate {
            table_key: table_key.to_string(),
            table_version: source_entry.table_version,
            table_branch: None,
            row_count: source_entry.row_count,
            version_metadata: source_entry.version_metadata.clone(),
        }),
        // Source on main, target on branch — pointer switch to main version
        // (target reads from main, same lineage)
        (Some(_target_branch), None) => Ok(crate::db::SubTableUpdate {
            table_key: table_key.to_string(),
            table_version: source_entry.table_version,
            table_branch: None,
            row_count: source_entry.row_count,
            version_metadata: source_entry.version_metadata.clone(),
        }),
        // Source on branch, target on main — apply delta to preserve version metadata
        (None, Some(_source_branch)) => {
            let delta =
                compute_source_delta(table_key, catalog, base_snapshot, source_snapshot).await?;
            match delta {
                Some(staged) => publish_rewritten_merge_table(target_db, table_key, &staged).await,
                // No delta: keep the target's current pointer/metadata,
                // falling back to the source's when the target has no entry.
                None => Ok(crate::db::SubTableUpdate {
                    table_key: table_key.to_string(),
                    table_version: target_entry
                        .map(|e| e.table_version)
                        .unwrap_or(source_entry.table_version),
                    table_branch: None,
                    row_count: source_entry.row_count,
                    version_metadata: target_entry
                        .map(|entry| entry.version_metadata.clone())
                        .unwrap_or_else(|| source_entry.version_metadata.clone()),
                }),
            }
        }
        // Both on branches
        (Some(target_branch), Some(source_branch)) => {
            if target_entry.and_then(|entry| entry.table_branch.as_deref()) == Some(target_branch) {
                // Target already owns this table — apply delta onto its lineage
                let delta =
                    compute_source_delta(table_key, catalog, base_snapshot, source_snapshot)
                        .await?;
                match delta {
                    Some(staged) => {
                        publish_rewritten_merge_table(target_db, table_key, &staged).await
                    }
                    None => Ok(crate::db::SubTableUpdate {
                        table_key: table_key.to_string(),
                        table_version: target_entry.unwrap().table_version,
                        table_branch: Some(target_branch.to_string()),
                        row_count: source_entry.row_count,
                        version_metadata: target_entry.unwrap().version_metadata.clone(),
                    }),
                }
            } else {
                // Target doesn't own this table yet — fork from source state.
                // This creates the target branch on the sub-table dataset.
                let full_path = format!("{}/{}", target_db.uri(), source_entry.table_path);
                let ds = target_db
                    .fork_dataset_from_entry_state(
                        table_key,
                        &full_path,
                        Some(source_branch),
                        source_entry.table_version,
                        target_branch,
                    )
                    .await?;
                let state = target_db.table_store().table_state(&full_path, &ds).await?;
                Ok(crate::db::SubTableUpdate {
                    table_key: table_key.to_string(),
                    table_version: state.version,
                    table_branch: Some(target_branch.to_string()),
                    row_count: state.row_count,
                    version_metadata: state.version_metadata,
                })
            }
        }
    }
}
904
905async fn publish_rewritten_merge_table(
906    target_db: &Omnigraph,
907    table_key: &str,
908    staged: &StagedMergeResult,
909) -> Result<crate::db::SubTableUpdate> {
910    let (ds, full_path, table_branch) = target_db.open_for_mutation(table_key).await?;
911    let mut current_ds = ds;
912
913    // Phase 1: merge_insert changed/new rows (preserves _row_created_at_version for
914    // existing rows, bumps _row_last_updated_at_version only for actually-changed rows)
915    if let Some(delta) = &staged.delta_staged {
916        let batches: Vec<RecordBatch> = target_db
917            .table_store()
918            .scan_batches(&delta.dataset)
919            .await?
920            .into_iter()
921            .filter(|batch| batch.num_rows() > 0)
922            .collect();
923        if !batches.is_empty() {
924            let state = target_db
925                .table_store()
926                .merge_insert_batches(
927                    &full_path,
928                    current_ds,
929                    batches,
930                    vec!["id".to_string()],
931                    lance::dataset::WhenMatched::UpdateAll,
932                    lance::dataset::WhenNotMatched::InsertAll,
933                )
934                .await?;
935            current_ds = target_db
936                .reopen_for_mutation(
937                    table_key,
938                    &full_path,
939                    table_branch.as_deref(),
940                    state.version,
941                )
942                .await?;
943        }
944    }
945
946    // Phase 2: delete removed rows via deletion vectors
947    if !staged.deleted_ids.is_empty() {
948        let escaped: Vec<String> = staged
949            .deleted_ids
950            .iter()
951            .map(|id| format!("'{}'", id.replace('\'', "''")))
952            .collect();
953        let filter = format!("id IN ({})", escaped.join(", "));
954        target_db
955            .table_store()
956            .delete_where(&full_path, &mut current_ds, &filter)
957            .await?;
958    }
959
960    // Phase 3: rebuild indices
961    let row_count = target_db
962        .table_store()
963        .table_state(&full_path, &current_ds)
964        .await?
965        .row_count;
966    if row_count > 0 {
967        target_db
968            .build_indices_on_dataset(table_key, &mut current_ds)
969            .await?;
970    }
971    let final_state = target_db
972        .table_store()
973        .table_state(&full_path, &current_ds)
974        .await?;
975
976    Ok(crate::db::SubTableUpdate {
977        table_key: table_key.to_string(),
978        table_version: final_state.version,
979        table_branch,
980        row_count: final_state.row_count,
981        version_metadata: final_state.version_metadata,
982    })
983}
984
impl Omnigraph {
    /// Merges branch `source` into branch `target` with no audit-actor
    /// override (the currently configured actor, if any, is used).
    pub async fn branch_merge(&mut self, source: &str, target: &str) -> Result<MergeOutcome> {
        self.branch_merge_as(source, target, None).await
    }

    /// Merges `source` into `target`, attributing the operation to `actor_id`
    /// for its duration.
    ///
    /// The previous `audit_actor_id` is saved and restored after the merge,
    /// whether it succeeds or fails. Internal run refs are rejected (this
    /// path calls `branch_merge_impl` with `allow_internal_refs = false`).
    pub async fn branch_merge_as(
        &mut self,
        source: &str,
        target: &str,
        actor_id: Option<&str>,
    ) -> Result<MergeOutcome> {
        // Refuse to merge while a schema apply is in flight.
        self.ensure_schema_apply_idle("branch_merge").await?;
        // Temporarily swap in the requested audit actor; restored below even
        // when the merge returns an error.
        let previous_actor = self.audit_actor_id.clone();
        self.audit_actor_id = actor_id.map(str::to_string);
        let result = self.branch_merge_impl(source, target, false).await;
        self.audit_actor_id = previous_actor;
        result
    }

    /// Crate-internal merge entry point that additionally permits internal
    /// run refs (see `is_internal_run_branch`) as source/target. No
    /// schema-apply idle check or actor override is performed here.
    pub(crate) async fn branch_merge_internal(
        &mut self,
        source: &str,
        target: &str,
    ) -> Result<MergeOutcome> {
        self.branch_merge_impl(source, target, true).await
    }

    /// Shared merge driver: validates the refs, computes the merge base,
    /// detects the trivial outcomes (already up to date / fast-forward),
    /// then runs the actual table-level merge with the coordinator swapped
    /// onto the target branch.
    ///
    /// # Errors
    /// Returns a manifest error for internal refs (unless allowed), identical
    /// branches, missing head commits, or branches with no common ancestor;
    /// otherwise propagates errors from the underlying merge.
    async fn branch_merge_impl(
        &mut self,
        source: &str,
        target: &str,
        allow_internal_refs: bool,
    ) -> Result<MergeOutcome> {
        if !allow_internal_refs {
            if is_internal_run_branch(source) || is_internal_run_branch(target) {
                return Err(OmniError::manifest(format!(
                    "branch_merge does not allow internal run refs ('{}' -> '{}')",
                    source, target
                )));
            }
        }
        // Normalized branch names are Option<String>; None denotes "main"
        // (see the unwrap_or_else("main") fallback below).
        let source_branch = Omnigraph::normalize_branch_name(source)?;
        let target_branch = Omnigraph::normalize_branch_name(target)?;
        if source_branch == target_branch {
            return Err(OmniError::manifest(
                "branch_merge requires distinct source and target branches".to_string(),
            ));
        }

        let source_head_commit_id = self
            .head_commit_id_for_branch(source_branch.as_deref())
            .await?
            .ok_or_else(|| OmniError::manifest("source branch has no head commit".to_string()))?;
        let target_head_commit_id = self
            .head_commit_id_for_branch(target_branch.as_deref())
            .await?
            .ok_or_else(|| OmniError::manifest("target branch has no head commit".to_string()))?;
        // Common ancestor of the two heads in the commit graph.
        let base_commit = CommitGraph::merge_base(
            self.uri(),
            source_branch.as_deref(),
            target_branch.as_deref(),
        )
        .await?
        .ok_or_else(|| OmniError::manifest("branches have no common ancestor".to_string()))?;

        // Same head, or source is an ancestor of target: nothing to merge.
        if source_head_commit_id == target_head_commit_id
            || base_commit.graph_commit_id == source_head_commit_id
        {
            return Ok(MergeOutcome::AlreadyUpToDate);
        }
        // Target is an ancestor of source: the merge is a fast-forward.
        let is_fast_forward = base_commit.graph_commit_id == target_head_commit_id;

        // Materialize the three-way inputs: the base manifest snapshot and
        // the source branch's resolved snapshot.
        let base_snapshot = ManifestCoordinator::snapshot_at(
            self.uri(),
            base_commit.manifest_branch.as_deref(),
            base_commit.manifest_version,
        )
        .await?;
        let source_snapshot = self
            .resolved_target(ReadTarget::Branch(
                source_branch.clone().unwrap_or_else(|| "main".to_string()),
            ))
            .await?
            .snapshot;
        // Point the coordinator at the target branch for the duration of the
        // merge; the previous coordinator is restored unconditionally below.
        let previous_branch = self.active_branch().map(str::to_string);
        let previous = self
            .swap_coordinator_for_branch(target_branch.as_deref())
            .await?;
        let merge_result = self
            .branch_merge_on_current_target(
                &base_snapshot,
                &source_snapshot,
                &target_head_commit_id,
                &source_head_commit_id,
                is_fast_forward,
            )
            .await;
        self.restore_coordinator(previous);

        // If we merged into the branch we were already on, refresh so the
        // in-memory state reflects the new manifest.
        if merge_result.is_ok() && previous_branch == target_branch {
            self.refresh().await?;
        }

        merge_result
    }

    /// Performs the table-level three-way merge against the currently active
    /// (target) coordinator, then records the manifest and merge commits.
    ///
    /// Assumes the caller has already swapped the coordinator onto the target
    /// branch (see `branch_merge_impl`).
    ///
    /// # Errors
    /// Returns `OmniError::MergeConflicts` when any table stages conflicts;
    /// otherwise propagates staging/publication/commit errors.
    async fn branch_merge_on_current_target(
        &mut self,
        base_snapshot: &Snapshot,
        source_snapshot: &Snapshot,
        target_head_commit_id: &str,
        source_head_commit_id: &str,
        is_fast_forward: bool,
    ) -> Result<MergeOutcome> {
        self.ensure_commit_graph_initialized().await?;
        let target_snapshot = self.snapshot();

        // Union of every table key seen in base, source, or target.
        let mut table_keys = HashSet::new();
        for entry in base_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }
        for entry in source_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }
        for entry in target_snapshot.entries() {
            table_keys.insert(entry.table_key.clone());
        }

        // Sort for deterministic processing/publication order.
        let mut ordered_table_keys: Vec<String> = table_keys.into_iter().collect();
        ordered_table_keys.sort();

        let mut conflicts = Vec::new();
        let mut candidates: HashMap<String, CandidateTableState> = HashMap::new();

        // Classify each table by comparing manifest states pairwise:
        // - source == target: nothing to do;
        // - base == source: source didn't change it, keep target;
        // - base == target: only source changed it, adopt source wholesale;
        // - otherwise: both sides changed it, stage a row-level merge (which
        //   may record conflicts instead of producing a staged result).
        for table_key in &ordered_table_keys {
            let base_entry = base_snapshot.entry(table_key);
            let source_entry = source_snapshot.entry(table_key);
            let target_entry = target_snapshot.entry(table_key);
            if same_manifest_state(source_entry, target_entry) {
                continue;
            }
            if same_manifest_state(base_entry, source_entry) {
                continue;
            }
            if same_manifest_state(base_entry, target_entry) {
                candidates.insert(table_key.clone(), CandidateTableState::AdoptSourceState);
                continue;
            }

            if let Some(staged) = stage_streaming_table_merge(
                table_key,
                self.catalog(),
                base_snapshot,
                source_snapshot,
                &target_snapshot,
                &mut conflicts,
            )
            .await?
            {
                candidates.insert(
                    table_key.clone(),
                    CandidateTableState::RewriteMerged(staged),
                );
            }
        }

        // All conflicts are collected before bailing, so the error reports
        // every conflicting table at once.
        if !conflicts.is_empty() {
            return Err(OmniError::MergeConflicts(conflicts));
        }

        validate_merge_candidates(self, source_snapshot, &target_snapshot, &candidates).await?;

        // Publish each candidate in sorted key order and collect manifest
        // updates. Edge-table changes are tracked to invalidate the graph
        // index afterwards.
        let mut updates = Vec::new();
        let mut changed_edge_tables = false;
        for table_key in &ordered_table_keys {
            let Some(candidate_state) = candidates.get(table_key) else {
                continue;
            };
            let update = match candidate_state {
                CandidateTableState::AdoptSourceState => {
                    publish_adopted_source_state(
                        self,
                        self.catalog(),
                        base_snapshot,
                        source_snapshot,
                        &target_snapshot,
                        table_key,
                    )
                    .await?
                }
                CandidateTableState::RewriteMerged(staged) => {
                    publish_rewritten_merge_table(self, table_key, staged).await?
                }
            };
            if table_key.starts_with("edge:") {
                changed_edge_tables = true;
            }
            updates.push(update);
        }

        // With no table updates, the merge commit points at the current
        // manifest version; otherwise commit the updates first.
        let manifest_version = if updates.is_empty() {
            self.version()
        } else {
            self.commit_manifest_updates(&updates).await?
        };
        self.record_merge_commit(
            manifest_version,
            target_head_commit_id,
            source_head_commit_id,
        )
        .await?;

        if changed_edge_tables {
            self.invalidate_graph_index().await;
        }

        Ok(if is_fast_forward {
            MergeOutcome::FastForward
        } else {
            MergeOutcome::Merged
        })
    }
}