xrpc/message/
types.rs

1use serde::{Deserialize, Serialize};
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use crate::error::{Result, RpcError};
5
/// Four-byte magic value: the ASCII bytes `XRPC` (`0x58 0x52 0x50 0x43`).
// NOTE(review): presumably the first bytes of every encoded message — confirm against the codec.
pub const MAGIC: [u8; 4] = *b"XRPC";

/// Version number, held in a single byte; currently `1`.
pub const VERSION: u8 = 1;

/// Size limit of 16 MiB (`16 * 1024 * 1024`).
// NOTE(review): presumably counted in bytes and enforced by the codec — confirm where it is checked.
pub const MAX_MESSAGE_SIZE: usize = 16 << 20;

/// Smallest header size: the sum of six fixed-width fields, 19 in total.
// NOTE(review): the terms look like magic(4) + version(1) + type(1) + length(4) + id(8) + flags(1);
// verify the field order and meaning against the encoder before relying on it.
pub const MIN_HEADER_SIZE: usize = 4 + 1 + 1 + 4 + 8 + 1;

/// Process-wide counter handed out by `MessageId::new`; starts at `1`.
static NEXT_MESSAGE_ID: AtomicU64 = AtomicU64::new(1);
12
/// Message identifier: a newtype around a `u64`.
///
/// The inner value is public. An identifier can be wrapped from a known value with
/// `MessageId::from_raw`, or freshly allocated from the module's counter with `MessageId::new`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct MessageId(pub u64);
15
16impl MessageId {
17    pub fn new() -> Self {
18        MessageId(NEXT_MESSAGE_ID.fetch_add(1, Ordering::Relaxed))
19    }
20
21    pub fn from_raw(id: u64) -> Self {
22        MessageId(id)
23    }
24
25    pub fn raw(&self) -> u64 {
26        self.0
27    }
28}
29
30impl Default for MessageId {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35
36impl std::fmt::Display for MessageId {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        write!(f, "{}", self.0)
39    }
40}
41
/// Kind of message, represented as a single byte via `#[repr(u8)]`.
///
/// The explicit discriminants are the byte values accepted by `MessageType::from_u8` and
/// produced by `MessageType::to_u8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[repr(u8)]
pub enum MessageType {
    /// Byte value `0`.
    Call = 0,
    /// Byte value `1`.
    Reply = 1,
    /// Byte value `2`.
    Notification = 2,
    /// Byte value `3`.
    Error = 3,
    /// Byte value `4`.
    StreamChunk = 4,
    /// Byte value `5`.
    StreamEnd = 5,
}
52
53impl MessageType {
54    pub fn from_u8(value: u8) -> Result<Self> {
55        match value {
56            0 => Ok(MessageType::Call),
57            1 => Ok(MessageType::Reply),
58            2 => Ok(MessageType::Notification),
59            3 => Ok(MessageType::Error),
60            4 => Ok(MessageType::StreamChunk),
61            5 => Ok(MessageType::StreamEnd),
62            _ => Err(RpcError::InvalidMessage(format!(
63                "Unknown message type: {}",
64                value
65            ))),
66        }
67    }
68
69    pub fn to_u8(self) -> u8 {
70        self as u8
71    }
72}
73
74/// Compression algorithm used for message payload
75#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
76#[repr(u8)]
77#[derive(Default)]
78pub enum CompressionType {
79    #[default]
80    None = 0,
81    Lz4 = 1,
82    Zstd = 2,
83}
84
85impl CompressionType {
86    pub fn from_u8(value: u8) -> Result<Self> {
87        match value {
88            0 => Ok(CompressionType::None),
89            1 => Ok(CompressionType::Lz4),
90            2 => Ok(CompressionType::Zstd),
91            _ => Err(RpcError::InvalidMessage(format!(
92                "Unknown compression type: {}",
93                value
94            ))),
95        }
96    }
97
98    pub fn to_u8(self) -> u8 {
99        self as u8
100    }
101}
102
/// Message flags packed into a single byte
///
/// Bit layout used by [`MessageFlags::from_u8`] and [`MessageFlags::to_u8`]:
/// `0x01` = `compressed`, `0x02` = `streaming`, `0x04` = `batch`. The five high bits carry no
/// flag.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct MessageFlags {
    /// Whether the message payload is compressed
    pub compressed: bool,
    /// Whether this is a streaming message
    pub streaming: bool,
    /// Whether this is part of a batch
    pub batch: bool,
}

impl MessageFlags {
    /// Mask of the `compressed` bit.
    const COMPRESSED_BIT: u8 = 0x01;
    /// Mask of the `streaming` bit.
    const STREAMING_BIT: u8 = 0x02;
    /// Mask of the `batch` bit.
    const BATCH_BIT: u8 = 0x04;

    /// Unpacks a flag byte.
    ///
    /// Only the three low bits are inspected; any other bits set in `value` are ignored, so
    /// they do not survive a `from_u8` → `to_u8` round trip.
    pub fn from_u8(value: u8) -> Self {
        Self {
            compressed: value & Self::COMPRESSED_BIT != 0,
            streaming: value & Self::STREAMING_BIT != 0,
            batch: value & Self::BATCH_BIT != 0,
        }
    }

    /// Packs the flags into a byte; the result is always in `0x00..=0x07`.
    pub fn to_u8(self) -> u8 {
        // Each flag contributes its mask when set and nothing otherwise.
        let bit = |set: bool, mask: u8| if set { mask } else { 0 };

        bit(self.compressed, Self::COMPRESSED_BIT)
            | bit(self.streaming, Self::STREAMING_BIT)
            | bit(self.batch, Self::BATCH_BIT)
    }
}