Skip to main content

nodedb_types/sync/wire/
frame.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Wire frame format and message-type discriminants.
4
/// Sync message type identifiers.
///
/// The discriminant of each variant is the byte that identifies the message
/// on the wire (`#[repr(u8)]`). Discriminants are not contiguous; bytes that
/// are not listed here are rejected by [`SyncMessageType::from_u8`].
///
/// `Hash` is derived alongside `Eq` so the type can be used as a key in
/// hash-based collections (e.g. per-message-type counters or dispatch tables).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
#[non_exhaustive]
pub enum SyncMessageType {
    /// Handshake (0x01).
    Handshake = 0x01,
    /// Handshake acknowledgment (0x02).
    HandshakeAck = 0x02,
    /// Delta push (0x10).
    DeltaPush = 0x10,
    /// Delta acknowledgment (0x11).
    DeltaAck = 0x11,
    /// Delta reject (0x12).
    DeltaReject = 0x12,
    /// Collection purged notification (server → client, 0x14).
    /// Sent when an Origin collection is hard-deleted (UNDROP window
    /// expired or explicit `DROP COLLECTION ... PURGE`). The client
    /// must drop local Loro state and remove the collection's redb
    /// record; future deltas for the collection are not sync-eligible.
    CollectionPurged = 0x14,
    /// Shape subscribe (0x20).
    ShapeSubscribe = 0x20,
    /// Shape snapshot (0x21).
    ShapeSnapshot = 0x21,
    /// Shape delta (0x22).
    ShapeDelta = 0x22,
    /// Shape unsubscribe (0x23).
    ShapeUnsubscribe = 0x23,
    /// Vector clock sync (0x30).
    VectorClockSync = 0x30,
    /// Timeseries metric batch push (client → server, 0x40).
    TimeseriesPush = 0x40,
    /// Timeseries push acknowledgment (server → client, 0x41).
    TimeseriesAck = 0x41,
    /// Re-sync request (bidirectional, 0x50).
    /// Sent when sequence gaps or checksum failures are detected.
    ResyncRequest = 0x50,
    /// Downstream throttle (client → server, 0x52).
    /// Sent when Lite's incoming queue is overwhelmed.
    Throttle = 0x52,
    /// Token refresh request (client → server, 0x60).
    TokenRefresh = 0x60,
    /// Token refresh acknowledgment (server → client, 0x61).
    TokenRefreshAck = 0x61,
    /// Definition sync (server → client, 0x70).
    /// Carries function/trigger/procedure definitions from Origin to Lite.
    DefinitionSync = 0x70,
    /// Presence update (client → server, 0x80).
    PresenceUpdate = 0x80,
    /// Presence broadcast (server → all subscribers except sender, 0x81).
    PresenceBroadcast = 0x81,
    /// Presence leave (server → all subscribers, 0x82).
    PresenceLeave = 0x82,
    /// Array CRDT delta (single op, client → server, 0x90).
    ArrayDelta = 0x90,
    /// Array CRDT delta batch (multiple ops, client → server, 0x91).
    ArrayDeltaBatch = 0x91,
    /// Array snapshot header (server → client, 0x92).
    ArraySnapshot = 0x92,
    /// Array snapshot chunk (server → client, 0x93).
    ArraySnapshotChunk = 0x93,
    /// Array schema CRDT sync (bidirectional, 0x94).
    ArraySchema = 0x94,
    /// Array ack — advances GC frontier (client → server, 0x95).
    ArrayAck = 0x95,
    /// Array reject (server → client, 0x96). Compensation hint.
    ArrayReject = 0x96,
    /// Array catchup request (client → server, 0x97).
    ArrayCatchupRequest = 0x97,
    /// Ping/pong (0xFF).
    PingPong = 0xFF,
}
67
68impl SyncMessageType {
69    pub fn from_u8(v: u8) -> Option<Self> {
70        match v {
71            0x01 => Some(Self::Handshake),
72            0x02 => Some(Self::HandshakeAck),
73            0x10 => Some(Self::DeltaPush),
74            0x11 => Some(Self::DeltaAck),
75            0x12 => Some(Self::DeltaReject),
76            0x14 => Some(Self::CollectionPurged),
77            0x20 => Some(Self::ShapeSubscribe),
78            0x21 => Some(Self::ShapeSnapshot),
79            0x22 => Some(Self::ShapeDelta),
80            0x23 => Some(Self::ShapeUnsubscribe),
81            0x30 => Some(Self::VectorClockSync),
82            0x40 => Some(Self::TimeseriesPush),
83            0x41 => Some(Self::TimeseriesAck),
84            0x50 => Some(Self::ResyncRequest),
85            0x52 => Some(Self::Throttle),
86            0x60 => Some(Self::TokenRefresh),
87            0x61 => Some(Self::TokenRefreshAck),
88            0x70 => Some(Self::DefinitionSync),
89            0x80 => Some(Self::PresenceUpdate),
90            0x81 => Some(Self::PresenceBroadcast),
91            0x82 => Some(Self::PresenceLeave),
92            0x90 => Some(Self::ArrayDelta),
93            0x91 => Some(Self::ArrayDeltaBatch),
94            0x92 => Some(Self::ArraySnapshot),
95            0x93 => Some(Self::ArraySnapshotChunk),
96            0x94 => Some(Self::ArraySchema),
97            0x95 => Some(Self::ArrayAck),
98            0x96 => Some(Self::ArrayReject),
99            0x97 => Some(Self::ArrayCatchupRequest),
100            0xFF => Some(Self::PingPong),
101            _ => None,
102        }
103    }
104}
105
106/// Wire frame: wraps a message type + serialized body.
107///
108/// Layout: `[msg_type: 1B][length: 4B LE][body: N bytes]`
109/// Total header: 5 bytes.
110///
111/// `#[non_exhaustive]` — additional header fields (e.g. compression flag,
112/// session token) may be added without breaking downstream consumers.
113#[non_exhaustive]
114#[derive(Clone)]
115pub struct SyncFrame {
116    pub msg_type: SyncMessageType,
117    pub body: Vec<u8>,
118}
119
120impl SyncFrame {
121    pub const HEADER_SIZE: usize = 5;
122
123    /// Serialize a frame to bytes.
124    pub fn to_bytes(&self) -> Vec<u8> {
125        let len = self.body.len() as u32;
126        let mut buf = Vec::with_capacity(Self::HEADER_SIZE + self.body.len());
127        buf.push(self.msg_type as u8);
128        buf.extend_from_slice(&len.to_le_bytes());
129        buf.extend_from_slice(&self.body);
130        buf
131    }
132
133    /// Deserialize a frame from bytes.
134    ///
135    /// Returns `None` if the data is too short or the message type is unknown.
136    pub fn from_bytes(data: &[u8]) -> Option<Self> {
137        if data.len() < Self::HEADER_SIZE {
138            return None;
139        }
140        let msg_type = SyncMessageType::from_u8(data[0])?;
141        let len = u32::from_le_bytes(data[1..5].try_into().ok()?) as usize;
142        if data.len() < Self::HEADER_SIZE + len {
143            return None;
144        }
145        let body = data[Self::HEADER_SIZE..Self::HEADER_SIZE + len].to_vec();
146        Some(Self { msg_type, body })
147    }
148
149    /// Create a frame with a MessagePack-serialized body.
150    pub fn new_msgpack<T: zerompk::ToMessagePack>(
151        msg_type: SyncMessageType,
152        value: &T,
153    ) -> Option<Self> {
154        let body = zerompk::to_msgpack_vec(value).ok()?;
155        Some(Self { msg_type, body })
156    }
157
158    /// Try to encode a value into a SyncFrame body.
159    ///
160    /// Returns `None` and logs an error on serialization failure — callers
161    /// should propagate via `?`. The protocol must never ship a frame
162    /// whose body did not serialize successfully.
163    pub fn try_encode<T: zerompk::ToMessagePack>(
164        msg_type: SyncMessageType,
165        value: &T,
166    ) -> Option<Self> {
167        match zerompk::to_msgpack_vec(value) {
168            Ok(body) => Some(Self { msg_type, body }),
169            Err(e) => {
170                tracing::error!(
171                    msg_type = msg_type as u8,
172                    error = %e,
173                    "failed to encode sync frame body; dropping response"
174                );
175                None
176            }
177        }
178    }
179
180    /// Deserialize the body from MessagePack.
181    pub fn decode_body<T: zerompk::FromMessagePackOwned>(&self) -> Option<T> {
182        zerompk::from_msgpack(&self.body).ok()
183    }
184}