hive_btle/sync/
batch.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Batch Accumulator for HIVE-Lite Sync
17//!
18//! Collects CRDT operations over a configurable time window or size limit,
19//! then emits them as a batch for efficient BLE transmission.
20//!
21//! This reduces radio activity and power consumption on constrained devices.
22
23#[cfg(not(feature = "std"))]
24use alloc::vec::Vec;
25
26use super::crdt::CrdtOperation;
27
/// Limits that control how a `BatchAccumulator` collects operations and when
/// it reports that the pending batch is ready to be sent.
///
/// All durations are expressed in milliseconds and are compared against the
/// timestamps the caller passes to the accumulator, so they share whatever
/// clock the caller uses.
///
/// The type is comparable (`PartialEq`/`Eq`) so callers can detect whether a
/// new configuration actually differs from the active one.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchConfig {
    /// Longest time (milliseconds) the oldest pending operation may wait
    /// before the batch is considered due.
    pub max_wait_ms: u64,
    /// Byte budget for a batch; reaching it makes the batch due, and further
    /// operations are refused once the budget would be exceeded.
    pub max_bytes: usize,
    /// Maximum number of operations held in one batch.
    pub max_operations: usize,
    /// Minimum spacing (milliseconds) between two consecutive syncs.
    pub min_interval_ms: u64,
}
40
41impl Default for BatchConfig {
42    fn default() -> Self {
43        Self {
44            max_wait_ms: 5000,     // 5 seconds
45            max_bytes: 512,        // Half a typical MTU-extended packet
46            max_operations: 20,    // Reasonable batch size
47            min_interval_ms: 1000, // At most 1 sync/second
48        }
49    }
50}
51
52impl BatchConfig {
53    /// Create a config optimized for power efficiency
54    pub fn low_power() -> Self {
55        Self {
56            max_wait_ms: 30_000, // 30 seconds
57            max_bytes: 1024,
58            max_operations: 50,
59            min_interval_ms: 10_000, // At most 1 sync every 10 seconds
60        }
61    }
62
63    /// Create a config for more responsive sync
64    pub fn responsive() -> Self {
65        Self {
66            max_wait_ms: 1000, // 1 second
67            max_bytes: 256,
68            max_operations: 10,
69            min_interval_ms: 500,
70        }
71    }
72}
73
74/// Batch of operations ready to send
75#[derive(Debug, Clone)]
76pub struct OperationBatch {
77    /// Operations in this batch
78    pub operations: Vec<CrdtOperation>,
79    /// Total size in bytes
80    pub total_bytes: usize,
81    /// Timestamp when batch was created
82    pub created_at: u64,
83}
84
85impl OperationBatch {
86    /// Check if batch is empty
87    pub fn is_empty(&self) -> bool {
88        self.operations.is_empty()
89    }
90
91    /// Get number of operations
92    pub fn len(&self) -> usize {
93        self.operations.len()
94    }
95
96    /// Encode all operations to bytes
97    pub fn encode(&self) -> Vec<u8> {
98        let mut buf = Vec::with_capacity(self.total_bytes + 4);
99        // Operation count
100        buf.extend_from_slice(&(self.operations.len() as u16).to_le_bytes());
101        // Encode each operation
102        for op in &self.operations {
103            let encoded = op.encode();
104            buf.extend_from_slice(&(encoded.len() as u16).to_le_bytes());
105            buf.extend_from_slice(&encoded);
106        }
107        buf
108    }
109
110    /// Decode operations from bytes
111    pub fn decode(data: &[u8]) -> Option<Self> {
112        if data.len() < 2 {
113            return None;
114        }
115
116        let op_count = u16::from_le_bytes([data[0], data[1]]) as usize;
117        let mut operations = Vec::with_capacity(op_count);
118        let mut offset = 2;
119        let mut total_bytes = 0;
120
121        for _ in 0..op_count {
122            if offset + 2 > data.len() {
123                return None;
124            }
125            let op_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
126            offset += 2;
127
128            if offset + op_len > data.len() {
129                return None;
130            }
131            let op = CrdtOperation::decode(&data[offset..offset + op_len])?;
132            total_bytes += op.size();
133            operations.push(op);
134            offset += op_len;
135        }
136
137        Some(Self {
138            operations,
139            total_bytes,
140            created_at: 0,
141        })
142    }
143}
144
145/// Accumulates CRDT operations for batched transmission
146#[derive(Debug)]
147pub struct BatchAccumulator {
148    /// Configuration
149    config: BatchConfig,
150    /// Pending operations
151    pending: Vec<CrdtOperation>,
152    /// Accumulated bytes
153    bytes_accumulated: usize,
154    /// Time of first pending operation
155    first_pending_time: Option<u64>,
156    /// Time of last sync
157    last_sync_time: u64,
158}
159
160impl BatchAccumulator {
161    /// Create a new accumulator with the given config
162    pub fn new(config: BatchConfig) -> Self {
163        Self {
164            config,
165            pending: Vec::new(),
166            bytes_accumulated: 0,
167            first_pending_time: None,
168            last_sync_time: 0,
169        }
170    }
171
172    /// Create with default config
173    pub fn with_defaults() -> Self {
174        Self::new(BatchConfig::default())
175    }
176
177    /// Add an operation to the batch
178    ///
179    /// Returns true if the operation was added, false if batch is full
180    pub fn add(&mut self, op: CrdtOperation, current_time_ms: u64) -> bool {
181        let op_size = op.size();
182
183        // Check if we have room
184        if self.bytes_accumulated + op_size > self.config.max_bytes && !self.pending.is_empty() {
185            return false;
186        }
187        if self.pending.len() >= self.config.max_operations {
188            return false;
189        }
190
191        // Record first pending time
192        if self.first_pending_time.is_none() {
193            self.first_pending_time = Some(current_time_ms);
194        }
195
196        self.pending.push(op);
197        self.bytes_accumulated += op_size;
198        true
199    }
200
201    /// Check if the batch should be flushed
202    pub fn should_flush(&self, current_time_ms: u64) -> bool {
203        if self.pending.is_empty() {
204            return false;
205        }
206
207        // Check if minimum interval has passed
208        if current_time_ms < self.last_sync_time + self.config.min_interval_ms {
209            return false;
210        }
211
212        // Check size limits
213        if self.bytes_accumulated >= self.config.max_bytes {
214            return true;
215        }
216        if self.pending.len() >= self.config.max_operations {
217            return true;
218        }
219
220        // Check time limit
221        if let Some(first_time) = self.first_pending_time {
222            if current_time_ms >= first_time + self.config.max_wait_ms {
223                return true;
224            }
225        }
226
227        false
228    }
229
230    /// Flush pending operations into a batch
231    pub fn flush(&mut self, current_time_ms: u64) -> Option<OperationBatch> {
232        if self.pending.is_empty() {
233            return None;
234        }
235
236        let batch = OperationBatch {
237            operations: core::mem::take(&mut self.pending),
238            total_bytes: self.bytes_accumulated,
239            created_at: current_time_ms,
240        };
241
242        self.bytes_accumulated = 0;
243        self.first_pending_time = None;
244        self.last_sync_time = current_time_ms;
245
246        Some(batch)
247    }
248
249    /// Force flush regardless of timing constraints
250    pub fn force_flush(&mut self, current_time_ms: u64) -> Option<OperationBatch> {
251        self.flush(current_time_ms)
252    }
253
254    /// Get number of pending operations
255    pub fn pending_count(&self) -> usize {
256        self.pending.len()
257    }
258
259    /// Get accumulated bytes
260    pub fn accumulated_bytes(&self) -> usize {
261        self.bytes_accumulated
262    }
263
264    /// Check if there are pending operations
265    pub fn has_pending(&self) -> bool {
266        !self.pending.is_empty()
267    }
268
269    /// Clear all pending operations without flushing
270    pub fn clear(&mut self) {
271        self.pending.clear();
272        self.bytes_accumulated = 0;
273        self.first_pending_time = None;
274    }
275
276    /// Get the config
277    pub fn config(&self) -> &BatchConfig {
278        &self.config
279    }
280
281    /// Update the config
282    pub fn set_config(&mut self, config: BatchConfig) {
283        self.config = config;
284    }
285}
286
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::crdt::Position;
    use crate::NodeId;

    /// Builds a position update for `node_id` at a fixed coordinate.
    fn make_position_op(node_id: u32, timestamp: u64) -> CrdtOperation {
        let node_id = NodeId::new(node_id);
        let position = Position::new(37.0, -122.0);
        CrdtOperation::UpdatePosition {
            node_id,
            position,
            timestamp,
        }
    }

    /// Default config exposes the documented wait and byte limits.
    #[test]
    fn test_batch_config_defaults() {
        let BatchConfig {
            max_wait_ms,
            max_bytes,
            ..
        } = BatchConfig::default();
        assert_eq!(max_wait_ms, 5000);
        assert_eq!(max_bytes, 512);
    }

    /// A single add is accepted and becomes visible as pending.
    #[test]
    fn test_accumulator_add() {
        let mut accumulator = BatchAccumulator::with_defaults();

        let accepted = accumulator.add(make_position_op(1, 1000), 1000);
        assert!(accepted);
        assert_eq!(accumulator.pending_count(), 1);
        assert!(accumulator.has_pending());
    }

    /// Adds beyond `max_operations` are refused.
    #[test]
    fn test_accumulator_max_operations() {
        let mut accumulator = BatchAccumulator::new(BatchConfig {
            max_operations: 2,
            ..Default::default()
        });

        let first = accumulator.add(make_position_op(1, 1000), 1000);
        assert!(first);
        let second = accumulator.add(make_position_op(2, 1001), 1001);
        assert!(second);
        // The batch is full now, so the third operation must be rejected.
        let third = accumulator.add(make_position_op(3, 1002), 1002);
        assert!(!third);
        assert_eq!(accumulator.pending_count(), 2);
    }

    /// Flushing hands over every pending operation and empties the accumulator.
    #[test]
    fn test_accumulator_flush() {
        let mut accumulator = BatchAccumulator::with_defaults();

        accumulator.add(make_position_op(1, 1000), 1000);
        accumulator.add(make_position_op(2, 1001), 1001);

        let flushed = accumulator.flush(2000).unwrap();
        assert_eq!(flushed.len(), 2);
        assert!(!accumulator.has_pending());
    }

    /// The batch becomes due once the oldest operation has waited `max_wait_ms`.
    #[test]
    fn test_should_flush_time() {
        let mut accumulator = BatchAccumulator::new(BatchConfig {
            max_wait_ms: 100,
            min_interval_ms: 0,
            ..Default::default()
        });

        accumulator.add(make_position_op(1, 1000), 1000);

        // Only 50 ms elapsed: still waiting.
        let due_early = accumulator.should_flush(1050);
        assert!(!due_early);

        // Exactly at the wait limit: due.
        let due_on_time = accumulator.should_flush(1100);
        assert!(due_on_time);
    }

    /// The batch becomes due once the byte budget is reached.
    #[test]
    fn test_should_flush_size() {
        // Tiny budget; a position op is expected to be about 21 bytes.
        let mut accumulator = BatchAccumulator::new(BatchConfig {
            max_bytes: 20,
            min_interval_ms: 0,
            ..Default::default()
        });

        // An empty batch admits the first op even though it exceeds the budget.
        let accepted = accumulator.add(make_position_op(1, 1000), 1000);
        assert!(accepted);
        // The budget is now exceeded, so the batch is due immediately.
        assert!(accumulator.should_flush(1000));
    }

    /// Nothing is due until `min_interval_ms` has passed since the last flush.
    #[test]
    fn test_min_interval() {
        let mut accumulator = BatchAccumulator::new(BatchConfig {
            max_wait_ms: 100,
            min_interval_ms: 1000,
            ..Default::default()
        });

        accumulator.add(make_position_op(1, 0), 0);
        accumulator.flush(0);

        accumulator.add(make_position_op(2, 100), 100);

        // Still inside the minimum interval.
        let due_inside_interval = accumulator.should_flush(500);
        assert!(!due_inside_interval);

        // Interval elapsed and the wait limit is long past.
        let due_after_interval = accumulator.should_flush(1100);
        assert!(due_after_interval);
    }

    /// Encoding then decoding keeps the operation count.
    #[test]
    fn test_batch_encode_decode() {
        let original = OperationBatch {
            operations: vec![make_position_op(1, 1000), make_position_op(2, 2000)],
            total_bytes: 100,
            created_at: 3000,
        };

        let wire = original.encode();
        let restored = OperationBatch::decode(&wire).unwrap();

        assert_eq!(restored.len(), 2);
    }

    /// `clear` drops pending operations and resets the byte counter.
    #[test]
    fn test_clear() {
        let mut accumulator = BatchAccumulator::with_defaults();

        accumulator.add(make_position_op(1, 1000), 1000);
        accumulator.add(make_position_op(2, 1001), 1001);

        accumulator.clear();
        assert!(!accumulator.has_pending());
        assert_eq!(accumulator.accumulated_bytes(), 0);
    }

    /// `force_flush` returns a batch even while `should_flush` says no.
    #[test]
    fn test_force_flush() {
        // A long minimum interval keeps `should_flush` false.
        let mut accumulator = BatchAccumulator::new(BatchConfig {
            min_interval_ms: 10000,
            ..Default::default()
        });

        accumulator.add(make_position_op(1, 0), 0);
        accumulator.flush(0);

        accumulator.add(make_position_op(2, 100), 100);

        // The regular check is blocked by the minimum interval.
        let due = accumulator.should_flush(100);
        assert!(!due);

        // Forcing ignores that and still yields the batch.
        let forced = accumulator.force_flush(100);
        assert!(forced.is_some());
    }
}
444}