hive_btle/sync/
protocol.rs

// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! GATT Sync Protocol for HIVE-Lite
//!
//! Coordinates batching, delta encoding, and chunked transfer over GATT
//! characteristics for efficient BLE sync.
20
21#[cfg(not(feature = "std"))]
22use alloc::{collections::BTreeMap, collections::VecDeque, vec::Vec};
23#[cfg(feature = "std")]
24use std::collections::{HashMap, VecDeque};
25
26use super::batch::{BatchAccumulator, BatchConfig, OperationBatch};
27use super::crdt::CrdtOperation;
28use super::delta::{DeltaEncoder, VectorClock};
29use crate::NodeId;
30
/// Default MTU for BLE, used by the default configuration until a larger
/// value is set
pub const DEFAULT_MTU: usize = 23;

/// Maximum MTU for BLE 5.0+
pub const MAX_MTU: usize = 517;

/// Size in bytes of an encoded chunk header: a 4-byte message ID followed by
/// a 2-byte chunk index and a 2-byte chunk count
pub const CHUNK_HEADER_SIZE: usize = 4 + 2 + 2;
39
40/// Chunk header for multi-MTU messages
41#[derive(Debug, Clone, Copy)]
42pub struct ChunkHeader {
43    /// Unique message ID
44    pub message_id: u32,
45    /// Index of this chunk (0-based)
46    pub chunk_index: u16,
47    /// Total number of chunks
48    pub total_chunks: u16,
49}
50
51impl ChunkHeader {
52    /// Encode header to bytes
53    pub fn encode(&self) -> [u8; CHUNK_HEADER_SIZE] {
54        let mut buf = [0u8; CHUNK_HEADER_SIZE];
55        buf[0..4].copy_from_slice(&self.message_id.to_le_bytes());
56        buf[4..6].copy_from_slice(&self.chunk_index.to_le_bytes());
57        buf[6..8].copy_from_slice(&self.total_chunks.to_le_bytes());
58        buf
59    }
60
61    /// Decode header from bytes
62    pub fn decode(data: &[u8]) -> Option<Self> {
63        if data.len() < CHUNK_HEADER_SIZE {
64            return None;
65        }
66        Some(Self {
67            message_id: u32::from_le_bytes([data[0], data[1], data[2], data[3]]),
68            chunk_index: u16::from_le_bytes([data[4], data[5]]),
69            total_chunks: u16::from_le_bytes([data[6], data[7]]),
70        })
71    }
72}
73
74/// A chunk of data to send
75#[derive(Debug, Clone)]
76pub struct SyncChunk {
77    /// Header
78    pub header: ChunkHeader,
79    /// Payload data
80    pub payload: Vec<u8>,
81}
82
83impl SyncChunk {
84    /// Encode chunk to bytes
85    pub fn encode(&self) -> Vec<u8> {
86        let mut buf = Vec::with_capacity(CHUNK_HEADER_SIZE + self.payload.len());
87        buf.extend_from_slice(&self.header.encode());
88        buf.extend_from_slice(&self.payload);
89        buf
90    }
91
92    /// Decode chunk from bytes
93    pub fn decode(data: &[u8]) -> Option<Self> {
94        let header = ChunkHeader::decode(data)?;
95        let payload = data[CHUNK_HEADER_SIZE..].to_vec();
96        Some(Self { header, payload })
97    }
98
99    /// Get total encoded size
100    pub fn encoded_size(&self) -> usize {
101        CHUNK_HEADER_SIZE + self.payload.len()
102    }
103}
104
105/// Reassembles chunks into complete messages
106#[derive(Debug)]
107pub struct ChunkReassembler {
108    /// Partial messages being assembled
109    #[cfg(feature = "std")]
110    partials: HashMap<u32, PartialMessage>,
111    #[cfg(not(feature = "std"))]
112    partials: BTreeMap<u32, PartialMessage>,
113
114    /// Maximum number of partial messages to track (for future use)
115    #[allow(dead_code)]
116    max_partials: usize,
117
118    /// Timeout for partial messages (ms)
119    partial_timeout_ms: u64,
120}
121
122/// A message being reassembled
123#[derive(Debug, Clone)]
124struct PartialMessage {
125    /// Total expected chunks
126    total_chunks: u16,
127    /// Received chunks (index -> data)
128    #[cfg(feature = "std")]
129    chunks: HashMap<u16, Vec<u8>>,
130    #[cfg(not(feature = "std"))]
131    chunks: BTreeMap<u16, Vec<u8>>,
132    /// Time first chunk was received
133    started_at: u64,
134}
135
136impl ChunkReassembler {
137    /// Create a new reassembler
138    pub fn new() -> Self {
139        Self {
140            #[cfg(feature = "std")]
141            partials: HashMap::new(),
142            #[cfg(not(feature = "std"))]
143            partials: BTreeMap::new(),
144            max_partials: 8,
145            partial_timeout_ms: 30_000,
146        }
147    }
148
149    /// Process a received chunk
150    ///
151    /// Returns the complete message if all chunks received
152    pub fn process(&mut self, chunk: SyncChunk, current_time_ms: u64) -> Option<Vec<u8>> {
153        let msg_id = chunk.header.message_id;
154
155        // Single-chunk message
156        if chunk.header.total_chunks == 1 {
157            return Some(chunk.payload);
158        }
159
160        // Get or create partial
161        let partial = self
162            .partials
163            .entry(msg_id)
164            .or_insert_with(|| PartialMessage {
165                total_chunks: chunk.header.total_chunks,
166                #[cfg(feature = "std")]
167                chunks: HashMap::new(),
168                #[cfg(not(feature = "std"))]
169                chunks: BTreeMap::new(),
170                started_at: current_time_ms,
171            });
172
173        // Insert chunk
174        partial
175            .chunks
176            .insert(chunk.header.chunk_index, chunk.payload);
177
178        // Check if complete
179        if partial.chunks.len() == partial.total_chunks as usize {
180            let partial = self.partials.remove(&msg_id)?;
181
182            // Reassemble in order
183            let mut result = Vec::new();
184            for i in 0..partial.total_chunks {
185                if let Some(data) = partial.chunks.get(&i) {
186                    result.extend_from_slice(data);
187                } else {
188                    // Missing chunk - shouldn't happen
189                    return None;
190                }
191            }
192            return Some(result);
193        }
194
195        None
196    }
197
198    /// Clean up timed-out partial messages
199    pub fn cleanup(&mut self, current_time_ms: u64) {
200        self.partials
201            .retain(|_, partial| current_time_ms - partial.started_at < self.partial_timeout_ms);
202    }
203
204    /// Get number of messages being assembled
205    pub fn pending_count(&self) -> usize {
206        self.partials.len()
207    }
208}
209
210impl Default for ChunkReassembler {
211    fn default() -> Self {
212        Self::new()
213    }
214}
215
216/// Split data into MTU-sized chunks
217pub fn chunk_data(data: &[u8], mtu: usize, message_id: u32) -> Vec<SyncChunk> {
218    let payload_size = mtu.saturating_sub(CHUNK_HEADER_SIZE);
219    if payload_size == 0 {
220        return Vec::new();
221    }
222
223    let total_chunks = data.len().div_ceil(payload_size);
224    let total_chunks = total_chunks.max(1) as u16;
225
226    let mut chunks = Vec::with_capacity(total_chunks as usize);
227
228    for (i, chunk_data) in data.chunks(payload_size).enumerate() {
229        chunks.push(SyncChunk {
230            header: ChunkHeader {
231                message_id,
232                chunk_index: i as u16,
233                total_chunks,
234            },
235            payload: chunk_data.to_vec(),
236        });
237    }
238
239    // Handle empty data
240    if chunks.is_empty() {
241        chunks.push(SyncChunk {
242            header: ChunkHeader {
243                message_id,
244                chunk_index: 0,
245                total_chunks: 1,
246            },
247            payload: Vec::new(),
248        });
249    }
250
251    chunks
252}
253
/// Phase the sync protocol is currently in
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum SyncState {
    /// No sync in progress (the default)
    #[default]
    Idle,
    /// Data is being sent to a peer
    Sending,
    /// Data is being received from a peer
    Receiving,
    /// An acknowledgment is awaited
    WaitingAck,
}
267
268/// Configuration for the sync protocol
269#[derive(Debug, Clone)]
270pub struct SyncConfig {
271    /// Negotiated MTU
272    pub mtu: usize,
273    /// Batch configuration
274    pub batch: BatchConfig,
275    /// Sync interval in milliseconds
276    pub sync_interval_ms: u64,
277    /// Enable delta encoding
278    pub enable_delta: bool,
279    /// Maximum retries per chunk
280    pub max_retries: u8,
281}
282
283impl Default for SyncConfig {
284    fn default() -> Self {
285        Self {
286            mtu: DEFAULT_MTU,
287            batch: BatchConfig::default(),
288            sync_interval_ms: 5000,
289            enable_delta: true,
290            max_retries: 3,
291        }
292    }
293}
294
295impl SyncConfig {
296    /// Config for low-power operation
297    pub fn low_power() -> Self {
298        Self {
299            mtu: DEFAULT_MTU,
300            batch: BatchConfig::low_power(),
301            sync_interval_ms: 30_000,
302            enable_delta: true,
303            max_retries: 2,
304        }
305    }
306
307    /// Config for responsive operation
308    pub fn responsive() -> Self {
309        Self {
310            mtu: MAX_MTU,
311            batch: BatchConfig::responsive(),
312            sync_interval_ms: 1000,
313            enable_delta: true,
314            max_retries: 3,
315        }
316    }
317}
318
319/// GATT Sync Protocol coordinator
320///
321/// Ties together batching, delta encoding, and chunked transfer
322/// for efficient CRDT sync over BLE.
323pub struct GattSyncProtocol {
324    /// Our node ID
325    node_id: NodeId,
326
327    /// Configuration
328    config: SyncConfig,
329
330    /// Current state
331    state: SyncState,
332
333    /// Batch accumulator
334    batch: BatchAccumulator,
335
336    /// Delta encoder
337    delta: DeltaEncoder,
338
339    /// Vector clock
340    vector_clock: VectorClock,
341
342    /// Outgoing chunk queue
343    tx_queue: VecDeque<SyncChunk>,
344
345    /// Chunk reassembler for incoming data
346    rx_reassembler: ChunkReassembler,
347
348    /// Next message ID
349    next_message_id: u32,
350
351    /// Current time (set externally)
352    current_time_ms: u64,
353
354    /// Last sync time
355    last_sync_time_ms: u64,
356}
357
358impl GattSyncProtocol {
359    /// Create a new sync protocol instance
360    pub fn new(node_id: NodeId, config: SyncConfig) -> Self {
361        Self {
362            node_id,
363            batch: BatchAccumulator::new(config.batch.clone()),
364            delta: DeltaEncoder::new(node_id),
365            vector_clock: VectorClock::new(),
366            config,
367            state: SyncState::Idle,
368            tx_queue: VecDeque::new(),
369            rx_reassembler: ChunkReassembler::new(),
370            next_message_id: 1,
371            current_time_ms: 0,
372            last_sync_time_ms: 0,
373        }
374    }
375
376    /// Create with default config
377    pub fn with_defaults(node_id: NodeId) -> Self {
378        Self::new(node_id, SyncConfig::default())
379    }
380
381    /// Set the current time
382    pub fn set_time(&mut self, time_ms: u64) {
383        self.current_time_ms = time_ms;
384    }
385
386    /// Set the MTU (after negotiation)
387    pub fn set_mtu(&mut self, mtu: usize) {
388        self.config.mtu = mtu;
389    }
390
391    /// Get current state
392    pub fn state(&self) -> SyncState {
393        self.state
394    }
395
396    /// Get the vector clock
397    pub fn vector_clock(&self) -> &VectorClock {
398        &self.vector_clock
399    }
400
401    /// Add a peer for sync
402    pub fn add_peer(&mut self, peer_id: &NodeId) {
403        self.delta.add_peer(peer_id);
404    }
405
406    /// Remove a peer
407    pub fn remove_peer(&mut self, peer_id: &NodeId) {
408        self.delta.remove_peer(peer_id);
409    }
410
411    /// Queue a CRDT operation for sync
412    pub fn queue_operation(&mut self, op: CrdtOperation) -> bool {
413        // Update vector clock
414        self.vector_clock.increment(&self.node_id);
415
416        // Add to batch
417        self.batch.add(op, self.current_time_ms)
418    }
419
420    /// Check if we should sync now
421    pub fn should_sync(&self) -> bool {
422        self.batch.should_flush(self.current_time_ms)
423    }
424
425    /// Prepare a sync to a peer
426    ///
427    /// Returns the chunks to send
428    pub fn prepare_sync(&mut self, peer_id: &NodeId) -> Vec<SyncChunk> {
429        // Flush the batch
430        let batch = match self.batch.flush(self.current_time_ms) {
431            Some(b) => b,
432            None => return Vec::new(),
433        };
434
435        // Filter with delta encoding
436        let operations = if self.config.enable_delta {
437            self.delta.filter_for_peer(peer_id, &batch.operations)
438        } else {
439            batch.operations.clone()
440        };
441
442        if operations.is_empty() {
443            return Vec::new();
444        }
445
446        // Create a batch with filtered operations
447        let filtered_batch = OperationBatch {
448            operations: operations.clone(),
449            total_bytes: operations.iter().map(|o| o.size()).sum(),
450            created_at: batch.created_at,
451        };
452
453        // Encode the batch
454        let encoded = filtered_batch.encode();
455
456        // Chunk it
457        let msg_id = self.next_message_id;
458        self.next_message_id = self.next_message_id.wrapping_add(1);
459
460        let chunks = chunk_data(&encoded, self.config.mtu, msg_id);
461
462        // Mark as sent (after we have the filtered list)
463        self.delta.mark_sent(peer_id, &operations);
464        self.delta.record_sent(peer_id, encoded.len());
465
466        self.state = SyncState::Sending;
467        self.last_sync_time_ms = self.current_time_ms;
468
469        chunks
470    }
471
472    /// Get next chunk to send (if any)
473    pub fn next_tx_chunk(&mut self) -> Option<SyncChunk> {
474        self.tx_queue.pop_front()
475    }
476
477    /// Queue chunks for sending
478    pub fn queue_chunks(&mut self, chunks: Vec<SyncChunk>) {
479        self.tx_queue.extend(chunks);
480    }
481
482    /// Check if there are chunks to send
483    pub fn has_pending_tx(&self) -> bool {
484        !self.tx_queue.is_empty()
485    }
486
487    /// Process a received chunk
488    ///
489    /// Returns decoded operations if message is complete
490    pub fn process_received(
491        &mut self,
492        chunk: SyncChunk,
493        peer_id: &NodeId,
494    ) -> Option<Vec<CrdtOperation>> {
495        self.state = SyncState::Receiving;
496
497        // Reassemble
498        let complete = self.rx_reassembler.process(chunk, self.current_time_ms)?;
499
500        // Decode batch
501        let batch = OperationBatch::decode(&complete)?;
502
503        // Record stats
504        self.delta
505            .record_received(peer_id, complete.len(), self.current_time_ms);
506
507        // Update vector clock with received operations
508        for op in &batch.operations {
509            let timestamp = match op {
510                CrdtOperation::UpdatePosition { timestamp, .. } => *timestamp,
511                CrdtOperation::UpdateHealth { timestamp, .. } => *timestamp,
512                CrdtOperation::UpdateRegister { timestamp, .. } => *timestamp,
513                CrdtOperation::IncrementCounter { .. } => 0,
514            };
515            if timestamp > 0 {
516                self.vector_clock.update(peer_id, timestamp);
517            }
518        }
519
520        self.state = SyncState::Idle;
521        Some(batch.operations)
522    }
523
524    /// Acknowledge completion of send
525    pub fn ack_send(&mut self) {
526        if self.tx_queue.is_empty() {
527            self.state = SyncState::Idle;
528        }
529    }
530
531    /// Reset protocol state (e.g., on reconnection)
532    pub fn reset(&mut self) {
533        self.state = SyncState::Idle;
534        self.tx_queue.clear();
535        self.rx_reassembler = ChunkReassembler::new();
536    }
537
538    /// Reset sync state for a specific peer
539    pub fn reset_peer(&mut self, peer_id: &NodeId) {
540        self.delta.reset_peer(peer_id);
541    }
542
543    /// Run periodic maintenance
544    pub fn tick(&mut self) {
545        self.rx_reassembler.cleanup(self.current_time_ms);
546    }
547
548    /// Get sync statistics
549    pub fn stats(&self) -> SyncStats {
550        let delta_stats = self.delta.stats();
551        SyncStats {
552            bytes_sent: delta_stats.total_bytes_sent,
553            bytes_received: delta_stats.total_bytes_received,
554            syncs_completed: delta_stats.total_syncs,
555            pending_operations: self.batch.pending_count(),
556            pending_tx_chunks: self.tx_queue.len(),
557            pending_rx_messages: self.rx_reassembler.pending_count(),
558        }
559    }
560}
561
/// Snapshot of sync counters and queue depths
#[derive(Clone, Debug, Default)]
pub struct SyncStats {
    /// Bytes sent in total
    pub bytes_sent: u64,
    /// Bytes received in total
    pub bytes_received: u64,
    /// Count of completed syncs
    pub syncs_completed: u32,
    /// Operations waiting in the batch
    pub pending_operations: usize,
    /// Chunks waiting in the transmit queue
    pub pending_tx_chunks: usize,
    /// Incoming messages still being reassembled
    pub pending_rx_messages: usize,
}
578
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::crdt::Position;

    /// Position update for `node_id` stamped with `timestamp`
    fn make_position_op(node_id: u32, timestamp: u64) -> CrdtOperation {
        let position = Position::new(37.0, -122.0);
        CrdtOperation::UpdatePosition {
            node_id: NodeId::new(node_id),
            position,
            timestamp,
        }
    }

    /// Shorthand for building a chunk from its header fields and payload
    fn make_chunk(
        message_id: u32,
        chunk_index: u16,
        total_chunks: u16,
        payload: &[u8],
    ) -> SyncChunk {
        SyncChunk {
            header: ChunkHeader {
                message_id,
                chunk_index,
                total_chunks,
            },
            payload: payload.to_vec(),
        }
    }

    /// Default-configured protocol for `local`, paired with `peer`, MTU 100
    fn make_proto(local: NodeId, peer: &NodeId) -> GattSyncProtocol {
        let mut proto = GattSyncProtocol::with_defaults(local);
        proto.add_peer(peer);
        proto.set_mtu(100);
        proto
    }

    #[test]
    fn test_chunk_header_encode_decode() {
        let original = ChunkHeader {
            message_id: 0x12345678,
            chunk_index: 5,
            total_chunks: 10,
        };

        let round_tripped = ChunkHeader::decode(&original.encode()).unwrap();

        assert_eq!(round_tripped.message_id, 0x12345678);
        assert_eq!(round_tripped.chunk_index, 5);
        assert_eq!(round_tripped.total_chunks, 10);
    }

    #[test]
    fn test_chunk_data_single() {
        let data = vec![1, 2, 3, 4, 5];

        let chunks = chunk_data(&data, 100, 1);

        assert_eq!(chunks.len(), 1);
        let only = &chunks[0];
        assert_eq!(only.header.total_chunks, 1);
        assert_eq!(only.payload, data);
    }

    #[test]
    fn test_chunk_data_multiple() {
        // MTU 20 leaves 12 payload bytes after the 8-byte header, so
        // 100 bytes need 9 chunks: eight full ones and a 4-byte tail.
        let data = vec![0u8; 100];
        let chunks = chunk_data(&data, 20, 1);

        assert_eq!(chunks.len(), 9);
        assert_eq!(chunks[0].header.total_chunks, 9);

        for (i, chunk) in chunks.iter().enumerate() {
            assert_eq!(chunk.header.chunk_index, i as u16);
            let expected_len = if i < 8 { 12 } else { 4 };
            assert_eq!(chunk.payload.len(), expected_len);
        }
    }

    #[test]
    fn test_chunk_reassembler_single() {
        let mut reassembler = ChunkReassembler::new();

        let message = reassembler
            .process(make_chunk(1, 0, 1, &[1, 2, 3]), 0)
            .unwrap();

        assert_eq!(message, vec![1, 2, 3]);
    }

    #[test]
    fn test_chunk_reassembler_multiple() {
        let mut reassembler = ChunkReassembler::new();

        let first = make_chunk(1, 0, 3, &[1, 2, 3]);
        let second = make_chunk(1, 1, 3, &[4, 5, 6]);
        let third = make_chunk(1, 2, 3, &[7, 8, 9]);

        // Deliver out of order: nothing is returned until the last piece.
        assert!(reassembler.process(second, 0).is_none());
        assert!(reassembler.process(first, 0).is_none());

        let message = reassembler.process(third, 0).unwrap();
        assert_eq!(message, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
    }

    #[test]
    fn test_sync_protocol_basic() {
        let peer = NodeId::new(2);
        let mut proto = make_proto(NodeId::new(1), &peer);

        for timestamp in [1000, 1001] {
            proto.queue_operation(make_position_op(1, timestamp));
        }

        // Advance time so the batch is ready to flush.
        proto.set_time(10000);

        assert!(!proto.prepare_sync(&peer).is_empty());
    }

    #[test]
    fn test_sync_protocol_round_trip() {
        let node1 = NodeId::new(1);
        let node2 = NodeId::new(2);

        let mut sender = make_proto(node1, &node2);
        let mut receiver = make_proto(node2, &node1);

        // Sender queues one operation and prepares the sync.
        sender.queue_operation(make_position_op(1, 1000));
        sender.set_time(10000);
        let chunks = sender.prepare_sync(&node2);

        // Receiver consumes every chunk; the result of the last one counts.
        let received = chunks
            .into_iter()
            .fold(None, |_, chunk| receiver.process_received(chunk, &node1));

        let ops = received.unwrap();
        assert_eq!(ops.len(), 1);
    }

    #[test]
    fn test_sync_config_profiles() {
        assert_eq!(SyncConfig::low_power().sync_interval_ms, 30_000);
        assert_eq!(SyncConfig::responsive().sync_interval_ms, 1000);
    }

    #[test]
    fn test_sync_stats() {
        let stats = GattSyncProtocol::with_defaults(NodeId::new(1)).stats();

        assert_eq!(stats.bytes_sent, 0);
        assert_eq!(stats.pending_operations, 0);
    }
}