use bytes::Bytes;
use std::fmt;
/// Collection of record headers, stored as a list of `(key, value)` pairs.
///
/// Keys are owned `String`s and values are `Bytes` buffers. The backing
/// storage is a plain `Vec`, so the type itself imposes no uniqueness or
/// ordering constraint on keys. The inner vector is visible crate-wide
/// (`pub(crate)`) but not to external users.
#[derive(Default, Clone, Debug)]
pub struct Headers(pub(crate) Vec<(String, Bytes)>);
impl Headers {
#[inline]
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn insert(&mut self, key: impl Into<String>, value: impl AsRef<[u8]>) {
self.0
.push((key.into(), Bytes::copy_from_slice(value.as_ref())));
}
#[inline]
#[must_use]
pub fn len(&self) -> usize {
self.0.len()
}
#[inline]
#[must_use]
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
#[inline]
pub fn iter(&self) -> impl Iterator<Item = &(String, Bytes)> {
self.0.iter()
}
}
impl IntoIterator for Headers {
type Item = (String, Bytes);
type IntoIter = std::vec::IntoIter<(String, Bytes)>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
/// Exposes a value as a borrowed byte slice.
///
/// This file implements it for `()`, `String`, `Vec<u8>`, `&[u8]` and `&str`.
pub trait AsBytes {
/// Returns the bytes of `self` as a slice borrowed from `self`.
fn as_bytes(&self) -> &[u8];
}
/// The unit type carries no data, so its byte view is always empty.
impl AsBytes for () {
    fn as_bytes(&self) -> &[u8] {
        b""
    }
}
/// A `String` is viewed as its UTF-8 encoded bytes.
impl AsBytes for String {
    fn as_bytes(&self) -> &[u8] {
        // Go through `&str` so the inherent `str::as_bytes` is the one invoked.
        self.as_str().as_bytes()
    }
}
/// A byte vector is viewed as the slice of its contents.
impl AsBytes for Vec<u8> {
    fn as_bytes(&self) -> &[u8] {
        self.as_slice()
    }
}
/// A borrowed byte slice is returned as-is.
impl AsBytes for &[u8] {
    fn as_bytes(&self) -> &[u8] {
        // `self` is `&&[u8]`; peel one reference layer to get the slice.
        *self
    }
}
/// A borrowed string slice is viewed as its UTF-8 encoded bytes.
impl AsBytes for &str {
    fn as_bytes(&self) -> &[u8] {
        // Name the inherent `str::as_bytes` explicitly: a plain
        // `self.as_bytes()` on `&&str` would resolve back to this trait method.
        let text: &str = self;
        str::as_bytes(text)
    }
}
/// A key/value message associated with a topic, a partition number and a set
/// of headers.
///
/// `K` and `V` carry no trait bounds on the struct itself. All fields are
/// public, so a record can be built either through the constructors and
/// `with_*` builders in this file or with a struct literal.
pub struct Record<'a, K, V> {
/// Record key. The `from_value` constructor sets this to `()`.
pub key: K,
/// Record value (the payload).
pub value: V,
/// Name of the topic this record is associated with, borrowed for `'a`.
pub topic: &'a str,
/// Partition number. The constructors in this file initialise it to `-1`
/// (presumably "no explicit partition chosen" - confirm against the code
/// that consumes this field).
pub partition: i32,
/// Headers attached to the record; empty when built by the constructors.
pub headers: Headers,
}
impl<'a, K, V> Record<'a, K, V> {
    /// Builds a record for `topic` from a key and a value.
    ///
    /// The partition starts at `-1` and the header collection starts empty;
    /// use the `with_*` builders to change either.
    #[inline]
    pub fn from_key_value(topic: &'a str, key: K, value: V) -> Record<'a, K, V> {
        Self {
            topic,
            key,
            value,
            partition: -1,
            headers: Headers::new(),
        }
    }

    /// Returns the record with its partition set to `partition`.
    #[inline]
    #[must_use]
    pub fn with_partition(self, partition: i32) -> Self {
        Self { partition, ..self }
    }

    /// Returns the record with its whole header collection replaced by
    /// `headers`; any headers previously attached are discarded.
    #[inline]
    #[must_use]
    pub fn with_headers(self, headers: Headers) -> Self {
        Self { headers, ..self }
    }

    /// Returns the record with one more header appended to its collection.
    #[inline]
    #[must_use]
    pub fn with_header(self, key: impl Into<String>, value: impl AsRef<[u8]>) -> Self {
        let mut record = self;
        record.headers.insert(key, value);
        record
    }
}
impl<'a, V> Record<'a, (), V> {
#[inline]
pub fn from_value(topic: &'a str, value: V) -> Record<'a, (), V> {
Record {
key: (),
value,
topic,
partition: -1,
headers: Headers::new(),
}
}
}
/// Debug rendering of a record: topic, partition, key, value, headers.
///
/// Built with `Formatter::debug_struct` so that the caller's formatting flags
/// are honoured; in particular `{:#?}` produces the usual multi-line layout
/// instead of being silently ignored. The compact `{:?}` output is
/// `Record { topic: <topic>, partition: <n>, key: .., value: .., headers: .. }`.
impl<K: fmt::Debug, V: fmt::Debug> fmt::Debug for Record<'_, K, V> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Record")
            // The topic is written with `Display` (no surrounding quotes),
            // which keeps the compact output in its established form.
            .field("topic", &format_args!("{}", self.topic))
            .field("partition", &self.partition)
            .field("key", &self.key)
            .field("value", &self.value)
            .field("headers", &self.headers)
            .finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created collection holds nothing.
    #[test]
    fn test_headers_empty_default() {
        let h = Headers::new();
        assert!(h.is_empty());
        assert_eq!(h.len(), 0);
        assert_eq!(h.iter().count(), 0);
    }

    /// Inserted headers come back in insertion order with their values intact.
    #[test]
    fn test_headers_insert_and_iter() {
        let mut h = Headers::new();
        h.insert("key1", b"value1");
        h.insert("key2", b"value2");
        assert_eq!(h.len(), 2);
        assert!(!h.is_empty());
        let pairs: Vec<_> = h.iter().collect();
        assert_eq!(pairs.len(), 2);
        assert_eq!(pairs[0].0, "key1");
        assert_eq!(&pairs[0].1[..], &b"value1"[..]);
        assert_eq!(pairs[1].0, "key2");
        assert_eq!(&pairs[1].1[..], &b"value2"[..]);
    }

    /// `insert` appends; it never overwrites an entry with the same key.
    #[test]
    fn test_headers_insert_keeps_duplicate_keys() {
        let mut h = Headers::new();
        h.insert("dup", b"first");
        h.insert("dup", b"second");
        assert_eq!(h.len(), 2);
        let values: Vec<&[u8]> = h.iter().map(|pair| &pair.1[..]).collect();
        assert_eq!(values, vec![&b"first"[..], &b"second"[..]]);
    }

    /// Consuming iteration yields the owned pairs in insertion order.
    #[test]
    fn test_headers_into_iterator() {
        let mut h = Headers::new();
        h.insert("a", b"1");
        h.insert("b", b"2");
        let owned: Vec<_> = h.into_iter().collect();
        assert_eq!(owned.len(), 2);
        assert_eq!(owned[0].0, "a");
        assert_eq!(&owned[0].1[..], &b"1"[..]);
        assert_eq!(owned[1].0, "b");
        assert_eq!(&owned[1].1[..], &b"2"[..]);
    }

    /// `from_value` starts with no headers and the `-1` partition.
    #[test]
    fn test_record_default_no_headers() {
        let r = Record::from_value("topic", b"value");
        assert!(r.headers.is_empty());
        assert_eq!(r.partition, -1);
        assert_eq!(r.topic, "topic");
        assert_eq!(r.value, b"value");
    }

    /// `with_partition` changes only the partition.
    #[test]
    fn test_record_with_partition() {
        let r = Record::from_value("topic", b"value").with_partition(7);
        assert_eq!(r.partition, 7);
        assert_eq!(r.topic, "topic");
        assert!(r.headers.is_empty());
    }

    /// `with_headers` attaches the supplied collection.
    #[test]
    fn test_record_with_headers() {
        let mut headers = Headers::new();
        headers.insert("trace-id", b"abc123");
        let r = Record::from_value("topic", b"value").with_headers(headers);
        assert_eq!(r.headers.len(), 1);
        let (name, payload) = r.headers.iter().next().unwrap();
        assert_eq!(name.as_str(), "trace-id");
        assert_eq!(&payload[..], &b"abc123"[..]);
    }

    /// `with_headers` replaces, rather than extends, earlier headers.
    #[test]
    fn test_record_with_headers_replaces_existing() {
        let r = Record::from_value("topic", b"value")
            .with_header("old", b"1")
            .with_headers(Headers::new());
        assert!(r.headers.is_empty());
    }

    /// `with_header` appends a single header with the given key and value.
    #[test]
    fn test_record_with_single_header() {
        let r = Record::from_value("topic", b"value").with_header("content-type", b"json");
        assert_eq!(r.headers.len(), 1);
        let (name, payload) = r.headers.iter().next().unwrap();
        assert_eq!(name.as_str(), "content-type");
        assert_eq!(&payload[..], &b"json"[..]);
    }

    /// Keyed records keep their key while headers are added.
    #[test]
    fn test_record_from_key_value_with_headers() {
        let r = Record::from_key_value("topic", "key", b"value").with_header("h1", b"v1");
        assert_eq!(r.key, "key");
        assert_eq!(r.value, b"value");
        assert_eq!(r.partition, -1);
        assert_eq!(r.headers.len(), 1);
    }
}