1use std::collections::HashSet;
2
3use arrow_array::{Array, RecordBatch, StringArray, UInt64Array};
4use arrow_cast::display::array_value_to_string;
5use lance::dataset::scanner::ColumnOrdering;
6
7use crate::db::SubTableEntry;
8use crate::db::manifest::Snapshot;
9use crate::error::Result;
10use crate::table_store::TableStore;
11
/// Which kind of graph entity a change refers to.
///
/// Table keys prefixed with `edge:` map to [`EntityKind::Edge`]; keys prefixed with
/// `node:`, or with no recognized prefix, map to [`EntityKind::Node`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EntityKind {
    /// A node entity.
    Node,
    /// An edge entity (carries `src`/`dst` endpoints).
    Edge,
}
19
/// The kind of modification observed for an entity between two snapshots.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChangeOp {
    /// The entity's id is present in the `to` state but not in the `from` state.
    Insert,
    /// The entity's id is present in both states and its row changed.
    Update,
    /// The entity's id is present in the `from` state but not in the `to` state.
    Delete,
}
26
/// Source and destination identifiers of an edge.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Endpoints {
    /// Value of the edge row's `src` column (empty if it was not available).
    pub src: String,
    /// Value of the edge row's `dst` column (empty if it was not available).
    pub dst: String,
}
32
33#[derive(Debug, Clone)]
34pub struct EntityChange {
35 pub table_key: String,
36 pub kind: EntityKind,
37 pub type_name: String,
38 pub id: String,
39 pub op: ChangeOp,
40 pub manifest_version: u64,
41 pub endpoints: Option<Endpoints>,
42}
43
44#[derive(Debug, Clone, Default)]
45pub struct ChangeFilter {
46 pub kinds: Option<Vec<EntityKind>>,
47 pub type_names: Option<Vec<String>>,
48 pub ops: Option<Vec<ChangeOp>>,
49}
50
/// Aggregate counts summarising the changes of a [`ChangeSet`].
#[derive(Clone, Debug, Default)]
pub struct ChangeStats {
    /// Number of insert changes.
    pub inserts: usize,
    /// Number of update changes.
    pub updates: usize,
    /// Number of delete changes.
    pub deletes: usize,
    /// Distinct type names with at least one change, sorted ascending.
    pub types_affected: Vec<String>,
}
58
59#[derive(Debug, Clone)]
60pub struct ChangeSet {
61 pub from_version: u64,
62 pub to_version: u64,
63 pub branch: Option<String>,
64 pub changes: Vec<EntityChange>,
65 pub stats: ChangeStats,
66}
67
68fn parse_table_key(table_key: &str) -> (EntityKind, &str) {
71 if let Some(name) = table_key.strip_prefix("node:") {
72 (EntityKind::Node, name)
73 } else if let Some(name) = table_key.strip_prefix("edge:") {
74 (EntityKind::Edge, name)
75 } else {
76 (EntityKind::Node, table_key)
77 }
78}
79
80impl ChangeFilter {
81 fn matches_table(&self, table_key: &str) -> bool {
82 let (kind, type_name) = parse_table_key(table_key);
83 if let Some(ref kinds) = self.kinds {
84 if !kinds.contains(&kind) {
85 return false;
86 }
87 }
88 if let Some(ref names) = self.type_names {
89 if !names.iter().any(|n| n == type_name) {
90 return false;
91 }
92 }
93 true
94 }
95
96 fn wants_op(&self, op: ChangeOp) -> bool {
97 match &self.ops {
98 Some(ops) => ops.contains(&op),
99 None => true,
100 }
101 }
102}
103
104pub async fn diff_snapshots(
113 root_uri: &str,
114 from: &Snapshot,
115 to: &Snapshot,
116 filter: &ChangeFilter,
117 branch: Option<String>,
118) -> Result<ChangeSet> {
119 let table_store = TableStore::new(root_uri);
120 let mut all_keys: HashSet<String> = HashSet::new();
121 for entry in from.entries() {
122 all_keys.insert(entry.table_key.clone());
123 }
124 for entry in to.entries() {
125 all_keys.insert(entry.table_key.clone());
126 }
127
128 let mut changes = Vec::new();
129
130 for table_key in &all_keys {
131 if !filter.matches_table(table_key) {
132 continue;
133 }
134
135 let from_entry = from.entry(table_key);
136 let to_entry = to.entry(table_key);
137
138 if same_state(from_entry, to_entry) {
140 continue;
141 }
142
143 let (kind, type_name) = parse_table_key(table_key);
144 let is_edge = kind == EntityKind::Edge;
145
146 let table_changes = if from_entry.is_none() {
147 diff_table_added(&table_store, to, table_key, is_edge, filter).await?
149 } else if to_entry.is_none() {
150 diff_table_removed(&table_store, from, table_key, is_edge, filter).await?
152 } else if same_lineage(from_entry, to_entry) {
153 diff_table_same_lineage(
155 &table_store,
156 from_entry.unwrap(),
157 to_entry.unwrap(),
158 is_edge,
159 filter,
160 )
161 .await?
162 } else {
163 diff_table_cross_branch(&table_store, from, to, table_key, is_edge, filter).await?
165 };
166
167 for mut c in table_changes {
168 c.table_key = table_key.clone();
169 c.kind = kind;
170 c.type_name = type_name.to_string();
171 if c.manifest_version == 0 {
172 c.manifest_version = to.version();
173 }
174 changes.push(c);
175 }
176 }
177
178 let stats = compute_stats(&changes);
179 Ok(ChangeSet {
180 from_version: from.version(),
181 to_version: to.version(),
182 branch,
183 changes,
184 stats,
185 })
186}
187
188fn same_state(a: Option<&SubTableEntry>, b: Option<&SubTableEntry>) -> bool {
189 match (a, b) {
190 (None, None) => true,
191 (Some(a), Some(b)) => {
192 a.table_version == b.table_version && a.table_branch == b.table_branch
193 }
194 _ => false,
195 }
196}
197
198fn same_lineage(from: Option<&SubTableEntry>, to: Option<&SubTableEntry>) -> bool {
199 match (from, to) {
200 (Some(f), Some(t)) => f.table_branch == t.table_branch,
201 _ => false,
202 }
203}
204
205fn compute_stats(changes: &[EntityChange]) -> ChangeStats {
206 let mut stats = ChangeStats::default();
207 let mut types = HashSet::new();
208 for c in changes {
209 match c.op {
210 ChangeOp::Insert => stats.inserts += 1,
211 ChangeOp::Update => stats.updates += 1,
212 ChangeOp::Delete => stats.deletes += 1,
213 }
214 types.insert(c.type_name.clone());
215 }
216 stats.types_affected = types.into_iter().collect();
217 stats.types_affected.sort();
218 stats
219}
220
221async fn diff_table_same_lineage(
224 table_store: &TableStore,
225 from_entry: &SubTableEntry,
226 to_entry: &SubTableEntry,
227 is_edge: bool,
228 filter: &ChangeFilter,
229) -> Result<Vec<EntityChange>> {
230 let vf = from_entry.table_version;
231 let vt = to_entry.table_version;
232 let to_ds = table_store.open_at_entry(to_entry).await?;
233
234 let cols: Vec<&str> = if is_edge {
235 vec!["id", "src", "dst", "_row_last_updated_at_version"]
236 } else {
237 vec!["id", "_row_last_updated_at_version"]
238 };
239
240 let wants_inserts = filter.wants_op(ChangeOp::Insert);
241 let wants_updates = filter.wants_op(ChangeOp::Update);
242 let wants_deletes = filter.wants_op(ChangeOp::Delete);
243
244 let mut changes = Vec::new();
245
246 if wants_inserts || wants_updates {
256 let filter_sql = format!(
257 "_row_last_updated_at_version > {} AND _row_last_updated_at_version <= {}",
258 vf, vt
259 );
260 let changed_rows = scan_with_filter(table_store, &to_ds, &cols, &filter_sql).await?;
261
262 if !changed_rows.is_empty() {
263 let from_ds = table_store.open_at_entry(from_entry).await?;
265 let from_ids: HashSet<String> = scan_id_set(table_store, &from_ds, &["id"])
266 .await?
267 .into_iter()
268 .map(|r| r.id)
269 .collect();
270
271 for row in changed_rows {
272 if from_ids.contains(&row.id) {
273 if wants_updates {
274 changes.push(entity_change_from_row(&row, ChangeOp::Update, is_edge));
275 }
276 } else if wants_inserts {
277 changes.push(entity_change_from_row(&row, ChangeOp::Insert, is_edge));
278 }
279 }
280 }
281 }
282
283 if wants_deletes {
285 let from_ds = table_store.open_at_entry(from_entry).await?;
286 let deleted = deleted_ids_by_set_diff(table_store, &from_ds, &to_ds, is_edge).await?;
287 changes.extend(deleted);
288 }
289
290 Ok(changes)
291}
292
293async fn diff_table_cross_branch(
296 table_store: &TableStore,
297 from_snap: &Snapshot,
298 to_snap: &Snapshot,
299 table_key: &str,
300 is_edge: bool,
301 filter: &ChangeFilter,
302) -> Result<Vec<EntityChange>> {
303 let from_ds = table_store
304 .open_snapshot_table(from_snap, table_key)
305 .await?;
306 let to_ds = table_store.open_snapshot_table(to_snap, table_key).await?;
307
308 let from_rows = scan_all_rows_ordered(table_store, &from_ds, is_edge).await?;
309 let to_rows = scan_all_rows_ordered(table_store, &to_ds, is_edge).await?;
310
311 let mut changes = Vec::new();
312 let mut fi = 0;
313 let mut ti = 0;
314
315 while fi < from_rows.len() || ti < to_rows.len() {
316 let from_id = from_rows.get(fi).map(|r| r.id.as_str());
317 let to_id = to_rows.get(ti).map(|r| r.id.as_str());
318
319 match (from_id, to_id) {
320 (Some(fid), Some(tid)) if fid < tid => {
321 if filter.wants_op(ChangeOp::Delete) {
323 changes.push(entity_change_from_row(
324 &from_rows[fi],
325 ChangeOp::Delete,
326 is_edge,
327 ));
328 }
329 fi += 1;
330 }
331 (Some(fid), Some(tid)) if fid > tid => {
332 if filter.wants_op(ChangeOp::Insert) {
334 changes.push(entity_change_from_row(
335 &to_rows[ti],
336 ChangeOp::Insert,
337 is_edge,
338 ));
339 }
340 ti += 1;
341 }
342 (Some(_), Some(_)) => {
343 if from_rows[fi].signature != to_rows[ti].signature
345 && filter.wants_op(ChangeOp::Update)
346 {
347 changes.push(entity_change_from_row(
348 &to_rows[ti],
349 ChangeOp::Update,
350 is_edge,
351 ));
352 }
353 fi += 1;
354 ti += 1;
355 }
356 (Some(_), None) => {
357 if filter.wants_op(ChangeOp::Delete) {
358 changes.push(entity_change_from_row(
359 &from_rows[fi],
360 ChangeOp::Delete,
361 is_edge,
362 ));
363 }
364 fi += 1;
365 }
366 (None, Some(_)) => {
367 if filter.wants_op(ChangeOp::Insert) {
368 changes.push(entity_change_from_row(
369 &to_rows[ti],
370 ChangeOp::Insert,
371 is_edge,
372 ));
373 }
374 ti += 1;
375 }
376 (None, None) => break,
377 }
378 }
379
380 Ok(changes)
381}
382
383async fn diff_table_added(
386 table_store: &TableStore,
387 to_snap: &Snapshot,
388 table_key: &str,
389 is_edge: bool,
390 filter: &ChangeFilter,
391) -> Result<Vec<EntityChange>> {
392 if !filter.wants_op(ChangeOp::Insert) {
393 return Ok(Vec::new());
394 }
395 let ds = table_store.open_snapshot_table(to_snap, table_key).await?;
396 let rows = scan_all_rows_ordered(table_store, &ds, is_edge).await?;
397 Ok(rows
398 .into_iter()
399 .map(|r| entity_change_from_row(&r, ChangeOp::Insert, is_edge))
400 .collect())
401}
402
403async fn diff_table_removed(
404 table_store: &TableStore,
405 from_snap: &Snapshot,
406 table_key: &str,
407 is_edge: bool,
408 filter: &ChangeFilter,
409) -> Result<Vec<EntityChange>> {
410 if !filter.wants_op(ChangeOp::Delete) {
411 return Ok(Vec::new());
412 }
413 let ds = table_store
414 .open_snapshot_table(from_snap, table_key)
415 .await?;
416 let rows = scan_all_rows_ordered(table_store, &ds, is_edge).await?;
417 Ok(rows
418 .into_iter()
419 .map(|r| entity_change_from_row(&r, ChangeOp::Delete, is_edge))
420 .collect())
421}
422
423async fn scan_with_filter(
427 table_store: &TableStore,
428 ds: &lance::Dataset,
429 cols: &[&str],
430 filter_sql: &str,
431) -> Result<Vec<ScannedRow>> {
432 let batches = table_store
433 .scan(ds, Some(cols), Some(filter_sql), None)
434 .await?;
435 Ok(extract_rows(&batches))
436}
437
438async fn scan_all_rows_ordered(
440 table_store: &TableStore,
441 ds: &lance::Dataset,
442 is_edge: bool,
443) -> Result<Vec<ScannedRow>> {
444 let batches = table_store
445 .scan(
446 ds,
447 None,
448 None,
449 Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]),
450 )
451 .await?;
452 Ok(extract_rows_with_signature(&batches, is_edge))
453}
454
455async fn deleted_ids_by_set_diff(
457 table_store: &TableStore,
458 from_ds: &lance::Dataset,
459 to_ds: &lance::Dataset,
460 is_edge: bool,
461) -> Result<Vec<EntityChange>> {
462 let cols: Vec<&str> = if is_edge {
463 vec!["id", "src", "dst"]
464 } else {
465 vec!["id"]
466 };
467
468 let from_rows = scan_id_set(table_store, from_ds, &cols).await?;
469 let to_ids: HashSet<String> = scan_id_set(table_store, to_ds, &["id"])
470 .await?
471 .into_iter()
472 .map(|r| r.id)
473 .collect();
474
475 Ok(from_rows
476 .into_iter()
477 .filter(|r| !to_ids.contains(&r.id))
478 .map(|r| entity_change_from_row(&r, ChangeOp::Delete, is_edge))
479 .collect())
480}
481
482async fn scan_id_set(
483 table_store: &TableStore,
484 ds: &lance::Dataset,
485 cols: &[&str],
486) -> Result<Vec<ScannedRow>> {
487 let batches = table_store.scan(ds, Some(cols), None, None).await?;
488 Ok(extract_rows(&batches))
489}
490
/// One row read from a sub-table scan, reduced to the fields the diff needs.
#[derive(Clone, Debug)]
struct ScannedRow {
    /// `id` column value.
    id: String,
    /// `src` column value when the scanned batch had a usable `src` column.
    src: Option<String>,
    /// `dst` column value when the scanned batch had a usable `dst` column.
    dst: Option<String>,
    /// Rendering of the row's non-`_row_` columns, used to detect updates.
    /// Empty for rows produced by `extract_rows`.
    signature: String,
    /// `_row_last_updated_at_version` when that column was scanned.
    change_version: Option<u64>,
}
501
502fn extract_rows(batches: &[RecordBatch]) -> Vec<ScannedRow> {
503 let mut rows = Vec::new();
504 for batch in batches {
505 let ids = batch
506 .column_by_name("id")
507 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
508 let Some(ids) = ids else { continue };
509 let srcs = batch
510 .column_by_name("src")
511 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
512 let dsts = batch
513 .column_by_name("dst")
514 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
515 for i in 0..ids.len() {
516 rows.push(ScannedRow {
517 id: ids.value(i).to_string(),
518 src: srcs.map(|a| a.value(i).to_string()),
519 dst: dsts.map(|a| a.value(i).to_string()),
520 signature: String::new(),
521 change_version: batch
522 .column_by_name("_row_last_updated_at_version")
523 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
524 .map(|versions| versions.value(i)),
525 });
526 }
527 }
528 rows
529}
530
531fn extract_rows_with_signature(batches: &[RecordBatch], is_edge: bool) -> Vec<ScannedRow> {
532 let mut rows = Vec::new();
533 for batch in batches {
534 let ids = batch
535 .column_by_name("id")
536 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
537 let Some(ids) = ids else { continue };
538 let srcs = if is_edge {
539 batch
540 .column_by_name("src")
541 .and_then(|c| c.as_any().downcast_ref::<StringArray>())
542 } else {
543 None
544 };
545 let dsts = if is_edge {
546 batch
547 .column_by_name("dst")
548 .and_then(|c| c.as_any().downcast_ref::<StringArray>())
549 } else {
550 None
551 };
552 for i in 0..ids.len() {
553 let mut values = Vec::with_capacity(batch.num_columns());
554 for (field, col) in batch.schema().fields().iter().zip(batch.columns()) {
555 if field.name().starts_with("_row_") {
556 continue;
557 }
558 if let Ok(v) = array_value_to_string(col.as_ref(), i) {
559 values.push(v);
560 }
561 }
562 rows.push(ScannedRow {
563 id: ids.value(i).to_string(),
564 src: srcs.map(|a| a.value(i).to_string()),
565 dst: dsts.map(|a| a.value(i).to_string()),
566 signature: values.join("\x1f"),
567 change_version: batch
568 .column_by_name("_row_last_updated_at_version")
569 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
570 .map(|versions| versions.value(i)),
571 });
572 }
573 }
574 rows
575}
576
577fn entity_change_from_row(row: &ScannedRow, op: ChangeOp, is_edge: bool) -> EntityChange {
578 EntityChange {
579 table_key: String::new(),
580 kind: if is_edge {
581 EntityKind::Edge
582 } else {
583 EntityKind::Node
584 },
585 type_name: String::new(),
586 id: row.id.clone(),
587 op,
588 manifest_version: row.change_version.unwrap_or(0),
589 endpoints: if is_edge {
590 Some(Endpoints {
591 src: row.src.clone().unwrap_or_default(),
592 dst: row.dst.clone().unwrap_or_default(),
593 })
594 } else {
595 None
596 },
597 }
598}