selium_common/protocol/
frame.rs

1use crate::types::Operation;
2use anyhow::{bail, Result};
3use bytes::{BufMut, Bytes, BytesMut};
4use serde::{Deserialize, Serialize};
5
/// Message-type byte for a [`Frame::RegisterPublisher`] frame.
const REGISTER_PUBLISHER: u8 = 0x00;
/// Message-type byte for a [`Frame::RegisterSubscriber`] frame.
const REGISTER_SUBSCRIBER: u8 = 0x01;
/// Message-type byte for a [`Frame::Message`] frame.
const MESSAGE: u8 = 0x02;
9
10#[derive(Clone, Debug, PartialEq)]
11pub enum Frame {
12    RegisterPublisher(PublisherPayload),
13    RegisterSubscriber(SubscriberPayload),
14    Message(Bytes),
15}
16
17impl Frame {
18    pub fn get_length(&self) -> Result<u64> {
19        let length = match self {
20            Self::RegisterPublisher(payload) => bincode::serialized_size(payload)?,
21            Self::RegisterSubscriber(payload) => bincode::serialized_size(payload)?,
22            Self::Message(bytes) => bytes.len() as u64,
23        };
24
25        Ok(length)
26    }
27
28    pub fn get_type(&self) -> u8 {
29        match self {
30            Self::RegisterPublisher(_) => REGISTER_PUBLISHER,
31            Self::RegisterSubscriber(_) => REGISTER_SUBSCRIBER,
32            Self::Message(_) => MESSAGE,
33        }
34    }
35
36    pub fn get_topic(&self) -> Option<&str> {
37        match self {
38            Self::RegisterPublisher(p) => Some(&p.topic),
39            Self::RegisterSubscriber(s) => Some(&s.topic),
40            Self::Message(_) => None,
41        }
42    }
43
44    pub fn write_to_bytes(self, dst: &mut BytesMut) -> Result<()> {
45        match self {
46            Frame::RegisterPublisher(payload) => bincode::serialize_into(dst.writer(), &payload)?,
47            Frame::RegisterSubscriber(payload) => bincode::serialize_into(dst.writer(), &payload)?,
48            Frame::Message(bytes) => dst.extend_from_slice(&bytes),
49        }
50
51        Ok(())
52    }
53}
54
55impl TryFrom<(u8, BytesMut)> for Frame {
56    type Error = anyhow::Error;
57
58    fn try_from((message_type, bytes): (u8, BytesMut)) -> Result<Self> {
59        let frame = match message_type {
60            REGISTER_PUBLISHER => Frame::RegisterPublisher(bincode::deserialize(&bytes)?),
61            REGISTER_SUBSCRIBER => Frame::RegisterSubscriber(bincode::deserialize(&bytes)?),
62            MESSAGE => Frame::Message(bytes.into()),
63            _ => bail!("Unknown message type"),
64        };
65
66        Ok(frame)
67    }
68}
69
70#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
71pub struct PublisherPayload {
72    pub topic: String,
73    pub retention_policy: u64,
74    pub operations: Vec<Operation>,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
78pub struct SubscriberPayload {
79    pub topic: String,
80    pub retention_policy: u64,
81    pub operations: Vec<Operation>,
82}