use super::proto;
use crate::transport::types::PayloadFormat;
use crate::transport::work_batch::{Record, RecordMeta};
use bytes::Bytes;
use std::sync::Arc;
fn format_to_proto(format: PayloadFormat) -> proto::Format {
match format {
PayloadFormat::Auto => proto::Format::Auto,
PayloadFormat::Json => proto::Format::Json,
PayloadFormat::MsgPack => proto::Format::Msgpack,
}
}
/// Maps a raw protobuf enum value back onto the in-process payload format.
///
/// Only `Json` and `Msgpack` have a dedicated mapping. Every other value —
/// whether it fails to decode as a `proto::Format` or decodes to a variant
/// without a local equivalent — becomes `PayloadFormat::Auto`.
fn format_from_proto(format: i32) -> PayloadFormat {
    proto::Format::try_from(format).map_or(PayloadFormat::Auto, |wire| match wire {
        proto::Format::Json => PayloadFormat::Json,
        proto::Format::Msgpack => PayloadFormat::MsgPack,
        _ => PayloadFormat::Auto,
    })
}
fn record_to_proto(record: Record) -> proto::Record {
let (timestamp_ms, has_timestamp_ms) = match record.metadata.timestamp_ms {
Some(ts) => (ts, true),
None => (0, false),
};
proto::Record {
payload: record.payload,
key: record.key.as_deref().unwrap_or("").to_string(),
headers: record
.headers
.into_iter()
.map(|(key, value)| proto::Header {
key,
value: Bytes::from(value),
})
.collect(),
timestamp_ms,
has_timestamp_ms,
format: format_to_proto(record.metadata.format).into(),
}
}
/// Converts a protobuf record back into an in-process `Record`.
///
/// This is the inverse of the flattening done when encoding:
/// * an empty key string becomes `None`;
/// * the timestamp is only kept when `has_timestamp_ms` is set;
/// * the raw format value is resolved via `format_from_proto`.
fn record_from_proto(record: proto::Record) -> Record {
    let proto::Record {
        payload,
        key: wire_key,
        headers: wire_headers,
        timestamp_ms,
        has_timestamp_ms,
        format,
    } = record;

    // The wire has no "absent key" state; empty is treated as absent.
    let key = (!wire_key.is_empty()).then(|| Arc::from(wire_key.as_str()));

    let headers = wire_headers
        .into_iter()
        .map(|header| (header.key, header.value.to_vec()))
        .collect();

    let metadata = RecordMeta {
        timestamp_ms: has_timestamp_ms.then_some(timestamp_ms),
        format: format_from_proto(format),
    };

    Record {
        payload,
        key,
        headers,
        metadata,
    }
}
/// Converts a list of records into a protobuf `Batch`, preserving order.
#[must_use]
pub fn records_to_proto(records: Vec<Record>) -> proto::Batch {
    let mut wire_records = Vec::with_capacity(records.len());
    wire_records.extend(records.into_iter().map(record_to_proto));
    proto::Batch {
        records: wire_records,
    }
}
#[must_use]
pub fn proto_batch_to_records(batch: proto::Batch) -> Vec<Record> {
batch.records.into_iter().map(record_from_proto).collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Sends `records` through the full path: convert to a protobuf batch,
    /// serialize with prost, deserialize, and convert back to records.
    fn wire_round_trip(records: Vec<Record>) -> Vec<Record> {
        use prost::Message as _;

        let on_the_wire = records_to_proto(records).encode_to_vec();
        let decoded = proto::Batch::decode(Bytes::from(on_the_wire)).expect("prost decode");
        proto_batch_to_records(decoded)
    }

    /// A record with every optional field populated and a payload that is
    /// not valid UTF-8.
    fn sample_record() -> Record {
        let metadata = RecordMeta {
            timestamp_ms: Some(1_717_000_000_123),
            format: PayloadFormat::Json,
        };
        Record {
            payload: Bytes::from_static(&[0x00, 0xff, 0xfe, b'{', 0x80, b'a']),
            key: Some(Arc::from("events.land")),
            headers: vec![
                ("trace".to_string(), vec![0x01, 0x02, 0x03]),
                ("source".to_string(), b"loader".to_vec()),
            ],
            metadata,
        }
    }

    /// In-memory conversion to proto and back yields an equal record.
    #[test]
    fn record_round_trips_byte_identical() {
        let expected = sample_record();
        let wire = record_to_proto(expected.clone());
        // Type-level check: the proto payload must be `Bytes`.
        let _: &Bytes = &wire.payload;
        let restored = record_from_proto(wire);
        assert_eq!(restored, expected);
    }

    /// A mixed batch survives prost encoding, including a record with no
    /// key or timestamp and one whose timestamp is exactly zero.
    #[test]
    fn batch_round_trips_through_prost_encode_decode() {
        let bare = Record {
            payload: Bytes::from_static(b"plain text not json"),
            key: None,
            headers: Vec::new(),
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Auto,
            },
        };
        let zero_timestamp = Record {
            payload: Bytes::from_static(&[0x81, 0xa1, b'a', 0x01]),
            key: Some(Arc::from("metrics")),
            headers: vec![("k".to_string(), Vec::new())],
            metadata: RecordMeta {
                timestamp_ms: Some(0),
                format: PayloadFormat::MsgPack,
            },
        };
        let records = vec![sample_record(), bare, zero_timestamp];

        let back = wire_round_trip(records.clone());

        assert_eq!(back.len(), records.len());
        for (actual, expected) in back.iter().zip(records.iter()) {
            assert_eq!(actual, expected, "record mismatch after wire round-trip");
        }
        // `Some(0)` must not be confused with "no timestamp".
        assert_eq!(back[2].metadata.timestamp_ms, Some(0));
    }

    /// A payload cycling through every byte value comes back unchanged.
    #[test]
    fn non_codec_payload_survives_round_trip_opaque() {
        let raw: Vec<u8> = (0u8..=255).cycle().take(777).collect();
        let record = Record {
            payload: Bytes::from(raw.clone()),
            key: Some(Arc::from("k")),
            headers: Vec::new(),
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Auto,
            },
        };

        let back = wire_round_trip(vec![record]);

        assert_eq!(back.len(), 1);
        assert_eq!(back[0].payload.as_ref(), raw.as_slice());
    }

    /// An empty batch encodes and decodes to an empty record list.
    #[test]
    fn empty_batch_round_trips() {
        let back = wire_round_trip(Vec::new());
        assert!(back.is_empty());
    }

    /// One thousand records round-trip with order and contents intact.
    #[test]
    fn large_batch_round_trips() {
        let mut records: Vec<Record> = Vec::with_capacity(1000);
        for i in 0..1000u32 {
            records.push(Record {
                payload: Bytes::from(format!("{{\"i\":{i}}}").into_bytes()),
                key: Some(Arc::from("bulk")),
                headers: vec![("seq".to_string(), i.to_le_bytes().to_vec())],
                metadata: RecordMeta {
                    timestamp_ms: Some(i64::from(i)),
                    format: PayloadFormat::Json,
                },
            });
        }

        let back = wire_round_trip(records.clone());

        assert_eq!(back.len(), 1000);
        assert_eq!(back, records);
    }

    /// Compile-time guard that the generated payload field is `Bytes`.
    #[test]
    fn proto_payload_field_is_bytes_type() {
        let wire = proto::Record {
            payload: Bytes::from_static(b"x"),
            ..Default::default()
        };
        let payload: Bytes = wire.payload;
        assert_eq!(payload.as_ref(), b"x");
    }

    /// A `None` key is sent as the empty string and read back as `None`.
    #[test]
    fn empty_key_maps_to_none() {
        let keyless = Record {
            payload: Bytes::from_static(b"x"),
            key: None,
            headers: Vec::new(),
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Auto,
            },
        };

        let wire = record_to_proto(keyless);
        assert_eq!(wire.key, "");

        let restored = record_from_proto(wire);
        assert_eq!(restored.key, None);
    }

    /// A wire format with no local equivalent decodes as `Auto`.
    #[test]
    fn arrow_format_collapses_to_auto() {
        let wire = proto::Record {
            payload: Bytes::from_static(b"x"),
            format: proto::Format::ArrowIpc.into(),
            ..Default::default()
        };
        let restored = record_from_proto(wire);
        assert_eq!(restored.metadata.format, PayloadFormat::Auto);
    }
}