Skip to main content

rustfs_kafka/producer/
record.rs

1use bytes::Bytes;
2use std::fmt;
3
4/// A collection of key-value headers attached to a Kafka record.
5///
6/// Headers are supported since Kafka 0.11.
7///
8/// Header values are stored as `Bytes` to enable zero-copy sharing
9/// when records are batched and encoded for the wire protocol.
10#[derive(Default, Clone, Debug)]
11pub struct Headers(pub(crate) Vec<(String, Bytes)>);
12
13impl Headers {
14    /// Creates an empty headers collection.
15    #[inline]
16    #[must_use]
17    pub fn new() -> Self {
18        Self::default()
19    }
20
21    /// Adds a header key-value pair.
22    pub fn insert(&mut self, key: impl Into<String>, value: impl AsRef<[u8]>) {
23        self.0
24            .push((key.into(), Bytes::copy_from_slice(value.as_ref())));
25    }
26
27    /// Returns the number of headers.
28    #[inline]
29    #[must_use]
30    pub fn len(&self) -> usize {
31        self.0.len()
32    }
33
34    /// Returns `true` if there are no headers.
35    #[inline]
36    #[must_use]
37    pub fn is_empty(&self) -> bool {
38        self.0.is_empty()
39    }
40
41    /// Returns an iterator over the headers.
42    #[inline]
43    pub fn iter(&self) -> impl Iterator<Item = &(String, Bytes)> {
44        self.0.iter()
45    }
46}
47
48impl IntoIterator for Headers {
49    type Item = (String, Bytes);
50    type IntoIter = std::vec::IntoIter<(String, Bytes)>;
51
52    fn into_iter(self) -> Self::IntoIter {
53        self.0.into_iter()
54    }
55}
56
57// --------------------------------------------------------------------
58
59/// A trait used by `Producer` to obtain the bytes `Record::key` and
60/// `Record::value` represent.  This leaves the choice of the types
61/// for `key` and `value` with the client.
62pub trait AsBytes {
63    /// Return the byte slice representation of this value.
64    ///
65    /// Implementors should return a stable slice view over the underlying
66    /// data. Empty values may return an empty slice.
67    fn as_bytes(&self) -> &[u8];
68}
69
70impl AsBytes for () {
71    fn as_bytes(&self) -> &[u8] {
72        &[]
73    }
74}
75
76impl AsBytes for String {
77    fn as_bytes(&self) -> &[u8] {
78        self.as_ref()
79    }
80}
81impl AsBytes for Vec<u8> {
82    fn as_bytes(&self) -> &[u8] {
83        self.as_ref()
84    }
85}
86
87impl AsBytes for &[u8] {
88    fn as_bytes(&self) -> &[u8] {
89        self
90    }
91}
92impl AsBytes for &str {
93    fn as_bytes(&self) -> &[u8] {
94        str::as_bytes(self)
95    }
96}
97
98// --------------------------------------------------------------------
99
100/// A structure representing a message to be sent to Kafka through the
101/// `Producer` API.  Such a message is basically a key/value pair
102/// specifying the target topic and optionally the topic's partition.
103pub struct Record<'a, K, V> {
104    /// Key data of this (message) record.
105    pub key: K,
106
107    /// Value data of this (message) record.
108    pub value: V,
109
110    /// Name of the topic this message is supposed to be delivered to.
111    pub topic: &'a str,
112
113    /// The partition id of the topic to deliver this message to.
114    /// This partition may be `< 0` in which case it is considered
115    /// "unspecified".  A `Producer` will then typically try to derive
116    /// a partition on its own.
117    pub partition: i32,
118
119    /// Optional headers attached to this record.
120    pub headers: Headers,
121}
122
123impl<'a, K, V> Record<'a, K, V> {
124    /// Convenience function to create a new key/value record with an
125    /// "unspecified" partition - this is, a partition set to a negative
126    /// value.
127    #[inline]
128    pub fn from_key_value(topic: &'a str, key: K, value: V) -> Record<'a, K, V> {
129        Record {
130            key,
131            value,
132            topic,
133            partition: -1,
134            headers: Headers::new(),
135        }
136    }
137
138    /// Convenience method to set the partition.
139    #[inline]
140    #[must_use]
141    pub fn with_partition(mut self, partition: i32) -> Self {
142        self.partition = partition;
143        self
144    }
145
146    /// Sets the headers for this record.
147    #[inline]
148    #[must_use]
149    pub fn with_headers(mut self, headers: Headers) -> Self {
150        self.headers = headers;
151        self
152    }
153
154    /// Adds a single header to this record.
155    #[inline]
156    #[must_use]
157    pub fn with_header(mut self, key: impl Into<String>, value: impl AsRef<[u8]>) -> Self {
158        self.headers.insert(key, value);
159        self
160    }
161}
162
163impl<'a, V> Record<'a, (), V> {
164    /// Convenience function to create a new value only record with an
165    /// "unspecified" partition - this is, a partition set to a negative
166    /// value.
167    #[inline]
168    pub fn from_value(topic: &'a str, value: V) -> Record<'a, (), V> {
169        Record {
170            key: (),
171            value,
172            topic,
173            partition: -1,
174            headers: Headers::new(),
175        }
176    }
177}
178
179impl<K: fmt::Debug, V: fmt::Debug> fmt::Debug for Record<'_, K, V> {
180    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
181        write!(
182            f,
183            "Record {{ topic: {}, partition: {}, key: {:?}, value: {:?}, headers: {:?} }}",
184            self.topic, self.partition, self.key, self.value, self.headers
185        )
186    }
187}
188
189#[cfg(test)]
190mod tests {
191    use super::*;
192
193    #[test]
194    fn test_headers_empty_default() {
195        let h = Headers::new();
196        assert!(h.is_empty());
197        assert_eq!(h.len(), 0);
198    }
199
200    #[test]
201    fn test_headers_insert_and_iter() {
202        let mut h = Headers::new();
203        h.insert("key1", b"value1");
204        h.insert("key2", b"value2");
205
206        assert_eq!(h.len(), 2);
207        assert!(!h.is_empty());
208
209        let pairs: Vec<_> = h.iter().collect();
210        assert_eq!(pairs.len(), 2);
211        assert_eq!(pairs[0].0, "key1");
212        assert_eq!(pairs[1].0, "key2");
213    }
214
215    #[test]
216    fn test_headers_into_iterator() {
217        let mut h = Headers::new();
218        h.insert("a", b"1");
219        h.insert("b", b"2");
220
221        assert_eq!(h.into_iter().count(), 2);
222    }
223
224    #[test]
225    fn test_record_default_no_headers() {
226        let r = Record::from_value("topic", b"value");
227        assert!(r.headers.is_empty());
228        assert_eq!(r.partition, -1);
229    }
230
231    #[test]
232    fn test_record_with_headers() {
233        let mut headers = Headers::new();
234        headers.insert("trace-id", b"abc123");
235
236        let r = Record::from_value("topic", b"value").with_headers(headers);
237        assert_eq!(r.headers.len(), 1);
238    }
239
240    #[test]
241    fn test_record_with_single_header() {
242        let r = Record::from_value("topic", b"value").with_header("content-type", b"json");
243        assert_eq!(r.headers.len(), 1);
244    }
245
246    #[test]
247    fn test_record_from_key_value_with_headers() {
248        let r = Record::from_key_value("topic", "key", b"value").with_header("h1", b"v1");
249        assert_eq!(r.key, "key");
250        assert_eq!(r.headers.len(), 1);
251    }
252}