Skip to main content

hyperi_rustlib/transport/
work_batch.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/work_batch.rs
3// Purpose:   Canonical data-plane contract: Record + WorkBatch
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Canonical data-plane contract
10//!
11//! The single currency that flows through `get -> process -> send -> commit`:
12//!
13//! - [`Record`] -- one work record (payload + routing + lean metadata). It
14//!   carries **no** commit token. Tokens live on the batch, not the record;
15//!   that separation is what makes 1->N fan-out safe (a transform can grow or
16//!   shrink the record count without disturbing the source acks).
17//! - [`RecordMeta`] -- lean per-record metadata (timestamp + payload format).
18//!   Deliberately leaner than the engine's `MessageMetadata`, which carries a
19//!   type-erased commit token; here tokens move to the batch.
20//! - [`WorkBatch`] -- the canonical zero-copy block of records, the source
21//!   commit tokens for the whole block, and any inline-DLQ entries carried
22//!   forward (the no-silent-drop contract preserved from the older
23//!   `RecvBatch`).
24//!
25//! `commit_tokens.len()` is NOT tied to `records.len()`. After a fan-out
26//! transform the record count may change while the commit tokens stay equal to
27//! the input source acks -- they are fired only after the WHOLE block is sent
28//! (at-least-once delivery).
29//!
30//! The block flows `get -> process -> send -> commit` through the unified
31//! engine driver (`crate::worker::engine::driver`); fields are parsed on
32//! demand via the [`codec`](super::codec). For the wider picture see
33//! `docs/BACKPRESSURE.md` (commit-token + brake contract),
34//! `docs/SELF-REGULATION.md`, and `docs/MIGRATIONS.md` (the
35//! `Message`/`RawMessage`/`RecvBatch` -> `WorkBatch` collapse).
36
37use super::filter::FilteredDlqEntry;
38use super::traits::CommitToken;
39use super::types::PayloadFormat;
40use bytes::Bytes;
41use std::sync::Arc;
42use thiserror::Error;
43
44/// Failure modes for [`WorkBatch::from_json_array`] framing.
45///
46/// These cover only the **framing** contract -- slicing a top-level JSON array
47/// into per-element byte views. Element values are NOT parsed, so a malformed
48/// element body (e.g. `[1,nul]`) is not detected here; that is the downstream
49/// parser's job. What IS detected: a missing opening `[`, an unterminated array
50/// or string, structural imbalance, empty elements (leading/trailing/double
51/// commas), and trailing garbage after the closing `]`.
52#[derive(Debug, Error, PartialEq, Eq)]
53pub enum FramingError {
54    /// The blob did not start (after optional whitespace) with `[`.
55    #[error("json-array framing: expected opening '[', found {0}")]
56    NotAnArray(String),
57
58    /// The array (or a string within it) was not terminated before end-of-input.
59    #[error("json-array framing: unexpected end of input (unterminated array or string)")]
60    UnexpectedEof,
61
62    /// An element position held no value (leading, trailing, or doubled comma).
63    #[error("json-array framing: empty element at byte offset {0} (stray comma)")]
64    EmptyElement(usize),
65
66    /// Structure closed more containers than it opened.
67    #[error("json-array framing: unbalanced closing bracket at byte offset {0}")]
68    Unbalanced(usize),
69
70    /// Non-whitespace content appeared after the array's closing `]`.
71    #[error("json-array framing: trailing garbage after closing ']' at byte offset {0}")]
72    TrailingGarbage(usize),
73}
74
75/// Lean per-record metadata.
76///
77/// Unlike the engine's `MessageMetadata`, this carries **no** commit token --
78/// tokens live on the [`WorkBatch`], not the record.
79#[derive(Debug, Clone, PartialEq, Eq)]
80pub struct RecordMeta {
81    /// Source timestamp (milliseconds since epoch), if the transport provided one.
82    pub timestamp_ms: Option<i64>,
83
84    /// Detected or declared payload format.
85    pub format: PayloadFormat,
86}
87
88/// One work record: payload + routing + metadata, with **no** commit token.
89///
90/// The payload is held as [`bytes::Bytes`] so cloning a record (or fanning one
91/// record out into many) is a refcount bump, not a deep copy.
92#[derive(Debug, Clone, PartialEq)]
93pub struct Record {
94    /// Raw payload bytes -- zero-copy / refcounted.
95    pub payload: Bytes,
96
97    /// Routing key (Kafka topic, gRPC metadata key, Redis stream, ...).
98    pub key: Option<Arc<str>>,
99
100    /// Transport / application headers.
101    pub headers: Vec<(String, Vec<u8>)>,
102
103    /// Lean per-record metadata.
104    pub metadata: RecordMeta,
105}
106
107/// The canonical zero-copy block of work records.
108///
109/// One `WorkBatch` is the single currency through `get -> process -> send ->
110/// commit`. The `commit_tokens` are the INPUT source acks -- fired once after
111/// the WHOLE block is sent (at-least-once). `T` generalises the source ack
112/// (Kafka offset, HTTP responder, fetch cursor, ...).
113///
114/// `commit_tokens.len()` is intentionally decoupled from `records.len()`: a
115/// fan-out transform may grow or shrink the record count while the commit
116/// tokens stay equal to the input acks.
117#[derive(Debug)]
118pub struct WorkBatch<T: CommitToken> {
119    /// The work records in this block.
120    pub records: Vec<Record>,
121
122    /// Source acks for the whole block. Fired after the block is fully sent.
123    /// Length is NOT tied to `records.len()`.
124    pub commit_tokens: Vec<T>,
125
126    /// Inline-DLQ entries carried forward (no-silent-drop contract).
127    pub dlq_entries: Vec<FilteredDlqEntry>,
128}
129
130impl<T: CommitToken> WorkBatch<T> {
131    /// An empty batch (no records, no tokens, no DLQ entries).
132    #[must_use]
133    pub fn empty() -> Self {
134        Self {
135            records: Vec::new(),
136            commit_tokens: Vec::new(),
137            dlq_entries: Vec::new(),
138        }
139    }
140
141    /// A batch of records with no commit tokens and no DLQ entries.
142    ///
143    /// Useful for records that were generated downstream (e.g. a transform that
144    /// emits new records) and do not themselves carry a source ack.
145    #[must_use]
146    pub fn from_records(records: Vec<Record>) -> Self {
147        Self {
148            records,
149            commit_tokens: Vec::new(),
150            dlq_entries: Vec::new(),
151        }
152    }
153
154    /// A batch of records plus their source commit tokens (no DLQ entries).
155    #[must_use]
156    pub fn new(records: Vec<Record>, commit_tokens: Vec<T>) -> Self {
157        Self {
158            records,
159            commit_tokens,
160            dlq_entries: Vec::new(),
161        }
162    }
163
164    /// Attach inline-DLQ entries to this batch, consuming and returning it.
165    #[must_use]
166    pub fn with_dlq_entries(mut self, dlq_entries: Vec<FilteredDlqEntry>) -> Self {
167        self.dlq_entries = dlq_entries;
168        self
169    }
170
171    /// Whether the batch has no records (DLQ entries may still be present).
172    #[must_use]
173    pub fn is_empty(&self) -> bool {
174        self.records.is_empty()
175    }
176
177    /// Number of records.
178    #[must_use]
179    pub fn len(&self) -> usize {
180        self.records.len()
181    }
182
183    /// Number of records (explicit alias for [`WorkBatch::len`]).
184    #[must_use]
185    pub fn record_count(&self) -> usize {
186        self.records.len()
187    }
188
189    /// Total payload bytes across all records.
190    ///
191    /// Needed by the byte-budget governor to size a block against a memory
192    /// budget. -- Sums with saturating addition, so a pathological block can
193    /// never overflow `usize`; it clamps at `usize::MAX` instead of wrapping.
194    #[must_use]
195    pub fn total_payload_bytes(&self) -> usize {
196        self.records
197            .iter()
198            .map(|r| r.payload.len())
199            .fold(0usize, usize::saturating_add)
200    }
201
202    /// Transform the records while PRESERVING the commit tokens and DLQ entries.
203    ///
204    /// This is the safe shape for the future `process` handler: a transform may
205    /// grow, shrink, or rewrite the records (fan-out / fan-in), but the source
206    /// acks and inline-DLQ entries flow through untouched.
207    #[must_use]
208    pub fn map_records(mut self, f: impl FnOnce(Vec<Record>) -> Vec<Record>) -> Self {
209        self.records = f(self.records);
210        self
211    }
212
213    // ---- Zero-copy ingestion / framing helpers (Task 0.2) -----------------
214    //
215    // These slice ONE inbound `Bytes` blob into per-record views so the WHOLE
216    // batch shares ONE allocation: `record.payload = blob.slice(start..end)` is
217    // a refcounted view, never a payload copy. They do NOT parse / deserialise
218    // and do NOT re-serialise -- framing only. Commit tokens and DLQ entries are
219    // left EMPTY; the source attaches its acks afterwards.
220
221    /// One record holding the WHOLE blob (no framing).
222    ///
223    /// The single record's payload is the blob itself (zero-copy move of the
224    /// `Bytes` handle), `key` is `None`, `headers` empty, and the metadata
225    /// format is auto-detected via [`PayloadFormat::detect`].
226    #[must_use]
227    pub fn single(blob: Bytes) -> Self {
228        let format = PayloadFormat::detect(&blob);
229        let record = Record {
230            payload: blob,
231            key: None,
232            headers: Vec::new(),
233            metadata: RecordMeta {
234                timestamp_ms: None,
235                format,
236            },
237        };
238        Self {
239            records: vec![record],
240            commit_tokens: Vec::new(),
241            dlq_entries: Vec::new(),
242        }
243    }
244
245    /// Frame an NDJSON (newline-delimited JSON) blob into one record per line.
246    ///
247    /// Splits on `\n` using `memchr`. Each line is trimmed of a trailing `\r`
248    /// (so Windows CRLF endings work) and of surrounding ASCII whitespace; blank
249    /// or whitespace-only lines are skipped. Each surviving line becomes a
250    /// [`Record`] whose payload is a zero-copy `blob.slice(start..end)` view.
251    /// `format` is set to [`PayloadFormat::Json`].
252    ///
253    /// This is framing, not parsing -- it never fails. A line that is not valid
254    /// JSON still becomes a record; the downstream parser surfaces that.
255    #[must_use]
256    pub fn from_ndjson(blob: Bytes) -> Self {
257        let mut records = Vec::new();
258        let mut line_start = 0usize;
259        let bytes = blob.as_ref();
260
261        // Walk newline boundaries; the tail after the last `\n` is its own line.
262        for nl in memchr::memchr_iter(b'\n', bytes) {
263            Self::push_ndjson_line(&mut records, &blob, line_start, nl);
264            line_start = nl + 1;
265        }
266        if line_start < bytes.len() {
267            Self::push_ndjson_line(&mut records, &blob, line_start, bytes.len());
268        }
269
270        Self {
271            records,
272            commit_tokens: Vec::new(),
273            dlq_entries: Vec::new(),
274        }
275    }
276
277    /// Trim one NDJSON line `[start, end)` (end is the `\n` index or EOF) and,
278    /// if non-empty after trimming, push it as a zero-copy record.
279    fn push_ndjson_line(records: &mut Vec<Record>, blob: &Bytes, start: usize, mut end: usize) {
280        let bytes = blob.as_ref();
281        // Strip a single trailing CR (CRLF) before generic whitespace trimming.
282        if end > start && bytes[end - 1] == b'\r' {
283            end -= 1;
284        }
285        // Trim surrounding ASCII whitespace so the slice is the bare record.
286        while end > start && bytes[end - 1].is_ascii_whitespace() {
287            end -= 1;
288        }
289        let mut begin = start;
290        while begin < end && bytes[begin].is_ascii_whitespace() {
291            begin += 1;
292        }
293        if begin >= end {
294            return; // blank / whitespace-only line
295        }
296        records.push(Record {
297            payload: blob.slice(begin..end),
298            key: None,
299            headers: Vec::new(),
300            metadata: RecordMeta {
301                timestamp_ms: None,
302                format: PayloadFormat::Json,
303            },
304        });
305    }
306
307    /// Frame a top-level JSON array blob into one record per top-level element.
308    ///
309    /// Each element becomes a zero-copy `blob.slice(start..end)` view; element
310    /// VALUES are not parsed. The byte-level scanner tracks string state (so
311    /// `[`, `]`, `{`, `}` and `,` inside a string are literal), JSON escapes
312    /// (`\"`, `\\`), and container depth, so nested objects/arrays frame
313    /// correctly. An empty array `[]` yields zero records (`Ok`).
314    ///
315    /// # Errors
316    ///
317    /// Returns [`FramingError`] when the blob is not a well-framed top-level
318    /// array: missing opening `[`, unterminated array/string, an empty element
319    /// position (leading / trailing / doubled comma), an over-close, or trailing
320    /// non-whitespace after the closing `]`.
321    pub fn from_json_array(blob: Bytes) -> Result<Self, FramingError> {
322        let records = scan_json_array(&blob)?;
323        Ok(Self {
324            records,
325            commit_tokens: Vec::new(),
326            dlq_entries: Vec::new(),
327        })
328    }
329}
330
331/// Byte-level top-level JSON-array element scanner (framing, NOT parsing).
332///
333/// Returns one zero-copy [`Record`] per top-level element. See
334/// [`WorkBatch::from_json_array`] for the contract and error modes.
335fn scan_json_array(blob: &Bytes) -> Result<Vec<Record>, FramingError> {
336    let bytes = blob.as_ref();
337    let len = bytes.len();
338
339    // Skip leading whitespace; the first significant byte must be '['.
340    let mut i = 0usize;
341    while i < len && bytes[i].is_ascii_whitespace() {
342        i += 1;
343    }
344    if i >= len {
345        return Err(FramingError::NotAnArray("end of input".to_string()));
346    }
347    if bytes[i] != b'[' {
348        return Err(FramingError::NotAnArray(format!(
349            "byte {:#04x} ('{}')",
350            bytes[i], bytes[i] as char
351        )));
352    }
353    i += 1; // consume '['
354
355    let mut records: Vec<Record> = Vec::new();
356    // `expect_value` distinguishes the slot just after '[' or ',' (where a value
357    // or -- only after '[' -- a closing ']' may appear) from between elements.
358    let mut first_element = true;
359
360    loop {
361        // Skip whitespace before an element (or the closing ']').
362        while i < len && bytes[i].is_ascii_whitespace() {
363            i += 1;
364        }
365        if i >= len {
366            return Err(FramingError::UnexpectedEof);
367        }
368
369        if bytes[i] == b']' {
370            // Closing the array. Only legal here if the array is empty (we are
371            // still on the first element) -- otherwise we'd have consumed a ']'
372            // inside the element-scan loop below after a value.
373            if first_element {
374                i += 1;
375                return finish(blob, records, i);
376            }
377            // We reached here via the top of the loop after a ',', so a ']' now
378            // means a trailing comma: `[1, ]`.
379            return Err(FramingError::EmptyElement(i));
380        }
381        if bytes[i] == b',' {
382            // A ',' where a value is expected: leading or doubled comma.
383            return Err(FramingError::EmptyElement(i));
384        }
385
386        // Scan one element value, tracking string + escape + container depth.
387        let elem_start = i;
388        let mut depth: usize = 0;
389        let mut in_string = false;
390        let mut escaped = false;
391
392        let elem_end;
393        loop {
394            if i >= len {
395                return Err(FramingError::UnexpectedEof);
396            }
397            let c = bytes[i];
398
399            if in_string {
400                if escaped {
401                    escaped = false;
402                } else if c == b'\\' {
403                    escaped = true;
404                } else if c == b'"' {
405                    in_string = false;
406                }
407                i += 1;
408                continue;
409            }
410
411            match c {
412                b'"' => {
413                    in_string = true;
414                    i += 1;
415                }
416                b'{' | b'[' => {
417                    depth += 1;
418                    i += 1;
419                }
420                b'}' => {
421                    // A '}' at depth 0 has no matching opener inside this element.
422                    depth = depth.checked_sub(1).ok_or(FramingError::Unbalanced(i))?;
423                    i += 1;
424                }
425                b']' => {
426                    if depth == 0 {
427                        // This ']' closes the ARRAY -- the element ends here.
428                        elem_end = i;
429                        i += 1; // consume ']'
430                        push_element(blob, &mut records, elem_start, elem_end);
431                        return finish(blob, records, i);
432                    }
433                    depth -= 1;
434                    i += 1;
435                }
436                b',' if depth == 0 => {
437                    // Top-level separator -- the element ends just before it.
438                    elem_end = i;
439                    i += 1; // consume ','
440                    break;
441                }
442                _ => {
443                    i += 1;
444                }
445            }
446        }
447
448        push_element(blob, &mut records, elem_start, elem_end);
449        first_element = false;
450    }
451}
452
453/// Trim trailing whitespace from `[start, end)` and push the zero-copy slice.
454///
455/// Leading whitespace was already skipped by the caller, so only the trailing
456/// edge (between the value and the `,`/`]`) needs trimming.
457fn push_element(blob: &Bytes, records: &mut Vec<Record>, start: usize, end: usize) {
458    let bytes = blob.as_ref();
459    let mut e = end;
460    while e > start && bytes[e - 1].is_ascii_whitespace() {
461        e -= 1;
462    }
463    records.push(Record {
464        payload: blob.slice(start..e),
465        key: None,
466        headers: Vec::new(),
467        metadata: RecordMeta {
468            timestamp_ms: None,
469            format: PayloadFormat::Json,
470        },
471    });
472}
473
474/// After the closing `]`, only trailing whitespace is permitted. Build the
475/// records vector into a `Vec<Record>` result, or error on trailing garbage.
476fn finish(blob: &Bytes, records: Vec<Record>, mut i: usize) -> Result<Vec<Record>, FramingError> {
477    let bytes = blob.as_ref();
478    let len = bytes.len();
479    while i < len && bytes[i].is_ascii_whitespace() {
480        i += 1;
481    }
482    if i < len {
483        return Err(FramingError::TrailingGarbage(i));
484    }
485    Ok(records)
486}
487
488impl<T: CommitToken> From<crate::Message<T>> for WorkBatch<T> {
489    /// Collapse a single [`crate::Message`] into a one-record `WorkBatch`.
490    ///
491    /// The message payload is already [`Bytes`] -- this is a move, not a copy.
492    fn from(msg: crate::Message<T>) -> Self {
493        let record = Record {
494            payload: msg.payload,
495            key: msg.key,
496            headers: Vec::new(),
497            metadata: RecordMeta {
498                timestamp_ms: msg.timestamp_ms,
499                format: msg.format,
500            },
501        };
502        Self {
503            records: vec![record],
504            commit_tokens: vec![msg.token],
505            dlq_entries: Vec::new(),
506        }
507    }
508}
509
510impl<T: CommitToken> From<crate::transport::traits::RecvBatch<T>> for WorkBatch<T> {
511    /// Collapse a [`RecvBatch`](crate::transport::traits::RecvBatch) into a
512    /// `WorkBatch`.
513    ///
514    /// Each `Message<T>` becomes a [`Record`]; its `token` is collected into
515    /// `commit_tokens` (preserving order); `dlq_entries` carry straight across.
516    /// Payloads are already [`Bytes`] -- each is a move, not a copy.
517    fn from(batch: crate::transport::traits::RecvBatch<T>) -> Self {
518        let mut records = Vec::with_capacity(batch.messages.len());
519        let mut commit_tokens = Vec::with_capacity(batch.messages.len());
520        for msg in batch.messages {
521            commit_tokens.push(msg.token);
522            records.push(Record {
523                payload: msg.payload,
524                key: msg.key,
525                headers: Vec::new(),
526                metadata: RecordMeta {
527                    timestamp_ms: msg.timestamp_ms,
528                    format: msg.format,
529                },
530            });
531        }
532        Self {
533            records,
534            commit_tokens,
535            dlq_entries: batch.dlq_entries,
536        }
537    }
538}
539
540#[cfg(test)]
541mod tests {
542    use super::*;
543    use crate::Message;
544    use crate::transport::traits::RecvBatch;
545
546    /// Minimal commit token for tests.
547    #[derive(Debug, Clone, PartialEq, Eq)]
548    struct TestToken(u64);
549
550    impl std::fmt::Display for TestToken {
551        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
552            write!(f, "tok-{}", self.0)
553        }
554    }
555
556    impl CommitToken for TestToken {}
557
558    fn record(payload: &'static [u8]) -> Record {
559        Record {
560            payload: Bytes::from_static(payload),
561            key: Some(Arc::from("events")),
562            headers: vec![("h".to_string(), b"v".to_vec())],
563            metadata: RecordMeta {
564                timestamp_ms: Some(42),
565                format: PayloadFormat::Json,
566            },
567        }
568    }
569
570    #[test]
571    fn empty_has_no_records_tokens_or_dlq() {
572        let b = WorkBatch::<TestToken>::empty();
573        assert!(b.is_empty());
574        assert_eq!(b.len(), 0);
575        assert_eq!(b.record_count(), 0);
576        assert!(b.commit_tokens.is_empty());
577        assert!(b.dlq_entries.is_empty());
578        assert_eq!(b.total_payload_bytes(), 0);
579    }
580
581    #[test]
582    fn from_records_has_no_tokens() {
583        let b = WorkBatch::<TestToken>::from_records(vec![record(b"{}"), record(b"[]")]);
584        assert_eq!(b.len(), 2);
585        assert!(!b.is_empty());
586        assert!(b.commit_tokens.is_empty());
587    }
588
589    #[test]
590    fn new_carries_records_and_tokens() {
591        let b = WorkBatch::new(vec![record(b"{}")], vec![TestToken(1), TestToken(2)]);
592        assert_eq!(b.record_count(), 1);
593        assert_eq!(b.commit_tokens.len(), 2);
594    }
595
596    #[test]
597    fn with_dlq_entries_attaches_entries() {
598        let entry = FilteredDlqEntry {
599            payload: b"bad".to_vec(),
600            key: None,
601            reason: "filter".to_string(),
602        };
603        let b =
604            WorkBatch::<TestToken>::from_records(vec![record(b"{}")]).with_dlq_entries(vec![entry]);
605        assert_eq!(b.dlq_entries.len(), 1);
606        assert_eq!(b.dlq_entries[0].reason, "filter");
607    }
608
609    #[test]
610    fn total_payload_bytes_sums_payloads() {
611        let b = WorkBatch::<TestToken>::from_records(vec![
612            record(b"abc"), // 3
613            record(b"de"),  // 2
614            record(b"f"),   // 1
615        ]);
616        assert_eq!(b.total_payload_bytes(), 6);
617    }
618
619    #[test]
620    fn map_records_preserves_tokens_and_dlq() {
621        let entry = FilteredDlqEntry {
622            payload: b"bad".to_vec(),
623            key: None,
624            reason: "filter".to_string(),
625        };
626        let b =
627            WorkBatch::new(vec![record(b"{}")], vec![TestToken(7)]).with_dlq_entries(vec![entry]);
628
629        let b = b.map_records(|recs| {
630            // identity-ish transform that mutates payload but keeps count
631            recs.into_iter()
632                .map(|mut r| {
633                    r.payload = Bytes::from_static(b"changed");
634                    r
635                })
636                .collect()
637        });
638
639        assert_eq!(b.record_count(), 1);
640        assert_eq!(b.records[0].payload.as_ref(), b"changed");
641        // tokens + dlq preserved across the transform
642        assert_eq!(b.commit_tokens, vec![TestToken(7)]);
643        assert_eq!(b.dlq_entries.len(), 1);
644    }
645
646    #[test]
647    fn map_records_fan_out_keeps_tokens_intact() {
648        // One input record + one source ack...
649        let b = WorkBatch::new(vec![record(b"{}")], vec![TestToken(99)]);
650        assert_eq!(b.record_count(), 1);
651        assert_eq!(b.commit_tokens.len(), 1);
652
653        // ...fans out to three records, but the source ack must stay singular.
654        let b = b.map_records(|recs| {
655            let mut out = Vec::new();
656            for r in recs {
657                out.push(r.clone());
658                out.push(r.clone());
659                out.push(r);
660            }
661            out
662        });
663
664        assert_eq!(b.record_count(), 3);
665        assert_eq!(b.commit_tokens, vec![TestToken(99)]);
666    }
667
668    #[test]
669    fn from_message_yields_single_record_batch() {
670        let msg = Message::new(
671            Some(Arc::from("topic")),
672            b"{\"a\":1}".to_vec(),
673            TestToken(5),
674            Some(11),
675        );
676        let b: WorkBatch<TestToken> = msg.into();
677
678        assert_eq!(b.record_count(), 1);
679        assert_eq!(b.commit_tokens, vec![TestToken(5)]);
680        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
681        assert_eq!(b.records[0].key.as_deref(), Some("topic"));
682        assert_eq!(b.records[0].metadata.timestamp_ms, Some(11));
683        assert_eq!(b.records[0].metadata.format, PayloadFormat::Json);
684        assert!(b.dlq_entries.is_empty());
685    }
686
687    #[test]
688    fn from_recv_batch_collapses_and_preserves_order() {
689        let entry = FilteredDlqEntry {
690            payload: b"bad".to_vec(),
691            key: None,
692            reason: "drop-it".to_string(),
693        };
694        let recv = RecvBatch {
695            messages: vec![
696                Message::new(Some(Arc::from("a")), b"{}".to_vec(), TestToken(1), None),
697                Message::new(Some(Arc::from("b")), b"[]".to_vec(), TestToken(2), None),
698                Message::new(None, b"{}".to_vec(), TestToken(3), None),
699            ],
700            dlq_entries: vec![entry],
701        };
702
703        let b: WorkBatch<TestToken> = recv.into();
704
705        assert_eq!(b.record_count(), 3);
706        // commit tokens preserved in order
707        assert_eq!(
708            b.commit_tokens,
709            vec![TestToken(1), TestToken(2), TestToken(3)]
710        );
711        // dlq entries carried straight across
712        assert_eq!(b.dlq_entries.len(), 1);
713        assert_eq!(b.dlq_entries[0].reason, "drop-it");
714        // record payloads + keys preserved
715        assert_eq!(b.records[0].key.as_deref(), Some("a"));
716        assert_eq!(b.records[2].key, None);
717    }
718
719    #[test]
720    fn from_message_moves_payload_without_copying() {
721        // Build a heap-backed payload and remember its allocation pointer.
722        // The vec is moved via `impl Into<Bytes>` -- no copy at Message::new.
723        let payload = b"zero-copy-please".to_vec();
724        let payload_ptr = payload.as_ptr();
725
726        let msg = Message::new(Some(Arc::from("topic")), payload, TestToken(1), None);
727        // Message.payload is Bytes built from the vec; same allocation.
728        assert_eq!(msg.payload.as_ptr(), payload_ptr);
729
730        let wb: WorkBatch<TestToken> = msg.into();
731
732        // From<Message> moves the Bytes handle -- the record payload must point
733        // at the SAME allocation. A regression to copy_from_slice would fail here.
734        assert_eq!(wb.records[0].payload.as_ptr(), payload_ptr);
735    }
736
737    /// Task 0.4.1 capability test: `Message::new` with an already-allocated
738    /// `Bytes` payload travels through `WorkBatch` with ZERO copies.
739    ///
740    /// This proves the headline win of the migration: an upstream `Bytes` (e.g.
741    /// the axum body) reaches the `Record` without ever calling
742    /// `copy_from_slice`. The allocation pointer must be identical at every hop.
743    #[test]
744    fn bytes_payload_travels_zero_copy_through_workbatch() {
745        // Simulate a caller that already holds a `Bytes` (e.g. the axum ingest
746        // handler after the body.to_vec() removal).
747        let raw = b"bytes-zero-copy-payload-test".to_vec();
748        let src: Bytes = raw.into();
749        let src_ptr = src.as_ptr();
750
751        // Pass the Bytes directly into Message::new (accepted via impl Into<Bytes>).
752        let msg = Message::new(Some(Arc::from("k")), src, TestToken(42), Some(99));
753        // The Bytes handle is MOVED -- same backing allocation.
754        assert_eq!(msg.payload.as_ptr(), src_ptr, "copy at Message::new");
755
756        // Convert to WorkBatch: From<Message> moves msg.payload (already Bytes).
757        let wb: WorkBatch<TestToken> = msg.into();
758        assert_eq!(
759            wb.records[0].payload.as_ptr(),
760            src_ptr,
761            "copy at From<Message> for WorkBatch"
762        );
763
764        // Clone the record payload (fan-out): Bytes::clone is a refcount bump.
765        let cloned = wb.records[0].payload.clone();
766        assert_eq!(
767            cloned.as_ptr(),
768            src_ptr,
769            "clone allocated a new buffer instead of bumping refcount"
770        );
771    }
772
773    #[test]
774    fn from_recv_batch_moves_payloads_without_copying() {
775        let p0 = b"first-buffer".to_vec();
776        let p1 = b"second-buffer".to_vec();
777        let p0_ptr = p0.as_ptr();
778        let p1_ptr = p1.as_ptr();
779
780        let recv = RecvBatch {
781            messages: vec![
782                Message::new(Some(Arc::from("a")), p0, TestToken(1), None),
783                Message::new(Some(Arc::from("b")), p1, TestToken(2), None),
784            ],
785            dlq_entries: Vec::new(),
786        };
787
788        let wb: WorkBatch<TestToken> = recv.into();
789
790        // Each payload Vec is MOVED into Bytes -- same allocations, no copy.
791        assert_eq!(wb.records[0].payload.as_ptr(), p0_ptr);
792        assert_eq!(wb.records[1].payload.as_ptr(), p1_ptr);
793    }
794
795    #[test]
796    fn payload_is_bytes_and_clone_is_zero_copy() {
797        // Bytes::clone shares the same underlying allocation (refcount bump).
798        let r = record(b"shared-buffer");
799        let p1 = r.payload.clone();
800        let r2 = r.clone();
801        // same backing buffer pointer => zero-copy clone
802        assert_eq!(p1.as_ptr(), r2.payload.as_ptr());
803        assert_eq!(r2.payload.as_ref(), b"shared-buffer");
804    }
805
806    // ---- Task 0.2: zero-copy framing helpers -------------------------------
807
808    /// Assert that `slice` is a zero-copy view INTO `blob` (a refcounted slice,
809    /// not a fresh allocation): its byte range must fall within `blob`'s range.
810    fn assert_within(slice: &Bytes, blob: &Bytes) {
811        let blob_start = blob.as_ptr() as usize;
812        let blob_end = blob_start + blob.len();
813        let slice_start = slice.as_ptr() as usize;
814        let slice_end = slice_start + slice.len();
815        assert!(
816            slice_start >= blob_start && slice_end <= blob_end,
817            "slice [{slice_start:#x}, {slice_end:#x}) is not within blob \
818             [{blob_start:#x}, {blob_end:#x}) -- it is a copy, not a view"
819        );
820    }
821
822    // --- single() -----------------------------------------------------------
823
824    #[test]
825    fn single_holds_whole_blob_as_one_record() {
826        let blob = Bytes::from_static(b"{\"a\":1}");
827        let b = WorkBatch::<TestToken>::single(blob.clone());
828        assert_eq!(b.record_count(), 1);
829        assert!(b.commit_tokens.is_empty());
830        assert!(b.dlq_entries.is_empty());
831        assert_eq!(b.records[0].payload, blob);
832        assert_eq!(b.records[0].key, None);
833        assert!(b.records[0].headers.is_empty());
834        // whole blob is the same allocation (zero-copy)
835        assert_eq!(b.records[0].payload.as_ptr(), blob.as_ptr());
836    }
837
838    #[test]
839    fn single_detects_format_json_object() {
840        let b = WorkBatch::<TestToken>::single(Bytes::from_static(b"{\"a\":1}"));
841        assert_eq!(b.records[0].metadata.format, PayloadFormat::Json);
842    }
843
844    #[test]
845    fn single_detects_format_msgpack() {
846        // fixmap with one entry (0x81) -> MsgPack
847        let b = WorkBatch::<TestToken>::single(Bytes::from_static(&[0x81, 0xa1, 0x61]));
848        assert_eq!(b.records[0].metadata.format, PayloadFormat::MsgPack);
849    }
850
851    // --- from_ndjson() ------------------------------------------------------
852
853    #[test]
854    fn ndjson_splits_lines_into_records() {
855        let blob = Bytes::from_static(b"{\"a\":1}\n{\"b\":2}\n{\"c\":3}");
856        let b = WorkBatch::<TestToken>::from_ndjson(blob.clone());
857        assert_eq!(b.record_count(), 3);
858        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
859        assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
860        assert_eq!(b.records[2].payload.as_ref(), b"{\"c\":3}");
861        for r in &b.records {
862            assert_eq!(r.metadata.format, PayloadFormat::Json);
863            assert_within(&r.payload, &blob);
864        }
865        assert!(b.commit_tokens.is_empty());
866    }
867
868    #[test]
869    fn ndjson_trims_trailing_carriage_return() {
870        // Windows CRLF line endings.
871        let blob = Bytes::from_static(b"{\"a\":1}\r\n{\"b\":2}\r\n");
872        let b = WorkBatch::<TestToken>::from_ndjson(blob);
873        assert_eq!(b.record_count(), 2);
874        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
875        assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
876    }
877
878    #[test]
879    fn ndjson_skips_blank_and_whitespace_only_lines() {
880        let blob = Bytes::from_static(b"{\"a\":1}\n\n   \n{\"b\":2}\n\t\r\n");
881        let b = WorkBatch::<TestToken>::from_ndjson(blob);
882        assert_eq!(b.record_count(), 2);
883        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
884        assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
885    }
886
887    #[test]
888    fn ndjson_empty_blob_yields_no_records() {
889        let b = WorkBatch::<TestToken>::from_ndjson(Bytes::new());
890        assert_eq!(b.record_count(), 0);
891    }
892
893    #[test]
894    fn ndjson_single_line_no_newline() {
895        let blob = Bytes::from_static(b"{\"only\":true}");
896        let b = WorkBatch::<TestToken>::from_ndjson(blob.clone());
897        assert_eq!(b.record_count(), 1);
898        assert_eq!(b.records[0].payload.as_ref(), b"{\"only\":true}");
899        assert_within(&b.records[0].payload, &blob);
900    }
901
902    #[test]
903    fn ndjson_preserves_inner_whitespace_but_trims_edges() {
904        // Leading/trailing spaces on a line are trimmed for framing.
905        let blob = Bytes::from_static(b"  {\"a\": 1}  \n");
906        let b = WorkBatch::<TestToken>::from_ndjson(blob);
907        assert_eq!(b.record_count(), 1);
908        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\": 1}");
909    }
910
911    // --- from_json_array() --------------------------------------------------
912
913    #[test]
914    fn json_array_empty_yields_no_records() {
915        let b = WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[]")).unwrap();
916        assert_eq!(b.record_count(), 0);
917        assert!(b.commit_tokens.is_empty());
918        assert!(b.dlq_entries.is_empty());
919    }
920
921    #[test]
922    fn json_array_empty_with_whitespace() {
923        let b = WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"  [  ]  ")).unwrap();
924        assert_eq!(b.record_count(), 0);
925    }
926
927    #[test]
928    fn json_array_single_element() {
929        let blob = Bytes::from_static(b"[{\"a\":1}]");
930        let b = WorkBatch::<TestToken>::from_json_array(blob.clone()).unwrap();
931        assert_eq!(b.record_count(), 1);
932        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
933        assert_eq!(b.records[0].metadata.format, PayloadFormat::Json);
934        assert_within(&b.records[0].payload, &blob);
935    }
936
937    #[test]
938    fn json_array_multiple_scalar_elements() {
939        let blob = Bytes::from_static(b"[1, 2, 3]");
940        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
941        assert_eq!(b.record_count(), 3);
942        assert_eq!(b.records[0].payload.as_ref(), b"1");
943        assert_eq!(b.records[1].payload.as_ref(), b"2");
944        assert_eq!(b.records[2].payload.as_ref(), b"3");
945    }
946
947    #[test]
948    fn json_array_trims_whitespace_around_elements() {
949        let blob = Bytes::from_static(b"[  {\"a\":1}  ,  {\"b\":2}  ]");
950        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
951        assert_eq!(b.record_count(), 2);
952        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
953        assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
954    }
955
956    #[test]
957    fn json_array_leading_trailing_whitespace_and_newlines() {
958        let blob = Bytes::from_static(b"\n\t [\n  1,\n  2\n] \n");
959        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
960        assert_eq!(b.record_count(), 2);
961        assert_eq!(b.records[0].payload.as_ref(), b"1");
962        assert_eq!(b.records[1].payload.as_ref(), b"2");
963    }
964
965    #[test]
966    fn json_array_string_with_brackets_and_commas() {
967        // The commas/brackets INSIDE the string must not split or change depth.
968        let blob = Bytes::from_static(b"[\"a,b[c]{d}\", \"plain\"]");
969        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
970        assert_eq!(b.record_count(), 2);
971        assert_eq!(b.records[0].payload.as_ref(), b"\"a,b[c]{d}\"");
972        assert_eq!(b.records[1].payload.as_ref(), b"\"plain\"");
973    }
974
975    #[test]
976    fn json_array_string_with_escaped_quote() {
977        // `\"` does not terminate the string.
978        let blob = Bytes::from_static(b"[\"he said \\\"hi\\\", then left\", 7]");
979        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
980        assert_eq!(b.record_count(), 2);
981        assert_eq!(
982            b.records[0].payload.as_ref(),
983            b"\"he said \\\"hi\\\", then left\""
984        );
985        assert_eq!(b.records[1].payload.as_ref(), b"7");
986    }
987
988    #[test]
989    fn json_array_string_with_escaped_backslash_then_closing_quote() {
990        // `\\` is an escaped backslash; the following `"` DOES close the string.
991        // Element 0 is the string "path\\" and element 1 is the number 1.
992        let blob = Bytes::from_static(b"[\"path\\\\\", 1]");
993        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
994        assert_eq!(b.record_count(), 2);
995        assert_eq!(b.records[0].payload.as_ref(), b"\"path\\\\\"");
996        assert_eq!(b.records[1].payload.as_ref(), b"1");
997    }
998
999    #[test]
1000    fn json_array_nested_arrays_and_objects() {
1001        let blob = Bytes::from_static(b"[[1,2],[3],{\"k\":[4,5]}]");
1002        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
1003        assert_eq!(b.record_count(), 3);
1004        assert_eq!(b.records[0].payload.as_ref(), b"[1,2]");
1005        assert_eq!(b.records[1].payload.as_ref(), b"[3]");
1006        assert_eq!(b.records[2].payload.as_ref(), b"{\"k\":[4,5]}");
1007    }
1008
1009    #[test]
1010    fn json_array_deeply_nested_object_one_element() {
1011        let blob = Bytes::from_static(b"[{\"a\":{\"b\":{\"c\":[1,{\"d\":2}]}}}]");
1012        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
1013        assert_eq!(b.record_count(), 1);
1014        assert_eq!(
1015            b.records[0].payload.as_ref(),
1016            b"{\"a\":{\"b\":{\"c\":[1,{\"d\":2}]}}}"
1017        );
1018    }
1019
1020    #[test]
1021    fn json_array_unicode_in_strings() {
1022        // Multi-byte UTF-8 inside an element string must pass through verbatim.
1023        let blob = Bytes::from(r#"["café", "naïve"]"#.as_bytes().to_vec());
1024        let b = WorkBatch::<TestToken>::from_json_array(blob.clone()).unwrap();
1025        assert_eq!(b.record_count(), 2);
1026        assert_eq!(b.records[0].payload.as_ref(), "\"café\"".as_bytes());
1027        assert_eq!(b.records[1].payload.as_ref(), "\"naïve\"".as_bytes());
1028        assert_within(&b.records[1].payload, &blob);
1029    }
1030
1031    #[test]
1032    fn json_array_zero_copy_views_into_blob() {
1033        let blob = Bytes::from_static(b"[{\"a\":1}, {\"b\":2}, {\"c\":3}]");
1034        let b = WorkBatch::<TestToken>::from_json_array(blob.clone()).unwrap();
1035        assert_eq!(b.record_count(), 3);
1036        for r in &b.records {
1037            assert_within(&r.payload, &blob);
1038        }
1039    }
1040
1041    // malformed inputs -> Err
1042
1043    #[test]
1044    fn json_array_no_opening_bracket_errors() {
1045        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"{\"a\":1}")).is_err());
1046    }
1047
1048    #[test]
1049    fn json_array_empty_blob_errors() {
1050        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::new()).is_err());
1051    }
1052
1053    #[test]
1054    fn json_array_whitespace_only_errors() {
1055        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"   \n\t ")).is_err());
1056    }
1057
1058    #[test]
1059    fn json_array_unterminated_errors() {
1060        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1, 2")).is_err());
1061    }
1062
1063    #[test]
1064    fn json_array_unterminated_string_errors() {
1065        assert!(
1066            WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[\"unclosed]")).is_err()
1067        );
1068    }
1069
1070    #[test]
1071    fn json_array_trailing_garbage_errors() {
1072        assert!(
1073            WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1, 2] junk")).is_err()
1074        );
1075    }
1076
1077    #[test]
1078    fn json_array_trailing_comma_errors() {
1079        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1, 2, ]")).is_err());
1080    }
1081
1082    #[test]
1083    fn json_array_leading_comma_errors() {
1084        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[, 1]")).is_err());
1085    }
1086
1087    #[test]
1088    fn json_array_double_comma_errors() {
1089        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1,, 2]")).is_err());
1090    }
1091
1092    #[test]
1093    fn json_array_only_open_bracket_errors() {
1094        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[")).is_err());
1095    }
1096
1097    #[test]
1098    fn json_array_unbalanced_extra_close_errors() {
1099        // A nested structure that closes too many times.
1100        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1]]")).is_err());
1101    }
1102
1103    #[test]
1104    fn framing_error_is_displayable() {
1105        let err = WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"nope")).unwrap_err();
1106        // thiserror Display should produce a non-empty, informative message.
1107        assert!(!err.to_string().is_empty());
1108    }
1109}