hive_btle/sync/
protocol.rs

//! GATT Sync Protocol for HIVE-Lite
//!
//! Coordinates batching, delta encoding, and chunked transfer over GATT
//! characteristics for efficient BLE sync.
5
6#[cfg(not(feature = "std"))]
7use alloc::{collections::BTreeMap, collections::VecDeque, vec::Vec};
8#[cfg(feature = "std")]
9use std::collections::{HashMap, VecDeque};
10
11use super::batch::{BatchAccumulator, BatchConfig, OperationBatch};
12use super::crdt::CrdtOperation;
13use super::delta::{DeltaEncoder, VectorClock};
14use crate::NodeId;
15
/// MTU assumed for a BLE link by default
pub const DEFAULT_MTU: usize = 23;

/// Largest MTU available on BLE 5.0+ links
pub const MAX_MTU: usize = 517;

/// Encoded size of a chunk header: one `u32` message ID followed by two
/// `u16` fields (chunk index and total chunk count)
pub const CHUNK_HEADER_SIZE: usize =
    core::mem::size_of::<u32>() + 2 * core::mem::size_of::<u16>();
24
25/// Chunk header for multi-MTU messages
26#[derive(Debug, Clone, Copy)]
27pub struct ChunkHeader {
28    /// Unique message ID
29    pub message_id: u32,
30    /// Index of this chunk (0-based)
31    pub chunk_index: u16,
32    /// Total number of chunks
33    pub total_chunks: u16,
34}
35
36impl ChunkHeader {
37    /// Encode header to bytes
38    pub fn encode(&self) -> [u8; CHUNK_HEADER_SIZE] {
39        let mut buf = [0u8; CHUNK_HEADER_SIZE];
40        buf[0..4].copy_from_slice(&self.message_id.to_le_bytes());
41        buf[4..6].copy_from_slice(&self.chunk_index.to_le_bytes());
42        buf[6..8].copy_from_slice(&self.total_chunks.to_le_bytes());
43        buf
44    }
45
46    /// Decode header from bytes
47    pub fn decode(data: &[u8]) -> Option<Self> {
48        if data.len() < CHUNK_HEADER_SIZE {
49            return None;
50        }
51        Some(Self {
52            message_id: u32::from_le_bytes([data[0], data[1], data[2], data[3]]),
53            chunk_index: u16::from_le_bytes([data[4], data[5]]),
54            total_chunks: u16::from_le_bytes([data[6], data[7]]),
55        })
56    }
57}
58
59/// A chunk of data to send
60#[derive(Debug, Clone)]
61pub struct SyncChunk {
62    /// Header
63    pub header: ChunkHeader,
64    /// Payload data
65    pub payload: Vec<u8>,
66}
67
68impl SyncChunk {
69    /// Encode chunk to bytes
70    pub fn encode(&self) -> Vec<u8> {
71        let mut buf = Vec::with_capacity(CHUNK_HEADER_SIZE + self.payload.len());
72        buf.extend_from_slice(&self.header.encode());
73        buf.extend_from_slice(&self.payload);
74        buf
75    }
76
77    /// Decode chunk from bytes
78    pub fn decode(data: &[u8]) -> Option<Self> {
79        let header = ChunkHeader::decode(data)?;
80        let payload = data[CHUNK_HEADER_SIZE..].to_vec();
81        Some(Self { header, payload })
82    }
83
84    /// Get total encoded size
85    pub fn encoded_size(&self) -> usize {
86        CHUNK_HEADER_SIZE + self.payload.len()
87    }
88}
89
90/// Reassembles chunks into complete messages
91#[derive(Debug)]
92pub struct ChunkReassembler {
93    /// Partial messages being assembled
94    #[cfg(feature = "std")]
95    partials: HashMap<u32, PartialMessage>,
96    #[cfg(not(feature = "std"))]
97    partials: BTreeMap<u32, PartialMessage>,
98
99    /// Maximum number of partial messages to track (for future use)
100    #[allow(dead_code)]
101    max_partials: usize,
102
103    /// Timeout for partial messages (ms)
104    partial_timeout_ms: u64,
105}
106
107/// A message being reassembled
108#[derive(Debug, Clone)]
109struct PartialMessage {
110    /// Total expected chunks
111    total_chunks: u16,
112    /// Received chunks (index -> data)
113    #[cfg(feature = "std")]
114    chunks: HashMap<u16, Vec<u8>>,
115    #[cfg(not(feature = "std"))]
116    chunks: BTreeMap<u16, Vec<u8>>,
117    /// Time first chunk was received
118    started_at: u64,
119}
120
121impl ChunkReassembler {
122    /// Create a new reassembler
123    pub fn new() -> Self {
124        Self {
125            #[cfg(feature = "std")]
126            partials: HashMap::new(),
127            #[cfg(not(feature = "std"))]
128            partials: BTreeMap::new(),
129            max_partials: 8,
130            partial_timeout_ms: 30_000,
131        }
132    }
133
134    /// Process a received chunk
135    ///
136    /// Returns the complete message if all chunks received
137    pub fn process(&mut self, chunk: SyncChunk, current_time_ms: u64) -> Option<Vec<u8>> {
138        let msg_id = chunk.header.message_id;
139
140        // Single-chunk message
141        if chunk.header.total_chunks == 1 {
142            return Some(chunk.payload);
143        }
144
145        // Get or create partial
146        let partial = self
147            .partials
148            .entry(msg_id)
149            .or_insert_with(|| PartialMessage {
150                total_chunks: chunk.header.total_chunks,
151                #[cfg(feature = "std")]
152                chunks: HashMap::new(),
153                #[cfg(not(feature = "std"))]
154                chunks: BTreeMap::new(),
155                started_at: current_time_ms,
156            });
157
158        // Insert chunk
159        partial
160            .chunks
161            .insert(chunk.header.chunk_index, chunk.payload);
162
163        // Check if complete
164        if partial.chunks.len() == partial.total_chunks as usize {
165            let partial = self.partials.remove(&msg_id)?;
166
167            // Reassemble in order
168            let mut result = Vec::new();
169            for i in 0..partial.total_chunks {
170                if let Some(data) = partial.chunks.get(&i) {
171                    result.extend_from_slice(data);
172                } else {
173                    // Missing chunk - shouldn't happen
174                    return None;
175                }
176            }
177            return Some(result);
178        }
179
180        None
181    }
182
183    /// Clean up timed-out partial messages
184    pub fn cleanup(&mut self, current_time_ms: u64) {
185        self.partials
186            .retain(|_, partial| current_time_ms - partial.started_at < self.partial_timeout_ms);
187    }
188
189    /// Get number of messages being assembled
190    pub fn pending_count(&self) -> usize {
191        self.partials.len()
192    }
193}
194
195impl Default for ChunkReassembler {
196    fn default() -> Self {
197        Self::new()
198    }
199}
200
201/// Split data into MTU-sized chunks
202pub fn chunk_data(data: &[u8], mtu: usize, message_id: u32) -> Vec<SyncChunk> {
203    let payload_size = mtu.saturating_sub(CHUNK_HEADER_SIZE);
204    if payload_size == 0 {
205        return Vec::new();
206    }
207
208    let total_chunks = data.len().div_ceil(payload_size);
209    let total_chunks = total_chunks.max(1) as u16;
210
211    let mut chunks = Vec::with_capacity(total_chunks as usize);
212
213    for (i, chunk_data) in data.chunks(payload_size).enumerate() {
214        chunks.push(SyncChunk {
215            header: ChunkHeader {
216                message_id,
217                chunk_index: i as u16,
218                total_chunks,
219            },
220            payload: chunk_data.to_vec(),
221        });
222    }
223
224    // Handle empty data
225    if chunks.is_empty() {
226        chunks.push(SyncChunk {
227            header: ChunkHeader {
228                message_id,
229                chunk_index: 0,
230                total_chunks: 1,
231            },
232            payload: Vec::new(),
233        });
234    }
235
236    chunks
237}
238
/// Phase the sync protocol is currently in
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SyncState {
    /// No transfer is in progress (the default)
    #[default]
    Idle,
    /// Outgoing data is being sent to a peer
    Sending,
    /// Incoming data is being received from a peer
    Receiving,
    /// Data was sent and an acknowledgment is awaited
    WaitingAck,
}
252
253/// Configuration for the sync protocol
254#[derive(Debug, Clone)]
255pub struct SyncConfig {
256    /// Negotiated MTU
257    pub mtu: usize,
258    /// Batch configuration
259    pub batch: BatchConfig,
260    /// Sync interval in milliseconds
261    pub sync_interval_ms: u64,
262    /// Enable delta encoding
263    pub enable_delta: bool,
264    /// Maximum retries per chunk
265    pub max_retries: u8,
266}
267
268impl Default for SyncConfig {
269    fn default() -> Self {
270        Self {
271            mtu: DEFAULT_MTU,
272            batch: BatchConfig::default(),
273            sync_interval_ms: 5000,
274            enable_delta: true,
275            max_retries: 3,
276        }
277    }
278}
279
280impl SyncConfig {
281    /// Config for low-power operation
282    pub fn low_power() -> Self {
283        Self {
284            mtu: DEFAULT_MTU,
285            batch: BatchConfig::low_power(),
286            sync_interval_ms: 30_000,
287            enable_delta: true,
288            max_retries: 2,
289        }
290    }
291
292    /// Config for responsive operation
293    pub fn responsive() -> Self {
294        Self {
295            mtu: MAX_MTU,
296            batch: BatchConfig::responsive(),
297            sync_interval_ms: 1000,
298            enable_delta: true,
299            max_retries: 3,
300        }
301    }
302}
303
304/// GATT Sync Protocol coordinator
305///
306/// Ties together batching, delta encoding, and chunked transfer
307/// for efficient CRDT sync over BLE.
308pub struct GattSyncProtocol {
309    /// Our node ID
310    node_id: NodeId,
311
312    /// Configuration
313    config: SyncConfig,
314
315    /// Current state
316    state: SyncState,
317
318    /// Batch accumulator
319    batch: BatchAccumulator,
320
321    /// Delta encoder
322    delta: DeltaEncoder,
323
324    /// Vector clock
325    vector_clock: VectorClock,
326
327    /// Outgoing chunk queue
328    tx_queue: VecDeque<SyncChunk>,
329
330    /// Chunk reassembler for incoming data
331    rx_reassembler: ChunkReassembler,
332
333    /// Next message ID
334    next_message_id: u32,
335
336    /// Current time (set externally)
337    current_time_ms: u64,
338
339    /// Last sync time
340    last_sync_time_ms: u64,
341}
342
343impl GattSyncProtocol {
344    /// Create a new sync protocol instance
345    pub fn new(node_id: NodeId, config: SyncConfig) -> Self {
346        Self {
347            node_id,
348            batch: BatchAccumulator::new(config.batch.clone()),
349            delta: DeltaEncoder::new(node_id),
350            vector_clock: VectorClock::new(),
351            config,
352            state: SyncState::Idle,
353            tx_queue: VecDeque::new(),
354            rx_reassembler: ChunkReassembler::new(),
355            next_message_id: 1,
356            current_time_ms: 0,
357            last_sync_time_ms: 0,
358        }
359    }
360
361    /// Create with default config
362    pub fn with_defaults(node_id: NodeId) -> Self {
363        Self::new(node_id, SyncConfig::default())
364    }
365
366    /// Set the current time
367    pub fn set_time(&mut self, time_ms: u64) {
368        self.current_time_ms = time_ms;
369    }
370
371    /// Set the MTU (after negotiation)
372    pub fn set_mtu(&mut self, mtu: usize) {
373        self.config.mtu = mtu;
374    }
375
376    /// Get current state
377    pub fn state(&self) -> SyncState {
378        self.state
379    }
380
381    /// Get the vector clock
382    pub fn vector_clock(&self) -> &VectorClock {
383        &self.vector_clock
384    }
385
386    /// Add a peer for sync
387    pub fn add_peer(&mut self, peer_id: &NodeId) {
388        self.delta.add_peer(peer_id);
389    }
390
391    /// Remove a peer
392    pub fn remove_peer(&mut self, peer_id: &NodeId) {
393        self.delta.remove_peer(peer_id);
394    }
395
396    /// Queue a CRDT operation for sync
397    pub fn queue_operation(&mut self, op: CrdtOperation) -> bool {
398        // Update vector clock
399        self.vector_clock.increment(&self.node_id);
400
401        // Add to batch
402        self.batch.add(op, self.current_time_ms)
403    }
404
405    /// Check if we should sync now
406    pub fn should_sync(&self) -> bool {
407        self.batch.should_flush(self.current_time_ms)
408    }
409
410    /// Prepare a sync to a peer
411    ///
412    /// Returns the chunks to send
413    pub fn prepare_sync(&mut self, peer_id: &NodeId) -> Vec<SyncChunk> {
414        // Flush the batch
415        let batch = match self.batch.flush(self.current_time_ms) {
416            Some(b) => b,
417            None => return Vec::new(),
418        };
419
420        // Filter with delta encoding
421        let operations = if self.config.enable_delta {
422            self.delta.filter_for_peer(peer_id, &batch.operations)
423        } else {
424            batch.operations.clone()
425        };
426
427        if operations.is_empty() {
428            return Vec::new();
429        }
430
431        // Create a batch with filtered operations
432        let filtered_batch = OperationBatch {
433            operations: operations.clone(),
434            total_bytes: operations.iter().map(|o| o.size()).sum(),
435            created_at: batch.created_at,
436        };
437
438        // Encode the batch
439        let encoded = filtered_batch.encode();
440
441        // Chunk it
442        let msg_id = self.next_message_id;
443        self.next_message_id = self.next_message_id.wrapping_add(1);
444
445        let chunks = chunk_data(&encoded, self.config.mtu, msg_id);
446
447        // Mark as sent (after we have the filtered list)
448        self.delta.mark_sent(peer_id, &operations);
449        self.delta.record_sent(peer_id, encoded.len());
450
451        self.state = SyncState::Sending;
452        self.last_sync_time_ms = self.current_time_ms;
453
454        chunks
455    }
456
457    /// Get next chunk to send (if any)
458    pub fn next_tx_chunk(&mut self) -> Option<SyncChunk> {
459        self.tx_queue.pop_front()
460    }
461
462    /// Queue chunks for sending
463    pub fn queue_chunks(&mut self, chunks: Vec<SyncChunk>) {
464        self.tx_queue.extend(chunks);
465    }
466
467    /// Check if there are chunks to send
468    pub fn has_pending_tx(&self) -> bool {
469        !self.tx_queue.is_empty()
470    }
471
472    /// Process a received chunk
473    ///
474    /// Returns decoded operations if message is complete
475    pub fn process_received(
476        &mut self,
477        chunk: SyncChunk,
478        peer_id: &NodeId,
479    ) -> Option<Vec<CrdtOperation>> {
480        self.state = SyncState::Receiving;
481
482        // Reassemble
483        let complete = self.rx_reassembler.process(chunk, self.current_time_ms)?;
484
485        // Decode batch
486        let batch = OperationBatch::decode(&complete)?;
487
488        // Record stats
489        self.delta
490            .record_received(peer_id, complete.len(), self.current_time_ms);
491
492        // Update vector clock with received operations
493        for op in &batch.operations {
494            let timestamp = match op {
495                CrdtOperation::UpdatePosition { timestamp, .. } => *timestamp,
496                CrdtOperation::UpdateHealth { timestamp, .. } => *timestamp,
497                CrdtOperation::UpdateRegister { timestamp, .. } => *timestamp,
498                CrdtOperation::IncrementCounter { .. } => 0,
499            };
500            if timestamp > 0 {
501                self.vector_clock.update(peer_id, timestamp);
502            }
503        }
504
505        self.state = SyncState::Idle;
506        Some(batch.operations)
507    }
508
509    /// Acknowledge completion of send
510    pub fn ack_send(&mut self) {
511        if self.tx_queue.is_empty() {
512            self.state = SyncState::Idle;
513        }
514    }
515
516    /// Reset protocol state (e.g., on reconnection)
517    pub fn reset(&mut self) {
518        self.state = SyncState::Idle;
519        self.tx_queue.clear();
520        self.rx_reassembler = ChunkReassembler::new();
521    }
522
523    /// Reset sync state for a specific peer
524    pub fn reset_peer(&mut self, peer_id: &NodeId) {
525        self.delta.reset_peer(peer_id);
526    }
527
528    /// Run periodic maintenance
529    pub fn tick(&mut self) {
530        self.rx_reassembler.cleanup(self.current_time_ms);
531    }
532
533    /// Get sync statistics
534    pub fn stats(&self) -> SyncStats {
535        let delta_stats = self.delta.stats();
536        SyncStats {
537            bytes_sent: delta_stats.total_bytes_sent,
538            bytes_received: delta_stats.total_bytes_received,
539            syncs_completed: delta_stats.total_syncs,
540            pending_operations: self.batch.pending_count(),
541            pending_tx_chunks: self.tx_queue.len(),
542            pending_rx_messages: self.rx_reassembler.pending_count(),
543        }
544    }
545}
546
/// Snapshot of sync counters and queue depths
#[derive(Debug, Default, Clone)]
pub struct SyncStats {
    /// Bytes sent so far
    pub bytes_sent: u64,
    /// Bytes received so far
    pub bytes_received: u64,
    /// How many syncs have completed
    pub syncs_completed: u32,
    /// Operations waiting in the batch
    pub pending_operations: usize,
    /// Chunks waiting in the TX queue
    pub pending_tx_chunks: usize,
    /// Incoming messages that are still being reassembled
    pub pending_rx_messages: usize,
}
563
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::crdt::Position;

    /// Build a position update for `node_id` stamped with `timestamp`.
    fn make_position_op(node_id: u32, timestamp: u64) -> CrdtOperation {
        CrdtOperation::UpdatePosition {
            node_id: NodeId::new(node_id),
            position: Position::new(37.0, -122.0),
            timestamp,
        }
    }

    /// Build a chunk from raw header fields and payload bytes.
    fn make_chunk(
        message_id: u32,
        chunk_index: u16,
        total_chunks: u16,
        payload: &[u8],
    ) -> SyncChunk {
        SyncChunk {
            header: ChunkHeader {
                message_id,
                chunk_index,
                total_chunks,
            },
            payload: payload.to_vec(),
        }
    }

    #[test]
    fn test_chunk_header_encode_decode() {
        let original = ChunkHeader {
            message_id: 0x12345678,
            chunk_index: 5,
            total_chunks: 10,
        };

        let wire = original.encode();
        let round_tripped = ChunkHeader::decode(&wire).unwrap();

        assert_eq!(round_tripped.message_id, 0x12345678);
        assert_eq!(round_tripped.chunk_index, 5);
        assert_eq!(round_tripped.total_chunks, 10);
    }

    #[test]
    fn test_chunk_data_single() {
        let bytes = vec![1, 2, 3, 4, 5];
        let produced = chunk_data(&bytes, 100, 1);

        assert_eq!(produced.len(), 1);
        assert_eq!(produced[0].header.total_chunks, 1);
        assert_eq!(produced[0].payload, bytes);
    }

    #[test]
    fn test_chunk_data_multiple() {
        // MTU of 20 leaves 12 payload bytes after the 8-byte header
        let bytes = vec![0u8; 100];
        let produced = chunk_data(&bytes, 20, 1);

        // ceil(100 / 12) = 9 chunks
        assert_eq!(produced.len(), 9);
        assert_eq!(produced[0].header.total_chunks, 9);

        // Eight full chunks, then the 4-byte remainder
        for (position, piece) in produced.iter().enumerate() {
            assert_eq!(piece.header.chunk_index, position as u16);
            let expected_len = if position < 8 { 12 } else { 4 };
            assert_eq!(piece.payload.len(), expected_len);
        }
    }

    #[test]
    fn test_chunk_reassembler_single() {
        let mut reassembler = ChunkReassembler::new();

        let message = reassembler
            .process(make_chunk(1, 0, 1, &[1, 2, 3]), 0)
            .unwrap();

        assert_eq!(message, vec![1, 2, 3]);
    }

    #[test]
    fn test_chunk_reassembler_multiple() {
        let mut reassembler = ChunkReassembler::new();

        let first = make_chunk(1, 0, 3, &[1, 2, 3]);
        let second = make_chunk(1, 1, 3, &[4, 5, 6]);
        let third = make_chunk(1, 2, 3, &[7, 8, 9]);

        // Deliver out of order: middle, first, last
        assert!(reassembler.process(second, 0).is_none());
        assert!(reassembler.process(first, 0).is_none());

        let message = reassembler.process(third, 0).unwrap();
        assert_eq!(message, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
    }

    #[test]
    fn test_sync_protocol_basic() {
        let local = NodeId::new(1);
        let remote = NodeId::new(2);

        let mut proto = GattSyncProtocol::with_defaults(local);
        proto.add_peer(&remote);
        proto.set_mtu(100);

        // Queue some operations
        for timestamp in [1000, 1001] {
            proto.queue_operation(make_position_op(1, timestamp));
        }

        // Advance time so the batch is ready to flush
        proto.set_time(10000);

        let chunks = proto.prepare_sync(&remote);
        assert!(!chunks.is_empty());
    }

    #[test]
    fn test_sync_protocol_round_trip() {
        let sender_id = NodeId::new(1);
        let receiver_id = NodeId::new(2);

        let mut sender = GattSyncProtocol::with_defaults(sender_id);
        let mut receiver = GattSyncProtocol::with_defaults(receiver_id);

        sender.add_peer(&receiver_id);
        receiver.add_peer(&sender_id);

        sender.set_mtu(100);
        receiver.set_mtu(100);

        // Sender queues one operation and lets the batch mature
        sender.queue_operation(make_position_op(1, 1000));
        sender.set_time(10000);

        let chunks = sender.prepare_sync(&receiver_id);

        // Receiver consumes every chunk; the result of the last one counts
        let received = chunks
            .into_iter()
            .fold(None, |_, chunk| receiver.process_received(chunk, &sender_id));

        let ops = received.unwrap();
        assert_eq!(ops.len(), 1);
    }

    #[test]
    fn test_sync_config_profiles() {
        assert_eq!(SyncConfig::low_power().sync_interval_ms, 30_000);
        assert_eq!(SyncConfig::responsive().sync_interval_ms, 1000);
    }

    #[test]
    fn test_sync_stats() {
        let stats = GattSyncProtocol::with_defaults(NodeId::new(1)).stats();

        assert_eq!(stats.bytes_sent, 0);
        assert_eq!(stats.pending_operations, 0);
    }
}