hyperi_rustlib/transport/grpc/batch.rs
1// Project: hyperi-rustlib
2// File: src/transport/grpc/batch.rs
3// Purpose: Native batch transport -- WorkBatch <-> proto Batch wire mapper
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
//! # Native batch transport wire mapper (Task 0.6)
//!
//! Serde-less rustlib<->rustlib (DFE<->DFE) transfer of a whole
//! [`WorkBatch`](crate::transport::WorkBatch) over the existing gRPC mesh. One
//! `WorkBatch` maps to one proto [`Batch`](super::proto::Batch) and travels in a
//! single `RouteBatch` RPC -- batch-at-a-time, NOT record-by-record streaming.
//!
//! ## Payloads are OPAQUE in transit
//!
//! Each [`Record`] payload maps straight onto the proto `Record.payload`
//! `bytes` field. prost decodes that field ZERO-COPY into [`bytes::Bytes`]
//! (`.bytes(".")` in `build.rs`), so the receive-side payload is a refcounted
//! view of the decode buffer. The JSON / MsgPack codec
//! ([`crate::transport::codec`]) is NEVER invoked here, so payloads in any
//! format -- including bytes that are neither JSON nor MsgPack -- survive a
//! round-trip byte-identical.
//!
//! ## Swappable wire
//!
//! The mapping is isolated to this module: a future hand-rolled frame could
//! replace [`records_to_proto`] / [`proto_batch_to_records`] without touching
//! the `WorkBatch` types or the transport seam.
//!
//! ## What does NOT cross the wire
//!
//! `commit_tokens` and `dlq_entries` are left off the proto `Batch`. Commit
//! tokens are the SENDER's source acks -- fired locally after send
//! (at-least-once) and meaningless on the receiver. Inline-DLQ entries are a
//! local no-silent-drop concern. The mapper therefore takes / returns records,
//! not the whole `WorkBatch<T>`, which also keeps the `CommitToken` generic off
//! the wire.
//!
//! The proto `key` is a plain string with no presence flag, so a record key of
//! `Some("")` is sent as `""` and arrives as `None`.
37//! `WorkBatch<T>` -- which also keeps the `CommitToken` generic off the wire.
38
39use super::proto;
40use crate::transport::types::PayloadFormat;
41use crate::transport::work_batch::{Record, RecordMeta};
42use bytes::Bytes;
43use std::sync::Arc;
44
45/// Map a rustlib [`PayloadFormat`] onto the proto [`Format`](proto::Format).
46fn format_to_proto(format: PayloadFormat) -> proto::Format {
47 match format {
48 PayloadFormat::Auto => proto::Format::Auto,
49 PayloadFormat::Json => proto::Format::Json,
50 PayloadFormat::MsgPack => proto::Format::Msgpack,
51 }
52}
53
54/// Map a proto [`Format`](proto::Format) back onto a rustlib [`PayloadFormat`].
55///
56/// `FORMAT_ARROW_IPC` has no rustlib equivalent yet, so it collapses to
57/// [`PayloadFormat::Auto`] (the safe "detect later" default). An out-of-range
58/// enum value (forward-compat from a newer peer) likewise maps to `Auto`.
59fn format_from_proto(format: i32) -> PayloadFormat {
60 match proto::Format::try_from(format) {
61 Ok(proto::Format::Json) => PayloadFormat::Json,
62 Ok(proto::Format::Msgpack) => PayloadFormat::MsgPack,
63 // FORMAT_AUTO, FORMAT_ARROW_IPC, or an unknown future value.
64 _ => PayloadFormat::Auto,
65 }
66}
67
68/// Map one rustlib [`Record`] onto a proto [`Record`](proto::Record).
69///
70/// The payload `Bytes` handle is MOVED onto the proto field (no copy). `key`
71/// `None` becomes the empty string; `Some(k)` carries the key text.
72fn record_to_proto(record: Record) -> proto::Record {
73 let (timestamp_ms, has_timestamp_ms) = match record.metadata.timestamp_ms {
74 Some(ts) => (ts, true),
75 None => (0, false),
76 };
77
78 proto::Record {
79 payload: record.payload,
80 key: record.key.as_deref().unwrap_or("").to_string(),
81 headers: record
82 .headers
83 .into_iter()
84 .map(|(key, value)| proto::Header {
85 key,
86 value: Bytes::from(value),
87 })
88 .collect(),
89 timestamp_ms,
90 has_timestamp_ms,
91 format: format_to_proto(record.metadata.format).into(),
92 }
93}
94
95/// Map a proto [`Record`](proto::Record) back onto a rustlib [`Record`].
96///
97/// The payload is the prost-decoded [`Bytes`] handle MOVED across (zero-copy --
98/// a refcounted view of the decode buffer). An empty `key` string maps back to
99/// `None`.
100fn record_from_proto(record: proto::Record) -> Record {
101 let key = if record.key.is_empty() {
102 None
103 } else {
104 Some(Arc::from(record.key.as_str()))
105 };
106
107 Record {
108 payload: record.payload,
109 key,
110 headers: record
111 .headers
112 .into_iter()
113 .map(|h| (h.key, h.value.to_vec()))
114 .collect(),
115 metadata: RecordMeta {
116 timestamp_ms: if record.has_timestamp_ms {
117 Some(record.timestamp_ms)
118 } else {
119 None
120 },
121 format: format_from_proto(record.format),
122 },
123 }
124}
125
126/// Map a slice of rustlib [`Record`]s onto a proto [`Batch`](proto::Batch).
127///
128/// SEND-side mapper. Takes records, not the whole `WorkBatch<T>` (commit
129/// tokens and DLQ entries do not cross the wire -- see module docs). Payloads
130/// are moved, not copied.
131#[must_use]
132pub fn records_to_proto(records: Vec<Record>) -> proto::Batch {
133 proto::Batch {
134 records: records.into_iter().map(record_to_proto).collect(),
135 }
136}
137
138/// Map a proto [`Batch`](proto::Batch) back onto a `Vec<Record>`.
139///
140/// RECEIVE-side mapper. The caller wraps the returned records in a fresh
141/// [`WorkBatch`](crate::transport::WorkBatch) (attaching its own commit
142/// tokens). Payloads are zero-copy [`Bytes`] views of the decode buffer.
143#[must_use]
144pub fn proto_batch_to_records(batch: proto::Batch) -> Vec<Record> {
145 batch.records.into_iter().map(record_from_proto).collect()
146}
147
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a record with all fields populated (incl. non-UTF8 payload).
    fn full_record() -> Record {
        Record {
            // Deliberately not valid JSON and not a single MsgPack value (the
            // leading 0x00 is a lone fixint followed by trailing bytes). It
            // also includes non-UTF8 bytes, which proves no codec runs on the
            // payload in transit.
            payload: Bytes::from_static(&[0x00, 0xff, 0xfe, b'{', 0x80, b'a']),
            key: Some(Arc::from("events.land")),
            headers: vec![
                ("trace".to_string(), vec![0x01, 0x02, 0x03]),
                ("source".to_string(), b"loader".to_vec()),
            ],
            metadata: RecordMeta {
                timestamp_ms: Some(1_717_000_000_123),
                format: PayloadFormat::Json,
            },
        }
    }

    /// Round-trip ONE record through proto and back, asserting byte-equality on
    /// every field. `proto::Record::payload` must be `Bytes` (typed below).
    #[test]
    fn record_round_trips_byte_identical() {
        let original = full_record();
        let p = record_to_proto(original.clone());

        // The proto payload field is `bytes::Bytes` (zero-copy config). This
        // line would not compile if prost emitted `Vec<u8>` here.
        let _: &Bytes = &p.payload;

        let back = record_from_proto(p);
        assert_eq!(back, original);
    }

    /// Whole-batch round-trip with several records, including binary payloads
    /// with non-UTF8 bytes, keys, headers, and timestamps. Asserts every field
    /// is byte-identical after a real prost encode/decode cycle.
    #[test]
    fn batch_round_trips_through_prost_encode_decode() {
        use prost::Message as _;

        let records = vec![
            full_record(),
            Record {
                payload: Bytes::from_static(b"plain text not json"),
                key: None,
                headers: Vec::new(),
                metadata: RecordMeta {
                    timestamp_ms: None,
                    format: PayloadFormat::Auto,
                },
            },
            Record {
                // MsgPack fixmap byte lead.
                payload: Bytes::from_static(&[0x81, 0xa1, b'a', 0x01]),
                key: Some(Arc::from("metrics")),
                headers: vec![("k".to_string(), Vec::new())],
                metadata: RecordMeta {
                    timestamp_ms: Some(0), // genuine zero, must survive as Some(0)
                    format: PayloadFormat::MsgPack,
                },
            },
        ];

        // Map -> encode (prost) -> decode (prost) -> map back.
        let proto_batch = records_to_proto(records.clone());
        let encoded = proto_batch.encode_to_vec();
        let decoded = proto::Batch::decode(Bytes::from(encoded)).expect("prost decode");
        let back = proto_batch_to_records(decoded);

        assert_eq!(back.len(), records.len());
        for (a, b) in back.iter().zip(records.iter()) {
            assert_eq!(a, b, "record mismatch after wire round-trip");
        }
        // The genuine zero timestamp survived as Some(0), not None.
        assert_eq!(back[2].metadata.timestamp_ms, Some(0));
    }

    /// A payload that is neither a valid JSON document nor a single MsgPack
    /// value survives the wire round-trip byte-for-byte. If any codec ran on
    /// the payload in transit, it would fail to parse or mutate the bytes, so
    /// byte-identity proves the payload is opaque.
    #[test]
    fn non_codec_payload_survives_round_trip_opaque() {
        use prost::Message as _;

        // A deterministic 0..=255 byte ramp (777 bytes), not random data. It
        // is not valid JSON (it opens with a NUL byte) and not a single
        // MsgPack value (the leading 0x00 is a complete fixint, leaving 776
        // trailing bytes).
        let raw: Vec<u8> = (0u8..=255).cycle().take(777).collect();
        let payload = Bytes::from(raw.clone());

        let records = vec![Record {
            payload,
            key: Some(Arc::from("k")),
            headers: Vec::new(),
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Auto,
            },
        }];

        let encoded = records_to_proto(records).encode_to_vec();
        let decoded = proto::Batch::decode(Bytes::from(encoded)).expect("prost decode");
        let back = proto_batch_to_records(decoded);

        assert_eq!(back.len(), 1);
        assert_eq!(back[0].payload.as_ref(), raw.as_slice());
    }

    /// An empty batch round-trips to an empty batch.
    #[test]
    fn empty_batch_round_trips() {
        use prost::Message as _;

        let encoded = records_to_proto(Vec::new()).encode_to_vec();
        let decoded = proto::Batch::decode(Bytes::from(encoded)).expect("prost decode");
        let back = proto_batch_to_records(decoded);
        assert!(back.is_empty());
    }

    /// A large-ish batch (1000 small records) round-trips intact.
    #[test]
    fn large_batch_round_trips() {
        use prost::Message as _;

        let records: Vec<Record> = (0..1000u32)
            .map(|i| Record {
                payload: Bytes::from(format!("{{\"i\":{i}}}").into_bytes()),
                key: Some(Arc::from("bulk")),
                headers: vec![("seq".to_string(), i.to_le_bytes().to_vec())],
                metadata: RecordMeta {
                    timestamp_ms: Some(i64::from(i)),
                    format: PayloadFormat::Json,
                },
            })
            .collect();

        let encoded = records_to_proto(records.clone()).encode_to_vec();
        let decoded = proto::Batch::decode(Bytes::from(encoded)).expect("prost decode");
        let back = proto_batch_to_records(decoded);

        assert_eq!(back.len(), 1000);
        assert_eq!(back, records);
    }

    /// The proto `Record.payload` field is `bytes::Bytes` (not `Vec<u8>`).
    /// This is the build.rs `.bytes(".")` config taking effect; a regression to
    /// the default `Vec<u8>` would fail to compile this assignment.
    #[test]
    fn proto_payload_field_is_bytes_type() {
        let r = proto::Record {
            payload: Bytes::from_static(b"x"),
            ..Default::default()
        };
        let p: Bytes = r.payload;
        assert_eq!(p.as_ref(), b"x");
    }

    /// A `None` key is sent as an empty key string, and that empty string maps
    /// back to `None`.
    #[test]
    fn empty_key_maps_to_none() {
        let r = Record {
            payload: Bytes::from_static(b"x"),
            key: None,
            headers: Vec::new(),
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Auto,
            },
        };
        let p = record_to_proto(r);
        assert_eq!(p.key, "");
        let back = record_from_proto(p);
        assert_eq!(back.key, None);
    }

    /// `Some("")` has no distinct wire encoding because the proto `key` has no
    /// presence flag, so it arrives as `None`. This pins the known lossy edge
    /// so a future wire change (e.g. a `has_key` flag) is a deliberate decision.
    #[test]
    fn some_empty_key_collapses_to_none() {
        let r = Record {
            payload: Bytes::from_static(b"x"),
            key: Some(Arc::from("")),
            headers: Vec::new(),
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Auto,
            },
        };
        let p = record_to_proto(r);
        assert_eq!(p.key, "");
        assert_eq!(record_from_proto(p).key, None);
    }

    /// FORMAT_ARROW_IPC (no rustlib equivalent) maps back to `Auto`.
    #[test]
    fn arrow_format_collapses_to_auto() {
        let p = proto::Record {
            payload: Bytes::from_static(b"x"),
            format: proto::Format::ArrowIpc.into(),
            ..Default::default()
        };
        let back = record_from_proto(p);
        assert_eq!(back.metadata.format, PayloadFormat::Auto);
    }

    /// An out-of-range format value (e.g. from a newer peer) maps to `Auto`
    /// instead of failing the decode.
    #[test]
    fn unknown_format_value_collapses_to_auto() {
        let p = proto::Record {
            payload: Bytes::from_static(b"x"),
            format: i32::MAX,
            ..Default::default()
        };
        let back = record_from_proto(p);
        assert_eq!(back.metadata.format, PayloadFormat::Auto);
    }
}