#[cfg(not(feature = "std"))]
use alloc::{collections::{BTreeMap, VecDeque}, vec::Vec};
#[cfg(feature = "std")]
use std::collections::{HashMap, VecDeque};

use super::batch::{BatchAccumulator, BatchConfig, OperationBatch};
use super::crdt::CrdtOperation;
use super::delta::{DeltaEncoder, VectorClock};
use crate::NodeId;

/// Default BLE ATT MTU in bytes (the minimum every link supports).
pub const DEFAULT_MTU: usize = 23;

/// Maximum BLE ATT MTU in bytes.
pub const MAX_MTU: usize = 517;

/// Size of the header prepended to every chunk, in bytes.
pub const CHUNK_HEADER_SIZE: usize = 8;

/// Header prepended to each chunk of a fragmented sync message.
#[derive(Debug, Clone, Copy)]
pub struct ChunkHeader {
    /// Identifier shared by every chunk of the same message.
    pub message_id: u32,
    /// Zero-based index of this chunk within the message.
    pub chunk_index: u16,
    /// Total number of chunks in the message.
    pub total_chunks: u16,
}

impl ChunkHeader {
    /// Encodes the header as little-endian bytes.
    pub fn encode(&self) -> [u8; CHUNK_HEADER_SIZE] {
        let mut buf = [0u8; CHUNK_HEADER_SIZE];
        buf[0..4].copy_from_slice(&self.message_id.to_le_bytes());
        buf[4..6].copy_from_slice(&self.chunk_index.to_le_bytes());
        buf[6..8].copy_from_slice(&self.total_chunks.to_le_bytes());
        buf
    }

    /// Decodes a header from the start of `data`, or `None` if the slice is
    /// too short.
    pub fn decode(data: &[u8]) -> Option<Self> {
        if data.len() < CHUNK_HEADER_SIZE {
            return None;
        }
        Some(Self {
            message_id: u32::from_le_bytes([data[0], data[1], data[2], data[3]]),
            chunk_index: u16::from_le_bytes([data[4], data[5]]),
            total_chunks: u16::from_le_bytes([data[6], data[7]]),
        })
    }
}

/// A single fragment of a sync message: a header followed by payload bytes.
#[derive(Debug, Clone)]
pub struct SyncChunk {
    /// Fragmentation header.
    pub header: ChunkHeader,
    /// Payload bytes carried by this chunk.
    pub payload: Vec<u8>,
}

impl SyncChunk {
    /// Encodes the chunk as header bytes followed by the payload.
    pub fn encode(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(CHUNK_HEADER_SIZE + self.payload.len());
        buf.extend_from_slice(&self.header.encode());
        buf.extend_from_slice(&self.payload);
        buf
    }

    /// Decodes a chunk from its wire encoding.
    pub fn decode(data: &[u8]) -> Option<Self> {
        let header = ChunkHeader::decode(data)?;
        let payload = data[CHUNK_HEADER_SIZE..].to_vec();
        Some(Self { header, payload })
    }

    /// Size of the encoded chunk in bytes.
    pub fn encoded_size(&self) -> usize {
        CHUNK_HEADER_SIZE + self.payload.len()
    }
}

/// Reassembles fragmented messages from incoming chunks.
#[derive(Debug)]
pub struct ChunkReassembler {
    /// Partially received messages, keyed by message id.
    #[cfg(feature = "std")]
    partials: HashMap<u32, PartialMessage>,
    #[cfg(not(feature = "std"))]
    partials: BTreeMap<u32, PartialMessage>,

    /// Maximum number of partial messages to keep at once.
    #[allow(dead_code)]
    max_partials: usize,

    /// How long a partial message is kept before `cleanup` drops it, in ms.
    partial_timeout_ms: u64,
}

/// A message whose chunks have not all arrived yet.
#[derive(Debug, Clone)]
struct PartialMessage {
    /// Expected number of chunks.
    total_chunks: u16,
    /// Chunks received so far, keyed by chunk index.
    #[cfg(feature = "std")]
    chunks: HashMap<u16, Vec<u8>>,
    #[cfg(not(feature = "std"))]
    chunks: BTreeMap<u16, Vec<u8>>,
    /// Time the first chunk arrived, in ms.
    started_at: u64,
}

impl ChunkReassembler {
    /// Creates a reassembler with default limits.
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "std")]
            partials: HashMap::new(),
            #[cfg(not(feature = "std"))]
            partials: BTreeMap::new(),
            max_partials: 8,
            partial_timeout_ms: 30_000,
        }
    }

    /// Processes an incoming chunk. Returns the reassembled payload once all
    /// chunks of a message have arrived, otherwise `None`.
    pub fn process(&mut self, chunk: SyncChunk, current_time_ms: u64) -> Option<Vec<u8>> {
        let msg_id = chunk.header.message_id;

        // Single-chunk messages need no reassembly.
        if chunk.header.total_chunks == 1 {
            return Some(chunk.payload);
        }

        let partial = self
            .partials
            .entry(msg_id)
            .or_insert_with(|| PartialMessage {
                total_chunks: chunk.header.total_chunks,
                #[cfg(feature = "std")]
                chunks: HashMap::new(),
                #[cfg(not(feature = "std"))]
                chunks: BTreeMap::new(),
                started_at: current_time_ms,
            });

        partial
            .chunks
            .insert(chunk.header.chunk_index, chunk.payload);

        // All chunks received: concatenate them in index order.
        if partial.chunks.len() == partial.total_chunks as usize {
            let partial = self.partials.remove(&msg_id)?;

            let mut result = Vec::new();
            for i in 0..partial.total_chunks {
                if let Some(data) = partial.chunks.get(&i) {
                    result.extend_from_slice(data);
                } else {
                    // An index is missing despite the count matching.
                    return None;
                }
            }
            return Some(result);
        }

        None
    }

    /// Drops partial messages older than the timeout.
    pub fn cleanup(&mut self, current_time_ms: u64) {
        self.partials
            .retain(|_, partial| current_time_ms - partial.started_at < self.partial_timeout_ms);
    }

    /// Number of messages still waiting for chunks.
    pub fn pending_count(&self) -> usize {
        self.partials.len()
    }
}

impl Default for ChunkReassembler {
    fn default() -> Self {
        Self::new()
    }
}

/// Splits `data` into chunks that each fit within `mtu` bytes, header
/// included. Returns an empty vector if the MTU cannot even hold the header.
pub fn chunk_data(data: &[u8], mtu: usize, message_id: u32) -> Vec<SyncChunk> {
    let payload_size = mtu.saturating_sub(CHUNK_HEADER_SIZE);
    if payload_size == 0 {
        return Vec::new();
    }

    let total_chunks = data.len().div_ceil(payload_size);
    let total_chunks = total_chunks.max(1) as u16;

    let mut chunks = Vec::with_capacity(total_chunks as usize);

    for (i, chunk_data) in data.chunks(payload_size).enumerate() {
        chunks.push(SyncChunk {
            header: ChunkHeader {
                message_id,
                chunk_index: i as u16,
                total_chunks,
            },
            payload: chunk_data.to_vec(),
        });
    }

    // Empty input still yields one empty chunk so the receiver sees a
    // complete message.
    if chunks.is_empty() {
        chunks.push(SyncChunk {
            header: ChunkHeader {
                message_id,
                chunk_index: 0,
                total_chunks: 1,
            },
            payload: Vec::new(),
        });
    }

    chunks
}

/// High-level state of the sync protocol.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SyncState {
    /// No transfer in progress.
    #[default]
    Idle,
    /// Chunks are being prepared or transmitted.
    Sending,
    /// Chunks are being received and reassembled.
    Receiving,
    /// Waiting for the peer to acknowledge a transfer.
    WaitingAck,
}

/// Configuration for the GATT sync protocol.
#[derive(Debug, Clone)]
pub struct SyncConfig {
    /// Negotiated ATT MTU in bytes.
    pub mtu: usize,
    /// Batching configuration for outgoing operations.
    pub batch: BatchConfig,
    /// Interval between sync attempts, in ms.
    pub sync_interval_ms: u64,
    /// Send only operations the peer has not yet seen.
    pub enable_delta: bool,
    /// Maximum number of sync retries.
    pub max_retries: u8,
}

impl Default for SyncConfig {
    fn default() -> Self {
        Self {
            mtu: DEFAULT_MTU,
            batch: BatchConfig::default(),
            sync_interval_ms: 5000,
            enable_delta: true,
            max_retries: 3,
        }
    }
}

impl SyncConfig {
    /// Profile tuned for battery life: minimum MTU, infrequent syncs.
    pub fn low_power() -> Self {
        Self {
            mtu: DEFAULT_MTU,
            batch: BatchConfig::low_power(),
            sync_interval_ms: 30_000,
            enable_delta: true,
            max_retries: 2,
        }
    }

    /// Profile tuned for latency: maximum MTU, frequent syncs.
    pub fn responsive() -> Self {
        Self {
            mtu: MAX_MTU,
            batch: BatchConfig::responsive(),
            sync_interval_ms: 1000,
            enable_delta: true,
            max_retries: 3,
        }
    }
}

/// Chunked, delta-aware sync protocol for exchanging CRDT operations over a
/// GATT connection.
pub struct GattSyncProtocol {
    /// This node's identifier.
    node_id: NodeId,

    /// Protocol configuration.
    config: SyncConfig,

    /// Current protocol state.
    state: SyncState,

    /// Accumulates outgoing operations into batches.
    batch: BatchAccumulator,

    /// Tracks which operations each peer has already seen.
    delta: DeltaEncoder,

    /// Logical clock advanced on local and received operations.
    vector_clock: VectorClock,

    /// Chunks waiting to be transmitted.
    tx_queue: VecDeque<SyncChunk>,

    /// Reassembles incoming chunks into complete messages.
    rx_reassembler: ChunkReassembler,

    /// Identifier assigned to the next outgoing message.
    next_message_id: u32,

    /// Current time as provided by `set_time`, in ms.
    current_time_ms: u64,

    /// Time the last sync was prepared, in ms.
    last_sync_time_ms: u64,
}

impl GattSyncProtocol {
    /// Creates a protocol instance with the given configuration.
    pub fn new(node_id: NodeId, config: SyncConfig) -> Self {
        Self {
            node_id,
            batch: BatchAccumulator::new(config.batch.clone()),
            delta: DeltaEncoder::new(node_id),
            vector_clock: VectorClock::new(),
            config,
            state: SyncState::Idle,
            tx_queue: VecDeque::new(),
            rx_reassembler: ChunkReassembler::new(),
            next_message_id: 1,
            current_time_ms: 0,
            last_sync_time_ms: 0,
        }
    }

    /// Creates a protocol instance with the default configuration.
    pub fn with_defaults(node_id: NodeId) -> Self {
        Self::new(node_id, SyncConfig::default())
    }

    /// Updates the protocol's notion of the current time, in ms.
    pub fn set_time(&mut self, time_ms: u64) {
        self.current_time_ms = time_ms;
    }

    /// Updates the MTU after negotiation with the peer.
    pub fn set_mtu(&mut self, mtu: usize) {
        self.config.mtu = mtu;
    }

    /// Returns the current protocol state.
    pub fn state(&self) -> SyncState {
        self.state
    }

    /// Returns this node's vector clock.
    pub fn vector_clock(&self) -> &VectorClock {
        &self.vector_clock
    }

    /// Starts tracking delta state for a newly connected peer.
    pub fn add_peer(&mut self, peer_id: &NodeId) {
        self.delta.add_peer(peer_id);
    }

    /// Stops tracking delta state for a disconnected peer.
    pub fn remove_peer(&mut self, peer_id: &NodeId) {
        self.delta.remove_peer(peer_id);
    }

    /// Queues a local operation for the next sync and advances this node's
    /// entry in the vector clock.
    pub fn queue_operation(&mut self, op: CrdtOperation) -> bool {
        self.vector_clock.increment(&self.node_id);

        self.batch.add(op, self.current_time_ms)
    }

    /// Returns `true` if the pending batch is ready to be flushed.
    pub fn should_sync(&self) -> bool {
        self.batch.should_flush(self.current_time_ms)
    }

    /// Flushes the pending batch, filters it for `peer_id` when delta
    /// encoding is enabled, and returns the encoded batch split into
    /// MTU-sized chunks.
    pub fn prepare_sync(&mut self, peer_id: &NodeId) -> Vec<SyncChunk> {
        let batch = match self.batch.flush(self.current_time_ms) {
            Some(b) => b,
            None => return Vec::new(),
        };

        // With delta encoding, skip operations the peer has already seen.
        let operations = if self.config.enable_delta {
            self.delta.filter_for_peer(peer_id, &batch.operations)
        } else {
            batch.operations.clone()
        };

        if operations.is_empty() {
            return Vec::new();
        }

        let filtered_batch = OperationBatch {
            operations: operations.clone(),
            total_bytes: operations.iter().map(|o| o.size()).sum(),
            created_at: batch.created_at,
        };

        let encoded = filtered_batch.encode();

        let msg_id = self.next_message_id;
        self.next_message_id = self.next_message_id.wrapping_add(1);

        let chunks = chunk_data(&encoded, self.config.mtu, msg_id);

        self.delta.mark_sent(peer_id, &operations);
        self.delta.record_sent(peer_id, encoded.len());

        self.state = SyncState::Sending;
        self.last_sync_time_ms = self.current_time_ms;

        chunks
    }

471
472 pub fn next_tx_chunk(&mut self) -> Option<SyncChunk> {
474 self.tx_queue.pop_front()
475 }
476
477 pub fn queue_chunks(&mut self, chunks: Vec<SyncChunk>) {
479 self.tx_queue.extend(chunks);
480 }
481
482 pub fn has_pending_tx(&self) -> bool {
484 !self.tx_queue.is_empty()
485 }
486
    /// Processes a chunk received from `peer_id`. Returns the decoded
    /// operations once the full message has been reassembled, otherwise
    /// `None`.
    pub fn process_received(
        &mut self,
        chunk: SyncChunk,
        peer_id: &NodeId,
    ) -> Option<Vec<CrdtOperation>> {
        self.state = SyncState::Receiving;

        let complete = self.rx_reassembler.process(chunk, self.current_time_ms)?;

        let batch = OperationBatch::decode(&complete)?;

        self.delta
            .record_received(peer_id, complete.len(), self.current_time_ms);

        // Advance the vector clock using the peer's operation timestamps.
        for op in &batch.operations {
            let timestamp = match op {
                CrdtOperation::UpdatePosition { timestamp, .. } => *timestamp,
                CrdtOperation::UpdateHealth { timestamp, .. } => *timestamp,
                CrdtOperation::UpdateRegister { timestamp, .. } => *timestamp,
                CrdtOperation::IncrementCounter { .. } => 0,
            };
            if timestamp > 0 {
                self.vector_clock.update(peer_id, timestamp);
            }
        }

        self.state = SyncState::Idle;
        Some(batch.operations)
    }

    /// Acknowledges that queued chunks were sent; returns to `Idle` once the
    /// transmission queue is empty.
    pub fn ack_send(&mut self) {
        if self.tx_queue.is_empty() {
            self.state = SyncState::Idle;
        }
    }

    /// Resets transfer state, dropping queued chunks and partial messages.
    pub fn reset(&mut self) {
        self.state = SyncState::Idle;
        self.tx_queue.clear();
        self.rx_reassembler = ChunkReassembler::new();
    }

    /// Resets the delta-tracking state for a peer.
    pub fn reset_peer(&mut self, peer_id: &NodeId) {
        self.delta.reset_peer(peer_id);
    }

    /// Periodic maintenance: drops stale partial messages.
    pub fn tick(&mut self) {
        self.rx_reassembler.cleanup(self.current_time_ms);
    }

    /// Returns a snapshot of sync statistics.
    pub fn stats(&self) -> SyncStats {
        let delta_stats = self.delta.stats();
        SyncStats {
            bytes_sent: delta_stats.total_bytes_sent,
            bytes_received: delta_stats.total_bytes_received,
            syncs_completed: delta_stats.total_syncs,
            pending_operations: self.batch.pending_count(),
            pending_tx_chunks: self.tx_queue.len(),
            pending_rx_messages: self.rx_reassembler.pending_count(),
        }
    }
}

/// Counters describing sync activity.
#[derive(Debug, Clone, Default)]
pub struct SyncStats {
    /// Total bytes sent to peers.
    pub bytes_sent: u64,
    /// Total bytes received from peers.
    pub bytes_received: u64,
    /// Number of completed syncs.
    pub syncs_completed: u32,
    /// Operations waiting in the batch accumulator.
    pub pending_operations: usize,
    /// Chunks waiting in the transmission queue.
    pub pending_tx_chunks: usize,
    /// Incoming messages still being reassembled.
    pub pending_rx_messages: usize,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::crdt::Position;

    fn make_position_op(node_id: u32, timestamp: u64) -> CrdtOperation {
        CrdtOperation::UpdatePosition {
            node_id: NodeId::new(node_id),
            position: Position::new(37.0, -122.0),
            timestamp,
        }
    }

    #[test]
    fn test_chunk_header_encode_decode() {
        let header = ChunkHeader {
            message_id: 0x12345678,
            chunk_index: 5,
            total_chunks: 10,
        };

        let encoded = header.encode();
        let decoded = ChunkHeader::decode(&encoded).unwrap();

        assert_eq!(decoded.message_id, 0x12345678);
        assert_eq!(decoded.chunk_index, 5);
        assert_eq!(decoded.total_chunks, 10);
    }

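    // Sketch of a companion round-trip check at the chunk level (not just the
    // header), using only the SyncChunk API defined above: the encoded size
    // should equal the 8-byte header plus the payload length.
    #[test]
    fn test_sync_chunk_encode_decode() {
        let chunk = SyncChunk {
            header: ChunkHeader {
                message_id: 42,
                chunk_index: 1,
                total_chunks: 2,
            },
            payload: vec![9, 8, 7],
        };

        let encoded = chunk.encode();
        assert_eq!(encoded.len(), chunk.encoded_size());

        let decoded = SyncChunk::decode(&encoded).unwrap();
        assert_eq!(decoded.header.message_id, 42);
        assert_eq!(decoded.header.chunk_index, 1);
        assert_eq!(decoded.payload, vec![9, 8, 7]);
    }
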
    #[test]
    fn test_chunk_data_single() {
        let data = vec![1, 2, 3, 4, 5];
        let chunks = chunk_data(&data, 100, 1);

        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].header.total_chunks, 1);
        assert_eq!(chunks[0].payload, data);
    }

    #[test]
    fn test_chunk_data_multiple() {
        let data = vec![0u8; 100];
        let mtu = 20;
        let chunks = chunk_data(&data, mtu, 1);

        // A 20-byte MTU leaves 12 payload bytes per chunk: 9 chunks for 100 bytes.
        assert_eq!(chunks.len(), 9);
        assert_eq!(chunks[0].header.total_chunks, 9);

        for (i, chunk) in chunks.iter().enumerate() {
            assert_eq!(chunk.header.chunk_index, i as u16);
            if i < 8 {
                assert_eq!(chunk.payload.len(), 12);
            } else {
                // The final chunk carries the remaining 4 bytes.
                assert_eq!(chunk.payload.len(), 4);
            }
        }
    }

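    // Sketch of the edge cases implied by chunk_data's guards: an MTU that
    // cannot fit the 8-byte header yields no chunks at all, while empty input
    // still yields a single empty chunk so the receiver sees a complete
    // message. Uses only the constants and functions defined above.
    #[test]
    fn test_chunk_data_edge_cases() {
        assert!(chunk_data(&[1, 2, 3], CHUNK_HEADER_SIZE, 1).is_empty());

        let chunks = chunk_data(&[], DEFAULT_MTU, 2);
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].header.total_chunks, 1);
        assert!(chunks[0].payload.is_empty());
    }
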
    #[test]
    fn test_chunk_reassembler_single() {
        let mut reassembler = ChunkReassembler::new();

        let chunk = SyncChunk {
            header: ChunkHeader {
                message_id: 1,
                chunk_index: 0,
                total_chunks: 1,
            },
            payload: vec![1, 2, 3],
        };

        let result = reassembler.process(chunk, 0).unwrap();
        assert_eq!(result, vec![1, 2, 3]);
    }

    #[test]
    fn test_chunk_reassembler_multiple() {
        let mut reassembler = ChunkReassembler::new();

        // Chunks are deliberately delivered out of order.
        let chunk2 = SyncChunk {
            header: ChunkHeader {
                message_id: 1,
                chunk_index: 1,
                total_chunks: 3,
            },
            payload: vec![4, 5, 6],
        };

        let chunk1 = SyncChunk {
            header: ChunkHeader {
                message_id: 1,
                chunk_index: 0,
                total_chunks: 3,
            },
            payload: vec![1, 2, 3],
        };

        let chunk3 = SyncChunk {
            header: ChunkHeader {
                message_id: 1,
                chunk_index: 2,
                total_chunks: 3,
            },
            payload: vec![7, 8, 9],
        };

        assert!(reassembler.process(chunk2, 0).is_none());
        assert!(reassembler.process(chunk1, 0).is_none());

        let result = reassembler.process(chunk3, 0).unwrap();
        assert_eq!(result, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
    }

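    // Sketch exercising the partial-message timeout; assumes the 30_000 ms
    // default set in ChunkReassembler::new(). A partial survives cleanup()
    // inside the window and is dropped once the window elapses.
    #[test]
    fn test_chunk_reassembler_cleanup_drops_stale_partials() {
        let mut reassembler = ChunkReassembler::new();

        let chunk = SyncChunk {
            header: ChunkHeader {
                message_id: 7,
                chunk_index: 0,
                total_chunks: 2,
            },
            payload: vec![1, 2, 3],
        };

        assert!(reassembler.process(chunk, 0).is_none());
        assert_eq!(reassembler.pending_count(), 1);

        // Still within the timeout window: the partial is retained.
        reassembler.cleanup(29_999);
        assert_eq!(reassembler.pending_count(), 1);

        // Past the timeout: the partial is dropped.
        reassembler.cleanup(30_000);
        assert_eq!(reassembler.pending_count(), 0);
    }
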
    #[test]
    fn test_sync_protocol_basic() {
        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);

        let mut proto1 = GattSyncProtocol::with_defaults(node1);
        proto1.add_peer(&node2);
        proto1.set_mtu(100);

        proto1.queue_operation(make_position_op(1, 1000));
        proto1.queue_operation(make_position_op(1, 1001));

        proto1.set_time(10000);

        let chunks = proto1.prepare_sync(&node2);
        assert!(!chunks.is_empty());
    }

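    // Sketch of the outbound queue flow built from the API above: prepared
    // chunks are staged with queue_chunks, drained with next_tx_chunk, and
    // ack_send returns the protocol to Idle once the queue is empty. Mirrors
    // test_sync_protocol_basic's assumption that the default batch config
    // flushes by t = 10000 ms.
    #[test]
    fn test_sync_protocol_tx_queue() {
        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);

        let mut proto = GattSyncProtocol::with_defaults(node1);
        proto.add_peer(&node2);
        proto.set_mtu(100);

        proto.queue_operation(make_position_op(1, 1000));
        proto.set_time(10000);

        let chunks = proto.prepare_sync(&node2);
        assert!(!chunks.is_empty());
        assert_eq!(proto.state(), SyncState::Sending);

        proto.queue_chunks(chunks);
        assert!(proto.has_pending_tx());

        while proto.next_tx_chunk().is_some() {}
        assert!(!proto.has_pending_tx());

        proto.ack_send();
        assert_eq!(proto.state(), SyncState::Idle);
    }
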
    #[test]
    fn test_sync_protocol_round_trip() {
        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);

        let mut proto1 = GattSyncProtocol::with_defaults(node1);
        let mut proto2 = GattSyncProtocol::with_defaults(node2);

        proto1.add_peer(&node2);
        proto2.add_peer(&node1);

        proto1.set_mtu(100);
        proto2.set_mtu(100);

        proto1.queue_operation(make_position_op(1, 1000));
        proto1.set_time(10000);

        let chunks = proto1.prepare_sync(&node2);

        let mut ops = None;
        for chunk in chunks {
            ops = proto2.process_received(chunk, &node1);
        }

        let ops = ops.unwrap();
        assert_eq!(ops.len(), 1);
    }

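    // Sketch verifying that reset() discards staged chunks and returns the
    // protocol to Idle; uses only the public API defined above.
    #[test]
    fn test_sync_protocol_reset() {
        let mut proto = GattSyncProtocol::with_defaults(NodeId::new(1));

        proto.queue_chunks(vec![SyncChunk {
            header: ChunkHeader {
                message_id: 1,
                chunk_index: 0,
                total_chunks: 1,
            },
            payload: vec![1],
        }]);
        assert!(proto.has_pending_tx());

        proto.reset();
        assert!(!proto.has_pending_tx());
        assert_eq!(proto.state(), SyncState::Idle);
    }
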
    #[test]
    fn test_sync_config_profiles() {
        let low_power = SyncConfig::low_power();
        assert_eq!(low_power.sync_interval_ms, 30_000);

        let responsive = SyncConfig::responsive();
        assert_eq!(responsive.sync_interval_ms, 1000);
    }

    #[test]
    fn test_sync_stats() {
        let proto = GattSyncProtocol::with_defaults(NodeId::new(1));
        let stats = proto.stats();

        assert_eq!(stats.bytes_sent, 0);
        assert_eq!(stats.pending_operations, 0);
    }
}