Skip to main content

rivven_protocol/
types.rs

1//! Message data types for protocol transport
2
3use bytes::Bytes;
4use serde::{Deserialize, Serialize};
5
6/// Serialized message data for transport
7#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
8pub struct MessageData {
9    /// Message offset in the partition
10    pub offset: u64,
11    /// Optional message key
12    #[serde(with = "rivven_core::serde_utils::option_bytes_serde")]
13    pub key: Option<Bytes>,
14    /// Message value/payload
15    #[serde(with = "rivven_core::serde_utils::bytes_serde")]
16    pub value: Bytes,
17    /// Timestamp in milliseconds since epoch
18    pub timestamp: i64,
19}
20
21impl MessageData {
22    /// Create a new message
23    pub fn new(offset: u64, value: impl Into<Bytes>, timestamp: i64) -> Self {
24        Self {
25            offset,
26            key: None,
27            value: value.into(),
28            timestamp,
29        }
30    }
31
32    /// Set the key
33    pub fn with_key(mut self, key: impl Into<Bytes>) -> Self {
34        self.key = Some(key.into());
35        self
36    }
37
38    /// Get the value as bytes
39    pub fn value_bytes(&self) -> &[u8] {
40        &self.value
41    }
42
43    /// Get the key as bytes if present
44    pub fn key_bytes(&self) -> Option<&[u8]> {
45        self.key.as_ref().map(|k| k.as_ref())
46    }
47
48    /// Get the size of this message (key + value)
49    pub fn size(&self) -> usize {
50        let key_size = self.key.as_ref().map(|k| k.len()).unwrap_or(0);
51        key_size + self.value.len()
52    }
53}
54
#[cfg(test)]
mod tests {
    use super::*;

    /// A keyed message exposes its offset, payload and key, and `size` counts both.
    #[test]
    fn test_message_data() {
        let payload = b"hello".to_vec();
        let key = b"key1".to_vec();
        // Expected size is key length plus value length (4 + 5).
        let expected_size = key.len() + payload.len();

        let msg = MessageData::new(42, payload, 1234567890).with_key(key);

        assert_eq!(msg.offset, 42);
        assert_eq!(msg.value_bytes(), b"hello");
        assert_eq!(msg.key_bytes(), Some(b"key1".as_slice()));
        assert_eq!(msg.size(), expected_size);
    }

    /// Without a key, `key_bytes` is `None` and `size` is the payload length alone.
    #[test]
    fn test_message_data_no_key() {
        let payload = b"data".to_vec();
        let payload_len = payload.len();

        let msg = MessageData::new(0, payload, 0);

        assert!(msg.key.is_none());
        assert_eq!(msg.key_bytes(), None);
        assert_eq!(msg.size(), payload_len);
    }
}