Skip to main content

hyperi_rustlib/transport/grpc/
batch.rs

// Project:   hyperi-rustlib
// File:      src/transport/grpc/batch.rs
// Purpose:   Native batch transport -- WorkBatch <-> proto Batch wire mapper
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED
8
//! # Native batch transport wire mapper (Task 0.6)
//!
//! Serde-less rustlib<->rustlib (DFE<->DFE) transfer of a whole
//! [`WorkBatch`](crate::transport::WorkBatch) over the existing gRPC mesh. One
//! `WorkBatch` maps to one proto [`Batch`](super::proto::Batch) and travels in a
//! single `RouteBatch` RPC -- batch-at-a-time, NOT record-by-record streaming.
//!
//! ## Payloads are OPAQUE in transit
//!
//! Each [`Record`] payload maps straight onto the proto `Record.payload`
//! `bytes` field. prost decodes that field ZERO-COPY into [`bytes::Bytes`]
//! (`.bytes(".")` in `build.rs`), so the receive-side payload is a refcounted
//! view of the decode buffer -- provided that buffer is itself a `Bytes`
//! (decoding from a plain `&[u8]` copies each payload out). The JSON / MsgPack
//! codec ([`crate::transport::codec`]) is NEVER invoked here, so
//! non-JSON/MsgPack bytes survive a round-trip byte-identical.
//!
//! ## Swappable wire
//!
//! The mapping is isolated to this module: a future hand-rolled frame could
//! replace [`records_to_proto`] / [`proto_batch_to_records`] without touching
//! the `WorkBatch` types or the transport seam.
//!
//! ## What does NOT cross the wire
//!
//! `commit_tokens` and `dlq_entries` are left off the proto `Batch`. Commit
//! tokens are the SENDER's source acks -- fired locally after send
//! (at-least-once), and meaningless on the receiver. Inline-DLQ entries are a
//! local no-silent-drop concern. So the mapper takes and returns records, not
//! the whole `WorkBatch<T>` -- which also keeps the `CommitToken` generic off
//! the wire.
38
39use super::proto;
40use crate::transport::types::PayloadFormat;
41use crate::transport::work_batch::{Record, RecordMeta};
42use bytes::Bytes;
43use std::sync::Arc;
44
45/// Map a rustlib [`PayloadFormat`] onto the proto [`Format`](proto::Format).
46fn format_to_proto(format: PayloadFormat) -> proto::Format {
47    match format {
48        PayloadFormat::Auto => proto::Format::Auto,
49        PayloadFormat::Json => proto::Format::Json,
50        PayloadFormat::MsgPack => proto::Format::Msgpack,
51    }
52}
53
54/// Map a proto [`Format`](proto::Format) back onto a rustlib [`PayloadFormat`].
55///
56/// `FORMAT_ARROW_IPC` has no rustlib equivalent yet, so it collapses to
57/// [`PayloadFormat::Auto`] (the safe "detect later" default). An out-of-range
58/// enum value (forward-compat from a newer peer) likewise maps to `Auto`.
59fn format_from_proto(format: i32) -> PayloadFormat {
60    match proto::Format::try_from(format) {
61        Ok(proto::Format::Json) => PayloadFormat::Json,
62        Ok(proto::Format::Msgpack) => PayloadFormat::MsgPack,
63        // FORMAT_AUTO, FORMAT_ARROW_IPC, or an unknown future value.
64        _ => PayloadFormat::Auto,
65    }
66}
67
68/// Map one rustlib [`Record`] onto a proto [`Record`](proto::Record).
69///
70/// The payload `Bytes` handle is MOVED onto the proto field (no copy). `key`
71/// `None` becomes the empty string; `Some(k)` carries the key text.
72fn record_to_proto(record: Record) -> proto::Record {
73    let (timestamp_ms, has_timestamp_ms) = match record.metadata.timestamp_ms {
74        Some(ts) => (ts, true),
75        None => (0, false),
76    };
77
78    proto::Record {
79        payload: record.payload,
80        key: record.key.as_deref().unwrap_or("").to_string(),
81        headers: record
82            .headers
83            .into_iter()
84            .map(|(key, value)| proto::Header {
85                key,
86                value: Bytes::from(value),
87            })
88            .collect(),
89        timestamp_ms,
90        has_timestamp_ms,
91        format: format_to_proto(record.metadata.format).into(),
92    }
93}
94
95/// Map a proto [`Record`](proto::Record) back onto a rustlib [`Record`].
96///
97/// The payload is the prost-decoded [`Bytes`] handle MOVED across (zero-copy --
98/// a refcounted view of the decode buffer). An empty `key` string maps back to
99/// `None`.
100fn record_from_proto(record: proto::Record) -> Record {
101    let key = if record.key.is_empty() {
102        None
103    } else {
104        Some(Arc::from(record.key.as_str()))
105    };
106
107    Record {
108        payload: record.payload,
109        key,
110        headers: record
111            .headers
112            .into_iter()
113            .map(|h| (h.key, h.value.to_vec()))
114            .collect(),
115        metadata: RecordMeta {
116            timestamp_ms: if record.has_timestamp_ms {
117                Some(record.timestamp_ms)
118            } else {
119                None
120            },
121            format: format_from_proto(record.format),
122        },
123    }
124}
125
126/// Map a slice of rustlib [`Record`]s onto a proto [`Batch`](proto::Batch).
127///
128/// SEND-side mapper. Takes records, not the whole `WorkBatch<T>` (commit
129/// tokens and DLQ entries do not cross the wire -- see module docs). Payloads
130/// are moved, not copied.
131#[must_use]
132pub fn records_to_proto(records: Vec<Record>) -> proto::Batch {
133    proto::Batch {
134        records: records.into_iter().map(record_to_proto).collect(),
135    }
136}
137
138/// Map a proto [`Batch`](proto::Batch) back onto a `Vec<Record>`.
139///
140/// RECEIVE-side mapper. The caller wraps the returned records in a fresh
141/// [`WorkBatch`](crate::transport::WorkBatch) (attaching its own commit
142/// tokens). Payloads are zero-copy [`Bytes`] views of the decode buffer.
143#[must_use]
144pub fn proto_batch_to_records(batch: proto::Batch) -> Vec<Record> {
145    batch.records.into_iter().map(record_from_proto).collect()
146}
147
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a record with all fields populated (incl. non-UTF8 payload).
    fn full_record() -> Record {
        Record {
            // Deliberately NOT a single JSON or MsgPack document, and includes
            // non-UTF8 bytes -- proves no codec runs on the payload in transit.
            payload: Bytes::from_static(&[0x00, 0xff, 0xfe, b'{', 0x80, b'a']),
            key: Some(Arc::from("events.land")),
            headers: vec![
                ("trace".to_string(), vec![0x01, 0x02, 0x03]),
                ("source".to_string(), b"loader".to_vec()),
            ],
            metadata: RecordMeta {
                timestamp_ms: Some(1_717_000_000_123),
                format: PayloadFormat::Json,
            },
        }
    }

    /// Round-trip ONE record through proto and back, asserting byte-equality on
    /// every field. `proto::Record::payload` must be `Bytes` (typed below).
    #[test]
    fn record_round_trips_byte_identical() {
        let original = full_record();
        let p = record_to_proto(original.clone());

        // The proto payload field is `bytes::Bytes` (zero-copy config). This
        // line would not compile if prost emitted `Vec<u8>` here.
        let _: &Bytes = &p.payload;

        let back = record_from_proto(p);
        assert_eq!(back, original);
    }

    /// Whole-batch round-trip with several records, including binary payloads
    /// with non-UTF8 bytes, keys, headers, and timestamps. Asserts every field
    /// is byte-identical after a real prost encode/decode cycle.
    #[test]
    fn batch_round_trips_through_prost_encode_decode() {
        use prost::Message as _;

        let records = vec![
            full_record(),
            Record {
                payload: Bytes::from_static(b"plain text not json"),
                key: None,
                headers: Vec::new(),
                metadata: RecordMeta {
                    timestamp_ms: None,
                    format: PayloadFormat::Auto,
                },
            },
            Record {
                // MsgPack fixmap byte lead.
                payload: Bytes::from_static(&[0x81, 0xa1, b'a', 0x01]),
                key: Some(Arc::from("metrics")),
                headers: vec![("k".to_string(), Vec::new())],
                metadata: RecordMeta {
                    timestamp_ms: Some(0), // genuine zero, must survive as Some(0)
                    format: PayloadFormat::MsgPack,
                },
            },
        ];

        // Map -> encode (prost) -> decode (prost) -> map back.
        let proto_batch = records_to_proto(records.clone());
        let encoded = proto_batch.encode_to_vec();
        let decoded = proto::Batch::decode(Bytes::from(encoded)).expect("prost decode");
        let back = proto_batch_to_records(decoded);

        assert_eq!(back.len(), records.len());
        for (a, b) in back.iter().zip(records.iter()) {
            assert_eq!(a, b, "record mismatch after wire round-trip");
        }
        // The genuine zero timestamp survived as Some(0), not None.
        assert_eq!(back[2].metadata.timestamp_ms, Some(0));
    }

    /// A payload that is NEITHER valid JSON NOR a single valid MsgPack value
    /// survives the wire round-trip byte-for-byte. If any codec ran on the
    /// payload in transit it would fail to parse or mutate the bytes --
    /// proving the payload is opaque.
    #[test]
    fn non_codec_payload_survives_round_trip_opaque() {
        use prost::Message as _;

        // Deterministic sweep of every byte value (0x00..=0xFF, cycled): not a
        // JSON document and not a single MsgPack value.
        let raw: Vec<u8> = (0u8..=255).cycle().take(777).collect();
        let payload = Bytes::from(raw.clone());

        let records = vec![Record {
            payload,
            key: Some(Arc::from("k")),
            headers: Vec::new(),
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Auto,
            },
        }];

        let encoded = records_to_proto(records).encode_to_vec();
        let decoded = proto::Batch::decode(Bytes::from(encoded)).expect("prost decode");
        let back = proto_batch_to_records(decoded);

        assert_eq!(back.len(), 1);
        assert_eq!(back[0].payload.as_ref(), raw.as_slice());
    }

    /// Decoding from a `Bytes` buffer yields payloads that point INTO that
    /// buffer (refcounted views), not fresh copies -- the zero-copy claim in
    /// the module docs.
    #[test]
    fn decoded_payload_is_view_of_decode_buffer() {
        use prost::Message as _;

        let wire = Bytes::from(records_to_proto(vec![full_record()]).encode_to_vec());
        // Raw pointers only: no borrow of `wire` outlives this line.
        let wire_range = wire.as_ptr_range();

        let decoded = proto::Batch::decode(wire).expect("prost decode");
        let back = proto_batch_to_records(decoded);

        assert_eq!(back.len(), 1);
        assert!(
            wire_range.contains(&back[0].payload.as_ptr()),
            "decoded payload was copied out of the decode buffer"
        );
    }

    /// An empty batch round-trips to an empty batch.
    #[test]
    fn empty_batch_round_trips() {
        use prost::Message as _;

        let encoded = records_to_proto(Vec::new()).encode_to_vec();
        let decoded = proto::Batch::decode(Bytes::from(encoded)).expect("prost decode");
        let back = proto_batch_to_records(decoded);
        assert!(back.is_empty());
    }

    /// A large-ish batch (1000 small records) round-trips intact.
    #[test]
    fn large_batch_round_trips() {
        use prost::Message as _;

        let records: Vec<Record> = (0..1000u32)
            .map(|i| Record {
                payload: Bytes::from(format!("{{\"i\":{i}}}").into_bytes()),
                key: Some(Arc::from("bulk")),
                headers: vec![("seq".to_string(), i.to_le_bytes().to_vec())],
                metadata: RecordMeta {
                    timestamp_ms: Some(i64::from(i)),
                    format: PayloadFormat::Json,
                },
            })
            .collect();

        let encoded = records_to_proto(records.clone()).encode_to_vec();
        let decoded = proto::Batch::decode(Bytes::from(encoded)).expect("prost decode");
        let back = proto_batch_to_records(decoded);

        assert_eq!(back.len(), 1000);
        assert_eq!(back, records);
    }

    /// The proto `Record.payload` field is `bytes::Bytes` (not `Vec<u8>`).
    /// This is the build.rs `.bytes(".")` config taking effect; a regression to
    /// the default `Vec<u8>` would fail to compile this assignment.
    #[test]
    fn proto_payload_field_is_bytes_type() {
        let r = proto::Record {
            payload: Bytes::from_static(b"x"),
            ..Default::default()
        };
        let p: Bytes = r.payload;
        assert_eq!(p.as_ref(), b"x");
    }

    /// Empty key string maps to `None`, and `None` maps back to empty string.
    #[test]
    fn empty_key_maps_to_none() {
        let r = Record {
            payload: Bytes::from_static(b"x"),
            key: None,
            headers: Vec::new(),
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Auto,
            },
        };
        let p = record_to_proto(r);
        assert_eq!(p.key, "");
        let back = record_from_proto(p);
        assert_eq!(back.key, None);
    }

    /// `Some("")` is indistinguishable from `None` on the wire: the proto `key`
    /// field has no presence flag, so it decodes back to `None`. Pinned here so
    /// the lossy edge is a deliberate, visible contract.
    #[test]
    fn some_empty_key_collapses_to_none() {
        let r = Record {
            payload: Bytes::from_static(b"x"),
            key: Some(Arc::from("")),
            headers: Vec::new(),
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Auto,
            },
        };
        let back = record_from_proto(record_to_proto(r));
        assert_eq!(back.key, None);
    }

    /// Each rustlib `PayloadFormat` survives the proto enum mapping unchanged.
    #[test]
    fn payload_format_round_trips_through_proto_enum() {
        assert_eq!(
            format_from_proto(format_to_proto(PayloadFormat::Auto).into()),
            PayloadFormat::Auto
        );
        assert_eq!(
            format_from_proto(format_to_proto(PayloadFormat::Json).into()),
            PayloadFormat::Json
        );
        assert_eq!(
            format_from_proto(format_to_proto(PayloadFormat::MsgPack).into()),
            PayloadFormat::MsgPack
        );
    }

    /// FORMAT_ARROW_IPC (no rustlib equivalent) maps back to `Auto`.
    #[test]
    fn arrow_format_collapses_to_auto() {
        let p = proto::Record {
            payload: Bytes::from_static(b"x"),
            format: proto::Format::ArrowIpc.into(),
            ..Default::default()
        };
        let back = record_from_proto(p);
        assert_eq!(back.metadata.format, PayloadFormat::Auto);
    }

    /// An out-of-range enum value (e.g. from a newer peer) collapses to `Auto`
    /// rather than being rejected -- the forward-compat path.
    #[test]
    fn unknown_format_value_collapses_to_auto() {
        let p = proto::Record {
            payload: Bytes::from_static(b"x"),
            format: i32::MAX,
            ..Default::default()
        };
        let back = record_from_proto(p);
        assert_eq!(back.metadata.format, PayloadFormat::Auto);
    }
}