1#[cfg(not(feature = "std"))]
9use alloc::vec::Vec;
10
11use super::crdt::CrdtOperation;
12
13#[derive(Debug, Clone)]
15pub struct BatchConfig {
16 pub max_wait_ms: u64,
18 pub max_bytes: usize,
20 pub max_operations: usize,
22 pub min_interval_ms: u64,
24}
25
26impl Default for BatchConfig {
27 fn default() -> Self {
28 Self {
29 max_wait_ms: 5000, max_bytes: 512, max_operations: 20, min_interval_ms: 1000, }
34 }
35}
36
37impl BatchConfig {
38 pub fn low_power() -> Self {
40 Self {
41 max_wait_ms: 30_000, max_bytes: 1024,
43 max_operations: 50,
44 min_interval_ms: 10_000, }
46 }
47
48 pub fn responsive() -> Self {
50 Self {
51 max_wait_ms: 1000, max_bytes: 256,
53 max_operations: 10,
54 min_interval_ms: 500,
55 }
56 }
57}
58
59#[derive(Debug, Clone)]
61pub struct OperationBatch {
62 pub operations: Vec<CrdtOperation>,
64 pub total_bytes: usize,
66 pub created_at: u64,
68}
69
70impl OperationBatch {
71 pub fn is_empty(&self) -> bool {
73 self.operations.is_empty()
74 }
75
76 pub fn len(&self) -> usize {
78 self.operations.len()
79 }
80
81 pub fn encode(&self) -> Vec<u8> {
83 let mut buf = Vec::with_capacity(self.total_bytes + 4);
84 buf.extend_from_slice(&(self.operations.len() as u16).to_le_bytes());
86 for op in &self.operations {
88 let encoded = op.encode();
89 buf.extend_from_slice(&(encoded.len() as u16).to_le_bytes());
90 buf.extend_from_slice(&encoded);
91 }
92 buf
93 }
94
95 pub fn decode(data: &[u8]) -> Option<Self> {
97 if data.len() < 2 {
98 return None;
99 }
100
101 let op_count = u16::from_le_bytes([data[0], data[1]]) as usize;
102 let mut operations = Vec::with_capacity(op_count);
103 let mut offset = 2;
104 let mut total_bytes = 0;
105
106 for _ in 0..op_count {
107 if offset + 2 > data.len() {
108 return None;
109 }
110 let op_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
111 offset += 2;
112
113 if offset + op_len > data.len() {
114 return None;
115 }
116 let op = CrdtOperation::decode(&data[offset..offset + op_len])?;
117 total_bytes += op.size();
118 operations.push(op);
119 offset += op_len;
120 }
121
122 Some(Self {
123 operations,
124 total_bytes,
125 created_at: 0,
126 })
127 }
128}
129
130#[derive(Debug)]
132pub struct BatchAccumulator {
133 config: BatchConfig,
135 pending: Vec<CrdtOperation>,
137 bytes_accumulated: usize,
139 first_pending_time: Option<u64>,
141 last_sync_time: u64,
143}
144
145impl BatchAccumulator {
146 pub fn new(config: BatchConfig) -> Self {
148 Self {
149 config,
150 pending: Vec::new(),
151 bytes_accumulated: 0,
152 first_pending_time: None,
153 last_sync_time: 0,
154 }
155 }
156
157 pub fn with_defaults() -> Self {
159 Self::new(BatchConfig::default())
160 }
161
162 pub fn add(&mut self, op: CrdtOperation, current_time_ms: u64) -> bool {
166 let op_size = op.size();
167
168 if self.bytes_accumulated + op_size > self.config.max_bytes && !self.pending.is_empty() {
170 return false;
171 }
172 if self.pending.len() >= self.config.max_operations {
173 return false;
174 }
175
176 if self.first_pending_time.is_none() {
178 self.first_pending_time = Some(current_time_ms);
179 }
180
181 self.pending.push(op);
182 self.bytes_accumulated += op_size;
183 true
184 }
185
186 pub fn should_flush(&self, current_time_ms: u64) -> bool {
188 if self.pending.is_empty() {
189 return false;
190 }
191
192 if current_time_ms < self.last_sync_time + self.config.min_interval_ms {
194 return false;
195 }
196
197 if self.bytes_accumulated >= self.config.max_bytes {
199 return true;
200 }
201 if self.pending.len() >= self.config.max_operations {
202 return true;
203 }
204
205 if let Some(first_time) = self.first_pending_time {
207 if current_time_ms >= first_time + self.config.max_wait_ms {
208 return true;
209 }
210 }
211
212 false
213 }
214
215 pub fn flush(&mut self, current_time_ms: u64) -> Option<OperationBatch> {
217 if self.pending.is_empty() {
218 return None;
219 }
220
221 let batch = OperationBatch {
222 operations: core::mem::take(&mut self.pending),
223 total_bytes: self.bytes_accumulated,
224 created_at: current_time_ms,
225 };
226
227 self.bytes_accumulated = 0;
228 self.first_pending_time = None;
229 self.last_sync_time = current_time_ms;
230
231 Some(batch)
232 }
233
234 pub fn force_flush(&mut self, current_time_ms: u64) -> Option<OperationBatch> {
236 self.flush(current_time_ms)
237 }
238
239 pub fn pending_count(&self) -> usize {
241 self.pending.len()
242 }
243
244 pub fn accumulated_bytes(&self) -> usize {
246 self.bytes_accumulated
247 }
248
249 pub fn has_pending(&self) -> bool {
251 !self.pending.is_empty()
252 }
253
254 pub fn clear(&mut self) {
256 self.pending.clear();
257 self.bytes_accumulated = 0;
258 self.first_pending_time = None;
259 }
260
261 pub fn config(&self) -> &BatchConfig {
263 &self.config
264 }
265
266 pub fn set_config(&mut self, config: BatchConfig) {
268 self.config = config;
269 }
270}
271
272#[cfg(test)]
273mod tests {
274 use super::*;
275 use crate::sync::crdt::Position;
276 use crate::NodeId;
277
278 fn make_position_op(node_id: u32, timestamp: u64) -> CrdtOperation {
279 CrdtOperation::UpdatePosition {
280 node_id: NodeId::new(node_id),
281 position: Position::new(37.0, -122.0),
282 timestamp,
283 }
284 }
285
286 #[test]
287 fn test_batch_config_defaults() {
288 let config = BatchConfig::default();
289 assert_eq!(config.max_wait_ms, 5000);
290 assert_eq!(config.max_bytes, 512);
291 }
292
293 #[test]
294 fn test_accumulator_add() {
295 let mut acc = BatchAccumulator::with_defaults();
296
297 let op = make_position_op(1, 1000);
298 assert!(acc.add(op, 1000));
299 assert_eq!(acc.pending_count(), 1);
300 assert!(acc.has_pending());
301 }
302
303 #[test]
304 fn test_accumulator_max_operations() {
305 let config = BatchConfig {
306 max_operations: 2,
307 ..Default::default()
308 };
309 let mut acc = BatchAccumulator::new(config);
310
311 assert!(acc.add(make_position_op(1, 1000), 1000));
312 assert!(acc.add(make_position_op(2, 1001), 1001));
313 assert!(!acc.add(make_position_op(3, 1002), 1002)); assert_eq!(acc.pending_count(), 2);
315 }
316
317 #[test]
318 fn test_accumulator_flush() {
319 let mut acc = BatchAccumulator::with_defaults();
320
321 acc.add(make_position_op(1, 1000), 1000);
322 acc.add(make_position_op(2, 1001), 1001);
323
324 let batch = acc.flush(2000).unwrap();
325 assert_eq!(batch.len(), 2);
326 assert!(!acc.has_pending());
327 }
328
329 #[test]
330 fn test_should_flush_time() {
331 let config = BatchConfig {
332 max_wait_ms: 100,
333 min_interval_ms: 0,
334 ..Default::default()
335 };
336 let mut acc = BatchAccumulator::new(config);
337
338 acc.add(make_position_op(1, 1000), 1000);
339
340 assert!(!acc.should_flush(1050));
342
343 assert!(acc.should_flush(1100));
345 }
346
347 #[test]
348 fn test_should_flush_size() {
349 let config = BatchConfig {
350 max_bytes: 20, min_interval_ms: 0,
352 ..Default::default()
353 };
354 let mut acc = BatchAccumulator::new(config);
355
356 assert!(acc.add(make_position_op(1, 1000), 1000));
358 assert!(acc.should_flush(1000));
360 }
361
362 #[test]
363 fn test_min_interval() {
364 let config = BatchConfig {
365 max_wait_ms: 100,
366 min_interval_ms: 1000,
367 ..Default::default()
368 };
369 let mut acc = BatchAccumulator::new(config);
370
371 acc.add(make_position_op(1, 0), 0);
372 acc.flush(0);
373
374 acc.add(make_position_op(2, 100), 100);
375
376 assert!(!acc.should_flush(500));
378
379 assert!(acc.should_flush(1100));
381 }
382
383 #[test]
384 fn test_batch_encode_decode() {
385 let batch = OperationBatch {
386 operations: vec![make_position_op(1, 1000), make_position_op(2, 2000)],
387 total_bytes: 100,
388 created_at: 3000,
389 };
390
391 let encoded = batch.encode();
392 let decoded = OperationBatch::decode(&encoded).unwrap();
393
394 assert_eq!(decoded.len(), 2);
395 }
396
397 #[test]
398 fn test_clear() {
399 let mut acc = BatchAccumulator::with_defaults();
400
401 acc.add(make_position_op(1, 1000), 1000);
402 acc.add(make_position_op(2, 1001), 1001);
403
404 acc.clear();
405 assert!(!acc.has_pending());
406 assert_eq!(acc.accumulated_bytes(), 0);
407 }
408
409 #[test]
410 fn test_force_flush() {
411 let config = BatchConfig {
412 min_interval_ms: 10000, ..Default::default()
414 };
415 let mut acc = BatchAccumulator::new(config);
416
417 acc.add(make_position_op(1, 0), 0);
418 acc.flush(0);
419
420 acc.add(make_position_op(2, 100), 100);
421
422 assert!(!acc.should_flush(100));
424
425 let batch = acc.force_flush(100);
427 assert!(batch.is_some());
428 }
429}