Skip to main content

nodedb_types/sync/wire/
frame.rs

//! Wire frame format and message-type discriminants.
2
/// One-byte tag identifying which kind of message a sync frame carries.
///
/// The enum is `#[repr(u8)]`, so each variant's discriminant is exactly the
/// tag byte; `variant as u8` yields it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum SyncMessageType {
    /// Tag `0x01`.
    Handshake = 0x01,
    /// Tag `0x02`.
    HandshakeAck = 0x02,
    /// Tag `0x10`.
    DeltaPush = 0x10,
    /// Tag `0x11`.
    DeltaAck = 0x11,
    /// Tag `0x12`.
    DeltaReject = 0x12,
    /// Tag `0x14`, server → client: a collection has been purged.
    ///
    /// Emitted once an Origin collection is hard-deleted, either because
    /// its UNDROP window expired or because of an explicit
    /// `DROP COLLECTION ... PURGE`. On receipt the client must discard its
    /// local Loro state and delete the collection's redb record; later
    /// deltas for that collection are not sync-eligible.
    CollectionPurged = 0x14,
    /// Tag `0x20`.
    ShapeSubscribe = 0x20,
    /// Tag `0x21`.
    ShapeSnapshot = 0x21,
    /// Tag `0x22`.
    ShapeDelta = 0x22,
    /// Tag `0x23`.
    ShapeUnsubscribe = 0x23,
    /// Tag `0x30`.
    VectorClockSync = 0x30,
    /// Tag `0x40`, client → server: push of a batch of timeseries metrics.
    TimeseriesPush = 0x40,
    /// Tag `0x41`, server → client: acknowledges a timeseries push.
    TimeseriesAck = 0x41,
    /// Tag `0x50`, either direction: asks the peer to re-sync.
    ///
    /// Issued after detecting a sequence gap or a checksum failure.
    ResyncRequest = 0x50,
    /// Tag `0x52`, client → server: downstream throttle.
    ///
    /// Issued when Lite's incoming queue is overwhelmed.
    Throttle = 0x52,
    /// Tag `0x60`, client → server: requests a token refresh.
    TokenRefresh = 0x60,
    /// Tag `0x61`, server → client: acknowledges a token refresh.
    TokenRefreshAck = 0x61,
    /// Tag `0x70`, server → client: definition sync.
    ///
    /// Transports function/trigger/procedure definitions from Origin to Lite.
    DefinitionSync = 0x70,
    /// Tag `0x80`, client → server: presence update.
    PresenceUpdate = 0x80,
    /// Tag `0x81`, server → every subscriber except the sender: presence
    /// broadcast.
    PresenceBroadcast = 0x81,
    /// Tag `0x82`, server → every subscriber: presence leave.
    PresenceLeave = 0x82,
    /// Tag `0xFF`.
    PingPong = 0xFF,
}
48
49impl SyncMessageType {
50    pub fn from_u8(v: u8) -> Option<Self> {
51        match v {
52            0x01 => Some(Self::Handshake),
53            0x02 => Some(Self::HandshakeAck),
54            0x10 => Some(Self::DeltaPush),
55            0x11 => Some(Self::DeltaAck),
56            0x12 => Some(Self::DeltaReject),
57            0x14 => Some(Self::CollectionPurged),
58            0x20 => Some(Self::ShapeSubscribe),
59            0x21 => Some(Self::ShapeSnapshot),
60            0x22 => Some(Self::ShapeDelta),
61            0x23 => Some(Self::ShapeUnsubscribe),
62            0x30 => Some(Self::VectorClockSync),
63            0x40 => Some(Self::TimeseriesPush),
64            0x41 => Some(Self::TimeseriesAck),
65            0x50 => Some(Self::ResyncRequest),
66            0x52 => Some(Self::Throttle),
67            0x60 => Some(Self::TokenRefresh),
68            0x61 => Some(Self::TokenRefreshAck),
69            0x70 => Some(Self::DefinitionSync),
70            0x80 => Some(Self::PresenceUpdate),
71            0x81 => Some(Self::PresenceBroadcast),
72            0x82 => Some(Self::PresenceLeave),
73            0xFF => Some(Self::PingPong),
74            _ => None,
75        }
76    }
77}
78
79/// Wire frame: wraps a message type + serialized body.
80///
81/// Layout: `[msg_type: 1B][length: 4B LE][body: N bytes]`
82/// Total header: 5 bytes.
83#[derive(Clone)]
84pub struct SyncFrame {
85    pub msg_type: SyncMessageType,
86    pub body: Vec<u8>,
87}
88
89impl SyncFrame {
90    pub const HEADER_SIZE: usize = 5;
91
92    /// Serialize a frame to bytes.
93    pub fn to_bytes(&self) -> Vec<u8> {
94        let len = self.body.len() as u32;
95        let mut buf = Vec::with_capacity(Self::HEADER_SIZE + self.body.len());
96        buf.push(self.msg_type as u8);
97        buf.extend_from_slice(&len.to_le_bytes());
98        buf.extend_from_slice(&self.body);
99        buf
100    }
101
102    /// Deserialize a frame from bytes.
103    ///
104    /// Returns `None` if the data is too short or the message type is unknown.
105    pub fn from_bytes(data: &[u8]) -> Option<Self> {
106        if data.len() < Self::HEADER_SIZE {
107            return None;
108        }
109        let msg_type = SyncMessageType::from_u8(data[0])?;
110        let len = u32::from_le_bytes(data[1..5].try_into().ok()?) as usize;
111        if data.len() < Self::HEADER_SIZE + len {
112            return None;
113        }
114        let body = data[Self::HEADER_SIZE..Self::HEADER_SIZE + len].to_vec();
115        Some(Self { msg_type, body })
116    }
117
118    /// Create a frame with a MessagePack-serialized body.
119    pub fn new_msgpack<T: zerompk::ToMessagePack>(
120        msg_type: SyncMessageType,
121        value: &T,
122    ) -> Option<Self> {
123        let body = zerompk::to_msgpack_vec(value).ok()?;
124        Some(Self { msg_type, body })
125    }
126
127    /// Create a frame from a serializable value, falling back to an empty
128    /// body if serialization fails.
129    pub fn encode_or_empty<T: zerompk::ToMessagePack>(
130        msg_type: SyncMessageType,
131        value: &T,
132    ) -> Self {
133        Self::new_msgpack(msg_type, value).unwrap_or(Self {
134            msg_type,
135            body: Vec::new(),
136        })
137    }
138
139    /// Deserialize the body from MessagePack.
140    pub fn decode_body<T: zerompk::FromMessagePackOwned>(&self) -> Option<T> {
141        zerompk::from_msgpack(&self.body).ok()
142    }
143}