1use std::collections::HashSet;
2
3use hashbrown::HashMap;
4use smol_str::SmolStr;
5
6use crate::delta::GraphDelta;
7use crate::node_key::NodeKey;
8use crate::value::DeltaValue;
9use crate::vector_clock::VectorClock;
10
11#[derive(Clone, Debug, PartialEq)]
16pub struct DeltaBatch {
17 pub source: SmolStr,
19 pub timestamp: u64,
21 pub deltas: Vec<GraphDelta>,
23 pub vector_clock: Option<VectorClock>,
25}
26
27impl DeltaBatch {
28 pub fn new(source: impl Into<SmolStr>, timestamp: u64) -> Self {
29 Self {
30 source: source.into(),
31 timestamp,
32 deltas: Vec::new(),
33 vector_clock: None,
34 }
35 }
36
37 pub fn with_vector_clock(
38 source: impl Into<SmolStr>,
39 timestamp: u64,
40 clock: VectorClock,
41 ) -> Self {
42 Self {
43 source: source.into(),
44 timestamp,
45 deltas: Vec::new(),
46 vector_clock: Some(clock),
47 }
48 }
49
50 pub fn push(&mut self, delta: GraphDelta) {
51 self.deltas.push(delta);
52 }
53
54 pub fn extend(&mut self, deltas: impl IntoIterator<Item = GraphDelta>) {
55 self.deltas.extend(deltas);
56 }
57
58 pub fn len(&self) -> usize {
59 self.deltas.len()
60 }
61
62 pub fn is_empty(&self) -> bool {
63 self.deltas.is_empty()
64 }
65
66 pub fn iter(&self) -> impl Iterator<Item = &GraphDelta> {
67 self.deltas.iter()
68 }
69
70 pub fn node_upsert_count(&self) -> usize {
71 self.deltas
72 .iter()
73 .filter(|d| matches!(d, GraphDelta::UpsertNode { .. }))
74 .count()
75 }
76
77 pub fn edge_upsert_count(&self) -> usize {
78 self.deltas
79 .iter()
80 .filter(|d| matches!(d, GraphDelta::UpsertEdge { .. }))
81 .count()
82 }
83
84 pub fn delete_count(&self) -> usize {
85 self.deltas.iter().filter(|d| d.is_delete()).count()
86 }
87
88 pub fn referenced_labels(&self) -> Vec<SmolStr> {
90 let mut labels = HashSet::new();
91 for delta in &self.deltas {
92 for key in delta.referenced_keys() {
93 labels.insert(key.label.clone());
94 }
95 }
96 labels.into_iter().collect()
97 }
98
99 pub fn referenced_rel_types(&self) -> Vec<SmolStr> {
101 let mut types = HashSet::new();
102 for delta in &self.deltas {
103 match delta {
104 GraphDelta::UpsertEdge { rel_type, .. }
105 | GraphDelta::DeleteEdge { rel_type, .. } => {
106 types.insert(rel_type.clone());
107 }
108 _ => {}
109 }
110 }
111 types.into_iter().collect()
112 }
113}
114
115pub struct DeltaBatchBuilder {
117 batch: DeltaBatch,
118}
119
120impl DeltaBatchBuilder {
121 pub fn new(source: impl Into<SmolStr>, timestamp: u64) -> Self {
122 Self {
123 batch: DeltaBatch::new(source, timestamp),
124 }
125 }
126
127 pub fn upsert_node(
128 mut self,
129 label: impl Into<SmolStr>,
130 primary_key: impl Into<SmolStr>,
131 labels: Vec<SmolStr>,
132 props: impl IntoIterator<Item = (impl Into<SmolStr>, DeltaValue)>,
133 ) -> Self {
134 let props_map: HashMap<SmolStr, DeltaValue> =
135 props.into_iter().map(|(k, v)| (k.into(), v)).collect();
136 self.batch.push(GraphDelta::UpsertNode {
137 key: NodeKey::new(label, primary_key),
138 labels,
139 props: props_map,
140 });
141 self
142 }
143
144 pub fn upsert_edge(
145 mut self,
146 src_label: impl Into<SmolStr>,
147 src_key: impl Into<SmolStr>,
148 rel_type: impl Into<SmolStr>,
149 dst_label: impl Into<SmolStr>,
150 dst_key: impl Into<SmolStr>,
151 props: impl IntoIterator<Item = (impl Into<SmolStr>, DeltaValue)>,
152 ) -> Self {
153 let props_map: HashMap<SmolStr, DeltaValue> =
154 props.into_iter().map(|(k, v)| (k.into(), v)).collect();
155 self.batch.push(GraphDelta::UpsertEdge {
156 src: NodeKey::new(src_label, src_key),
157 rel_type: rel_type.into(),
158 dst: NodeKey::new(dst_label, dst_key),
159 props: props_map,
160 });
161 self
162 }
163
164 pub fn delete_node(
165 mut self,
166 label: impl Into<SmolStr>,
167 primary_key: impl Into<SmolStr>,
168 ) -> Self {
169 self.batch.push(GraphDelta::DeleteNode {
170 key: NodeKey::new(label, primary_key),
171 });
172 self
173 }
174
175 pub fn delete_edge(
176 mut self,
177 src_label: impl Into<SmolStr>,
178 src_key: impl Into<SmolStr>,
179 rel_type: impl Into<SmolStr>,
180 dst_label: impl Into<SmolStr>,
181 dst_key: impl Into<SmolStr>,
182 ) -> Self {
183 self.batch.push(GraphDelta::DeleteEdge {
184 src: NodeKey::new(src_label, src_key),
185 rel_type: rel_type.into(),
186 dst: NodeKey::new(dst_label, dst_key),
187 });
188 self
189 }
190
191 pub fn build(self) -> DeltaBatch {
192 self.batch
193 }
194}
195
#[cfg(test)]
mod tests {
    use super::*;

    /// Property iterator that yields nothing, for deltas without properties.
    fn no_props() -> std::iter::Empty<(SmolStr, DeltaValue)> {
        std::iter::empty()
    }

    /// Builds a `DeleteNode` delta for label `"A"` and the given primary key.
    fn delete_a(primary_key: &str) -> GraphDelta {
        GraphDelta::DeleteNode {
            key: NodeKey::new("A", primary_key),
        }
    }

    // ---- DeltaBatch basics -------------------------------------------------

    #[test]
    fn new_batch_is_empty() {
        let batch = DeltaBatch::new("test", 1);
        assert!(batch.is_empty());
        assert_eq!(batch.len(), 0);
        assert!(batch.vector_clock.is_none());
    }

    #[test]
    fn push_delta() {
        let mut batch = DeltaBatch::new("test", 1);
        batch.push(delete_a("1"));
        assert_eq!(batch.len(), 1);
    }

    #[test]
    fn extend_deltas() {
        let mut batch = DeltaBatch::new("test", 1);
        batch.extend(vec![delete_a("1"), delete_a("2")]);
        assert_eq!(batch.len(), 2);
    }

    #[test]
    fn len_and_is_empty() {
        let mut batch = DeltaBatch::new("test", 1);
        assert!(batch.is_empty());
        batch.push(delete_a("1"));
        assert!(!batch.is_empty());
        assert_eq!(batch.len(), 1);
    }

    // ---- Counters ----------------------------------------------------------

    #[test]
    fn node_upsert_count() {
        let builder = DeltaBatchBuilder::new("test", 1)
            .upsert_node("F", "a", vec![], no_props())
            .upsert_node("F", "b", vec![], no_props())
            .upsert_edge("F", "a", "calls", "F", "b", no_props());
        let batch = builder.build();
        assert_eq!(batch.node_upsert_count(), 2);
    }

    #[test]
    fn edge_upsert_count() {
        let builder = DeltaBatchBuilder::new("test", 1)
            .upsert_node("F", "a", vec![], no_props())
            .upsert_edge("F", "a", "calls", "F", "b", no_props())
            .upsert_edge("F", "b", "calls", "F", "c", no_props());
        let batch = builder.build();
        assert_eq!(batch.edge_upsert_count(), 2);
    }

    #[test]
    fn delete_count() {
        let builder = DeltaBatchBuilder::new("test", 1)
            .delete_node("F", "old")
            .delete_edge("F", "a", "calls", "F", "old")
            .upsert_node("F", "new", vec![], no_props());
        let batch = builder.build();
        assert_eq!(batch.delete_count(), 2);
    }

    // ---- Referenced labels / relationship types ----------------------------

    #[test]
    fn referenced_labels() {
        let batch = DeltaBatchBuilder::new("test", 1)
            .upsert_node("Function", "main", vec![], no_props())
            .upsert_edge("Function", "main", "calls", "File", "lib.rs", no_props())
            .build();
        let expected = vec![SmolStr::new("File"), SmolStr::new("Function")];
        let mut actual = batch.referenced_labels();
        actual.sort();
        assert_eq!(actual, expected);
    }

    #[test]
    fn referenced_rel_types() {
        let batch = DeltaBatchBuilder::new("test", 1)
            .upsert_edge("F", "a", "calls", "F", "b", no_props())
            .upsert_edge("F", "a", "imports", "M", "x", no_props())
            .build();
        let expected = vec![SmolStr::new("calls"), SmolStr::new("imports")];
        let mut actual = batch.referenced_rel_types();
        actual.sort();
        assert_eq!(actual, expected);
    }

    // ---- Builder -----------------------------------------------------------

    #[test]
    fn builder_upsert_node() {
        let batch = DeltaBatchBuilder::new("src", 100)
            .upsert_node(
                "Function",
                "main",
                vec![],
                [("lines", DeltaValue::Int64(42))],
            )
            .build();
        assert_eq!(batch.len(), 1);
        assert_eq!(batch.source, "src");
        assert_eq!(batch.timestamp, 100);
        if let GraphDelta::UpsertNode { key, props, .. } = &batch.deltas[0] {
            assert_eq!(key.label, "Function");
            assert_eq!(props.get("lines"), Some(&DeltaValue::Int64(42)));
        } else {
            panic!("expected UpsertNode");
        }
    }

    #[test]
    fn builder_upsert_edge() {
        let batch = DeltaBatchBuilder::new("src", 100)
            .upsert_edge("F", "a", "calls", "F", "b", no_props())
            .build();
        assert_eq!(batch.len(), 1);
        if let GraphDelta::UpsertEdge {
            src, rel_type, dst, ..
        } = &batch.deltas[0]
        {
            assert_eq!(src.primary_key, "a");
            assert_eq!(*rel_type, "calls");
            assert_eq!(dst.primary_key, "b");
        } else {
            panic!("expected UpsertEdge");
        }
    }

    #[test]
    fn builder_delete_node() {
        let batch = DeltaBatchBuilder::new("src", 1)
            .delete_node("Function", "old")
            .build();
        assert_eq!(batch.len(), 1);
        let only = &batch.deltas[0];
        assert!(only.is_delete());
    }

    #[test]
    fn builder_delete_edge() {
        let batch = DeltaBatchBuilder::new("src", 1)
            .delete_edge("F", "a", "calls", "F", "b")
            .build();
        assert_eq!(batch.len(), 1);
        let only = &batch.deltas[0];
        assert!(only.is_delete());
        assert!(only.is_edge_op());
    }

    #[test]
    fn builder_chained_operations() {
        let builder = DeltaBatchBuilder::new("file:src/main.rs", 1000)
            .upsert_node(
                "Function",
                "main",
                vec![SmolStr::new("Public")],
                [("lines", DeltaValue::Int64(42))],
            )
            .upsert_node(
                "Function",
                "helper",
                vec![],
                [("lines", DeltaValue::Int64(10))],
            )
            .upsert_edge("Function", "main", "calls", "Function", "helper", no_props())
            .delete_node("Function", "old_func");
        let batch = builder.build();

        assert_eq!(batch.len(), 4);
        assert_eq!(batch.node_upsert_count(), 2);
        assert_eq!(batch.edge_upsert_count(), 1);
        assert_eq!(batch.delete_count(), 1);
    }

    // ---- Vector clock ------------------------------------------------------

    #[test]
    fn with_vector_clock() {
        let mut clock = VectorClock::new();
        clock.set("w1", 5);
        let batch = DeltaBatch::with_vector_clock("src", 100, clock.clone());
        assert!(batch.vector_clock.is_some());
        assert_eq!(batch.vector_clock.unwrap().get("w1"), 5);
    }
}