calimero_node_primitives/
delta_buffer.rs1use std::collections::HashSet;
21
22use calimero_crypto::Nonce;
23use calimero_primitives::hash::Hash;
24use calimero_primitives::identity::PublicKey;
25
/// Default buffer capacity (10,000).
///
/// NOTE(review): nothing in this file reads this constant; presumably callers pass it as the
/// `capacity` argument of `DeltaBuffer::new` — confirm against call sites.
pub const DEFAULT_BUFFER_CAPACITY: usize = 10_000;
28
/// Threshold used by `DeltaBuffer::is_capacity_below_recommended`: a buffer whose capacity is
/// strictly below this value is reported as being below the recommended size.
pub const MIN_RECOMMENDED_CAPACITY: usize = 100;
34
/// Outcome reported by `DeltaBuffer::push` for a single incoming delta.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PushResult {
    /// The delta was stored and nothing had to be discarded.
    Added,
    /// A delta with the same id is already buffered; the incoming one was ignored.
    Duplicate,
    /// The delta was stored, but the buffer was full, so the oldest buffered delta was
    /// discarded. Carries the id of the discarded delta.
    Evicted([u8; 32]),
    /// The buffer has zero capacity, so the incoming delta itself was discarded.
    /// Carries the id of that incoming delta.
    DroppedZeroCapacity([u8; 32]),
}

impl PushResult {
    /// Returns `true` when the incoming delta ended up in the buffer
    /// (`Added` or `Evicted`).
    #[must_use]
    pub fn was_added(&self) -> bool {
        match self {
            Self::Added | Self::Evicted(_) => true,
            Self::Duplicate | Self::DroppedZeroCapacity(_) => false,
        }
    }

    /// Returns `true` when some delta was discarded as part of this push
    /// (`Evicted` or `DroppedZeroCapacity`).
    #[must_use]
    pub fn had_data_loss(&self) -> bool {
        // A lost id is reported for exactly the two data-loss variants.
        self.lost_delta_id().is_some()
    }

    /// Returns the id of the delta that was discarded, or `None` when nothing was lost.
    #[must_use]
    pub fn lost_delta_id(&self) -> Option<[u8; 32]> {
        if let Self::Evicted(lost) | Self::DroppedZeroCapacity(lost) = self {
            Some(*lost)
        } else {
            None
        }
    }
}
75
/// A single delta held in a `DeltaBuffer`, together with the metadata stored alongside it.
///
/// Within this file only `id` is inspected (for de-duplication and for reporting which delta
/// was lost); every other field is stored and handed back untouched by `DeltaBuffer::drain`.
#[derive(Debug, Clone)]
pub struct BufferedDelta {
    /// 32-byte identifier of the delta; the key `DeltaBuffer` uses to detect duplicates.
    pub id: [u8; 32],
    /// 32-byte identifiers — presumably the ids of the deltas this one builds on; TODO confirm.
    pub parents: Vec<[u8; 32]>,
    /// Presumably the delta's hybrid-logical-clock timestamp — TODO confirm; not read in this file.
    pub hlc: u64,
    /// Opaque payload bytes.
    /// NOTE(review): possibly encrypted, given the accompanying `nonce` — confirm.
    pub payload: Vec<u8>,
    /// `calimero_crypto::Nonce` stored with the payload; presumably needed to decrypt it — confirm.
    pub nonce: Nonce,
    /// Public key presumably identifying the author of the delta — verify against the producer.
    pub author_id: PublicKey,
    /// Hash presumably describing the state root associated with this delta — TODO confirm
    /// whether it is the expected or the resulting root.
    pub root_hash: Hash,
    /// Optional opaque bytes — presumably serialized events that accompany the delta; confirm.
    pub events: Option<Vec<u8>>,
    /// libp2p peer recorded for this delta — presumably the peer it was received from; confirm.
    pub source_peer: libp2p::PeerId,
}
106
/// Bounded first-in-first-out buffer of `BufferedDelta`s with id-based de-duplication.
///
/// `push` appends at the back; once `capacity` deltas are held, each further push discards the
/// delta at the front. Every discarded delta is counted in `drops`.
#[derive(Debug)]
pub struct DeltaBuffer {
    /// Buffered deltas, oldest at the front (`push` appends at the back and evicts from the front).
    deltas: std::collections::VecDeque<BufferedDelta>,
    /// Ids of the deltas currently held in `deltas`; consulted by `push` and `contains`, updated
    /// on insertion and eviction, and cleared by `drain`.
    seen_ids: HashSet<[u8; 32]>,
    /// Value supplied to `new` and returned by `sync_start_hlc()`; not otherwise used in this file.
    /// Presumably the HLC timestamp at which the sync started — TODO confirm.
    sync_start_hlc: u64,
    /// Maximum number of deltas held at once; `0` makes `push` discard every incoming delta.
    capacity: usize,
    /// Number of deltas discarded so far: evictions plus zero-capacity rejections.
    drops: u64,
}
133
134impl DeltaBuffer {
135 #[must_use]
142 pub fn new(capacity: usize, sync_start_hlc: u64) -> Self {
143 Self {
144 deltas: std::collections::VecDeque::with_capacity(capacity.min(1000)),
145 seen_ids: HashSet::with_capacity(capacity.min(1000)),
146 sync_start_hlc,
147 capacity,
148 drops: 0,
149 }
150 }
151
152 #[must_use]
156 pub fn is_capacity_below_recommended(&self) -> bool {
157 self.capacity < MIN_RECOMMENDED_CAPACITY
158 }
159
160 pub fn push(&mut self, delta: BufferedDelta) -> PushResult {
178 let delta_id = delta.id;
179
180 if self.capacity == 0 {
182 self.drops += 1;
183 return PushResult::DroppedZeroCapacity(delta_id);
184 }
185
186 if self.seen_ids.contains(&delta_id) {
188 return PushResult::Duplicate;
189 }
190
191 if self.deltas.len() >= self.capacity {
192 if let Some(evicted) = self.deltas.pop_front() {
194 self.seen_ids.remove(&evicted.id);
195 let evicted_id = evicted.id;
196 self.drops += 1;
197 self.seen_ids.insert(delta_id);
198 self.deltas.push_back(delta);
199 PushResult::Evicted(evicted_id)
200 } else {
201 self.seen_ids.insert(delta_id);
203 self.deltas.push_back(delta);
204 PushResult::Added
205 }
206 } else {
207 self.seen_ids.insert(delta_id);
208 self.deltas.push_back(delta);
209 PushResult::Added
210 }
211 }
212
213 #[must_use]
218 pub fn drain(&mut self) -> Vec<BufferedDelta> {
219 self.seen_ids.clear();
220 self.deltas.drain(..).collect()
221 }
222
223 #[must_use]
227 pub fn contains(&self, id: &[u8; 32]) -> bool {
228 self.seen_ids.contains(id)
229 }
230
231 #[must_use]
233 pub fn len(&self) -> usize {
234 self.deltas.len()
235 }
236
237 #[must_use]
239 pub fn is_empty(&self) -> bool {
240 self.deltas.is_empty()
241 }
242
243 #[must_use]
245 pub fn sync_start_hlc(&self) -> u64 {
246 self.sync_start_hlc
247 }
248
249 #[must_use]
253 pub fn drops(&self) -> u64 {
254 self.drops
255 }
256
257 #[must_use]
259 pub fn capacity(&self) -> usize {
260 self.capacity
261 }
262}
263
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a delta whose id is `id` repeated 32 times; all other fields are fixed filler.
    fn make_test_delta(id: u8) -> BufferedDelta {
        let id_bytes = [id; 32];
        BufferedDelta {
            id: id_bytes,
            parents: vec![[0; 32]],
            hlc: 12345,
            payload: vec![1, 2, 3],
            nonce: [0; 12],
            author_id: PublicKey::from([0; 32]),
            root_hash: Hash::from([0; 32]),
            events: None,
            source_peer: libp2p::PeerId::random(),
        }
    }

    /// First id byte of every delta, in order; enough to identify deltas built by
    /// `make_test_delta`.
    fn leading_id_bytes(deltas: &[BufferedDelta]) -> Vec<u8> {
        deltas.iter().map(|delta| delta.id[0]).collect()
    }

    #[test]
    fn test_buffer_basic() {
        let mut buffer = DeltaBuffer::new(100, 12345);

        // Freshly constructed state.
        assert!(buffer.is_empty());
        assert_eq!(buffer.sync_start_hlc(), 12345);
        assert_eq!(buffer.capacity(), 100);
        assert_eq!(buffer.drops(), 0);
        assert!(!buffer.is_capacity_below_recommended());

        // A single push into an empty buffer.
        let outcome = buffer.push(make_test_delta(1));
        assert_eq!(outcome, PushResult::Added, "Should add without eviction");
        assert!(outcome.was_added());
        assert!(!outcome.had_data_loss());
        assert_eq!(buffer.len(), 1);

        // Draining hands the delta back and empties the buffer.
        let taken = buffer.drain();
        assert_eq!(taken.len(), 1);
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_buffer_only_during_sync() {
        let mut buffer = DeltaBuffer::new(10, 12345);
        assert!(buffer.is_empty());

        for id in 1..=2 {
            assert_eq!(buffer.push(make_test_delta(id)), PushResult::Added);
        }
        assert_eq!(buffer.len(), 2);

        let taken = buffer.drain();
        assert_eq!(leading_id_bytes(&taken), vec![1, 2]);
    }

    #[test]
    fn test_buffer_overflow_drops_oldest() {
        let mut buffer = DeltaBuffer::new(2, 0);

        // Fill to capacity without losing anything.
        for id in 1..=2 {
            assert_eq!(buffer.push(make_test_delta(id)), PushResult::Added);
        }
        assert_eq!(buffer.drops(), 0);

        // The third push evicts delta 1.
        let outcome = buffer.push(make_test_delta(3));
        assert_eq!(outcome, PushResult::Evicted([1; 32]), "Should evict delta 1");
        assert!(outcome.had_data_loss());
        assert_eq!(outcome.lost_delta_id(), Some([1; 32]));
        assert_eq!(buffer.drops(), 1);
        assert_eq!(buffer.len(), 2);

        // The fourth push evicts delta 2.
        let outcome = buffer.push(make_test_delta(4));
        assert_eq!(outcome, PushResult::Evicted([2; 32]), "Should evict delta 2");
        assert_eq!(buffer.drops(), 2);
        assert_eq!(buffer.len(), 2);

        // Only the two newest deltas remain, still in arrival order.
        let taken = buffer.drain();
        assert_eq!(leading_id_bytes(&taken), vec![3, 4]);
    }

    #[test]
    fn test_zero_capacity_drops_immediately() {
        let mut buffer = DeltaBuffer::new(0, 0);
        assert!(buffer.is_empty());
        assert_eq!(buffer.capacity(), 0);
        assert_eq!(buffer.drops(), 0);
        assert!(buffer.is_capacity_below_recommended());

        let outcome = buffer.push(make_test_delta(1));
        assert_eq!(
            outcome,
            PushResult::DroppedZeroCapacity([1; 32]),
            "Zero capacity should drop incoming delta"
        );
        assert!(outcome.had_data_loss());
        assert!(!outcome.was_added());
        assert_eq!(outcome.lost_delta_id(), Some([1; 32]));
        assert_eq!(buffer.drops(), 1);
        assert!(buffer.is_empty(), "Buffer should remain empty");
        assert_eq!(buffer.len(), 0);

        // A second push is dropped as well and counted separately.
        let outcome = buffer.push(make_test_delta(2));
        assert_eq!(outcome, PushResult::DroppedZeroCapacity([2; 32]));
        assert_eq!(buffer.drops(), 2);
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_finish_sync_returns_fifo() {
        let mut buffer = DeltaBuffer::new(100, 0);

        for id in 1..=3 {
            let _ = buffer.push(make_test_delta(id));
        }

        let taken = buffer.drain();
        assert_eq!(leading_id_bytes(&taken), vec![1, 2, 3]);
    }

    #[test]
    fn test_cancel_sync_clears_buffer() {
        let mut buffer = DeltaBuffer::new(100, 0);
        for id in 1..=2 {
            let _ = buffer.push(make_test_delta(id));
        }
        assert_eq!(buffer.len(), 2);

        // Discarding the drained deltas leaves nothing behind.
        let _ = buffer.drain();
        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);
    }

    #[test]
    fn test_drops_counter_observable() {
        let mut buffer = DeltaBuffer::new(1, 0);
        assert_eq!(buffer.drops(), 0);

        // With capacity 1 the first push fits; every later push evicts exactly one delta.
        for (id, expected_drops) in (1u8..=4).zip(0u64..) {
            let _ = buffer.push(make_test_delta(id));
            assert_eq!(buffer.drops(), expected_drops);
        }
    }

    #[test]
    fn test_deduplication_prevents_double_buffering() {
        let mut buffer = DeltaBuffer::new(10, 0);

        assert_eq!(buffer.push(make_test_delta(1)), PushResult::Added);
        assert_eq!(buffer.len(), 1);

        // Pushing the same id again is reported as a duplicate and changes nothing.
        let outcome = buffer.push(make_test_delta(1));
        assert_eq!(outcome, PushResult::Duplicate);
        assert!(!outcome.had_data_loss());
        assert!(!outcome.was_added());
        assert_eq!(buffer.len(), 1);

        // A different id is still accepted.
        assert_eq!(buffer.push(make_test_delta(2)), PushResult::Added);
        assert_eq!(buffer.len(), 2);
    }

    #[test]
    fn test_deduplication_cleared_on_drain() {
        let mut buffer = DeltaBuffer::new(10, 0);

        assert_eq!(buffer.push(make_test_delta(1)), PushResult::Added);
        assert!(buffer.contains(&[1; 32]));

        let _ = buffer.drain();
        assert!(!buffer.contains(&[1; 32]));

        // After draining, the same id is accepted again.
        assert_eq!(buffer.push(make_test_delta(1)), PushResult::Added);
        assert_eq!(buffer.len(), 1);
    }

    #[test]
    fn test_deduplication_cleared_on_eviction() {
        let mut buffer = DeltaBuffer::new(2, 0);

        for id in 1..=2 {
            let _ = buffer.push(make_test_delta(id));
        }
        assert!(buffer.contains(&[1; 32]));

        // Delta 3 pushes delta 1 out, and its id is forgotten with it.
        let _ = buffer.push(make_test_delta(3));
        assert!(!buffer.contains(&[1; 32]));
        assert!(buffer.contains(&[2; 32]));
        assert!(buffer.contains(&[3; 32]));

        // Id 1 is therefore treated as new again and evicts delta 2.
        let outcome = buffer.push(make_test_delta(1));
        assert_eq!(outcome, PushResult::Evicted([2; 32]));
    }

    #[test]
    fn test_capacity_below_recommended() {
        let below = DeltaBuffer::new(50, 0);
        assert!(below.is_capacity_below_recommended());

        let exactly = DeltaBuffer::new(MIN_RECOMMENDED_CAPACITY, 0);
        assert!(!exactly.is_capacity_below_recommended());

        let above = DeltaBuffer::new(MIN_RECOMMENDED_CAPACITY + 1, 0);
        assert!(!above.is_capacity_below_recommended());
    }
}