Skip to main content

kyu_delta/
batch.rs

1use std::collections::HashSet;
2
3use hashbrown::HashMap;
4use smol_str::SmolStr;
5
6use crate::delta::GraphDelta;
7use crate::node_key::NodeKey;
8use crate::value::DeltaValue;
9use crate::vector_clock::VectorClock;
10
11/// A batch of graph deltas from a single source, committed atomically.
12///
13/// All deltas in a batch are applied atomically (all or nothing).
14/// The `timestamp` is used for last-write-wins conflict resolution.
15#[derive(Clone, Debug, PartialEq)]
16pub struct DeltaBatch {
17    /// Source identifier: `"file:src/main.rs"`, `"doc:invoice_12345"`, etc.
18    pub source: SmolStr,
19    /// Timestamp for last-write-wins ordering. Higher wins.
20    pub timestamp: u64,
21    /// The mutations in this batch.
22    pub deltas: Vec<GraphDelta>,
23    /// Optional causal ordering clock. `None` for most workloads.
24    pub vector_clock: Option<VectorClock>,
25}
26
27impl DeltaBatch {
28    pub fn new(source: impl Into<SmolStr>, timestamp: u64) -> Self {
29        Self {
30            source: source.into(),
31            timestamp,
32            deltas: Vec::new(),
33            vector_clock: None,
34        }
35    }
36
37    pub fn with_vector_clock(
38        source: impl Into<SmolStr>,
39        timestamp: u64,
40        clock: VectorClock,
41    ) -> Self {
42        Self {
43            source: source.into(),
44            timestamp,
45            deltas: Vec::new(),
46            vector_clock: Some(clock),
47        }
48    }
49
50    pub fn push(&mut self, delta: GraphDelta) {
51        self.deltas.push(delta);
52    }
53
54    pub fn extend(&mut self, deltas: impl IntoIterator<Item = GraphDelta>) {
55        self.deltas.extend(deltas);
56    }
57
58    pub fn len(&self) -> usize {
59        self.deltas.len()
60    }
61
62    pub fn is_empty(&self) -> bool {
63        self.deltas.is_empty()
64    }
65
66    pub fn iter(&self) -> impl Iterator<Item = &GraphDelta> {
67        self.deltas.iter()
68    }
69
70    pub fn node_upsert_count(&self) -> usize {
71        self.deltas
72            .iter()
73            .filter(|d| matches!(d, GraphDelta::UpsertNode { .. }))
74            .count()
75    }
76
77    pub fn edge_upsert_count(&self) -> usize {
78        self.deltas
79            .iter()
80            .filter(|d| matches!(d, GraphDelta::UpsertEdge { .. }))
81            .count()
82    }
83
84    pub fn delete_count(&self) -> usize {
85        self.deltas.iter().filter(|d| d.is_delete()).count()
86    }
87
88    /// Collect all unique node table labels referenced in this batch.
89    pub fn referenced_labels(&self) -> Vec<SmolStr> {
90        let mut labels = HashSet::new();
91        for delta in &self.deltas {
92            for key in delta.referenced_keys() {
93                labels.insert(key.label.clone());
94            }
95        }
96        labels.into_iter().collect()
97    }
98
99    /// Collect all unique relationship types referenced in this batch.
100    pub fn referenced_rel_types(&self) -> Vec<SmolStr> {
101        let mut types = HashSet::new();
102        for delta in &self.deltas {
103            match delta {
104                GraphDelta::UpsertEdge { rel_type, .. }
105                | GraphDelta::DeleteEdge { rel_type, .. } => {
106                    types.insert(rel_type.clone());
107                }
108                _ => {}
109            }
110        }
111        types.into_iter().collect()
112    }
113}
114
115/// Builder for ergonomic `DeltaBatch` construction.
116pub struct DeltaBatchBuilder {
117    batch: DeltaBatch,
118}
119
120impl DeltaBatchBuilder {
121    pub fn new(source: impl Into<SmolStr>, timestamp: u64) -> Self {
122        Self {
123            batch: DeltaBatch::new(source, timestamp),
124        }
125    }
126
127    pub fn upsert_node(
128        mut self,
129        label: impl Into<SmolStr>,
130        primary_key: impl Into<SmolStr>,
131        labels: Vec<SmolStr>,
132        props: impl IntoIterator<Item = (impl Into<SmolStr>, DeltaValue)>,
133    ) -> Self {
134        let props_map: HashMap<SmolStr, DeltaValue> =
135            props.into_iter().map(|(k, v)| (k.into(), v)).collect();
136        self.batch.push(GraphDelta::UpsertNode {
137            key: NodeKey::new(label, primary_key),
138            labels,
139            props: props_map,
140        });
141        self
142    }
143
144    pub fn upsert_edge(
145        mut self,
146        src_label: impl Into<SmolStr>,
147        src_key: impl Into<SmolStr>,
148        rel_type: impl Into<SmolStr>,
149        dst_label: impl Into<SmolStr>,
150        dst_key: impl Into<SmolStr>,
151        props: impl IntoIterator<Item = (impl Into<SmolStr>, DeltaValue)>,
152    ) -> Self {
153        let props_map: HashMap<SmolStr, DeltaValue> =
154            props.into_iter().map(|(k, v)| (k.into(), v)).collect();
155        self.batch.push(GraphDelta::UpsertEdge {
156            src: NodeKey::new(src_label, src_key),
157            rel_type: rel_type.into(),
158            dst: NodeKey::new(dst_label, dst_key),
159            props: props_map,
160        });
161        self
162    }
163
164    pub fn delete_node(
165        mut self,
166        label: impl Into<SmolStr>,
167        primary_key: impl Into<SmolStr>,
168    ) -> Self {
169        self.batch.push(GraphDelta::DeleteNode {
170            key: NodeKey::new(label, primary_key),
171        });
172        self
173    }
174
175    pub fn delete_edge(
176        mut self,
177        src_label: impl Into<SmolStr>,
178        src_key: impl Into<SmolStr>,
179        rel_type: impl Into<SmolStr>,
180        dst_label: impl Into<SmolStr>,
181        dst_key: impl Into<SmolStr>,
182    ) -> Self {
183        self.batch.push(GraphDelta::DeleteEdge {
184            src: NodeKey::new(src_label, src_key),
185            rel_type: rel_type.into(),
186            dst: NodeKey::new(dst_label, dst_key),
187        });
188        self
189    }
190
191    pub fn build(self) -> DeltaBatch {
192        self.batch
193    }
194}
195
#[cfg(test)]
mod tests {
    use super::*;

    /// Empty property list for builder calls that need no properties.
    ///
    /// The builder's `props` parameter is generic over the key type, so an
    /// empty iterator must name its item type; doing that once here keeps the
    /// individual tests short.
    fn no_props() -> std::iter::Empty<(SmolStr, DeltaValue)> {
        std::iter::empty()
    }

    /// Shorthand for a `DeleteNode` delta targeting (`label`, `primary_key`).
    fn delete_node_delta(label: &'static str, primary_key: &'static str) -> GraphDelta {
        GraphDelta::DeleteNode {
            key: NodeKey::new(label, primary_key),
        }
    }

    #[test]
    fn new_batch_is_empty() {
        let b = DeltaBatch::new("test", 1);
        assert!(b.is_empty());
        assert_eq!(b.len(), 0);
        assert!(b.vector_clock.is_none());
    }

    #[test]
    fn push_delta() {
        let mut b = DeltaBatch::new("test", 1);
        b.push(delete_node_delta("A", "1"));
        assert_eq!(b.len(), 1);
    }

    #[test]
    fn extend_deltas() {
        let mut b = DeltaBatch::new("test", 1);
        b.extend(vec![delete_node_delta("A", "1"), delete_node_delta("A", "2")]);
        assert_eq!(b.len(), 2);
    }

    #[test]
    fn len_and_is_empty() {
        let mut b = DeltaBatch::new("test", 1);
        assert!(b.is_empty());
        b.push(delete_node_delta("A", "1"));
        assert!(!b.is_empty());
        assert_eq!(b.len(), 1);
    }

    #[test]
    fn iter_visits_every_delta() {
        let mut b = DeltaBatch::new("test", 1);
        b.extend(vec![delete_node_delta("A", "1"), delete_node_delta("A", "2")]);
        assert_eq!(b.iter().count(), b.len());
        assert!(b.iter().all(|d| d.is_delete()));
    }

    #[test]
    fn node_upsert_count() {
        let batch = DeltaBatchBuilder::new("test", 1)
            .upsert_node("F", "a", vec![], no_props())
            .upsert_node("F", "b", vec![], no_props())
            .upsert_edge("F", "a", "calls", "F", "b", no_props())
            .build();
        assert_eq!(batch.node_upsert_count(), 2);
    }

    #[test]
    fn edge_upsert_count() {
        let batch = DeltaBatchBuilder::new("test", 1)
            .upsert_node("F", "a", vec![], no_props())
            .upsert_edge("F", "a", "calls", "F", "b", no_props())
            .upsert_edge("F", "b", "calls", "F", "c", no_props())
            .build();
        assert_eq!(batch.edge_upsert_count(), 2);
    }

    #[test]
    fn delete_count() {
        let batch = DeltaBatchBuilder::new("test", 1)
            .delete_node("F", "old")
            .delete_edge("F", "a", "calls", "F", "old")
            .upsert_node("F", "new", vec![], no_props())
            .build();
        assert_eq!(batch.delete_count(), 2);
    }

    #[test]
    fn referenced_labels() {
        let batch = DeltaBatchBuilder::new("test", 1)
            .upsert_node("Function", "main", vec![], no_props())
            .upsert_edge("Function", "main", "calls", "File", "lib.rs", no_props())
            .build();
        // Sort before comparing: the assertion must not depend on return order.
        let mut labels = batch.referenced_labels();
        labels.sort();
        assert_eq!(labels, vec![SmolStr::new("File"), SmolStr::new("Function")]);
    }

    #[test]
    fn referenced_rel_types() {
        let batch = DeltaBatchBuilder::new("test", 1)
            .upsert_edge("F", "a", "calls", "F", "b", no_props())
            .upsert_edge("F", "a", "imports", "M", "x", no_props())
            .build();
        // Sort before comparing: the assertion must not depend on return order.
        let mut types = batch.referenced_rel_types();
        types.sort();
        assert_eq!(types, vec![SmolStr::new("calls"), SmolStr::new("imports")]);
    }

    #[test]
    fn referenced_rel_types_includes_deleted_edges() {
        let batch = DeltaBatchBuilder::new("test", 1)
            .upsert_edge("F", "a", "calls", "F", "b", no_props())
            .delete_edge("F", "a", "calls", "F", "c")
            .delete_edge("F", "a", "imports", "M", "x")
            .delete_node("F", "c")
            .build();
        // "calls" appears twice but must be reported once; node deletes add nothing.
        let mut types = batch.referenced_rel_types();
        types.sort();
        assert_eq!(types, vec![SmolStr::new("calls"), SmolStr::new("imports")]);
    }

    #[test]
    fn builder_upsert_node() {
        let batch = DeltaBatchBuilder::new("src", 100)
            .upsert_node("Function", "main", vec![], [("lines", DeltaValue::Int64(42))])
            .build();
        assert_eq!(batch.len(), 1);
        assert_eq!(batch.source, "src");
        assert_eq!(batch.timestamp, 100);
        match &batch.deltas[0] {
            GraphDelta::UpsertNode { key, props, .. } => {
                assert_eq!(key.label, "Function");
                assert_eq!(props.get("lines"), Some(&DeltaValue::Int64(42)));
            }
            _ => panic!("expected UpsertNode"),
        }
    }

    #[test]
    fn builder_upsert_edge() {
        let batch = DeltaBatchBuilder::new("src", 100)
            .upsert_edge("F", "a", "calls", "F", "b", no_props())
            .build();
        assert_eq!(batch.len(), 1);
        match &batch.deltas[0] {
            GraphDelta::UpsertEdge {
                src, rel_type, dst, ..
            } => {
                assert_eq!(src.primary_key, "a");
                assert_eq!(*rel_type, "calls");
                assert_eq!(dst.primary_key, "b");
            }
            _ => panic!("expected UpsertEdge"),
        }
    }

    #[test]
    fn builder_delete_node() {
        let batch = DeltaBatchBuilder::new("src", 1)
            .delete_node("Function", "old")
            .build();
        assert_eq!(batch.len(), 1);
        assert!(batch.deltas[0].is_delete());
    }

    #[test]
    fn builder_delete_edge() {
        let batch = DeltaBatchBuilder::new("src", 1)
            .delete_edge("F", "a", "calls", "F", "b")
            .build();
        assert_eq!(batch.len(), 1);
        assert!(batch.deltas[0].is_delete());
        assert!(batch.deltas[0].is_edge_op());
    }

    #[test]
    fn builder_chained_operations() {
        let batch = DeltaBatchBuilder::new("file:src/main.rs", 1000)
            .upsert_node(
                "Function",
                "main",
                vec![SmolStr::new("Public")],
                [("lines", DeltaValue::Int64(42))],
            )
            .upsert_node("Function", "helper", vec![], [("lines", DeltaValue::Int64(10))])
            .upsert_edge("Function", "main", "calls", "Function", "helper", no_props())
            .delete_node("Function", "old_func")
            .build();

        assert_eq!(batch.len(), 4);
        assert_eq!(batch.node_upsert_count(), 2);
        assert_eq!(batch.edge_upsert_count(), 1);
        assert_eq!(batch.delete_count(), 1);
    }

    #[test]
    fn with_vector_clock() {
        let mut vc = VectorClock::new();
        vc.set("w1", 5);
        // The clock is moved into the batch; no copy is needed for the checks below.
        let batch = DeltaBatch::with_vector_clock("src", 100, vc);
        assert_eq!(batch.source, "src");
        assert_eq!(batch.timestamp, 100);
        assert!(batch.is_empty());
        let clock = batch
            .vector_clock
            .expect("clock passed to the constructor must be stored");
        assert_eq!(clock.get("w1"), 5);
    }
}