Skip to main content

rivven_protocol/
types.rs

1//! Message data types for protocol transport
2
3use bytes::Bytes;
4use serde::{Deserialize, Serialize};
5
6/// Schema type (format) for schema registry
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
8#[serde(rename_all = "UPPERCASE")]
9pub enum SchemaType {
10    /// Apache Avro (recommended for production)
11    #[default]
12    #[serde(alias = "avro", alias = "AVRO")]
13    Avro,
14
15    /// JSON Schema
16    #[serde(alias = "json", alias = "JSON")]
17    Json,
18
19    /// Protocol Buffers
20    #[serde(alias = "protobuf", alias = "PROTOBUF")]
21    Protobuf,
22}
23
24impl SchemaType {
25    pub fn as_str(&self) -> &'static str {
26        match self {
27            SchemaType::Avro => "AVRO",
28            SchemaType::Json => "JSON",
29            SchemaType::Protobuf => "PROTOBUF",
30        }
31    }
32}
33
34impl std::fmt::Display for SchemaType {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        write!(f, "{}", self.as_str())
37    }
38}
39
40impl std::str::FromStr for SchemaType {
41    type Err = String;
42
43    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
44        match s.to_uppercase().as_str() {
45            "AVRO" => Ok(SchemaType::Avro),
46            "JSON" | "JSONSCHEMA" | "JSON_SCHEMA" => Ok(SchemaType::Json),
47            "PROTOBUF" | "PROTO" => Ok(SchemaType::Protobuf),
48            _ => Err(format!("Unknown schema type: {}", s)),
49        }
50    }
51}
52
53/// Serialized message data for transport
54#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
55pub struct MessageData {
56    /// Message offset in the partition
57    pub offset: u64,
58    /// Partition the message belongs to
59    #[serde(default)]
60    pub partition: u32,
61    /// Optional message key
62    #[serde(with = "crate::serde_utils::option_bytes_serde")]
63    pub key: Option<Bytes>,
64    /// Message value/payload
65    #[serde(with = "crate::serde_utils::bytes_serde")]
66    pub value: Bytes,
67    /// Timestamp in milliseconds since epoch
68    pub timestamp: i64,
69    /// Record headers (key-value metadata)
70    #[serde(default)]
71    pub headers: Vec<(String, Vec<u8>)>,
72}
73
74impl MessageData {
75    /// Create a new message
76    pub fn new(offset: u64, value: impl Into<Bytes>, timestamp: i64) -> Self {
77        Self {
78            offset,
79            partition: 0,
80            key: None,
81            value: value.into(),
82            timestamp,
83            headers: Vec::new(),
84        }
85    }
86
87    /// Set the key
88    pub fn with_key(mut self, key: impl Into<Bytes>) -> Self {
89        self.key = Some(key.into());
90        self
91    }
92
93    /// Set the partition
94    pub fn with_partition(mut self, partition: u32) -> Self {
95        self.partition = partition;
96        self
97    }
98
99    /// Add a header
100    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<Vec<u8>>) -> Self {
101        self.headers.push((key.into(), value.into()));
102        self
103    }
104
105    /// Get the value as bytes
106    pub fn value_bytes(&self) -> &[u8] {
107        &self.value
108    }
109
110    /// Get the key as bytes if present
111    pub fn key_bytes(&self) -> Option<&[u8]> {
112        self.key.as_ref().map(|k| k.as_ref())
113    }
114
115    /// Get the size of this message (key + value + headers)
116    pub fn size(&self) -> usize {
117        let key_size = self.key.as_ref().map(|k| k.len()).unwrap_or(0);
118        let header_size: usize = self.headers.iter().map(|(k, v)| k.len() + v.len()).sum();
119        key_size + self.value.len() + header_size
120    }
121}
122
#[cfg(test)]
mod tests {
    use super::*;

    /// A keyed message exposes its offset, payload and key, and `size`
    /// counts the key and value bytes.
    #[test]
    fn test_message_data() {
        let payload = b"hello".to_vec();
        let key = b"key1".to_vec();
        let keyed = MessageData::new(42, payload, 1234567890).with_key(key);

        assert_eq!(keyed.offset, 42);
        assert_eq!(keyed.value_bytes(), b"hello");
        assert_eq!(keyed.key_bytes(), Some(&b"key1"[..]));
        // 4 key bytes + 5 value bytes, no headers.
        assert_eq!(keyed.size(), 4 + 5);
    }

    /// Without a key, `key_bytes` is `None` and `size` is just the payload length.
    #[test]
    fn test_message_data_no_key() {
        let unkeyed = MessageData::new(0, b"data".to_vec(), 0);

        assert!(unkeyed.key.is_none());
        assert_eq!(unkeyed.key_bytes(), None);
        assert_eq!(unkeyed.size(), 4);
    }
}
145}