Skip to main content

rivven_core/
message.rs

1use crate::serde_utils::{bytes_serde, option_bytes_serde};
2use crate::transaction::TransactionMarker;
3use bytes::Bytes;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
7/// Represents a single message in Rivven
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct Message {
10    /// Unique offset within the partition
11    pub offset: u64,
12
13    /// Message key (optional, used for partitioning)
14    #[serde(with = "option_bytes_serde")]
15    pub key: Option<Bytes>,
16
17    /// Message payload
18    #[serde(with = "bytes_serde")]
19    pub value: Bytes,
20
21    /// Timestamp when message was created
22    pub timestamp: DateTime<Utc>,
23
24    /// Optional headers for metadata
25    pub headers: Vec<(String, Vec<u8>)>,
26
27    /// Producer ID (for transactional/idempotent messages)
28    /// None for non-transactional messages
29    #[serde(default)]
30    pub producer_id: Option<u64>,
31
32    /// Producer epoch (for fencing)
33    #[serde(default)]
34    pub producer_epoch: Option<u16>,
35
36    /// Transaction marker (Some for control records, None for data records)
37    /// Control records mark transaction boundaries (COMMIT/ABORT)
38    #[serde(default)]
39    pub transaction_marker: Option<TransactionMarker>,
40
41    /// Whether this message is part of an ongoing transaction
42    /// Used for read_committed filtering
43    #[serde(default)]
44    pub is_transactional: bool,
45}
46
47impl Message {
48    /// Create a new message
49    pub fn new(value: Bytes) -> Self {
50        Self {
51            offset: 0,
52            key: None,
53            value,
54            timestamp: Utc::now(),
55            headers: Vec::new(),
56            producer_id: None,
57            producer_epoch: None,
58            transaction_marker: None,
59            is_transactional: false,
60        }
61    }
62
63    /// Create a message with a key
64    pub fn with_key(key: Bytes, value: Bytes) -> Self {
65        Self {
66            offset: 0,
67            key: Some(key),
68            value,
69            timestamp: Utc::now(),
70            headers: Vec::new(),
71            producer_id: None,
72            producer_epoch: None,
73            transaction_marker: None,
74            is_transactional: false,
75        }
76    }
77
78    /// Create a transactional message
79    pub fn transactional(value: Bytes, producer_id: u64, producer_epoch: u16) -> Self {
80        Self {
81            offset: 0,
82            key: None,
83            value,
84            timestamp: Utc::now(),
85            headers: Vec::new(),
86            producer_id: Some(producer_id),
87            producer_epoch: Some(producer_epoch),
88            transaction_marker: None,
89            is_transactional: true,
90        }
91    }
92
93    /// Create a transactional message with a key
94    pub fn transactional_with_key(
95        key: Bytes,
96        value: Bytes,
97        producer_id: u64,
98        producer_epoch: u16,
99    ) -> Self {
100        Self {
101            offset: 0,
102            key: Some(key),
103            value,
104            timestamp: Utc::now(),
105            headers: Vec::new(),
106            producer_id: Some(producer_id),
107            producer_epoch: Some(producer_epoch),
108            transaction_marker: None,
109            is_transactional: true,
110        }
111    }
112
113    /// Create a transaction control record (COMMIT or ABORT marker)
114    pub fn control_record(
115        marker: TransactionMarker,
116        producer_id: u64,
117        producer_epoch: u16,
118    ) -> Self {
119        Self {
120            offset: 0,
121            key: None,
122            value: Bytes::new(), // Control records have empty value
123            timestamp: Utc::now(),
124            headers: Vec::new(),
125            producer_id: Some(producer_id),
126            producer_epoch: Some(producer_epoch),
127            transaction_marker: Some(marker),
128            is_transactional: true,
129        }
130    }
131
132    /// Check if this is a control record (transaction marker)
133    pub fn is_control_record(&self) -> bool {
134        self.transaction_marker.is_some()
135    }
136
137    /// Check if this message is from a committed transaction
138    /// Note: This is set after transaction completion, not during write
139    pub fn is_committed(&self) -> bool {
140        !self.is_transactional || matches!(self.transaction_marker, Some(TransactionMarker::Commit))
141    }
142
143    /// Add a header to the message
144    pub fn add_header(mut self, key: String, value: Vec<u8>) -> Self {
145        self.headers.push((key, value));
146        self
147    }
148
149    /// Mark as transactional
150    pub fn with_producer(
151        mut self,
152        producer_id: u64,
153        producer_epoch: u16,
154        transactional: bool,
155    ) -> Self {
156        self.producer_id = Some(producer_id);
157        self.producer_epoch = Some(producer_epoch);
158        self.is_transactional = transactional;
159        self
160    }
161
162    /// Serialize to bytes
163    pub fn to_bytes(&self) -> crate::Result<Vec<u8>> {
164        Ok(postcard::to_allocvec(self)?)
165    }
166
167    /// Deserialize from bytes
168    pub fn from_bytes(data: &[u8]) -> crate::Result<Self> {
169        Ok(postcard::from_bytes(data)?)
170    }
171}