nodedb_types/sync/wire/frame.rs
// SPDX-License-Identifier: Apache-2.0

//! Wire frame format and message-type discriminants.
//!
//! Additional opcodes beyond what the module-level doc in `mod.rs` lists:
//! - `0xA2` VectorInsert (client → server)
//! - `0xA3` VectorInsertAck (server → client)
//! - `0xA4` VectorDelete (client → server)
//! - `0xA5` VectorDeleteAck (server → client)
//! - `0xA6` FtsIndex (client → server)
//! - `0xA7` FtsIndexAck (server → client)
//! - `0xA8` FtsDelete (client → server)
//! - `0xA9` FtsDeleteAck (server → client)
//! - `0xAA` SpatialInsert (client → server)
//! - `0xAB` SpatialInsertAck (server → client)
//! - `0xAC` SpatialDelete (client → server)
//! - `0xAD` SpatialDeleteAck (server → client)

/// Opcode identifying which kind of sync message a frame carries.
///
/// The enum is `#[repr(u8)]`: each variant's discriminant is its one-byte
/// wire value. It is also `#[non_exhaustive]`, so code in other crates must
/// keep a wildcard arm when matching on it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
#[repr(u8)]
pub enum SyncMessageType {
    // ---- Handshake (0x01–0x02) ----
    /// Handshake (0x01).
    Handshake = 0x01,
    /// Handshake acknowledgment (0x02).
    HandshakeAck = 0x02,

    // ---- Deltas (0x10–0x14) ----
    /// Delta push (0x10).
    DeltaPush = 0x10,
    /// Delta acknowledgment (0x11).
    DeltaAck = 0x11,
    /// Delta reject (0x12).
    DeltaReject = 0x12,
    /// Notification that a collection was purged (server → client, 0x14).
    ///
    /// Emitted once an Origin collection has been hard-deleted, either
    /// because its UNDROP window expired or because of an explicit
    /// `DROP COLLECTION ... PURGE`. On receipt the client must discard its
    /// local Loro state and delete the collection's redb record; any later
    /// deltas for that collection are not sync-eligible.
    CollectionPurged = 0x14,

    // ---- Shapes (0x20–0x23) ----
    /// Shape subscribe (0x20).
    ShapeSubscribe = 0x20,
    /// Shape snapshot (0x21).
    ShapeSnapshot = 0x21,
    /// Shape delta (0x22).
    ShapeDelta = 0x22,
    /// Shape unsubscribe (0x23).
    ShapeUnsubscribe = 0x23,

    // ---- Vector clock (0x30) ----
    /// Vector clock sync (0x30).
    VectorClockSync = 0x30,

    // ---- Timeseries (0x40–0x41) ----
    /// Push of a batch of timeseries metrics (client → server, 0x40).
    TimeseriesPush = 0x40,
    /// Acknowledgment of a timeseries push (server → client, 0x41).
    TimeseriesAck = 0x41,

    // ---- Flow control (0x50–0x52) ----
    /// Request to re-sync (bidirectional, 0x50).
    ///
    /// Issued after detecting a sequence gap or a checksum failure.
    ResyncRequest = 0x50,
    /// Downstream throttle (client → server, 0x52).
    ///
    /// Issued when Lite's incoming queue is overwhelmed.
    Throttle = 0x52,

    // ---- Token refresh (0x60–0x61) ----
    /// Request to refresh the token (client → server, 0x60).
    TokenRefresh = 0x60,
    /// Acknowledgment of a token refresh (server → client, 0x61).
    TokenRefreshAck = 0x61,

    // ---- Definitions (0x70) ----
    /// Definition sync (server → client, 0x70).
    ///
    /// Transports function, trigger and procedure definitions from Origin
    /// to Lite.
    DefinitionSync = 0x70,

    // ---- Presence (0x80–0x82) ----
    /// Presence update (client → server, 0x80).
    PresenceUpdate = 0x80,
    /// Presence broadcast (server → every subscriber except the sender, 0x81).
    PresenceBroadcast = 0x81,
    /// Presence leave (server → every subscriber, 0x82).
    PresenceLeave = 0x82,

    // ---- Array CRDT (0x90–0x97) ----
    /// Array CRDT delta carrying a single op (client → server, 0x90).
    ArrayDelta = 0x90,
    /// Array CRDT delta batch carrying multiple ops (client → server, 0x91).
    ArrayDeltaBatch = 0x91,
    /// Header of an array snapshot (server → client, 0x92).
    ArraySnapshot = 0x92,
    /// One chunk of an array snapshot (server → client, 0x93).
    ArraySnapshotChunk = 0x93,
    /// Array schema CRDT sync (bidirectional, 0x94).
    ArraySchema = 0x94,
    /// Array ack, which advances the GC frontier (client → server, 0x95).
    ArrayAck = 0x95,
    /// Array reject, a compensation hint (server → client, 0x96).
    ArrayReject = 0x96,
    /// Request for array catchup (client → server, 0x97).
    ArrayCatchupRequest = 0x97,

    // ---- Columnar (0xA0–0xA1) ----
    /// Batch insert of columnar data (client → server, 0xA0).
    ColumnarInsert = 0xA0,
    /// Acknowledgment of a columnar insert (server → client, 0xA1).
    ColumnarInsertAck = 0xA1,

    // ---- Vector (0xA2–0xA5) ----
    /// Vector insert (client → server, 0xA2).
    VectorInsert = 0xA2,
    /// Acknowledgment of a vector insert (server → client, 0xA3).
    VectorInsertAck = 0xA3,
    /// Vector delete (client → server, 0xA4).
    VectorDelete = 0xA4,
    /// Acknowledgment of a vector delete (server → client, 0xA5).
    VectorDeleteAck = 0xA5,

    // ---- Full-text search (0xA6–0xA9) ----
    /// Index an FTS document (client → server, 0xA6).
    FtsIndex = 0xA6,
    /// Acknowledgment of an FTS index (server → client, 0xA7).
    FtsIndexAck = 0xA7,
    /// Delete an FTS document (client → server, 0xA8).
    FtsDelete = 0xA8,
    /// Acknowledgment of an FTS delete (server → client, 0xA9).
    FtsDeleteAck = 0xA9,

    // ---- Spatial (0xAA–0xAD) ----
    /// Insert a spatial geometry (client → server, 0xAA).
    SpatialInsert = 0xAA,
    /// Acknowledgment of a spatial insert (server → client, 0xAB).
    SpatialInsertAck = 0xAB,
    /// Delete a spatial geometry (client → server, 0xAC).
    SpatialDelete = 0xAC,
    /// Acknowledgment of a spatial delete (server → client, 0xAD).
    SpatialDeleteAck = 0xAD,

    // ---- Ping/pong (0xFF) ----
    /// Ping/pong (0xFF).
    PingPong = 0xFF,
}
109
110impl SyncMessageType {
111 pub fn from_u8(v: u8) -> Option<Self> {
112 match v {
113 0x01 => Some(Self::Handshake),
114 0x02 => Some(Self::HandshakeAck),
115 0x10 => Some(Self::DeltaPush),
116 0x11 => Some(Self::DeltaAck),
117 0x12 => Some(Self::DeltaReject),
118 0x14 => Some(Self::CollectionPurged),
119 0x20 => Some(Self::ShapeSubscribe),
120 0x21 => Some(Self::ShapeSnapshot),
121 0x22 => Some(Self::ShapeDelta),
122 0x23 => Some(Self::ShapeUnsubscribe),
123 0x30 => Some(Self::VectorClockSync),
124 0x40 => Some(Self::TimeseriesPush),
125 0x41 => Some(Self::TimeseriesAck),
126 0x50 => Some(Self::ResyncRequest),
127 0x52 => Some(Self::Throttle),
128 0x60 => Some(Self::TokenRefresh),
129 0x61 => Some(Self::TokenRefreshAck),
130 0x70 => Some(Self::DefinitionSync),
131 0x80 => Some(Self::PresenceUpdate),
132 0x81 => Some(Self::PresenceBroadcast),
133 0x82 => Some(Self::PresenceLeave),
134 0x90 => Some(Self::ArrayDelta),
135 0x91 => Some(Self::ArrayDeltaBatch),
136 0x92 => Some(Self::ArraySnapshot),
137 0x93 => Some(Self::ArraySnapshotChunk),
138 0x94 => Some(Self::ArraySchema),
139 0x95 => Some(Self::ArrayAck),
140 0x96 => Some(Self::ArrayReject),
141 0x97 => Some(Self::ArrayCatchupRequest),
142 0xA0 => Some(Self::ColumnarInsert),
143 0xA1 => Some(Self::ColumnarInsertAck),
144 0xA2 => Some(Self::VectorInsert),
145 0xA3 => Some(Self::VectorInsertAck),
146 0xA4 => Some(Self::VectorDelete),
147 0xA5 => Some(Self::VectorDeleteAck),
148 0xA6 => Some(Self::FtsIndex),
149 0xA7 => Some(Self::FtsIndexAck),
150 0xA8 => Some(Self::FtsDelete),
151 0xA9 => Some(Self::FtsDeleteAck),
152 0xAA => Some(Self::SpatialInsert),
153 0xAB => Some(Self::SpatialInsertAck),
154 0xAC => Some(Self::SpatialDelete),
155 0xAD => Some(Self::SpatialDeleteAck),
156 0xFF => Some(Self::PingPong),
157 _ => None,
158 }
159 }
160}
161
162/// Wire frame: wraps a message type + serialized body.
163///
164/// Layout: `[msg_type: 1B][length: 4B LE][body: N bytes]`
165/// Total header: 5 bytes.
166///
167/// `#[non_exhaustive]` — additional header fields (e.g. compression flag,
168/// session token) may be added without breaking downstream consumers.
169#[non_exhaustive]
170#[derive(Clone)]
171pub struct SyncFrame {
172 pub msg_type: SyncMessageType,
173 pub body: Vec<u8>,
174}
175
176impl SyncFrame {
177 pub const HEADER_SIZE: usize = 5;
178
179 /// Serialize a frame to bytes.
180 pub fn to_bytes(&self) -> Vec<u8> {
181 let len = self.body.len() as u32;
182 let mut buf = Vec::with_capacity(Self::HEADER_SIZE + self.body.len());
183 buf.push(self.msg_type as u8);
184 buf.extend_from_slice(&len.to_le_bytes());
185 buf.extend_from_slice(&self.body);
186 buf
187 }
188
189 /// Deserialize a frame from bytes.
190 ///
191 /// Returns `None` if the data is too short or the message type is unknown.
192 pub fn from_bytes(data: &[u8]) -> Option<Self> {
193 if data.len() < Self::HEADER_SIZE {
194 return None;
195 }
196 let msg_type = SyncMessageType::from_u8(data[0])?;
197 let len = u32::from_le_bytes(data[1..5].try_into().ok()?) as usize;
198 if data.len() < Self::HEADER_SIZE + len {
199 return None;
200 }
201 let body = data[Self::HEADER_SIZE..Self::HEADER_SIZE + len].to_vec();
202 Some(Self { msg_type, body })
203 }
204
205 /// Create a frame with a MessagePack-serialized body.
206 pub fn new_msgpack<T: zerompk::ToMessagePack>(
207 msg_type: SyncMessageType,
208 value: &T,
209 ) -> Option<Self> {
210 let body = zerompk::to_msgpack_vec(value).ok()?;
211 Some(Self { msg_type, body })
212 }
213
214 /// Try to encode a value into a SyncFrame body.
215 ///
216 /// Returns `None` and logs an error on serialization failure — callers
217 /// should propagate via `?`. The protocol must never ship a frame
218 /// whose body did not serialize successfully.
219 pub fn try_encode<T: zerompk::ToMessagePack>(
220 msg_type: SyncMessageType,
221 value: &T,
222 ) -> Option<Self> {
223 match zerompk::to_msgpack_vec(value) {
224 Ok(body) => Some(Self { msg_type, body }),
225 Err(e) => {
226 tracing::error!(
227 msg_type = msg_type as u8,
228 error = %e,
229 "failed to encode sync frame body; dropping response"
230 );
231 None
232 }
233 }
234 }
235
236 /// Deserialize the body from MessagePack.
237 pub fn decode_body<T: zerompk::FromMessagePackOwned>(&self) -> Option<T> {
238 zerompk::from_msgpack(&self.body).ok()
239 }
240}