Skip to main content

hyperi_rustlib/transport/
work_batch.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/work_batch.rs
3// Purpose:   Canonical data-plane contract: Record + WorkBatch
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Canonical data-plane contract
10//!
11//! The single currency that flows through `get -> process -> send -> commit`:
12//!
13//! - [`Record`] -- one work record (payload + routing + lean metadata). It
14//!   carries **no** commit token. Tokens live on the batch, not the record;
15//!   that separation is what makes 1->N fan-out safe (a transform can grow or
16//!   shrink the record count without disturbing the source acks).
17//! - [`RecordMeta`] -- lean per-record metadata (timestamp + payload format).
18//!   Deliberately leaner than the engine's `MessageMetadata`, which carries a
19//!   type-erased commit token; here tokens move to the batch.
20//! - [`WorkBatch`] -- the canonical zero-copy block of records, the source
21//!   commit tokens for the whole block, and any inline-DLQ entries carried
22//!   forward (the no-silent-drop contract preserved from the older
23//!   `RecvBatch`).
24//!
25//! `commit_tokens.len()` is NOT tied to `records.len()`. After a fan-out
26//! transform the record count may change while the commit tokens stay equal to
27//! the input source acks -- they are fired only after the WHOLE block is sent
28//! (at-least-once delivery).
29//!
30//! The block flows `get -> process -> send -> commit` through the unified
31//! engine driver (`crate::worker::engine::driver`); fields are parsed on
32//! demand via the [`codec`](super::codec). For the wider picture see
33//! `docs/BACKPRESSURE.md` (commit-token + brake contract),
34//! `docs/SELF-REGULATION.md`, and `docs/MIGRATIONS.md` (the
35//! `Message`/`RawMessage`/`RecvBatch` -> `WorkBatch` collapse).
36
37use super::filter::FilteredDlqEntry;
38use super::traits::CommitToken;
39use super::types::PayloadFormat;
40use bytes::Bytes;
41use std::sync::Arc;
42use thiserror::Error;
43
44/// Failure modes for [`WorkBatch::from_json_array`] framing.
45///
46/// These cover only the **framing** contract -- slicing a top-level JSON array
47/// into per-element byte views. Element values are NOT parsed, so a malformed
48/// element body (e.g. `[1,nul]`) is not detected here; that is the downstream
49/// parser's job. What IS detected: a missing opening `[`, an unterminated array
50/// or string, structural imbalance, empty elements (leading/trailing/double
51/// commas), and trailing garbage after the closing `]`.
52#[derive(Debug, Error, PartialEq, Eq)]
53#[non_exhaustive]
54pub enum FramingError {
55    /// The blob did not start (after optional whitespace) with `[`.
56    #[error("json-array framing: expected opening '[', found {0}")]
57    NotAnArray(String),
58
59    /// The array (or a string within it) was not terminated before end-of-input.
60    #[error("json-array framing: unexpected end of input (unterminated array or string)")]
61    UnexpectedEof,
62
63    /// An element position held no value (leading, trailing, or doubled comma).
64    #[error("json-array framing: empty element at byte offset {0} (stray comma)")]
65    EmptyElement(usize),
66
67    /// Structure closed more containers than it opened.
68    #[error("json-array framing: unbalanced closing bracket at byte offset {0}")]
69    Unbalanced(usize),
70
71    /// Non-whitespace content appeared after the array's closing `]`.
72    #[error("json-array framing: trailing garbage after closing ']' at byte offset {0}")]
73    TrailingGarbage(usize),
74}
75
76/// Lean per-record metadata.
77///
78/// Unlike the engine's `MessageMetadata`, this carries **no** commit token --
79/// tokens live on the [`WorkBatch`], not the record.
80#[derive(Debug, Clone, PartialEq, Eq)]
81pub struct RecordMeta {
82    /// Source timestamp (milliseconds since epoch), if the transport provided one.
83    pub timestamp_ms: Option<i64>,
84
85    /// Detected or declared payload format.
86    pub format: PayloadFormat,
87}
88
89/// One work record: payload + routing + metadata, with **no** commit token.
90///
91/// The payload is held as [`bytes::Bytes`] so cloning a record (or fanning one
92/// record out into many) is a refcount bump, not a deep copy.
93#[derive(Debug, Clone, PartialEq)]
94pub struct Record {
95    /// Raw payload bytes -- zero-copy / refcounted.
96    pub payload: Bytes,
97
98    /// Routing key (Kafka topic, gRPC metadata key, Redis stream, ...).
99    pub key: Option<Arc<str>>,
100
101    /// Transport / application headers.
102    pub headers: Vec<(String, Vec<u8>)>,
103
104    /// Lean per-record metadata.
105    pub metadata: RecordMeta,
106}
107
108/// The canonical zero-copy block of work records.
109///
110/// One `WorkBatch` is the single currency through `get -> process -> send ->
111/// commit`. The `commit_tokens` are the INPUT source acks -- fired once after
112/// the WHOLE block is sent (at-least-once). `T` generalises the source ack
113/// (Kafka offset, HTTP responder, fetch cursor, ...).
114///
115/// `commit_tokens.len()` is intentionally decoupled from `records.len()`: a
116/// fan-out transform may grow or shrink the record count while the commit
117/// tokens stay equal to the input acks.
118#[derive(Debug)]
119pub struct WorkBatch<T: CommitToken> {
120    /// The work records in this block.
121    pub records: Vec<Record>,
122
123    /// Source acks for the whole block. Fired after the block is fully sent.
124    /// Length is NOT tied to `records.len()`.
125    pub commit_tokens: Vec<T>,
126
127    /// Inline-DLQ entries carried forward (no-silent-drop contract).
128    pub dlq_entries: Vec<FilteredDlqEntry>,
129}
130
131impl<T: CommitToken> WorkBatch<T> {
132    /// An empty batch (no records, no tokens, no DLQ entries).
133    #[must_use]
134    pub fn empty() -> Self {
135        Self {
136            records: Vec::new(),
137            commit_tokens: Vec::new(),
138            dlq_entries: Vec::new(),
139        }
140    }
141
142    /// A batch of records with no commit tokens and no DLQ entries.
143    ///
144    /// Useful for records that were generated downstream (e.g. a transform that
145    /// emits new records) and do not themselves carry a source ack.
146    #[must_use]
147    pub fn from_records(records: Vec<Record>) -> Self {
148        Self {
149            records,
150            commit_tokens: Vec::new(),
151            dlq_entries: Vec::new(),
152        }
153    }
154
155    /// A batch of records plus their source commit tokens (no DLQ entries).
156    #[must_use]
157    pub fn new(records: Vec<Record>, commit_tokens: Vec<T>) -> Self {
158        Self {
159            records,
160            commit_tokens,
161            dlq_entries: Vec::new(),
162        }
163    }
164
165    /// Attach inline-DLQ entries to this batch, consuming and returning it.
166    #[must_use]
167    pub fn with_dlq_entries(mut self, dlq_entries: Vec<FilteredDlqEntry>) -> Self {
168        self.dlq_entries = dlq_entries;
169        self
170    }
171
172    /// Whether the batch has no records (DLQ entries may still be present).
173    #[must_use]
174    pub fn is_empty(&self) -> bool {
175        self.records.is_empty()
176    }
177
178    /// Number of records.
179    #[must_use]
180    pub fn len(&self) -> usize {
181        self.records.len()
182    }
183
184    /// Number of records (explicit alias for [`WorkBatch::len`]).
185    #[must_use]
186    pub fn record_count(&self) -> usize {
187        self.records.len()
188    }
189
190    /// Total payload bytes across all records.
191    ///
192    /// Used by the byte-budget governor to size a block against a memory budget.
193    /// Saturating sum: a pathological block clamps at `usize::MAX` rather than
194    /// wrapping.
195    #[must_use]
196    pub fn total_payload_bytes(&self) -> usize {
197        self.records
198            .iter()
199            .map(|r| r.payload.len())
200            .fold(0usize, usize::saturating_add)
201    }
202
203    /// Transform the records while PRESERVING the commit tokens and DLQ entries.
204    ///
205    /// This is the safe shape for the future `process` handler: a transform may
206    /// grow, shrink, or rewrite the records (fan-out / fan-in), but the source
207    /// acks and inline-DLQ entries flow through untouched.
208    #[must_use]
209    pub fn map_records(mut self, f: impl FnOnce(Vec<Record>) -> Vec<Record>) -> Self {
210        self.records = f(self.records);
211        self
212    }
213
214    // ---- Zero-copy ingestion / framing helpers (Task 0.2) -----------------
215    //
216    // These slice ONE inbound `Bytes` blob into per-record views so the WHOLE
217    // batch shares ONE allocation: `record.payload = blob.slice(start..end)` is
218    // a refcounted view, never a payload copy. They do NOT parse / deserialise
219    // and do NOT re-serialise -- framing only. Commit tokens and DLQ entries are
220    // left EMPTY; the source attaches its acks afterwards.
221
222    /// One record holding the WHOLE blob (no framing).
223    ///
224    /// The single record's payload is the blob itself (zero-copy move of the
225    /// `Bytes` handle), `key` is `None`, `headers` empty, and the metadata
226    /// format is auto-detected via [`PayloadFormat::detect`].
227    #[must_use]
228    pub fn single(blob: Bytes) -> Self {
229        let format = PayloadFormat::detect(&blob);
230        let record = Record {
231            payload: blob,
232            key: None,
233            headers: Vec::new(),
234            metadata: RecordMeta {
235                timestamp_ms: None,
236                format,
237            },
238        };
239        Self {
240            records: vec![record],
241            commit_tokens: Vec::new(),
242            dlq_entries: Vec::new(),
243        }
244    }
245
246    /// Frame an NDJSON (newline-delimited JSON) blob into one record per line.
247    ///
248    /// Splits on `\n` using `memchr`. Each line is trimmed of a trailing `\r`
249    /// (so Windows CRLF endings work) and of surrounding ASCII whitespace; blank
250    /// or whitespace-only lines are skipped. Each surviving line becomes a
251    /// [`Record`] whose payload is a zero-copy `blob.slice(start..end)` view.
252    /// `format` is set to [`PayloadFormat::Json`].
253    ///
254    /// This is framing, not parsing -- it never fails. A line that is not valid
255    /// JSON still becomes a record; the downstream parser surfaces that.
256    #[must_use]
257    pub fn from_ndjson(blob: Bytes) -> Self {
258        let mut records = Vec::new();
259        let mut line_start = 0usize;
260        let bytes = blob.as_ref();
261
262        // Walk newline boundaries; the tail after the last `\n` is its own line.
263        for nl in memchr::memchr_iter(b'\n', bytes) {
264            Self::push_ndjson_line(&mut records, &blob, line_start, nl);
265            line_start = nl + 1;
266        }
267        if line_start < bytes.len() {
268            Self::push_ndjson_line(&mut records, &blob, line_start, bytes.len());
269        }
270
271        Self {
272            records,
273            commit_tokens: Vec::new(),
274            dlq_entries: Vec::new(),
275        }
276    }
277
278    /// Trim one NDJSON line `[start, end)` (end is the `\n` index or EOF) and,
279    /// if non-empty after trimming, push it as a zero-copy record.
280    fn push_ndjson_line(records: &mut Vec<Record>, blob: &Bytes, start: usize, mut end: usize) {
281        let bytes = blob.as_ref();
282        // Strip a single trailing CR (CRLF) before generic whitespace trimming.
283        if end > start && bytes[end - 1] == b'\r' {
284            end -= 1;
285        }
286        // Trim surrounding ASCII whitespace so the slice is the bare record.
287        while end > start && bytes[end - 1].is_ascii_whitespace() {
288            end -= 1;
289        }
290        let mut begin = start;
291        while begin < end && bytes[begin].is_ascii_whitespace() {
292            begin += 1;
293        }
294        if begin >= end {
295            return; // blank / whitespace-only line
296        }
297        records.push(Record {
298            payload: blob.slice(begin..end),
299            key: None,
300            headers: Vec::new(),
301            metadata: RecordMeta {
302                timestamp_ms: None,
303                format: PayloadFormat::Json,
304            },
305        });
306    }
307
308    /// Frame a top-level JSON array blob into one record per top-level element.
309    ///
310    /// Each element becomes a zero-copy `blob.slice(start..end)` view; element
311    /// VALUES are not parsed. The byte-level scanner tracks string state (so
312    /// `[`, `]`, `{`, `}` and `,` inside a string are literal), JSON escapes
313    /// (`\"`, `\\`), and container depth, so nested objects/arrays frame
314    /// correctly. An empty array `[]` yields zero records (`Ok`).
315    ///
316    /// # Errors
317    ///
318    /// Returns [`FramingError`] when the blob is not a well-framed top-level
319    /// array: missing opening `[`, unterminated array/string, an empty element
320    /// position (leading / trailing / doubled comma), an over-close, or trailing
321    /// non-whitespace after the closing `]`.
322    pub fn from_json_array(blob: Bytes) -> Result<Self, FramingError> {
323        let records = scan_json_array(&blob)?;
324        Ok(Self {
325            records,
326            commit_tokens: Vec::new(),
327            dlq_entries: Vec::new(),
328        })
329    }
330}
331
332/// Byte-level top-level JSON-array element scanner (framing, NOT parsing).
333///
334/// Returns one zero-copy [`Record`] per top-level element. See
335/// [`WorkBatch::from_json_array`] for the contract and error modes.
336fn scan_json_array(blob: &Bytes) -> Result<Vec<Record>, FramingError> {
337    let bytes = blob.as_ref();
338    let len = bytes.len();
339
340    // Skip leading whitespace; the first significant byte must be '['.
341    let mut i = 0usize;
342    while i < len && bytes[i].is_ascii_whitespace() {
343        i += 1;
344    }
345    if i >= len {
346        return Err(FramingError::NotAnArray("end of input".to_string()));
347    }
348    if bytes[i] != b'[' {
349        return Err(FramingError::NotAnArray(format!(
350            "byte {:#04x} ('{}')",
351            bytes[i], bytes[i] as char
352        )));
353    }
354    i += 1; // consume '['
355
356    let mut records: Vec<Record> = Vec::new();
357    // `expect_value` distinguishes the slot just after '[' or ',' (where a value
358    // or -- only after '[' -- a closing ']' may appear) from between elements.
359    let mut first_element = true;
360
361    loop {
362        // Skip whitespace before an element (or the closing ']').
363        while i < len && bytes[i].is_ascii_whitespace() {
364            i += 1;
365        }
366        if i >= len {
367            return Err(FramingError::UnexpectedEof);
368        }
369
370        if bytes[i] == b']' {
371            // Closing the array. Only legal here if the array is empty (we are
372            // still on the first element) -- otherwise we'd have consumed a ']'
373            // inside the element-scan loop below after a value.
374            if first_element {
375                i += 1;
376                return finish(blob, records, i);
377            }
378            // We reached here via the top of the loop after a ',', so a ']' now
379            // means a trailing comma: `[1, ]`.
380            return Err(FramingError::EmptyElement(i));
381        }
382        if bytes[i] == b',' {
383            // A ',' where a value is expected: leading or doubled comma.
384            return Err(FramingError::EmptyElement(i));
385        }
386
387        // Scan one element value, tracking string + escape + container depth.
388        let elem_start = i;
389        let mut depth: usize = 0;
390        let mut in_string = false;
391        let mut escaped = false;
392
393        let elem_end;
394        loop {
395            if i >= len {
396                return Err(FramingError::UnexpectedEof);
397            }
398            let c = bytes[i];
399
400            if in_string {
401                if escaped {
402                    escaped = false;
403                } else if c == b'\\' {
404                    escaped = true;
405                } else if c == b'"' {
406                    in_string = false;
407                }
408                i += 1;
409                continue;
410            }
411
412            match c {
413                b'"' => {
414                    in_string = true;
415                    i += 1;
416                }
417                b'{' | b'[' => {
418                    depth += 1;
419                    i += 1;
420                }
421                b'}' => {
422                    // A '}' at depth 0 has no matching opener inside this element.
423                    depth = depth.checked_sub(1).ok_or(FramingError::Unbalanced(i))?;
424                    i += 1;
425                }
426                b']' => {
427                    if depth == 0 {
428                        // This ']' closes the ARRAY -- the element ends here.
429                        elem_end = i;
430                        i += 1; // consume ']'
431                        push_element(blob, &mut records, elem_start, elem_end);
432                        return finish(blob, records, i);
433                    }
434                    depth -= 1;
435                    i += 1;
436                }
437                b',' if depth == 0 => {
438                    // Top-level separator -- the element ends just before it.
439                    elem_end = i;
440                    i += 1; // consume ','
441                    break;
442                }
443                _ => {
444                    i += 1;
445                }
446            }
447        }
448
449        push_element(blob, &mut records, elem_start, elem_end);
450        first_element = false;
451    }
452}
453
454/// Trim trailing whitespace from `[start, end)` and push the zero-copy slice.
455///
456/// Leading whitespace was already skipped by the caller, so only the trailing
457/// edge (between the value and the `,`/`]`) needs trimming.
458fn push_element(blob: &Bytes, records: &mut Vec<Record>, start: usize, end: usize) {
459    let bytes = blob.as_ref();
460    let mut e = end;
461    while e > start && bytes[e - 1].is_ascii_whitespace() {
462        e -= 1;
463    }
464    records.push(Record {
465        payload: blob.slice(start..e),
466        key: None,
467        headers: Vec::new(),
468        metadata: RecordMeta {
469            timestamp_ms: None,
470            format: PayloadFormat::Json,
471        },
472    });
473}
474
475/// After the closing `]`, only trailing whitespace is permitted. Build the
476/// records vector into a `Vec<Record>` result, or error on trailing garbage.
477fn finish(blob: &Bytes, records: Vec<Record>, mut i: usize) -> Result<Vec<Record>, FramingError> {
478    let bytes = blob.as_ref();
479    let len = bytes.len();
480    while i < len && bytes[i].is_ascii_whitespace() {
481        i += 1;
482    }
483    if i < len {
484        return Err(FramingError::TrailingGarbage(i));
485    }
486    Ok(records)
487}
488
489impl<T: CommitToken> From<crate::Message<T>> for WorkBatch<T> {
490    /// Collapse a single [`crate::Message`] into a one-record `WorkBatch`.
491    ///
492    /// The message payload is already [`Bytes`] -- this is a move, not a copy.
493    fn from(msg: crate::Message<T>) -> Self {
494        let record = Record {
495            payload: msg.payload,
496            key: msg.key,
497            headers: Vec::new(),
498            metadata: RecordMeta {
499                timestamp_ms: msg.timestamp_ms,
500                format: msg.format,
501            },
502        };
503        Self {
504            records: vec![record],
505            commit_tokens: vec![msg.token],
506            dlq_entries: Vec::new(),
507        }
508    }
509}
510
511impl<T: CommitToken> From<crate::transport::traits::RecvBatch<T>> for WorkBatch<T> {
512    /// Collapse a [`RecvBatch`](crate::transport::traits::RecvBatch) into a
513    /// `WorkBatch`.
514    ///
515    /// Each `Message<T>` becomes a [`Record`]; its `token` is collected into
516    /// `commit_tokens` (preserving order); `dlq_entries` carry straight across.
517    /// `filtered_tokens` (acks of records removed by inbound drop/dlq filters)
518    /// are appended to `commit_tokens` so the block commit advances the source
519    /// past them -- the commit-token count is deliberately NOT tied to
520    /// `records.len()`. Payloads are already [`Bytes`] -- each is a move.
521    fn from(batch: crate::transport::traits::RecvBatch<T>) -> Self {
522        let token_capacity = batch.messages.len() + batch.filtered_tokens.len();
523        let mut records = Vec::with_capacity(batch.messages.len());
524        let mut commit_tokens = Vec::with_capacity(token_capacity);
525        for msg in batch.messages {
526            commit_tokens.push(msg.token);
527            records.push(Record {
528                payload: msg.payload,
529                key: msg.key,
530                headers: Vec::new(),
531                metadata: RecordMeta {
532                    timestamp_ms: msg.timestamp_ms,
533                    format: msg.format,
534                },
535            });
536        }
537        // Acks of filtered-out records: no Record, but the source still needs
538        // committing once the block (incl. DLQ routing) is handled.
539        commit_tokens.extend(batch.filtered_tokens);
540        Self {
541            records,
542            commit_tokens,
543            dlq_entries: batch.dlq_entries,
544        }
545    }
546}
547
548#[cfg(test)]
549mod tests {
550    use super::*;
551    use crate::Message;
552    use crate::transport::traits::RecvBatch;
553
554    /// Minimal commit token for tests.
555    #[derive(Debug, Clone, PartialEq, Eq)]
556    struct TestToken(u64);
557
558    impl std::fmt::Display for TestToken {
559        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
560            write!(f, "tok-{}", self.0)
561        }
562    }
563
564    impl CommitToken for TestToken {}
565
566    fn record(payload: &'static [u8]) -> Record {
567        Record {
568            payload: Bytes::from_static(payload),
569            key: Some(Arc::from("events")),
570            headers: vec![("h".to_string(), b"v".to_vec())],
571            metadata: RecordMeta {
572                timestamp_ms: Some(42),
573                format: PayloadFormat::Json,
574            },
575        }
576    }
577
578    #[test]
579    fn empty_has_no_records_tokens_or_dlq() {
580        let b = WorkBatch::<TestToken>::empty();
581        assert!(b.is_empty());
582        assert_eq!(b.len(), 0);
583        assert_eq!(b.record_count(), 0);
584        assert!(b.commit_tokens.is_empty());
585        assert!(b.dlq_entries.is_empty());
586        assert_eq!(b.total_payload_bytes(), 0);
587    }
588
589    #[test]
590    fn from_records_has_no_tokens() {
591        let b = WorkBatch::<TestToken>::from_records(vec![record(b"{}"), record(b"[]")]);
592        assert_eq!(b.len(), 2);
593        assert!(!b.is_empty());
594        assert!(b.commit_tokens.is_empty());
595    }
596
597    #[test]
598    fn new_carries_records_and_tokens() {
599        let b = WorkBatch::new(vec![record(b"{}")], vec![TestToken(1), TestToken(2)]);
600        assert_eq!(b.record_count(), 1);
601        assert_eq!(b.commit_tokens.len(), 2);
602    }
603
604    #[test]
605    fn with_dlq_entries_attaches_entries() {
606        let entry = FilteredDlqEntry {
607            payload: b"bad".to_vec(),
608            key: None,
609            reason: "filter".to_string(),
610        };
611        let b =
612            WorkBatch::<TestToken>::from_records(vec![record(b"{}")]).with_dlq_entries(vec![entry]);
613        assert_eq!(b.dlq_entries.len(), 1);
614        assert_eq!(b.dlq_entries[0].reason, "filter");
615    }
616
617    #[test]
618    fn total_payload_bytes_sums_payloads() {
619        let b = WorkBatch::<TestToken>::from_records(vec![
620            record(b"abc"), // 3
621            record(b"de"),  // 2
622            record(b"f"),   // 1
623        ]);
624        assert_eq!(b.total_payload_bytes(), 6);
625    }
626
627    #[test]
628    fn map_records_preserves_tokens_and_dlq() {
629        let entry = FilteredDlqEntry {
630            payload: b"bad".to_vec(),
631            key: None,
632            reason: "filter".to_string(),
633        };
634        let b =
635            WorkBatch::new(vec![record(b"{}")], vec![TestToken(7)]).with_dlq_entries(vec![entry]);
636
637        let b = b.map_records(|recs| {
638            // identity-ish transform that mutates payload but keeps count
639            recs.into_iter()
640                .map(|mut r| {
641                    r.payload = Bytes::from_static(b"changed");
642                    r
643                })
644                .collect()
645        });
646
647        assert_eq!(b.record_count(), 1);
648        assert_eq!(b.records[0].payload.as_ref(), b"changed");
649        // tokens + dlq preserved across the transform
650        assert_eq!(b.commit_tokens, vec![TestToken(7)]);
651        assert_eq!(b.dlq_entries.len(), 1);
652    }
653
654    #[test]
655    fn map_records_fan_out_keeps_tokens_intact() {
656        // One input record + one source ack...
657        let b = WorkBatch::new(vec![record(b"{}")], vec![TestToken(99)]);
658        assert_eq!(b.record_count(), 1);
659        assert_eq!(b.commit_tokens.len(), 1);
660
661        // ...fans out to three records, but the source ack must stay singular.
662        let b = b.map_records(|recs| {
663            let mut out = Vec::new();
664            for r in recs {
665                out.push(r.clone());
666                out.push(r.clone());
667                out.push(r);
668            }
669            out
670        });
671
672        assert_eq!(b.record_count(), 3);
673        assert_eq!(b.commit_tokens, vec![TestToken(99)]);
674    }
675
676    #[test]
677    fn from_message_yields_single_record_batch() {
678        let msg = Message::new(
679            Some(Arc::from("topic")),
680            b"{\"a\":1}".to_vec(),
681            TestToken(5),
682            Some(11),
683        );
684        let b: WorkBatch<TestToken> = msg.into();
685
686        assert_eq!(b.record_count(), 1);
687        assert_eq!(b.commit_tokens, vec![TestToken(5)]);
688        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
689        assert_eq!(b.records[0].key.as_deref(), Some("topic"));
690        assert_eq!(b.records[0].metadata.timestamp_ms, Some(11));
691        assert_eq!(b.records[0].metadata.format, PayloadFormat::Json);
692        assert!(b.dlq_entries.is_empty());
693    }
694
695    #[test]
696    fn from_recv_batch_collapses_and_preserves_order() {
697        let entry = FilteredDlqEntry {
698            payload: b"bad".to_vec(),
699            key: None,
700            reason: "drop-it".to_string(),
701        };
702        let recv = RecvBatch {
703            messages: vec![
704                Message::new(Some(Arc::from("a")), b"{}".to_vec(), TestToken(1), None),
705                Message::new(Some(Arc::from("b")), b"[]".to_vec(), TestToken(2), None),
706                Message::new(None, b"{}".to_vec(), TestToken(3), None),
707            ],
708            dlq_entries: vec![entry],
709            filtered_tokens: Vec::new(),
710        };
711
712        let b: WorkBatch<TestToken> = recv.into();
713
714        assert_eq!(b.record_count(), 3);
715        // commit tokens preserved in order
716        assert_eq!(
717            b.commit_tokens,
718            vec![TestToken(1), TestToken(2), TestToken(3)]
719        );
720        // dlq entries carried straight across
721        assert_eq!(b.dlq_entries.len(), 1);
722        assert_eq!(b.dlq_entries[0].reason, "drop-it");
723        // record payloads + keys preserved
724        assert_eq!(b.records[0].key.as_deref(), Some("a"));
725        assert_eq!(b.records[2].key, None);
726    }
727
728    #[test]
729    fn from_recv_batch_appends_filtered_tokens_to_commit_tokens() {
730        // Acks of filtered-out records (no passing Record) must still reach
731        // commit_tokens so the block commit advances the source past them.
732        let recv = RecvBatch {
733            messages: vec![Message::new(
734                Some(Arc::from("a")),
735                b"{}".to_vec(),
736                TestToken(1),
737                None,
738            )],
739            dlq_entries: Vec::new(),
740            filtered_tokens: vec![TestToken(7), TestToken(8)],
741        };
742        let b: WorkBatch<TestToken> = recv.into();
743        // One passing record, but THREE acks to commit (1 passing + 2 filtered).
744        assert_eq!(b.record_count(), 1);
745        assert_eq!(
746            b.commit_tokens,
747            vec![TestToken(1), TestToken(7), TestToken(8)]
748        );
749    }
750
751    #[test]
752    fn from_recv_batch_all_filtered_is_records_empty_tokens_present() {
753        // Every record filtered: no Records, but the acks survive so the source
754        // is not stalled behind an all-filtered stretch.
755        let recv: RecvBatch<TestToken> = RecvBatch {
756            messages: Vec::new(),
757            dlq_entries: Vec::new(),
758            filtered_tokens: vec![TestToken(5), TestToken(6)],
759        };
760        let b: WorkBatch<TestToken> = recv.into();
761        assert_eq!(b.record_count(), 0);
762        assert_eq!(b.commit_tokens, vec![TestToken(5), TestToken(6)]);
763    }
764
765    #[test]
766    fn from_message_moves_payload_without_copying() {
767        // Build a heap-backed payload and remember its allocation pointer.
768        // The vec is moved via `impl Into<Bytes>` -- no copy at Message::new.
769        let payload = b"zero-copy-please".to_vec();
770        let payload_ptr = payload.as_ptr();
771
772        let msg = Message::new(Some(Arc::from("topic")), payload, TestToken(1), None);
773        // Message.payload is Bytes built from the vec; same allocation.
774        assert_eq!(msg.payload.as_ptr(), payload_ptr);
775
776        let wb: WorkBatch<TestToken> = msg.into();
777
778        // From<Message> moves the Bytes handle -- the record payload must point
779        // at the SAME allocation. A regression to copy_from_slice would fail here.
780        assert_eq!(wb.records[0].payload.as_ptr(), payload_ptr);
781    }
782
783    /// Task 0.4.1 capability test: `Message::new` with an already-allocated
784    /// `Bytes` payload travels through `WorkBatch` with ZERO copies.
785    ///
786    /// This proves the headline win of the migration: an upstream `Bytes` (e.g.
787    /// the axum body) reaches the `Record` without ever calling
788    /// `copy_from_slice`. The allocation pointer must be identical at every hop.
789    #[test]
790    fn bytes_payload_travels_zero_copy_through_workbatch() {
791        // Simulate a caller that already holds a `Bytes` (e.g. the axum ingest
792        // handler after the body.to_vec() removal).
793        let raw = b"bytes-zero-copy-payload-test".to_vec();
794        let src: Bytes = raw.into();
795        let src_ptr = src.as_ptr();
796
797        // Pass the Bytes directly into Message::new (accepted via impl Into<Bytes>).
798        let msg = Message::new(Some(Arc::from("k")), src, TestToken(42), Some(99));
799        // The Bytes handle is MOVED -- same backing allocation.
800        assert_eq!(msg.payload.as_ptr(), src_ptr, "copy at Message::new");
801
802        // Convert to WorkBatch: From<Message> moves msg.payload (already Bytes).
803        let wb: WorkBatch<TestToken> = msg.into();
804        assert_eq!(
805            wb.records[0].payload.as_ptr(),
806            src_ptr,
807            "copy at From<Message> for WorkBatch"
808        );
809
810        // Clone the record payload (fan-out): Bytes::clone is a refcount bump.
811        let cloned = wb.records[0].payload.clone();
812        assert_eq!(
813            cloned.as_ptr(),
814            src_ptr,
815            "clone allocated a new buffer instead of bumping refcount"
816        );
817    }
818
819    #[test]
820    fn from_recv_batch_moves_payloads_without_copying() {
821        let p0 = b"first-buffer".to_vec();
822        let p1 = b"second-buffer".to_vec();
823        let p0_ptr = p0.as_ptr();
824        let p1_ptr = p1.as_ptr();
825
826        let recv = RecvBatch {
827            messages: vec![
828                Message::new(Some(Arc::from("a")), p0, TestToken(1), None),
829                Message::new(Some(Arc::from("b")), p1, TestToken(2), None),
830            ],
831            dlq_entries: Vec::new(),
832            filtered_tokens: Vec::new(),
833        };
834
835        let wb: WorkBatch<TestToken> = recv.into();
836
837        // Each payload Vec is MOVED into Bytes -- same allocations, no copy.
838        assert_eq!(wb.records[0].payload.as_ptr(), p0_ptr);
839        assert_eq!(wb.records[1].payload.as_ptr(), p1_ptr);
840    }
841
842    #[test]
843    fn payload_is_bytes_and_clone_is_zero_copy() {
844        // Bytes::clone shares the same underlying allocation (refcount bump).
845        let r = record(b"shared-buffer");
846        let p1 = r.payload.clone();
847        let r2 = r.clone();
848        // same backing buffer pointer => zero-copy clone
849        assert_eq!(p1.as_ptr(), r2.payload.as_ptr());
850        assert_eq!(r2.payload.as_ref(), b"shared-buffer");
851    }
852
853    // ---- Task 0.2: zero-copy framing helpers -------------------------------
854
855    /// Assert that `slice` is a zero-copy view INTO `blob` (a refcounted slice,
856    /// not a fresh allocation): its byte range must fall within `blob`'s range.
857    fn assert_within(slice: &Bytes, blob: &Bytes) {
858        let blob_start = blob.as_ptr() as usize;
859        let blob_end = blob_start + blob.len();
860        let slice_start = slice.as_ptr() as usize;
861        let slice_end = slice_start + slice.len();
862        assert!(
863            slice_start >= blob_start && slice_end <= blob_end,
864            "slice [{slice_start:#x}, {slice_end:#x}) is not within blob \
865             [{blob_start:#x}, {blob_end:#x}) -- it is a copy, not a view"
866        );
867    }
868
869    // --- single() -----------------------------------------------------------
870
871    #[test]
872    fn single_holds_whole_blob_as_one_record() {
873        let blob = Bytes::from_static(b"{\"a\":1}");
874        let b = WorkBatch::<TestToken>::single(blob.clone());
875        assert_eq!(b.record_count(), 1);
876        assert!(b.commit_tokens.is_empty());
877        assert!(b.dlq_entries.is_empty());
878        assert_eq!(b.records[0].payload, blob);
879        assert_eq!(b.records[0].key, None);
880        assert!(b.records[0].headers.is_empty());
881        // whole blob is the same allocation (zero-copy)
882        assert_eq!(b.records[0].payload.as_ptr(), blob.as_ptr());
883    }
884
885    #[test]
886    fn single_detects_format_json_object() {
887        let b = WorkBatch::<TestToken>::single(Bytes::from_static(b"{\"a\":1}"));
888        assert_eq!(b.records[0].metadata.format, PayloadFormat::Json);
889    }
890
891    #[test]
892    fn single_detects_format_msgpack() {
893        // fixmap with one entry (0x81) -> MsgPack
894        let b = WorkBatch::<TestToken>::single(Bytes::from_static(&[0x81, 0xa1, 0x61]));
895        assert_eq!(b.records[0].metadata.format, PayloadFormat::MsgPack);
896    }
897
898    // --- from_ndjson() ------------------------------------------------------
899
900    #[test]
901    fn ndjson_splits_lines_into_records() {
902        let blob = Bytes::from_static(b"{\"a\":1}\n{\"b\":2}\n{\"c\":3}");
903        let b = WorkBatch::<TestToken>::from_ndjson(blob.clone());
904        assert_eq!(b.record_count(), 3);
905        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
906        assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
907        assert_eq!(b.records[2].payload.as_ref(), b"{\"c\":3}");
908        for r in &b.records {
909            assert_eq!(r.metadata.format, PayloadFormat::Json);
910            assert_within(&r.payload, &blob);
911        }
912        assert!(b.commit_tokens.is_empty());
913    }
914
915    #[test]
916    fn ndjson_trims_trailing_carriage_return() {
917        // Windows CRLF line endings.
918        let blob = Bytes::from_static(b"{\"a\":1}\r\n{\"b\":2}\r\n");
919        let b = WorkBatch::<TestToken>::from_ndjson(blob);
920        assert_eq!(b.record_count(), 2);
921        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
922        assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
923    }
924
925    #[test]
926    fn ndjson_skips_blank_and_whitespace_only_lines() {
927        let blob = Bytes::from_static(b"{\"a\":1}\n\n   \n{\"b\":2}\n\t\r\n");
928        let b = WorkBatch::<TestToken>::from_ndjson(blob);
929        assert_eq!(b.record_count(), 2);
930        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
931        assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
932    }
933
934    #[test]
935    fn ndjson_empty_blob_yields_no_records() {
936        let b = WorkBatch::<TestToken>::from_ndjson(Bytes::new());
937        assert_eq!(b.record_count(), 0);
938    }
939
940    #[test]
941    fn ndjson_single_line_no_newline() {
942        let blob = Bytes::from_static(b"{\"only\":true}");
943        let b = WorkBatch::<TestToken>::from_ndjson(blob.clone());
944        assert_eq!(b.record_count(), 1);
945        assert_eq!(b.records[0].payload.as_ref(), b"{\"only\":true}");
946        assert_within(&b.records[0].payload, &blob);
947    }
948
949    #[test]
950    fn ndjson_preserves_inner_whitespace_but_trims_edges() {
951        // Leading/trailing spaces on a line are trimmed for framing.
952        let blob = Bytes::from_static(b"  {\"a\": 1}  \n");
953        let b = WorkBatch::<TestToken>::from_ndjson(blob);
954        assert_eq!(b.record_count(), 1);
955        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\": 1}");
956    }
957
958    // --- from_json_array() --------------------------------------------------
959
960    #[test]
961    fn json_array_empty_yields_no_records() {
962        let b = WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[]")).unwrap();
963        assert_eq!(b.record_count(), 0);
964        assert!(b.commit_tokens.is_empty());
965        assert!(b.dlq_entries.is_empty());
966    }
967
968    #[test]
969    fn json_array_empty_with_whitespace() {
970        let b = WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"  [  ]  ")).unwrap();
971        assert_eq!(b.record_count(), 0);
972    }
973
974    #[test]
975    fn json_array_single_element() {
976        let blob = Bytes::from_static(b"[{\"a\":1}]");
977        let b = WorkBatch::<TestToken>::from_json_array(blob.clone()).unwrap();
978        assert_eq!(b.record_count(), 1);
979        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
980        assert_eq!(b.records[0].metadata.format, PayloadFormat::Json);
981        assert_within(&b.records[0].payload, &blob);
982    }
983
984    #[test]
985    fn json_array_multiple_scalar_elements() {
986        let blob = Bytes::from_static(b"[1, 2, 3]");
987        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
988        assert_eq!(b.record_count(), 3);
989        assert_eq!(b.records[0].payload.as_ref(), b"1");
990        assert_eq!(b.records[1].payload.as_ref(), b"2");
991        assert_eq!(b.records[2].payload.as_ref(), b"3");
992    }
993
994    #[test]
995    fn json_array_trims_whitespace_around_elements() {
996        let blob = Bytes::from_static(b"[  {\"a\":1}  ,  {\"b\":2}  ]");
997        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
998        assert_eq!(b.record_count(), 2);
999        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
1000        assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
1001    }
1002
1003    #[test]
1004    fn json_array_leading_trailing_whitespace_and_newlines() {
1005        let blob = Bytes::from_static(b"\n\t [\n  1,\n  2\n] \n");
1006        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
1007        assert_eq!(b.record_count(), 2);
1008        assert_eq!(b.records[0].payload.as_ref(), b"1");
1009        assert_eq!(b.records[1].payload.as_ref(), b"2");
1010    }
1011
1012    #[test]
1013    fn json_array_string_with_brackets_and_commas() {
1014        // The commas/brackets INSIDE the string must not split or change depth.
1015        let blob = Bytes::from_static(b"[\"a,b[c]{d}\", \"plain\"]");
1016        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
1017        assert_eq!(b.record_count(), 2);
1018        assert_eq!(b.records[0].payload.as_ref(), b"\"a,b[c]{d}\"");
1019        assert_eq!(b.records[1].payload.as_ref(), b"\"plain\"");
1020    }
1021
1022    #[test]
1023    fn json_array_string_with_escaped_quote() {
1024        // `\"` does not terminate the string.
1025        let blob = Bytes::from_static(b"[\"he said \\\"hi\\\", then left\", 7]");
1026        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
1027        assert_eq!(b.record_count(), 2);
1028        assert_eq!(
1029            b.records[0].payload.as_ref(),
1030            b"\"he said \\\"hi\\\", then left\""
1031        );
1032        assert_eq!(b.records[1].payload.as_ref(), b"7");
1033    }
1034
1035    #[test]
1036    fn json_array_string_with_escaped_backslash_then_closing_quote() {
1037        // `\\` is an escaped backslash; the following `"` DOES close the string.
1038        // Element 0 is the string "path\\" and element 1 is the number 1.
1039        let blob = Bytes::from_static(b"[\"path\\\\\", 1]");
1040        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
1041        assert_eq!(b.record_count(), 2);
1042        assert_eq!(b.records[0].payload.as_ref(), b"\"path\\\\\"");
1043        assert_eq!(b.records[1].payload.as_ref(), b"1");
1044    }
1045
1046    #[test]
1047    fn json_array_nested_arrays_and_objects() {
1048        let blob = Bytes::from_static(b"[[1,2],[3],{\"k\":[4,5]}]");
1049        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
1050        assert_eq!(b.record_count(), 3);
1051        assert_eq!(b.records[0].payload.as_ref(), b"[1,2]");
1052        assert_eq!(b.records[1].payload.as_ref(), b"[3]");
1053        assert_eq!(b.records[2].payload.as_ref(), b"{\"k\":[4,5]}");
1054    }
1055
1056    #[test]
1057    fn json_array_deeply_nested_object_one_element() {
1058        let blob = Bytes::from_static(b"[{\"a\":{\"b\":{\"c\":[1,{\"d\":2}]}}}]");
1059        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
1060        assert_eq!(b.record_count(), 1);
1061        assert_eq!(
1062            b.records[0].payload.as_ref(),
1063            b"{\"a\":{\"b\":{\"c\":[1,{\"d\":2}]}}}"
1064        );
1065    }
1066
1067    #[test]
1068    fn json_array_unicode_in_strings() {
1069        // Multi-byte UTF-8 inside an element string must pass through verbatim.
1070        let blob = Bytes::from(r#"["café", "naïve"]"#.as_bytes().to_vec());
1071        let b = WorkBatch::<TestToken>::from_json_array(blob.clone()).unwrap();
1072        assert_eq!(b.record_count(), 2);
1073        assert_eq!(b.records[0].payload.as_ref(), "\"café\"".as_bytes());
1074        assert_eq!(b.records[1].payload.as_ref(), "\"naïve\"".as_bytes());
1075        assert_within(&b.records[1].payload, &blob);
1076    }
1077
1078    #[test]
1079    fn json_array_zero_copy_views_into_blob() {
1080        let blob = Bytes::from_static(b"[{\"a\":1}, {\"b\":2}, {\"c\":3}]");
1081        let b = WorkBatch::<TestToken>::from_json_array(blob.clone()).unwrap();
1082        assert_eq!(b.record_count(), 3);
1083        for r in &b.records {
1084            assert_within(&r.payload, &blob);
1085        }
1086    }
1087
1088    // malformed inputs -> Err
1089
1090    #[test]
1091    fn json_array_no_opening_bracket_errors() {
1092        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"{\"a\":1}")).is_err());
1093    }
1094
1095    #[test]
1096    fn json_array_empty_blob_errors() {
1097        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::new()).is_err());
1098    }
1099
1100    #[test]
1101    fn json_array_whitespace_only_errors() {
1102        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"   \n\t ")).is_err());
1103    }
1104
1105    #[test]
1106    fn json_array_unterminated_errors() {
1107        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1, 2")).is_err());
1108    }
1109
1110    #[test]
1111    fn json_array_unterminated_string_errors() {
1112        assert!(
1113            WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[\"unclosed]")).is_err()
1114        );
1115    }
1116
1117    #[test]
1118    fn json_array_trailing_garbage_errors() {
1119        assert!(
1120            WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1, 2] junk")).is_err()
1121        );
1122    }
1123
1124    #[test]
1125    fn json_array_trailing_comma_errors() {
1126        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1, 2, ]")).is_err());
1127    }
1128
1129    #[test]
1130    fn json_array_leading_comma_errors() {
1131        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[, 1]")).is_err());
1132    }
1133
1134    #[test]
1135    fn json_array_double_comma_errors() {
1136        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1,, 2]")).is_err());
1137    }
1138
1139    #[test]
1140    fn json_array_only_open_bracket_errors() {
1141        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[")).is_err());
1142    }
1143
1144    #[test]
1145    fn json_array_unbalanced_extra_close_errors() {
1146        // A nested structure that closes too many times.
1147        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1]]")).is_err());
1148    }
1149
1150    #[test]
1151    fn framing_error_is_displayable() {
1152        let err = WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"nope")).unwrap_err();
1153        // thiserror Display should produce a non-empty, informative message.
1154        assert!(!err.to_string().is_empty());
1155    }
1156}