Skip to main content

omnigraph/changes/
mod.rs

1use std::collections::HashSet;
2
3use arrow_array::{Array, RecordBatch, StringArray, UInt64Array};
4use arrow_cast::display::array_value_to_string;
5use lance::dataset::scanner::ColumnOrdering;
6
7use crate::db::SubTableEntry;
8use crate::db::manifest::Snapshot;
9use crate::error::Result;
10use crate::storage_layer::{SnapshotHandle, TableStorage};
11use crate::table_store::TableStore;
12
13// ─── Types ──────────────────────────────────────────────────────────────────
14
/// Whether a changed entity is a graph node or a graph edge.
///
/// Derived from the table key prefix (`node:` / `edge:`) by `parse_table_key`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EntityKind {
    /// Entity stored in a `node:`-prefixed table (also the fallback for
    /// keys with no recognised prefix).
    Node,
    /// Entity stored in an `edge:`-prefixed table.
    Edge,
}
20
/// The kind of net change observed for an entity between two snapshots.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeOp {
    /// The entity ID is present in the `to` snapshot but not in `from`.
    Insert,
    /// The entity ID is present in both snapshots and the row changed.
    Update,
    /// The entity ID is present in the `from` snapshot but not in `to`.
    Delete,
}
27
/// Source and destination of an edge, as read from the row's `src` and
/// `dst` columns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Endpoints {
    /// Value of the edge row's `src` column (empty if the column was absent).
    pub src: String,
    /// Value of the edge row's `dst` column (empty if the column was absent).
    pub dst: String,
}
33
/// A single row-level change reported by `diff_snapshots`.
#[derive(Debug, Clone)]
pub struct EntityChange {
    /// Full sub-table key the row belongs to (e.g. `node:<type>`).
    pub table_key: String,
    /// Node or edge, as parsed from `table_key`.
    pub kind: EntityKind,
    /// Table key with its `node:` / `edge:` prefix stripped.
    pub type_name: String,
    /// Value of the row's `id` column.
    pub id: String,
    /// Insert, update or delete.
    pub op: ChangeOp,
    /// The row's `_row_last_updated_at_version` when the scan produced one;
    /// otherwise (or when that value is 0) `diff_snapshots` substitutes the
    /// `to` snapshot's version.
    pub manifest_version: u64,
    /// `Some` for edges, `None` for nodes.
    pub endpoints: Option<Endpoints>,
}
44
/// Restricts which changes a diff reports.
///
/// Every field is optional: `None` means "no restriction". A `Some` holding
/// an empty list matches nothing for that dimension.
#[derive(Debug, Clone, Default)]
pub struct ChangeFilter {
    /// Only report entities of these kinds.
    pub kinds: Option<Vec<EntityKind>>,
    /// Only report tables whose type name (key without prefix) is listed.
    pub type_names: Option<Vec<String>>,
    /// Only report these operations.
    pub ops: Option<Vec<ChangeOp>>,
}
51
/// Aggregate counts over a list of changes, as produced by `compute_stats`.
#[derive(Debug, Clone, Default)]
pub struct ChangeStats {
    /// Number of changes with `ChangeOp::Insert`.
    pub inserts: usize,
    /// Number of changes with `ChangeOp::Update`.
    pub updates: usize,
    /// Number of changes with `ChangeOp::Delete`.
    pub deletes: usize,
    /// Distinct `type_name` values among the changes, sorted ascending.
    pub types_affected: Vec<String>,
}
59
/// Result of diffing two snapshots.
#[derive(Debug, Clone)]
pub struct ChangeSet {
    /// Version of the `from` snapshot.
    pub from_version: u64,
    /// Version of the `to` snapshot.
    pub to_version: u64,
    /// Branch label supplied by the caller of `diff_snapshots`; passed
    /// through unchanged.
    pub branch: Option<String>,
    /// Every row-level change that passed the filter.
    pub changes: Vec<EntityChange>,
    /// Summary counts computed from `changes`.
    pub stats: ChangeStats,
}
68
69// ─── Filter helpers ─────────────────────────────────────────────────────────
70
71fn parse_table_key(table_key: &str) -> (EntityKind, &str) {
72    if let Some(name) = table_key.strip_prefix("node:") {
73        (EntityKind::Node, name)
74    } else if let Some(name) = table_key.strip_prefix("edge:") {
75        (EntityKind::Edge, name)
76    } else {
77        (EntityKind::Node, table_key)
78    }
79}
80
81impl ChangeFilter {
82    fn matches_table(&self, table_key: &str) -> bool {
83        let (kind, type_name) = parse_table_key(table_key);
84        if let Some(ref kinds) = self.kinds {
85            if !kinds.contains(&kind) {
86                return false;
87            }
88        }
89        if let Some(ref names) = self.type_names {
90            if !names.iter().any(|n| n == type_name) {
91                return false;
92            }
93        }
94        true
95    }
96
97    fn wants_op(&self, op: ChangeOp) -> bool {
98        match &self.ops {
99            Some(ops) => ops.contains(&op),
100            None => true,
101        }
102    }
103}
104
105// ─── Core diff ──────────────────────────────────────────────────────────────
106
107/// Net-current diff between two snapshots.
108///
109/// Uses a three-level algorithm:
110/// 1. Manifest diff — skip unchanged sub-tables
111/// 2. Lineage check — same branch → version-column diff; different → ID-based diff
112/// 3. Row-level diff
113pub async fn diff_snapshots(
114    root_uri: &str,
115    from: &Snapshot,
116    to: &Snapshot,
117    filter: &ChangeFilter,
118    branch: Option<String>,
119) -> Result<ChangeSet> {
120    let table_store = TableStore::new(root_uri);
121    let mut all_keys: HashSet<String> = HashSet::new();
122    for entry in from.entries() {
123        all_keys.insert(entry.table_key.clone());
124    }
125    for entry in to.entries() {
126        all_keys.insert(entry.table_key.clone());
127    }
128
129    let mut changes = Vec::new();
130
131    for table_key in &all_keys {
132        if !filter.matches_table(table_key) {
133            continue;
134        }
135
136        let from_entry = from.entry(table_key);
137        let to_entry = to.entry(table_key);
138
139        // Skip if both snapshots have identical state for this table
140        if same_state(from_entry, to_entry) {
141            continue;
142        }
143
144        let (kind, type_name) = parse_table_key(table_key);
145        let is_edge = kind == EntityKind::Edge;
146
147        let table_changes = if from_entry.is_none() {
148            // Table added — all rows are inserts
149            diff_table_added(&table_store, to, table_key, is_edge, filter).await?
150        } else if to_entry.is_none() {
151            // Table removed — all rows are deletes
152            diff_table_removed(&table_store, from, table_key, is_edge, filter).await?
153        } else if same_lineage(from_entry, to_entry) {
154            // Fast path: version-column diff
155            diff_table_same_lineage(
156                &table_store,
157                from_entry.unwrap(),
158                to_entry.unwrap(),
159                is_edge,
160                filter,
161            )
162            .await?
163        } else {
164            // Cross-branch path: streaming ID-based diff
165            diff_table_cross_branch(&table_store, from, to, table_key, is_edge, filter).await?
166        };
167
168        for mut c in table_changes {
169            c.table_key = table_key.clone();
170            c.kind = kind;
171            c.type_name = type_name.to_string();
172            if c.manifest_version == 0 {
173                c.manifest_version = to.version();
174            }
175            changes.push(c);
176        }
177    }
178
179    let stats = compute_stats(&changes);
180    Ok(ChangeSet {
181        from_version: from.version(),
182        to_version: to.version(),
183        branch,
184        changes,
185        stats,
186    })
187}
188
189fn same_state(a: Option<&SubTableEntry>, b: Option<&SubTableEntry>) -> bool {
190    match (a, b) {
191        (None, None) => true,
192        (Some(a), Some(b)) => {
193            a.table_version == b.table_version && a.table_branch == b.table_branch
194        }
195        _ => false,
196    }
197}
198
199fn same_lineage(from: Option<&SubTableEntry>, to: Option<&SubTableEntry>) -> bool {
200    match (from, to) {
201        (Some(f), Some(t)) => f.table_branch == t.table_branch,
202        _ => false,
203    }
204}
205
206fn compute_stats(changes: &[EntityChange]) -> ChangeStats {
207    let mut stats = ChangeStats::default();
208    let mut types = HashSet::new();
209    for c in changes {
210        match c.op {
211            ChangeOp::Insert => stats.inserts += 1,
212            ChangeOp::Update => stats.updates += 1,
213            ChangeOp::Delete => stats.deletes += 1,
214        }
215        types.insert(c.type_name.clone());
216    }
217    stats.types_affected = types.into_iter().collect();
218    stats.types_affected.sort();
219    stats
220}
221
222// ─── Fast path: version-column diff ─────────────────────────────────────────
223
224async fn diff_table_same_lineage(
225    table_store: &TableStore,
226    from_entry: &SubTableEntry,
227    to_entry: &SubTableEntry,
228    is_edge: bool,
229    filter: &ChangeFilter,
230) -> Result<Vec<EntityChange>> {
231    let vf = from_entry.table_version;
232    let vt = to_entry.table_version;
233    let storage: &dyn TableStorage = table_store;
234    let to_ds = storage.open_snapshot_at_entry(to_entry).await?;
235
236    let cols: Vec<&str> = if is_edge {
237        vec!["id", "src", "dst", "_row_last_updated_at_version"]
238    } else {
239        vec!["id", "_row_last_updated_at_version"]
240    };
241
242    let wants_inserts = filter.wants_op(ChangeOp::Insert);
243    let wants_updates = filter.wants_op(ChangeOp::Update);
244    let wants_deletes = filter.wants_op(ChangeOp::Delete);
245
246    let mut changes = Vec::new();
247
248    // Inserts + Updates: use _row_last_updated_at_version to find all rows
249    // touched since Vf, then classify by checking whether the ID existed at Vf.
250    //
251    // We key on _row_last_updated_at_version because one scan over it catches
252    // every row touched in the window — inserts and updates alike — regardless
253    // of write mode, and ID-set membership at Vf then distinguishes inserts from
254    // updates. (lance#6774 made merge_insert stamp new rows' _row_created_at_version
255    // with the commit version, so created_at became reliable too; last_updated
256    // stays the right key since it also covers updates.)
257    if wants_inserts || wants_updates {
258        let filter_sql = format!(
259            "_row_last_updated_at_version > {} AND _row_last_updated_at_version <= {}",
260            vf, vt
261        );
262        let changed_rows = scan_with_filter(storage, &to_ds, &cols, &filter_sql).await?;
263
264        if !changed_rows.is_empty() {
265            // Build the set of IDs that existed at the from version
266            let from_ds = storage.open_snapshot_at_entry(from_entry).await?;
267            let from_ids: HashSet<String> = scan_id_set(storage, &from_ds, &["id"])
268                .await?
269                .into_iter()
270                .map(|r| r.id)
271                .collect();
272
273            for row in changed_rows {
274                if from_ids.contains(&row.id) {
275                    if wants_updates {
276                        changes.push(entity_change_from_row(&row, ChangeOp::Update, is_edge));
277                    }
278                } else if wants_inserts {
279                    changes.push(entity_change_from_row(&row, ChangeOp::Insert, is_edge));
280                }
281            }
282        }
283    }
284
285    // Deletes: ID set-difference
286    if wants_deletes {
287        let from_ds = storage.open_snapshot_at_entry(from_entry).await?;
288        let deleted = deleted_ids_by_set_diff(storage, &from_ds, &to_ds, is_edge).await?;
289        changes.extend(deleted);
290    }
291
292    Ok(changes)
293}
294
295// ─── Cross-branch path: streaming ID-based diff ────────────────────────────
296
297async fn diff_table_cross_branch(
298    table_store: &TableStore,
299    from_snap: &Snapshot,
300    to_snap: &Snapshot,
301    table_key: &str,
302    is_edge: bool,
303    filter: &ChangeFilter,
304) -> Result<Vec<EntityChange>> {
305    let storage: &dyn TableStorage = table_store;
306    let from_ds = storage
307        .open_snapshot_at_table(from_snap, table_key)
308        .await?;
309    let to_ds = storage.open_snapshot_at_table(to_snap, table_key).await?;
310
311    let from_rows = scan_all_rows_ordered(storage, &from_ds, is_edge).await?;
312    let to_rows = scan_all_rows_ordered(storage, &to_ds, is_edge).await?;
313
314    let mut changes = Vec::new();
315    let mut fi = 0;
316    let mut ti = 0;
317
318    while fi < from_rows.len() || ti < to_rows.len() {
319        let from_id = from_rows.get(fi).map(|r| r.id.as_str());
320        let to_id = to_rows.get(ti).map(|r| r.id.as_str());
321
322        match (from_id, to_id) {
323            (Some(fid), Some(tid)) if fid < tid => {
324                // ID only in from → Delete
325                if filter.wants_op(ChangeOp::Delete) {
326                    changes.push(entity_change_from_row(
327                        &from_rows[fi],
328                        ChangeOp::Delete,
329                        is_edge,
330                    ));
331                }
332                fi += 1;
333            }
334            (Some(fid), Some(tid)) if fid > tid => {
335                // ID only in to → Insert
336                if filter.wants_op(ChangeOp::Insert) {
337                    changes.push(entity_change_from_row(
338                        &to_rows[ti],
339                        ChangeOp::Insert,
340                        is_edge,
341                    ));
342                }
343                ti += 1;
344            }
345            (Some(_), Some(_)) => {
346                // Same ID — check signature
347                if from_rows[fi].signature != to_rows[ti].signature
348                    && filter.wants_op(ChangeOp::Update)
349                {
350                    changes.push(entity_change_from_row(
351                        &to_rows[ti],
352                        ChangeOp::Update,
353                        is_edge,
354                    ));
355                }
356                fi += 1;
357                ti += 1;
358            }
359            (Some(_), None) => {
360                if filter.wants_op(ChangeOp::Delete) {
361                    changes.push(entity_change_from_row(
362                        &from_rows[fi],
363                        ChangeOp::Delete,
364                        is_edge,
365                    ));
366                }
367                fi += 1;
368            }
369            (None, Some(_)) => {
370                if filter.wants_op(ChangeOp::Insert) {
371                    changes.push(entity_change_from_row(
372                        &to_rows[ti],
373                        ChangeOp::Insert,
374                        is_edge,
375                    ));
376                }
377                ti += 1;
378            }
379            (None, None) => break,
380        }
381    }
382
383    Ok(changes)
384}
385
386// ─── Table added/removed ────────────────────────────────────────────────────
387
388async fn diff_table_added(
389    table_store: &TableStore,
390    to_snap: &Snapshot,
391    table_key: &str,
392    is_edge: bool,
393    filter: &ChangeFilter,
394) -> Result<Vec<EntityChange>> {
395    if !filter.wants_op(ChangeOp::Insert) {
396        return Ok(Vec::new());
397    }
398    let storage: &dyn TableStorage = table_store;
399    let ds = storage.open_snapshot_at_table(to_snap, table_key).await?;
400    let rows = scan_all_rows_ordered(storage, &ds, is_edge).await?;
401    Ok(rows
402        .into_iter()
403        .map(|r| entity_change_from_row(&r, ChangeOp::Insert, is_edge))
404        .collect())
405}
406
407async fn diff_table_removed(
408    table_store: &TableStore,
409    from_snap: &Snapshot,
410    table_key: &str,
411    is_edge: bool,
412    filter: &ChangeFilter,
413) -> Result<Vec<EntityChange>> {
414    if !filter.wants_op(ChangeOp::Delete) {
415        return Ok(Vec::new());
416    }
417    let storage: &dyn TableStorage = table_store;
418    let ds = storage
419        .open_snapshot_at_table(from_snap, table_key)
420        .await?;
421    let rows = scan_all_rows_ordered(storage, &ds, is_edge).await?;
422    Ok(rows
423        .into_iter()
424        .map(|r| entity_change_from_row(&r, ChangeOp::Delete, is_edge))
425        .collect())
426}
427
428// ─── Helpers ────────────────────────────────────────────────────────────────
429
430/// Scan with a SQL filter, projecting specific columns.
431async fn scan_with_filter(
432    storage: &dyn TableStorage,
433    ds: &SnapshotHandle,
434    cols: &[&str],
435    filter_sql: &str,
436) -> Result<Vec<ScannedRow>> {
437    let batches = storage
438        .scan(ds, Some(cols), Some(filter_sql), None)
439        .await?;
440    Ok(extract_rows(&batches))
441}
442
443/// Scan all rows ordered by id, projecting id (+ src/dst for edges) + all columns for signature.
444async fn scan_all_rows_ordered(
445    storage: &dyn TableStorage,
446    ds: &SnapshotHandle,
447    is_edge: bool,
448) -> Result<Vec<ScannedRow>> {
449    let batches = storage
450        .scan(
451            ds,
452            None,
453            None,
454            Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]),
455        )
456        .await?;
457    Ok(extract_rows_with_signature(&batches, is_edge))
458}
459
460/// Compute deleted IDs: scan id at from and to, set-difference.
461async fn deleted_ids_by_set_diff(
462    storage: &dyn TableStorage,
463    from_ds: &SnapshotHandle,
464    to_ds: &SnapshotHandle,
465    is_edge: bool,
466) -> Result<Vec<EntityChange>> {
467    let cols: Vec<&str> = if is_edge {
468        vec!["id", "src", "dst"]
469    } else {
470        vec!["id"]
471    };
472
473    let from_rows = scan_id_set(storage, from_ds, &cols).await?;
474    let to_ids: HashSet<String> = scan_id_set(storage, to_ds, &["id"])
475        .await?
476        .into_iter()
477        .map(|r| r.id)
478        .collect();
479
480    Ok(from_rows
481        .into_iter()
482        .filter(|r| !to_ids.contains(&r.id))
483        .map(|r| entity_change_from_row(&r, ChangeOp::Delete, is_edge))
484        .collect())
485}
486
487async fn scan_id_set(
488    storage: &dyn TableStorage,
489    ds: &SnapshotHandle,
490    cols: &[&str],
491) -> Result<Vec<ScannedRow>> {
492    let batches = storage.scan(ds, Some(cols), None, None).await?;
493    Ok(extract_rows(&batches))
494}
495
496// ─── Row extraction ─────────────────────────────────────────────────────────
497
/// One row pulled out of a scan, reduced to the fields the diff needs.
#[derive(Debug, Clone)]
struct ScannedRow {
    /// Value of the `id` column.
    id: String,
    /// Value of the `src` column, when the batch had one.
    src: Option<String>,
    /// Value of the `dst` column, when the batch had one.
    dst: Option<String>,
    /// Row fingerprint used to detect updates: the row's column values
    /// joined with `\x1f`. Left empty by `extract_rows`.
    signature: String,
    /// Value of `_row_last_updated_at_version`, when the batch had that
    /// column as a `UInt64Array`.
    change_version: Option<u64>,
}
506
507fn extract_rows(batches: &[RecordBatch]) -> Vec<ScannedRow> {
508    let mut rows = Vec::new();
509    for batch in batches {
510        let ids = batch
511            .column_by_name("id")
512            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
513        let Some(ids) = ids else { continue };
514        let srcs = batch
515            .column_by_name("src")
516            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
517        let dsts = batch
518            .column_by_name("dst")
519            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
520        for i in 0..ids.len() {
521            rows.push(ScannedRow {
522                id: ids.value(i).to_string(),
523                src: srcs.map(|a| a.value(i).to_string()),
524                dst: dsts.map(|a| a.value(i).to_string()),
525                signature: String::new(),
526                change_version: batch
527                    .column_by_name("_row_last_updated_at_version")
528                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
529                    .map(|versions| versions.value(i)),
530            });
531        }
532    }
533    rows
534}
535
536fn extract_rows_with_signature(batches: &[RecordBatch], is_edge: bool) -> Vec<ScannedRow> {
537    let mut rows = Vec::new();
538    for batch in batches {
539        let ids = batch
540            .column_by_name("id")
541            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
542        let Some(ids) = ids else { continue };
543        let srcs = if is_edge {
544            batch
545                .column_by_name("src")
546                .and_then(|c| c.as_any().downcast_ref::<StringArray>())
547        } else {
548            None
549        };
550        let dsts = if is_edge {
551            batch
552                .column_by_name("dst")
553                .and_then(|c| c.as_any().downcast_ref::<StringArray>())
554        } else {
555            None
556        };
557        for i in 0..ids.len() {
558            let mut values = Vec::with_capacity(batch.num_columns());
559            for (field, col) in batch.schema().fields().iter().zip(batch.columns()) {
560                if field.name().starts_with("_row_") {
561                    continue;
562                }
563                if let Ok(v) = array_value_to_string(col.as_ref(), i) {
564                    values.push(v);
565                }
566            }
567            rows.push(ScannedRow {
568                id: ids.value(i).to_string(),
569                src: srcs.map(|a| a.value(i).to_string()),
570                dst: dsts.map(|a| a.value(i).to_string()),
571                signature: values.join("\x1f"),
572                change_version: batch
573                    .column_by_name("_row_last_updated_at_version")
574                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
575                    .map(|versions| versions.value(i)),
576            });
577        }
578    }
579    rows
580}
581
582fn entity_change_from_row(row: &ScannedRow, op: ChangeOp, is_edge: bool) -> EntityChange {
583    EntityChange {
584        table_key: String::new(),
585        kind: if is_edge {
586            EntityKind::Edge
587        } else {
588            EntityKind::Node
589        },
590        type_name: String::new(),
591        id: row.id.clone(),
592        op,
593        manifest_version: row.change_version.unwrap_or(0),
594        endpoints: if is_edge {
595            Some(Endpoints {
596                src: row.src.clone().unwrap_or_default(),
597                dst: row.dst.clone().unwrap_or_default(),
598            })
599        } else {
600            None
601        },
602    }
603}