Skip to main content

rivven_core/
message.rs

1use crate::serde_utils::{bytes_serde, option_bytes_serde};
2use bytes::Bytes;
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5
6/// Represents a single message in Rivven
7#[derive(Debug, Clone, Serialize, Deserialize)]
8pub struct Message {
9    /// Unique offset within the partition
10    pub offset: u64,
11
12    /// Message key (optional, used for partitioning)
13    #[serde(with = "option_bytes_serde")]
14    pub key: Option<Bytes>,
15
16    /// Message payload
17    #[serde(with = "bytes_serde")]
18    pub value: Bytes,
19
20    /// Timestamp when message was created
21    pub timestamp: DateTime<Utc>,
22
23    /// Optional headers for metadata
24    pub headers: Vec<(String, Vec<u8>)>,
25}
26
27impl Message {
28    /// Create a new message
29    pub fn new(value: Bytes) -> Self {
30        Self {
31            offset: 0,
32            key: None,
33            value,
34            timestamp: Utc::now(),
35            headers: Vec::new(),
36        }
37    }
38
39    /// Create a message with a key
40    pub fn with_key(key: Bytes, value: Bytes) -> Self {
41        Self {
42            offset: 0,
43            key: Some(key),
44            value,
45            timestamp: Utc::now(),
46            headers: Vec::new(),
47        }
48    }
49
50    /// Add a header to the message
51    pub fn add_header(mut self, key: String, value: Vec<u8>) -> Self {
52        self.headers.push((key, value));
53        self
54    }
55
56    /// Serialize to bytes
57    pub fn to_bytes(&self) -> crate::Result<Vec<u8>> {
58        Ok(postcard::to_allocvec(self)?)
59    }
60
61    /// Deserialize from bytes
62    pub fn from_bytes(data: &[u8]) -> crate::Result<Self> {
63        Ok(postcard::from_bytes(data)?)
64    }
65}