use bytes::Bytes;
use serde_wincode::SerdeCompat;
use wincode::{Deserialize as _, Serialize as _};
use crabka_protocol::records::Record;
use crate::records::MetadataRecord;
/// Failures that can occur while converting between a [`MetadataRecord`] and
/// a Kafka [`Record`].
///
/// The codec variants carry the underlying wincode error rendered as a
/// string (via `to_string()`), not the original error value.
#[derive(Debug, thiserror::Error)]
pub enum KafkaRecordError {
    /// The Kafka record's `value` was `None`, so there is nothing to decode.
    #[error("metadata record has no value payload")]
    MissingValue,

    /// Deserializing the value bytes into a [`MetadataRecord`] failed.
    #[error("wincode decode failed: {0}")]
    Decode(String),

    /// Serializing a [`MetadataRecord`] into bytes failed.
    #[error("wincode encode failed: {0}")]
    Encode(String),
}
/// Wraps a [`MetadataRecord`] in a Kafka [`Record`].
///
/// The record is serialized through the `SerdeCompat` wrapper and the
/// resulting bytes become the Kafka record's `value`. The `key` is always
/// `None`; every other field takes its `Default` value.
///
/// # Errors
///
/// Returns [`KafkaRecordError::Encode`] with the serializer's error message
/// if serialization fails.
pub fn to_kafka_record(rec: &MetadataRecord) -> Result<Record, KafkaRecordError> {
    <SerdeCompat<MetadataRecord>>::serialize(rec)
        .map(|encoded| Record {
            key: None,
            value: Some(Bytes::from(encoded)),
            ..Default::default()
        })
        .map_err(|err| KafkaRecordError::Encode(err.to_string()))
}
/// Extracts a [`MetadataRecord`] from the `value` of a Kafka [`Record`].
///
/// Only `rec.value` is read; the other fields of the record are ignored.
///
/// # Errors
///
/// * [`KafkaRecordError::MissingValue`] if `rec.value` is `None`.
/// * [`KafkaRecordError::Decode`] with the deserializer's error message if
///   the value bytes cannot be deserialized.
pub fn from_kafka_record(rec: &Record) -> Result<MetadataRecord, KafkaRecordError> {
    // Guard: without a value there is nothing to decode.
    let bytes = match rec.value.as_ref() {
        Some(bytes) => bytes,
        None => return Err(KafkaRecordError::MissingValue),
    };

    <SerdeCompat<MetadataRecord>>::deserialize(bytes)
        .map_err(|err| KafkaRecordError::Decode(err.to_string()))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::records::{MetadataRecord, TopicRecord};
    use assert2::assert;
    use uuid::Uuid;

    /// Fixture: a `V1Topic` metadata record with fixed, arbitrary field values.
    fn sample_topic() -> MetadataRecord {
        let topic = TopicRecord {
            name: "orders".into(),
            topic_id: Uuid::from_u128(0x1234_5678_9abc_def0),
            partitions: 6,
            replication_factor: 3,
        };
        MetadataRecord::V1Topic(topic)
    }

    /// Encoding and then decoding must yield the original record.
    #[test]
    fn round_trips_through_kafka_record() {
        let original = sample_topic();
        let encoded = to_kafka_record(&original).expect("encode");
        let decoded = from_kafka_record(&encoded).expect("decode");
        assert!(original == decoded);
    }

    /// The produced Kafka record carries no key and always has a value.
    #[test]
    fn key_is_none_and_value_is_present() {
        let topic = sample_topic();
        let encoded = to_kafka_record(&topic).expect("encode");
        assert!(encoded.key.is_none());
        assert!(encoded.value.is_some());
    }

    /// A default Kafka record is rejected with `MissingValue`.
    #[test]
    fn missing_value_is_an_error() {
        let empty = Record::default();
        let outcome = from_kafka_record(&empty);
        assert!(matches!(outcome, Err(KafkaRecordError::MissingValue)));
    }

    /// Encoding the same record twice produces identical value bytes.
    #[test]
    fn encoding_is_stable_across_calls() {
        let topic = sample_topic();
        let first = to_kafka_record(&topic).expect("encode");
        let second = to_kafka_record(&topic).expect("encode");
        assert!(first.value == second.value);
    }
}