// nodedb_types/sync/wire.rs

//! Sync wire protocol: frame format and message types.
//!
//! Frame format: `[msg_type: 1B][length: 4B LE][rkyv/msgpack body]`
//!
//! Message types:
//! - `0x01` Handshake (client → server)
//! - `0x02` HandshakeAck (server → client)
//! - `0x10` DeltaPush (client → server)
//! - `0x11` DeltaAck (server → client)
//! - `0x12` DeltaReject (server → client)
//! - `0x20` ShapeSubscribe (client → server)
//! - `0x21` ShapeSnapshot (server → client)
//! - `0x22` ShapeDelta (server → client)
//! - `0x23` ShapeUnsubscribe (client → server)
//! - `0x30` VectorClockSync (bidirectional)
//! - `0x40` TimeseriesPush (client → server)
//! - `0x41` TimeseriesAck (server → client)
//! - `0x50` ResyncRequest (bidirectional)
//! - `0x52` Throttle (client → server)
//! - `0x60` TokenRefresh (client → server)
//! - `0x61` TokenRefreshAck (server → client)
//! - `0x70` DefinitionSync (server → client)
//! - `0x80` PresenceUpdate (client → server)
//! - `0x81` PresenceBroadcast (server → all subscribers except sender)
//! - `0x82` PresenceLeave (server → all subscribers)
//! - `0xFF` Ping/Pong (bidirectional)
27
28use std::collections::HashMap;
29
30use serde::{Deserialize, Serialize};
31
32use super::compensation::CompensationHint;
33use super::shape::ShapeDefinition;
34
/// One-byte tag identifying the kind of message carried by a sync frame.
///
/// The discriminant of each variant is the exact byte placed in the frame
/// header, so existing values must not be renumbered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SyncMessageType {
    /// Handshake (client → server, 0x01).
    Handshake = 0x01,
    /// Handshake acknowledgment (server → client, 0x02).
    HandshakeAck = 0x02,
    /// Delta push (client → server, 0x10).
    DeltaPush = 0x10,
    /// Delta acknowledgment (server → client, 0x11).
    DeltaAck = 0x11,
    /// Delta rejection (server → client, 0x12).
    DeltaReject = 0x12,
    /// Shape subscribe request (client → server, 0x20).
    ShapeSubscribe = 0x20,
    /// Shape snapshot response (server → client, 0x21).
    ShapeSnapshot = 0x21,
    /// Shape delta (server → client, 0x22).
    ShapeDelta = 0x22,
    /// Shape unsubscribe request (client → server, 0x23).
    ShapeUnsubscribe = 0x23,
    /// Vector clock sync (bidirectional, 0x30).
    VectorClockSync = 0x30,
    /// Batch of timeseries metrics pushed by a client (client → server, 0x40).
    TimeseriesPush = 0x40,
    /// Acknowledgment of a timeseries push (server → client, 0x41).
    TimeseriesAck = 0x41,
    /// Request to re-sync (bidirectional, 0x50).
    /// Issued after a sequence gap or a checksum failure is detected.
    ResyncRequest = 0x50,
    /// Downstream throttle signal (client → server, 0x52).
    /// Issued when Lite's incoming queue is overwhelmed.
    Throttle = 0x52,
    /// Request to refresh the auth token (client → server, 0x60).
    TokenRefresh = 0x60,
    /// Acknowledgment of a token refresh (server → client, 0x61).
    TokenRefreshAck = 0x61,
    /// Definition sync (server → client, 0x70).
    /// Transports function/trigger/procedure definitions from Origin to Lite.
    DefinitionSync = 0x70,
    /// Presence update (client → server, 0x80).
    /// Ephemeral user state such as cursor, selection, or typing indicator.
    PresenceUpdate = 0x80,
    /// Presence broadcast (server → all subscribers except sender, 0x81).
    PresenceBroadcast = 0x81,
    /// Presence leave (server → all subscribers, 0x82).
    /// Emitted automatically on WebSocket disconnect or TTL expiry.
    PresenceLeave = 0x82,
    /// Ping/Pong keepalive (bidirectional, 0xFF).
    PingPong = 0xFF,
}
76
77impl SyncMessageType {
78    pub fn from_u8(v: u8) -> Option<Self> {
79        match v {
80            0x01 => Some(Self::Handshake),
81            0x02 => Some(Self::HandshakeAck),
82            0x10 => Some(Self::DeltaPush),
83            0x11 => Some(Self::DeltaAck),
84            0x12 => Some(Self::DeltaReject),
85            0x20 => Some(Self::ShapeSubscribe),
86            0x21 => Some(Self::ShapeSnapshot),
87            0x22 => Some(Self::ShapeDelta),
88            0x23 => Some(Self::ShapeUnsubscribe),
89            0x30 => Some(Self::VectorClockSync),
90            0x40 => Some(Self::TimeseriesPush),
91            0x41 => Some(Self::TimeseriesAck),
92            0x50 => Some(Self::ResyncRequest),
93            0x52 => Some(Self::Throttle),
94            0x60 => Some(Self::TokenRefresh),
95            0x61 => Some(Self::TokenRefreshAck),
96            0x70 => Some(Self::DefinitionSync),
97            0x80 => Some(Self::PresenceUpdate),
98            0x81 => Some(Self::PresenceBroadcast),
99            0x82 => Some(Self::PresenceLeave),
100            0xFF => Some(Self::PingPong),
101            _ => None,
102        }
103    }
104}
105
106/// Wire frame: wraps a message type + serialized body.
107///
108/// Layout: `[msg_type: 1B][length: 4B LE][body: N bytes]`
109/// Total header: 5 bytes.
110#[derive(Clone)]
111pub struct SyncFrame {
112    pub msg_type: SyncMessageType,
113    pub body: Vec<u8>,
114}
115
116impl SyncFrame {
117    pub const HEADER_SIZE: usize = 5;
118
119    /// Serialize a frame to bytes.
120    pub fn to_bytes(&self) -> Vec<u8> {
121        let len = self.body.len() as u32;
122        let mut buf = Vec::with_capacity(Self::HEADER_SIZE + self.body.len());
123        buf.push(self.msg_type as u8);
124        buf.extend_from_slice(&len.to_le_bytes());
125        buf.extend_from_slice(&self.body);
126        buf
127    }
128
129    /// Deserialize a frame from bytes.
130    ///
131    /// Returns `None` if the data is too short or the message type is unknown.
132    pub fn from_bytes(data: &[u8]) -> Option<Self> {
133        if data.len() < Self::HEADER_SIZE {
134            return None;
135        }
136        let msg_type = SyncMessageType::from_u8(data[0])?;
137        let len = u32::from_le_bytes(data[1..5].try_into().ok()?) as usize;
138        if data.len() < Self::HEADER_SIZE + len {
139            return None;
140        }
141        let body = data[Self::HEADER_SIZE..Self::HEADER_SIZE + len].to_vec();
142        Some(Self { msg_type, body })
143    }
144
145    /// Create a frame with a MessagePack-serialized body.
146    pub fn new_msgpack<T: zerompk::ToMessagePack>(
147        msg_type: SyncMessageType,
148        value: &T,
149    ) -> Option<Self> {
150        let body = zerompk::to_msgpack_vec(value).ok()?;
151        Some(Self { msg_type, body })
152    }
153
154    /// Create a frame from a serializable value, falling back to an empty
155    /// body if serialization fails.
156    pub fn encode_or_empty<T: zerompk::ToMessagePack>(
157        msg_type: SyncMessageType,
158        value: &T,
159    ) -> Self {
160        Self::new_msgpack(msg_type, value).unwrap_or(Self {
161            msg_type,
162            body: Vec::new(),
163        })
164    }
165
166    /// Deserialize the body from MessagePack.
167    pub fn decode_body<T: zerompk::FromMessagePackOwned>(&self) -> Option<T> {
168        zerompk::from_msgpack(&self.body).ok()
169    }
170}
171
172// ─── Message Payloads ───────────────────────────────────────────────────────
173
174/// Handshake message (client → server, 0x01).
175#[derive(
176    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
177)]
178pub struct HandshakeMsg {
179    /// JWT bearer token for authentication.
180    pub jwt_token: String,
181    /// Client's vector clock: `{ collection: { doc_id: lamport_ts } }`.
182    pub vector_clock: HashMap<String, HashMap<String, u64>>,
183    /// Shape IDs the client is subscribed to.
184    pub subscribed_shapes: Vec<String>,
185    /// Client version string.
186    pub client_version: String,
187    /// Lite instance identity (UUID v7). Empty for legacy clients.
188    #[serde(default)]
189    pub lite_id: String,
190    /// Monotonic epoch counter (incremented on every open). 0 for legacy clients.
191    #[serde(default)]
192    pub epoch: u64,
193    /// Wire format version. Server rejects connections with incompatible versions.
194    /// 0 = legacy client (pre-wire-version; treated as version 1).
195    #[serde(default)]
196    pub wire_version: u16,
197}
198
199/// Handshake acknowledgment (server → client, 0x02).
200#[derive(
201    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
202)]
203pub struct HandshakeAckMsg {
204    /// Whether the handshake succeeded.
205    pub success: bool,
206    /// Session ID assigned by the server.
207    pub session_id: String,
208    /// Server's vector clock (for initial sync).
209    pub server_clock: HashMap<String, u64>,
210    /// Error message (if !success).
211    pub error: Option<String>,
212    /// Fork detection: if true, client must regenerate LiteId and reconnect.
213    #[serde(default)]
214    pub fork_detected: bool,
215    /// Server's wire format version (for client-side compatibility check).
216    #[serde(default)]
217    pub server_wire_version: u16,
218}
219
220/// Delta push message (client → server, 0x10).
221#[derive(
222    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
223)]
224pub struct DeltaPushMsg {
225    /// Collection the delta applies to.
226    pub collection: String,
227    /// Document ID.
228    pub document_id: String,
229    /// Loro CRDT delta bytes.
230    pub delta: Vec<u8>,
231    /// Client's peer ID (for CRDT identity).
232    pub peer_id: u64,
233    /// Per-mutation unique ID for dedup.
234    pub mutation_id: u64,
235    /// CRC32C checksum of `delta` bytes for integrity verification.
236    /// Computed by sender, validated by receiver. 0 for legacy clients.
237    #[serde(default)]
238    pub checksum: u32,
239}
240
241/// Delta acknowledgment (server → client, 0x11).
242#[derive(
243    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
244)]
245pub struct DeltaAckMsg {
246    /// Mutation ID being acknowledged.
247    pub mutation_id: u64,
248    /// Server-assigned LSN for this mutation.
249    pub lsn: u64,
250}
251
252/// Delta rejection (server → client, 0x12).
253#[derive(
254    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
255)]
256pub struct DeltaRejectMsg {
257    /// Mutation ID being rejected.
258    pub mutation_id: u64,
259    /// Reason for rejection.
260    pub reason: String,
261    /// Compensation hints for the client.
262    pub compensation: Option<CompensationHint>,
263}
264
265/// Shape subscribe request (client → server, 0x20).
266#[derive(
267    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
268)]
269pub struct ShapeSubscribeMsg {
270    /// Shape definition to subscribe to.
271    pub shape: ShapeDefinition,
272}
273
274/// Shape snapshot response (server → client, 0x21).
275#[derive(
276    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
277)]
278pub struct ShapeSnapshotMsg {
279    /// Shape ID this snapshot belongs to.
280    pub shape_id: String,
281    /// Initial dataset: serialized document rows matching the shape.
282    pub data: Vec<u8>,
283    /// LSN at snapshot time — deltas after this LSN will follow.
284    pub snapshot_lsn: u64,
285    /// Number of documents in the snapshot.
286    pub doc_count: usize,
287}
288
289/// Shape delta message (server → client, 0x22).
290#[derive(
291    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
292)]
293pub struct ShapeDeltaMsg {
294    /// Shape ID this delta applies to.
295    pub shape_id: String,
296    /// Collection affected.
297    pub collection: String,
298    /// Document ID affected.
299    pub document_id: String,
300    /// Operation type: "INSERT", "UPDATE", "DELETE".
301    pub operation: String,
302    /// Delta payload (CRDT delta bytes or document value).
303    pub delta: Vec<u8>,
304    /// WAL LSN of this mutation.
305    pub lsn: u64,
306}
307
308/// Shape unsubscribe request (client → server, 0x23).
309#[derive(
310    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
311)]
312pub struct ShapeUnsubscribeMsg {
313    pub shape_id: String,
314}
315
316/// Vector clock sync message (bidirectional, 0x30).
317#[derive(
318    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
319)]
320pub struct VectorClockSyncMsg {
321    /// Per-collection clock: `{ collection: max_lsn }`.
322    pub clocks: HashMap<String, u64>,
323    /// Sender's node/peer ID.
324    pub sender_id: u64,
325}
326
327/// Re-sync request message (bidirectional, 0x50).
328///
329/// Sent when a receiver detects:
330/// - Sequence gap: missing `mutation_id`s in the delta stream
331/// - Checksum failure: CRC32C mismatch on a delta payload
332/// - State divergence: local state inconsistent with received deltas
333///
334/// On receiving a ResyncRequest, the sender should:
335/// 1. Re-send all deltas from `from_mutation_id` onwards, OR
336/// 2. Send a full snapshot if `from_mutation_id` is 0
337#[derive(
338    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
339)]
340pub struct ResyncRequestMsg {
341    /// Reason for requesting re-sync.
342    pub reason: ResyncReason,
343    /// Resume from this mutation ID (0 = full re-sync).
344    pub from_mutation_id: u64,
345    /// Collection scope (empty = all collections).
346    pub collection: String,
347}
348
349/// Reason for a re-sync request.
350#[derive(
351    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
352)]
353pub enum ResyncReason {
354    /// Detected missing mutation IDs in the delta stream.
355    SequenceGap {
356        /// The expected next mutation ID.
357        expected: u64,
358        /// The mutation ID that was actually received.
359        received: u64,
360    },
361    /// CRC32C checksum mismatch on a delta payload.
362    ChecksumMismatch {
363        /// The mutation ID of the corrupted delta.
364        mutation_id: u64,
365    },
366    /// Corruption detected on cold start, need full re-sync.
367    CorruptedState,
368}
369
370/// Downstream throttle message (client → server, 0x52).
371///
372/// Sent by Lite when its incoming shape delta queue is overwhelmed.
373/// Origin should reduce its push rate for this peer until a
374/// `Throttle { throttle: false }` is received.
375#[derive(
376    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
377)]
378pub struct ThrottleMsg {
379    /// `true` to enable throttling, `false` to release.
380    pub throttle: bool,
381    /// Current queue depth at Lite (informational).
382    pub queue_depth: u64,
383    /// Suggested max deltas per second (0 = use server default).
384    pub suggested_rate: u64,
385}
386
387/// Token refresh request (client → server, 0x60).
388///
389/// Sent by Lite before the current JWT expires. The client provides
390/// a fresh token obtained from the application's auth layer.
391/// Origin validates the new token and either upgrades the session
392/// or disconnects if the token is invalid.
393#[derive(
394    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
395)]
396pub struct TokenRefreshMsg {
397    /// New JWT bearer token.
398    pub new_token: String,
399}
400
401/// Token refresh acknowledgment (server → client, 0x61).
402#[derive(
403    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
404)]
405pub struct TokenRefreshAckMsg {
406    /// Whether the token refresh succeeded.
407    pub success: bool,
408    /// Error message (if !success).
409    pub error: Option<String>,
410    /// Seconds until this new token expires (so Lite can schedule next refresh).
411    #[serde(default)]
412    pub expires_in_secs: u64,
413}
414
415/// Ping/Pong keepalive (0xFF).
416#[derive(
417    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
418)]
419pub struct PingPongMsg {
420    /// Timestamp (epoch milliseconds) for RTT measurement.
421    pub timestamp_ms: u64,
422    /// Whether this is a pong (response to ping).
423    pub is_pong: bool,
424}
425
426/// Timeseries metric batch push (client → server, 0x40).
427#[derive(
428    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
429)]
430pub struct TimeseriesPushMsg {
431    /// Source Lite instance ID (UUID v7).
432    pub lite_id: String,
433    /// Collection name.
434    pub collection: String,
435    /// Gorilla-encoded timestamp block.
436    pub ts_block: Vec<u8>,
437    /// Gorilla-encoded value block.
438    pub val_block: Vec<u8>,
439    /// Raw LE u64 series ID block.
440    pub series_block: Vec<u8>,
441    /// Number of samples in this batch.
442    pub sample_count: u64,
443    /// Min timestamp in this batch.
444    pub min_ts: i64,
445    /// Max timestamp in this batch.
446    pub max_ts: i64,
447    /// Per-series sync watermark: highest LSN already synced for each series.
448    /// Only samples after these watermarks are included.
449    pub watermarks: HashMap<u64, u64>,
450}
451
452/// Timeseries push acknowledgment (server → client, 0x41).
453#[derive(
454    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
455)]
456pub struct TimeseriesAckMsg {
457    /// Collection acknowledged.
458    pub collection: String,
459    /// Number of samples accepted.
460    pub accepted: u64,
461    /// Number of samples rejected (duplicates, out-of-retention, etc.)
462    pub rejected: u64,
463    /// Server-assigned LSN for this batch (used as sync watermark).
464    pub lsn: u64,
465}
466
467/// Definition sync message (server → client, 0x70).
468///
469/// Carries function/trigger/procedure definitions from Origin to Lite.
470/// Sent when definitions are created, modified, or dropped on Origin.
471#[derive(
472    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
473)]
474pub struct DefinitionSyncMsg {
475    /// Type of definition: "function", "trigger", "procedure".
476    pub definition_type: String,
477    /// The definition name.
478    pub name: String,
479    /// Action: "put" (create/replace) or "delete" (drop).
480    pub action: String,
481    /// Serialized definition body (JSON). Empty for "delete" actions.
482    pub payload: Vec<u8>,
483}
484
485// ─── Presence / Awareness Messages ─────────────────────────────────────────
486
487/// Presence update message (client → server, 0x80).
488///
489/// Sends ephemeral user state to a channel. The server broadcasts the state
490/// to all other subscribers of the same channel. Presence is NOT persisted,
491/// NOT CRDT-merged — it is fire-and-forget with latest-state-wins semantics.
492///
493/// Sending a `PresenceUpdate` implicitly subscribes the sender to the channel
494/// (if not already subscribed).
495#[derive(
496    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
497)]
498pub struct PresenceUpdateMsg {
499    /// Channel scoping key (e.g., `"doc:doc-123"`, `"workspace:ws-acme"`).
500    pub channel: String,
501    /// Opaque user state (MessagePack-encoded application-defined payload).
502    /// Common fields: user_id, user_name, cursor_position, selection_range,
503    /// active_document_id, color, avatar_url.
504    pub state: Vec<u8>,
505}
506
507/// A single peer's presence state within a channel.
508#[derive(
509    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
510)]
511pub struct PeerPresence {
512    /// User identifier.
513    pub user_id: String,
514    /// Opaque user state (same format as `PresenceUpdateMsg::state`).
515    pub state: Vec<u8>,
516    /// Milliseconds since this peer's last update.
517    pub last_seen_ms: u64,
518}
519
520/// Presence broadcast message (server → all subscribers except sender, 0x81).
521///
522/// Contains the full set of currently-present peers in the channel.
523/// Sent whenever any peer updates their state or leaves.
524#[derive(
525    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
526)]
527pub struct PresenceBroadcastMsg {
528    /// Channel this broadcast belongs to.
529    pub channel: String,
530    /// All currently-present peers and their latest state.
531    pub peers: Vec<PeerPresence>,
532}
533
534/// Presence leave message (server → all subscribers, 0x82).
535///
536/// Emitted when a peer disconnects (WebSocket close) or when their
537/// presence TTL expires (no heartbeat within `presence_ttl_ms`).
538#[derive(
539    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
540)]
541pub struct PresenceLeaveMsg {
542    /// Channel the user left.
543    pub channel: String,
544    /// User who left.
545    pub user_id: String,
546}
547
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps `msg` in a frame of type `kind`, serializes the frame, parses it
    /// back, and decodes the body. Returns the serialized frame bytes along
    /// with the decoded message.
    fn through_wire<M>(kind: SyncMessageType, msg: &M) -> (Vec<u8>, M)
    where
        M: zerompk::ToMessagePack + zerompk::FromMessagePackOwned,
    {
        let wire = SyncFrame::new_msgpack(kind, msg).unwrap().to_bytes();
        let decoded: M = SyncFrame::from_bytes(&wire)
            .unwrap()
            .decode_body()
            .unwrap();
        (wire, decoded)
    }

    /// A ping survives frame encoding and decoding with its type tag intact.
    #[test]
    fn frame_roundtrip() {
        let sent = PingPongMsg {
            timestamp_ms: 12345,
            is_pong: false,
        };
        let wire = SyncFrame::new_msgpack(SyncMessageType::PingPong, &sent)
            .unwrap()
            .to_bytes();
        let frame = SyncFrame::from_bytes(&wire).unwrap();
        assert_eq!(frame.msg_type, SyncMessageType::PingPong);
        let received: PingPongMsg = frame.decode_body().unwrap();
        assert_eq!(received.timestamp_ms, 12345);
        assert!(!received.is_pong);
    }

    /// A handshake serializes to a non-empty body behind the 0x01 tag.
    #[test]
    fn handshake_serialization() {
        let handshake = HandshakeMsg {
            jwt_token: "test.jwt.token".into(),
            vector_clock: HashMap::new(),
            subscribed_shapes: vec!["shape1".into()],
            client_version: "0.1.0".into(),
            lite_id: String::new(),
            epoch: 0,
            wire_version: 1,
        };
        let wire = SyncFrame::new_msgpack(SyncMessageType::Handshake, &handshake)
            .unwrap()
            .to_bytes();
        assert!(wire.len() > SyncFrame::HEADER_SIZE);
        assert_eq!(wire[0], 0x01);
    }

    /// The compensation hint of a rejection is preserved across the wire.
    #[test]
    fn delta_reject_with_compensation() {
        let sent = DeltaRejectMsg {
            mutation_id: 42,
            reason: "unique violation".into(),
            compensation: Some(CompensationHint::UniqueViolation {
                field: "email".into(),
                conflicting_value: "alice@example.com".into(),
            }),
        };
        let (_, received) = through_wire(SyncMessageType::DeltaReject, &sent);
        assert_eq!(received.mutation_id, 42);
        assert!(matches!(
            received.compensation,
            Some(CompensationHint::UniqueViolation { .. })
        ));
    }

    /// Every known tag maps to a variant with the same discriminant; an
    /// unassigned tag maps to nothing.
    #[test]
    fn message_type_roundtrip() {
        const KNOWN_TAGS: [u8; 21] = [
            0x01, 0x02, 0x10, 0x11, 0x12, 0x20, 0x21, 0x22, 0x23, 0x30, 0x40, 0x41, 0x50, 0x52,
            0x60, 0x61, 0x70, 0x80, 0x81, 0x82, 0xFF,
        ];
        for tag in KNOWN_TAGS {
            let kind = SyncMessageType::from_u8(tag).unwrap();
            assert_eq!(kind as u8, tag);
        }
        assert!(SyncMessageType::from_u8(0x99).is_none());
    }

    /// A shape subscription keeps its shape ID across the wire.
    #[test]
    fn shape_subscribe_roundtrip() {
        let sent = ShapeSubscribeMsg {
            shape: ShapeDefinition {
                shape_id: "s1".into(),
                tenant_id: 1,
                shape_type: super::super::shape::ShapeType::Vector {
                    collection: "embeddings".into(),
                    field_name: None,
                },
                description: "all embeddings".into(),
                field_filter: vec![],
            },
        };
        let (_, received) = through_wire(SyncMessageType::ShapeSubscribe, &sent);
        assert_eq!(received.shape.shape_id, "s1");
    }

    /// A presence update is tagged 0x80 and keeps its channel and state.
    #[test]
    fn presence_update_roundtrip() {
        let sent = PresenceUpdateMsg {
            channel: "doc:doc-123".into(),
            state: b"user_id:user-42,cursor:blk-7:42".to_vec(),
        };
        let (wire, received) = through_wire(SyncMessageType::PresenceUpdate, &sent);
        assert_eq!(wire[0], 0x80);
        assert_eq!(received.channel, "doc:doc-123");
        assert!(!received.state.is_empty());
    }

    /// A presence broadcast keeps its channel and all peers, in order.
    #[test]
    fn presence_broadcast_roundtrip() {
        let first = PeerPresence {
            user_id: "user-42".into(),
            state: vec![0xDE, 0xAD],
            last_seen_ms: 150,
        };
        let second = PeerPresence {
            user_id: "user-99".into(),
            state: vec![0xBE, 0xEF],
            last_seen_ms: 2300,
        };
        let sent = PresenceBroadcastMsg {
            channel: "doc:doc-123".into(),
            peers: vec![first, second],
        };
        let (_, received) = through_wire(SyncMessageType::PresenceBroadcast, &sent);
        assert_eq!(received.channel, "doc:doc-123");
        assert_eq!(received.peers.len(), 2);
        assert_eq!(received.peers[0].user_id, "user-42");
        assert_eq!(received.peers[1].last_seen_ms, 2300);
    }

    /// A presence leave is tagged 0x82 and keeps its channel and user ID.
    #[test]
    fn presence_leave_roundtrip() {
        let sent = PresenceLeaveMsg {
            channel: "doc:doc-123".into(),
            user_id: "user-42".into(),
        };
        let (wire, received) = through_wire(SyncMessageType::PresenceLeave, &sent);
        assert_eq!(wire[0], 0x82);
        assert_eq!(received.channel, "doc:doc-123");
        assert_eq!(received.user_id, "user-42");
    }
}