1#[cfg(not(feature = "std"))]
24use alloc::vec::Vec;
25
26use super::crdt::CrdtOperation;
27
28#[derive(Debug, Clone)]
30pub struct BatchConfig {
31 pub max_wait_ms: u64,
33 pub max_bytes: usize,
35 pub max_operations: usize,
37 pub min_interval_ms: u64,
39}
40
41impl Default for BatchConfig {
42 fn default() -> Self {
43 Self {
44 max_wait_ms: 5000, max_bytes: 512, max_operations: 20, min_interval_ms: 1000, }
49 }
50}
51
52impl BatchConfig {
53 pub fn low_power() -> Self {
55 Self {
56 max_wait_ms: 30_000, max_bytes: 1024,
58 max_operations: 50,
59 min_interval_ms: 10_000, }
61 }
62
63 pub fn responsive() -> Self {
65 Self {
66 max_wait_ms: 1000, max_bytes: 256,
68 max_operations: 10,
69 min_interval_ms: 500,
70 }
71 }
72}
73
74#[derive(Debug, Clone)]
76pub struct OperationBatch {
77 pub operations: Vec<CrdtOperation>,
79 pub total_bytes: usize,
81 pub created_at: u64,
83}
84
85impl OperationBatch {
86 pub fn is_empty(&self) -> bool {
88 self.operations.is_empty()
89 }
90
91 pub fn len(&self) -> usize {
93 self.operations.len()
94 }
95
96 pub fn encode(&self) -> Vec<u8> {
98 let mut buf = Vec::with_capacity(self.total_bytes + 4);
99 buf.extend_from_slice(&(self.operations.len() as u16).to_le_bytes());
101 for op in &self.operations {
103 let encoded = op.encode();
104 buf.extend_from_slice(&(encoded.len() as u16).to_le_bytes());
105 buf.extend_from_slice(&encoded);
106 }
107 buf
108 }
109
110 pub fn decode(data: &[u8]) -> Option<Self> {
112 if data.len() < 2 {
113 return None;
114 }
115
116 let op_count = u16::from_le_bytes([data[0], data[1]]) as usize;
117 let mut operations = Vec::with_capacity(op_count);
118 let mut offset = 2;
119 let mut total_bytes = 0;
120
121 for _ in 0..op_count {
122 if offset + 2 > data.len() {
123 return None;
124 }
125 let op_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
126 offset += 2;
127
128 if offset + op_len > data.len() {
129 return None;
130 }
131 let op = CrdtOperation::decode(&data[offset..offset + op_len])?;
132 total_bytes += op.size();
133 operations.push(op);
134 offset += op_len;
135 }
136
137 Some(Self {
138 operations,
139 total_bytes,
140 created_at: 0,
141 })
142 }
143}
144
145#[derive(Debug)]
147pub struct BatchAccumulator {
148 config: BatchConfig,
150 pending: Vec<CrdtOperation>,
152 bytes_accumulated: usize,
154 first_pending_time: Option<u64>,
156 last_sync_time: u64,
158}
159
160impl BatchAccumulator {
161 pub fn new(config: BatchConfig) -> Self {
163 Self {
164 config,
165 pending: Vec::new(),
166 bytes_accumulated: 0,
167 first_pending_time: None,
168 last_sync_time: 0,
169 }
170 }
171
172 pub fn with_defaults() -> Self {
174 Self::new(BatchConfig::default())
175 }
176
177 pub fn add(&mut self, op: CrdtOperation, current_time_ms: u64) -> bool {
181 let op_size = op.size();
182
183 if self.bytes_accumulated + op_size > self.config.max_bytes && !self.pending.is_empty() {
185 return false;
186 }
187 if self.pending.len() >= self.config.max_operations {
188 return false;
189 }
190
191 if self.first_pending_time.is_none() {
193 self.first_pending_time = Some(current_time_ms);
194 }
195
196 self.pending.push(op);
197 self.bytes_accumulated += op_size;
198 true
199 }
200
201 pub fn should_flush(&self, current_time_ms: u64) -> bool {
203 if self.pending.is_empty() {
204 return false;
205 }
206
207 if current_time_ms < self.last_sync_time + self.config.min_interval_ms {
209 return false;
210 }
211
212 if self.bytes_accumulated >= self.config.max_bytes {
214 return true;
215 }
216 if self.pending.len() >= self.config.max_operations {
217 return true;
218 }
219
220 if let Some(first_time) = self.first_pending_time {
222 if current_time_ms >= first_time + self.config.max_wait_ms {
223 return true;
224 }
225 }
226
227 false
228 }
229
230 pub fn flush(&mut self, current_time_ms: u64) -> Option<OperationBatch> {
232 if self.pending.is_empty() {
233 return None;
234 }
235
236 let batch = OperationBatch {
237 operations: core::mem::take(&mut self.pending),
238 total_bytes: self.bytes_accumulated,
239 created_at: current_time_ms,
240 };
241
242 self.bytes_accumulated = 0;
243 self.first_pending_time = None;
244 self.last_sync_time = current_time_ms;
245
246 Some(batch)
247 }
248
249 pub fn force_flush(&mut self, current_time_ms: u64) -> Option<OperationBatch> {
251 self.flush(current_time_ms)
252 }
253
254 pub fn pending_count(&self) -> usize {
256 self.pending.len()
257 }
258
259 pub fn accumulated_bytes(&self) -> usize {
261 self.bytes_accumulated
262 }
263
264 pub fn has_pending(&self) -> bool {
266 !self.pending.is_empty()
267 }
268
269 pub fn clear(&mut self) {
271 self.pending.clear();
272 self.bytes_accumulated = 0;
273 self.first_pending_time = None;
274 }
275
276 pub fn config(&self) -> &BatchConfig {
278 &self.config
279 }
280
281 pub fn set_config(&mut self, config: BatchConfig) {
283 self.config = config;
284 }
285}
286
287#[cfg(test)]
288mod tests {
289 use super::*;
290 use crate::sync::crdt::Position;
291 use crate::NodeId;
292
293 fn make_position_op(node_id: u32, timestamp: u64) -> CrdtOperation {
294 CrdtOperation::UpdatePosition {
295 node_id: NodeId::new(node_id),
296 position: Position::new(37.0, -122.0),
297 timestamp,
298 }
299 }
300
301 #[test]
302 fn test_batch_config_defaults() {
303 let config = BatchConfig::default();
304 assert_eq!(config.max_wait_ms, 5000);
305 assert_eq!(config.max_bytes, 512);
306 }
307
308 #[test]
309 fn test_accumulator_add() {
310 let mut acc = BatchAccumulator::with_defaults();
311
312 let op = make_position_op(1, 1000);
313 assert!(acc.add(op, 1000));
314 assert_eq!(acc.pending_count(), 1);
315 assert!(acc.has_pending());
316 }
317
318 #[test]
319 fn test_accumulator_max_operations() {
320 let config = BatchConfig {
321 max_operations: 2,
322 ..Default::default()
323 };
324 let mut acc = BatchAccumulator::new(config);
325
326 assert!(acc.add(make_position_op(1, 1000), 1000));
327 assert!(acc.add(make_position_op(2, 1001), 1001));
328 assert!(!acc.add(make_position_op(3, 1002), 1002)); assert_eq!(acc.pending_count(), 2);
330 }
331
332 #[test]
333 fn test_accumulator_flush() {
334 let mut acc = BatchAccumulator::with_defaults();
335
336 acc.add(make_position_op(1, 1000), 1000);
337 acc.add(make_position_op(2, 1001), 1001);
338
339 let batch = acc.flush(2000).unwrap();
340 assert_eq!(batch.len(), 2);
341 assert!(!acc.has_pending());
342 }
343
344 #[test]
345 fn test_should_flush_time() {
346 let config = BatchConfig {
347 max_wait_ms: 100,
348 min_interval_ms: 0,
349 ..Default::default()
350 };
351 let mut acc = BatchAccumulator::new(config);
352
353 acc.add(make_position_op(1, 1000), 1000);
354
355 assert!(!acc.should_flush(1050));
357
358 assert!(acc.should_flush(1100));
360 }
361
362 #[test]
363 fn test_should_flush_size() {
364 let config = BatchConfig {
365 max_bytes: 20, min_interval_ms: 0,
367 ..Default::default()
368 };
369 let mut acc = BatchAccumulator::new(config);
370
371 assert!(acc.add(make_position_op(1, 1000), 1000));
373 assert!(acc.should_flush(1000));
375 }
376
377 #[test]
378 fn test_min_interval() {
379 let config = BatchConfig {
380 max_wait_ms: 100,
381 min_interval_ms: 1000,
382 ..Default::default()
383 };
384 let mut acc = BatchAccumulator::new(config);
385
386 acc.add(make_position_op(1, 0), 0);
387 acc.flush(0);
388
389 acc.add(make_position_op(2, 100), 100);
390
391 assert!(!acc.should_flush(500));
393
394 assert!(acc.should_flush(1100));
396 }
397
398 #[test]
399 fn test_batch_encode_decode() {
400 let batch = OperationBatch {
401 operations: vec![make_position_op(1, 1000), make_position_op(2, 2000)],
402 total_bytes: 100,
403 created_at: 3000,
404 };
405
406 let encoded = batch.encode();
407 let decoded = OperationBatch::decode(&encoded).unwrap();
408
409 assert_eq!(decoded.len(), 2);
410 }
411
412 #[test]
413 fn test_clear() {
414 let mut acc = BatchAccumulator::with_defaults();
415
416 acc.add(make_position_op(1, 1000), 1000);
417 acc.add(make_position_op(2, 1001), 1001);
418
419 acc.clear();
420 assert!(!acc.has_pending());
421 assert_eq!(acc.accumulated_bytes(), 0);
422 }
423
424 #[test]
425 fn test_force_flush() {
426 let config = BatchConfig {
427 min_interval_ms: 10000, ..Default::default()
429 };
430 let mut acc = BatchAccumulator::new(config);
431
432 acc.add(make_position_op(1, 0), 0);
433 acc.flush(0);
434
435 acc.add(make_position_op(2, 100), 100);
436
437 assert!(!acc.should_flush(100));
439
440 let batch = acc.force_flush(100);
442 assert!(batch.is_some());
443 }
444}