hyperi_rustlib/transport/grpc/batch.rs
1// Project: hyperi-rustlib
2// File: src/transport/grpc/batch.rs
3// Purpose: Native batch transport -- WorkBatch <-> proto Batch wire mapper
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Native batch transport wire mapper (Task 0.6)
10//!
11//! Serde-less rustlib<->rustlib (DFE<->DFE) transfer of a whole
12//! [`WorkBatch`](crate::transport::WorkBatch) over the existing gRPC mesh. One
13//! `WorkBatch` maps to one proto [`Batch`](super::proto::Batch) and travels in a
14//! single `RouteBatch` RPC -- batch-at-a-time, NOT record-by-record streaming.
15//!
16//! ## Payloads are OPAQUE in transit
17//!
18//! Each [`Record`] payload is mapped straight onto the
19//! proto `Record.payload` `bytes` field and back. prost is configured (in
20//! `build.rs`, `.bytes(".")`) to decode that field ZERO-COPY into
21//! [`bytes::Bytes`], so on the receive side the payload is a refcounted view of
22//! the decode buffer, never copied. The JSON / MsgPack codec
23//! ([`crate::transport::codec`]) is NEVER invoked here -- the bytes pass through
24//! untouched, so a payload that is not valid JSON or MsgPack survives a
25//! round-trip byte-identical.
26//!
27//! ## Swappable wire
28//!
29//! The wire is protobuf BY DEFAULT but the mapping is isolated to this module:
30//! a future hand-rolled frame could replace [`records_to_proto`] /
31//! [`proto_batch_to_records`] without touching the `WorkBatch` types or the
32//! transport seam.
33//!
34//! ## What does NOT cross the wire
35//!
36//! `commit_tokens` and `dlq_entries` are deliberately left off the proto `Batch`.
37//! Commit tokens are the SENDER's source acks -- fired locally after the block
38//! is sent (at-least-once), they have no meaning on the receiver. Inline-DLQ
//! entries are a local no-silent-drop concern. So the mappers take and return
//! only the records (an owned `Vec<Record>` in both directions, so payload
//! handles are moved rather than copied), not the whole `WorkBatch<T>`; this
//! also avoids dragging the `CommitToken` generic onto the wire path.
42
43use super::proto;
44use crate::transport::types::PayloadFormat;
45use crate::transport::work_batch::{Record, RecordMeta};
46use bytes::Bytes;
47use std::sync::Arc;
48
49/// Map a rustlib [`PayloadFormat`] onto the proto [`Format`](proto::Format).
50fn format_to_proto(format: PayloadFormat) -> proto::Format {
51 match format {
52 PayloadFormat::Auto => proto::Format::Auto,
53 PayloadFormat::Json => proto::Format::Json,
54 PayloadFormat::MsgPack => proto::Format::Msgpack,
55 }
56}
57
58/// Map a proto [`Format`](proto::Format) back onto a rustlib [`PayloadFormat`].
59///
60/// `FORMAT_ARROW_IPC` has no rustlib equivalent yet, so it collapses to
61/// [`PayloadFormat::Auto`] (the safe "detect later" default). An out-of-range
62/// enum value (forward-compat from a newer peer) likewise maps to `Auto`.
63fn format_from_proto(format: i32) -> PayloadFormat {
64 match proto::Format::try_from(format) {
65 Ok(proto::Format::Json) => PayloadFormat::Json,
66 Ok(proto::Format::Msgpack) => PayloadFormat::MsgPack,
67 // FORMAT_AUTO, FORMAT_ARROW_IPC, or an unknown future value.
68 _ => PayloadFormat::Auto,
69 }
70}
71
72/// Map one rustlib [`Record`] onto a proto [`Record`](proto::Record).
73///
74/// The payload `Bytes` handle is MOVED onto the proto field (no copy). `key`
75/// `None` becomes the empty string; `Some(k)` carries the key text.
76fn record_to_proto(record: Record) -> proto::Record {
77 let (timestamp_ms, has_timestamp_ms) = match record.metadata.timestamp_ms {
78 Some(ts) => (ts, true),
79 None => (0, false),
80 };
81
82 proto::Record {
83 payload: record.payload,
84 key: record.key.as_deref().unwrap_or("").to_string(),
85 headers: record
86 .headers
87 .into_iter()
88 .map(|(key, value)| proto::Header {
89 key,
90 value: Bytes::from(value),
91 })
92 .collect(),
93 timestamp_ms,
94 has_timestamp_ms,
95 format: format_to_proto(record.metadata.format).into(),
96 }
97}
98
99/// Map a proto [`Record`](proto::Record) back onto a rustlib [`Record`].
100///
101/// The payload is the prost-decoded [`Bytes`] handle MOVED across (zero-copy --
102/// a refcounted view of the decode buffer). An empty `key` string maps back to
103/// `None`.
104fn record_from_proto(record: proto::Record) -> Record {
105 let key = if record.key.is_empty() {
106 None
107 } else {
108 Some(Arc::from(record.key.as_str()))
109 };
110
111 Record {
112 payload: record.payload,
113 key,
114 headers: record
115 .headers
116 .into_iter()
117 .map(|h| (h.key, h.value.to_vec()))
118 .collect(),
119 metadata: RecordMeta {
120 timestamp_ms: if record.has_timestamp_ms {
121 Some(record.timestamp_ms)
122 } else {
123 None
124 },
125 format: format_from_proto(record.format),
126 },
127 }
128}
129
130/// Map a slice of rustlib [`Record`]s onto a proto [`Batch`](proto::Batch).
131///
132/// This is the SEND-side mapper. It takes the records (NOT the whole
133/// `WorkBatch<T>`) because commit tokens and DLQ entries do not cross the wire
134/// (see the module docs). Payloads are moved, not copied.
135#[must_use]
136pub fn records_to_proto(records: Vec<Record>) -> proto::Batch {
137 proto::Batch {
138 records: records.into_iter().map(record_to_proto).collect(),
139 }
140}
141
142/// Map a proto [`Batch`](proto::Batch) back onto a `Vec<Record>`.
143///
144/// This is the RECEIVE-side mapper. The caller wraps the returned records in a
145/// fresh [`WorkBatch`](crate::transport::WorkBatch) (attaching its own commit
146/// tokens). Payloads are zero-copy [`Bytes`] views of the decode buffer.
147#[must_use]
148pub fn proto_batch_to_records(batch: proto::Batch) -> Vec<Record> {
149 batch.records.into_iter().map(record_from_proto).collect()
150}
151
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a record with all fields populated (incl. non-UTF8 payload).
    fn full_record() -> Record {
        Record {
            // Deliberately NOT a well-formed JSON or MsgPack document, and
            // includes non-UTF8 bytes -- proves no codec runs on the payload
            // in transit.
            payload: Bytes::from_static(&[0x00, 0xff, 0xfe, b'{', 0x80, b'a']),
            key: Some(Arc::from("events.land")),
            headers: vec![
                ("trace".to_string(), vec![0x01, 0x02, 0x03]),
                ("source".to_string(), b"loader".to_vec()),
            ],
            metadata: RecordMeta {
                timestamp_ms: Some(1_717_000_000_123),
                format: PayloadFormat::Json,
            },
        }
    }

    /// Round-trip ONE record through proto and back, asserting byte-equality on
    /// every field. `proto::Record::payload` must be `Bytes` (typed below).
    #[test]
    fn record_round_trips_byte_identical() {
        let original = full_record();
        let p = record_to_proto(original.clone());

        // The proto payload field is `bytes::Bytes` (zero-copy config). This
        // line would not compile if prost emitted `Vec<u8>` here.
        let _: &Bytes = &p.payload;

        let back = record_from_proto(p);
        assert_eq!(back, original);
    }

    /// Whole-batch round-trip with several records, including binary payloads
    /// with non-UTF8 bytes, keys, headers, and timestamps. Asserts every field
    /// is byte-identical after a real prost encode/decode cycle.
    #[test]
    fn batch_round_trips_through_prost_encode_decode() {
        use prost::Message as _;

        let records = vec![
            full_record(),
            Record {
                payload: Bytes::from_static(b"plain text not json"),
                key: None,
                headers: Vec::new(),
                metadata: RecordMeta {
                    timestamp_ms: None,
                    format: PayloadFormat::Auto,
                },
            },
            Record {
                // MsgPack fixmap byte lead.
                payload: Bytes::from_static(&[0x81, 0xa1, b'a', 0x01]),
                key: Some(Arc::from("metrics")),
                headers: vec![("k".to_string(), Vec::new())],
                metadata: RecordMeta {
                    timestamp_ms: Some(0), // genuine zero, must survive as Some(0)
                    format: PayloadFormat::MsgPack,
                },
            },
        ];

        // Map -> encode (prost) -> decode (prost) -> map back.
        let proto_batch = records_to_proto(records.clone());
        let encoded = proto_batch.encode_to_vec();
        let decoded = proto::Batch::decode(Bytes::from(encoded)).expect("prost decode");
        let back = proto_batch_to_records(decoded);

        assert_eq!(back.len(), records.len());
        for (a, b) in back.iter().zip(records.iter()) {
            assert_eq!(a, b, "record mismatch after wire round-trip");
        }
        // The genuine zero timestamp survived as Some(0), not None.
        assert_eq!(back[2].metadata.timestamp_ms, Some(0));
    }

    /// A payload that is NEITHER a valid JSON NOR a valid MsgPack document
    /// survives the wire round-trip byte-for-byte. If any codec ran on the
    /// payload in transit it would fail to parse or mutate the bytes -- proving
    /// the payload is opaque.
    #[test]
    fn non_codec_payload_survives_round_trip_opaque() {
        use prost::Message as _;

        // A deterministic 0..=255 byte ramp (not random). It starts with 0x00,
        // so it is not JSON, and it contains 0xc1, a byte MsgPack reserves as
        // "never used", so it is not a MsgPack document either.
        let raw: Vec<u8> = (0u8..=255).cycle().take(777).collect();
        let payload = Bytes::from(raw.clone());

        let records = vec![Record {
            payload,
            key: Some(Arc::from("k")),
            headers: Vec::new(),
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Auto,
            },
        }];

        let encoded = records_to_proto(records).encode_to_vec();
        let decoded = proto::Batch::decode(Bytes::from(encoded)).expect("prost decode");
        let back = proto_batch_to_records(decoded);

        assert_eq!(back.len(), 1);
        assert_eq!(back[0].payload.as_ref(), raw.as_slice());
    }

    /// An empty batch round-trips to an empty batch.
    #[test]
    fn empty_batch_round_trips() {
        use prost::Message as _;

        let encoded = records_to_proto(Vec::new()).encode_to_vec();
        let decoded = proto::Batch::decode(Bytes::from(encoded)).expect("prost decode");
        let back = proto_batch_to_records(decoded);
        assert!(back.is_empty());
    }

    /// A large-ish batch (1000 small records) round-trips intact.
    #[test]
    fn large_batch_round_trips() {
        use prost::Message as _;

        let records: Vec<Record> = (0..1000u32)
            .map(|i| Record {
                payload: Bytes::from(format!("{{\"i\":{i}}}").into_bytes()),
                key: Some(Arc::from("bulk")),
                headers: vec![("seq".to_string(), i.to_le_bytes().to_vec())],
                metadata: RecordMeta {
                    timestamp_ms: Some(i64::from(i)),
                    format: PayloadFormat::Json,
                },
            })
            .collect();

        let encoded = records_to_proto(records.clone()).encode_to_vec();
        let decoded = proto::Batch::decode(Bytes::from(encoded)).expect("prost decode");
        let back = proto_batch_to_records(decoded);

        assert_eq!(back.len(), 1000);
        assert_eq!(back, records);
    }

    /// The proto `Record.payload` field is `bytes::Bytes` (not `Vec<u8>`).
    /// This is the build.rs `.bytes(".")` config taking effect; a regression to
    /// the default `Vec<u8>` would fail to compile this assignment.
    #[test]
    fn proto_payload_field_is_bytes_type() {
        let r = proto::Record {
            payload: Bytes::from_static(b"x"),
            ..Default::default()
        };
        let p: Bytes = r.payload;
        assert_eq!(p.as_ref(), b"x");
    }

    /// Empty key string maps to `None`, and `None` maps back to empty string.
    #[test]
    fn empty_key_maps_to_none() {
        let r = Record {
            payload: Bytes::from_static(b"x"),
            key: None,
            headers: Vec::new(),
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Auto,
            },
        };
        let p = record_to_proto(r);
        assert_eq!(p.key, "");
        let back = record_from_proto(p);
        assert_eq!(back.key, None);
    }

    /// Known limitation: the proto key is a plain `string` with no presence
    /// flag, so `Some("")` cannot be told apart from `None` on the wire and
    /// comes back as `None`. Pinned here so a future proto change (e.g. an
    /// `optional` key) updates this deliberately.
    #[test]
    fn empty_some_key_collapses_to_none() {
        let r = Record {
            payload: Bytes::from_static(b"x"),
            key: Some(Arc::from("")),
            headers: Vec::new(),
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Auto,
            },
        };
        let back = record_from_proto(record_to_proto(r));
        assert_eq!(back.key, None);
    }

    /// FORMAT_ARROW_IPC (no rustlib equivalent) maps back to `Auto`.
    #[test]
    fn arrow_format_collapses_to_auto() {
        let p = proto::Record {
            payload: Bytes::from_static(b"x"),
            format: proto::Format::ArrowIpc.into(),
            ..Default::default()
        };
        let back = record_from_proto(p);
        assert_eq!(back.metadata.format, PayloadFormat::Auto);
    }

    /// An out-of-range `format` discriminant (forward-compat: a newer peer
    /// sending a variant this build does not know) maps back to `Auto`
    /// instead of failing.
    #[test]
    fn unknown_format_value_collapses_to_auto() {
        assert_eq!(format_from_proto(i32::MAX), PayloadFormat::Auto);

        let p = proto::Record {
            payload: Bytes::from_static(b"x"),
            format: i32::MAX,
            ..Default::default()
        };
        let back = record_from_proto(p);
        assert_eq!(back.metadata.format, PayloadFormat::Auto);
    }
}