Skip to main content

calimero_node_primitives/
delta_buffer.rs

1//! Delta buffering for sync scenarios (Invariant I6).
2//!
3//! When a snapshot sync is in progress, incoming deltas are buffered so they
4//! can be replayed after the snapshot completes. This ensures that:
5//! 1. Deltas arriving during sync aren't lost (Invariant I6 - Liveness Guarantee)
6//! 2. Event handlers can execute for buffered deltas after context is initialized
7//!
8//! ## Delivery Contract
9//!
10//! - **Buffer size**: Configurable, default 10,000 deltas per context
11//! - **Drop policy**: Oldest-first when buffer full (with metric increment)
12//! - **Backpressure**: None (fire-and-forget from network layer)
13//! - **Metrics**: `drops` counter MUST be observable
14//!
15//! ## Minimum Capacity Warning
16//!
17//! If buffer capacity is set below `MIN_RECOMMENDED_CAPACITY`, a warning should
18//! be logged at startup. Zero capacity is valid but will drop ALL deltas.
19
20use std::collections::HashSet;
21
22use calimero_crypto::Nonce;
23use calimero_primitives::hash::Hash;
24use calimero_primitives::identity::PublicKey;
25
26/// Default buffer capacity (10,000 deltas per context).
27pub const DEFAULT_BUFFER_CAPACITY: usize = 10_000;
28
29/// Minimum recommended buffer capacity.
30///
31/// Capacities below this value may cause excessive delta loss under normal load.
32/// A warning should be logged if capacity is set below this threshold.
33pub const MIN_RECOMMENDED_CAPACITY: usize = 100;
34
35/// Result of pushing a delta to the buffer.
36///
37/// Provides clear semantics about what happened to both the incoming delta
38/// and any evicted delta.
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub enum PushResult {
41    /// Delta was added to the buffer without eviction.
42    Added,
43    /// Delta was a duplicate (already in buffer) - no action taken.
44    Duplicate,
45    /// Delta was added, but oldest delta was evicted due to capacity.
46    /// Contains the ID of the evicted delta.
47    Evicted([u8; 32]),
48    /// Delta was dropped immediately (zero capacity buffer).
49    /// Contains the ID of the dropped delta.
50    DroppedZeroCapacity([u8; 32]),
51}
52
53impl PushResult {
54    /// Returns true if the delta was successfully added to the buffer.
55    #[must_use]
56    pub fn was_added(&self) -> bool {
57        matches!(self, Self::Added | Self::Evicted(_))
58    }
59
60    /// Returns true if any delta was lost (evicted or dropped).
61    #[must_use]
62    pub fn had_data_loss(&self) -> bool {
63        matches!(self, Self::Evicted(_) | Self::DroppedZeroCapacity(_))
64    }
65
66    /// Returns the ID of the lost delta, if any.
67    #[must_use]
68    pub fn lost_delta_id(&self) -> Option<[u8; 32]> {
69        match self {
70            Self::Evicted(id) | Self::DroppedZeroCapacity(id) => Some(*id),
71            Self::Added | Self::Duplicate => None,
72        }
73    }
74}
75
76/// A single buffered delta.
77///
78/// Contains ALL fields needed for replay after snapshot sync completes.
79/// Previously missing fields (nonce, author_id, root_hash, events) caused
80/// data loss because deltas couldn't be decrypted or processed.
81///
82/// **POC Bug 7**: This struct MUST include all fields for replay - not just
83/// `id`, `parents`, `hlc`, `payload`, but also `nonce`, `author_id`, `root_hash`,
84/// `events`, and `source_peer`.
85#[derive(Debug, Clone)]
86pub struct BufferedDelta {
87    /// Delta ID.
88    pub id: [u8; 32],
89    /// Parent IDs.
90    pub parents: Vec<[u8; 32]>,
91    /// HLC timestamp.
92    pub hlc: u64,
93    /// Serialized (encrypted) payload.
94    pub payload: Vec<u8>,
95    /// Nonce for decryption (12 bytes for XChaCha20-Poly1305).
96    pub nonce: Nonce,
97    /// Author public key (needed to get sender key for decryption).
98    pub author_id: PublicKey,
99    /// Expected root hash after applying this delta.
100    pub root_hash: Hash,
101    /// Optional serialized events (for handler execution after replay).
102    pub events: Option<Vec<u8>>,
103    /// Source peer ID (for requesting sender key if needed).
104    pub source_peer: libp2p::PeerId,
105}
106
107/// Buffer for storing deltas during snapshot sync.
108///
109/// Implements Invariant I6: Deltas received during state-based sync MUST be
110/// preserved and applied after sync completes.
111///
112/// When the buffer is full, the oldest delta is evicted (FIFO eviction policy)
113/// and the `drops` counter is incremented. Drops MUST be observable via metrics.
114///
115/// ## Deduplication
116///
117/// The buffer tracks seen delta IDs to prevent duplicate deltas from being buffered.
118/// This protects against replay attacks where an adversary might flood the buffer
119/// with duplicate deltas to cause eviction of legitimate deltas.
120#[derive(Debug)]
121pub struct DeltaBuffer {
122    /// Buffered deltas (FIFO queue - oldest at front).
123    deltas: std::collections::VecDeque<BufferedDelta>,
124    /// Set of delta IDs currently in the buffer (for O(1) deduplication).
125    seen_ids: HashSet<[u8; 32]>,
126    /// HLC timestamp when buffering started.
127    sync_start_hlc: u64,
128    /// Maximum buffer size before eviction.
129    capacity: usize,
130    /// Number of deltas dropped due to buffer overflow (observable metric).
131    drops: u64,
132}
133
134impl DeltaBuffer {
135    /// Create a new delta buffer with specified capacity.
136    ///
137    /// # Capacity Warning
138    ///
139    /// If capacity is below `MIN_RECOMMENDED_CAPACITY`, callers should log a
140    /// warning at startup. Zero capacity is valid but will drop ALL deltas.
141    #[must_use]
142    pub fn new(capacity: usize, sync_start_hlc: u64) -> Self {
143        Self {
144            deltas: std::collections::VecDeque::with_capacity(capacity.min(1000)),
145            seen_ids: HashSet::with_capacity(capacity.min(1000)),
146            sync_start_hlc,
147            capacity,
148            drops: 0,
149        }
150    }
151
152    /// Check if capacity is below recommended minimum.
153    ///
154    /// Callers should log a warning at session start if this returns true.
155    #[must_use]
156    pub fn is_capacity_below_recommended(&self) -> bool {
157        self.capacity < MIN_RECOMMENDED_CAPACITY
158    }
159
160    /// Add a delta to the buffer.
161    ///
162    /// Returns a `PushResult` indicating what happened:
163    /// - `Added`: Delta was added successfully
164    /// - `Duplicate`: Delta ID was already in buffer (no action taken)
165    /// - `Evicted(id)`: Delta was added but oldest delta was evicted
166    /// - `DroppedZeroCapacity(id)`: Delta was dropped (zero capacity buffer)
167    ///
168    /// # Deduplication
169    ///
170    /// If a delta with the same ID is already in the buffer, it is not added
171    /// again and `PushResult::Duplicate` is returned. This prevents replay attacks.
172    ///
173    /// # Edge case: zero capacity
174    ///
175    /// If capacity is 0, the incoming delta is immediately dropped (not added)
176    /// and `PushResult::DroppedZeroCapacity` is returned with the dropped delta's ID.
177    pub fn push(&mut self, delta: BufferedDelta) -> PushResult {
178        let delta_id = delta.id;
179
180        // Handle zero capacity: drop incoming delta immediately
181        if self.capacity == 0 {
182            self.drops += 1;
183            return PushResult::DroppedZeroCapacity(delta_id);
184        }
185
186        // Deduplication check (#2: prevents replay attacks)
187        if self.seen_ids.contains(&delta_id) {
188            return PushResult::Duplicate;
189        }
190
191        if self.deltas.len() >= self.capacity {
192            // Evict oldest delta (front of queue)
193            if let Some(evicted) = self.deltas.pop_front() {
194                self.seen_ids.remove(&evicted.id);
195                let evicted_id = evicted.id;
196                self.drops += 1;
197                self.seen_ids.insert(delta_id);
198                self.deltas.push_back(delta);
199                PushResult::Evicted(evicted_id)
200            } else {
201                // This shouldn't happen, but handle gracefully
202                self.seen_ids.insert(delta_id);
203                self.deltas.push_back(delta);
204                PushResult::Added
205            }
206        } else {
207            self.seen_ids.insert(delta_id);
208            self.deltas.push_back(delta);
209            PushResult::Added
210        }
211    }
212
213    /// Get all buffered deltas for replay, clearing the buffer.
214    ///
215    /// Returns deltas in FIFO order (oldest first), preserving causality.
216    /// Also clears the deduplication set.
217    #[must_use]
218    pub fn drain(&mut self) -> Vec<BufferedDelta> {
219        self.seen_ids.clear();
220        self.deltas.drain(..).collect()
221    }
222
223    /// Check if a delta ID is already in the buffer.
224    ///
225    /// This is O(1) due to the internal HashSet tracking.
226    #[must_use]
227    pub fn contains(&self, id: &[u8; 32]) -> bool {
228        self.seen_ids.contains(id)
229    }
230
231    /// Number of buffered deltas.
232    #[must_use]
233    pub fn len(&self) -> usize {
234        self.deltas.len()
235    }
236
237    /// Check if buffer is empty.
238    #[must_use]
239    pub fn is_empty(&self) -> bool {
240        self.deltas.is_empty()
241    }
242
243    /// Get the sync start HLC.
244    #[must_use]
245    pub fn sync_start_hlc(&self) -> u64 {
246        self.sync_start_hlc
247    }
248
249    /// Get the number of deltas dropped due to buffer overflow.
250    ///
251    /// This metric MUST be observable per Invariant I6 delivery contract.
252    #[must_use]
253    pub fn drops(&self) -> u64 {
254        self.drops
255    }
256
257    /// Get the buffer capacity.
258    #[must_use]
259    pub fn capacity(&self) -> usize {
260        self.capacity
261    }
262}
263
264#[cfg(test)]
265mod tests {
266    use super::*;
267
268    fn make_test_delta(id: u8) -> BufferedDelta {
269        BufferedDelta {
270            id: [id; 32],
271            parents: vec![[0; 32]],
272            hlc: 12345,
273            payload: vec![1, 2, 3],
274            nonce: [0; 12],
275            author_id: PublicKey::from([0; 32]),
276            root_hash: Hash::from([0; 32]),
277            events: None,
278            source_peer: libp2p::PeerId::random(),
279        }
280    }
281
282    #[test]
283    fn test_buffer_basic() {
284        let mut buffer = DeltaBuffer::new(100, 12345);
285        assert!(buffer.is_empty());
286        assert_eq!(buffer.sync_start_hlc(), 12345);
287        assert_eq!(buffer.capacity(), 100);
288        assert_eq!(buffer.drops(), 0);
289        assert!(!buffer.is_capacity_below_recommended());
290
291        let result = buffer.push(make_test_delta(1));
292        assert_eq!(result, PushResult::Added, "Should add without eviction");
293        assert!(result.was_added());
294        assert!(!result.had_data_loss());
295        assert_eq!(buffer.len(), 1);
296
297        let drained = buffer.drain();
298        assert_eq!(drained.len(), 1);
299        assert!(buffer.is_empty());
300    }
301
302    #[test]
303    fn test_buffer_only_during_sync() {
304        // Buffer should only accept deltas - caller decides when to buffer
305        let mut buffer = DeltaBuffer::new(10, 12345);
306        assert!(buffer.is_empty());
307
308        // Push deltas
309        assert_eq!(buffer.push(make_test_delta(1)), PushResult::Added);
310        assert_eq!(buffer.push(make_test_delta(2)), PushResult::Added);
311        assert_eq!(buffer.len(), 2);
312
313        // Drain returns all in FIFO order
314        let drained = buffer.drain();
315        assert_eq!(drained.len(), 2);
316        assert_eq!(drained[0].id[0], 1);
317        assert_eq!(drained[1].id[0], 2);
318    }
319
320    #[test]
321    fn test_buffer_overflow_drops_oldest() {
322        let mut buffer = DeltaBuffer::new(2, 0);
323
324        // Fill buffer
325        assert_eq!(buffer.push(make_test_delta(1)), PushResult::Added);
326        assert_eq!(buffer.push(make_test_delta(2)), PushResult::Added);
327        assert_eq!(buffer.drops(), 0);
328
329        // Third delta causes eviction of oldest (delta 1)
330        let result = buffer.push(make_test_delta(3));
331        assert_eq!(result, PushResult::Evicted([1; 32]), "Should evict delta 1");
332        assert!(result.had_data_loss());
333        assert_eq!(result.lost_delta_id(), Some([1; 32]));
334        assert_eq!(buffer.drops(), 1);
335        assert_eq!(buffer.len(), 2);
336
337        // Fourth delta causes another eviction (delta 2)
338        let result = buffer.push(make_test_delta(4));
339        assert_eq!(result, PushResult::Evicted([2; 32]), "Should evict delta 2");
340        assert_eq!(buffer.drops(), 2);
341        assert_eq!(buffer.len(), 2);
342
343        // Verify remaining deltas are 3 and 4 (FIFO order)
344        let drained = buffer.drain();
345        assert_eq!(drained.len(), 2);
346        assert_eq!(drained[0].id[0], 3);
347        assert_eq!(drained[1].id[0], 4);
348    }
349
350    #[test]
351    fn test_zero_capacity_drops_immediately() {
352        let mut buffer = DeltaBuffer::new(0, 0);
353        assert!(buffer.is_empty());
354        assert_eq!(buffer.capacity(), 0);
355        assert_eq!(buffer.drops(), 0);
356        assert!(buffer.is_capacity_below_recommended());
357
358        // First push should drop immediately
359        let result = buffer.push(make_test_delta(1));
360        assert_eq!(
361            result,
362            PushResult::DroppedZeroCapacity([1; 32]),
363            "Zero capacity should drop incoming delta"
364        );
365        assert!(result.had_data_loss());
366        assert!(!result.was_added());
367        assert_eq!(result.lost_delta_id(), Some([1; 32]));
368        assert_eq!(buffer.drops(), 1);
369        assert!(buffer.is_empty(), "Buffer should remain empty");
370        assert_eq!(buffer.len(), 0);
371
372        // Second push should also drop
373        let result = buffer.push(make_test_delta(2));
374        assert_eq!(result, PushResult::DroppedZeroCapacity([2; 32]));
375        assert_eq!(buffer.drops(), 2);
376        assert!(buffer.is_empty());
377    }
378
379    #[test]
380    fn test_finish_sync_returns_fifo() {
381        let mut buffer = DeltaBuffer::new(100, 0);
382
383        // Add deltas in order
384        buffer.push(make_test_delta(1));
385        buffer.push(make_test_delta(2));
386        buffer.push(make_test_delta(3));
387
388        // Drain should return in FIFO order
389        let drained = buffer.drain();
390        assert_eq!(drained.len(), 3);
391        assert_eq!(drained[0].id[0], 1);
392        assert_eq!(drained[1].id[0], 2);
393        assert_eq!(drained[2].id[0], 3);
394    }
395
396    #[test]
397    fn test_cancel_sync_clears_buffer() {
398        let mut buffer = DeltaBuffer::new(100, 0);
399        buffer.push(make_test_delta(1));
400        buffer.push(make_test_delta(2));
401        assert_eq!(buffer.len(), 2);
402
403        // Simulate cancel by draining and discarding
404        let _ = buffer.drain();
405        assert!(buffer.is_empty());
406        assert_eq!(buffer.len(), 0);
407    }
408
409    #[test]
410    fn test_drops_counter_observable() {
411        let mut buffer = DeltaBuffer::new(1, 0);
412        assert_eq!(buffer.drops(), 0);
413
414        buffer.push(make_test_delta(1));
415        assert_eq!(buffer.drops(), 0);
416
417        // Each overflow increments drops
418        buffer.push(make_test_delta(2));
419        assert_eq!(buffer.drops(), 1);
420
421        buffer.push(make_test_delta(3));
422        assert_eq!(buffer.drops(), 2);
423
424        buffer.push(make_test_delta(4));
425        assert_eq!(buffer.drops(), 3);
426    }
427
428    #[test]
429    fn test_deduplication_prevents_double_buffering() {
430        let mut buffer = DeltaBuffer::new(10, 0);
431
432        // Add a delta
433        assert_eq!(buffer.push(make_test_delta(1)), PushResult::Added);
434        assert_eq!(buffer.len(), 1);
435
436        // Try to add same delta again - should be duplicate
437        let result = buffer.push(make_test_delta(1));
438        assert_eq!(result, PushResult::Duplicate);
439        assert!(!result.had_data_loss());
440        assert!(!result.was_added()); // Duplicate counts as "not added"
441        assert_eq!(buffer.len(), 1); // Still only 1
442
443        // Add a different delta - should work
444        assert_eq!(buffer.push(make_test_delta(2)), PushResult::Added);
445        assert_eq!(buffer.len(), 2);
446    }
447
448    #[test]
449    fn test_deduplication_cleared_on_drain() {
450        let mut buffer = DeltaBuffer::new(10, 0);
451
452        // Add a delta
453        assert_eq!(buffer.push(make_test_delta(1)), PushResult::Added);
454        assert!(buffer.contains(&[1; 32]));
455
456        // Drain
457        let _ = buffer.drain();
458        assert!(!buffer.contains(&[1; 32]));
459
460        // Now can add same delta again
461        assert_eq!(buffer.push(make_test_delta(1)), PushResult::Added);
462        assert_eq!(buffer.len(), 1);
463    }
464
465    #[test]
466    fn test_deduplication_cleared_on_eviction() {
467        let mut buffer = DeltaBuffer::new(2, 0);
468
469        // Fill buffer
470        buffer.push(make_test_delta(1));
471        buffer.push(make_test_delta(2));
472        assert!(buffer.contains(&[1; 32]));
473
474        // Evict delta 1 by adding delta 3
475        buffer.push(make_test_delta(3));
476        assert!(!buffer.contains(&[1; 32])); // delta 1 evicted
477        assert!(buffer.contains(&[2; 32]));
478        assert!(buffer.contains(&[3; 32]));
479
480        // Can now add delta 1 again (it was evicted)
481        let result = buffer.push(make_test_delta(1));
482        assert_eq!(result, PushResult::Evicted([2; 32])); // delta 2 gets evicted
483    }
484
485    #[test]
486    fn test_capacity_below_recommended() {
487        // Below recommended
488        let buffer = DeltaBuffer::new(50, 0);
489        assert!(buffer.is_capacity_below_recommended());
490
491        // At recommended
492        let buffer = DeltaBuffer::new(MIN_RECOMMENDED_CAPACITY, 0);
493        assert!(!buffer.is_capacity_below_recommended());
494
495        // Above recommended
496        let buffer = DeltaBuffer::new(MIN_RECOMMENDED_CAPACITY + 1, 0);
497        assert!(!buffer.is_capacity_below_recommended());
498    }
499}