use crate::serde_utils::{bytes_serde, option_bytes_serde};
use crate::transaction::TransactionMarker;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

7#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct Message {
10 pub offset: u64,
12
13 #[serde(with = "option_bytes_serde")]
15 pub key: Option<Bytes>,
16
17 #[serde(with = "bytes_serde")]
19 pub value: Bytes,
20
21 pub timestamp: DateTime<Utc>,
23
24 pub headers: Vec<(String, Vec<u8>)>,
26
27 #[serde(default)]
30 pub producer_id: Option<u64>,
31
32 #[serde(default)]
34 pub producer_epoch: Option<u16>,
35
36 #[serde(default)]
39 pub transaction_marker: Option<TransactionMarker>,
40
41 #[serde(default)]
44 pub is_transactional: bool,
45}
46
47impl Message {
48 pub fn new(value: Bytes) -> Self {
50 Self {
51 offset: 0,
52 key: None,
53 value,
54 timestamp: Utc::now(),
55 headers: Vec::new(),
56 producer_id: None,
57 producer_epoch: None,
58 transaction_marker: None,
59 is_transactional: false,
60 }
61 }
62
63 pub fn with_key(key: Bytes, value: Bytes) -> Self {
65 Self {
66 offset: 0,
67 key: Some(key),
68 value,
69 timestamp: Utc::now(),
70 headers: Vec::new(),
71 producer_id: None,
72 producer_epoch: None,
73 transaction_marker: None,
74 is_transactional: false,
75 }
76 }
77
78 pub fn transactional(value: Bytes, producer_id: u64, producer_epoch: u16) -> Self {
80 Self {
81 offset: 0,
82 key: None,
83 value,
84 timestamp: Utc::now(),
85 headers: Vec::new(),
86 producer_id: Some(producer_id),
87 producer_epoch: Some(producer_epoch),
88 transaction_marker: None,
89 is_transactional: true,
90 }
91 }
92
93 pub fn transactional_with_key(
95 key: Bytes,
96 value: Bytes,
97 producer_id: u64,
98 producer_epoch: u16,
99 ) -> Self {
100 Self {
101 offset: 0,
102 key: Some(key),
103 value,
104 timestamp: Utc::now(),
105 headers: Vec::new(),
106 producer_id: Some(producer_id),
107 producer_epoch: Some(producer_epoch),
108 transaction_marker: None,
109 is_transactional: true,
110 }
111 }
112
113 pub fn control_record(
115 marker: TransactionMarker,
116 producer_id: u64,
117 producer_epoch: u16,
118 ) -> Self {
119 Self {
120 offset: 0,
121 key: None,
122 value: Bytes::new(), timestamp: Utc::now(),
124 headers: Vec::new(),
125 producer_id: Some(producer_id),
126 producer_epoch: Some(producer_epoch),
127 transaction_marker: Some(marker),
128 is_transactional: true,
129 }
130 }
131
132 pub fn is_control_record(&self) -> bool {
134 self.transaction_marker.is_some()
135 }
136
137 pub fn is_committed(&self) -> bool {
140 !self.is_transactional || matches!(self.transaction_marker, Some(TransactionMarker::Commit))
141 }
142
143 pub fn add_header(mut self, key: String, value: Vec<u8>) -> Self {
145 self.headers.push((key, value));
146 self
147 }
148
149 pub fn with_producer(
151 mut self,
152 producer_id: u64,
153 producer_epoch: u16,
154 transactional: bool,
155 ) -> Self {
156 self.producer_id = Some(producer_id);
157 self.producer_epoch = Some(producer_epoch);
158 self.is_transactional = transactional;
159 self
160 }
161
162 pub fn to_bytes(&self) -> crate::Result<Vec<u8>> {
164 Ok(postcard::to_allocvec(self)?)
165 }
166
167 pub fn from_bytes(data: &[u8]) -> crate::Result<Self> {
169 Ok(postcard::from_bytes(data)?)
170 }
171}