// rustfs_kafka/producer/record.rs
use bytes::Bytes;
use std::fmt;
3
4#[derive(Default, Clone, Debug)]
11pub struct Headers(pub(crate) Vec<(String, Bytes)>);
12
13impl Headers {
14 #[inline]
16 #[must_use]
17 pub fn new() -> Self {
18 Self::default()
19 }
20
21 pub fn insert(&mut self, key: impl Into<String>, value: impl AsRef<[u8]>) {
23 self.0
24 .push((key.into(), Bytes::copy_from_slice(value.as_ref())));
25 }
26
27 #[inline]
29 #[must_use]
30 pub fn len(&self) -> usize {
31 self.0.len()
32 }
33
34 #[inline]
36 #[must_use]
37 pub fn is_empty(&self) -> bool {
38 self.0.is_empty()
39 }
40
41 #[inline]
43 pub fn iter(&self) -> impl Iterator<Item = &(String, Bytes)> {
44 self.0.iter()
45 }
46}
47
48impl IntoIterator for Headers {
49 type Item = (String, Bytes);
50 type IntoIter = std::vec::IntoIter<(String, Bytes)>;
51
52 fn into_iter(self) -> Self::IntoIter {
53 self.0.into_iter()
54 }
55}
56
/// A value that can be viewed as a borrowed slice of raw bytes.
///
/// This module implements the trait for `()`, `String`, `Vec<u8>`, `&[u8]`
/// and `&str`.
pub trait AsBytes {
    /// Returns the bytes of `self` without taking ownership of them.
    fn as_bytes(&self) -> &[u8];
}
69
70impl AsBytes for () {
71 fn as_bytes(&self) -> &[u8] {
72 &[]
73 }
74}
75
76impl AsBytes for String {
77 fn as_bytes(&self) -> &[u8] {
78 self.as_ref()
79 }
80}
81impl AsBytes for Vec<u8> {
82 fn as_bytes(&self) -> &[u8] {
83 self.as_ref()
84 }
85}
86
87impl AsBytes for &[u8] {
88 fn as_bytes(&self) -> &[u8] {
89 self
90 }
91}
92impl AsBytes for &str {
93 fn as_bytes(&self) -> &[u8] {
94 str::as_bytes(self)
95 }
96}
97
98pub struct Record<'a, K, V> {
104 pub key: K,
106
107 pub value: V,
109
110 pub topic: &'a str,
112
113 pub partition: i32,
118
119 pub headers: Headers,
121}
122
123impl<'a, K, V> Record<'a, K, V> {
124 #[inline]
128 pub fn from_key_value(topic: &'a str, key: K, value: V) -> Record<'a, K, V> {
129 Record {
130 key,
131 value,
132 topic,
133 partition: -1,
134 headers: Headers::new(),
135 }
136 }
137
138 #[inline]
140 #[must_use]
141 pub fn with_partition(mut self, partition: i32) -> Self {
142 self.partition = partition;
143 self
144 }
145
146 #[inline]
148 #[must_use]
149 pub fn with_headers(mut self, headers: Headers) -> Self {
150 self.headers = headers;
151 self
152 }
153
154 #[inline]
156 #[must_use]
157 pub fn with_header(mut self, key: impl Into<String>, value: impl AsRef<[u8]>) -> Self {
158 self.headers.insert(key, value);
159 self
160 }
161}
162
163impl<'a, V> Record<'a, (), V> {
164 #[inline]
168 pub fn from_value(topic: &'a str, value: V) -> Record<'a, (), V> {
169 Record {
170 key: (),
171 value,
172 topic,
173 partition: -1,
174 headers: Headers::new(),
175 }
176 }
177}
178
179impl<K: fmt::Debug, V: fmt::Debug> fmt::Debug for Record<'_, K, V> {
180 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
181 write!(
182 f,
183 "Record {{ topic: {}, partition: {}, key: {:?}, value: {:?}, headers: {:?} }}",
184 self.topic, self.partition, self.key, self.value, self.headers
185 )
186 }
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created header list holds no entries.
    #[test]
    fn test_headers_empty_default() {
        let h = Headers::new();
        assert!(h.is_empty());
        assert_eq!(h.len(), 0);
    }

    /// Inserted headers are stored in insertion order with their values intact.
    #[test]
    fn test_headers_insert_and_iter() {
        let mut h = Headers::new();
        h.insert("key1", b"value1");
        h.insert("key2", b"value2");

        assert_eq!(h.len(), 2);
        assert!(!h.is_empty());

        let pairs: Vec<_> = h.iter().collect();
        assert_eq!(pairs.len(), 2);
        assert_eq!(pairs[0].0, "key1");
        assert_eq!(&pairs[0].1[..], b"value1");
        assert_eq!(pairs[1].0, "key2");
        assert_eq!(&pairs[1].1[..], b"value2");
    }

    /// Consuming iteration yields every stored pair.
    #[test]
    fn test_headers_into_iterator() {
        let mut h = Headers::new();
        h.insert("a", b"1");
        h.insert("b", b"2");

        assert_eq!(h.into_iter().count(), 2);
    }

    /// `from_value` starts with no headers and partition `-1`.
    #[test]
    fn test_record_default_no_headers() {
        let r = Record::from_value("topic", b"value");
        assert!(r.headers.is_empty());
        assert_eq!(r.partition, -1);
    }

    /// `with_headers` installs the supplied header list.
    #[test]
    fn test_record_with_headers() {
        let mut headers = Headers::new();
        headers.insert("trace-id", b"abc123");

        let r = Record::from_value("topic", b"value").with_headers(headers);
        assert_eq!(r.headers.len(), 1);
    }

    /// `with_header` appends a single header.
    #[test]
    fn test_record_with_single_header() {
        let r = Record::from_value("topic", b"value").with_header("content-type", b"json");
        assert_eq!(r.headers.len(), 1);
    }

    /// Keyed records keep their key and accept headers like key-less ones.
    #[test]
    fn test_record_from_key_value_with_headers() {
        let r = Record::from_key_value("topic", "key", b"value").with_header("h1", b"v1");
        assert_eq!(r.key, "key");
        assert_eq!(r.headers.len(), 1);
    }

    /// The hand-written `Debug` output keeps its single-line layout.
    #[test]
    fn test_record_debug_format() {
        let r = Record::from_value("topic", 1u8);
        assert_eq!(
            format!("{:?}", r),
            "Record { topic: topic, partition: -1, key: (), value: 1, headers: Headers([]) }"
        );
    }
}