use std::collections::HashSet;
use bytes::Bytes;
use crate::{Offset, PartitionId, Timestamp};
#[non_exhaustive]
#[must_use = "contains data consumed from Kafka"]
#[derive(Debug, Clone)]
pub struct ConsumerRecord {
pub topic: String,
pub partition: PartitionId,
pub offset: Offset,
pub timestamp: Timestamp,
pub timestamp_type: i8,
pub key: Option<Bytes>,
pub value: Option<Bytes>,
pub headers: Vec<(Bytes, Option<Bytes>)>,
pub leader_epoch: Option<i32>,
pub delivery_count: Option<i16>,
}
impl ConsumerRecord {
pub fn new(
topic: impl Into<String>,
partition: PartitionId,
offset: Offset,
key: Option<Bytes>,
value: Option<Bytes>,
) -> Self {
Self {
topic: topic.into(),
partition,
offset,
timestamp: 0,
timestamp_type: 0,
key,
value,
headers: Vec::new(),
leader_epoch: None,
delivery_count: None,
}
}
#[inline]
pub fn is_tombstone(&self) -> bool {
self.key.is_some() && self.value.is_none()
}
#[inline]
pub fn serialized_key_size(&self) -> Option<usize> {
self.key.as_ref().map(|k| k.len())
}
#[inline]
pub fn serialized_value_size(&self) -> Option<usize> {
self.value.as_ref().map(|v| v.len())
}
#[inline]
pub fn key_str(&self) -> Option<&str> {
self.key.as_ref().and_then(|k| std::str::from_utf8(k).ok())
}
#[inline]
pub fn value_str(&self) -> Option<&str> {
self.value
.as_ref()
.and_then(|v| std::str::from_utf8(v).ok())
}
#[inline]
pub fn header(&self, key: &[u8]) -> Option<Option<&Bytes>> {
self.headers
.iter()
.find(|(k, _)| k.as_ref() == key)
.map(|(_, v)| v.as_ref())
}
#[inline]
pub fn header_str(&self, key: &[u8]) -> Option<&str> {
self.header(key)
.flatten()
.and_then(|v| std::str::from_utf8(v).ok())
}
#[inline]
pub fn header_value(&self, key: &[u8]) -> Option<&Bytes> {
self.headers
.iter()
.find(|(k, v)| k.as_ref() == key && v.is_some())
.and_then(|(_, v)| v.as_ref())
}
#[inline]
pub fn headers_by_key(&self, key: &[u8]) -> Vec<Option<&Bytes>> {
self.headers
.iter()
.filter(|(k, _)| k.as_ref() == key)
.map(|(_, v)| v.as_ref())
.collect()
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct TopicPartition {
pub topic: String,
pub partition: PartitionId,
}
impl TopicPartition {
pub fn new(topic: impl Into<String>, partition: PartitionId) -> Self {
Self {
topic: topic.into(),
partition,
}
}
#[inline]
pub fn topic(&self) -> &str {
&self.topic
}
#[inline]
pub fn partition(&self) -> PartitionId {
self.partition
}
}
#[derive(Debug, Default)]
pub struct ConsumerRecords {
records: Vec<ConsumerRecord>,
partitions: Vec<(String, PartitionId)>,
}
impl ConsumerRecords {
pub fn empty() -> Self {
Self::default()
}
pub fn from_records(records: Vec<ConsumerRecord>) -> Self {
let mut seen = HashSet::new();
let mut partitions = Vec::new();
for record in &records {
let tp = (record.topic.clone(), record.partition);
if seen.insert(tp.clone()) {
partitions.push(tp);
}
}
Self {
records,
partitions,
}
}
#[inline]
pub fn is_empty(&self) -> bool {
self.records.is_empty()
}
#[inline]
pub fn count(&self) -> usize {
self.records.len()
}
pub fn records_for_topic(&self, topic: &str) -> impl Iterator<Item = &ConsumerRecord> {
self.records.iter().filter(move |r| r.topic == topic)
}
pub fn records_for_partition(
&self,
topic: &str,
partition: PartitionId,
) -> impl Iterator<Item = &ConsumerRecord> {
self.records
.iter()
.filter(move |r| r.topic == topic && r.partition == partition)
}
#[inline]
pub fn partitions(&self) -> &[(String, PartitionId)] {
&self.partitions
}
#[inline]
pub fn iter(&self) -> impl Iterator<Item = &ConsumerRecord> {
self.records.iter()
}
pub fn into_vec(self) -> Vec<ConsumerRecord> {
self.records
}
}
impl IntoIterator for ConsumerRecords {
type Item = ConsumerRecord;
type IntoIter = std::vec::IntoIter<ConsumerRecord>;
fn into_iter(self) -> Self::IntoIter {
self.records.into_iter()
}
}
impl<'a> IntoIterator for &'a ConsumerRecords {
type Item = &'a ConsumerRecord;
type IntoIter = std::slice::Iter<'a, ConsumerRecord>;
fn into_iter(self) -> Self::IntoIter {
self.records.iter()
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
#[test]
fn test_consumer_record_new() {
let record = ConsumerRecord::new(
"test-topic",
0,
42,
Some(Bytes::from("key")),
Some(Bytes::from("value")),
);
assert_eq!(record.topic, "test-topic");
assert_eq!(record.partition, 0);
assert_eq!(record.offset, 42);
assert_eq!(record.key_str(), Some("key"));
assert_eq!(record.value_str(), Some("value"));
assert_eq!(record.serialized_key_size(), Some(3));
assert_eq!(record.serialized_value_size(), Some(5));
}
#[test]
fn test_consumer_record_serialized_sizes_absent() {
let record = ConsumerRecord::new("topic", 0, 0, None, None);
assert_eq!(record.serialized_key_size(), None);
assert_eq!(record.serialized_value_size(), None);
}
#[test]
fn test_consumer_record_is_tombstone() {
let tombstone = ConsumerRecord::new("t", 0, 0, Some(Bytes::from("key")), None);
assert!(tombstone.is_tombstone());
let normal = ConsumerRecord::new(
"t",
0,
0,
Some(Bytes::from("key")),
Some(Bytes::from("val")),
);
assert!(!normal.is_tombstone());
let keyless = ConsumerRecord::new("t", 0, 0, None, None);
assert!(!keyless.is_tombstone());
let no_key = ConsumerRecord::new("t", 0, 0, None, Some(Bytes::from("val")));
assert!(!no_key.is_tombstone());
}
#[test]
fn test_consumer_records_iteration() {
let records = vec![
ConsumerRecord::new("topic1", 0, 0, None, Some(Bytes::from("a"))),
ConsumerRecord::new("topic1", 0, 1, None, Some(Bytes::from("b"))),
ConsumerRecord::new("topic1", 1, 0, None, Some(Bytes::from("c"))),
];
let consumer_records = ConsumerRecords::from_records(records);
assert_eq!(consumer_records.count(), 3);
assert!(!consumer_records.is_empty());
let p0_records: Vec<_> = consumer_records
.records_for_partition("topic1", 0)
.collect();
assert_eq!(p0_records.len(), 2);
}
#[test]
fn test_consumer_records_partitions() {
let records = vec![
ConsumerRecord::new("topic1", 0, 0, None, None),
ConsumerRecord::new("topic1", 1, 0, None, None),
ConsumerRecord::new("topic2", 0, 0, None, None),
];
let consumer_records = ConsumerRecords::from_records(records);
assert_eq!(consumer_records.partitions().len(), 3);
}
#[test]
fn test_consumer_record_duplicate_headers_preserved() {
let mut record = ConsumerRecord::new("test-topic", 0, 0, None, Some(Bytes::from("value")));
record
.headers
.push((Bytes::from("trace-id"), Some(Bytes::from("abc"))));
record
.headers
.push((Bytes::from("trace-id"), Some(Bytes::from("def"))));
record
.headers
.push((Bytes::from("other"), Some(Bytes::from("xyz"))));
assert_eq!(
record.headers.len(),
3,
"all headers including duplicates should be preserved"
);
assert_eq!(
record.header(b"trace-id"),
Some(Some(&Bytes::from("abc"))),
"header() should return the first matching header value"
);
}
#[test]
fn test_consumer_record_headers_by_key() {
let mut record = ConsumerRecord::new("test-topic", 0, 0, None, Some(Bytes::from("value")));
record
.headers
.push((Bytes::from("trace-id"), Some(Bytes::from("first"))));
record
.headers
.push((Bytes::from("trace-id"), Some(Bytes::from("second"))));
record
.headers
.push((Bytes::from("trace-id"), Some(Bytes::from("third"))));
record
.headers
.push((Bytes::from("other-key"), Some(Bytes::from("other"))));
let trace_values = record.headers_by_key(b"trace-id");
assert_eq!(
trace_values.len(),
3,
"headers_by_key should return all values for a duplicate key"
);
assert_eq!(trace_values[0], Some(&Bytes::from("first")));
assert_eq!(trace_values[1], Some(&Bytes::from("second")));
assert_eq!(trace_values[2], Some(&Bytes::from("third")));
let other_values = record.headers_by_key(b"other-key");
assert_eq!(other_values.len(), 1);
let missing_values = record.headers_by_key(b"nonexistent");
assert!(
missing_values.is_empty(),
"headers_by_key for missing key should return empty vec"
);
}
#[test]
fn test_consumer_record_header_with_null_value() {
let mut record = ConsumerRecord::new("t", 0, 0, None, Some(Bytes::from("v")));
record.headers.push((Bytes::from("x-null"), None));
record
.headers
.push((Bytes::from("x-present"), Some(Bytes::from("data"))));
assert_eq!(record.header(b"x-null"), Some(None));
assert_eq!(
record.header(b"x-present"),
Some(Some(&Bytes::from("data")))
);
assert_eq!(record.header(b"missing"), None);
}
#[test]
fn test_consumer_record_header_value_skips_null() {
let mut record = ConsumerRecord::new("t", 0, 0, None, Some(Bytes::from("v")));
record.headers.push((Bytes::from("key"), None));
record
.headers
.push((Bytes::from("key"), Some(Bytes::from("real"))));
assert_eq!(record.header_value(b"key"), Some(&Bytes::from("real")));
let mut record2 = ConsumerRecord::new("t", 0, 0, None, None);
record2.headers.push((Bytes::from("all-null"), None));
assert_eq!(record2.header_value(b"all-null"), None);
}
#[test]
fn test_consumer_record_header_str_returns_none_for_null() {
let mut record = ConsumerRecord::new("t", 0, 0, None, None);
record.headers.push((Bytes::from("h"), None));
record
.headers
.push((Bytes::from("h2"), Some(Bytes::from("text"))));
assert_eq!(record.header_str(b"h"), None);
assert_eq!(record.header_str(b"h2"), Some("text"));
}
#[test]
fn test_consumer_record_headers_by_key_with_nulls() {
let mut record = ConsumerRecord::new("t", 0, 0, None, None);
record
.headers
.push((Bytes::from("k"), Some(Bytes::from("a"))));
record.headers.push((Bytes::from("k"), None));
record
.headers
.push((Bytes::from("k"), Some(Bytes::from("b"))));
let vals = record.headers_by_key(b"k");
assert_eq!(vals.len(), 3);
assert_eq!(vals[0], Some(&Bytes::from("a")));
assert_eq!(vals[1], None);
assert_eq!(vals[2], Some(&Bytes::from("b")));
}
}