Skip to main content

omnigraph/changes/
mod.rs

1use std::collections::HashSet;
2
3use arrow_array::{Array, RecordBatch, StringArray, UInt64Array};
4use arrow_cast::display::array_value_to_string;
5use lance::dataset::scanner::ColumnOrdering;
6
7use crate::db::SubTableEntry;
8use crate::db::manifest::Snapshot;
9use crate::error::Result;
10use crate::table_store::TableStore;
11
// ─── Types ──────────────────────────────────────────────────────────────────
13
/// Whether a changed entity is a graph node or a graph edge.
///
/// The kind is derived from the prefix of the table key the entity lives in
/// (`node:<type>` or `edge:<type>`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EntityKind {
    /// Entity from a `node:` table; also the fallback for unprefixed table keys.
    Node,
    /// Entity from an `edge:` table.
    Edge,
}
19
/// The net effect a diff attributes to a single entity.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChangeOp {
    /// The entity is present in the target snapshot but not in the source one.
    Insert,
    /// The entity is present in both snapshots and was modified in between.
    Update,
    /// The entity is present in the source snapshot but not in the target one.
    Delete,
}
26
/// Source and destination identifiers of an edge, read from the edge table's
/// `src` and `dst` columns.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Endpoints {
    /// Value of the edge row's `src` column.
    pub src: String,
    /// Value of the edge row's `dst` column.
    pub dst: String,
}
32
33#[derive(Debug, Clone)]
34pub struct EntityChange {
35    pub table_key: String,
36    pub kind: EntityKind,
37    pub type_name: String,
38    pub id: String,
39    pub op: ChangeOp,
40    pub manifest_version: u64,
41    pub endpoints: Option<Endpoints>,
42}
43
44#[derive(Debug, Clone, Default)]
45pub struct ChangeFilter {
46    pub kinds: Option<Vec<EntityKind>>,
47    pub type_names: Option<Vec<String>>,
48    pub ops: Option<Vec<ChangeOp>>,
49}
50
/// Aggregate counts over the changes of a [`ChangeSet`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ChangeStats {
    /// Number of changes whose operation is an insert.
    pub inserts: usize,
    /// Number of changes whose operation is an update.
    pub updates: usize,
    /// Number of changes whose operation is a delete.
    pub deletes: usize,
    /// Distinct type names that appear in the changes, sorted ascending.
    pub types_affected: Vec<String>,
}
58
59#[derive(Debug, Clone)]
60pub struct ChangeSet {
61    pub from_version: u64,
62    pub to_version: u64,
63    pub branch: Option<String>,
64    pub changes: Vec<EntityChange>,
65    pub stats: ChangeStats,
66}
67
// ─── Filter helpers ─────────────────────────────────────────────────────────
69
70fn parse_table_key(table_key: &str) -> (EntityKind, &str) {
71    if let Some(name) = table_key.strip_prefix("node:") {
72        (EntityKind::Node, name)
73    } else if let Some(name) = table_key.strip_prefix("edge:") {
74        (EntityKind::Edge, name)
75    } else {
76        (EntityKind::Node, table_key)
77    }
78}
79
80impl ChangeFilter {
81    fn matches_table(&self, table_key: &str) -> bool {
82        let (kind, type_name) = parse_table_key(table_key);
83        if let Some(ref kinds) = self.kinds {
84            if !kinds.contains(&kind) {
85                return false;
86            }
87        }
88        if let Some(ref names) = self.type_names {
89            if !names.iter().any(|n| n == type_name) {
90                return false;
91            }
92        }
93        true
94    }
95
96    fn wants_op(&self, op: ChangeOp) -> bool {
97        match &self.ops {
98            Some(ops) => ops.contains(&op),
99            None => true,
100        }
101    }
102}
103
// ─── Core diff ──────────────────────────────────────────────────────────────
105
106/// Net-current diff between two snapshots.
107///
108/// Uses a three-level algorithm:
109/// 1. Manifest diff — skip unchanged sub-tables
110/// 2. Lineage check — same branch → version-column diff; different → ID-based diff
111/// 3. Row-level diff
112pub async fn diff_snapshots(
113    root_uri: &str,
114    from: &Snapshot,
115    to: &Snapshot,
116    filter: &ChangeFilter,
117    branch: Option<String>,
118) -> Result<ChangeSet> {
119    let table_store = TableStore::new(root_uri);
120    let mut all_keys: HashSet<String> = HashSet::new();
121    for entry in from.entries() {
122        all_keys.insert(entry.table_key.clone());
123    }
124    for entry in to.entries() {
125        all_keys.insert(entry.table_key.clone());
126    }
127
128    let mut changes = Vec::new();
129
130    for table_key in &all_keys {
131        if !filter.matches_table(table_key) {
132            continue;
133        }
134
135        let from_entry = from.entry(table_key);
136        let to_entry = to.entry(table_key);
137
138        // Skip if both snapshots have identical state for this table
139        if same_state(from_entry, to_entry) {
140            continue;
141        }
142
143        let (kind, type_name) = parse_table_key(table_key);
144        let is_edge = kind == EntityKind::Edge;
145
146        let table_changes = if from_entry.is_none() {
147            // Table added — all rows are inserts
148            diff_table_added(&table_store, to, table_key, is_edge, filter).await?
149        } else if to_entry.is_none() {
150            // Table removed — all rows are deletes
151            diff_table_removed(&table_store, from, table_key, is_edge, filter).await?
152        } else if same_lineage(from_entry, to_entry) {
153            // Fast path: version-column diff
154            diff_table_same_lineage(
155                &table_store,
156                from_entry.unwrap(),
157                to_entry.unwrap(),
158                is_edge,
159                filter,
160            )
161            .await?
162        } else {
163            // Cross-branch path: streaming ID-based diff
164            diff_table_cross_branch(&table_store, from, to, table_key, is_edge, filter).await?
165        };
166
167        for mut c in table_changes {
168            c.table_key = table_key.clone();
169            c.kind = kind;
170            c.type_name = type_name.to_string();
171            if c.manifest_version == 0 {
172                c.manifest_version = to.version();
173            }
174            changes.push(c);
175        }
176    }
177
178    let stats = compute_stats(&changes);
179    Ok(ChangeSet {
180        from_version: from.version(),
181        to_version: to.version(),
182        branch,
183        changes,
184        stats,
185    })
186}
187
188fn same_state(a: Option<&SubTableEntry>, b: Option<&SubTableEntry>) -> bool {
189    match (a, b) {
190        (None, None) => true,
191        (Some(a), Some(b)) => {
192            a.table_version == b.table_version && a.table_branch == b.table_branch
193        }
194        _ => false,
195    }
196}
197
198fn same_lineage(from: Option<&SubTableEntry>, to: Option<&SubTableEntry>) -> bool {
199    match (from, to) {
200        (Some(f), Some(t)) => f.table_branch == t.table_branch,
201        _ => false,
202    }
203}
204
205fn compute_stats(changes: &[EntityChange]) -> ChangeStats {
206    let mut stats = ChangeStats::default();
207    let mut types = HashSet::new();
208    for c in changes {
209        match c.op {
210            ChangeOp::Insert => stats.inserts += 1,
211            ChangeOp::Update => stats.updates += 1,
212            ChangeOp::Delete => stats.deletes += 1,
213        }
214        types.insert(c.type_name.clone());
215    }
216    stats.types_affected = types.into_iter().collect();
217    stats.types_affected.sort();
218    stats
219}
220
// ─── Fast path: version-column diff ─────────────────────────────────────────
222
223async fn diff_table_same_lineage(
224    table_store: &TableStore,
225    from_entry: &SubTableEntry,
226    to_entry: &SubTableEntry,
227    is_edge: bool,
228    filter: &ChangeFilter,
229) -> Result<Vec<EntityChange>> {
230    let vf = from_entry.table_version;
231    let vt = to_entry.table_version;
232    let to_ds = table_store.open_at_entry(to_entry).await?;
233
234    let cols: Vec<&str> = if is_edge {
235        vec!["id", "src", "dst", "_row_last_updated_at_version"]
236    } else {
237        vec!["id", "_row_last_updated_at_version"]
238    };
239
240    let wants_inserts = filter.wants_op(ChangeOp::Insert);
241    let wants_updates = filter.wants_op(ChangeOp::Update);
242    let wants_deletes = filter.wants_op(ChangeOp::Delete);
243
244    let mut changes = Vec::new();
245
246    // Inserts + Updates: use _row_last_updated_at_version to find all rows
247    // touched since Vf, then classify by checking whether the ID existed at Vf.
248    //
249    // Why not _row_created_at_version for inserts: Lance's merge_insert stamps
250    // new rows with _row_created_at_version = dataset_creation_version (v1),
251    // not the merge_insert commit version. This makes _row_created_at_version
252    // unreliable for detecting inserts from merge_insert writes. Using
253    // _row_last_updated_at_version catches all touched rows regardless of
254    // write mode, and ID-set membership distinguishes inserts from updates.
255    if wants_inserts || wants_updates {
256        let filter_sql = format!(
257            "_row_last_updated_at_version > {} AND _row_last_updated_at_version <= {}",
258            vf, vt
259        );
260        let changed_rows = scan_with_filter(table_store, &to_ds, &cols, &filter_sql).await?;
261
262        if !changed_rows.is_empty() {
263            // Build the set of IDs that existed at the from version
264            let from_ds = table_store.open_at_entry(from_entry).await?;
265            let from_ids: HashSet<String> = scan_id_set(table_store, &from_ds, &["id"])
266                .await?
267                .into_iter()
268                .map(|r| r.id)
269                .collect();
270
271            for row in changed_rows {
272                if from_ids.contains(&row.id) {
273                    if wants_updates {
274                        changes.push(entity_change_from_row(&row, ChangeOp::Update, is_edge));
275                    }
276                } else if wants_inserts {
277                    changes.push(entity_change_from_row(&row, ChangeOp::Insert, is_edge));
278                }
279            }
280        }
281    }
282
283    // Deletes: ID set-difference
284    if wants_deletes {
285        let from_ds = table_store.open_at_entry(from_entry).await?;
286        let deleted = deleted_ids_by_set_diff(table_store, &from_ds, &to_ds, is_edge).await?;
287        changes.extend(deleted);
288    }
289
290    Ok(changes)
291}
292
// ─── Cross-branch path: ordered ID merge diff ──────────────────────────────
294
295async fn diff_table_cross_branch(
296    table_store: &TableStore,
297    from_snap: &Snapshot,
298    to_snap: &Snapshot,
299    table_key: &str,
300    is_edge: bool,
301    filter: &ChangeFilter,
302) -> Result<Vec<EntityChange>> {
303    let from_ds = table_store
304        .open_snapshot_table(from_snap, table_key)
305        .await?;
306    let to_ds = table_store.open_snapshot_table(to_snap, table_key).await?;
307
308    let from_rows = scan_all_rows_ordered(table_store, &from_ds, is_edge).await?;
309    let to_rows = scan_all_rows_ordered(table_store, &to_ds, is_edge).await?;
310
311    let mut changes = Vec::new();
312    let mut fi = 0;
313    let mut ti = 0;
314
315    while fi < from_rows.len() || ti < to_rows.len() {
316        let from_id = from_rows.get(fi).map(|r| r.id.as_str());
317        let to_id = to_rows.get(ti).map(|r| r.id.as_str());
318
319        match (from_id, to_id) {
320            (Some(fid), Some(tid)) if fid < tid => {
321                // ID only in from → Delete
322                if filter.wants_op(ChangeOp::Delete) {
323                    changes.push(entity_change_from_row(
324                        &from_rows[fi],
325                        ChangeOp::Delete,
326                        is_edge,
327                    ));
328                }
329                fi += 1;
330            }
331            (Some(fid), Some(tid)) if fid > tid => {
332                // ID only in to → Insert
333                if filter.wants_op(ChangeOp::Insert) {
334                    changes.push(entity_change_from_row(
335                        &to_rows[ti],
336                        ChangeOp::Insert,
337                        is_edge,
338                    ));
339                }
340                ti += 1;
341            }
342            (Some(_), Some(_)) => {
343                // Same ID — check signature
344                if from_rows[fi].signature != to_rows[ti].signature
345                    && filter.wants_op(ChangeOp::Update)
346                {
347                    changes.push(entity_change_from_row(
348                        &to_rows[ti],
349                        ChangeOp::Update,
350                        is_edge,
351                    ));
352                }
353                fi += 1;
354                ti += 1;
355            }
356            (Some(_), None) => {
357                if filter.wants_op(ChangeOp::Delete) {
358                    changes.push(entity_change_from_row(
359                        &from_rows[fi],
360                        ChangeOp::Delete,
361                        is_edge,
362                    ));
363                }
364                fi += 1;
365            }
366            (None, Some(_)) => {
367                if filter.wants_op(ChangeOp::Insert) {
368                    changes.push(entity_change_from_row(
369                        &to_rows[ti],
370                        ChangeOp::Insert,
371                        is_edge,
372                    ));
373                }
374                ti += 1;
375            }
376            (None, None) => break,
377        }
378    }
379
380    Ok(changes)
381}
382
// ─── Table added/removed ────────────────────────────────────────────────────
384
385async fn diff_table_added(
386    table_store: &TableStore,
387    to_snap: &Snapshot,
388    table_key: &str,
389    is_edge: bool,
390    filter: &ChangeFilter,
391) -> Result<Vec<EntityChange>> {
392    if !filter.wants_op(ChangeOp::Insert) {
393        return Ok(Vec::new());
394    }
395    let ds = table_store.open_snapshot_table(to_snap, table_key).await?;
396    let rows = scan_all_rows_ordered(table_store, &ds, is_edge).await?;
397    Ok(rows
398        .into_iter()
399        .map(|r| entity_change_from_row(&r, ChangeOp::Insert, is_edge))
400        .collect())
401}
402
403async fn diff_table_removed(
404    table_store: &TableStore,
405    from_snap: &Snapshot,
406    table_key: &str,
407    is_edge: bool,
408    filter: &ChangeFilter,
409) -> Result<Vec<EntityChange>> {
410    if !filter.wants_op(ChangeOp::Delete) {
411        return Ok(Vec::new());
412    }
413    let ds = table_store
414        .open_snapshot_table(from_snap, table_key)
415        .await?;
416    let rows = scan_all_rows_ordered(table_store, &ds, is_edge).await?;
417    Ok(rows
418        .into_iter()
419        .map(|r| entity_change_from_row(&r, ChangeOp::Delete, is_edge))
420        .collect())
421}
422
// ─── Helpers ────────────────────────────────────────────────────────────────
424
425/// Scan with a SQL filter, projecting specific columns.
426async fn scan_with_filter(
427    table_store: &TableStore,
428    ds: &lance::Dataset,
429    cols: &[&str],
430    filter_sql: &str,
431) -> Result<Vec<ScannedRow>> {
432    let batches = table_store
433        .scan(ds, Some(cols), Some(filter_sql), None)
434        .await?;
435    Ok(extract_rows(&batches))
436}
437
438/// Scan all rows ordered by id, projecting id (+ src/dst for edges) + all columns for signature.
439async fn scan_all_rows_ordered(
440    table_store: &TableStore,
441    ds: &lance::Dataset,
442    is_edge: bool,
443) -> Result<Vec<ScannedRow>> {
444    let batches = table_store
445        .scan(
446            ds,
447            None,
448            None,
449            Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]),
450        )
451        .await?;
452    Ok(extract_rows_with_signature(&batches, is_edge))
453}
454
455/// Compute deleted IDs: scan id at from and to, set-difference.
456async fn deleted_ids_by_set_diff(
457    table_store: &TableStore,
458    from_ds: &lance::Dataset,
459    to_ds: &lance::Dataset,
460    is_edge: bool,
461) -> Result<Vec<EntityChange>> {
462    let cols: Vec<&str> = if is_edge {
463        vec!["id", "src", "dst"]
464    } else {
465        vec!["id"]
466    };
467
468    let from_rows = scan_id_set(table_store, from_ds, &cols).await?;
469    let to_ids: HashSet<String> = scan_id_set(table_store, to_ds, &["id"])
470        .await?
471        .into_iter()
472        .map(|r| r.id)
473        .collect();
474
475    Ok(from_rows
476        .into_iter()
477        .filter(|r| !to_ids.contains(&r.id))
478        .map(|r| entity_change_from_row(&r, ChangeOp::Delete, is_edge))
479        .collect())
480}
481
482async fn scan_id_set(
483    table_store: &TableStore,
484    ds: &lance::Dataset,
485    cols: &[&str],
486) -> Result<Vec<ScannedRow>> {
487    let batches = table_store.scan(ds, Some(cols), None, None).await?;
488    Ok(extract_rows(&batches))
489}
490
// ─── Row extraction ─────────────────────────────────────────────────────────
492
/// Internal, table-agnostic view of one scanned row.
#[derive(Clone, Debug)]
struct ScannedRow {
    /// Value of the `id` column.
    id: String,
    /// Value of the `src` column when it was extracted from the scan.
    src: Option<String>,
    /// Value of the `dst` column when it was extracted from the scan.
    dst: Option<String>,
    /// Joined string rendering of the row's columns, used to detect content
    /// changes; left empty by scans that do not compute it.
    signature: String,
    /// Value of `_row_last_updated_at_version` when the scan carried that column.
    change_version: Option<u64>,
}
501
502fn extract_rows(batches: &[RecordBatch]) -> Vec<ScannedRow> {
503    let mut rows = Vec::new();
504    for batch in batches {
505        let ids = batch
506            .column_by_name("id")
507            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
508        let Some(ids) = ids else { continue };
509        let srcs = batch
510            .column_by_name("src")
511            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
512        let dsts = batch
513            .column_by_name("dst")
514            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
515        for i in 0..ids.len() {
516            rows.push(ScannedRow {
517                id: ids.value(i).to_string(),
518                src: srcs.map(|a| a.value(i).to_string()),
519                dst: dsts.map(|a| a.value(i).to_string()),
520                signature: String::new(),
521                change_version: batch
522                    .column_by_name("_row_last_updated_at_version")
523                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
524                    .map(|versions| versions.value(i)),
525            });
526        }
527    }
528    rows
529}
530
531fn extract_rows_with_signature(batches: &[RecordBatch], is_edge: bool) -> Vec<ScannedRow> {
532    let mut rows = Vec::new();
533    for batch in batches {
534        let ids = batch
535            .column_by_name("id")
536            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
537        let Some(ids) = ids else { continue };
538        let srcs = if is_edge {
539            batch
540                .column_by_name("src")
541                .and_then(|c| c.as_any().downcast_ref::<StringArray>())
542        } else {
543            None
544        };
545        let dsts = if is_edge {
546            batch
547                .column_by_name("dst")
548                .and_then(|c| c.as_any().downcast_ref::<StringArray>())
549        } else {
550            None
551        };
552        for i in 0..ids.len() {
553            let mut values = Vec::with_capacity(batch.num_columns());
554            for (field, col) in batch.schema().fields().iter().zip(batch.columns()) {
555                if field.name().starts_with("_row_") {
556                    continue;
557                }
558                if let Ok(v) = array_value_to_string(col.as_ref(), i) {
559                    values.push(v);
560                }
561            }
562            rows.push(ScannedRow {
563                id: ids.value(i).to_string(),
564                src: srcs.map(|a| a.value(i).to_string()),
565                dst: dsts.map(|a| a.value(i).to_string()),
566                signature: values.join("\x1f"),
567                change_version: batch
568                    .column_by_name("_row_last_updated_at_version")
569                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
570                    .map(|versions| versions.value(i)),
571            });
572        }
573    }
574    rows
575}
576
577fn entity_change_from_row(row: &ScannedRow, op: ChangeOp, is_edge: bool) -> EntityChange {
578    EntityChange {
579        table_key: String::new(),
580        kind: if is_edge {
581            EntityKind::Edge
582        } else {
583            EntityKind::Node
584        },
585        type_name: String::new(),
586        id: row.id.clone(),
587        op,
588        manifest_version: row.change_version.unwrap_or(0),
589        endpoints: if is_edge {
590            Some(Endpoints {
591                src: row.src.clone().unwrap_or_default(),
592                dst: row.dst.clone().unwrap_or_default(),
593            })
594        } else {
595            None
596        },
597    }
598}