1use super::filter::FilteredDlqEntry;
38use super::traits::CommitToken;
39use super::types::PayloadFormat;
40use bytes::Bytes;
41use std::sync::Arc;
42use thiserror::Error;
43
/// Errors reported while framing a blob as a top-level JSON array.
///
/// Offsets carried by the variants are byte offsets into the scanned blob.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum FramingError {
    /// The input did not start (after leading whitespace) with `[`.
    /// The payload describes what was found instead.
    #[error("json-array framing: expected opening '[', found {0}")]
    NotAnArray(String),

    /// The input ended before the array (or a string inside it) was closed.
    #[error("json-array framing: unexpected end of input (unterminated array or string)")]
    UnexpectedEof,

    /// An element was expected at the given offset but a `,` or `]` was found.
    #[error("json-array framing: empty element at byte offset {0} (stray comma)")]
    EmptyElement(usize),

    /// A closing bracket at the given offset had no matching opener.
    #[error("json-array framing: unbalanced closing bracket at byte offset {0}")]
    Unbalanced(usize),

    /// Non-whitespace bytes followed the array's closing `]`, starting at the given offset.
    #[error("json-array framing: trailing garbage after closing ']' at byte offset {0}")]
    TrailingGarbage(usize),
}
74
/// Per-record metadata carried alongside a [`Record`]'s payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecordMeta {
    /// Optional timestamp; `None` when the source supplied none.
    /// NOTE(review): presumably milliseconds, per the field name -- epoch/base not visible here.
    pub timestamp_ms: Option<i64>,

    /// Encoding of the payload bytes.
    pub format: PayloadFormat,
}
87
/// A single unit of work: payload bytes plus optional key, headers and metadata.
#[derive(Debug, Clone, PartialEq)]
pub struct Record {
    /// Raw payload bytes.
    pub payload: Bytes,

    /// Optional key associated with the record.
    pub key: Option<Arc<str>>,

    /// Header name/value pairs; every constructor in this file leaves this empty.
    pub headers: Vec<(String, Vec<u8>)>,

    /// Timestamp and payload-format information for this record.
    pub metadata: RecordMeta,
}
106
/// A batch of records together with the commit tokens and DLQ entries that travel with it.
///
/// `records` and `commit_tokens` are independent vectors: nothing in this type
/// forces them to have the same length.
#[derive(Debug)]
pub struct WorkBatch<T: CommitToken> {
    /// The records in this batch, in order.
    pub records: Vec<Record>,

    /// Commit tokens carried with the batch.
    /// NOTE(review): presumably acknowledged downstream once the batch is processed -- confirm.
    pub commit_tokens: Vec<T>,

    /// Filtered dead-letter-queue entries carried along with the batch.
    pub dlq_entries: Vec<FilteredDlqEntry>,
}
129
130impl<T: CommitToken> WorkBatch<T> {
131 #[must_use]
133 pub fn empty() -> Self {
134 Self {
135 records: Vec::new(),
136 commit_tokens: Vec::new(),
137 dlq_entries: Vec::new(),
138 }
139 }
140
141 #[must_use]
146 pub fn from_records(records: Vec<Record>) -> Self {
147 Self {
148 records,
149 commit_tokens: Vec::new(),
150 dlq_entries: Vec::new(),
151 }
152 }
153
154 #[must_use]
156 pub fn new(records: Vec<Record>, commit_tokens: Vec<T>) -> Self {
157 Self {
158 records,
159 commit_tokens,
160 dlq_entries: Vec::new(),
161 }
162 }
163
164 #[must_use]
166 pub fn with_dlq_entries(mut self, dlq_entries: Vec<FilteredDlqEntry>) -> Self {
167 self.dlq_entries = dlq_entries;
168 self
169 }
170
171 #[must_use]
173 pub fn is_empty(&self) -> bool {
174 self.records.is_empty()
175 }
176
177 #[must_use]
179 pub fn len(&self) -> usize {
180 self.records.len()
181 }
182
183 #[must_use]
185 pub fn record_count(&self) -> usize {
186 self.records.len()
187 }
188
189 #[must_use]
195 pub fn total_payload_bytes(&self) -> usize {
196 self.records
197 .iter()
198 .map(|r| r.payload.len())
199 .fold(0usize, usize::saturating_add)
200 }
201
202 #[must_use]
208 pub fn map_records(mut self, f: impl FnOnce(Vec<Record>) -> Vec<Record>) -> Self {
209 self.records = f(self.records);
210 self
211 }
212
213 #[must_use]
227 pub fn single(blob: Bytes) -> Self {
228 let format = PayloadFormat::detect(&blob);
229 let record = Record {
230 payload: blob,
231 key: None,
232 headers: Vec::new(),
233 metadata: RecordMeta {
234 timestamp_ms: None,
235 format,
236 },
237 };
238 Self {
239 records: vec![record],
240 commit_tokens: Vec::new(),
241 dlq_entries: Vec::new(),
242 }
243 }
244
245 #[must_use]
256 pub fn from_ndjson(blob: Bytes) -> Self {
257 let mut records = Vec::new();
258 let mut line_start = 0usize;
259 let bytes = blob.as_ref();
260
261 for nl in memchr::memchr_iter(b'\n', bytes) {
263 Self::push_ndjson_line(&mut records, &blob, line_start, nl);
264 line_start = nl + 1;
265 }
266 if line_start < bytes.len() {
267 Self::push_ndjson_line(&mut records, &blob, line_start, bytes.len());
268 }
269
270 Self {
271 records,
272 commit_tokens: Vec::new(),
273 dlq_entries: Vec::new(),
274 }
275 }
276
277 fn push_ndjson_line(records: &mut Vec<Record>, blob: &Bytes, start: usize, mut end: usize) {
280 let bytes = blob.as_ref();
281 if end > start && bytes[end - 1] == b'\r' {
283 end -= 1;
284 }
285 while end > start && bytes[end - 1].is_ascii_whitespace() {
287 end -= 1;
288 }
289 let mut begin = start;
290 while begin < end && bytes[begin].is_ascii_whitespace() {
291 begin += 1;
292 }
293 if begin >= end {
294 return; }
296 records.push(Record {
297 payload: blob.slice(begin..end),
298 key: None,
299 headers: Vec::new(),
300 metadata: RecordMeta {
301 timestamp_ms: None,
302 format: PayloadFormat::Json,
303 },
304 });
305 }
306
307 pub fn from_json_array(blob: Bytes) -> Result<Self, FramingError> {
322 let records = scan_json_array(&blob)?;
323 Ok(Self {
324 records,
325 commit_tokens: Vec::new(),
326 dlq_entries: Vec::new(),
327 })
328 }
329}
330
331fn scan_json_array(blob: &Bytes) -> Result<Vec<Record>, FramingError> {
336 let bytes = blob.as_ref();
337 let len = bytes.len();
338
339 let mut i = 0usize;
341 while i < len && bytes[i].is_ascii_whitespace() {
342 i += 1;
343 }
344 if i >= len {
345 return Err(FramingError::NotAnArray("end of input".to_string()));
346 }
347 if bytes[i] != b'[' {
348 return Err(FramingError::NotAnArray(format!(
349 "byte {:#04x} ('{}')",
350 bytes[i], bytes[i] as char
351 )));
352 }
353 i += 1; let mut records: Vec<Record> = Vec::new();
356 let mut first_element = true;
359
360 loop {
361 while i < len && bytes[i].is_ascii_whitespace() {
363 i += 1;
364 }
365 if i >= len {
366 return Err(FramingError::UnexpectedEof);
367 }
368
369 if bytes[i] == b']' {
370 if first_element {
374 i += 1;
375 return finish(blob, records, i);
376 }
377 return Err(FramingError::EmptyElement(i));
380 }
381 if bytes[i] == b',' {
382 return Err(FramingError::EmptyElement(i));
384 }
385
386 let elem_start = i;
388 let mut depth: usize = 0;
389 let mut in_string = false;
390 let mut escaped = false;
391
392 let elem_end;
393 loop {
394 if i >= len {
395 return Err(FramingError::UnexpectedEof);
396 }
397 let c = bytes[i];
398
399 if in_string {
400 if escaped {
401 escaped = false;
402 } else if c == b'\\' {
403 escaped = true;
404 } else if c == b'"' {
405 in_string = false;
406 }
407 i += 1;
408 continue;
409 }
410
411 match c {
412 b'"' => {
413 in_string = true;
414 i += 1;
415 }
416 b'{' | b'[' => {
417 depth += 1;
418 i += 1;
419 }
420 b'}' => {
421 depth = depth.checked_sub(1).ok_or(FramingError::Unbalanced(i))?;
423 i += 1;
424 }
425 b']' => {
426 if depth == 0 {
427 elem_end = i;
429 i += 1; push_element(blob, &mut records, elem_start, elem_end);
431 return finish(blob, records, i);
432 }
433 depth -= 1;
434 i += 1;
435 }
436 b',' if depth == 0 => {
437 elem_end = i;
439 i += 1; break;
441 }
442 _ => {
443 i += 1;
444 }
445 }
446 }
447
448 push_element(blob, &mut records, elem_start, elem_end);
449 first_element = false;
450 }
451}
452
453fn push_element(blob: &Bytes, records: &mut Vec<Record>, start: usize, end: usize) {
458 let bytes = blob.as_ref();
459 let mut e = end;
460 while e > start && bytes[e - 1].is_ascii_whitespace() {
461 e -= 1;
462 }
463 records.push(Record {
464 payload: blob.slice(start..e),
465 key: None,
466 headers: Vec::new(),
467 metadata: RecordMeta {
468 timestamp_ms: None,
469 format: PayloadFormat::Json,
470 },
471 });
472}
473
474fn finish(blob: &Bytes, records: Vec<Record>, mut i: usize) -> Result<Vec<Record>, FramingError> {
477 let bytes = blob.as_ref();
478 let len = bytes.len();
479 while i < len && bytes[i].is_ascii_whitespace() {
480 i += 1;
481 }
482 if i < len {
483 return Err(FramingError::TrailingGarbage(i));
484 }
485 Ok(records)
486}
487
488impl<T: CommitToken> From<crate::Message<T>> for WorkBatch<T> {
489 fn from(msg: crate::Message<T>) -> Self {
493 let record = Record {
494 payload: msg.payload,
495 key: msg.key,
496 headers: Vec::new(),
497 metadata: RecordMeta {
498 timestamp_ms: msg.timestamp_ms,
499 format: msg.format,
500 },
501 };
502 Self {
503 records: vec![record],
504 commit_tokens: vec![msg.token],
505 dlq_entries: Vec::new(),
506 }
507 }
508}
509
510impl<T: CommitToken> From<crate::transport::traits::RecvBatch<T>> for WorkBatch<T> {
511 fn from(batch: crate::transport::traits::RecvBatch<T>) -> Self {
518 let mut records = Vec::with_capacity(batch.messages.len());
519 let mut commit_tokens = Vec::with_capacity(batch.messages.len());
520 for msg in batch.messages {
521 commit_tokens.push(msg.token);
522 records.push(Record {
523 payload: msg.payload,
524 key: msg.key,
525 headers: Vec::new(),
526 metadata: RecordMeta {
527 timestamp_ms: msg.timestamp_ms,
528 format: msg.format,
529 },
530 });
531 }
532 Self {
533 records,
534 commit_tokens,
535 dlq_entries: batch.dlq_entries,
536 }
537 }
538}
539
/// Unit tests for `WorkBatch` construction, conversion and framing.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Message;
    use crate::transport::traits::RecvBatch;

    /// Minimal commit token used by the tests.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct TestToken(u64);

    impl std::fmt::Display for TestToken {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "tok-{}", self.0)
        }
    }

    impl CommitToken for TestToken {}

    /// Builds a JSON-tagged record with a fixed key, one header and timestamp 42.
    fn record(payload: &'static [u8]) -> Record {
        Record {
            payload: Bytes::from_static(payload),
            key: Some(Arc::from("events")),
            headers: vec![("h".to_string(), b"v".to_vec())],
            metadata: RecordMeta {
                timestamp_ms: Some(42),
                format: PayloadFormat::Json,
            },
        }
    }

    // ---- constructors and accessors ----

    #[test]
    fn empty_has_no_records_tokens_or_dlq() {
        let b = WorkBatch::<TestToken>::empty();
        assert!(b.is_empty());
        assert_eq!(b.len(), 0);
        assert_eq!(b.record_count(), 0);
        assert!(b.commit_tokens.is_empty());
        assert!(b.dlq_entries.is_empty());
        assert_eq!(b.total_payload_bytes(), 0);
    }

    #[test]
    fn from_records_has_no_tokens() {
        let b = WorkBatch::<TestToken>::from_records(vec![record(b"{}"), record(b"[]")]);
        assert_eq!(b.len(), 2);
        assert!(!b.is_empty());
        assert!(b.commit_tokens.is_empty());
    }

    /// Record and token counts are independent: one record, two tokens.
    #[test]
    fn new_carries_records_and_tokens() {
        let b = WorkBatch::new(vec![record(b"{}")], vec![TestToken(1), TestToken(2)]);
        assert_eq!(b.record_count(), 1);
        assert_eq!(b.commit_tokens.len(), 2);
    }

    #[test]
    fn with_dlq_entries_attaches_entries() {
        let entry = FilteredDlqEntry {
            payload: b"bad".to_vec(),
            key: None,
            reason: "filter".to_string(),
        };
        let b =
            WorkBatch::<TestToken>::from_records(vec![record(b"{}")]).with_dlq_entries(vec![entry]);
        assert_eq!(b.dlq_entries.len(), 1);
        assert_eq!(b.dlq_entries[0].reason, "filter");
    }

    #[test]
    fn total_payload_bytes_sums_payloads() {
        let b = WorkBatch::<TestToken>::from_records(vec![
            record(b"abc"), // 3 bytes
            record(b"de"), // 2 bytes
            record(b"f"), // 1 byte
        ]);
        assert_eq!(b.total_payload_bytes(), 6);
    }

    // ---- map_records ----

    #[test]
    fn map_records_preserves_tokens_and_dlq() {
        let entry = FilteredDlqEntry {
            payload: b"bad".to_vec(),
            key: None,
            reason: "filter".to_string(),
        };
        let b =
            WorkBatch::new(vec![record(b"{}")], vec![TestToken(7)]).with_dlq_entries(vec![entry]);

        // Rewrite every payload; nothing else should change.
        let b = b.map_records(|recs| {
            recs.into_iter()
                .map(|mut r| {
                    r.payload = Bytes::from_static(b"changed");
                    r
                })
                .collect()
        });

        assert_eq!(b.record_count(), 1);
        assert_eq!(b.records[0].payload.as_ref(), b"changed");
        assert_eq!(b.commit_tokens, vec![TestToken(7)]);
        assert_eq!(b.dlq_entries.len(), 1);
    }

    /// Fanning one record out into three must not touch the single commit token.
    #[test]
    fn map_records_fan_out_keeps_tokens_intact() {
        let b = WorkBatch::new(vec![record(b"{}")], vec![TestToken(99)]);
        assert_eq!(b.record_count(), 1);
        assert_eq!(b.commit_tokens.len(), 1);

        let b = b.map_records(|recs| {
            let mut out = Vec::new();
            for r in recs {
                out.push(r.clone());
                out.push(r.clone());
                out.push(r);
            }
            out
        });

        assert_eq!(b.record_count(), 3);
        assert_eq!(b.commit_tokens, vec![TestToken(99)]);
    }

    // ---- From<Message> / From<RecvBatch> ----

    #[test]
    fn from_message_yields_single_record_batch() {
        let msg = Message::new(
            Some(Arc::from("topic")),
            b"{\"a\":1}".to_vec(),
            TestToken(5),
            Some(11),
        );
        let b: WorkBatch<TestToken> = msg.into();

        assert_eq!(b.record_count(), 1);
        assert_eq!(b.commit_tokens, vec![TestToken(5)]);
        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
        assert_eq!(b.records[0].key.as_deref(), Some("topic"));
        assert_eq!(b.records[0].metadata.timestamp_ms, Some(11));
        assert_eq!(b.records[0].metadata.format, PayloadFormat::Json);
        assert!(b.dlq_entries.is_empty());
    }

    #[test]
    fn from_recv_batch_collapses_and_preserves_order() {
        let entry = FilteredDlqEntry {
            payload: b"bad".to_vec(),
            key: None,
            reason: "drop-it".to_string(),
        };
        let recv = RecvBatch {
            messages: vec![
                Message::new(Some(Arc::from("a")), b"{}".to_vec(), TestToken(1), None),
                Message::new(Some(Arc::from("b")), b"[]".to_vec(), TestToken(2), None),
                Message::new(None, b"{}".to_vec(), TestToken(3), None),
            ],
            dlq_entries: vec![entry],
        };

        let b: WorkBatch<TestToken> = recv.into();

        assert_eq!(b.record_count(), 3);
        // Tokens come out in message order.
        assert_eq!(
            b.commit_tokens,
            vec![TestToken(1), TestToken(2), TestToken(3)]
        );
        // DLQ entries are carried over from the received batch.
        assert_eq!(b.dlq_entries.len(), 1);
        assert_eq!(b.dlq_entries[0].reason, "drop-it");
        // Keys are preserved per record, including a missing key.
        assert_eq!(b.records[0].key.as_deref(), Some("a"));
        assert_eq!(b.records[2].key, None);
    }

    // ---- zero-copy checks: payload pointers must survive each conversion ----

    #[test]
    fn from_message_moves_payload_without_copying() {
        let payload = b"zero-copy-please".to_vec();
        // Remember the buffer address before handing the Vec over.
        let payload_ptr = payload.as_ptr();

        let msg = Message::new(Some(Arc::from("topic")), payload, TestToken(1), None);
        assert_eq!(msg.payload.as_ptr(), payload_ptr);

        let wb: WorkBatch<TestToken> = msg.into();

        assert_eq!(wb.records[0].payload.as_ptr(), payload_ptr);
    }

    #[test]
    fn bytes_payload_travels_zero_copy_through_workbatch() {
        let raw = b"bytes-zero-copy-payload-test".to_vec();
        let src: Bytes = raw.into();
        let src_ptr = src.as_ptr();

        let msg = Message::new(Some(Arc::from("k")), src, TestToken(42), Some(99));
        assert_eq!(msg.payload.as_ptr(), src_ptr, "copy at Message::new");

        let wb: WorkBatch<TestToken> = msg.into();
        assert_eq!(
            wb.records[0].payload.as_ptr(),
            src_ptr,
            "copy at From<Message> for WorkBatch"
        );

        // Cloning the payload must point at the same buffer.
        let cloned = wb.records[0].payload.clone();
        assert_eq!(
            cloned.as_ptr(),
            src_ptr,
            "clone allocated a new buffer instead of bumping refcount"
        );
    }

    #[test]
    fn from_recv_batch_moves_payloads_without_copying() {
        let p0 = b"first-buffer".to_vec();
        let p1 = b"second-buffer".to_vec();
        let p0_ptr = p0.as_ptr();
        let p1_ptr = p1.as_ptr();

        let recv = RecvBatch {
            messages: vec![
                Message::new(Some(Arc::from("a")), p0, TestToken(1), None),
                Message::new(Some(Arc::from("b")), p1, TestToken(2), None),
            ],
            dlq_entries: Vec::new(),
        };

        let wb: WorkBatch<TestToken> = recv.into();

        assert_eq!(wb.records[0].payload.as_ptr(), p0_ptr);
        assert_eq!(wb.records[1].payload.as_ptr(), p1_ptr);
    }

    #[test]
    fn payload_is_bytes_and_clone_is_zero_copy() {
        let r = record(b"shared-buffer");
        let p1 = r.payload.clone();
        let r2 = r.clone();
        // Both clones must point at the same underlying bytes.
        assert_eq!(p1.as_ptr(), r2.payload.as_ptr());
        assert_eq!(r2.payload.as_ref(), b"shared-buffer");
    }

    /// Asserts that `slice`'s address range lies inside `blob`'s address range,
    /// i.e. that `slice` is a view into `blob` rather than a separate buffer.
    fn assert_within(slice: &Bytes, blob: &Bytes) {
        let blob_start = blob.as_ptr() as usize;
        let blob_end = blob_start + blob.len();
        let slice_start = slice.as_ptr() as usize;
        let slice_end = slice_start + slice.len();
        assert!(
            slice_start >= blob_start && slice_end <= blob_end,
            "slice [{slice_start:#x}, {slice_end:#x}) is not within blob \
             [{blob_start:#x}, {blob_end:#x}) -- it is a copy, not a view"
        );
    }

    // ---- WorkBatch::single ----

    #[test]
    fn single_holds_whole_blob_as_one_record() {
        let blob = Bytes::from_static(b"{\"a\":1}");
        let b = WorkBatch::<TestToken>::single(blob.clone());
        assert_eq!(b.record_count(), 1);
        assert!(b.commit_tokens.is_empty());
        assert!(b.dlq_entries.is_empty());
        assert_eq!(b.records[0].payload, blob);
        assert_eq!(b.records[0].key, None);
        assert!(b.records[0].headers.is_empty());
        // Same buffer, not a copy.
        assert_eq!(b.records[0].payload.as_ptr(), blob.as_ptr());
    }

    #[test]
    fn single_detects_format_json_object() {
        let b = WorkBatch::<TestToken>::single(Bytes::from_static(b"{\"a\":1}"));
        assert_eq!(b.records[0].metadata.format, PayloadFormat::Json);
    }

    #[test]
    fn single_detects_format_msgpack() {
        // Non-JSON leading byte (0x81); expected to be detected as MessagePack.
        let b = WorkBatch::<TestToken>::single(Bytes::from_static(&[0x81, 0xa1, 0x61]));
        assert_eq!(b.records[0].metadata.format, PayloadFormat::MsgPack);
    }

    // ---- WorkBatch::from_ndjson ----

    #[test]
    fn ndjson_splits_lines_into_records() {
        let blob = Bytes::from_static(b"{\"a\":1}\n{\"b\":2}\n{\"c\":3}");
        let b = WorkBatch::<TestToken>::from_ndjson(blob.clone());
        assert_eq!(b.record_count(), 3);
        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
        assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
        assert_eq!(b.records[2].payload.as_ref(), b"{\"c\":3}");
        for r in &b.records {
            assert_eq!(r.metadata.format, PayloadFormat::Json);
            assert_within(&r.payload, &blob);
        }
        assert!(b.commit_tokens.is_empty());
    }

    #[test]
    fn ndjson_trims_trailing_carriage_return() {
        // CRLF line endings: the `\r` must not end up in the payload.
        let blob = Bytes::from_static(b"{\"a\":1}\r\n{\"b\":2}\r\n");
        let b = WorkBatch::<TestToken>::from_ndjson(blob);
        assert_eq!(b.record_count(), 2);
        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
        assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
    }

    #[test]
    fn ndjson_skips_blank_and_whitespace_only_lines() {
        let blob = Bytes::from_static(b"{\"a\":1}\n\n \n{\"b\":2}\n\t\r\n");
        let b = WorkBatch::<TestToken>::from_ndjson(blob);
        assert_eq!(b.record_count(), 2);
        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
        assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
    }

    #[test]
    fn ndjson_empty_blob_yields_no_records() {
        let b = WorkBatch::<TestToken>::from_ndjson(Bytes::new());
        assert_eq!(b.record_count(), 0);
    }

    #[test]
    fn ndjson_single_line_no_newline() {
        let blob = Bytes::from_static(b"{\"only\":true}");
        let b = WorkBatch::<TestToken>::from_ndjson(blob.clone());
        assert_eq!(b.record_count(), 1);
        assert_eq!(b.records[0].payload.as_ref(), b"{\"only\":true}");
        assert_within(&b.records[0].payload, &blob);
    }

    #[test]
    fn ndjson_preserves_inner_whitespace_but_trims_edges() {
        // Whitespace inside the object stays; whitespace around it is dropped.
        let blob = Bytes::from_static(b" {\"a\": 1} \n");
        let b = WorkBatch::<TestToken>::from_ndjson(blob);
        assert_eq!(b.record_count(), 1);
        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\": 1}");
    }

    // ---- WorkBatch::from_json_array: accepted inputs ----

    #[test]
    fn json_array_empty_yields_no_records() {
        let b = WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[]")).unwrap();
        assert_eq!(b.record_count(), 0);
        assert!(b.commit_tokens.is_empty());
        assert!(b.dlq_entries.is_empty());
    }

    #[test]
    fn json_array_empty_with_whitespace() {
        let b = WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b" [ ] ")).unwrap();
        assert_eq!(b.record_count(), 0);
    }

    #[test]
    fn json_array_single_element() {
        let blob = Bytes::from_static(b"[{\"a\":1}]");
        let b = WorkBatch::<TestToken>::from_json_array(blob.clone()).unwrap();
        assert_eq!(b.record_count(), 1);
        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
        assert_eq!(b.records[0].metadata.format, PayloadFormat::Json);
        assert_within(&b.records[0].payload, &blob);
    }

    #[test]
    fn json_array_multiple_scalar_elements() {
        let blob = Bytes::from_static(b"[1, 2, 3]");
        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
        assert_eq!(b.record_count(), 3);
        assert_eq!(b.records[0].payload.as_ref(), b"1");
        assert_eq!(b.records[1].payload.as_ref(), b"2");
        assert_eq!(b.records[2].payload.as_ref(), b"3");
    }

    #[test]
    fn json_array_trims_whitespace_around_elements() {
        let blob = Bytes::from_static(b"[ {\"a\":1} , {\"b\":2} ]");
        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
        assert_eq!(b.record_count(), 2);
        assert_eq!(b.records[0].payload.as_ref(), b"{\"a\":1}");
        assert_eq!(b.records[1].payload.as_ref(), b"{\"b\":2}");
    }

    #[test]
    fn json_array_leading_trailing_whitespace_and_newlines() {
        let blob = Bytes::from_static(b"\n\t [\n 1,\n 2\n] \n");
        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
        assert_eq!(b.record_count(), 2);
        assert_eq!(b.records[0].payload.as_ref(), b"1");
        assert_eq!(b.records[1].payload.as_ref(), b"2");
    }

    #[test]
    fn json_array_string_with_brackets_and_commas() {
        // Structural characters inside a string must not split or close anything.
        let blob = Bytes::from_static(b"[\"a,b[c]{d}\", \"plain\"]");
        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
        assert_eq!(b.record_count(), 2);
        assert_eq!(b.records[0].payload.as_ref(), b"\"a,b[c]{d}\"");
        assert_eq!(b.records[1].payload.as_ref(), b"\"plain\"");
    }

    #[test]
    fn json_array_string_with_escaped_quote() {
        // An escaped quote must not terminate the string.
        let blob = Bytes::from_static(b"[\"he said \\\"hi\\\", then left\", 7]");
        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
        assert_eq!(b.record_count(), 2);
        assert_eq!(
            b.records[0].payload.as_ref(),
            b"\"he said \\\"hi\\\", then left\""
        );
        assert_eq!(b.records[1].payload.as_ref(), b"7");
    }

    #[test]
    fn json_array_string_with_escaped_backslash_then_closing_quote() {
        // The string ends with an escaped backslash, so the quote after it
        // is the real closing quote.
        let blob = Bytes::from_static(b"[\"path\\\\\", 1]");
        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
        assert_eq!(b.record_count(), 2);
        assert_eq!(b.records[0].payload.as_ref(), b"\"path\\\\\"");
        assert_eq!(b.records[1].payload.as_ref(), b"1");
    }

    #[test]
    fn json_array_nested_arrays_and_objects() {
        let blob = Bytes::from_static(b"[[1,2],[3],{\"k\":[4,5]}]");
        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
        assert_eq!(b.record_count(), 3);
        assert_eq!(b.records[0].payload.as_ref(), b"[1,2]");
        assert_eq!(b.records[1].payload.as_ref(), b"[3]");
        assert_eq!(b.records[2].payload.as_ref(), b"{\"k\":[4,5]}");
    }

    #[test]
    fn json_array_deeply_nested_object_one_element() {
        let blob = Bytes::from_static(b"[{\"a\":{\"b\":{\"c\":[1,{\"d\":2}]}}}]");
        let b = WorkBatch::<TestToken>::from_json_array(blob).unwrap();
        assert_eq!(b.record_count(), 1);
        assert_eq!(
            b.records[0].payload.as_ref(),
            b"{\"a\":{\"b\":{\"c\":[1,{\"d\":2}]}}}"
        );
    }

    #[test]
    fn json_array_unicode_in_strings() {
        // Multi-byte UTF-8 inside strings must pass through untouched.
        let blob = Bytes::from(r#"["café", "naïve"]"#.as_bytes().to_vec());
        let b = WorkBatch::<TestToken>::from_json_array(blob.clone()).unwrap();
        assert_eq!(b.record_count(), 2);
        assert_eq!(b.records[0].payload.as_ref(), "\"café\"".as_bytes());
        assert_eq!(b.records[1].payload.as_ref(), "\"naïve\"".as_bytes());
        assert_within(&b.records[1].payload, &blob);
    }

    #[test]
    fn json_array_zero_copy_views_into_blob() {
        let blob = Bytes::from_static(b"[{\"a\":1}, {\"b\":2}, {\"c\":3}]");
        let b = WorkBatch::<TestToken>::from_json_array(blob.clone()).unwrap();
        assert_eq!(b.record_count(), 3);
        for r in &b.records {
            assert_within(&r.payload, &blob);
        }
    }

    // ---- WorkBatch::from_json_array: rejected inputs ----

    #[test]
    fn json_array_no_opening_bracket_errors() {
        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"{\"a\":1}")).is_err());
    }

    #[test]
    fn json_array_empty_blob_errors() {
        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::new()).is_err());
    }

    #[test]
    fn json_array_whitespace_only_errors() {
        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b" \n\t ")).is_err());
    }

    #[test]
    fn json_array_unterminated_errors() {
        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1, 2")).is_err());
    }

    #[test]
    fn json_array_unterminated_string_errors() {
        assert!(
            WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[\"unclosed]")).is_err()
        );
    }

    #[test]
    fn json_array_trailing_garbage_errors() {
        assert!(
            WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1, 2] junk")).is_err()
        );
    }

    #[test]
    fn json_array_trailing_comma_errors() {
        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1, 2, ]")).is_err());
    }

    #[test]
    fn json_array_leading_comma_errors() {
        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[, 1]")).is_err());
    }

    #[test]
    fn json_array_double_comma_errors() {
        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1,, 2]")).is_err());
    }

    #[test]
    fn json_array_only_open_bracket_errors() {
        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[")).is_err());
    }

    #[test]
    fn json_array_unbalanced_extra_close_errors() {
        // The second `]` comes after the array has already closed.
        assert!(WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"[1]]")).is_err());
    }

    #[test]
    fn framing_error_is_displayable() {
        let err = WorkBatch::<TestToken>::from_json_array(Bytes::from_static(b"nope")).unwrap_err();
        // The Display impl must produce a non-empty message.
        assert!(!err.to_string().is_empty());
    }
}