1use std::collections::HashSet;
2
3use arrow_array::{Array, RecordBatch, StringArray, UInt64Array};
4use arrow_cast::display::array_value_to_string;
5use lance::dataset::scanner::ColumnOrdering;
6
7use crate::db::SubTableEntry;
8use crate::db::manifest::Snapshot;
9use crate::error::Result;
10use crate::storage_layer::{SnapshotHandle, TableStorage};
11use crate::table_store::TableStore;
12
/// Whether a changed entity lives in a node table or an edge table.
///
/// Derived from the `node:` / `edge:` prefix of a manifest table key.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EntityKind {
    /// Entity from a `node:` table.
    Node,
    /// Entity from an `edge:` table.
    Edge,
}
20
/// The kind of change recorded for one entity between two snapshots.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChangeOp {
    /// The entity exists only in the newer (`to`) side.
    Insert,
    /// The entity exists on both sides but its row changed.
    Update,
    /// The entity exists only in the older (`from`) side.
    Delete,
}
27
/// The `src` and `dst` column values of a changed edge row.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Endpoints {
    /// Value of the edge row's `src` column.
    pub src: String,
    /// Value of the edge row's `dst` column.
    pub dst: String,
}
33
34#[derive(Debug, Clone)]
35pub struct EntityChange {
36 pub table_key: String,
37 pub kind: EntityKind,
38 pub type_name: String,
39 pub id: String,
40 pub op: ChangeOp,
41 pub manifest_version: u64,
42 pub endpoints: Option<Endpoints>,
43}
44
45#[derive(Debug, Clone, Default)]
46pub struct ChangeFilter {
47 pub kinds: Option<Vec<EntityKind>>,
48 pub type_names: Option<Vec<String>>,
49 pub ops: Option<Vec<ChangeOp>>,
50}
51
/// Aggregate counts over the changes of a [`ChangeSet`].
#[derive(Clone, Debug, Default)]
pub struct ChangeStats {
    /// Number of `Insert` changes.
    pub inserts: usize,
    /// Number of `Update` changes.
    pub updates: usize,
    /// Number of `Delete` changes.
    pub deletes: usize,
    /// Distinct type names that had at least one change, sorted ascending.
    pub types_affected: Vec<String>,
}
59
60#[derive(Debug, Clone)]
61pub struct ChangeSet {
62 pub from_version: u64,
63 pub to_version: u64,
64 pub branch: Option<String>,
65 pub changes: Vec<EntityChange>,
66 pub stats: ChangeStats,
67}
68
69fn parse_table_key(table_key: &str) -> (EntityKind, &str) {
72 if let Some(name) = table_key.strip_prefix("node:") {
73 (EntityKind::Node, name)
74 } else if let Some(name) = table_key.strip_prefix("edge:") {
75 (EntityKind::Edge, name)
76 } else {
77 (EntityKind::Node, table_key)
78 }
79}
80
81impl ChangeFilter {
82 fn matches_table(&self, table_key: &str) -> bool {
83 let (kind, type_name) = parse_table_key(table_key);
84 if let Some(ref kinds) = self.kinds {
85 if !kinds.contains(&kind) {
86 return false;
87 }
88 }
89 if let Some(ref names) = self.type_names {
90 if !names.iter().any(|n| n == type_name) {
91 return false;
92 }
93 }
94 true
95 }
96
97 fn wants_op(&self, op: ChangeOp) -> bool {
98 match &self.ops {
99 Some(ops) => ops.contains(&op),
100 None => true,
101 }
102 }
103}
104
105pub async fn diff_snapshots(
114 root_uri: &str,
115 from: &Snapshot,
116 to: &Snapshot,
117 filter: &ChangeFilter,
118 branch: Option<String>,
119) -> Result<ChangeSet> {
120 let table_store = TableStore::new(root_uri);
121 let mut all_keys: HashSet<String> = HashSet::new();
122 for entry in from.entries() {
123 all_keys.insert(entry.table_key.clone());
124 }
125 for entry in to.entries() {
126 all_keys.insert(entry.table_key.clone());
127 }
128
129 let mut changes = Vec::new();
130
131 for table_key in &all_keys {
132 if !filter.matches_table(table_key) {
133 continue;
134 }
135
136 let from_entry = from.entry(table_key);
137 let to_entry = to.entry(table_key);
138
139 if same_state(from_entry, to_entry) {
141 continue;
142 }
143
144 let (kind, type_name) = parse_table_key(table_key);
145 let is_edge = kind == EntityKind::Edge;
146
147 let table_changes = if from_entry.is_none() {
148 diff_table_added(&table_store, to, table_key, is_edge, filter).await?
150 } else if to_entry.is_none() {
151 diff_table_removed(&table_store, from, table_key, is_edge, filter).await?
153 } else if same_lineage(from_entry, to_entry) {
154 diff_table_same_lineage(
156 &table_store,
157 from_entry.unwrap(),
158 to_entry.unwrap(),
159 is_edge,
160 filter,
161 )
162 .await?
163 } else {
164 diff_table_cross_branch(&table_store, from, to, table_key, is_edge, filter).await?
166 };
167
168 for mut c in table_changes {
169 c.table_key = table_key.clone();
170 c.kind = kind;
171 c.type_name = type_name.to_string();
172 if c.manifest_version == 0 {
173 c.manifest_version = to.version();
174 }
175 changes.push(c);
176 }
177 }
178
179 let stats = compute_stats(&changes);
180 Ok(ChangeSet {
181 from_version: from.version(),
182 to_version: to.version(),
183 branch,
184 changes,
185 stats,
186 })
187}
188
189fn same_state(a: Option<&SubTableEntry>, b: Option<&SubTableEntry>) -> bool {
190 match (a, b) {
191 (None, None) => true,
192 (Some(a), Some(b)) => {
193 a.table_version == b.table_version && a.table_branch == b.table_branch
194 }
195 _ => false,
196 }
197}
198
199fn same_lineage(from: Option<&SubTableEntry>, to: Option<&SubTableEntry>) -> bool {
200 match (from, to) {
201 (Some(f), Some(t)) => f.table_branch == t.table_branch,
202 _ => false,
203 }
204}
205
206fn compute_stats(changes: &[EntityChange]) -> ChangeStats {
207 let mut stats = ChangeStats::default();
208 let mut types = HashSet::new();
209 for c in changes {
210 match c.op {
211 ChangeOp::Insert => stats.inserts += 1,
212 ChangeOp::Update => stats.updates += 1,
213 ChangeOp::Delete => stats.deletes += 1,
214 }
215 types.insert(c.type_name.clone());
216 }
217 stats.types_affected = types.into_iter().collect();
218 stats.types_affected.sort();
219 stats
220}
221
222async fn diff_table_same_lineage(
225 table_store: &TableStore,
226 from_entry: &SubTableEntry,
227 to_entry: &SubTableEntry,
228 is_edge: bool,
229 filter: &ChangeFilter,
230) -> Result<Vec<EntityChange>> {
231 let vf = from_entry.table_version;
232 let vt = to_entry.table_version;
233 let storage: &dyn TableStorage = table_store;
234 let to_ds = storage.open_snapshot_at_entry(to_entry).await?;
235
236 let cols: Vec<&str> = if is_edge {
237 vec!["id", "src", "dst", "_row_last_updated_at_version"]
238 } else {
239 vec!["id", "_row_last_updated_at_version"]
240 };
241
242 let wants_inserts = filter.wants_op(ChangeOp::Insert);
243 let wants_updates = filter.wants_op(ChangeOp::Update);
244 let wants_deletes = filter.wants_op(ChangeOp::Delete);
245
246 let mut changes = Vec::new();
247
248 if wants_inserts || wants_updates {
258 let filter_sql = format!(
259 "_row_last_updated_at_version > {} AND _row_last_updated_at_version <= {}",
260 vf, vt
261 );
262 let changed_rows = scan_with_filter(storage, &to_ds, &cols, &filter_sql).await?;
263
264 if !changed_rows.is_empty() {
265 let from_ds = storage.open_snapshot_at_entry(from_entry).await?;
267 let from_ids: HashSet<String> = scan_id_set(storage, &from_ds, &["id"])
268 .await?
269 .into_iter()
270 .map(|r| r.id)
271 .collect();
272
273 for row in changed_rows {
274 if from_ids.contains(&row.id) {
275 if wants_updates {
276 changes.push(entity_change_from_row(&row, ChangeOp::Update, is_edge));
277 }
278 } else if wants_inserts {
279 changes.push(entity_change_from_row(&row, ChangeOp::Insert, is_edge));
280 }
281 }
282 }
283 }
284
285 if wants_deletes {
287 let from_ds = storage.open_snapshot_at_entry(from_entry).await?;
288 let deleted = deleted_ids_by_set_diff(storage, &from_ds, &to_ds, is_edge).await?;
289 changes.extend(deleted);
290 }
291
292 Ok(changes)
293}
294
295async fn diff_table_cross_branch(
298 table_store: &TableStore,
299 from_snap: &Snapshot,
300 to_snap: &Snapshot,
301 table_key: &str,
302 is_edge: bool,
303 filter: &ChangeFilter,
304) -> Result<Vec<EntityChange>> {
305 let storage: &dyn TableStorage = table_store;
306 let from_ds = storage
307 .open_snapshot_at_table(from_snap, table_key)
308 .await?;
309 let to_ds = storage.open_snapshot_at_table(to_snap, table_key).await?;
310
311 let from_rows = scan_all_rows_ordered(storage, &from_ds, is_edge).await?;
312 let to_rows = scan_all_rows_ordered(storage, &to_ds, is_edge).await?;
313
314 let mut changes = Vec::new();
315 let mut fi = 0;
316 let mut ti = 0;
317
318 while fi < from_rows.len() || ti < to_rows.len() {
319 let from_id = from_rows.get(fi).map(|r| r.id.as_str());
320 let to_id = to_rows.get(ti).map(|r| r.id.as_str());
321
322 match (from_id, to_id) {
323 (Some(fid), Some(tid)) if fid < tid => {
324 if filter.wants_op(ChangeOp::Delete) {
326 changes.push(entity_change_from_row(
327 &from_rows[fi],
328 ChangeOp::Delete,
329 is_edge,
330 ));
331 }
332 fi += 1;
333 }
334 (Some(fid), Some(tid)) if fid > tid => {
335 if filter.wants_op(ChangeOp::Insert) {
337 changes.push(entity_change_from_row(
338 &to_rows[ti],
339 ChangeOp::Insert,
340 is_edge,
341 ));
342 }
343 ti += 1;
344 }
345 (Some(_), Some(_)) => {
346 if from_rows[fi].signature != to_rows[ti].signature
348 && filter.wants_op(ChangeOp::Update)
349 {
350 changes.push(entity_change_from_row(
351 &to_rows[ti],
352 ChangeOp::Update,
353 is_edge,
354 ));
355 }
356 fi += 1;
357 ti += 1;
358 }
359 (Some(_), None) => {
360 if filter.wants_op(ChangeOp::Delete) {
361 changes.push(entity_change_from_row(
362 &from_rows[fi],
363 ChangeOp::Delete,
364 is_edge,
365 ));
366 }
367 fi += 1;
368 }
369 (None, Some(_)) => {
370 if filter.wants_op(ChangeOp::Insert) {
371 changes.push(entity_change_from_row(
372 &to_rows[ti],
373 ChangeOp::Insert,
374 is_edge,
375 ));
376 }
377 ti += 1;
378 }
379 (None, None) => break,
380 }
381 }
382
383 Ok(changes)
384}
385
386async fn diff_table_added(
389 table_store: &TableStore,
390 to_snap: &Snapshot,
391 table_key: &str,
392 is_edge: bool,
393 filter: &ChangeFilter,
394) -> Result<Vec<EntityChange>> {
395 if !filter.wants_op(ChangeOp::Insert) {
396 return Ok(Vec::new());
397 }
398 let storage: &dyn TableStorage = table_store;
399 let ds = storage.open_snapshot_at_table(to_snap, table_key).await?;
400 let rows = scan_all_rows_ordered(storage, &ds, is_edge).await?;
401 Ok(rows
402 .into_iter()
403 .map(|r| entity_change_from_row(&r, ChangeOp::Insert, is_edge))
404 .collect())
405}
406
407async fn diff_table_removed(
408 table_store: &TableStore,
409 from_snap: &Snapshot,
410 table_key: &str,
411 is_edge: bool,
412 filter: &ChangeFilter,
413) -> Result<Vec<EntityChange>> {
414 if !filter.wants_op(ChangeOp::Delete) {
415 return Ok(Vec::new());
416 }
417 let storage: &dyn TableStorage = table_store;
418 let ds = storage
419 .open_snapshot_at_table(from_snap, table_key)
420 .await?;
421 let rows = scan_all_rows_ordered(storage, &ds, is_edge).await?;
422 Ok(rows
423 .into_iter()
424 .map(|r| entity_change_from_row(&r, ChangeOp::Delete, is_edge))
425 .collect())
426}
427
428async fn scan_with_filter(
432 storage: &dyn TableStorage,
433 ds: &SnapshotHandle,
434 cols: &[&str],
435 filter_sql: &str,
436) -> Result<Vec<ScannedRow>> {
437 let batches = storage
438 .scan(ds, Some(cols), Some(filter_sql), None)
439 .await?;
440 Ok(extract_rows(&batches))
441}
442
443async fn scan_all_rows_ordered(
445 storage: &dyn TableStorage,
446 ds: &SnapshotHandle,
447 is_edge: bool,
448) -> Result<Vec<ScannedRow>> {
449 let batches = storage
450 .scan(
451 ds,
452 None,
453 None,
454 Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]),
455 )
456 .await?;
457 Ok(extract_rows_with_signature(&batches, is_edge))
458}
459
460async fn deleted_ids_by_set_diff(
462 storage: &dyn TableStorage,
463 from_ds: &SnapshotHandle,
464 to_ds: &SnapshotHandle,
465 is_edge: bool,
466) -> Result<Vec<EntityChange>> {
467 let cols: Vec<&str> = if is_edge {
468 vec!["id", "src", "dst"]
469 } else {
470 vec!["id"]
471 };
472
473 let from_rows = scan_id_set(storage, from_ds, &cols).await?;
474 let to_ids: HashSet<String> = scan_id_set(storage, to_ds, &["id"])
475 .await?
476 .into_iter()
477 .map(|r| r.id)
478 .collect();
479
480 Ok(from_rows
481 .into_iter()
482 .filter(|r| !to_ids.contains(&r.id))
483 .map(|r| entity_change_from_row(&r, ChangeOp::Delete, is_edge))
484 .collect())
485}
486
487async fn scan_id_set(
488 storage: &dyn TableStorage,
489 ds: &SnapshotHandle,
490 cols: &[&str],
491) -> Result<Vec<ScannedRow>> {
492 let batches = storage.scan(ds, Some(cols), None, None).await?;
493 Ok(extract_rows(&batches))
494}
495
/// One row read back from a table scan, reduced to the fields the diff needs.
#[derive(Clone, Debug)]
struct ScannedRow {
    /// Value of the `id` column.
    id: String,
    /// `src` column value; `None` when no `src` column was available.
    src: Option<String>,
    /// `dst` column value; `None` when no `dst` column was available.
    dst: Option<String>,
    /// Content fingerprint compared by the cross-branch diff to detect
    /// updates; empty for rows that are only matched by id.
    signature: String,
    /// `_row_last_updated_at_version` of the row, when that column was scanned.
    change_version: Option<u64>,
}
506
507fn extract_rows(batches: &[RecordBatch]) -> Vec<ScannedRow> {
508 let mut rows = Vec::new();
509 for batch in batches {
510 let ids = batch
511 .column_by_name("id")
512 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
513 let Some(ids) = ids else { continue };
514 let srcs = batch
515 .column_by_name("src")
516 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
517 let dsts = batch
518 .column_by_name("dst")
519 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
520 for i in 0..ids.len() {
521 rows.push(ScannedRow {
522 id: ids.value(i).to_string(),
523 src: srcs.map(|a| a.value(i).to_string()),
524 dst: dsts.map(|a| a.value(i).to_string()),
525 signature: String::new(),
526 change_version: batch
527 .column_by_name("_row_last_updated_at_version")
528 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
529 .map(|versions| versions.value(i)),
530 });
531 }
532 }
533 rows
534}
535
536fn extract_rows_with_signature(batches: &[RecordBatch], is_edge: bool) -> Vec<ScannedRow> {
537 let mut rows = Vec::new();
538 for batch in batches {
539 let ids = batch
540 .column_by_name("id")
541 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
542 let Some(ids) = ids else { continue };
543 let srcs = if is_edge {
544 batch
545 .column_by_name("src")
546 .and_then(|c| c.as_any().downcast_ref::<StringArray>())
547 } else {
548 None
549 };
550 let dsts = if is_edge {
551 batch
552 .column_by_name("dst")
553 .and_then(|c| c.as_any().downcast_ref::<StringArray>())
554 } else {
555 None
556 };
557 for i in 0..ids.len() {
558 let mut values = Vec::with_capacity(batch.num_columns());
559 for (field, col) in batch.schema().fields().iter().zip(batch.columns()) {
560 if field.name().starts_with("_row_") {
561 continue;
562 }
563 if let Ok(v) = array_value_to_string(col.as_ref(), i) {
564 values.push(v);
565 }
566 }
567 rows.push(ScannedRow {
568 id: ids.value(i).to_string(),
569 src: srcs.map(|a| a.value(i).to_string()),
570 dst: dsts.map(|a| a.value(i).to_string()),
571 signature: values.join("\x1f"),
572 change_version: batch
573 .column_by_name("_row_last_updated_at_version")
574 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
575 .map(|versions| versions.value(i)),
576 });
577 }
578 }
579 rows
580}
581
582fn entity_change_from_row(row: &ScannedRow, op: ChangeOp, is_edge: bool) -> EntityChange {
583 EntityChange {
584 table_key: String::new(),
585 kind: if is_edge {
586 EntityKind::Edge
587 } else {
588 EntityKind::Node
589 },
590 type_name: String::new(),
591 id: row.id.clone(),
592 op,
593 manifest_version: row.change_version.unwrap_or(0),
594 endpoints: if is_edge {
595 Some(Endpoints {
596 src: row.src.clone().unwrap_or_default(),
597 dst: row.dst.clone().unwrap_or_default(),
598 })
599 } else {
600 None
601 },
602 }
603}