use std::sync::Arc;
use bytes::Bytes;
use crate::error::{KrafkaError, Result};
use crate::protocol::{MAX_RECORD_HEADERS, RecordBatchBuilder, validate_topic_name};
use crate::{PartitionId, Timestamp};
#[non_exhaustive]
#[must_use]
#[derive(Debug, Clone)]
pub struct ProducerRecord {
pub topic: String,
pub partition: Option<PartitionId>,
pub key: Option<Bytes>,
pub value: Bytes,
pub timestamp: Option<Timestamp>,
pub headers: Vec<(String, Vec<u8>)>,
}
impl ProducerRecord {
pub fn new(topic: impl Into<String>, value: impl Into<Bytes>) -> Self {
Self {
topic: topic.into(),
partition: None,
key: None,
value: value.into(),
timestamp: None,
headers: Vec::new(),
}
}
pub fn with_partition(mut self, partition: PartitionId) -> Self {
self.partition = Some(partition);
self
}
pub fn with_key(mut self, key: impl Into<Bytes>) -> Self {
self.key = Some(key.into());
self
}
pub fn without_key(mut self) -> Self {
self.key = None;
self
}
pub fn with_timestamp(mut self, timestamp: Timestamp) -> Self {
self.timestamp = Some(timestamp);
self
}
pub fn with_header(mut self, key: impl Into<String>, value: impl Into<Vec<u8>>) -> Self {
self.headers.push((key.into(), value.into()));
self
}
#[inline]
pub fn key_str(&self) -> Option<&str> {
self.key.as_ref().and_then(|k| std::str::from_utf8(k).ok())
}
#[inline]
pub fn value_str(&self) -> Option<&str> {
std::str::from_utf8(&self.value).ok()
}
#[inline]
pub fn estimated_size(&self) -> usize {
let key_size = self.key.as_ref().map(|k| k.len()).unwrap_or(0);
let value_size = self.value.len();
let headers_size: usize = self
.headers
.iter()
.map(|(k, v)| k.len() + v.len() + 8) .sum();
let topic_overhead = self.topic.len() + 64;
key_size + value_size + headers_size + topic_overhead
}
pub fn validate(&self) -> Result<()> {
validate_topic_name(&self.topic)?;
if let Some(ref key) = self.key
&& key.len() > i32::MAX as usize
{
return Err(KrafkaError::protocol(format!(
"record key length {} exceeds protocol limit of {}",
key.len(),
i32::MAX
)));
}
if self.value.len() > i32::MAX as usize {
return Err(KrafkaError::protocol(format!(
"record value length {} exceeds protocol limit of {}",
self.value.len(),
i32::MAX
)));
}
if self.headers.len() > MAX_RECORD_HEADERS {
return Err(KrafkaError::protocol(format!(
"record has {} headers, exceeding limit of {MAX_RECORD_HEADERS}",
self.headers.len()
)));
}
for (i, (key, value)) in self.headers.iter().enumerate() {
if key.len() > i32::MAX as usize {
return Err(KrafkaError::protocol(format!(
"header[{}] key length {} exceeds protocol limit of {}",
i,
key.len(),
i32::MAX
)));
}
if value.len() > i32::MAX as usize {
return Err(KrafkaError::protocol(format!(
"header[{}] value length {} exceeds protocol limit of {}",
i,
value.len(),
i32::MAX
)));
}
}
Ok(())
}
pub(crate) fn into_routed_parts(self) -> RoutedRecordParts {
let Self {
topic,
partition,
key,
value,
timestamp,
headers,
} = self;
RoutedRecordParts {
topic: Arc::<str>::from(topic),
partition,
record: RoutedRecord {
key,
value,
timestamp,
headers,
},
}
}
}
pub(crate) type TopicHandle = Arc<str>;
#[derive(Debug, Clone)]
pub(crate) struct RoutedRecord {
pub key: Option<Bytes>,
pub value: Bytes,
pub timestamp: Option<Timestamp>,
pub headers: Vec<(String, Vec<u8>)>,
}
impl RoutedRecord {
#[inline]
pub(crate) fn key_bytes(&self) -> Option<&[u8]> {
self.key.as_deref()
}
#[inline]
pub(crate) fn payload_size_bytes(&self) -> u64 {
self.value.len() as u64 + self.key.as_ref().map(|key| key.len() as u64).unwrap_or(0)
}
pub(crate) fn append_to_batch_builder(
&self,
batch_builder: RecordBatchBuilder,
) -> RecordBatchBuilder {
if self.headers.is_empty() {
batch_builder.add_record(self.key.clone(), Some(self.value.clone()))
} else {
batch_builder.add_record_with_headers(
self.key.clone(),
Some(self.value.clone()),
self.headers
.iter()
.map(|(key, value)| (key.clone(), value.clone()))
.collect(),
)
}
}
}
pub(crate) struct RoutedRecordParts {
pub topic: TopicHandle,
pub partition: Option<PartitionId>,
pub record: RoutedRecord,
}
#[non_exhaustive]
#[must_use = "contains the result of a send operation"]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecordMetadata {
pub topic: String,
pub partition: PartitionId,
pub offset: i64,
pub timestamp: Timestamp,
}
impl RecordMetadata {
#[inline]
pub fn is_success(&self) -> bool {
self.offset >= 0
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
#[test]
fn test_producer_record_new() {
let record = ProducerRecord::new("test-topic", b"hello".to_vec());
assert_eq!(record.topic, "test-topic");
assert_eq!(record.value.as_ref(), b"hello");
assert!(record.key.is_none());
assert!(record.partition.is_none());
}
#[test]
fn test_producer_record_with_key() {
let record =
ProducerRecord::new("test-topic", b"hello".to_vec()).with_key(b"my-key".to_vec());
assert_eq!(record.key, Some(Bytes::from_static(b"my-key")));
assert_eq!(record.key_str(), Some("my-key"));
}
#[test]
fn test_producer_record_with_partition() {
let record = ProducerRecord::new("test-topic", b"hello".to_vec()).with_partition(5);
assert_eq!(record.partition, Some(5));
}
#[test]
fn test_producer_record_with_headers() {
let record = ProducerRecord::new("test-topic", b"hello".to_vec())
.with_header("h1", b"v1".to_vec())
.with_header("h2", b"v2".to_vec());
assert_eq!(record.headers.len(), 2);
assert_eq!(record.headers[0].0, "h1");
assert_eq!(record.headers[1].0, "h2");
}
#[test]
fn test_producer_record_estimated_size() {
let record =
ProducerRecord::new("test-topic", b"hello world".to_vec()).with_key(b"key".to_vec());
let size = record.estimated_size();
assert!(size > 3 + 11); }
#[test]
fn test_producer_record_into_routed_parts() {
let record = ProducerRecord::new("test-topic", b"hello".to_vec())
.with_partition(2)
.with_key(b"key".to_vec())
.with_timestamp(1234)
.with_header("h1", b"v1".to_vec());
let routed = record.into_routed_parts();
assert_eq!(routed.topic.as_ref(), "test-topic");
assert_eq!(routed.partition, Some(2));
assert_eq!(routed.record.key, Some(Bytes::from_static(b"key")));
assert_eq!(routed.record.value, Bytes::from_static(b"hello"));
assert_eq!(routed.record.timestamp, Some(1234));
assert_eq!(routed.record.headers.len(), 1);
assert_eq!(routed.record.headers[0].0, "h1");
}
#[test]
fn test_record_metadata() {
let metadata = RecordMetadata {
topic: "test".to_string(),
partition: 0,
offset: 42,
timestamp: 1234567890000,
};
assert!(metadata.is_success());
assert_eq!(metadata.offset, 42);
}
#[test]
fn test_validate_valid_record() {
let record = ProducerRecord::new("topic", b"value".to_vec())
.with_key(b"key".to_vec())
.with_header("h1", b"v1".to_vec());
assert!(record.validate().is_ok());
}
#[test]
fn test_validate_rejects_oversized_topic() {
let record = ProducerRecord::new("x".repeat(i16::MAX as usize + 1), b"v".to_vec());
let err = record.validate().unwrap_err().to_string();
assert!(err.contains("topic name length"), "unexpected: {err}");
}
#[test]
fn test_validate_accepts_header_key_within_i32_limit() {
let record = ProducerRecord::new("topic", b"v".to_vec())
.with_header("x".repeat(i16::MAX as usize + 1), b"v".to_vec());
assert!(record.validate().is_ok());
}
#[test]
fn test_validate_accepts_max_valid_sizes() {
let record = ProducerRecord::new("a".repeat(i16::MAX as usize), b"v".to_vec());
assert!(record.validate().is_ok());
}
#[test]
fn test_without_key_clears_key() {
let record = ProducerRecord::new("topic", b"value".to_vec())
.with_key("my-key")
.without_key();
assert!(record.key.is_none());
}
#[test]
fn test_validate_rejects_empty_topic() {
let record = ProducerRecord::new("", b"value".to_vec());
let err = record.validate().unwrap_err().to_string();
assert!(err.contains("empty"), "unexpected: {err}");
}
#[test]
fn test_record_metadata_equality() {
let a = RecordMetadata {
topic: "t".to_string(),
partition: 0,
offset: 1,
timestamp: 100,
};
let b = RecordMetadata {
topic: "t".to_string(),
partition: 0,
offset: 1,
timestamp: 100,
};
assert_eq!(a, b);
}
}