hive_btle/sync/
batch.rs

1//! Batch Accumulator for HIVE-Lite Sync
2//!
3//! Collects CRDT operations over a configurable time window or size limit,
4//! then emits them as a batch for efficient BLE transmission.
5//!
6//! This reduces radio activity and power consumption on constrained devices.
7
8#[cfg(not(feature = "std"))]
9use alloc::vec::Vec;
10
11use super::crdt::CrdtOperation;
12
/// Configuration for batch accumulation.
///
/// The accumulator in this module uses these limits as follows:
///
/// * `max_bytes` / `max_operations` bound how much may be queued and, once
///   reached, make the batch eligible for flushing.
/// * `max_wait_ms` makes a non-empty batch eligible for flushing once that
///   much time has elapsed since its first operation was queued.
/// * `min_interval_ms` suppresses flushing until that much time has elapsed
///   since the previous flush, regardless of the other limits.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchConfig {
    /// Maximum time to wait before sending (milliseconds)
    pub max_wait_ms: u64,
    /// Maximum bytes to accumulate before sending
    pub max_bytes: usize,
    /// Maximum number of operations to accumulate
    pub max_operations: usize,
    /// Minimum time between syncs (milliseconds)
    pub min_interval_ms: u64,
}

impl Default for BatchConfig {
    /// Balanced preset: flush after 5 s, 512 bytes or 20 operations, and at
    /// most once per second.
    fn default() -> Self {
        Self {
            max_wait_ms: 5000,     // 5 seconds
            max_bytes: 512,        // Half a typical MTU-extended packet
            max_operations: 20,    // Reasonable batch size
            min_interval_ms: 1000, // At most 1 sync/second
        }
    }
}

impl BatchConfig {
    /// Create a config optimized for power efficiency.
    ///
    /// Larger batches and longer waits than [`BatchConfig::default`]. This is
    /// a `const fn`, so it can initialise `const`/`static` configuration.
    #[must_use]
    pub const fn low_power() -> Self {
        Self {
            max_wait_ms: 30_000, // 30 seconds
            max_bytes: 1024,
            max_operations: 50,
            min_interval_ms: 10_000, // At most 1 sync every 10 seconds
        }
    }

    /// Create a config for more responsive sync.
    ///
    /// Smaller batches and shorter waits than [`BatchConfig::default`]. This
    /// is a `const fn`, so it can initialise `const`/`static` configuration.
    #[must_use]
    pub const fn responsive() -> Self {
        Self {
            max_wait_ms: 1000, // 1 second
            max_bytes: 256,
            max_operations: 10,
            min_interval_ms: 500, // At most 2 syncs/second
        }
    }
}
58
59/// Batch of operations ready to send
60#[derive(Debug, Clone)]
61pub struct OperationBatch {
62    /// Operations in this batch
63    pub operations: Vec<CrdtOperation>,
64    /// Total size in bytes
65    pub total_bytes: usize,
66    /// Timestamp when batch was created
67    pub created_at: u64,
68}
69
70impl OperationBatch {
71    /// Check if batch is empty
72    pub fn is_empty(&self) -> bool {
73        self.operations.is_empty()
74    }
75
76    /// Get number of operations
77    pub fn len(&self) -> usize {
78        self.operations.len()
79    }
80
81    /// Encode all operations to bytes
82    pub fn encode(&self) -> Vec<u8> {
83        let mut buf = Vec::with_capacity(self.total_bytes + 4);
84        // Operation count
85        buf.extend_from_slice(&(self.operations.len() as u16).to_le_bytes());
86        // Encode each operation
87        for op in &self.operations {
88            let encoded = op.encode();
89            buf.extend_from_slice(&(encoded.len() as u16).to_le_bytes());
90            buf.extend_from_slice(&encoded);
91        }
92        buf
93    }
94
95    /// Decode operations from bytes
96    pub fn decode(data: &[u8]) -> Option<Self> {
97        if data.len() < 2 {
98            return None;
99        }
100
101        let op_count = u16::from_le_bytes([data[0], data[1]]) as usize;
102        let mut operations = Vec::with_capacity(op_count);
103        let mut offset = 2;
104        let mut total_bytes = 0;
105
106        for _ in 0..op_count {
107            if offset + 2 > data.len() {
108                return None;
109            }
110            let op_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
111            offset += 2;
112
113            if offset + op_len > data.len() {
114                return None;
115            }
116            let op = CrdtOperation::decode(&data[offset..offset + op_len])?;
117            total_bytes += op.size();
118            operations.push(op);
119            offset += op_len;
120        }
121
122        Some(Self {
123            operations,
124            total_bytes,
125            created_at: 0,
126        })
127    }
128}
129
130/// Accumulates CRDT operations for batched transmission
131#[derive(Debug)]
132pub struct BatchAccumulator {
133    /// Configuration
134    config: BatchConfig,
135    /// Pending operations
136    pending: Vec<CrdtOperation>,
137    /// Accumulated bytes
138    bytes_accumulated: usize,
139    /// Time of first pending operation
140    first_pending_time: Option<u64>,
141    /// Time of last sync
142    last_sync_time: u64,
143}
144
145impl BatchAccumulator {
146    /// Create a new accumulator with the given config
147    pub fn new(config: BatchConfig) -> Self {
148        Self {
149            config,
150            pending: Vec::new(),
151            bytes_accumulated: 0,
152            first_pending_time: None,
153            last_sync_time: 0,
154        }
155    }
156
157    /// Create with default config
158    pub fn with_defaults() -> Self {
159        Self::new(BatchConfig::default())
160    }
161
162    /// Add an operation to the batch
163    ///
164    /// Returns true if the operation was added, false if batch is full
165    pub fn add(&mut self, op: CrdtOperation, current_time_ms: u64) -> bool {
166        let op_size = op.size();
167
168        // Check if we have room
169        if self.bytes_accumulated + op_size > self.config.max_bytes && !self.pending.is_empty() {
170            return false;
171        }
172        if self.pending.len() >= self.config.max_operations {
173            return false;
174        }
175
176        // Record first pending time
177        if self.first_pending_time.is_none() {
178            self.first_pending_time = Some(current_time_ms);
179        }
180
181        self.pending.push(op);
182        self.bytes_accumulated += op_size;
183        true
184    }
185
186    /// Check if the batch should be flushed
187    pub fn should_flush(&self, current_time_ms: u64) -> bool {
188        if self.pending.is_empty() {
189            return false;
190        }
191
192        // Check if minimum interval has passed
193        if current_time_ms < self.last_sync_time + self.config.min_interval_ms {
194            return false;
195        }
196
197        // Check size limits
198        if self.bytes_accumulated >= self.config.max_bytes {
199            return true;
200        }
201        if self.pending.len() >= self.config.max_operations {
202            return true;
203        }
204
205        // Check time limit
206        if let Some(first_time) = self.first_pending_time {
207            if current_time_ms >= first_time + self.config.max_wait_ms {
208                return true;
209            }
210        }
211
212        false
213    }
214
215    /// Flush pending operations into a batch
216    pub fn flush(&mut self, current_time_ms: u64) -> Option<OperationBatch> {
217        if self.pending.is_empty() {
218            return None;
219        }
220
221        let batch = OperationBatch {
222            operations: core::mem::take(&mut self.pending),
223            total_bytes: self.bytes_accumulated,
224            created_at: current_time_ms,
225        };
226
227        self.bytes_accumulated = 0;
228        self.first_pending_time = None;
229        self.last_sync_time = current_time_ms;
230
231        Some(batch)
232    }
233
234    /// Force flush regardless of timing constraints
235    pub fn force_flush(&mut self, current_time_ms: u64) -> Option<OperationBatch> {
236        self.flush(current_time_ms)
237    }
238
239    /// Get number of pending operations
240    pub fn pending_count(&self) -> usize {
241        self.pending.len()
242    }
243
244    /// Get accumulated bytes
245    pub fn accumulated_bytes(&self) -> usize {
246        self.bytes_accumulated
247    }
248
249    /// Check if there are pending operations
250    pub fn has_pending(&self) -> bool {
251        !self.pending.is_empty()
252    }
253
254    /// Clear all pending operations without flushing
255    pub fn clear(&mut self) {
256        self.pending.clear();
257        self.bytes_accumulated = 0;
258        self.first_pending_time = None;
259    }
260
261    /// Get the config
262    pub fn config(&self) -> &BatchConfig {
263        &self.config
264    }
265
266    /// Update the config
267    pub fn set_config(&mut self, config: BatchConfig) {
268        self.config = config;
269    }
270}
271
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::crdt::Position;
    use crate::NodeId;

    /// Build a position-update operation with a fixed coordinate.
    fn make_position_op(node_id: u32, timestamp: u64) -> CrdtOperation {
        CrdtOperation::UpdatePosition {
            node_id: NodeId::new(node_id),
            position: Position::new(37.0, -122.0),
            timestamp,
        }
    }

    #[test]
    fn test_batch_config_defaults() {
        let config = BatchConfig::default();
        assert_eq!(config.max_wait_ms, 5000);
        assert_eq!(config.max_bytes, 512);
        assert_eq!(config.max_operations, 20);
        assert_eq!(config.min_interval_ms, 1000);
    }

    #[test]
    fn test_accumulator_add() {
        let mut acc = BatchAccumulator::with_defaults();

        let op = make_position_op(1, 1000);
        let op_size = op.size();
        assert!(acc.add(op, 1000));
        assert_eq!(acc.pending_count(), 1);
        assert_eq!(acc.accumulated_bytes(), op_size);
        assert!(acc.has_pending());
    }

    #[test]
    fn test_accumulator_max_operations() {
        let config = BatchConfig {
            max_operations: 2,
            ..Default::default()
        };
        let mut acc = BatchAccumulator::new(config);

        assert!(acc.add(make_position_op(1, 1000), 1000));
        assert!(acc.add(make_position_op(2, 1001), 1001));
        assert!(!acc.add(make_position_op(3, 1002), 1002)); // Should fail
        assert_eq!(acc.pending_count(), 2);
    }

    #[test]
    fn test_accumulator_max_bytes() {
        // Room for exactly one operation.
        let config = BatchConfig {
            max_bytes: make_position_op(0, 0).size(),
            ..Default::default()
        };
        let mut acc = BatchAccumulator::new(config);

        assert!(acc.add(make_position_op(1, 1000), 1000));
        assert!(!acc.add(make_position_op(2, 1001), 1001)); // Would exceed max_bytes
        assert_eq!(acc.pending_count(), 1);
    }

    #[test]
    fn test_accumulator_flush() {
        let mut acc = BatchAccumulator::with_defaults();

        acc.add(make_position_op(1, 1000), 1000);
        acc.add(make_position_op(2, 1001), 1001);
        let bytes_before = acc.accumulated_bytes();

        let batch = acc.flush(2000).unwrap();
        assert_eq!(batch.len(), 2);
        assert!(!batch.is_empty());
        assert_eq!(batch.total_bytes, bytes_before);
        assert_eq!(batch.created_at, 2000);
        assert!(!acc.has_pending());
        assert_eq!(acc.accumulated_bytes(), 0);
    }

    #[test]
    fn test_flush_empty_returns_none() {
        let mut acc = BatchAccumulator::with_defaults();

        assert!(!acc.should_flush(1_000_000));
        assert!(acc.flush(1000).is_none());
        assert!(acc.force_flush(1000).is_none());
    }

    #[test]
    fn test_should_flush_time() {
        let config = BatchConfig {
            max_wait_ms: 100,
            min_interval_ms: 0,
            ..Default::default()
        };
        let mut acc = BatchAccumulator::new(config);

        acc.add(make_position_op(1, 1000), 1000);

        // Not enough time passed
        assert!(!acc.should_flush(1050));

        // Time limit reached
        assert!(acc.should_flush(1100));
    }

    #[test]
    fn test_should_flush_size() {
        let config = BatchConfig {
            max_bytes: 20, // Small limit (each position op is ~21 bytes)
            min_interval_ms: 0,
            ..Default::default()
        };
        let mut acc = BatchAccumulator::new(config);

        // First op (21 bytes) exceeds max_bytes (20), but allowed since batch is empty
        assert!(acc.add(make_position_op(1, 1000), 1000));
        // Should flush because we've exceeded max_bytes
        assert!(acc.should_flush(1000));
    }

    #[test]
    fn test_min_interval() {
        let config = BatchConfig {
            max_wait_ms: 100,
            min_interval_ms: 1000,
            ..Default::default()
        };
        let mut acc = BatchAccumulator::new(config);

        acc.add(make_position_op(1, 0), 0);
        acc.flush(0);

        acc.add(make_position_op(2, 100), 100);

        // Min interval not passed
        assert!(!acc.should_flush(500));

        // Min interval passed
        assert!(acc.should_flush(1100));
    }

    #[test]
    fn test_batch_encode_decode() {
        let batch = OperationBatch {
            operations: vec![make_position_op(1, 1000), make_position_op(2, 2000)],
            total_bytes: 100,
            created_at: 3000,
        };

        let encoded = batch.encode();
        // The frame starts with the little-endian operation count.
        assert_eq!(&encoded[..2], &2u16.to_le_bytes());

        let decoded = OperationBatch::decode(&encoded).unwrap();

        assert_eq!(decoded.len(), 2);
        // total_bytes is recomputed from the decoded operations and
        // created_at is not transmitted.
        let expected_bytes: usize = decoded.operations.iter().map(|op| op.size()).sum();
        assert_eq!(decoded.total_bytes, expected_bytes);
        assert_eq!(decoded.created_at, 0);
    }

    #[test]
    fn test_decode_empty_batch() {
        let decoded = OperationBatch::decode(&[0, 0]).unwrap();
        assert!(decoded.is_empty());
        assert_eq!(decoded.total_bytes, 0);
    }

    #[test]
    fn test_decode_rejects_malformed() {
        // Too short for the count header
        assert!(OperationBatch::decode(&[]).is_none());
        assert!(OperationBatch::decode(&[1]).is_none());

        // Count claims operations that are not present
        assert!(OperationBatch::decode(&[1, 0]).is_none());
        assert!(OperationBatch::decode(&[0xFF, 0xFF]).is_none());

        // Length prefix runs past the end of the input
        assert!(OperationBatch::decode(&[1, 0, 5, 0, 0xAA]).is_none());

        // Truncated valid frame
        let batch = OperationBatch {
            operations: vec![make_position_op(1, 1000)],
            total_bytes: 0,
            created_at: 0,
        };
        let encoded = batch.encode();
        assert!(OperationBatch::decode(&encoded[..encoded.len() - 1]).is_none());
    }

    #[test]
    fn test_clear() {
        let mut acc = BatchAccumulator::with_defaults();

        acc.add(make_position_op(1, 1000), 1000);
        acc.add(make_position_op(2, 1001), 1001);

        acc.clear();
        assert!(!acc.has_pending());
        assert_eq!(acc.pending_count(), 0);
        assert_eq!(acc.accumulated_bytes(), 0);
    }

    #[test]
    fn test_force_flush() {
        let config = BatchConfig {
            min_interval_ms: 10000, // Long interval
            ..Default::default()
        };
        let mut acc = BatchAccumulator::new(config);

        acc.add(make_position_op(1, 0), 0);
        acc.flush(0);

        acc.add(make_position_op(2, 100), 100);

        // Normal flush blocked by min_interval
        assert!(!acc.should_flush(100));

        // Force flush works anyway
        let batch = acc.force_flush(100);
        assert!(batch.is_some());
        assert!(!acc.has_pending());
    }
}