use super::filter::FilteredDlqEntry;
use super::traits::CommitToken;
use super::types::PayloadFormat;
use bytes::Bytes;
use std::sync::Arc;
use thiserror::Error;
/// Errors reported while splitting a JSON array blob into per-element records.
///
/// These describe framing problems only (where elements begin and end); the
/// scanner in this file does not validate the JSON inside each element.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum FramingError {
/// The first non-whitespace byte was not `[`, or the input contained no
/// non-whitespace byte at all. The payload is a human-readable description
/// of what was found instead.
#[error("json-array framing: expected opening '[', found {0}")]
NotAnArray(String),
/// Input ended before the top-level array (or a string inside it) was closed.
#[error("json-array framing: unexpected end of input (unterminated array or string)")]
UnexpectedEof,
/// A `,` or `]` appeared where an element was expected (leading, doubled or
/// trailing comma). The payload is the byte offset of that delimiter.
#[error("json-array framing: empty element at byte offset {0} (stray comma)")]
EmptyElement(usize),
/// A closing `}` appeared with no open nesting inside the current element.
/// The payload is the byte offset of that byte.
#[error("json-array framing: unbalanced closing bracket at byte offset {0}")]
Unbalanced(usize),
/// Non-whitespace bytes followed the `]` that closed the top-level array.
/// The payload is the byte offset of the first such byte.
#[error("json-array framing: trailing garbage after closing ']' at byte offset {0}")]
TrailingGarbage(usize),
}
/// Per-record metadata carried alongside a record's payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecordMeta {
/// Timestamp in milliseconds when one is known, `None` otherwise.
/// NOTE(review): the reference point (presumably the Unix epoch) is not
/// visible in this file — confirm against the producers of this value.
pub timestamp_ms: Option<i64>,
/// Payload encoding of the record this metadata belongs to.
pub format: PayloadFormat,
}
/// A single unit of work: a payload plus optional key, headers and metadata.
#[derive(Debug, Clone, PartialEq)]
pub struct Record {
/// Raw payload bytes. Stored as `Bytes`; the framing constructors in this
/// file produce payloads by slicing the input blob.
pub payload: Bytes,
/// Optional key associated with the record.
pub key: Option<Arc<str>>,
/// Header name/value pairs. Every constructor in this file leaves this empty.
pub headers: Vec<(String, Vec<u8>)>,
/// Timestamp and payload-format information for this record.
pub metadata: RecordMeta,
}
/// A batch of records together with the commit tokens and DLQ entries that
/// travel with it.
///
/// Nothing in this file requires `records` and `commit_tokens` to have the
/// same length; the constructors store both vectors as given.
#[derive(Debug)]
pub struct WorkBatch<T: CommitToken> {
/// The records in this batch, in order.
pub records: Vec<Record>,
/// Commit tokens carried with the batch.
pub commit_tokens: Vec<T>,
/// Filtered entries carried with the batch.
/// NOTE(review): presumably destined for a dead-letter queue, going by the
/// type name — confirm against `FilteredDlqEntry`'s consumers.
pub dlq_entries: Vec<FilteredDlqEntry>,
}
impl<T: CommitToken> WorkBatch<T> {
    /// Creates a batch with no records, no commit tokens and no DLQ entries.
    #[must_use]
    pub fn empty() -> Self {
        Self::new(Vec::new(), Vec::new())
    }

    /// Creates a batch holding `records`, with no commit tokens and no DLQ entries.
    #[must_use]
    pub fn from_records(records: Vec<Record>) -> Self {
        Self::new(records, Vec::new())
    }

    /// Creates a batch from `records` and `commit_tokens`; DLQ entries start empty.
    ///
    /// Both vectors are stored as given; their lengths are not compared.
    #[must_use]
    pub fn new(records: Vec<Record>, commit_tokens: Vec<T>) -> Self {
        let dlq_entries = Vec::new();
        Self {
            records,
            commit_tokens,
            dlq_entries,
        }
    }

    /// Returns the batch with its DLQ entries replaced by `dlq_entries`.
    ///
    /// Any entries previously attached are discarded, not merged.
    #[must_use]
    pub fn with_dlq_entries(self, dlq_entries: Vec<FilteredDlqEntry>) -> Self {
        Self {
            dlq_entries,
            ..self
        }
    }

    /// Returns `true` when the batch holds no records.
    ///
    /// Commit tokens and DLQ entries are not considered.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.records.is_empty()
    }

    /// Number of records in the batch.
    #[must_use]
    pub fn len(&self) -> usize {
        self.records.len()
    }

    /// Number of records in the batch (same value as [`Self::len`]).
    #[must_use]
    pub fn record_count(&self) -> usize {
        self.len()
    }

    /// Sum of all record payload lengths in bytes, saturating at `usize::MAX`.
    #[must_use]
    pub fn total_payload_bytes(&self) -> usize {
        let mut total = 0usize;
        for record in &self.records {
            total = total.saturating_add(record.payload.len());
        }
        total
    }

    /// Replaces the records with `f(records)`, leaving commit tokens and DLQ
    /// entries untouched.
    ///
    /// `f` may return more or fewer records than it was given.
    #[must_use]
    pub fn map_records(self, f: impl FnOnce(Vec<Record>) -> Vec<Record>) -> Self {
        let Self {
            records,
            commit_tokens,
            dlq_entries,
        } = self;
        Self {
            records: f(records),
            commit_tokens,
            dlq_entries,
        }
    }

    /// Wraps the whole `blob` as a single record.
    ///
    /// The record has no key, no headers and no timestamp; its format is
    /// whatever `PayloadFormat::detect` reports for the blob.
    #[must_use]
    pub fn single(blob: Bytes) -> Self {
        let metadata = RecordMeta {
            timestamp_ms: None,
            format: PayloadFormat::detect(&blob),
        };
        Self::from_records(vec![Record {
            payload: blob,
            key: None,
            headers: Vec::new(),
            metadata,
        }])
    }

    /// Splits `blob` on `\n` and emits one JSON record per non-blank line.
    ///
    /// Each line is trimmed of leading and trailing ASCII whitespace (which
    /// covers the `\r` of CRLF line endings); lines that are empty after
    /// trimming are skipped. Payloads are slices of `blob`. The line contents
    /// are not parsed or validated.
    #[must_use]
    pub fn from_ndjson(blob: Bytes) -> Self {
        let haystack: &[u8] = blob.as_ref();
        let mut records = Vec::new();
        let mut cursor = 0usize;
        // The end of input acts as a final line terminator. When the blob ends
        // with '\n' (or is empty) that last span is empty and gets skipped.
        let line_ends = memchr::memchr_iter(b'\n', haystack).chain(std::iter::once(haystack.len()));
        for line_end in line_ends {
            Self::push_ndjson_line(&mut records, &blob, cursor, line_end);
            cursor = line_end + 1;
        }
        Self::from_records(records)
    }

    /// Trims ASCII whitespace from both ends of `blob[start..end]` and, if
    /// anything remains, appends it to `records` as a JSON record.
    fn push_ndjson_line(records: &mut Vec<Record>, blob: &Bytes, start: usize, mut end: usize) {
        let bytes: &[u8] = blob.as_ref();
        let mut begin = start;
        while begin < end && bytes[begin].is_ascii_whitespace() {
            begin += 1;
        }
        while end > begin && bytes[end - 1].is_ascii_whitespace() {
            end -= 1;
        }
        if begin >= end {
            // Blank or whitespace-only line: nothing to emit.
            return;
        }
        let metadata = RecordMeta {
            timestamp_ms: None,
            format: PayloadFormat::Json,
        };
        records.push(Record {
            payload: blob.slice(begin..end),
            key: None,
            headers: Vec::new(),
            metadata,
        });
    }

    /// Splits a top-level JSON array into one record per element.
    ///
    /// # Errors
    ///
    /// Returns a [`FramingError`] when `scan_json_array` rejects the blob's
    /// framing (missing `[`, unterminated input, stray commas, unbalanced `}`
    /// or trailing bytes after the closing `]`).
    pub fn from_json_array(blob: Bytes) -> Result<Self, FramingError> {
        scan_json_array(&blob).map(Self::from_records)
    }
}
/// Splits the top-level JSON array in `blob` into one [`Record`] per element.
///
/// This is a framing scanner, not a JSON validator. It tracks string state
/// (including backslash escapes) and `{`/`[` nesting depth only to find the
/// top-level commas and the closing `]`. Element contents are not otherwise
/// checked. Elements are handed to `push_element`, and the position after the
/// closing `]` is handed to `finish`.
///
/// # Errors
///
/// * [`FramingError::NotAnArray`] — input is empty/whitespace-only or does not
///   start with `[`.
/// * [`FramingError::UnexpectedEof`] — input ends inside the array or a string.
/// * [`FramingError::EmptyElement`] — a `,` or `]` appears where an element
///   should start (an empty `[]` is accepted).
/// * [`FramingError::Unbalanced`] — a `}` appears with no open nesting.
/// * [`FramingError::TrailingGarbage`] — reported by `finish` for bytes after
///   the closing `]`.
fn scan_json_array(blob: &Bytes) -> Result<Vec<Record>, FramingError> {
    /// Advances `from` past any ASCII whitespace and returns the new index.
    fn skip_ws(bytes: &[u8], from: usize) -> usize {
        let mut at = from;
        while matches!(bytes.get(at), Some(b) if b.is_ascii_whitespace()) {
            at += 1;
        }
        at
    }

    let bytes: &[u8] = blob.as_ref();

    // The first non-whitespace byte must open the array.
    let mut pos = skip_ws(bytes, 0);
    match bytes.get(pos) {
        None => return Err(FramingError::NotAnArray("end of input".to_string())),
        Some(&b'[') => pos += 1,
        Some(&other) => {
            return Err(FramingError::NotAnArray(format!(
                "byte {:#04x} ('{}')",
                other, other as char
            )));
        }
    }

    let mut records: Vec<Record> = Vec::new();
    let mut saw_element = false;
    loop {
        // Position on the first byte of the next element.
        pos = skip_ws(bytes, pos);
        match bytes.get(pos) {
            None => return Err(FramingError::UnexpectedEof),
            // `]` before any element: the array is empty.
            Some(&b']') if !saw_element => return finish(blob, records, pos + 1),
            // `]` right after a comma, or a comma with nothing before it.
            Some(&b']') | Some(&b',') => return Err(FramingError::EmptyElement(pos)),
            Some(_) => {}
        }

        let elem_start = pos;
        let mut depth = 0usize;
        let mut in_string = false;
        let mut escaped = false;

        // Consume bytes up to and including the delimiter that ends this
        // element. Yields `true` when that delimiter closed the whole array.
        let array_closed = loop {
            let here = pos;
            let c = match bytes.get(here) {
                Some(&byte) => byte,
                None => return Err(FramingError::UnexpectedEof),
            };
            pos += 1;

            if in_string {
                if escaped {
                    // The byte after a backslash is taken literally.
                    escaped = false;
                } else {
                    match c {
                        b'\\' => escaped = true,
                        b'"' => in_string = false,
                        _ => {}
                    }
                }
                continue;
            }

            match c {
                b'"' => in_string = true,
                b'{' | b'[' => depth += 1,
                b'}' => depth = depth.checked_sub(1).ok_or(FramingError::Unbalanced(here))?,
                b']' if depth == 0 => break true,
                // depth >= 1 here, so this cannot underflow.
                b']' => depth -= 1,
                b',' if depth == 0 => break false,
                _ => {}
            }
        };

        // `pos` is one past the delimiter, so the element ends at `pos - 1`.
        push_element(blob, &mut records, elem_start, pos - 1);
        if array_closed {
            return finish(blob, records, pos);
        }
        saw_element = true;
    }
}
/// Appends `blob[start..end]`, minus trailing ASCII whitespace, to `records`
/// as a JSON record with no key, headers or timestamp.
///
/// Leading whitespace is not trimmed here. The payload is a slice of `blob`.
fn push_element(blob: &Bytes, records: &mut Vec<Record>, start: usize, end: usize) {
    let bytes: &[u8] = blob.as_ref();
    let trailing_ws = bytes[start..end]
        .iter()
        .rev()
        .take_while(|b| b.is_ascii_whitespace())
        .count();
    let metadata = RecordMeta {
        timestamp_ms: None,
        format: PayloadFormat::Json,
    };
    records.push(Record {
        payload: blob.slice(start..end - trailing_ws),
        key: None,
        headers: Vec::new(),
        metadata,
    });
}
/// Completes a scan: checks that only ASCII whitespace remains in `blob` from
/// index `i` onward and, if so, returns `records`.
///
/// # Errors
///
/// Returns [`FramingError::TrailingGarbage`] with the offset of the first
/// non-whitespace byte found at or after `i`.
fn finish(blob: &Bytes, records: Vec<Record>, mut i: usize) -> Result<Vec<Record>, FramingError> {
    let bytes: &[u8] = blob.as_ref();
    while let Some(byte) = bytes.get(i) {
        if !byte.is_ascii_whitespace() {
            return Err(FramingError::TrailingGarbage(i));
        }
        i += 1;
    }
    Ok(records)
}
impl<T: CommitToken> From<crate::Message<T>> for WorkBatch<T> {
    /// Converts one message into a batch with one record and one commit token.
    ///
    /// Payload, key, timestamp and format are moved over from the message.
    /// Headers and DLQ entries are empty.
    fn from(msg: crate::Message<T>) -> Self {
        let metadata = RecordMeta {
            timestamp_ms: msg.timestamp_ms,
            format: msg.format,
        };
        let record = Record {
            payload: msg.payload,
            key: msg.key,
            headers: Vec::new(),
            metadata,
        };
        Self::new(vec![record], vec![msg.token])
    }
}
impl<T: CommitToken> From<crate::transport::traits::RecvBatch<T>> for WorkBatch<T> {
fn from(batch: crate::transport::traits::RecvBatch<T>) -> Self {
let mut records = Vec::with_capacity(batch.messages.len());
let mut commit_tokens = Vec::with_capacity(batch.messages.len());
for msg in batch.messages {
commit_tokens.push(msg.token);
records.push(Record {
payload: msg.payload,
key: msg.key,
headers: Vec::new(),
metadata: RecordMeta {
timestamp_ms: msg.timestamp_ms,
format: msg.format,
},
});
}
Self {
records,
commit_tokens,
dlq_entries: batch.dlq_entries,
}
}
}
// Unit tests for the batch constructors, the `From` conversions, and the
// NDJSON / JSON-array framing functions defined in this file.
#[cfg(test)]
mod tests {
use super::*;
use crate::Message;
use crate::transport::traits::RecvBatch;
// Minimal commit token used throughout the tests; the wrapped number only
// serves to tell tokens apart in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TestToken(u64);
impl std::fmt::Display for TestToken {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "tok-{}", self.0)
}
}
impl CommitToken for TestToken {}
// Builds a fully-populated record (key, one header, timestamp, JSON format)
// around a static payload.
fn record(payload: &'static [u8]) -> Record {
Record {
payload: Bytes::from_static(payload),
key: Some(Arc::from("events")),
headers: vec![("h".to_string(), b"v".to_vec())],
metadata: RecordMeta {
timestamp_ms: Some(42),
format: PayloadFormat::Json,
},
}
}
// ---- Constructors and accessors ----
#[test]
fn empty_has_no_records_tokens_or_dlq() {
let b = WorkBatch::<TestToken>::empty();
assert!(b.is_empty());
assert_eq!(b.len(), 0);
assert_eq!(b.record_count(), 0);
assert!(b.commit_tokens.is_empty());
assert!(b.dlq_entries.is_empty());
assert_eq!(b.total_payload_bytes(), 0);
}
#[test]
fn from_records_has_no_tokens() {
let b = WorkBatch::<TestToken>::from_records(vec![record(b"{}"), record(b"[]")]);
assert_eq!(b.len(), 2);
assert!(!b.is_empty());
assert!(b.commit_tokens.is_empty());
}
// Record and token counts are deliberately different here: `new` stores
// both vectors as given.
#[test]
fn new_carries_records_and_tokens() {
let b = WorkBatch::new(vec![record(b"{}")], vec![TestToken(1), TestToken(2)]);
assert_eq!(b.record_count(), 1);
assert_eq!(b.commit_tokens.len(), 2);
}
#[test]
fn with_dlq_entries_attaches_entries() {
let entry = FilteredDlqEntry {
payload: b"bad".to_vec(),
key: None,
reason: "filter".to_string(),
};
let b =
WorkBatch::<TestToken>::from_records(vec![record(b"{}")]).with_dlq_entries(vec![entry]);
assert_eq!(b.dlq_entries.len(), 1);
assert_eq!(b.dlq_entries[0].reason, "filter");
}
#[test]
fn total_payload_bytes_sums_payloads() {
let b = WorkBatch::<TestToken>::from_records(vec![
record(b"abc"), record(b"de"), record(b"f"), ]);
assert_eq!(b.total_payload_bytes(), 6);
}
// ---- map_records ----
#[test]
fn map_records_preserves_tokens_and_dlq() {
let entry = FilteredDlqEntry {
payload: b"bad".to_vec(),
key: None,
reason: "filter".to_string(),
};
let b =
WorkBatch::new(vec![record(b"{}")], vec![TestToken(7)]).with_dlq_entries(vec![entry]);
let b = b.map_records(|recs| {
recs.into_iter()
.map(|mut r| {
r.payload = Bytes::from_static(b"changed");
r
})
.collect()
});
assert_eq!(b.record_count(), 1);
assert_eq!(b.records[0].payload.as_ref(), b"changed");
assert_eq!(b.commit_tokens, vec![TestToken(7)]);
assert_eq!(b.dlq_entries.len(), 1);
}
// One record fanned out to three must still carry the single original token.
#[test]
fn map_records_fan_out_keeps_tokens_intact() {
let b = WorkBatch::new(vec![record(b"{}")], vec![TestToken(99)]);
assert_eq!(b.record_count(), 1);
assert_eq!(b.commit_tokens.len(), 1);
let b = b.map_records(|recs| {
let mut out = Vec::new();
for r in recs {
out.push(r.clone());
out.push(r.clone());
out.push(r);
}
out
});
assert_eq!(b.record_count(), 3);
assert_eq!(b.commit_tokens, vec![TestToken(99)]);
}
// ---- From<Message> / From<RecvBatch> ----
#[test]
fn from_message_yields_single_record_batch() {
let msg = Message::new(
Some(Arc::from("topic")),
b"{\"a\":1}".to_vec(),
TestToken(5),
Some(11),
);
let b: WorkBatch<TestToken> = msg.into();
assert_eq!(b.record_count(), 1);
assert_eq!(b.commit_tokens, vec![TestToken(5)]);
assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
assert_eq!(b.records[0].key.as_deref(), Some("topic"));
assert_eq!(b.records[0].metadata.timestamp_ms, Some(11));
assert_eq!(b.records[0].metadata.format, PayloadFormat::Json);
assert!(b.dlq_entries.is_empty());
}
#[test]
fn from_recv_batch_collapses_and_preserves_order() {
let entry = FilteredDlqEntry {
payload: b"bad".to_vec(),
key: None,
reason: "drop-it".to_string(),
};
let recv = RecvBatch {
messages: vec![
Message::new(Some(Arc::from("a")), b"{}".to_vec(), TestToken(1), None),
Message::new(Some(Arc::from("b")), b"[]".to_vec(), TestToken(2), None),
Message::new(None, b"{}".to_vec(), TestToken(3), None),
],
dlq_entries: vec![entry],
};
let b: WorkBatch<TestToken> = recv.into();
assert_eq!(b.record_count(), 3);
assert_eq!(
b.commit_tokens,
vec![TestToken(1), TestToken(2), TestToken(3)]
);
assert_eq!(b.dlq_entries.len(), 1);
assert_eq!(b.dlq_entries[0].reason, "drop-it");
assert_eq!(b.records[0].key.as_deref(), Some("a"));
assert_eq!(b.records[2].key, None);
}
// ---- Zero-copy checks: payload pointers must survive each conversion ----
#[test]
fn from_message_moves_payload_without_copying() {
let payload = b"zero-copy-please".to_vec();
let payload_ptr = payload.as_ptr();
let msg = Message::new(Some(Arc::from("topic")), payload, TestToken(1), None);
assert_eq!(msg.payload.as_ptr(), payload_ptr);
let wb: WorkBatch<TestToken> = msg.into();
assert_eq!(wb.records[0].payload.as_ptr(), payload_ptr);
}
#[test]
fn bytes_payload_travels_zero_copy_through_workbatch() {
let raw = b"bytes-zero-copy-payload-test".to_vec();
let src: Bytes = raw.into();
let src_ptr = src.as_ptr();
let msg = Message::new(Some(Arc::from("k")), src, TestToken(42), Some(99));
assert_eq!(msg.payload.as_ptr(), src_ptr, "copy at Message::new");
let wb: WorkBatch<TestToken> = msg.into();
assert_eq!(
wb.records[0].payload.as_ptr(),
src_ptr,
"copy at From<Message> for WorkBatch"
);
let cloned = wb.records[0].payload.clone();
assert_eq!(
cloned.as_ptr(),
src_ptr,
"clone allocated a new buffer instead of bumping refcount"
);
}
#[test]
fn from_recv_batch_moves_payloads_without_copying() {
let p0 = b"first-buffer".to_vec();
let p1 = b"second-buffer".to_vec();
let p0_ptr = p0.as_ptr();
let p1_ptr = p1.as_ptr();
let recv = RecvBatch {
messages: vec![
Message::new(Some(Arc::from("a")), p0, TestToken(1), None),
Message::new(Some(Arc::from("b")), p1, TestToken(2), None),
],
dlq_entries: Vec::new(),
};
let wb: WorkBatch<TestToken> = recv.into();
assert_eq!(wb.records[0].payload.as_ptr(), p0_ptr);
assert_eq!(wb.records[1].payload.as_ptr(), p1_ptr);
}
#[test]
fn payload_is_bytes_and_clone_is_zero_copy() {
let r = record(b"shared-buffer");
let p1 = r.payload.clone();
let r2 = r.clone();
assert_eq!(p1.as_ptr(), r2.payload.as_ptr());
assert_eq!(r2.payload.as_ref(), b"shared-buffer");
}
// Asserts that `slice`'s address range lies inside `blob`'s, i.e. that the
// slice is a view into the blob's buffer rather than a separate copy.
fn assert_within(slice: &Bytes, blob: &Bytes) {
let blob_start = blob.as_ptr() as usize;
let blob_end = blob_start + blob.len();
let slice_start = slice.as_ptr() as usize;
let slice_end = slice_start + slice.len();
assert!(
slice_start >= blob_start && slice_end <= blob_end,
"slice [{slice_start:#x}, {slice_end:#x}) is not within blob \
[{blob_start:#x}, {blob_end:#x}) -- it is a copy, not a view"
);
}
// ---- WorkBatch::single ----
#[test]
fn single_holds_whole_blob_as_one_record() {
let blob = Bytes::from_static(b"{\"a\":1}");
let b = WorkBatch::<TestToken>::single(blob.clone());
assert_eq!(b.record_count(), 1);
assert!(b.commit_tokens.is_empty());
assert!(b.dlq_entries.is_empty());
assert_eq!(b.records[0].payload, blob);
assert_eq!(b.records[0].key, None);
assert!(b.records[0].headers.is_empty());
assert_eq!(b.records[0].payload.as_ptr(), blob.as_ptr());
}
#[test]
fn single_detects_format_json_object() {
let b = WorkBatch::<TestToken>::single(Bytes::from_static(b"{\"a\":1}"));
assert_eq!(b.records[0].metadata.format, PayloadFormat::Json);
}
#[test]
fn single_detects_format_msgpack() {
let b = WorkBatch::<TestToken>::single(Bytes::from_static(&[0x81, 0xa1, 0x61]));
assert_eq!(b.records[0].metadata.format, PayloadFormat::MsgPack);
}
// ---- WorkBatch::from_ndjson ----
#[test]
fn ndjson_splits_lines_into_records() {
let blob = Bytes::from_static(b"{\"a\":1}\n{\"b\":2}\n{\"c\":3}");
let b = WorkBatch::<TestToken>::from_ndjson(blob.clone());
assert_eq!(b.record_count(), 3);
assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
assert_eq!(b.records[2].payload.as_ref(), b"{\"c\":3}");
for r in &b.records {
assert_eq!(r.metadata.format, PayloadFormat::Json);
assert_within(&r.payload, &blob);
}
assert!(b.commit_tokens.is_empty());
}
#[test]
fn ndjson_trims_trailing_carriage_return() {
let blob = Bytes::from_static(b"{\"a\":1}\r\n{\"b\":2}\r\n");
let b = WorkBatch::<TestToken>::from_ndjson(blob);
assert_eq!(b.record_count(), 2);
assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
}
#[test]
fn ndjson_skips_blank_and_whitespace_only_lines() {
let blob = Bytes::from_static(b"{\"a\":1}\n\n \n{\"b\":2}\n\t\r\n");
let b = WorkBatch::<TestToken>::from_ndjson(blob);
assert_eq!(b.record_count(), 2);
assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
}
#[test]
fn ndjson_empty_blob_yields_no_records() {
let b = WorkBatch::<TestToken>::from_ndjson(Bytes::new());
assert_eq!(b.record_count(), 0);
}
#[test]
fn ndjson_single_line_no_newline() {
let blob = Bytes::from_static(b"{\"only\":true}");
let b = WorkBatch::<TestToken>::from_ndjson(blob.clone());
assert_eq!(b.record_count(), 1);
assert_eq!(b.records[0].payload.as_ref(), b"{\"only\":true}");
assert_within(&b.records[0].payload, &blob);
}
#[test]
fn ndjson_preserves_inner_whitespace_but_trims_edges() {
let blob = Bytes::from_static(b" {\"a\": 1} \n");
let b = WorkBatch::<TestToken>::from_ndjson(blob);
assert_eq!(b.record_count(), 1);
assert_eq!(b.records[0].payload.as_ref(), b"{\"a\": 1}");
}
// ---- WorkBatch::from_json_array: accepted inputs ----
#[test]
fn json_array_empty_yields_no_records() {
let b = WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[]")).unwrap();
assert_eq!(b.record_count(), 0);
assert!(b.commit_tokens.is_empty());
assert!(b.dlq_entries.is_empty());
}
#[test]
fn json_array_empty_with_whitespace() {
let b = WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b" [ ] ")).unwrap();
assert_eq!(b.record_count(), 0);
}
#[test]
fn json_array_single_element() {
let blob = Bytes::from_static(b"[{\"a\":1}]");
let b = WorkBatch::<TestToken>::from_json_array(blob.clone()).unwrap();
assert_eq!(b.record_count(), 1);
assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
assert_eq!(b.records[0].metadata.format, PayloadFormat::Json);
assert_within(&b.records[0].payload, &blob);
}
#[test]
fn json_array_multiple_scalar_elements() {
let blob = Bytes::from_static(b"[1, 2, 3]");
let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
assert_eq!(b.record_count(), 3);
assert_eq!(b.records[0].payload.as_ref(), b"1");
assert_eq!(b.records[1].payload.as_ref(), b"2");
assert_eq!(b.records[2].payload.as_ref(), b"3");
}
#[test]
fn json_array_trims_whitespace_around_elements() {
let blob = Bytes::from_static(b"[ {\"a\":1} , {\"b\":2} ]");
let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
assert_eq!(b.record_count(), 2);
assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
}
#[test]
fn json_array_leading_trailing_whitespace_and_newlines() {
let blob = Bytes::from_static(b"\n\t [\n 1,\n 2\n] \n");
let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
assert_eq!(b.record_count(), 2);
assert_eq!(b.records[0].payload.as_ref(), b"1");
assert_eq!(b.records[1].payload.as_ref(), b"2");
}
// Structural characters inside string literals must not split elements.
#[test]
fn json_array_string_with_brackets_and_commas() {
let blob = Bytes::from_static(b"[\"a,b[c]{d}\", \"plain\"]");
let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
assert_eq!(b.record_count(), 2);
assert_eq!(b.records[0].payload.as_ref(), b"\"a,b[c]{d}\"");
assert_eq!(b.records[1].payload.as_ref(), b"\"plain\"");
}
#[test]
fn json_array_string_with_escaped_quote() {
let blob = Bytes::from_static(b"[\"he said \\\"hi\\\", then left\", 7]");
let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
assert_eq!(b.record_count(), 2);
assert_eq!(
b.records[0].payload.as_ref(),
b"\"he said \\\"hi\\\", then left\""
);
assert_eq!(b.records[1].payload.as_ref(), b"7");
}
// An escaped backslash directly before the closing quote must not be read
// as escaping that quote.
#[test]
fn json_array_string_with_escaped_backslash_then_closing_quote() {
let blob = Bytes::from_static(b"[\"path\\\\\", 1]");
let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
assert_eq!(b.record_count(), 2);
assert_eq!(b.records[0].payload.as_ref(), b"\"path\\\\\"");
assert_eq!(b.records[1].payload.as_ref(), b"1");
}
#[test]
fn json_array_nested_arrays_and_objects() {
let blob = Bytes::from_static(b"[[1,2],[3],{\"k\":[4,5]}]");
let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
assert_eq!(b.record_count(), 3);
assert_eq!(b.records[0].payload.as_ref(), b"[1,2]");
assert_eq!(b.records[1].payload.as_ref(), b"[3]");
assert_eq!(b.records[2].payload.as_ref(), b"{\"k\":[4,5]}");
}
#[test]
fn json_array_deeply_nested_object_one_element() {
let blob = Bytes::from_static(b"[{\"a\":{\"b\":{\"c\":[1,{\"d\":2}]}}}]");
let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
assert_eq!(b.record_count(), 1);
assert_eq!(
b.records[0].payload.as_ref(),
b"{\"a\":{\"b\":{\"c\":[1,{\"d\":2}]}}}"
);
}
#[test]
fn json_array_unicode_in_strings() {
let blob = Bytes::from(r#"["café", "naïve"]"#.as_bytes().to_vec());
let b = WorkBatch::<TestToken>::from_json_array(blob.clone()).unwrap();
assert_eq!(b.record_count(), 2);
assert_eq!(b.records[0].payload.as_ref(), "\"café\"".as_bytes());
assert_eq!(b.records[1].payload.as_ref(), "\"naïve\"".as_bytes());
assert_within(&b.records[1].payload, &blob);
}
#[test]
fn json_array_zero_copy_views_into_blob() {
let blob = Bytes::from_static(b"[{\"a\":1}, {\"b\":2}, {\"c\":3}]");
let b = WorkBatch::<TestToken>::from_json_array(blob.clone()).unwrap();
assert_eq!(b.record_count(), 3);
for r in &b.records {
assert_within(&r.payload, &blob);
}
}
// ---- WorkBatch::from_json_array: rejected inputs ----
#[test]
fn json_array_no_opening_bracket_errors() {
assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"{\"a\":1}")).is_err());
}
#[test]
fn json_array_empty_blob_errors() {
assert!(WorkBatch::<TestToken>::from_json_array(Bytes::new()).is_err());
}
#[test]
fn json_array_whitespace_only_errors() {
assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b" \n\t ")).is_err());
}
#[test]
fn json_array_unterminated_errors() {
assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1, 2")).is_err());
}
#[test]
fn json_array_unterminated_string_errors() {
assert!(
WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[\"unclosed]")).is_err()
);
}
#[test]
fn json_array_trailing_garbage_errors() {
assert!(
WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1, 2] junk")).is_err()
);
}
#[test]
fn json_array_trailing_comma_errors() {
assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1, 2, ]")).is_err());
}
#[test]
fn json_array_leading_comma_errors() {
assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[, 1]")).is_err());
}
#[test]
fn json_array_double_comma_errors() {
assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1,, 2]")).is_err());
}
#[test]
fn json_array_only_open_bracket_errors() {
assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[")).is_err());
}
// The first `]` closes the array, so the second one is trailing input.
#[test]
fn json_array_unbalanced_extra_close_errors() {
assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1]]")).is_err());
}
#[test]
fn framing_error_is_displayable() {
let err = WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"nope")).unwrap_err();
assert!(!err.to_string().is_empty());
}
}