Skip to main content

hyperi_rustlib/transport/grpc/
batch.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/grpc/batch.rs
3// Purpose:   Native batch transport -- WorkBatch <-> proto Batch wire mapper
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Native batch transport wire mapper (Task 0.6)
10//!
11//! Serde-less rustlib<->rustlib (DFE<->DFE) transfer of a whole
12//! [`WorkBatch`](crate::transport::WorkBatch) over the existing gRPC mesh. One
13//! `WorkBatch` maps to one proto [`Batch`](super::proto::Batch) and travels in a
14//! single `RouteBatch` RPC -- batch-at-a-time, NOT record-by-record streaming.
15//!
16//! ## Payloads are OPAQUE in transit
17//!
18//! Each [`Record`] payload is mapped straight onto the
19//! proto `Record.payload` `bytes` field and back. prost is configured (in
20//! `build.rs`, `.bytes(".")`) to decode that field ZERO-COPY into
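//! [`bytes::Bytes`], so on the receive side the payload is a refcounted view of
//! the decode buffer, never copied -- provided the message is decoded from a
//! `Bytes`-backed buffer (decoding from a plain `&[u8]` copies the field). The
//! JSON / MsgPack codec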
23//! ([`crate::transport::codec`]) is NEVER invoked here -- the bytes pass through
24//! untouched, so a payload that is not valid JSON or MsgPack survives a
25//! round-trip byte-identical.
26//!
27//! ## Swappable wire
28//!
29//! The wire is protobuf BY DEFAULT but the mapping is isolated to this module:
30//! a future hand-rolled frame could replace [`records_to_proto`] /
31//! [`proto_batch_to_records`] without touching the `WorkBatch` types or the
32//! transport seam.
33//!
34//! ## What does NOT cross the wire
35//!
36//! `commit_tokens` and `dlq_entries` are deliberately left off the proto `Batch`.
37//! Commit tokens are the SENDER's source acks -- fired locally after the block
38//! is sent (at-least-once), they have no meaning on the receiver. Inline-DLQ
//! entries are a local no-silent-drop concern. So the mapper takes / returns the
//! records (an owned `Vec<Record>` in both directions, so payload handles are
//! moved rather than cloned), not the whole `WorkBatch<T>`; this
//! also avoids dragging the `CommitToken` generic onto the wire path.
42
43use super::proto;
44use crate::transport::types::PayloadFormat;
45use crate::transport::work_batch::{Record, RecordMeta};
46use bytes::Bytes;
47use std::sync::Arc;
48
49/// Map a rustlib [`PayloadFormat`] onto the proto [`Format`](proto::Format).
50fn format_to_proto(format: PayloadFormat) -> proto::Format {
51    match format {
52        PayloadFormat::Auto => proto::Format::Auto,
53        PayloadFormat::Json => proto::Format::Json,
54        PayloadFormat::MsgPack => proto::Format::Msgpack,
55    }
56}
57
58/// Map a proto [`Format`](proto::Format) back onto a rustlib [`PayloadFormat`].
59///
60/// `FORMAT_ARROW_IPC` has no rustlib equivalent yet, so it collapses to
61/// [`PayloadFormat::Auto`] (the safe "detect later" default). An out-of-range
62/// enum value (forward-compat from a newer peer) likewise maps to `Auto`.
63fn format_from_proto(format: i32) -> PayloadFormat {
64    match proto::Format::try_from(format) {
65        Ok(proto::Format::Json) => PayloadFormat::Json,
66        Ok(proto::Format::Msgpack) => PayloadFormat::MsgPack,
67        // FORMAT_AUTO, FORMAT_ARROW_IPC, or an unknown future value.
68        _ => PayloadFormat::Auto,
69    }
70}
71
72/// Map one rustlib [`Record`] onto a proto [`Record`](proto::Record).
73///
74/// The payload `Bytes` handle is MOVED onto the proto field (no copy). `key`
75/// `None` becomes the empty string; `Some(k)` carries the key text.
76fn record_to_proto(record: Record) -> proto::Record {
77    let (timestamp_ms, has_timestamp_ms) = match record.metadata.timestamp_ms {
78        Some(ts) => (ts, true),
79        None => (0, false),
80    };
81
82    proto::Record {
83        payload: record.payload,
84        key: record.key.as_deref().unwrap_or("").to_string(),
85        headers: record
86            .headers
87            .into_iter()
88            .map(|(key, value)| proto::Header {
89                key,
90                value: Bytes::from(value),
91            })
92            .collect(),
93        timestamp_ms,
94        has_timestamp_ms,
95        format: format_to_proto(record.metadata.format).into(),
96    }
97}
98
99/// Map a proto [`Record`](proto::Record) back onto a rustlib [`Record`].
100///
101/// The payload is the prost-decoded [`Bytes`] handle MOVED across (zero-copy --
102/// a refcounted view of the decode buffer). An empty `key` string maps back to
103/// `None`.
104fn record_from_proto(record: proto::Record) -> Record {
105    let key = if record.key.is_empty() {
106        None
107    } else {
108        Some(Arc::from(record.key.as_str()))
109    };
110
111    Record {
112        payload: record.payload,
113        key,
114        headers: record
115            .headers
116            .into_iter()
117            .map(|h| (h.key, h.value.to_vec()))
118            .collect(),
119        metadata: RecordMeta {
120            timestamp_ms: if record.has_timestamp_ms {
121                Some(record.timestamp_ms)
122            } else {
123                None
124            },
125            format: format_from_proto(record.format),
126        },
127    }
128}
129
130/// Map a slice of rustlib [`Record`]s onto a proto [`Batch`](proto::Batch).
131///
132/// This is the SEND-side mapper. It takes the records (NOT the whole
133/// `WorkBatch<T>`) because commit tokens and DLQ entries do not cross the wire
134/// (see the module docs). Payloads are moved, not copied.
135#[must_use]
136pub fn records_to_proto(records: Vec<Record>) -> proto::Batch {
137    proto::Batch {
138        records: records.into_iter().map(record_to_proto).collect(),
139    }
140}
141
142/// Map a proto [`Batch`](proto::Batch) back onto a `Vec<Record>`.
143///
144/// This is the RECEIVE-side mapper. The caller wraps the returned records in a
145/// fresh [`WorkBatch`](crate::transport::WorkBatch) (attaching its own commit
146/// tokens). Payloads are zero-copy [`Bytes`] views of the decode buffer.
147#[must_use]
148pub fn proto_batch_to_records(batch: proto::Batch) -> Vec<Record> {
149    batch.records.into_iter().map(record_from_proto).collect()
150}
151
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a record with all fields populated (incl. non-UTF8 payload).
    fn full_record() -> Record {
        Record {
            // Deliberately NOT a JSON document nor a single MsgPack value, and
            // includes non-UTF8 bytes -- proves no codec runs on the payload in
            // transit.
            payload: Bytes::from_static(&[0x00, 0xff, 0xfe, b'{', 0x80, b'a']),
            key: Some(Arc::from("events.land")),
            headers: vec![
                ("trace".to_string(), vec![0x01, 0x02, 0x03]),
                ("source".to_string(), b"loader".to_vec()),
            ],
            metadata: RecordMeta {
                timestamp_ms: Some(1_717_000_000_123),
                format: PayloadFormat::Json,
            },
        }
    }

    /// Round-trip ONE record through proto and back, asserting byte-equality on
    /// every field. `proto::Record::payload` must be `Bytes` (typed below).
    #[test]
    fn record_round_trips_byte_identical() {
        let original = full_record();
        let p = record_to_proto(original.clone());

        // The proto payload field is `bytes::Bytes` (zero-copy config). This
        // line would not compile if prost emitted `Vec<u8>` here.
        let _: &Bytes = &p.payload;

        let back = record_from_proto(p);
        assert_eq!(back, original);
    }

    /// Whole-batch round-trip with several records, including binary payloads
    /// with non-UTF8 bytes, keys, headers, and timestamps. Asserts every field
    /// is byte-identical after a real prost encode/decode cycle.
    #[test]
    fn batch_round_trips_through_prost_encode_decode() {
        use prost::Message as _;

        let records = vec![
            full_record(),
            Record {
                payload: Bytes::from_static(b"plain text not json"),
                key: None,
                headers: Vec::new(),
                metadata: RecordMeta {
                    timestamp_ms: None,
                    format: PayloadFormat::Auto,
                },
            },
            Record {
                // MsgPack fixmap byte lead.
                payload: Bytes::from_static(&[0x81, 0xa1, b'a', 0x01]),
                key: Some(Arc::from("metrics")),
                headers: vec![("k".to_string(), Vec::new())],
                metadata: RecordMeta {
                    timestamp_ms: Some(0), // genuine zero, must survive as Some(0)
                    format: PayloadFormat::MsgPack,
                },
            },
        ];

        // Map -> encode (prost) -> decode (prost) -> map back.
        let proto_batch = records_to_proto(records.clone());
        let encoded = proto_batch.encode_to_vec();
        let decoded = proto::Batch::decode(Bytes::from(encoded)).expect("prost decode");
        let back = proto_batch_to_records(decoded);

        assert_eq!(back.len(), records.len());
        for (a, b) in back.iter().zip(records.iter()) {
            assert_eq!(a, b, "record mismatch after wire round-trip");
        }
        // The genuine zero timestamp survived as Some(0), not None.
        assert_eq!(back[2].metadata.timestamp_ms, Some(0));
    }

    /// A payload that is NEITHER a JSON document NOR a single MsgPack value
    /// survives the wire round-trip byte-for-byte. If any codec ran on the
    /// payload in transit it would fail to parse or mutate the bytes -- proving
    /// the payload is opaque.
    #[test]
    fn non_codec_payload_survives_round_trip_opaque() {
        use prost::Message as _;

        // Deterministic ramp over every byte value 0x00..=0xff. It includes
        // non-UTF8 bytes and 0xc1, which MsgPack never uses, so it is neither
        // a JSON document nor a single MsgPack value.
        let raw: Vec<u8> = (0u8..=255).cycle().take(777).collect();
        let payload = Bytes::from(raw.clone());

        let records = vec![Record {
            payload,
            key: Some(Arc::from("k")),
            headers: Vec::new(),
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Auto,
            },
        }];

        let encoded = records_to_proto(records).encode_to_vec();
        let decoded = proto::Batch::decode(Bytes::from(encoded)).expect("prost decode");
        let back = proto_batch_to_records(decoded);

        assert_eq!(back.len(), 1);
        assert_eq!(back[0].payload.as_ref(), raw.as_slice());
    }

    /// An empty batch round-trips to an empty batch.
    #[test]
    fn empty_batch_round_trips() {
        use prost::Message as _;

        let encoded = records_to_proto(Vec::new()).encode_to_vec();
        let decoded = proto::Batch::decode(Bytes::from(encoded)).expect("prost decode");
        let back = proto_batch_to_records(decoded);
        assert!(back.is_empty());
    }

    /// A large-ish batch (1000 small records) round-trips intact.
    #[test]
    fn large_batch_round_trips() {
        use prost::Message as _;

        let records: Vec<Record> = (0..1000u32)
            .map(|i| Record {
                payload: Bytes::from(format!("{{\"i\":{i}}}").into_bytes()),
                key: Some(Arc::from("bulk")),
                headers: vec![("seq".to_string(), i.to_le_bytes().to_vec())],
                metadata: RecordMeta {
                    timestamp_ms: Some(i64::from(i)),
                    format: PayloadFormat::Json,
                },
            })
            .collect();

        let encoded = records_to_proto(records.clone()).encode_to_vec();
        let decoded = proto::Batch::decode(Bytes::from(encoded)).expect("prost decode");
        let back = proto_batch_to_records(decoded);

        assert_eq!(back.len(), 1000);
        assert_eq!(back, records);
    }

    /// The proto `Record.payload` field is `bytes::Bytes` (not `Vec<u8>`).
    /// This is the build.rs `.bytes(".")` config taking effect; a regression to
    /// the default `Vec<u8>` would fail to compile this assignment.
    #[test]
    fn proto_payload_field_is_bytes_type() {
        let r = proto::Record {
            payload: Bytes::from_static(b"x"),
            ..Default::default()
        };
        let p: Bytes = r.payload;
        assert_eq!(p.as_ref(), b"x");
    }

    /// `None` maps to the empty key string, and the empty string maps back to
    /// `None`.
    #[test]
    fn empty_key_maps_to_none() {
        let r = Record {
            payload: Bytes::from_static(b"x"),
            key: None,
            headers: Vec::new(),
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Auto,
            },
        };
        let p = record_to_proto(r);
        assert_eq!(p.key, "");
        let back = record_from_proto(p);
        assert_eq!(back.key, None);
    }

    /// `Some("")` cannot be told apart from `None` on the wire (the empty
    /// string is the "no key" sentinel), so it comes back as `None`. This pins
    /// the known lossy corner of the key mapping.
    #[test]
    fn empty_some_key_collapses_to_none() {
        let r = Record {
            payload: Bytes::from_static(b"x"),
            key: Some(Arc::from("")),
            headers: Vec::new(),
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Auto,
            },
        };
        let back = record_from_proto(record_to_proto(r));
        assert_eq!(back.key, None);
    }

    /// FORMAT_ARROW_IPC (no rustlib equivalent) maps back to `Auto`.
    #[test]
    fn arrow_format_collapses_to_auto() {
        let p = proto::Record {
            payload: Bytes::from_static(b"x"),
            format: proto::Format::ArrowIpc.into(),
            ..Default::default()
        };
        let back = record_from_proto(p);
        assert_eq!(back.metadata.format, PayloadFormat::Auto);
    }

    /// An enum number this build does not recognise (forward-compat with a
    /// newer peer) maps back to `Auto` instead of failing.
    #[test]
    fn unknown_format_value_collapses_to_auto() {
        assert_eq!(format_from_proto(i32::MAX), PayloadFormat::Auto);
    }

    /// Every rustlib `PayloadFormat` survives the format mapping in both
    /// directions.
    #[test]
    fn payload_formats_round_trip() {
        assert_eq!(
            format_from_proto(format_to_proto(PayloadFormat::Auto).into()),
            PayloadFormat::Auto
        );
        assert_eq!(
            format_from_proto(format_to_proto(PayloadFormat::Json).into()),
            PayloadFormat::Json
        );
        assert_eq!(
            format_from_proto(format_to_proto(PayloadFormat::MsgPack).into()),
            PayloadFormat::MsgPack
        );
    }
}