Skip to main content

rivven_protocol/
types.rs

1//! Message data types for protocol transport
2
3use bytes::Bytes;
4use serde::{Deserialize, Serialize};
5
6/// Schema type (format) for schema registry
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
8#[serde(rename_all = "UPPERCASE")]
9pub enum SchemaType {
10    /// Apache Avro (recommended for production)
11    #[default]
12    #[serde(alias = "avro", alias = "AVRO")]
13    Avro,
14
15    /// JSON Schema
16    #[serde(alias = "json", alias = "JSON")]
17    Json,
18
19    /// Protocol Buffers
20    #[serde(alias = "protobuf", alias = "PROTOBUF")]
21    Protobuf,
22}
23
24impl SchemaType {
25    pub fn as_str(&self) -> &'static str {
26        match self {
27            SchemaType::Avro => "AVRO",
28            SchemaType::Json => "JSON",
29            SchemaType::Protobuf => "PROTOBUF",
30        }
31    }
32}
33
34impl std::fmt::Display for SchemaType {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        write!(f, "{}", self.as_str())
37    }
38}
39
40impl std::str::FromStr for SchemaType {
41    type Err = String;
42
43    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
44        match s.to_uppercase().as_str() {
45            "AVRO" => Ok(SchemaType::Avro),
46            "JSON" | "JSONSCHEMA" | "JSON_SCHEMA" => Ok(SchemaType::Json),
47            "PROTOBUF" | "PROTO" => Ok(SchemaType::Protobuf),
48            _ => Err(format!("Unknown schema type: {}", s)),
49        }
50    }
51}
52
53/// Serialized message data for transport
54#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
55pub struct MessageData {
56    /// Message offset in the partition
57    pub offset: u64,
58    /// Optional message key
59    #[serde(with = "crate::serde_utils::option_bytes_serde")]
60    pub key: Option<Bytes>,
61    /// Message value/payload
62    #[serde(with = "crate::serde_utils::bytes_serde")]
63    pub value: Bytes,
64    /// Timestamp in milliseconds since epoch
65    pub timestamp: i64,
66}
67
68impl MessageData {
69    /// Create a new message
70    pub fn new(offset: u64, value: impl Into<Bytes>, timestamp: i64) -> Self {
71        Self {
72            offset,
73            key: None,
74            value: value.into(),
75            timestamp,
76        }
77    }
78
79    /// Set the key
80    pub fn with_key(mut self, key: impl Into<Bytes>) -> Self {
81        self.key = Some(key.into());
82        self
83    }
84
85    /// Get the value as bytes
86    pub fn value_bytes(&self) -> &[u8] {
87        &self.value
88    }
89
90    /// Get the key as bytes if present
91    pub fn key_bytes(&self) -> Option<&[u8]> {
92        self.key.as_ref().map(|k| k.as_ref())
93    }
94
95    /// Get the size of this message (key + value)
96    pub fn size(&self) -> usize {
97        let key_size = self.key.as_ref().map(|k| k.len()).unwrap_or(0);
98        key_size + self.value.len()
99    }
100}
101
#[cfg(test)]
mod tests {
    use super::*;

    /// A keyed message exposes its offset, payload and key; its size counts both.
    #[test]
    fn test_message_data() {
        let payload = b"hello".to_vec();
        let key = b"key1".to_vec();

        let msg = MessageData::new(42, payload, 1234567890).with_key(key);

        assert_eq!(msg.offset, 42);
        assert_eq!(msg.value_bytes(), b"hello");
        assert_eq!(msg.key_bytes(), Some(b"key1".as_slice()));
        // 4 key bytes + 5 value bytes
        assert_eq!(msg.size(), 4 + 5);
    }

    /// A message built without a key reports no key and sizes the value only.
    #[test]
    fn test_message_data_no_key() {
        let payload = b"data".to_vec();

        let msg = MessageData::new(0, payload, 0);

        assert!(msg.key.is_none());
        assert_eq!(msg.key_bytes(), None);
        assert_eq!(msg.size(), 4);
    }
}
124}