pulse_protocol/
frames.rs

1//! Frame types for the Pulse protocol.
2//!
3//! Frames are the fundamental unit of communication in Pulse.
4//! Each frame is serialized using MessagePack for efficient binary encoding.
5
6use serde::{Deserialize, Serialize};
7
8/// Frame type identifiers.
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
10#[serde(into = "u8", try_from = "u8")]
11#[repr(u8)]
12pub enum FrameType {
13    Subscribe = 0x01,
14    Unsubscribe = 0x02,
15    Publish = 0x03,
16    Presence = 0x04,
17    Ack = 0x05,
18    Error = 0x06,
19    Ping = 0x07,
20    Pong = 0x08,
21    Connect = 0x09,
22    Connected = 0x0A,
23}
24
25impl From<FrameType> for u8 {
26    fn from(ft: FrameType) -> u8 {
27        ft as u8
28    }
29}
30
31impl TryFrom<u8> for FrameType {
32    type Error = &'static str;
33
34    fn try_from(value: u8) -> Result<Self, <Self as TryFrom<u8>>::Error> {
35        match value {
36            0x01 => Ok(FrameType::Subscribe),
37            0x02 => Ok(FrameType::Unsubscribe),
38            0x03 => Ok(FrameType::Publish),
39            0x04 => Ok(FrameType::Presence),
40            0x05 => Ok(FrameType::Ack),
41            0x06 => Ok(FrameType::Error),
42            0x07 => Ok(FrameType::Ping),
43            0x08 => Ok(FrameType::Pong),
44            0x09 => Ok(FrameType::Connect),
45            0x0A => Ok(FrameType::Connected),
46            _ => Err("Invalid frame type"),
47        }
48    }
49}
50
51/// Presence action types.
52#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
53#[serde(into = "u8", try_from = "u8")]
54#[repr(u8)]
55pub enum PresenceAction {
56    /// Client joined the channel.
57    Join = 0,
58    /// Client left the channel.
59    Leave = 1,
60    /// Client updated their presence data.
61    Update = 2,
62    /// Server sending full presence state sync.
63    Sync = 3,
64}
65
66impl From<PresenceAction> for u8 {
67    fn from(pa: PresenceAction) -> u8 {
68        pa as u8
69    }
70}
71
72impl TryFrom<u8> for PresenceAction {
73    type Error = &'static str;
74
75    fn try_from(value: u8) -> Result<Self, Self::Error> {
76        match value {
77            0 => Ok(PresenceAction::Join),
78            1 => Ok(PresenceAction::Leave),
79            2 => Ok(PresenceAction::Update),
80            3 => Ok(PresenceAction::Sync),
81            _ => Err("Invalid presence action"),
82        }
83    }
84}
85
86/// A protocol frame.
87///
88/// Frames are the messages exchanged between clients and servers.
89/// Each frame type has specific fields relevant to its operation.
90#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
91#[serde(tag = "type")]
92pub enum Frame {
93    /// Subscribe to a channel.
94    #[serde(rename = "subscribe")]
95    Subscribe {
96        /// Request ID for acknowledgment.
97        id: u64,
98        /// Channel name to subscribe to.
99        channel: String,
100    },
101
102    /// Unsubscribe from a channel.
103    #[serde(rename = "unsubscribe")]
104    Unsubscribe {
105        /// Request ID for acknowledgment.
106        id: u64,
107        /// Channel name to unsubscribe from.
108        channel: String,
109    },
110
111    /// Publish a message to a channel.
112    #[serde(rename = "publish")]
113    Publish {
114        /// Optional request ID for acknowledgment.
115        #[serde(skip_serializing_if = "Option::is_none")]
116        id: Option<u64>,
117        /// Target channel.
118        channel: String,
119        /// Optional event name.
120        #[serde(skip_serializing_if = "Option::is_none")]
121        event: Option<String>,
122        /// Message payload.
123        #[serde(with = "serde_bytes")]
124        payload: Vec<u8>,
125    },
126
127    /// Presence update.
128    #[serde(rename = "presence")]
129    Presence {
130        /// Request ID.
131        id: u64,
132        /// Channel name.
133        channel: String,
134        /// Presence action.
135        action: PresenceAction,
136        /// Optional presence metadata.
137        #[serde(skip_serializing_if = "Option::is_none")]
138        data: Option<serde_json::Value>,
139    },
140
141    /// Acknowledgment of a request.
142    #[serde(rename = "ack")]
143    Ack {
144        /// ID of the acknowledged request.
145        id: u64,
146    },
147
148    /// Error response.
149    #[serde(rename = "error")]
150    Error {
151        /// ID of the failed request (0 if not applicable).
152        id: u64,
153        /// Error code.
154        code: u16,
155        /// Human-readable error message.
156        message: String,
157    },
158
159    /// Keepalive ping.
160    #[serde(rename = "ping")]
161    Ping {
162        /// Optional timestamp.
163        #[serde(skip_serializing_if = "Option::is_none")]
164        timestamp: Option<u64>,
165    },
166
167    /// Keepalive pong.
168    #[serde(rename = "pong")]
169    Pong {
170        /// Echoed timestamp from ping.
171        #[serde(skip_serializing_if = "Option::is_none")]
172        timestamp: Option<u64>,
173    },
174
175    /// Initial connection handshake.
176    #[serde(rename = "connect")]
177    Connect {
178        /// Protocol version.
179        version: u8,
180        /// Optional authentication token.
181        #[serde(skip_serializing_if = "Option::is_none")]
182        token: Option<String>,
183    },
184
185    /// Connection established response.
186    #[serde(rename = "connected")]
187    Connected {
188        /// Unique connection identifier.
189        connection_id: String,
190        /// Negotiated protocol version.
191        version: u8,
192        /// Recommended heartbeat interval in milliseconds.
193        heartbeat: u32,
194    },
195}
196
197impl Frame {
198    /// Get the frame type.
199    #[must_use]
200    pub fn frame_type(&self) -> FrameType {
201        match self {
202            Frame::Subscribe { .. } => FrameType::Subscribe,
203            Frame::Unsubscribe { .. } => FrameType::Unsubscribe,
204            Frame::Publish { .. } => FrameType::Publish,
205            Frame::Presence { .. } => FrameType::Presence,
206            Frame::Ack { .. } => FrameType::Ack,
207            Frame::Error { .. } => FrameType::Error,
208            Frame::Ping { .. } => FrameType::Ping,
209            Frame::Pong { .. } => FrameType::Pong,
210            Frame::Connect { .. } => FrameType::Connect,
211            Frame::Connected { .. } => FrameType::Connected,
212        }
213    }
214
215    /// Create a new Subscribe frame.
216    #[must_use]
217    pub fn subscribe(id: u64, channel: impl Into<String>) -> Self {
218        Frame::Subscribe {
219            id,
220            channel: channel.into(),
221        }
222    }
223
224    /// Create a new Unsubscribe frame.
225    #[must_use]
226    pub fn unsubscribe(id: u64, channel: impl Into<String>) -> Self {
227        Frame::Unsubscribe {
228            id,
229            channel: channel.into(),
230        }
231    }
232
233    /// Create a new Publish frame.
234    #[must_use]
235    pub fn publish(channel: impl Into<String>, payload: impl Into<Vec<u8>>) -> Self {
236        Frame::Publish {
237            id: None,
238            channel: channel.into(),
239            event: None,
240            payload: payload.into(),
241        }
242    }
243
244    /// Create a new Publish frame with ID for acknowledgment.
245    #[must_use]
246    pub fn publish_with_ack(
247        id: u64,
248        channel: impl Into<String>,
249        payload: impl Into<Vec<u8>>,
250    ) -> Self {
251        Frame::Publish {
252            id: Some(id),
253            channel: channel.into(),
254            event: None,
255            payload: payload.into(),
256        }
257    }
258
259    /// Create a new Ack frame.
260    #[must_use]
261    pub fn ack(id: u64) -> Self {
262        Frame::Ack { id }
263    }
264
265    /// Create a new Error frame.
266    #[must_use]
267    pub fn error(id: u64, code: u16, message: impl Into<String>) -> Self {
268        Frame::Error {
269            id,
270            code,
271            message: message.into(),
272        }
273    }
274
275    /// Create a new Ping frame.
276    #[must_use]
277    pub fn ping() -> Self {
278        Frame::Ping { timestamp: None }
279    }
280
281    /// Create a new Ping frame with timestamp.
282    #[must_use]
283    pub fn ping_with_timestamp(timestamp: u64) -> Self {
284        Frame::Ping {
285            timestamp: Some(timestamp),
286        }
287    }
288
289    /// Create a new Pong frame.
290    #[must_use]
291    pub fn pong(timestamp: Option<u64>) -> Self {
292        Frame::Pong { timestamp }
293    }
294
295    /// Create a new Connect frame.
296    #[must_use]
297    pub fn connect(version: u8, token: Option<String>) -> Self {
298        Frame::Connect { version, token }
299    }
300
301    /// Create a new Connected frame.
302    #[must_use]
303    pub fn connected(connection_id: impl Into<String>, version: u8, heartbeat: u32) -> Self {
304        Frame::Connected {
305            connection_id: connection_id.into(),
306            version,
307            heartbeat,
308        }
309    }
310}
311
#[cfg(test)]
mod tests {
    use super::*;

    /// Every variant must report its own tag from `frame_type`.
    #[test]
    fn test_frame_type() {
        let cases = vec![
            (Frame::subscribe(1, "test"), FrameType::Subscribe),
            (Frame::unsubscribe(2, "test"), FrameType::Unsubscribe),
            (Frame::publish("test", b"hello".to_vec()), FrameType::Publish),
            (
                // `Presence` has no constructor helper, so build it literally.
                Frame::Presence {
                    id: 3,
                    channel: "test".to_string(),
                    action: PresenceAction::Join,
                    data: None,
                },
                FrameType::Presence,
            ),
            (Frame::ack(4), FrameType::Ack),
            (Frame::error(5, 404, "not found"), FrameType::Error),
            (Frame::ping(), FrameType::Ping),
            (Frame::pong(Some(7)), FrameType::Pong),
            (Frame::connect(1, None), FrameType::Connect),
            (Frame::connected("conn-1", 1, 30_000), FrameType::Connected),
        ];

        for (frame, expected) in cases {
            assert_eq!(frame.frame_type(), expected);
        }
    }

    /// `u8 -> FrameType -> u8` must round-trip for every defined code, and
    /// codes just outside the defined range must be rejected.
    #[test]
    fn test_frame_type_conversion() {
        for code in 0x01..=0x0A_u8 {
            let frame_type = FrameType::try_from(code).expect("code is in the defined range");
            assert_eq!(u8::from(frame_type), code);
        }

        assert_eq!(FrameType::try_from(0x01_u8), Ok(FrameType::Subscribe));
        assert_eq!(FrameType::try_from(0x0A_u8), Ok(FrameType::Connected));
        assert!(FrameType::try_from(0x00_u8).is_err());
        assert!(FrameType::try_from(0x0B_u8).is_err());
    }

    /// Both conversion directions for `PresenceAction`, plus the first invalid code.
    #[test]
    fn test_presence_action_conversion() {
        assert_eq!(PresenceAction::try_from(0_u8), Ok(PresenceAction::Join));
        assert_eq!(PresenceAction::try_from(1_u8), Ok(PresenceAction::Leave));
        assert_eq!(PresenceAction::try_from(2_u8), Ok(PresenceAction::Update));
        assert_eq!(PresenceAction::try_from(3_u8), Ok(PresenceAction::Sync));
        assert!(PresenceAction::try_from(4_u8).is_err());

        assert_eq!(u8::from(PresenceAction::Join), 0);
        assert_eq!(u8::from(PresenceAction::Leave), 1);
        assert_eq!(u8::from(PresenceAction::Update), 2);
        assert_eq!(u8::from(PresenceAction::Sync), 3);
    }

    /// Constructor helpers must place their arguments in the right fields.
    #[test]
    fn test_constructors_populate_fields() {
        assert_eq!(
            Frame::publish_with_ack(9, "test", vec![1_u8, 2]),
            Frame::Publish {
                id: Some(9),
                channel: "test".to_string(),
                event: None,
                payload: vec![1, 2],
            }
        );
        assert_eq!(
            Frame::ping_with_timestamp(42),
            Frame::Ping { timestamp: Some(42) }
        );
        assert_eq!(
            Frame::error(0, 500, "boom"),
            Frame::Error {
                id: 0,
                code: 500,
                message: "boom".to_string(),
            }
        );
    }
}