Skip to main content

nodedb_types/sync/wire/
frame.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Wire frame format and message-type discriminants.
4//!
5//! Additional opcodes beyond what the module-level doc in `mod.rs` lists:
6//! - `0xA2` VectorInsert (client → server)
7//! - `0xA3` VectorInsertAck (server → client)
8//! - `0xA4` VectorDelete (client → server)
9//! - `0xA5` VectorDeleteAck (server → client)
10//! - `0xA6` FtsIndex (client → server)
11//! - `0xA7` FtsIndexAck (server → client)
12//! - `0xA8` FtsDelete (client → server)
13//! - `0xA9` FtsDeleteAck (server → client)
14//! - `0xAA` SpatialInsert (client → server)
15//! - `0xAB` SpatialInsertAck (server → client)
16//! - `0xAC` SpatialDelete (client → server)
17//! - `0xAD` SpatialDeleteAck (server → client)
18
/// Sync message type identifiers.
///
/// Each variant's discriminant is its one-byte wire opcode (`#[repr(u8)]`):
/// `SyncFrame::to_bytes` writes it as the first header byte and
/// [`SyncMessageType::from_u8`] maps it back. The variant list here and the
/// match arms in `from_u8` are maintained by hand, so a new variant needs a
/// matching arm there or it will be rejected as unknown on decode.
///
/// Variants documented with only an opcode have no direction note in this
/// file.
///
/// `Hash` is derived alongside `Eq` so the type can be used as a
/// `HashMap`/`HashSet` key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
#[non_exhaustive]
pub enum SyncMessageType {
    /// Handshake (0x01).
    Handshake = 0x01,
    /// Handshake acknowledgment (0x02).
    HandshakeAck = 0x02,
    /// Delta push (0x10).
    DeltaPush = 0x10,
    /// Delta acknowledgment (0x11).
    DeltaAck = 0x11,
    /// Delta reject (0x12).
    DeltaReject = 0x12,
    /// Collection purged notification (server → client, 0x14).
    /// Sent when an Origin collection is hard-deleted (UNDROP window
    /// expired or explicit `DROP COLLECTION ... PURGE`). The client
    /// must drop local Loro state and remove the collection's redb
    /// record; future deltas for the collection are not sync-eligible.
    CollectionPurged = 0x14,
    /// Shape subscribe (0x20).
    ShapeSubscribe = 0x20,
    /// Shape snapshot (0x21).
    ShapeSnapshot = 0x21,
    /// Shape delta (0x22).
    ShapeDelta = 0x22,
    /// Shape unsubscribe (0x23).
    ShapeUnsubscribe = 0x23,
    /// Vector clock sync (0x30).
    VectorClockSync = 0x30,
    /// Timeseries metric batch push (client → server, 0x40).
    TimeseriesPush = 0x40,
    /// Timeseries push acknowledgment (server → client, 0x41).
    TimeseriesAck = 0x41,
    /// Re-sync request (bidirectional, 0x50).
    /// Sent when sequence gaps or checksum failures are detected.
    ResyncRequest = 0x50,
    /// Downstream throttle (client → server, 0x52).
    /// Sent when Lite's incoming queue is overwhelmed.
    Throttle = 0x52,
    /// Token refresh request (client → server, 0x60).
    TokenRefresh = 0x60,
    /// Token refresh acknowledgment (server → client, 0x61).
    TokenRefreshAck = 0x61,
    /// Definition sync (server → client, 0x70).
    /// Carries function/trigger/procedure definitions from Origin to Lite.
    DefinitionSync = 0x70,
    /// Presence update (client → server, 0x80).
    PresenceUpdate = 0x80,
    /// Presence broadcast (server → all subscribers except sender, 0x81).
    PresenceBroadcast = 0x81,
    /// Presence leave (server → all subscribers, 0x82).
    PresenceLeave = 0x82,
    /// Array CRDT delta (single op, client → server, 0x90).
    ArrayDelta = 0x90,
    /// Array CRDT delta batch (multiple ops, client → server, 0x91).
    ArrayDeltaBatch = 0x91,
    /// Array snapshot header (server → client, 0x92).
    ArraySnapshot = 0x92,
    /// Array snapshot chunk (server → client, 0x93).
    ArraySnapshotChunk = 0x93,
    /// Array schema CRDT sync (bidirectional, 0x94).
    ArraySchema = 0x94,
    /// Array ack — advances GC frontier (client → server, 0x95).
    ArrayAck = 0x95,
    /// Array reject (server → client, 0x96). Compensation hint.
    ArrayReject = 0x96,
    /// Array catchup request (client → server, 0x97).
    ArrayCatchupRequest = 0x97,
    /// Columnar batch insert (client → server, 0xA0).
    ColumnarInsert = 0xA0,
    /// Columnar insert acknowledgment (server → client, 0xA1).
    ColumnarInsertAck = 0xA1,
    /// Vector insert (client → server, 0xA2).
    VectorInsert = 0xA2,
    /// Vector insert acknowledgment (server → client, 0xA3).
    VectorInsertAck = 0xA3,
    /// Vector delete (client → server, 0xA4).
    VectorDelete = 0xA4,
    /// Vector delete acknowledgment (server → client, 0xA5).
    VectorDeleteAck = 0xA5,
    /// FTS document index (client → server, 0xA6).
    FtsIndex = 0xA6,
    /// FTS index acknowledgment (server → client, 0xA7).
    FtsIndexAck = 0xA7,
    /// FTS document delete (client → server, 0xA8).
    FtsDelete = 0xA8,
    /// FTS delete acknowledgment (server → client, 0xA9).
    FtsDeleteAck = 0xA9,
    /// Spatial geometry insert (client → server, 0xAA).
    SpatialInsert = 0xAA,
    /// Spatial insert acknowledgment (server → client, 0xAB).
    SpatialInsertAck = 0xAB,
    /// Spatial geometry delete (client → server, 0xAC).
    SpatialDelete = 0xAC,
    /// Spatial delete acknowledgment (server → client, 0xAD).
    SpatialDeleteAck = 0xAD,
    /// Ping/pong (0xFF).
    PingPong = 0xFF,
}
109
110impl SyncMessageType {
111    pub fn from_u8(v: u8) -> Option<Self> {
112        match v {
113            0x01 => Some(Self::Handshake),
114            0x02 => Some(Self::HandshakeAck),
115            0x10 => Some(Self::DeltaPush),
116            0x11 => Some(Self::DeltaAck),
117            0x12 => Some(Self::DeltaReject),
118            0x14 => Some(Self::CollectionPurged),
119            0x20 => Some(Self::ShapeSubscribe),
120            0x21 => Some(Self::ShapeSnapshot),
121            0x22 => Some(Self::ShapeDelta),
122            0x23 => Some(Self::ShapeUnsubscribe),
123            0x30 => Some(Self::VectorClockSync),
124            0x40 => Some(Self::TimeseriesPush),
125            0x41 => Some(Self::TimeseriesAck),
126            0x50 => Some(Self::ResyncRequest),
127            0x52 => Some(Self::Throttle),
128            0x60 => Some(Self::TokenRefresh),
129            0x61 => Some(Self::TokenRefreshAck),
130            0x70 => Some(Self::DefinitionSync),
131            0x80 => Some(Self::PresenceUpdate),
132            0x81 => Some(Self::PresenceBroadcast),
133            0x82 => Some(Self::PresenceLeave),
134            0x90 => Some(Self::ArrayDelta),
135            0x91 => Some(Self::ArrayDeltaBatch),
136            0x92 => Some(Self::ArraySnapshot),
137            0x93 => Some(Self::ArraySnapshotChunk),
138            0x94 => Some(Self::ArraySchema),
139            0x95 => Some(Self::ArrayAck),
140            0x96 => Some(Self::ArrayReject),
141            0x97 => Some(Self::ArrayCatchupRequest),
142            0xA0 => Some(Self::ColumnarInsert),
143            0xA1 => Some(Self::ColumnarInsertAck),
144            0xA2 => Some(Self::VectorInsert),
145            0xA3 => Some(Self::VectorInsertAck),
146            0xA4 => Some(Self::VectorDelete),
147            0xA5 => Some(Self::VectorDeleteAck),
148            0xA6 => Some(Self::FtsIndex),
149            0xA7 => Some(Self::FtsIndexAck),
150            0xA8 => Some(Self::FtsDelete),
151            0xA9 => Some(Self::FtsDeleteAck),
152            0xAA => Some(Self::SpatialInsert),
153            0xAB => Some(Self::SpatialInsertAck),
154            0xAC => Some(Self::SpatialDelete),
155            0xAD => Some(Self::SpatialDeleteAck),
156            0xFF => Some(Self::PingPong),
157            _ => None,
158        }
159    }
160}
161
162/// Wire frame: wraps a message type + serialized body.
163///
164/// Layout: `[msg_type: 1B][length: 4B LE][body: N bytes]`
165/// Total header: 5 bytes.
166///
167/// `#[non_exhaustive]` — additional header fields (e.g. compression flag,
168/// session token) may be added without breaking downstream consumers.
169#[non_exhaustive]
170#[derive(Clone)]
171pub struct SyncFrame {
172    pub msg_type: SyncMessageType,
173    pub body: Vec<u8>,
174}
175
176impl SyncFrame {
177    pub const HEADER_SIZE: usize = 5;
178
179    /// Serialize a frame to bytes.
180    pub fn to_bytes(&self) -> Vec<u8> {
181        let len = self.body.len() as u32;
182        let mut buf = Vec::with_capacity(Self::HEADER_SIZE + self.body.len());
183        buf.push(self.msg_type as u8);
184        buf.extend_from_slice(&len.to_le_bytes());
185        buf.extend_from_slice(&self.body);
186        buf
187    }
188
189    /// Deserialize a frame from bytes.
190    ///
191    /// Returns `None` if the data is too short or the message type is unknown.
192    pub fn from_bytes(data: &[u8]) -> Option<Self> {
193        if data.len() < Self::HEADER_SIZE {
194            return None;
195        }
196        let msg_type = SyncMessageType::from_u8(data[0])?;
197        let len = u32::from_le_bytes(data[1..5].try_into().ok()?) as usize;
198        if data.len() < Self::HEADER_SIZE + len {
199            return None;
200        }
201        let body = data[Self::HEADER_SIZE..Self::HEADER_SIZE + len].to_vec();
202        Some(Self { msg_type, body })
203    }
204
205    /// Create a frame with a MessagePack-serialized body.
206    pub fn new_msgpack<T: zerompk::ToMessagePack>(
207        msg_type: SyncMessageType,
208        value: &T,
209    ) -> Option<Self> {
210        let body = zerompk::to_msgpack_vec(value).ok()?;
211        Some(Self { msg_type, body })
212    }
213
214    /// Try to encode a value into a SyncFrame body.
215    ///
216    /// Returns `None` and logs an error on serialization failure — callers
217    /// should propagate via `?`. The protocol must never ship a frame
218    /// whose body did not serialize successfully.
219    pub fn try_encode<T: zerompk::ToMessagePack>(
220        msg_type: SyncMessageType,
221        value: &T,
222    ) -> Option<Self> {
223        match zerompk::to_msgpack_vec(value) {
224            Ok(body) => Some(Self { msg_type, body }),
225            Err(e) => {
226                tracing::error!(
227                    msg_type = msg_type as u8,
228                    error = %e,
229                    "failed to encode sync frame body; dropping response"
230                );
231                None
232            }
233        }
234    }
235
236    /// Deserialize the body from MessagePack.
237    pub fn decode_body<T: zerompk::FromMessagePackOwned>(&self) -> Option<T> {
238        zerompk::from_msgpack(&self.body).ok()
239    }
240}