1use crate::checkout;
7use crate::commit::{CommitError, CommitsTable};
8use crate::object_store::GitObjectStore;
9use arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray, UInt8Array};
10use nusy_arrow_core::{QuerySpec, col};
11use std::collections::HashMap;
12
/// One triple reported by a commit diff, together with the metadata columns
/// that were stored alongside it.
///
/// The first four fields (`subject`, `predicate`, `object`, `namespace`) are
/// the identity of the triple for diffing purposes; the remaining fields are
/// carried along as-is. Only `PartialEq` (not `Eq`) is derived because
/// `confidence` is a float.
#[derive(Clone, Debug, PartialEq)]
pub struct DiffEntry {
    /// Subject column value.
    pub subject: String,
    /// Predicate column value.
    pub predicate: String,
    /// Object column value.
    pub object: String,
    /// Namespace column value, kept in its string form.
    pub namespace: String,
    /// Y-layer column value as its raw `u8` code.
    pub y_layer: u8,
    /// Confidence column value; `None` when the cell is null.
    pub confidence: Option<f64>,
    /// Graph column value; `None` when the cell is null.
    pub graph: Option<String>,
    /// Source-document column value; `None` when the cell is null.
    pub source_document: Option<String>,
    /// Source-chunk-id column value; `None` when the cell is null.
    pub source_chunk_id: Option<String>,
    /// Caused-by column value; `None` when the cell is null.
    pub caused_by: Option<String>,
    /// Derived-from column value; `None` when the cell is null.
    pub derived_from: Option<String>,
    /// Consolidated-at column value as the raw integer stored in the
    /// millisecond-timestamp column; `None` when the cell is null.
    pub consolidated_at: Option<i64>,
    /// Certifiability-class column value; `None` when the cell is null.
    pub certifiability_class: Option<String>,
}
30
31#[derive(Debug, Clone, Default)]
33pub struct DiffResult {
34 pub added: Vec<DiffEntry>,
36 pub removed: Vec<DiffEntry>,
38}
39
40impl DiffResult {
41 pub fn is_empty(&self) -> bool {
42 self.added.is_empty() && self.removed.is_empty()
43 }
44
45 pub fn total_changes(&self) -> usize {
46 self.added.len() + self.removed.len()
47 }
48}
49
/// Identity of a triple for diffing: two rows with equal subject, predicate,
/// object and namespace are treated as the same triple, regardless of their
/// other columns. Used as the hash-map key when indexing a commit's triples.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct TripleKey {
    /// Subject column value.
    subject: String,
    /// Predicate column value.
    predicate: String,
    /// Object column value.
    object: String,
    /// Namespace column value.
    namespace: String,
}
58
59fn extract_triples(store: &nusy_arrow_core::ArrowGraphStore) -> HashMap<TripleKey, DiffEntry> {
61 let mut map = HashMap::new();
62
63 let batches = store
64 .query(&QuerySpec {
65 include_deleted: false,
66 ..Default::default()
67 })
68 .unwrap_or_default();
69
70 for batch in &batches {
71 let subjects = batch
72 .column(col::SUBJECT)
73 .as_any()
74 .downcast_ref::<StringArray>()
75 .expect("subject column");
76 let predicates = batch
77 .column(col::PREDICATE)
78 .as_any()
79 .downcast_ref::<StringArray>()
80 .expect("predicate column");
81 let objects = batch
82 .column(col::OBJECT)
83 .as_any()
84 .downcast_ref::<StringArray>()
85 .expect("object column");
86 let graphs = batch
87 .column(col::GRAPH)
88 .as_any()
89 .downcast_ref::<StringArray>()
90 .expect("graph column");
91 let namespaces = batch
92 .column(col::NAMESPACE)
93 .as_any()
94 .downcast_ref::<StringArray>()
95 .expect("namespace column");
96 let y_layers = batch
97 .column(col::Y_LAYER)
98 .as_any()
99 .downcast_ref::<UInt8Array>()
100 .expect("y_layer column");
101 let confidences = batch
102 .column(col::CONFIDENCE)
103 .as_any()
104 .downcast_ref::<Float64Array>()
105 .expect("confidence column");
106 let source_docs = batch
107 .column(col::SOURCE_DOCUMENT)
108 .as_any()
109 .downcast_ref::<StringArray>()
110 .expect("source_document column");
111 let source_chunks = batch
112 .column(col::SOURCE_CHUNK_ID)
113 .as_any()
114 .downcast_ref::<StringArray>()
115 .expect("source_chunk_id column");
116 let caused_bys = batch
117 .column(col::CAUSED_BY)
118 .as_any()
119 .downcast_ref::<StringArray>()
120 .expect("caused_by column");
121 let derived_froms = batch
122 .column(col::DERIVED_FROM)
123 .as_any()
124 .downcast_ref::<StringArray>()
125 .expect("derived_from column");
126 let consolidated_ats = batch
127 .column(col::CONSOLIDATED_AT)
128 .as_any()
129 .downcast_ref::<TimestampMillisecondArray>()
130 .expect("consolidated_at column");
131 let certifiability_classes = batch
132 .column(col::CERTIFIABILITY_CLASS)
133 .as_any()
134 .downcast_ref::<StringArray>()
135 .expect("certifiability_class column");
136
137 for i in 0..batch.num_rows() {
138 let key = TripleKey {
139 subject: subjects.value(i).to_string(),
140 predicate: predicates.value(i).to_string(),
141 object: objects.value(i).to_string(),
142 namespace: namespaces.value(i).to_string(),
143 };
144 let entry = DiffEntry {
145 subject: key.subject.clone(),
146 predicate: key.predicate.clone(),
147 object: key.object.clone(),
148 namespace: key.namespace.clone(),
149 y_layer: y_layers.value(i),
150 confidence: if confidences.is_null(i) {
151 None
152 } else {
153 Some(confidences.value(i))
154 },
155 graph: if graphs.is_null(i) {
156 None
157 } else {
158 Some(graphs.value(i).to_string())
159 },
160 source_document: if source_docs.is_null(i) {
161 None
162 } else {
163 Some(source_docs.value(i).to_string())
164 },
165 source_chunk_id: if source_chunks.is_null(i) {
166 None
167 } else {
168 Some(source_chunks.value(i).to_string())
169 },
170 caused_by: if caused_bys.is_null(i) {
171 None
172 } else {
173 Some(caused_bys.value(i).to_string())
174 },
175 derived_from: if derived_froms.is_null(i) {
176 None
177 } else {
178 Some(derived_froms.value(i).to_string())
179 },
180 consolidated_at: if consolidated_ats.is_null(i) {
181 None
182 } else {
183 Some(consolidated_ats.value(i))
184 },
185 certifiability_class: if certifiability_classes.is_null(i) {
186 None
187 } else {
188 Some(certifiability_classes.value(i).to_string())
189 },
190 };
191 map.insert(key, entry);
192 }
193 }
194
195 map
196}
197
198pub fn diff(
210 obj_store: &mut GitObjectStore,
211 commits_table: &CommitsTable,
212 base_commit_id: &str,
213 head_commit_id: &str,
214) -> Result<DiffResult, CommitError> {
215 checkout::checkout(obj_store, commits_table, base_commit_id)?;
217 let base_triples = extract_triples(&obj_store.store);
218
219 checkout::checkout(obj_store, commits_table, head_commit_id)?;
221 let head_triples = extract_triples(&obj_store.store);
222
223 let added: Vec<DiffEntry> = head_triples
225 .iter()
226 .filter(|(k, _)| !base_triples.contains_key(k))
227 .map(|(_, entry)| entry.clone())
228 .collect();
229
230 let removed: Vec<DiffEntry> = base_triples
232 .iter()
233 .filter(|(k, _)| !head_triples.contains_key(k))
234 .map(|(_, entry)| entry.clone())
235 .collect();
236
237 Ok(DiffResult { added, removed })
238}
239
240pub fn diff_nondestructive(
244 obj_store: &mut GitObjectStore,
245 commits_table: &CommitsTable,
246 base_commit_id: &str,
247 head_commit_id: &str,
248) -> Result<DiffResult, CommitError> {
249 let saved: Vec<(nusy_arrow_core::Namespace, Vec<arrow::array::RecordBatch>)> =
251 nusy_arrow_core::Namespace::ALL
252 .iter()
253 .map(|ns| {
254 let batches = obj_store.store.get_namespace_batches(*ns).to_vec();
255 (*ns, batches)
256 })
257 .collect();
258
259 let result = diff(obj_store, commits_table, base_commit_id, head_commit_id);
260
261 for (ns, batches) in saved {
263 obj_store.store.set_namespace_batches(ns, batches);
264 }
265
266 result
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commit::create_commit;
    use nusy_arrow_core::{Namespace, Triple, YLayer};

    /// Builds an `rdf:type` triple from `subj` to `obj` with confidence 0.9 and
    /// every optional metadata field left unset.
    fn sample_triple(subj: &str, obj: &str) -> Triple {
        Triple {
            subject: subj.to_owned(),
            predicate: String::from("rdf:type"),
            object: obj.to_owned(),
            graph: None,
            confidence: Some(0.9),
            source_document: None,
            source_chunk_id: None,
            extracted_by: None,
            caused_by: None,
            derived_from: None,
            consolidated_at: None,
            certifiability_class: None,
        }
    }

    /// Creates an object store rooted in a fresh temporary directory plus an
    /// empty commits table. The directory handle is returned so the caller can
    /// keep it alive for the duration of the test.
    fn fresh_repo() -> (tempfile::TempDir, GitObjectStore, CommitsTable) {
        let tmp = tempfile::tempdir().unwrap();
        let obj = GitObjectStore::with_snapshot_dir(tmp.path());
        (tmp, obj, CommitsTable::new())
    }

    /// Adds `sample_triple(subj, object)` to the World namespace at the Semantic layer.
    fn add_world_semantic(obj: &mut GitObjectStore, subj: &str, object: &str) {
        obj.store
            .add_triple(
                &sample_triple(subj, object),
                Namespace::World,
                YLayer::Semantic,
            )
            .unwrap();
    }

    /// A triple committed only in the second commit shows up as an addition,
    /// with its y-layer and confidence carried through.
    #[test]
    fn test_diff_detects_additions() {
        let (_tmp, mut obj, mut commits) = fresh_repo();

        add_world_semantic(&mut obj, "s1", "A");
        let c1 = create_commit(&obj, &mut commits, vec![], "first", "DGX").unwrap();

        add_world_semantic(&mut obj, "s2", "B");
        let c2 = create_commit(
            &obj,
            &mut commits,
            vec![c1.commit_id.clone()],
            "second",
            "DGX",
        )
        .unwrap();

        let result = diff(&mut obj, &commits, &c1.commit_id, &c2.commit_id).unwrap();
        assert_eq!(result.added.len(), 1);
        assert_eq!(result.removed.len(), 0);

        let only_added = &result.added[0];
        assert_eq!(only_added.subject, "s2");
        assert_eq!(only_added.y_layer, YLayer::Semantic.as_u8());
        assert_eq!(only_added.confidence, Some(0.9));
    }

    /// A triple deleted between the two commits shows up as a removal.
    #[test]
    fn test_diff_detects_removals() {
        let (_tmp, mut obj, mut commits) = fresh_repo();

        add_world_semantic(&mut obj, "s1", "A");
        // Added directly because the returned id is needed for the delete below.
        let id2 = obj
            .store
            .add_triple(
                &sample_triple("s2", "B"),
                Namespace::World,
                YLayer::Semantic,
            )
            .unwrap();
        let c1 = create_commit(&obj, &mut commits, vec![], "first", "DGX").unwrap();

        obj.store.delete(&id2).unwrap();
        let c2 = create_commit(
            &obj,
            &mut commits,
            vec![c1.commit_id.clone()],
            "second",
            "DGX",
        )
        .unwrap();

        let result = diff(&mut obj, &commits, &c1.commit_id, &c2.commit_id).unwrap();
        assert_eq!(result.removed.len(), 1);
        assert_eq!(result.removed[0].subject, "s2");
    }

    /// After `diff_nondestructive`, the store still holds the same number of
    /// triples as before, including one that was never committed.
    #[test]
    fn test_diff_nondestructive_preserves_state() {
        let (_tmp, mut obj, mut commits) = fresh_repo();

        add_world_semantic(&mut obj, "s1", "A");
        let c1 = create_commit(&obj, &mut commits, vec![], "first", "DGX").unwrap();

        add_world_semantic(&mut obj, "s2", "B");
        let c2 = create_commit(
            &obj,
            &mut commits,
            vec![c1.commit_id.clone()],
            "second",
            "DGX",
        )
        .unwrap();

        // Working-state change that is part of neither commit.
        add_world_semantic(&mut obj, "uncommitted", "X");
        assert_eq!(obj.store.len(), 3);

        let result =
            diff_nondestructive(&mut obj, &commits, &c1.commit_id, &c2.commit_id).unwrap();
        assert_eq!(result.added.len(), 1);

        assert_eq!(obj.store.len(), 3);
    }

    /// Two commits taken over the same store contents produce an empty diff.
    #[test]
    fn test_diff_no_changes() {
        let (_tmp, mut obj, mut commits) = fresh_repo();

        add_world_semantic(&mut obj, "s1", "A");
        let c1 = create_commit(&obj, &mut commits, vec![], "first", "DGX").unwrap();

        let c2 = create_commit(
            &obj,
            &mut commits,
            vec![c1.commit_id.clone()],
            "same",
            "DGX",
        )
        .unwrap();

        let result = diff(&mut obj, &commits, &c1.commit_id, &c2.commit_id).unwrap();
        assert!(result.is_empty());
    }
}