Skip to main content

nodedb_types/sync/
wire.rs

//! Sync wire protocol: frame format and message types.
//!
//! Frame format: `[msg_type: 1B][length: 4B LE][rkyv/msgpack body]`
//!
//! Message types:
//! - `0x01` Handshake (client → server)
//! - `0x02` HandshakeAck (server → client)
//! - `0x10` DeltaPush (client → server)
//! - `0x11` DeltaAck (server → client)
//! - `0x12` DeltaReject (server → client)
//! - `0x20` ShapeSubscribe (client → server)
//! - `0x21` ShapeSnapshot (server → client)
//! - `0x22` ShapeDelta (server → client)
//! - `0x23` ShapeUnsubscribe (client → server)
//! - `0x30` VectorClockSync (bidirectional)
//! - `0x40` TimeseriesPush (client → server)
//! - `0x41` TimeseriesAck (server → client)
//! - `0x50` ResyncRequest (bidirectional)
//! - `0x52` Throttle (client → server)
//! - `0x60` TokenRefresh (client → server)
//! - `0x61` TokenRefreshAck (server → client)
//! - `0xFF` Ping/Pong (bidirectional)
17
18use std::collections::HashMap;
19
20use serde::{Deserialize, Serialize};
21
22use super::compensation::CompensationHint;
23use super::shape::ShapeDefinition;
24
/// One-byte tag that opens every sync frame and selects the payload type.
///
/// Each variant's discriminant is the tag value itself, so `variant as u8`
/// yields the byte to write and [`SyncMessageType::from_u8`] reverses it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SyncMessageType {
    /// Handshake (client → server, 0x01).
    Handshake = 0x01,
    /// Handshake acknowledgment (server → client, 0x02).
    HandshakeAck = 0x02,
    /// Delta push (client → server, 0x10).
    DeltaPush = 0x10,
    /// Delta acknowledgment (server → client, 0x11).
    DeltaAck = 0x11,
    /// Delta rejection (server → client, 0x12).
    DeltaReject = 0x12,
    /// Shape subscribe request (client → server, 0x20).
    ShapeSubscribe = 0x20,
    /// Shape snapshot response (server → client, 0x21).
    ShapeSnapshot = 0x21,
    /// Shape delta (server → client, 0x22).
    ShapeDelta = 0x22,
    /// Shape unsubscribe request (client → server, 0x23).
    ShapeUnsubscribe = 0x23,
    /// Vector clock sync (bidirectional, 0x30).
    VectorClockSync = 0x30,
    /// Timeseries metric batch push (client → server, 0x40).
    TimeseriesPush = 0x40,
    /// Timeseries push acknowledgment (server → client, 0x41).
    TimeseriesAck = 0x41,
    /// Re-sync request (bidirectional, 0x50).
    /// Sent when sequence gaps or checksum failures are detected.
    ResyncRequest = 0x50,
    /// Downstream throttle (client → server, 0x52).
    /// Sent when Lite's incoming queue is overwhelmed.
    Throttle = 0x52,
    /// Token refresh request (client → server, 0x60).
    TokenRefresh = 0x60,
    /// Token refresh acknowledgment (server → client, 0x61).
    TokenRefreshAck = 0x61,
    /// Ping/Pong keepalive (bidirectional, 0xFF).
    PingPong = 0xFF,
}

impl SyncMessageType {
    /// Look up the message type for a raw tag byte.
    ///
    /// Returns `None` when `v` is not one of the tag values declared on this
    /// enum.
    pub fn from_u8(v: u8) -> Option<Self> {
        let kind = match v {
            0x01 => Self::Handshake,
            0x02 => Self::HandshakeAck,
            0x10 => Self::DeltaPush,
            0x11 => Self::DeltaAck,
            0x12 => Self::DeltaReject,
            0x20 => Self::ShapeSubscribe,
            0x21 => Self::ShapeSnapshot,
            0x22 => Self::ShapeDelta,
            0x23 => Self::ShapeUnsubscribe,
            0x30 => Self::VectorClockSync,
            0x40 => Self::TimeseriesPush,
            0x41 => Self::TimeseriesAck,
            0x50 => Self::ResyncRequest,
            0x52 => Self::Throttle,
            0x60 => Self::TokenRefresh,
            0x61 => Self::TokenRefreshAck,
            0xFF => Self::PingPong,
            // Any other byte is not a tag this protocol defines.
            _ => return None,
        };
        Some(kind)
    }
}
80
81/// Wire frame: wraps a message type + serialized body.
82///
83/// Layout: `[msg_type: 1B][length: 4B LE][body: N bytes]`
84/// Total header: 5 bytes.
85pub struct SyncFrame {
86    pub msg_type: SyncMessageType,
87    pub body: Vec<u8>,
88}
89
90impl SyncFrame {
91    pub const HEADER_SIZE: usize = 5;
92
93    /// Serialize a frame to bytes.
94    pub fn to_bytes(&self) -> Vec<u8> {
95        let len = self.body.len() as u32;
96        let mut buf = Vec::with_capacity(Self::HEADER_SIZE + self.body.len());
97        buf.push(self.msg_type as u8);
98        buf.extend_from_slice(&len.to_le_bytes());
99        buf.extend_from_slice(&self.body);
100        buf
101    }
102
103    /// Deserialize a frame from bytes.
104    ///
105    /// Returns `None` if the data is too short or the message type is unknown.
106    pub fn from_bytes(data: &[u8]) -> Option<Self> {
107        if data.len() < Self::HEADER_SIZE {
108            return None;
109        }
110        let msg_type = SyncMessageType::from_u8(data[0])?;
111        let len = u32::from_le_bytes(data[1..5].try_into().ok()?) as usize;
112        if data.len() < Self::HEADER_SIZE + len {
113            return None;
114        }
115        let body = data[Self::HEADER_SIZE..Self::HEADER_SIZE + len].to_vec();
116        Some(Self { msg_type, body })
117    }
118
119    /// Create a frame with a MessagePack-serialized body.
120    pub fn new_msgpack<T: Serialize>(msg_type: SyncMessageType, value: &T) -> Option<Self> {
121        let body = rmp_serde::to_vec_named(value).ok()?;
122        Some(Self { msg_type, body })
123    }
124
125    /// Create a frame from a serializable value, falling back to an empty
126    /// body if serialization fails.
127    pub fn encode_or_empty<T: Serialize>(msg_type: SyncMessageType, value: &T) -> Self {
128        Self::new_msgpack(msg_type, value).unwrap_or(Self {
129            msg_type,
130            body: Vec::new(),
131        })
132    }
133
134    /// Deserialize the body from MessagePack.
135    pub fn decode_body<'a, T: Deserialize<'a>>(&'a self) -> Option<T> {
136        rmp_serde::from_slice(&self.body).ok()
137    }
138}
139
140// ─── Message Payloads ───────────────────────────────────────────────────────
141
/// Handshake message (client → server, 0x01).
///
/// `lite_id`, `epoch` and `wire_version` are marked `#[serde(default)]`, so a
/// payload that omits them still deserializes, with each field taking its
/// type's default value (empty string or 0).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HandshakeMsg {
    /// JWT bearer token for authentication.
    pub jwt_token: String,
    /// Client's vector clock: `{ collection: { doc_id: lamport_ts } }`.
    pub vector_clock: HashMap<String, HashMap<String, u64>>,
    /// Shape IDs the client is subscribed to.
    pub subscribed_shapes: Vec<String>,
    /// Client version string.
    pub client_version: String,
    /// Lite instance identity (UUID v7). Empty for legacy clients.
    #[serde(default)]
    pub lite_id: String,
    /// Monotonic epoch counter (incremented on every open). 0 for legacy clients.
    #[serde(default)]
    pub epoch: u64,
    /// Wire format version. Server rejects connections with incompatible versions.
    /// 0 = legacy client (pre-wire-version; treated as version 1).
    #[serde(default)]
    pub wire_version: u16,
}
164
/// Handshake acknowledgment (server → client, 0x02).
///
/// `fork_detected` and `server_wire_version` are marked `#[serde(default)]`
/// and fall back to `false` / `0` when absent from the payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HandshakeAckMsg {
    /// Whether the handshake succeeded.
    pub success: bool,
    /// Session ID assigned by the server.
    pub session_id: String,
    /// Server's vector clock (for initial sync).
    pub server_clock: HashMap<String, u64>,
    /// Error message (if !success).
    pub error: Option<String>,
    /// Fork detection: if true, client must regenerate LiteId and reconnect.
    #[serde(default)]
    pub fork_detected: bool,
    /// Server's wire format version (for client-side compatibility check).
    #[serde(default)]
    pub server_wire_version: u16,
}
183
/// Delta push message (client → server, 0x10).
///
/// Carries one CRDT delta for a single document, tagged with the mutation ID
/// that the matching ack or reject message refers back to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaPushMsg {
    /// Collection the delta applies to.
    pub collection: String,
    /// Document ID.
    pub document_id: String,
    /// Loro CRDT delta bytes.
    pub delta: Vec<u8>,
    /// Client's peer ID (for CRDT identity).
    pub peer_id: u64,
    /// Per-mutation unique ID for dedup.
    pub mutation_id: u64,
    /// CRC32C checksum of `delta` bytes for integrity verification.
    /// Computed by sender, validated by receiver. 0 for legacy clients.
    /// Defaults to 0 when the field is missing from the payload.
    #[serde(default)]
    pub checksum: u32,
}
202
/// Delta acknowledgment (server → client, 0x11).
///
/// Pairs a client-supplied mutation ID with the LSN the server assigned to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaAckMsg {
    /// Mutation ID being acknowledged.
    pub mutation_id: u64,
    /// Server-assigned LSN for this mutation.
    pub lsn: u64,
}
211
/// Delta rejection (server → client, 0x12).
///
/// Identifies the rejected mutation, gives a textual reason and optionally
/// attaches a structured hint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaRejectMsg {
    /// Mutation ID being rejected.
    pub mutation_id: u64,
    /// Reason for rejection.
    pub reason: String,
    /// Compensation hints for the client. `None` when no hint is attached.
    pub compensation: Option<CompensationHint>,
}
222
/// Shape subscribe request (client → server, 0x20).
///
/// Wraps the full shape definition rather than just an ID.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShapeSubscribeMsg {
    /// Shape definition to subscribe to.
    pub shape: ShapeDefinition,
}
229
/// Shape snapshot response (server → client, 0x21).
///
/// Delivers the initial dataset for a shape together with the LSN it was
/// taken at.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShapeSnapshotMsg {
    /// Shape ID this snapshot belongs to.
    pub shape_id: String,
    /// Initial dataset: serialized document rows matching the shape.
    pub data: Vec<u8>,
    /// LSN at snapshot time — deltas after this LSN will follow.
    pub snapshot_lsn: u64,
    /// Number of documents in the snapshot.
    // NOTE(review): `usize` has a platform-dependent width; confirm that
    // 32-bit and 64-bit peers agree on the range used for this wire field.
    pub doc_count: usize,
}
242
/// Shape delta message (server → client, 0x22).
///
/// Describes one change to one document within a subscribed shape.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShapeDeltaMsg {
    /// Shape ID this delta applies to.
    pub shape_id: String,
    /// Collection affected.
    pub collection: String,
    /// Document ID affected.
    pub document_id: String,
    /// Operation type: "INSERT", "UPDATE", "DELETE".
    /// Carried as a free-form string; the type does not restrict it to these values.
    pub operation: String,
    /// Delta payload (CRDT delta bytes or document value).
    pub delta: Vec<u8>,
    /// WAL LSN of this mutation.
    pub lsn: u64,
}
259
/// Shape unsubscribe request (client → server, 0x23).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShapeUnsubscribeMsg {
    /// ID of the shape to unsubscribe from.
    pub shape_id: String,
}
265
/// Vector clock sync message (bidirectional, 0x30).
///
/// Carries the sender's per-collection clock values along with its ID.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VectorClockSyncMsg {
    /// Per-collection clock: `{ collection: max_lsn }`.
    pub clocks: HashMap<String, u64>,
    /// Sender's node/peer ID.
    pub sender_id: u64,
}
274
/// Re-sync request message (bidirectional, 0x50).
///
/// Sent when a receiver detects:
/// - Sequence gap: missing `mutation_id`s in the delta stream
/// - Checksum failure: CRC32C mismatch on a delta payload
/// - State divergence: local state inconsistent with received deltas
///
/// On receiving a ResyncRequest, the sender should:
/// 1. Re-send all deltas from `from_mutation_id` onwards, OR
/// 2. Send a full snapshot if `from_mutation_id` is 0
///
/// Two fields use sentinel values rather than `Option`: a `from_mutation_id`
/// of 0 means "full re-sync" and an empty `collection` means "all collections".
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResyncRequestMsg {
    /// Reason for requesting re-sync.
    pub reason: ResyncReason,
    /// Resume from this mutation ID (0 = full re-sync).
    pub from_mutation_id: u64,
    /// Collection scope (empty = all collections).
    pub collection: String,
}
294
/// Reason for a re-sync request.
///
/// Carried in the `reason` field of [`ResyncRequestMsg`]; the struct-like
/// variants hold the mutation IDs that triggered the request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ResyncReason {
    /// Detected missing mutation IDs in the delta stream.
    SequenceGap {
        /// The expected next mutation ID.
        expected: u64,
        /// The mutation ID that was actually received.
        received: u64,
    },
    /// CRC32C checksum mismatch on a delta payload.
    ChecksumMismatch {
        /// The mutation ID of the corrupted delta.
        mutation_id: u64,
    },
    /// Corruption detected on cold start, need full re-sync.
    CorruptedState,
}
313
/// Downstream throttle message (client → server, 0x52).
///
/// Sent by Lite when its incoming shape delta queue is overwhelmed.
/// Origin should reduce its push rate for this peer until a
/// `Throttle { throttle: false }` is received.
///
/// The same message type both engages and releases throttling; the
/// `throttle` flag selects which.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThrottleMsg {
    /// `true` to enable throttling, `false` to release.
    pub throttle: bool,
    /// Current queue depth at Lite (informational).
    pub queue_depth: u64,
    /// Suggested max deltas per second (0 = use server default).
    pub suggested_rate: u64,
}
328
/// Token refresh request (client → server, 0x60).
///
/// Sent by Lite before the current JWT expires. The client provides
/// a fresh token obtained from the application's auth layer.
/// Origin validates the new token and either upgrades the session
/// or disconnects if the token is invalid.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TokenRefreshMsg {
    /// New JWT bearer token that should replace the session's current one.
    pub new_token: String,
}
340
/// Token refresh acknowledgment (server → client, 0x61).
///
/// Reports whether the refreshed token was accepted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TokenRefreshAckMsg {
    /// Whether the token refresh succeeded.
    pub success: bool,
    /// Error message (if !success).
    pub error: Option<String>,
    /// Seconds until this new token expires (so Lite can schedule next refresh).
    /// Defaults to 0 when the field is missing from the payload.
    #[serde(default)]
    pub expires_in_secs: u64,
}
352
/// Ping/Pong keepalive (0xFF).
///
/// Both directions share this one payload; `is_pong` tells a ping from the
/// reply to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PingPongMsg {
    /// Timestamp (epoch milliseconds) for RTT measurement.
    pub timestamp_ms: u64,
    /// Whether this is a pong (response to ping).
    pub is_pong: bool,
}
361
/// Timeseries metric batch push (client → server, 0x40).
///
/// A batch is carried as three parallel encoded blocks (timestamps, values,
/// series IDs) plus summary metadata describing the batch.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeseriesPushMsg {
    /// Source Lite instance ID (UUID v7).
    pub lite_id: String,
    /// Collection name.
    pub collection: String,
    /// Gorilla-encoded timestamp block.
    pub ts_block: Vec<u8>,
    /// Gorilla-encoded value block.
    pub val_block: Vec<u8>,
    /// Raw LE u64 series ID block.
    pub series_block: Vec<u8>,
    /// Number of samples in this batch.
    pub sample_count: u64,
    /// Min timestamp in this batch.
    pub min_ts: i64,
    /// Max timestamp in this batch.
    pub max_ts: i64,
    /// Per-series sync watermark: highest LSN already synced for each series.
    /// Only samples after these watermarks are included.
    // NOTE(review): keys are presumably the series IDs from `series_block`;
    // confirm against the producer.
    pub watermarks: HashMap<u64, u64>,
}
385
/// Timeseries push acknowledgment (server → client, 0x41).
///
/// Reports per-batch accept/reject counts and the LSN assigned to the batch.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeseriesAckMsg {
    /// Collection acknowledged.
    pub collection: String,
    /// Number of samples accepted.
    pub accepted: u64,
    /// Number of samples rejected (duplicates, out-of-retention, etc.)
    pub rejected: u64,
    /// Server-assigned LSN for this batch (used as sync watermark).
    pub lsn: u64,
}
398
#[cfg(test)]
mod tests {
    use super::*;

    /// A ping survives encode → bytes → decode with its fields intact.
    #[test]
    fn frame_roundtrip() {
        let original = PingPongMsg {
            timestamp_ms: 12345,
            is_pong: false,
        };
        let wire = SyncFrame::new_msgpack(SyncMessageType::PingPong, &original)
            .unwrap()
            .to_bytes();

        let frame = SyncFrame::from_bytes(&wire).unwrap();
        assert_eq!(frame.msg_type, SyncMessageType::PingPong);

        let ping: PingPongMsg = frame.decode_body().unwrap();
        assert_eq!(ping.timestamp_ms, 12345);
        assert!(!ping.is_pong);
    }

    /// An encoded handshake has a non-empty body and starts with tag 0x01.
    #[test]
    fn handshake_serialization() {
        let handshake = HandshakeMsg {
            jwt_token: "test.jwt.token".into(),
            vector_clock: HashMap::new(),
            subscribed_shapes: vec!["shape1".into()],
            client_version: "0.1.0".into(),
            lite_id: String::new(),
            epoch: 0,
            wire_version: 1,
        };
        let wire = SyncFrame::new_msgpack(SyncMessageType::Handshake, &handshake)
            .unwrap()
            .to_bytes();
        assert!(wire.len() > SyncFrame::HEADER_SIZE);
        assert_eq!(wire[0], 0x01);
    }

    /// A reject carrying a compensation hint keeps both the ID and the hint
    /// variant across a full encode/decode cycle.
    #[test]
    fn delta_reject_with_compensation() {
        let hint = CompensationHint::UniqueViolation {
            field: "email".into(),
            conflicting_value: "alice@example.com".into(),
        };
        let outgoing = DeltaRejectMsg {
            mutation_id: 42,
            reason: "unique violation".into(),
            compensation: Some(hint),
        };
        let wire = SyncFrame::new_msgpack(SyncMessageType::DeltaReject, &outgoing)
            .unwrap()
            .to_bytes();

        let incoming: DeltaRejectMsg = SyncFrame::from_bytes(&wire)
            .unwrap()
            .decode_body()
            .unwrap();
        assert_eq!(incoming.mutation_id, 42);
        assert!(matches!(
            incoming.compensation,
            Some(CompensationHint::UniqueViolation { .. })
        ));
    }

    /// Every declared tag byte maps to a variant with that same discriminant,
    /// and an undeclared byte maps to nothing.
    #[test]
    fn message_type_roundtrip() {
        const KNOWN_TAGS: [u8; 17] = [
            0x01, 0x02, 0x10, 0x11, 0x12, 0x20, 0x21, 0x22, 0x23, 0x30, 0x40, 0x41, 0x50, 0x52,
            0x60, 0x61, 0xFF,
        ];
        for tag in KNOWN_TAGS {
            let kind = SyncMessageType::from_u8(tag).unwrap();
            assert_eq!(kind as u8, tag);
        }
        assert!(SyncMessageType::from_u8(0x99).is_none());
    }

    /// A subscribe request keeps its shape ID across encode/decode.
    #[test]
    fn shape_subscribe_roundtrip() {
        let shape = ShapeDefinition {
            shape_id: "s1".into(),
            tenant_id: 1,
            shape_type: super::super::shape::ShapeType::Vector {
                collection: "embeddings".into(),
                field_name: None,
            },
            description: "all embeddings".into(),
            field_filter: vec![],
        };
        let outgoing = ShapeSubscribeMsg { shape };
        let wire = SyncFrame::new_msgpack(SyncMessageType::ShapeSubscribe, &outgoing)
            .unwrap()
            .to_bytes();

        let incoming: ShapeSubscribeMsg = SyncFrame::from_bytes(&wire)
            .unwrap()
            .decode_body()
            .unwrap();
        assert_eq!(incoming.shape.shape_id, "s1");
    }
}