1#[cfg(not(feature = "std"))]
7use alloc::{collections::BTreeMap, collections::VecDeque, vec::Vec};
8#[cfg(feature = "std")]
9use std::collections::{HashMap, VecDeque};
10
11use super::batch::{BatchAccumulator, BatchConfig, OperationBatch};
12use super::crdt::CrdtOperation;
13use super::delta::{DeltaEncoder, VectorClock};
14use crate::NodeId;
15
16pub const DEFAULT_MTU: usize = 23;
18
19pub const MAX_MTU: usize = 517;
21
22pub const CHUNK_HEADER_SIZE: usize = 8;
24
25#[derive(Debug, Clone, Copy)]
27pub struct ChunkHeader {
28 pub message_id: u32,
30 pub chunk_index: u16,
32 pub total_chunks: u16,
34}
35
36impl ChunkHeader {
37 pub fn encode(&self) -> [u8; CHUNK_HEADER_SIZE] {
39 let mut buf = [0u8; CHUNK_HEADER_SIZE];
40 buf[0..4].copy_from_slice(&self.message_id.to_le_bytes());
41 buf[4..6].copy_from_slice(&self.chunk_index.to_le_bytes());
42 buf[6..8].copy_from_slice(&self.total_chunks.to_le_bytes());
43 buf
44 }
45
46 pub fn decode(data: &[u8]) -> Option<Self> {
48 if data.len() < CHUNK_HEADER_SIZE {
49 return None;
50 }
51 Some(Self {
52 message_id: u32::from_le_bytes([data[0], data[1], data[2], data[3]]),
53 chunk_index: u16::from_le_bytes([data[4], data[5]]),
54 total_chunks: u16::from_le_bytes([data[6], data[7]]),
55 })
56 }
57}
58
59#[derive(Debug, Clone)]
61pub struct SyncChunk {
62 pub header: ChunkHeader,
64 pub payload: Vec<u8>,
66}
67
68impl SyncChunk {
69 pub fn encode(&self) -> Vec<u8> {
71 let mut buf = Vec::with_capacity(CHUNK_HEADER_SIZE + self.payload.len());
72 buf.extend_from_slice(&self.header.encode());
73 buf.extend_from_slice(&self.payload);
74 buf
75 }
76
77 pub fn decode(data: &[u8]) -> Option<Self> {
79 let header = ChunkHeader::decode(data)?;
80 let payload = data[CHUNK_HEADER_SIZE..].to_vec();
81 Some(Self { header, payload })
82 }
83
84 pub fn encoded_size(&self) -> usize {
86 CHUNK_HEADER_SIZE + self.payload.len()
87 }
88}
89
90#[derive(Debug)]
92pub struct ChunkReassembler {
93 #[cfg(feature = "std")]
95 partials: HashMap<u32, PartialMessage>,
96 #[cfg(not(feature = "std"))]
97 partials: BTreeMap<u32, PartialMessage>,
98
99 #[allow(dead_code)]
101 max_partials: usize,
102
103 partial_timeout_ms: u64,
105}
106
107#[derive(Debug, Clone)]
109struct PartialMessage {
110 total_chunks: u16,
112 #[cfg(feature = "std")]
114 chunks: HashMap<u16, Vec<u8>>,
115 #[cfg(not(feature = "std"))]
116 chunks: BTreeMap<u16, Vec<u8>>,
117 started_at: u64,
119}
120
121impl ChunkReassembler {
122 pub fn new() -> Self {
124 Self {
125 #[cfg(feature = "std")]
126 partials: HashMap::new(),
127 #[cfg(not(feature = "std"))]
128 partials: BTreeMap::new(),
129 max_partials: 8,
130 partial_timeout_ms: 30_000,
131 }
132 }
133
134 pub fn process(&mut self, chunk: SyncChunk, current_time_ms: u64) -> Option<Vec<u8>> {
138 let msg_id = chunk.header.message_id;
139
140 if chunk.header.total_chunks == 1 {
142 return Some(chunk.payload);
143 }
144
145 let partial = self
147 .partials
148 .entry(msg_id)
149 .or_insert_with(|| PartialMessage {
150 total_chunks: chunk.header.total_chunks,
151 #[cfg(feature = "std")]
152 chunks: HashMap::new(),
153 #[cfg(not(feature = "std"))]
154 chunks: BTreeMap::new(),
155 started_at: current_time_ms,
156 });
157
158 partial
160 .chunks
161 .insert(chunk.header.chunk_index, chunk.payload);
162
163 if partial.chunks.len() == partial.total_chunks as usize {
165 let partial = self.partials.remove(&msg_id)?;
166
167 let mut result = Vec::new();
169 for i in 0..partial.total_chunks {
170 if let Some(data) = partial.chunks.get(&i) {
171 result.extend_from_slice(data);
172 } else {
173 return None;
175 }
176 }
177 return Some(result);
178 }
179
180 None
181 }
182
183 pub fn cleanup(&mut self, current_time_ms: u64) {
185 self.partials
186 .retain(|_, partial| current_time_ms - partial.started_at < self.partial_timeout_ms);
187 }
188
189 pub fn pending_count(&self) -> usize {
191 self.partials.len()
192 }
193}
194
195impl Default for ChunkReassembler {
196 fn default() -> Self {
197 Self::new()
198 }
199}
200
201pub fn chunk_data(data: &[u8], mtu: usize, message_id: u32) -> Vec<SyncChunk> {
203 let payload_size = mtu.saturating_sub(CHUNK_HEADER_SIZE);
204 if payload_size == 0 {
205 return Vec::new();
206 }
207
208 let total_chunks = data.len().div_ceil(payload_size);
209 let total_chunks = total_chunks.max(1) as u16;
210
211 let mut chunks = Vec::with_capacity(total_chunks as usize);
212
213 for (i, chunk_data) in data.chunks(payload_size).enumerate() {
214 chunks.push(SyncChunk {
215 header: ChunkHeader {
216 message_id,
217 chunk_index: i as u16,
218 total_chunks,
219 },
220 payload: chunk_data.to_vec(),
221 });
222 }
223
224 if chunks.is_empty() {
226 chunks.push(SyncChunk {
227 header: ChunkHeader {
228 message_id,
229 chunk_index: 0,
230 total_chunks: 1,
231 },
232 payload: Vec::new(),
233 });
234 }
235
236 chunks
237}
238
239#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
241pub enum SyncState {
242 #[default]
244 Idle,
245 Sending,
247 Receiving,
249 WaitingAck,
251}
252
253#[derive(Debug, Clone)]
255pub struct SyncConfig {
256 pub mtu: usize,
258 pub batch: BatchConfig,
260 pub sync_interval_ms: u64,
262 pub enable_delta: bool,
264 pub max_retries: u8,
266}
267
268impl Default for SyncConfig {
269 fn default() -> Self {
270 Self {
271 mtu: DEFAULT_MTU,
272 batch: BatchConfig::default(),
273 sync_interval_ms: 5000,
274 enable_delta: true,
275 max_retries: 3,
276 }
277 }
278}
279
280impl SyncConfig {
281 pub fn low_power() -> Self {
283 Self {
284 mtu: DEFAULT_MTU,
285 batch: BatchConfig::low_power(),
286 sync_interval_ms: 30_000,
287 enable_delta: true,
288 max_retries: 2,
289 }
290 }
291
292 pub fn responsive() -> Self {
294 Self {
295 mtu: MAX_MTU,
296 batch: BatchConfig::responsive(),
297 sync_interval_ms: 1000,
298 enable_delta: true,
299 max_retries: 3,
300 }
301 }
302}
303
304pub struct GattSyncProtocol {
309 node_id: NodeId,
311
312 config: SyncConfig,
314
315 state: SyncState,
317
318 batch: BatchAccumulator,
320
321 delta: DeltaEncoder,
323
324 vector_clock: VectorClock,
326
327 tx_queue: VecDeque<SyncChunk>,
329
330 rx_reassembler: ChunkReassembler,
332
333 next_message_id: u32,
335
336 current_time_ms: u64,
338
339 last_sync_time_ms: u64,
341}
342
343impl GattSyncProtocol {
344 pub fn new(node_id: NodeId, config: SyncConfig) -> Self {
346 Self {
347 node_id,
348 batch: BatchAccumulator::new(config.batch.clone()),
349 delta: DeltaEncoder::new(node_id),
350 vector_clock: VectorClock::new(),
351 config,
352 state: SyncState::Idle,
353 tx_queue: VecDeque::new(),
354 rx_reassembler: ChunkReassembler::new(),
355 next_message_id: 1,
356 current_time_ms: 0,
357 last_sync_time_ms: 0,
358 }
359 }
360
361 pub fn with_defaults(node_id: NodeId) -> Self {
363 Self::new(node_id, SyncConfig::default())
364 }
365
366 pub fn set_time(&mut self, time_ms: u64) {
368 self.current_time_ms = time_ms;
369 }
370
371 pub fn set_mtu(&mut self, mtu: usize) {
373 self.config.mtu = mtu;
374 }
375
376 pub fn state(&self) -> SyncState {
378 self.state
379 }
380
381 pub fn vector_clock(&self) -> &VectorClock {
383 &self.vector_clock
384 }
385
386 pub fn add_peer(&mut self, peer_id: &NodeId) {
388 self.delta.add_peer(peer_id);
389 }
390
391 pub fn remove_peer(&mut self, peer_id: &NodeId) {
393 self.delta.remove_peer(peer_id);
394 }
395
396 pub fn queue_operation(&mut self, op: CrdtOperation) -> bool {
398 self.vector_clock.increment(&self.node_id);
400
401 self.batch.add(op, self.current_time_ms)
403 }
404
405 pub fn should_sync(&self) -> bool {
407 self.batch.should_flush(self.current_time_ms)
408 }
409
410 pub fn prepare_sync(&mut self, peer_id: &NodeId) -> Vec<SyncChunk> {
414 let batch = match self.batch.flush(self.current_time_ms) {
416 Some(b) => b,
417 None => return Vec::new(),
418 };
419
420 let operations = if self.config.enable_delta {
422 self.delta.filter_for_peer(peer_id, &batch.operations)
423 } else {
424 batch.operations.clone()
425 };
426
427 if operations.is_empty() {
428 return Vec::new();
429 }
430
431 let filtered_batch = OperationBatch {
433 operations: operations.clone(),
434 total_bytes: operations.iter().map(|o| o.size()).sum(),
435 created_at: batch.created_at,
436 };
437
438 let encoded = filtered_batch.encode();
440
441 let msg_id = self.next_message_id;
443 self.next_message_id = self.next_message_id.wrapping_add(1);
444
445 let chunks = chunk_data(&encoded, self.config.mtu, msg_id);
446
447 self.delta.mark_sent(peer_id, &operations);
449 self.delta.record_sent(peer_id, encoded.len());
450
451 self.state = SyncState::Sending;
452 self.last_sync_time_ms = self.current_time_ms;
453
454 chunks
455 }
456
457 pub fn next_tx_chunk(&mut self) -> Option<SyncChunk> {
459 self.tx_queue.pop_front()
460 }
461
462 pub fn queue_chunks(&mut self, chunks: Vec<SyncChunk>) {
464 self.tx_queue.extend(chunks);
465 }
466
467 pub fn has_pending_tx(&self) -> bool {
469 !self.tx_queue.is_empty()
470 }
471
472 pub fn process_received(
476 &mut self,
477 chunk: SyncChunk,
478 peer_id: &NodeId,
479 ) -> Option<Vec<CrdtOperation>> {
480 self.state = SyncState::Receiving;
481
482 let complete = self.rx_reassembler.process(chunk, self.current_time_ms)?;
484
485 let batch = OperationBatch::decode(&complete)?;
487
488 self.delta
490 .record_received(peer_id, complete.len(), self.current_time_ms);
491
492 for op in &batch.operations {
494 let timestamp = match op {
495 CrdtOperation::UpdatePosition { timestamp, .. } => *timestamp,
496 CrdtOperation::UpdateHealth { timestamp, .. } => *timestamp,
497 CrdtOperation::UpdateRegister { timestamp, .. } => *timestamp,
498 CrdtOperation::IncrementCounter { .. } => 0,
499 };
500 if timestamp > 0 {
501 self.vector_clock.update(peer_id, timestamp);
502 }
503 }
504
505 self.state = SyncState::Idle;
506 Some(batch.operations)
507 }
508
509 pub fn ack_send(&mut self) {
511 if self.tx_queue.is_empty() {
512 self.state = SyncState::Idle;
513 }
514 }
515
516 pub fn reset(&mut self) {
518 self.state = SyncState::Idle;
519 self.tx_queue.clear();
520 self.rx_reassembler = ChunkReassembler::new();
521 }
522
523 pub fn reset_peer(&mut self, peer_id: &NodeId) {
525 self.delta.reset_peer(peer_id);
526 }
527
528 pub fn tick(&mut self) {
530 self.rx_reassembler.cleanup(self.current_time_ms);
531 }
532
533 pub fn stats(&self) -> SyncStats {
535 let delta_stats = self.delta.stats();
536 SyncStats {
537 bytes_sent: delta_stats.total_bytes_sent,
538 bytes_received: delta_stats.total_bytes_received,
539 syncs_completed: delta_stats.total_syncs,
540 pending_operations: self.batch.pending_count(),
541 pending_tx_chunks: self.tx_queue.len(),
542 pending_rx_messages: self.rx_reassembler.pending_count(),
543 }
544 }
545}
546
547#[derive(Debug, Clone, Default)]
549pub struct SyncStats {
550 pub bytes_sent: u64,
552 pub bytes_received: u64,
554 pub syncs_completed: u32,
556 pub pending_operations: usize,
558 pub pending_tx_chunks: usize,
560 pub pending_rx_messages: usize,
562}
563
564#[cfg(test)]
565mod tests {
566 use super::*;
567 use crate::sync::crdt::Position;
568
569 fn make_position_op(node_id: u32, timestamp: u64) -> CrdtOperation {
570 CrdtOperation::UpdatePosition {
571 node_id: NodeId::new(node_id),
572 position: Position::new(37.0, -122.0),
573 timestamp,
574 }
575 }
576
577 #[test]
578 fn test_chunk_header_encode_decode() {
579 let header = ChunkHeader {
580 message_id: 0x12345678,
581 chunk_index: 5,
582 total_chunks: 10,
583 };
584
585 let encoded = header.encode();
586 let decoded = ChunkHeader::decode(&encoded).unwrap();
587
588 assert_eq!(decoded.message_id, 0x12345678);
589 assert_eq!(decoded.chunk_index, 5);
590 assert_eq!(decoded.total_chunks, 10);
591 }
592
593 #[test]
594 fn test_chunk_data_single() {
595 let data = vec![1, 2, 3, 4, 5];
596 let chunks = chunk_data(&data, 100, 1);
597
598 assert_eq!(chunks.len(), 1);
599 assert_eq!(chunks[0].header.total_chunks, 1);
600 assert_eq!(chunks[0].payload, data);
601 }
602
603 #[test]
604 fn test_chunk_data_multiple() {
605 let data = vec![0u8; 100];
606 let mtu = 20; let chunks = chunk_data(&data, mtu, 1);
608
609 assert_eq!(chunks.len(), 9);
611 assert_eq!(chunks[0].header.total_chunks, 9);
612
613 for (i, chunk) in chunks.iter().enumerate() {
615 assert_eq!(chunk.header.chunk_index, i as u16);
616 if i < 8 {
617 assert_eq!(chunk.payload.len(), 12);
618 } else {
619 assert_eq!(chunk.payload.len(), 4); }
621 }
622 }
623
624 #[test]
625 fn test_chunk_reassembler_single() {
626 let mut reassembler = ChunkReassembler::new();
627
628 let chunk = SyncChunk {
629 header: ChunkHeader {
630 message_id: 1,
631 chunk_index: 0,
632 total_chunks: 1,
633 },
634 payload: vec![1, 2, 3],
635 };
636
637 let result = reassembler.process(chunk, 0).unwrap();
638 assert_eq!(result, vec![1, 2, 3]);
639 }
640
641 #[test]
642 fn test_chunk_reassembler_multiple() {
643 let mut reassembler = ChunkReassembler::new();
644
645 let chunk2 = SyncChunk {
647 header: ChunkHeader {
648 message_id: 1,
649 chunk_index: 1,
650 total_chunks: 3,
651 },
652 payload: vec![4, 5, 6],
653 };
654
655 let chunk1 = SyncChunk {
656 header: ChunkHeader {
657 message_id: 1,
658 chunk_index: 0,
659 total_chunks: 3,
660 },
661 payload: vec![1, 2, 3],
662 };
663
664 let chunk3 = SyncChunk {
665 header: ChunkHeader {
666 message_id: 1,
667 chunk_index: 2,
668 total_chunks: 3,
669 },
670 payload: vec![7, 8, 9],
671 };
672
673 assert!(reassembler.process(chunk2, 0).is_none());
674 assert!(reassembler.process(chunk1, 0).is_none());
675
676 let result = reassembler.process(chunk3, 0).unwrap();
677 assert_eq!(result, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
678 }
679
680 #[test]
681 fn test_sync_protocol_basic() {
682 let node1 = NodeId::new(1);
683 let node2 = NodeId::new(2);
684
685 let mut proto1 = GattSyncProtocol::with_defaults(node1);
686 proto1.add_peer(&node2);
687 proto1.set_mtu(100);
688
689 proto1.queue_operation(make_position_op(1, 1000));
691 proto1.queue_operation(make_position_op(1, 1001));
692
693 proto1.set_time(10000);
695
696 let chunks = proto1.prepare_sync(&node2);
698 assert!(!chunks.is_empty());
699 }
700
701 #[test]
702 fn test_sync_protocol_round_trip() {
703 let node1 = NodeId::new(1);
704 let node2 = NodeId::new(2);
705
706 let mut proto1 = GattSyncProtocol::with_defaults(node1);
707 let mut proto2 = GattSyncProtocol::with_defaults(node2);
708
709 proto1.add_peer(&node2);
710 proto2.add_peer(&node1);
711
712 proto1.set_mtu(100);
713 proto2.set_mtu(100);
714
715 proto1.queue_operation(make_position_op(1, 1000));
717 proto1.set_time(10000);
718
719 let chunks = proto1.prepare_sync(&node2);
721
722 let mut ops = None;
724 for chunk in chunks {
725 ops = proto2.process_received(chunk, &node1);
726 }
727
728 let ops = ops.unwrap();
730 assert_eq!(ops.len(), 1);
731 }
732
733 #[test]
734 fn test_sync_config_profiles() {
735 let low_power = SyncConfig::low_power();
736 assert_eq!(low_power.sync_interval_ms, 30_000);
737
738 let responsive = SyncConfig::responsive();
739 assert_eq!(responsive.sync_interval_ms, 1000);
740 }
741
742 #[test]
743 fn test_sync_stats() {
744 let proto = GattSyncProtocol::with_defaults(NodeId::new(1));
745 let stats = proto.stats();
746
747 assert_eq!(stats.bytes_sent, 0);
748 assert_eq!(stats.pending_operations, 0);
749 }
750}