1use super::filter::FilteredDlqEntry;
38use super::traits::CommitToken;
39use super::types::PayloadFormat;
40use bytes::Bytes;
41use std::sync::Arc;
42use thiserror::Error;
43
/// Errors reported while splitting a JSON-array blob into per-element records.
///
/// The `#[error]` string on each variant is its `Display` text. Every `usize`
/// payload is a byte offset into the blob that was being scanned.
#[derive(Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum FramingError {
    /// The first non-whitespace byte was not `[`, or the input contained no
    /// non-whitespace byte at all. The string describes what was found instead.
    #[error("json-array framing: expected opening '[', found {0}")]
    NotAnArray(String),

    /// The input ended before the array's closing `]` was reached, including
    /// the case where a string literal was still open.
    #[error("json-array framing: unexpected end of input (unterminated array or string)")]
    UnexpectedEof,

    /// A `,` or `]` appeared at the given offset where an element was expected
    /// (leading, doubled or trailing comma).
    #[error("json-array framing: empty element at byte offset {0} (stray comma)")]
    EmptyElement(usize),

    /// A `}` at the given offset closed more brackets than the current element
    /// had opened.
    #[error("json-array framing: unbalanced closing bracket at byte offset {0}")]
    Unbalanced(usize),

    /// A non-whitespace byte was found at the given offset after the array's
    /// closing `]`.
    #[error("json-array framing: trailing garbage after closing ']' at byte offset {0}")]
    TrailingGarbage(usize),
}
75
/// Per-record metadata carried alongside a [`Record`]'s payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecordMeta {
    /// Optional timestamp. The field name indicates milliseconds; the
    /// reference point (epoch) is not visible here -- TODO confirm with the
    /// transport that populates it.
    pub timestamp_ms: Option<i64>,

    /// Format tag for the record's payload.
    pub format: PayloadFormat,
}
88
/// One unit of work: a payload plus the optional key, headers and metadata
/// that accompany it.
#[derive(Debug, Clone, PartialEq)]
pub struct Record {
    /// The record body, held as `Bytes`.
    pub payload: Bytes,

    /// Optional key associated with the record.
    pub key: Option<Arc<str>>,

    /// Header name/value pairs. Every constructor and conversion in this
    /// file leaves this empty.
    pub headers: Vec<(String, Vec<u8>)>,

    /// Timestamp and payload-format information for this record.
    pub metadata: RecordMeta,
}
107
/// A group of records together with the commit tokens and DLQ entries that
/// belong to the same unit of work.
///
/// `records` and `commit_tokens` are independent lists: nothing in this file
/// requires them to have the same length.
#[derive(Debug)]
pub struct WorkBatch<T: CommitToken> {
    /// The records in this batch, in order.
    pub records: Vec<Record>,

    /// Commit tokens associated with this batch. These may outnumber the
    /// records (for example when tokens of filtered messages are appended).
    pub commit_tokens: Vec<T>,

    /// Filtered entries attached to this batch, presumably destined for a
    /// dead-letter queue -- confirm against the consumer of this field.
    pub dlq_entries: Vec<FilteredDlqEntry>,
}
130
131impl<T: CommitToken> WorkBatch<T> {
132 #[must_use]
134 pub fn empty() -> Self {
135 Self {
136 records: Vec::new(),
137 commit_tokens: Vec::new(),
138 dlq_entries: Vec::new(),
139 }
140 }
141
142 #[must_use]
147 pub fn from_records(records: Vec<Record>) -> Self {
148 Self {
149 records,
150 commit_tokens: Vec::new(),
151 dlq_entries: Vec::new(),
152 }
153 }
154
155 #[must_use]
157 pub fn new(records: Vec<Record>, commit_tokens: Vec<T>) -> Self {
158 Self {
159 records,
160 commit_tokens,
161 dlq_entries: Vec::new(),
162 }
163 }
164
165 #[must_use]
167 pub fn with_dlq_entries(mut self, dlq_entries: Vec<FilteredDlqEntry>) -> Self {
168 self.dlq_entries = dlq_entries;
169 self
170 }
171
172 #[must_use]
174 pub fn is_empty(&self) -> bool {
175 self.records.is_empty()
176 }
177
178 #[must_use]
180 pub fn len(&self) -> usize {
181 self.records.len()
182 }
183
184 #[must_use]
186 pub fn record_count(&self) -> usize {
187 self.records.len()
188 }
189
190 #[must_use]
196 pub fn total_payload_bytes(&self) -> usize {
197 self.records
198 .iter()
199 .map(|r| r.payload.len())
200 .fold(0usize, usize::saturating_add)
201 }
202
203 #[must_use]
209 pub fn map_records(mut self, f: impl FnOnce(Vec<Record>) -> Vec<Record>) -> Self {
210 self.records = f(self.records);
211 self
212 }
213
214 #[must_use]
228 pub fn single(blob: Bytes) -> Self {
229 let format = PayloadFormat::detect(&blob);
230 let record = Record {
231 payload: blob,
232 key: None,
233 headers: Vec::new(),
234 metadata: RecordMeta {
235 timestamp_ms: None,
236 format,
237 },
238 };
239 Self {
240 records: vec![record],
241 commit_tokens: Vec::new(),
242 dlq_entries: Vec::new(),
243 }
244 }
245
246 #[must_use]
257 pub fn from_ndjson(blob: Bytes) -> Self {
258 let mut records = Vec::new();
259 let mut line_start = 0usize;
260 let bytes = blob.as_ref();
261
262 for nl in memchr::memchr_iter(b'\n', bytes) {
264 Self::push_ndjson_line(&mut records, &blob, line_start, nl);
265 line_start = nl + 1;
266 }
267 if line_start < bytes.len() {
268 Self::push_ndjson_line(&mut records, &blob, line_start, bytes.len());
269 }
270
271 Self {
272 records,
273 commit_tokens: Vec::new(),
274 dlq_entries: Vec::new(),
275 }
276 }
277
278 fn push_ndjson_line(records: &mut Vec<Record>, blob: &Bytes, start: usize, mut end: usize) {
281 let bytes = blob.as_ref();
282 if end > start && bytes[end - 1] == b'\r' {
284 end -= 1;
285 }
286 while end > start && bytes[end - 1].is_ascii_whitespace() {
288 end -= 1;
289 }
290 let mut begin = start;
291 while begin < end && bytes[begin].is_ascii_whitespace() {
292 begin += 1;
293 }
294 if begin >= end {
295 return; }
297 records.push(Record {
298 payload: blob.slice(begin..end),
299 key: None,
300 headers: Vec::new(),
301 metadata: RecordMeta {
302 timestamp_ms: None,
303 format: PayloadFormat::Json,
304 },
305 });
306 }
307
308 pub fn from_json_array(blob: Bytes) -> Result<Self, FramingError> {
323 let records = scan_json_array(&blob)?;
324 Ok(Self {
325 records,
326 commit_tokens: Vec::new(),
327 dlq_entries: Vec::new(),
328 })
329 }
330}
331
332fn scan_json_array(blob: &Bytes) -> Result<Vec<Record>, FramingError> {
337 let bytes = blob.as_ref();
338 let len = bytes.len();
339
340 let mut i = 0usize;
342 while i < len && bytes[i].is_ascii_whitespace() {
343 i += 1;
344 }
345 if i >= len {
346 return Err(FramingError::NotAnArray("end of input".to_string()));
347 }
348 if bytes[i] != b'[' {
349 return Err(FramingError::NotAnArray(format!(
350 "byte {:#04x} ('{}')",
351 bytes[i], bytes[i] as char
352 )));
353 }
354 i += 1; let mut records: Vec<Record> = Vec::new();
357 let mut first_element = true;
360
361 loop {
362 while i < len && bytes[i].is_ascii_whitespace() {
364 i += 1;
365 }
366 if i >= len {
367 return Err(FramingError::UnexpectedEof);
368 }
369
370 if bytes[i] == b']' {
371 if first_element {
375 i += 1;
376 return finish(blob, records, i);
377 }
378 return Err(FramingError::EmptyElement(i));
381 }
382 if bytes[i] == b',' {
383 return Err(FramingError::EmptyElement(i));
385 }
386
387 let elem_start = i;
389 let mut depth: usize = 0;
390 let mut in_string = false;
391 let mut escaped = false;
392
393 let elem_end;
394 loop {
395 if i >= len {
396 return Err(FramingError::UnexpectedEof);
397 }
398 let c = bytes[i];
399
400 if in_string {
401 if escaped {
402 escaped = false;
403 } else if c == b'\\' {
404 escaped = true;
405 } else if c == b'"' {
406 in_string = false;
407 }
408 i += 1;
409 continue;
410 }
411
412 match c {
413 b'"' => {
414 in_string = true;
415 i += 1;
416 }
417 b'{' | b'[' => {
418 depth += 1;
419 i += 1;
420 }
421 b'}' => {
422 depth = depth.checked_sub(1).ok_or(FramingError::Unbalanced(i))?;
424 i += 1;
425 }
426 b']' => {
427 if depth == 0 {
428 elem_end = i;
430 i += 1; push_element(blob, &mut records, elem_start, elem_end);
432 return finish(blob, records, i);
433 }
434 depth -= 1;
435 i += 1;
436 }
437 b',' if depth == 0 => {
438 elem_end = i;
440 i += 1; break;
442 }
443 _ => {
444 i += 1;
445 }
446 }
447 }
448
449 push_element(blob, &mut records, elem_start, elem_end);
450 first_element = false;
451 }
452}
453
454fn push_element(blob: &Bytes, records: &mut Vec<Record>, start: usize, end: usize) {
459 let bytes = blob.as_ref();
460 let mut e = end;
461 while e > start && bytes[e - 1].is_ascii_whitespace() {
462 e -= 1;
463 }
464 records.push(Record {
465 payload: blob.slice(start..e),
466 key: None,
467 headers: Vec::new(),
468 metadata: RecordMeta {
469 timestamp_ms: None,
470 format: PayloadFormat::Json,
471 },
472 });
473}
474
475fn finish(blob: &Bytes, records: Vec<Record>, mut i: usize) -> Result<Vec<Record>, FramingError> {
478 let bytes = blob.as_ref();
479 let len = bytes.len();
480 while i < len && bytes[i].is_ascii_whitespace() {
481 i += 1;
482 }
483 if i < len {
484 return Err(FramingError::TrailingGarbage(i));
485 }
486 Ok(records)
487}
488
489impl<T: CommitToken> From<crate::Message<T>> for WorkBatch<T> {
490 fn from(msg: crate::Message<T>) -> Self {
494 let record = Record {
495 payload: msg.payload,
496 key: msg.key,
497 headers: Vec::new(),
498 metadata: RecordMeta {
499 timestamp_ms: msg.timestamp_ms,
500 format: msg.format,
501 },
502 };
503 Self {
504 records: vec![record],
505 commit_tokens: vec![msg.token],
506 dlq_entries: Vec::new(),
507 }
508 }
509}
510
511impl<T: CommitToken> From<crate::transport::traits::RecvBatch<T>> for WorkBatch<T> {
512 fn from(batch: crate::transport::traits::RecvBatch<T>) -> Self {
522 let token_capacity = batch.messages.len() + batch.filtered_tokens.len();
523 let mut records = Vec::with_capacity(batch.messages.len());
524 let mut commit_tokens = Vec::with_capacity(token_capacity);
525 for msg in batch.messages {
526 commit_tokens.push(msg.token);
527 records.push(Record {
528 payload: msg.payload,
529 key: msg.key,
530 headers: Vec::new(),
531 metadata: RecordMeta {
532 timestamp_ms: msg.timestamp_ms,
533 format: msg.format,
534 },
535 });
536 }
537 commit_tokens.extend(batch.filtered_tokens);
540 Self {
541 records,
542 commit_tokens,
543 dlq_entries: batch.dlq_entries,
544 }
545 }
546}
547
// Unit tests for the batch types, the `From` conversions, and the NDJSON /
// JSON-array framing helpers.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Message;
    use crate::transport::traits::RecvBatch;

    /// Minimal commit token used throughout these tests.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct TestToken(u64);

    impl std::fmt::Display for TestToken {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "tok-{}", self.0)
        }
    }

    impl CommitToken for TestToken {}

    /// Builds a fully-populated JSON record around a static payload.
    fn record(payload: &'static [u8]) -> Record {
        Record {
            payload: Bytes::from_static(payload),
            key: Some(Arc::from("events")),
            headers: vec![("h".to_string(), b"v".to_vec())],
            metadata: RecordMeta {
                timestamp_ms: Some(42),
                format: PayloadFormat::Json,
            },
        }
    }

    // ---- constructors and accessors ----

    #[test]
    fn empty_has_no_records_tokens_or_dlq() {
        let b = WorkBatch::<TestToken>::empty();
        assert!(b.is_empty());
        assert_eq!(b.len(), 0);
        assert_eq!(b.record_count(), 0);
        assert!(b.commit_tokens.is_empty());
        assert!(b.dlq_entries.is_empty());
        assert_eq!(b.total_payload_bytes(), 0);
    }

    #[test]
    fn from_records_has_no_tokens() {
        let b = WorkBatch::<TestToken>::from_records(vec![record(b"{}"), record(b"[]")]);
        assert_eq!(b.len(), 2);
        assert!(!b.is_empty());
        assert!(b.commit_tokens.is_empty());
    }

    #[test]
    fn new_carries_records_and_tokens() {
        // Record and token counts are deliberately different.
        let b = WorkBatch::new(vec![record(b"{}")], vec![TestToken(1), TestToken(2)]);
        assert_eq!(b.record_count(), 1);
        assert_eq!(b.commit_tokens.len(), 2);
    }

    #[test]
    fn with_dlq_entries_attaches_entries() {
        let entry = FilteredDlqEntry {
            payload: b"bad".to_vec(),
            key: None,
            reason: "filter".to_string(),
        };
        let b =
            WorkBatch::<TestToken>::from_records(vec![record(b"{}")]).with_dlq_entries(vec![entry]);
        assert_eq!(b.dlq_entries.len(), 1);
        assert_eq!(b.dlq_entries[0].reason, "filter");
    }

    #[test]
    fn total_payload_bytes_sums_payloads() {
        // 3 + 2 + 1 bytes.
        let b = WorkBatch::<TestToken>::from_records(vec![
            record(b"abc"),
            record(b"de"),
            record(b"f"),
        ]);
        assert_eq!(b.total_payload_bytes(), 6);
    }

    // ---- map_records ----

    #[test]
    fn map_records_preserves_tokens_and_dlq() {
        let entry = FilteredDlqEntry {
            payload: b"bad".to_vec(),
            key: None,
            reason: "filter".to_string(),
        };
        let b =
            WorkBatch::new(vec![record(b"{}")], vec![TestToken(7)]).with_dlq_entries(vec![entry]);

        let b = b.map_records(|recs| {
            recs.into_iter()
                .map(|mut r| {
                    r.payload = Bytes::from_static(b"changed");
                    r
                })
                .collect()
        });

        assert_eq!(b.record_count(), 1);
        assert_eq!(b.records[0].payload.as_ref(), b"changed");
        assert_eq!(b.commit_tokens, vec![TestToken(7)]);
        assert_eq!(b.dlq_entries.len(), 1);
    }

    #[test]
    fn map_records_fan_out_keeps_tokens_intact() {
        let b = WorkBatch::new(vec![record(b"{}")], vec![TestToken(99)]);
        assert_eq!(b.record_count(), 1);
        assert_eq!(b.commit_tokens.len(), 1);

        // Triple every record; the token list must not change.
        let b = b.map_records(|recs| {
            let mut out = Vec::new();
            for r in recs {
                out.push(r.clone());
                out.push(r.clone());
                out.push(r);
            }
            out
        });

        assert_eq!(b.record_count(), 3);
        assert_eq!(b.commit_tokens, vec![TestToken(99)]);
    }

    // ---- From<Message> / From<RecvBatch> ----

    #[test]
    fn from_message_yields_single_record_batch() {
        let msg = Message::new(
            Some(Arc::from("topic")),
            b"{\"a\":1}".to_vec(),
            TestToken(5),
            Some(11),
        );
        let b: WorkBatch<TestToken> = msg.into();

        assert_eq!(b.record_count(), 1);
        assert_eq!(b.commit_tokens, vec![TestToken(5)]);
        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
        assert_eq!(b.records[0].key.as_deref(), Some("topic"));
        assert_eq!(b.records[0].metadata.timestamp_ms, Some(11));
        assert_eq!(b.records[0].metadata.format, PayloadFormat::Json);
        assert!(b.dlq_entries.is_empty());
    }

    #[test]
    fn from_recv_batch_collapses_and_preserves_order() {
        let entry = FilteredDlqEntry {
            payload: b"bad".to_vec(),
            key: None,
            reason: "drop-it".to_string(),
        };
        let recv = RecvBatch {
            messages: vec![
                Message::new(Some(Arc::from("a")), b"{}".to_vec(), TestToken(1), None),
                Message::new(Some(Arc::from("b")), b"[]".to_vec(), TestToken(2), None),
                Message::new(None, b"{}".to_vec(), TestToken(3), None),
            ],
            dlq_entries: vec![entry],
            filtered_tokens: Vec::new(),
        };

        let b: WorkBatch<TestToken> = recv.into();

        assert_eq!(b.record_count(), 3);
        // Tokens come out in message order.
        assert_eq!(
            b.commit_tokens,
            vec![TestToken(1), TestToken(2), TestToken(3)]
        );
        assert_eq!(b.dlq_entries.len(), 1);
        assert_eq!(b.dlq_entries[0].reason, "drop-it");
        assert_eq!(b.records[0].key.as_deref(), Some("a"));
        assert_eq!(b.records[2].key, None);
    }

    #[test]
    fn from_recv_batch_appends_filtered_tokens_to_commit_tokens() {
        let recv = RecvBatch {
            messages: vec![Message::new(
                Some(Arc::from("a")),
                b"{}".to_vec(),
                TestToken(1),
                None,
            )],
            dlq_entries: Vec::new(),
            filtered_tokens: vec![TestToken(7), TestToken(8)],
        };
        let b: WorkBatch<TestToken> = recv.into();
        assert_eq!(b.record_count(), 1);
        // Message tokens first, then the filtered tokens.
        assert_eq!(
            b.commit_tokens,
            vec![TestToken(1), TestToken(7), TestToken(8)]
        );
    }

    #[test]
    fn from_recv_batch_all_filtered_is_records_empty_tokens_present() {
        let recv: RecvBatch<TestToken> = RecvBatch {
            messages: Vec::new(),
            dlq_entries: Vec::new(),
            filtered_tokens: vec![TestToken(5), TestToken(6)],
        };
        let b: WorkBatch<TestToken> = recv.into();
        assert_eq!(b.record_count(), 0);
        assert_eq!(b.commit_tokens, vec![TestToken(5), TestToken(6)]);
    }

    // ---- payload buffer identity (pointer comparisons) ----

    #[test]
    fn from_message_moves_payload_without_copying() {
        let payload = b"zero-copy-please".to_vec();
        // Remember the buffer address before ownership moves.
        let payload_ptr = payload.as_ptr();

        let msg = Message::new(Some(Arc::from("topic")), payload, TestToken(1), None);
        assert_eq!(msg.payload.as_ptr(), payload_ptr);

        let wb: WorkBatch<TestToken> = msg.into();

        assert_eq!(wb.records[0].payload.as_ptr(), payload_ptr);
    }

    #[test]
    fn bytes_payload_travels_zero_copy_through_workbatch() {
        let raw = b"bytes-zero-copy-payload-test".to_vec();
        let src: Bytes = raw.into();
        let src_ptr = src.as_ptr();

        let msg = Message::new(Some(Arc::from("k")), src, TestToken(42), Some(99));
        assert_eq!(msg.payload.as_ptr(), src_ptr, "copy at Message::new");

        let wb: WorkBatch<TestToken> = msg.into();
        assert_eq!(
            wb.records[0].payload.as_ptr(),
            src_ptr,
            "copy at From<Message> for WorkBatch"
        );

        // Cloning the payload must point at the same buffer.
        let cloned = wb.records[0].payload.clone();
        assert_eq!(
            cloned.as_ptr(),
            src_ptr,
            "clone allocated a new buffer instead of bumping refcount"
        );
    }

    #[test]
    fn from_recv_batch_moves_payloads_without_copying() {
        let p0 = b"first-buffer".to_vec();
        let p1 = b"second-buffer".to_vec();
        let p0_ptr = p0.as_ptr();
        let p1_ptr = p1.as_ptr();

        let recv = RecvBatch {
            messages: vec![
                Message::new(Some(Arc::from("a")), p0, TestToken(1), None),
                Message::new(Some(Arc::from("b")), p1, TestToken(2), None),
            ],
            dlq_entries: Vec::new(),
            filtered_tokens: Vec::new(),
        };

        let wb: WorkBatch<TestToken> = recv.into();

        assert_eq!(wb.records[0].payload.as_ptr(), p0_ptr);
        assert_eq!(wb.records[1].payload.as_ptr(), p1_ptr);
    }

    #[test]
    fn payload_is_bytes_and_clone_is_zero_copy() {
        let r = record(b"shared-buffer");
        let p1 = r.payload.clone();
        let r2 = r.clone();
        assert_eq!(p1.as_ptr(), r2.payload.as_ptr());
        assert_eq!(r2.payload.as_ref(), b"shared-buffer");
    }

    /// Asserts that `slice`'s address range lies inside `blob`'s address
    /// range, i.e. that it is a view into `blob` rather than a copy.
    fn assert_within(slice: &Bytes, blob: &Bytes) {
        let blob_start = blob.as_ptr() as usize;
        let blob_end = blob_start + blob.len();
        let slice_start = slice.as_ptr() as usize;
        let slice_end = slice_start + slice.len();
        assert!(
            slice_start >= blob_start && slice_end <= blob_end,
            "slice [{slice_start:#x}, {slice_end:#x}) is not within blob \
             [{blob_start:#x}, {blob_end:#x}) -- it is a copy, not a view"
        );
    }

    // ---- WorkBatch::single ----

    #[test]
    fn single_holds_whole_blob_as_one_record() {
        let blob = Bytes::from_static(b"{\"a\":1}");
        let b = WorkBatch::<TestToken>::single(blob.clone());
        assert_eq!(b.record_count(), 1);
        assert!(b.commit_tokens.is_empty());
        assert!(b.dlq_entries.is_empty());
        assert_eq!(b.records[0].payload, blob);
        assert_eq!(b.records[0].key, None);
        assert!(b.records[0].headers.is_empty());
        assert_eq!(b.records[0].payload.as_ptr(), blob.as_ptr());
    }

    #[test]
    fn single_detects_format_json_object() {
        let b = WorkBatch::<TestToken>::single(Bytes::from_static(b"{\"a\":1}"));
        assert_eq!(b.records[0].metadata.format, PayloadFormat::Json);
    }

    #[test]
    fn single_detects_format_msgpack() {
        let b = WorkBatch::<TestToken>::single(Bytes::from_static(&[0x81, 0xa1, 0x61]));
        assert_eq!(b.records[0].metadata.format, PayloadFormat::MsgPack);
    }

    // ---- WorkBatch::from_ndjson ----

    #[test]
    fn ndjson_splits_lines_into_records() {
        let blob = Bytes::from_static(b"{\"a\":1}\n{\"b\":2}\n{\"c\":3}");
        let b = WorkBatch::<TestToken>::from_ndjson(blob.clone());
        assert_eq!(b.record_count(), 3);
        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
        assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
        assert_eq!(b.records[2].payload.as_ref(), b"{\"c\":3}");
        for r in &b.records {
            assert_eq!(r.metadata.format, PayloadFormat::Json);
            assert_within(&r.payload, &blob);
        }
        assert!(b.commit_tokens.is_empty());
    }

    #[test]
    fn ndjson_trims_trailing_carriage_return() {
        // CRLF line endings.
        let blob = Bytes::from_static(b"{\"a\":1}\r\n{\"b\":2}\r\n");
        let b = WorkBatch::<TestToken>::from_ndjson(blob);
        assert_eq!(b.record_count(), 2);
        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
        assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
    }

    #[test]
    fn ndjson_skips_blank_and_whitespace_only_lines() {
        let blob = Bytes::from_static(b"{\"a\":1}\n\n \n{\"b\":2}\n\t\r\n");
        let b = WorkBatch::<TestToken>::from_ndjson(blob);
        assert_eq!(b.record_count(), 2);
        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
        assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
    }

    #[test]
    fn ndjson_empty_blob_yields_no_records() {
        let b = WorkBatch::<TestToken>::from_ndjson(Bytes::new());
        assert_eq!(b.record_count(), 0);
    }

    #[test]
    fn ndjson_single_line_no_newline() {
        let blob = Bytes::from_static(b"{\"only\":true}");
        let b = WorkBatch::<TestToken>::from_ndjson(blob.clone());
        assert_eq!(b.record_count(), 1);
        assert_eq!(b.records[0].payload.as_ref(), b"{\"only\":true}");
        assert_within(&b.records[0].payload, &blob);
    }

    #[test]
    fn ndjson_preserves_inner_whitespace_but_trims_edges() {
        let blob = Bytes::from_static(b" {\"a\": 1} \n");
        let b = WorkBatch::<TestToken>::from_ndjson(blob);
        assert_eq!(b.record_count(), 1);
        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\": 1}");
    }

    // ---- WorkBatch::from_json_array: accepted inputs ----

    #[test]
    fn json_array_empty_yields_no_records() {
        let b = WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[]")).unwrap();
        assert_eq!(b.record_count(), 0);
        assert!(b.commit_tokens.is_empty());
        assert!(b.dlq_entries.is_empty());
    }

    #[test]
    fn json_array_empty_with_whitespace() {
        let b = WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b" [ ] ")).unwrap();
        assert_eq!(b.record_count(), 0);
    }

    #[test]
    fn json_array_single_element() {
        let blob = Bytes::from_static(b"[{\"a\":1}]");
        let b = WorkBatch::<TestToken>::from_json_array(blob.clone()).unwrap();
        assert_eq!(b.record_count(), 1);
        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
        assert_eq!(b.records[0].metadata.format, PayloadFormat::Json);
        assert_within(&b.records[0].payload, &blob);
    }

    #[test]
    fn json_array_multiple_scalar_elements() {
        let blob = Bytes::from_static(b"[1, 2, 3]");
        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
        assert_eq!(b.record_count(), 3);
        assert_eq!(b.records[0].payload.as_ref(), b"1");
        assert_eq!(b.records[1].payload.as_ref(), b"2");
        assert_eq!(b.records[2].payload.as_ref(), b"3");
    }

    #[test]
    fn json_array_trims_whitespace_around_elements() {
        let blob = Bytes::from_static(b"[ {\"a\":1} , {\"b\":2} ]");
        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
        assert_eq!(b.record_count(), 2);
        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
        assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
    }

    #[test]
    fn json_array_leading_trailing_whitespace_and_newlines() {
        let blob = Bytes::from_static(b"\n\t [\n 1,\n 2\n] \n");
        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
        assert_eq!(b.record_count(), 2);
        assert_eq!(b.records[0].payload.as_ref(), b"1");
        assert_eq!(b.records[1].payload.as_ref(), b"2");
    }

    #[test]
    fn json_array_string_with_brackets_and_commas() {
        // Structural characters inside a string must not split the element.
        let blob = Bytes::from_static(b"[\"a,b[c]{d}\", \"plain\"]");
        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
        assert_eq!(b.record_count(), 2);
        assert_eq!(b.records[0].payload.as_ref(), b"\"a,b[c]{d}\"");
        assert_eq!(b.records[1].payload.as_ref(), b"\"plain\"");
    }

    #[test]
    fn json_array_string_with_escaped_quote() {
        // An escaped quote must not end the string.
        let blob = Bytes::from_static(b"[\"he said \\\"hi\\\", then left\", 7]");
        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
        assert_eq!(b.record_count(), 2);
        assert_eq!(
            b.records[0].payload.as_ref(),
            b"\"he said \\\"hi\\\", then left\""
        );
        assert_eq!(b.records[1].payload.as_ref(), b"7");
    }

    #[test]
    fn json_array_string_with_escaped_backslash_then_closing_quote() {
        // `\\` followed by `"`: the quote is a real terminator here.
        let blob = Bytes::from_static(b"[\"path\\\\\", 1]");
        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
        assert_eq!(b.record_count(), 2);
        assert_eq!(b.records[0].payload.as_ref(), b"\"path\\\\\"");
        assert_eq!(b.records[1].payload.as_ref(), b"1");
    }

    #[test]
    fn json_array_nested_arrays_and_objects() {
        let blob = Bytes::from_static(b"[[1,2],[3],{\"k\":[4,5]}]");
        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
        assert_eq!(b.record_count(), 3);
        assert_eq!(b.records[0].payload.as_ref(), b"[1,2]");
        assert_eq!(b.records[1].payload.as_ref(), b"[3]");
        assert_eq!(b.records[2].payload.as_ref(), b"{\"k\":[4,5]}");
    }

    #[test]
    fn json_array_deeply_nested_object_one_element() {
        let blob = Bytes::from_static(b"[{\"a\":{\"b\":{\"c\":[1,{\"d\":2}]}}}]");
        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
        assert_eq!(b.record_count(), 1);
        assert_eq!(
            b.records[0].payload.as_ref(),
            b"{\"a\":{\"b\":{\"c\":[1,{\"d\":2}]}}}"
        );
    }

    #[test]
    fn json_array_unicode_in_strings() {
        // Multi-byte UTF-8 inside strings passes through untouched.
        let blob = Bytes::from(r#"["café", "naïve"]"#.as_bytes().to_vec());
        let b = WorkBatch::<TestToken>::from_json_array(blob.clone()).unwrap();
        assert_eq!(b.record_count(), 2);
        assert_eq!(b.records[0].payload.as_ref(), "\"café\"".as_bytes());
        assert_eq!(b.records[1].payload.as_ref(), "\"naïve\"".as_bytes());
        assert_within(&b.records[1].payload, &blob);
    }

    #[test]
    fn json_array_zero_copy_views_into_blob() {
        let blob = Bytes::from_static(b"[{\"a\":1}, {\"b\":2}, {\"c\":3}]");
        let b = WorkBatch::<TestToken>::from_json_array(blob.clone()).unwrap();
        assert_eq!(b.record_count(), 3);
        for r in &b.records {
            assert_within(&r.payload, &blob);
        }
    }

    // ---- WorkBatch::from_json_array: rejected inputs ----

    #[test]
    fn json_array_no_opening_bracket_errors() {
        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"{\"a\":1}")).is_err());
    }

    #[test]
    fn json_array_empty_blob_errors() {
        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::new()).is_err());
    }

    #[test]
    fn json_array_whitespace_only_errors() {
        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b" \n\t ")).is_err());
    }

    #[test]
    fn json_array_unterminated_errors() {
        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1, 2")).is_err());
    }

    #[test]
    fn json_array_unterminated_string_errors() {
        assert!(
            WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[\"unclosed]")).is_err()
        );
    }

    #[test]
    fn json_array_trailing_garbage_errors() {
        assert!(
            WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1, 2] junk")).is_err()
        );
    }

    #[test]
    fn json_array_trailing_comma_errors() {
        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1, 2, ]")).is_err());
    }

    #[test]
    fn json_array_leading_comma_errors() {
        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[, 1]")).is_err());
    }

    #[test]
    fn json_array_double_comma_errors() {
        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1,, 2]")).is_err());
    }

    #[test]
    fn json_array_only_open_bracket_errors() {
        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[")).is_err());
    }

    #[test]
    fn json_array_unbalanced_extra_close_errors() {
        // The second `]` follows the array's real end.
        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1]]")).is_err());
    }

    #[test]
    fn framing_error_is_displayable() {
        let err = WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"nope")).unwrap_err();
        assert!(!err.to_string().is_empty());
    }
}