1use std::collections::HashMap;
29
30use serde::{Deserialize, Serialize};
31
32use super::compensation::CompensationHint;
33use super::shape::ShapeDefinition;
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37#[repr(u8)]
38pub enum SyncMessageType {
39 Handshake = 0x01,
40 HandshakeAck = 0x02,
41 DeltaPush = 0x10,
42 DeltaAck = 0x11,
43 DeltaReject = 0x12,
44 ShapeSubscribe = 0x20,
45 ShapeSnapshot = 0x21,
46 ShapeDelta = 0x22,
47 ShapeUnsubscribe = 0x23,
48 VectorClockSync = 0x30,
49 TimeseriesPush = 0x40,
51 TimeseriesAck = 0x41,
53 ResyncRequest = 0x50,
56 Throttle = 0x52,
59 TokenRefresh = 0x60,
61 TokenRefreshAck = 0x61,
63 DefinitionSync = 0x70,
66 PresenceUpdate = 0x80,
69 PresenceBroadcast = 0x81,
71 PresenceLeave = 0x82,
74 PingPong = 0xFF,
75}
76
77impl SyncMessageType {
78 pub fn from_u8(v: u8) -> Option<Self> {
79 match v {
80 0x01 => Some(Self::Handshake),
81 0x02 => Some(Self::HandshakeAck),
82 0x10 => Some(Self::DeltaPush),
83 0x11 => Some(Self::DeltaAck),
84 0x12 => Some(Self::DeltaReject),
85 0x20 => Some(Self::ShapeSubscribe),
86 0x21 => Some(Self::ShapeSnapshot),
87 0x22 => Some(Self::ShapeDelta),
88 0x23 => Some(Self::ShapeUnsubscribe),
89 0x30 => Some(Self::VectorClockSync),
90 0x40 => Some(Self::TimeseriesPush),
91 0x41 => Some(Self::TimeseriesAck),
92 0x50 => Some(Self::ResyncRequest),
93 0x52 => Some(Self::Throttle),
94 0x60 => Some(Self::TokenRefresh),
95 0x61 => Some(Self::TokenRefreshAck),
96 0x70 => Some(Self::DefinitionSync),
97 0x80 => Some(Self::PresenceUpdate),
98 0x81 => Some(Self::PresenceBroadcast),
99 0x82 => Some(Self::PresenceLeave),
100 0xFF => Some(Self::PingPong),
101 _ => None,
102 }
103 }
104}
105
106#[derive(Clone)]
111pub struct SyncFrame {
112 pub msg_type: SyncMessageType,
113 pub body: Vec<u8>,
114}
115
116impl SyncFrame {
117 pub const HEADER_SIZE: usize = 5;
118
119 pub fn to_bytes(&self) -> Vec<u8> {
121 let len = self.body.len() as u32;
122 let mut buf = Vec::with_capacity(Self::HEADER_SIZE + self.body.len());
123 buf.push(self.msg_type as u8);
124 buf.extend_from_slice(&len.to_le_bytes());
125 buf.extend_from_slice(&self.body);
126 buf
127 }
128
129 pub fn from_bytes(data: &[u8]) -> Option<Self> {
133 if data.len() < Self::HEADER_SIZE {
134 return None;
135 }
136 let msg_type = SyncMessageType::from_u8(data[0])?;
137 let len = u32::from_le_bytes(data[1..5].try_into().ok()?) as usize;
138 if data.len() < Self::HEADER_SIZE + len {
139 return None;
140 }
141 let body = data[Self::HEADER_SIZE..Self::HEADER_SIZE + len].to_vec();
142 Some(Self { msg_type, body })
143 }
144
145 pub fn new_msgpack<T: zerompk::ToMessagePack>(
147 msg_type: SyncMessageType,
148 value: &T,
149 ) -> Option<Self> {
150 let body = zerompk::to_msgpack_vec(value).ok()?;
151 Some(Self { msg_type, body })
152 }
153
154 pub fn encode_or_empty<T: zerompk::ToMessagePack>(
157 msg_type: SyncMessageType,
158 value: &T,
159 ) -> Self {
160 Self::new_msgpack(msg_type, value).unwrap_or(Self {
161 msg_type,
162 body: Vec::new(),
163 })
164 }
165
166 pub fn decode_body<T: zerompk::FromMessagePackOwned>(&self) -> Option<T> {
168 zerompk::from_msgpack(&self.body).ok()
169 }
170}
171
172#[derive(
176 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
177)]
178pub struct HandshakeMsg {
179 pub jwt_token: String,
181 pub vector_clock: HashMap<String, HashMap<String, u64>>,
183 pub subscribed_shapes: Vec<String>,
185 pub client_version: String,
187 #[serde(default)]
189 pub lite_id: String,
190 #[serde(default)]
192 pub epoch: u64,
193 #[serde(default)]
196 pub wire_version: u16,
197}
198
199#[derive(
201 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
202)]
203pub struct HandshakeAckMsg {
204 pub success: bool,
206 pub session_id: String,
208 pub server_clock: HashMap<String, u64>,
210 pub error: Option<String>,
212 #[serde(default)]
214 pub fork_detected: bool,
215 #[serde(default)]
217 pub server_wire_version: u16,
218}
219
220#[derive(
222 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
223)]
224pub struct DeltaPushMsg {
225 pub collection: String,
227 pub document_id: String,
229 pub delta: Vec<u8>,
231 pub peer_id: u64,
233 pub mutation_id: u64,
235 #[serde(default)]
238 pub checksum: u32,
239}
240
241#[derive(
243 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
244)]
245pub struct DeltaAckMsg {
246 pub mutation_id: u64,
248 pub lsn: u64,
250}
251
252#[derive(
254 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
255)]
256pub struct DeltaRejectMsg {
257 pub mutation_id: u64,
259 pub reason: String,
261 pub compensation: Option<CompensationHint>,
263}
264
265#[derive(
267 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
268)]
269pub struct ShapeSubscribeMsg {
270 pub shape: ShapeDefinition,
272}
273
274#[derive(
276 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
277)]
278pub struct ShapeSnapshotMsg {
279 pub shape_id: String,
281 pub data: Vec<u8>,
283 pub snapshot_lsn: u64,
285 pub doc_count: usize,
287}
288
289#[derive(
291 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
292)]
293pub struct ShapeDeltaMsg {
294 pub shape_id: String,
296 pub collection: String,
298 pub document_id: String,
300 pub operation: String,
302 pub delta: Vec<u8>,
304 pub lsn: u64,
306}
307
308#[derive(
310 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
311)]
312pub struct ShapeUnsubscribeMsg {
313 pub shape_id: String,
314}
315
316#[derive(
318 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
319)]
320pub struct VectorClockSyncMsg {
321 pub clocks: HashMap<String, u64>,
323 pub sender_id: u64,
325}
326
327#[derive(
338 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
339)]
340pub struct ResyncRequestMsg {
341 pub reason: ResyncReason,
343 pub from_mutation_id: u64,
345 pub collection: String,
347}
348
349#[derive(
351 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
352)]
353pub enum ResyncReason {
354 SequenceGap {
356 expected: u64,
358 received: u64,
360 },
361 ChecksumMismatch {
363 mutation_id: u64,
365 },
366 CorruptedState,
368}
369
370#[derive(
376 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
377)]
378pub struct ThrottleMsg {
379 pub throttle: bool,
381 pub queue_depth: u64,
383 pub suggested_rate: u64,
385}
386
387#[derive(
394 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
395)]
396pub struct TokenRefreshMsg {
397 pub new_token: String,
399}
400
401#[derive(
403 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
404)]
405pub struct TokenRefreshAckMsg {
406 pub success: bool,
408 pub error: Option<String>,
410 #[serde(default)]
412 pub expires_in_secs: u64,
413}
414
415#[derive(
417 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
418)]
419pub struct PingPongMsg {
420 pub timestamp_ms: u64,
422 pub is_pong: bool,
424}
425
426#[derive(
428 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
429)]
430pub struct TimeseriesPushMsg {
431 pub lite_id: String,
433 pub collection: String,
435 pub ts_block: Vec<u8>,
437 pub val_block: Vec<u8>,
439 pub series_block: Vec<u8>,
441 pub sample_count: u64,
443 pub min_ts: i64,
445 pub max_ts: i64,
447 pub watermarks: HashMap<u64, u64>,
450}
451
452#[derive(
454 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
455)]
456pub struct TimeseriesAckMsg {
457 pub collection: String,
459 pub accepted: u64,
461 pub rejected: u64,
463 pub lsn: u64,
465}
466
467#[derive(
472 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
473)]
474pub struct DefinitionSyncMsg {
475 pub definition_type: String,
477 pub name: String,
479 pub action: String,
481 pub payload: Vec<u8>,
483}
484
485#[derive(
496 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
497)]
498pub struct PresenceUpdateMsg {
499 pub channel: String,
501 pub state: Vec<u8>,
505}
506
507#[derive(
509 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
510)]
511pub struct PeerPresence {
512 pub user_id: String,
514 pub state: Vec<u8>,
516 pub last_seen_ms: u64,
518}
519
520#[derive(
525 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
526)]
527pub struct PresenceBroadcastMsg {
528 pub channel: String,
530 pub peers: Vec<PeerPresence>,
532}
533
534#[derive(
539 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
540)]
541pub struct PresenceLeaveMsg {
542 pub channel: String,
544 pub user_id: String,
546}
547
548#[cfg(test)]
549mod tests {
550 use super::*;
551
552 #[test]
553 fn frame_roundtrip() {
554 let ping = PingPongMsg {
555 timestamp_ms: 12345,
556 is_pong: false,
557 };
558 let frame = SyncFrame::new_msgpack(SyncMessageType::PingPong, &ping).unwrap();
559 let bytes = frame.to_bytes();
560 let decoded = SyncFrame::from_bytes(&bytes).unwrap();
561 assert_eq!(decoded.msg_type, SyncMessageType::PingPong);
562 let decoded_ping: PingPongMsg = decoded.decode_body().unwrap();
563 assert_eq!(decoded_ping.timestamp_ms, 12345);
564 assert!(!decoded_ping.is_pong);
565 }
566
567 #[test]
568 fn handshake_serialization() {
569 let msg = HandshakeMsg {
570 jwt_token: "test.jwt.token".into(),
571 vector_clock: HashMap::new(),
572 subscribed_shapes: vec!["shape1".into()],
573 client_version: "0.1.0".into(),
574 lite_id: String::new(),
575 epoch: 0,
576 wire_version: 1,
577 };
578 let frame = SyncFrame::new_msgpack(SyncMessageType::Handshake, &msg).unwrap();
579 let bytes = frame.to_bytes();
580 assert!(bytes.len() > SyncFrame::HEADER_SIZE);
581 assert_eq!(bytes[0], 0x01);
582 }
583
584 #[test]
585 fn delta_reject_with_compensation() {
586 let reject = DeltaRejectMsg {
587 mutation_id: 42,
588 reason: "unique violation".into(),
589 compensation: Some(CompensationHint::UniqueViolation {
590 field: "email".into(),
591 conflicting_value: "alice@example.com".into(),
592 }),
593 };
594 let frame = SyncFrame::new_msgpack(SyncMessageType::DeltaReject, &reject).unwrap();
595 let decoded: DeltaRejectMsg = SyncFrame::from_bytes(&frame.to_bytes())
596 .unwrap()
597 .decode_body()
598 .unwrap();
599 assert_eq!(decoded.mutation_id, 42);
600 assert!(matches!(
601 decoded.compensation,
602 Some(CompensationHint::UniqueViolation { .. })
603 ));
604 }
605
606 #[test]
607 fn message_type_roundtrip() {
608 for v in [
609 0x01, 0x02, 0x10, 0x11, 0x12, 0x20, 0x21, 0x22, 0x23, 0x30, 0x40, 0x41, 0x50, 0x52,
610 0x60, 0x61, 0x70, 0x80, 0x81, 0x82, 0xFF,
611 ] {
612 let mt = SyncMessageType::from_u8(v).unwrap();
613 assert_eq!(mt as u8, v);
614 }
615 assert!(SyncMessageType::from_u8(0x99).is_none());
616 }
617
618 #[test]
619 fn shape_subscribe_roundtrip() {
620 let msg = ShapeSubscribeMsg {
621 shape: ShapeDefinition {
622 shape_id: "s1".into(),
623 tenant_id: 1,
624 shape_type: super::super::shape::ShapeType::Vector {
625 collection: "embeddings".into(),
626 field_name: None,
627 },
628 description: "all embeddings".into(),
629 field_filter: vec![],
630 },
631 };
632 let frame = SyncFrame::new_msgpack(SyncMessageType::ShapeSubscribe, &msg).unwrap();
633 let decoded: ShapeSubscribeMsg = SyncFrame::from_bytes(&frame.to_bytes())
634 .unwrap()
635 .decode_body()
636 .unwrap();
637 assert_eq!(decoded.shape.shape_id, "s1");
638 }
639
640 #[test]
641 fn presence_update_roundtrip() {
642 let msg = PresenceUpdateMsg {
643 channel: "doc:doc-123".into(),
644 state: b"user_id:user-42,cursor:blk-7:42".to_vec(),
645 };
646 let frame = SyncFrame::new_msgpack(SyncMessageType::PresenceUpdate, &msg).unwrap();
647 let bytes = frame.to_bytes();
648 assert_eq!(bytes[0], 0x80);
649 let decoded: PresenceUpdateMsg = SyncFrame::from_bytes(&bytes)
650 .unwrap()
651 .decode_body()
652 .unwrap();
653 assert_eq!(decoded.channel, "doc:doc-123");
654 assert!(!decoded.state.is_empty());
655 }
656
657 #[test]
658 fn presence_broadcast_roundtrip() {
659 let msg = PresenceBroadcastMsg {
660 channel: "doc:doc-123".into(),
661 peers: vec![
662 PeerPresence {
663 user_id: "user-42".into(),
664 state: vec![0xDE, 0xAD],
665 last_seen_ms: 150,
666 },
667 PeerPresence {
668 user_id: "user-99".into(),
669 state: vec![0xBE, 0xEF],
670 last_seen_ms: 2300,
671 },
672 ],
673 };
674 let frame = SyncFrame::new_msgpack(SyncMessageType::PresenceBroadcast, &msg).unwrap();
675 let decoded: PresenceBroadcastMsg = SyncFrame::from_bytes(&frame.to_bytes())
676 .unwrap()
677 .decode_body()
678 .unwrap();
679 assert_eq!(decoded.channel, "doc:doc-123");
680 assert_eq!(decoded.peers.len(), 2);
681 assert_eq!(decoded.peers[0].user_id, "user-42");
682 assert_eq!(decoded.peers[1].last_seen_ms, 2300);
683 }
684
685 #[test]
686 fn presence_leave_roundtrip() {
687 let msg = PresenceLeaveMsg {
688 channel: "doc:doc-123".into(),
689 user_id: "user-42".into(),
690 };
691 let frame = SyncFrame::new_msgpack(SyncMessageType::PresenceLeave, &msg).unwrap();
692 let bytes = frame.to_bytes();
693 assert_eq!(bytes[0], 0x82);
694 let decoded: PresenceLeaveMsg = SyncFrame::from_bytes(&bytes)
695 .unwrap()
696 .decode_body()
697 .unwrap();
698 assert_eq!(decoded.channel, "doc:doc-123");
699 assert_eq!(decoded.user_id, "user-42");
700 }
701}