1use std::io::{BufRead, BufReader, Read};
2
3use memchr::{memchr, memchr2};
4
5use crate::error::{self, Error};
6use crate::records::{ByteRecord, ByteRecordBuilder, ZeroCopyByteRecord};
7use crate::searcher::Searcher;
8use crate::utils::trim_trailing_cr;
9
10#[derive(Debug)]
11enum ReadResult {
12 InputEmpty,
13 Cr,
14 Lf,
15 Record,
16 End,
17}
18
19#[derive(Debug)]
20enum ReadState {
21 Unquoted,
22 Quoted,
23 Quote,
24}
25
26struct Reader {
29 delimiter: u8,
30 quote: u8,
31 state: ReadState,
32 record_was_read: bool,
33 searcher: Searcher,
34}
35
36impl Reader {
37 fn new(delimiter: u8, quote: u8) -> Self {
38 Self {
39 delimiter,
40 quote,
41 state: ReadState::Unquoted,
42 record_was_read: true,
44 searcher: Searcher::new(delimiter, b'\n', quote),
45 }
46 }
47
48 fn split_record(&mut self, input: &[u8]) -> (ReadResult, usize) {
49 use ReadState::*;
50
51 if input.is_empty() {
52 if !self.record_was_read {
53 self.record_was_read = true;
54 return (ReadResult::Record, 0);
55 }
56
57 return (ReadResult::End, 0);
58 }
59
60 if self.record_was_read {
61 if input[0] == b'\n' {
62 return (ReadResult::Lf, 1);
63 } else if input[0] == b'\r' {
64 return (ReadResult::Cr, 1);
65 }
66 }
67
68 self.record_was_read = false;
69
70 let mut pos: usize = 0;
71
72 while pos < input.len() {
73 match self.state {
74 Unquoted => {
75 if input[pos] == self.quote {
77 self.state = Quoted;
78 pos += 1;
79 continue;
80 }
81
82 if let Some(offset) = memchr2(b'\n', self.quote, &input[pos..]) {
84 pos += offset;
85
86 let byte = input[pos];
87
88 pos += 1;
89
90 if byte == b'\n' {
91 self.record_was_read = true;
92 return (ReadResult::Record, pos);
93 }
94
95 self.state = Quoted;
97 } else {
98 break;
99 }
100 }
101 Quoted => {
102 if let Some(offset) = memchr(self.quote, &input[pos..]) {
104 pos += offset + 1;
105 self.state = Quote;
106 } else {
107 break;
108 }
109 }
110 Quote => {
111 let byte = input[pos];
112
113 pos += 1;
114
115 if byte == self.quote {
116 self.state = Quoted;
117 } else if byte == b'\n' {
118 self.record_was_read = true;
119 self.state = Unquoted;
120 return (ReadResult::Record, pos);
121 } else {
122 self.state = Unquoted;
123 }
124 }
125 }
126 }
127
128 (ReadResult::InputEmpty, input.len())
129 }
130
131 fn split_record_and_find_separators(
132 &mut self,
133 input: &[u8],
134 seps_offset: usize,
135 seps: &mut Vec<usize>,
136 ) -> (ReadResult, usize) {
137 use ReadState::*;
138
139 if input.is_empty() {
140 if !self.record_was_read {
141 self.record_was_read = true;
142 return (ReadResult::Record, 0);
143 }
144
145 return (ReadResult::End, 0);
146 }
147
148 if self.record_was_read {
149 if input[0] == b'\n' {
150 return (ReadResult::Lf, 1);
151 } else if input[0] == b'\r' {
152 return (ReadResult::Cr, 1);
153 }
154 }
155
156 self.record_was_read = false;
157
158 let mut pos: usize = 0;
159
160 while pos < input.len() {
161 match self.state {
162 Unquoted => {
163 if input[pos] == self.quote {
165 self.state = Quoted;
166 pos += 1;
167 continue;
168 }
169
170 let mut last_offset: usize = 0;
172
173 for offset in self.searcher.search(&input[pos..]) {
174 last_offset = offset + 1;
175
176 let byte = input[pos + offset];
177
178 if byte == self.delimiter {
179 seps.push(seps_offset + pos + offset);
180 continue;
181 }
182
183 if byte == b'\n' {
184 self.record_was_read = true;
185 return (ReadResult::Record, pos + last_offset);
186 }
187
188 self.state = Quoted;
190 break;
191 }
192
193 if last_offset > 0 {
194 pos += last_offset;
195 } else {
196 break;
197 }
198 }
199 Quoted => {
200 if let Some(offset) = memchr(self.quote, &input[pos..]) {
202 pos += offset + 1;
203 self.state = Quote;
204 } else {
205 break;
206 }
207 }
208 Quote => {
209 let byte = input[pos];
210
211 pos += 1;
212
213 if byte == self.quote {
214 self.state = Quoted;
215 } else if byte == self.delimiter {
216 seps.push(seps_offset + pos - 1);
217 self.state = Unquoted;
218 } else if byte == b'\n' {
219 self.record_was_read = true;
220 self.state = Unquoted;
221 return (ReadResult::Record, pos);
222 } else {
223 self.state = Unquoted;
224 }
225 }
226 }
227 }
228
229 (ReadResult::InputEmpty, input.len())
230 }
231
232 fn read_record(
233 &mut self,
234 input: &[u8],
235 record_builder: &mut ByteRecordBuilder,
236 ) -> (ReadResult, usize) {
237 use ReadState::*;
238
239 if input.is_empty() {
240 if !self.record_was_read {
241 self.record_was_read = true;
242
243 record_builder.finalize_field();
245 return (ReadResult::Record, 0);
246 }
247
248 return (ReadResult::End, 0);
249 }
250
251 if self.record_was_read {
252 if input[0] == b'\n' {
253 return (ReadResult::Lf, 1);
254 } else if input[0] == b'\r' {
255 return (ReadResult::Cr, 1);
256 }
257 }
258
259 self.record_was_read = false;
260
261 let mut pos: usize = 0;
262
263 while pos < input.len() {
264 match self.state {
265 Unquoted => {
266 if input[pos] == self.quote {
268 self.state = Quoted;
269 pos += 1;
270 continue;
271 }
272
273 let mut last_offset: usize = 0;
275
276 for offset in self.searcher.search(&input[pos..]) {
277 last_offset = offset + 1;
278
279 let byte = input[pos + offset];
280
281 if byte == self.delimiter {
284 record_builder.finalize_field_preemptively(offset);
285 continue;
286 }
287
288 if byte == b'\n' {
289 record_builder
290 .extend_from_slice(trim_trailing_cr(&input[pos..pos + offset]));
291 record_builder.finalize_field();
292 self.record_was_read = true;
293 return (ReadResult::Record, pos + last_offset);
294 }
295
296 self.state = Quoted;
298 record_builder.bump();
299 break;
300 }
301
302 if last_offset > 0 {
303 record_builder.extend_from_slice(&input[pos..pos + last_offset]);
304 pos += last_offset
305 } else {
306 break;
307 }
308 }
309 Quoted => {
310 if let Some(offset) = memchr(self.quote, &input[pos..]) {
312 record_builder.extend_from_slice(&input[pos..pos + offset]);
313 pos += offset + 1;
314 self.state = Quote;
315 } else {
316 break;
317 }
318 }
319 Quote => {
320 let byte = input[pos];
321
322 pos += 1;
323
324 if byte == self.quote {
325 self.state = Quoted;
326 record_builder.push_byte(byte);
327 } else if byte == self.delimiter {
328 record_builder.finalize_field();
329 self.state = Unquoted;
330 } else if byte == b'\n' {
331 self.record_was_read = true;
332 self.state = Unquoted;
333 record_builder.finalize_field();
334 return (ReadResult::Record, pos);
335 } else {
336 self.state = Unquoted;
337 }
338 }
339 }
340 }
341
342 record_builder.extend_from_slice(&input[pos..]);
343
344 (ReadResult::InputEmpty, input.len())
345 }
346
347 }
441
442pub struct BufferedReader<R> {
443 buffer: BufReader<R>,
444 scratch: Vec<u8>,
445 seps: Vec<usize>,
446 actual_buffer_position: Option<usize>,
447 inner: Reader,
448 field_count: Option<usize>,
449}
450
451impl<R: Read> BufferedReader<R> {
452 pub fn new(reader: R, delimiter: u8, quote: u8) -> Self {
453 Self {
454 buffer: BufReader::new(reader),
455 scratch: Vec::new(),
456 seps: Vec::new(),
457 actual_buffer_position: None,
458 inner: Reader::new(delimiter, quote),
459 field_count: None,
460 }
461 }
462
463 pub fn with_capacity(capacity: usize, reader: R, delimiter: u8, quote: u8) -> Self {
464 Self {
465 buffer: BufReader::with_capacity(capacity, reader),
466 scratch: Vec::new(),
467 seps: Vec::new(),
468 actual_buffer_position: None,
469 inner: Reader::new(delimiter, quote),
470 field_count: None,
471 }
472 }
473
474 #[inline]
475 fn check_field_count(&mut self, written: usize) -> error::Result<()> {
476 match self.field_count {
477 Some(expected) => {
478 if written != expected {
479 return Err(Error::unequal_lengths(expected, written));
480 }
481 }
482 None => {
483 self.field_count = Some(written);
484 }
485 }
486
487 Ok(())
488 }
489
490 pub fn strip_bom(&mut self) -> error::Result<()> {
491 let input = self.buffer.fill_buf()?;
492
493 if input.len() >= 3 && &input[..3] == b"\xef\xbb\xbf" {
494 self.buffer.consume(3);
495 }
496
497 Ok(())
498 }
499
500 pub fn first_byte_record(&mut self, consume: bool) -> error::Result<ByteRecord> {
501 use ReadResult::*;
502
503 let mut record = ByteRecord::new();
504 let mut record_builder = ByteRecordBuilder::wrap(&mut record);
505
506 let input = self.buffer.fill_buf()?;
507
508 let (result, pos) = self.inner.read_record(input, &mut record_builder);
509
510 match result {
511 End => Ok(ByteRecord::new()),
512
513 Cr | Lf | ReadResult::InputEmpty => Err(Error::invalid_headers()),
516 Record => {
517 if consume {
518 self.buffer.consume(pos);
519 }
520
521 Ok(record)
522 }
523 }
524 }
525
526 pub fn count_records(&mut self) -> error::Result<u64> {
527 use ReadResult::*;
528
529 let mut count: u64 = 0;
530
531 loop {
532 let input = self.buffer.fill_buf()?;
533
534 let (result, pos) = self.inner.split_record(input);
535
536 self.buffer.consume(pos);
537
538 match result {
539 End => break,
540 InputEmpty | Cr | Lf => continue,
541 Record => {
542 count += 1;
543 }
544 };
545 }
546
547 Ok(count)
548 }
549
550 pub fn split_record(&mut self) -> error::Result<Option<&[u8]>> {
551 use ReadResult::*;
552
553 self.scratch.clear();
554
555 if let Some(last_pos) = self.actual_buffer_position.take() {
556 self.buffer.consume(last_pos);
557 }
558
559 loop {
560 let input = self.buffer.fill_buf()?;
561
562 let (result, pos) = self.inner.split_record(input);
563
564 match result {
565 End => {
566 self.buffer.consume(pos);
567 return Ok(None);
568 }
569 Cr | Lf => {
570 self.buffer.consume(pos);
571 }
572 InputEmpty => {
573 self.scratch.extend_from_slice(input);
574 self.buffer.consume(pos);
575 }
576 Record => {
577 if self.scratch.is_empty() {
578 self.actual_buffer_position = Some(pos);
579 return Ok(Some(&self.buffer.buffer()[..pos]));
580 } else {
581 self.scratch.extend_from_slice(&input[..pos]);
582 self.buffer.consume(pos);
583
584 return Ok(Some(&self.scratch));
585 }
586 }
587 };
588 }
589 }
590
591 pub fn read_zero_copy_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
592 use ReadResult::*;
593
594 self.scratch.clear();
595 self.seps.clear();
596
597 if let Some(last_pos) = self.actual_buffer_position.take() {
598 self.buffer.consume(last_pos);
599 }
600
601 loop {
602 let input = self.buffer.fill_buf()?;
603
604 let (result, pos) = self.inner.split_record_and_find_separators(
605 input,
606 self.scratch.len(),
607 &mut self.seps,
608 );
609
610 match result {
611 End => {
612 self.buffer.consume(pos);
613 return Ok(None);
614 }
615 Cr | Lf => {
616 self.buffer.consume(pos);
617 }
618 InputEmpty => {
619 self.scratch.extend_from_slice(input);
620 self.buffer.consume(pos);
621 }
622 Record => {
623 if self.scratch.is_empty() {
624 self.check_field_count(self.seps.len() + 1)?;
625 self.actual_buffer_position = Some(pos);
626 return Ok(Some(ZeroCopyByteRecord::new(
627 &self.buffer.buffer()[..pos],
628 &self.seps,
629 )));
630 } else {
631 self.scratch.extend_from_slice(&input[..pos]);
632 self.buffer.consume(pos);
633 self.check_field_count(self.seps.len() + 1)?;
634 return Ok(Some(ZeroCopyByteRecord::new(&self.scratch, &self.seps)));
635 }
636 }
637 };
638 }
639 }
640
641 pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
642 use ReadResult::*;
643
644 record.clear();
645
646 let mut record_builder = ByteRecordBuilder::wrap(record);
647
648 if let Some(last_pos) = self.actual_buffer_position.take() {
649 self.buffer.consume(last_pos);
650 }
651
652 loop {
653 let input = self.buffer.fill_buf()?;
654
655 let (result, pos) = self.inner.read_record(input, &mut record_builder);
656
657 self.buffer.consume(pos);
658
659 match result {
660 End => {
661 return Ok(false);
662 }
663 Cr | Lf | InputEmpty => {
664 continue;
665 }
666 Record => {
667 self.check_field_count(record.len())?;
668 return Ok(true);
669 }
670 };
671 }
672 }
673
674 pub fn byte_records(&mut self) -> ByteRecordsIter<'_, R> {
675 ByteRecordsIter {
676 reader: self,
677 record: ByteRecord::new(),
678 }
679 }
680
681 pub fn into_byte_records(self) -> ByteRecordsIntoIter<R> {
682 ByteRecordsIntoIter {
683 reader: self,
684 record: ByteRecord::new(),
685 }
686 }
687}
688
689pub struct ByteRecordsIter<'r, R> {
690 reader: &'r mut BufferedReader<R>,
691 record: ByteRecord,
692}
693
694impl<'r, R: Read> Iterator for ByteRecordsIter<'r, R> {
695 type Item = error::Result<ByteRecord>;
696
697 fn next(&mut self) -> Option<Self::Item> {
698 match self.reader.read_byte_record(&mut self.record) {
701 Err(err) => Some(Err(err)),
702 Ok(true) => Some(Ok(self.record.clone())),
703 Ok(false) => None,
704 }
705 }
706}
707
708pub struct ByteRecordsIntoIter<R> {
709 reader: BufferedReader<R>,
710 record: ByteRecord,
711}
712
713impl<R: Read> Iterator for ByteRecordsIntoIter<R> {
714 type Item = error::Result<ByteRecord>;
715
716 fn next(&mut self) -> Option<Self::Item> {
717 match self.reader.read_byte_record(&mut self.record) {
720 Err(err) => Some(Err(err)),
721 Ok(true) => Some(Ok(self.record.clone())),
722 Ok(false) => None,
723 }
724 }
725}
726
727pub struct TotalReader<'b> {
730 inner: Reader,
731 bytes: &'b [u8],
732 pos: usize,
733}
734
735impl<'b> TotalReader<'b> {
736 pub fn new(delimiter: u8, quote: u8, bytes: &'b [u8]) -> Self {
737 Self {
738 inner: Reader::new(delimiter, quote),
739 bytes,
740 pos: 0,
741 }
742 }
743
744 pub fn count_records(&mut self) -> u64 {
745 use ReadResult::*;
746
747 let mut count: u64 = 0;
748
749 loop {
750 let (result, pos) = self.inner.split_record(&self.bytes[self.pos..]);
751
752 self.pos += pos;
753
754 match result {
755 End => break,
756 InputEmpty | Cr | Lf => continue,
757 Record => {
758 count += 1;
759 }
760 };
761 }
762
763 count
764 }
765
766 pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
767 use ReadResult::*;
768
769 record.clear();
770
771 let mut record_builder = ByteRecordBuilder::wrap(record);
772
773 loop {
774 let (result, pos) = self
775 .inner
776 .read_record(&self.bytes[self.pos..], &mut record_builder);
777
778 self.pos += pos;
779
780 match result {
781 End => {
782 return Ok(false);
783 }
784 Cr | Lf | InputEmpty => {
785 continue;
786 }
787 Record => {
788 return Ok(true);
789 }
790 };
791 }
792 }
793}
794
795#[cfg(test)]
796mod tests {
797 use std::io::Cursor;
798
799 use crate::brec;
800
801 use super::*;
802
803 fn count_records(data: &str, capacity: usize) -> u64 {
804 let mut splitter = BufferedReader::with_capacity(capacity, Cursor::new(data), b',', b'"');
805 splitter.count_records().unwrap()
806 }
807
808 #[test]
809 fn test_count() {
810 assert_eq!(count_records("", 1024), 0);
812
813 let tests = vec![
815 "name\njohn\nlucy",
816 "name\njohn\nlucy\n",
817 "name\n\njohn\r\nlucy\n",
818 "name\n\njohn\r\nlucy\n\n",
819 "name\n\n\njohn\r\n\r\nlucy\n\n\n",
820 "\nname\njohn\nlucy",
821 "\n\nname\njohn\nlucy",
822 "\r\n\r\nname\njohn\nlucy",
823 "name\njohn\nlucy\r\n",
824 "name\njohn\nlucy\r\n\r\n",
825 ];
826
827 for capacity in [32usize, 4, 3, 2, 1] {
828 for test in tests.iter() {
829 assert_eq!(
830 count_records(test, capacity),
831 3,
832 "capacity={} string={:?}",
833 capacity,
834 test
835 );
836 }
837 }
838
839 let data = "name,surname,age\njohn,landy,45\nlucy,rose,67";
841 assert_eq!(count_records(data, 1024), 3);
842
843 for capacity in [1024usize, 32usize, 4, 3, 2, 1] {
845 let data = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\r\n";
846
847 assert_eq!(count_records(data, capacity), 5, "capacity={}", capacity);
848 }
849
850 let data = "name\tsurname\tage\njohn\tlandy\t45\nlucy\trose\t67";
852 assert_eq!(count_records(data, 1024), 3);
853 }
854
855 #[test]
856 fn test_read_zero_copy_byte_record() -> error::Result<()> {
857 let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";
858
859 let mut reader = BufferedReader::with_capacity(32, Cursor::new(csv), b',', b'"');
860 let mut records = Vec::new();
861
862 let expected = vec![
863 vec!["name", "surname", "age"],
864 vec![
865 "\"john\"",
866 "\"landy, the \"\"everlasting\"\" bastard\"",
867 "45",
868 ],
869 vec!["lucy", "rose", "\"67\""],
870 vec!["jermaine", "jackson", "\"89\""],
871 vec!["karine", "loucan", "\"52\""],
872 vec!["rose", "\"glib\"", "12"],
873 vec!["\"guillaume\"", "\"plique\"", "\"42\""],
874 ]
875 .into_iter()
876 .map(|record| {
877 record
878 .into_iter()
879 .map(|cell| cell.as_bytes().to_vec())
880 .collect::<Vec<_>>()
881 })
882 .collect::<Vec<_>>();
883
884 while let Some(record) = reader.read_zero_copy_byte_record()? {
885 records.push(record.iter().map(|cell| cell.to_vec()).collect::<Vec<_>>());
886 }
887
888 assert_eq!(records, expected);
889
890 Ok(())
891 }
892
893 #[test]
894 fn test_read_byte_record() -> error::Result<()> {
895 let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\n\"\"\"ok\"\"\",whatever,dude\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";
896
897 let expected = vec![
898 brec!["name", "surname", "age"],
899 brec!["john", "landy, the \"everlasting\" bastard", "45"],
900 brec!["\"ok\"", "whatever", "dude"],
901 brec!["lucy", "rose", "67"],
902 brec!["jermaine", "jackson", "89"],
903 brec!["karine", "loucan", "52"],
904 brec!["rose", "glib", "12"],
905 brec!["guillaume", "plique", "42"],
906 ];
907
908 for capacity in [32usize, 4, 3, 2, 1] {
909 let mut reader = BufferedReader::with_capacity(capacity, Cursor::new(csv), b',', b'"');
910
911 assert_eq!(
912 reader.byte_records().collect::<Result<Vec<_>, _>>()?,
913 expected
914 );
915 }
916
917 Ok(())
918 }
919
920 #[test]
921 fn test_strip_bom() -> error::Result<()> {
922 let mut reader = BufferedReader::new(Cursor::new("name,surname,age"), b',', b'"');
923 reader.strip_bom()?;
924
925 assert_eq!(
926 reader.byte_records().next().unwrap()?,
927 brec!["name", "surname", "age"]
928 );
929
930 let mut reader =
931 BufferedReader::new(Cursor::new(b"\xef\xbb\xbfname,surname,age"), b',', b'"');
932 reader.strip_bom()?;
933
934 assert_eq!(
935 reader.byte_records().next().unwrap()?,
936 brec!["name", "surname", "age"]
937 );
938
939 Ok(())
940 }
941
942 #[test]
943 fn test_empty_row() -> error::Result<()> {
944 let data = "name\n\"\"\nlucy\n\"\"";
945
946 let mut reader = BufferedReader::new(Cursor::new(data), b',', b'"');
948
949 assert_eq!(reader.count_records()?, 4);
950
951 let mut reader = BufferedReader::new(Cursor::new(data), b',', b'"');
953
954 let expected = vec![
955 vec!["name".as_bytes().to_vec()],
956 vec!["\"\"".as_bytes().to_vec()],
957 vec!["lucy".as_bytes().to_vec()],
958 vec!["\"\"".as_bytes().to_vec()],
959 ];
960
961 let mut records = Vec::new();
963
964 while let Some(record) = reader.read_zero_copy_byte_record()? {
965 records.push(vec![record.as_slice().to_vec()]);
966 }
967
968 assert_eq!(records, expected);
969
970 let reader = BufferedReader::new(Cursor::new(data), b',', b'"');
971
972 let expected = vec![brec!["name"], brec![""], brec!["lucy"], brec![""]];
973
974 let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;
975
976 assert_eq!(records, expected);
977
978 Ok(())
979 }
980
981 #[test]
982 fn test_crlf() -> error::Result<()> {
983 let reader = BufferedReader::new(
984 Cursor::new("name,surname\r\nlucy,\"john\"\r\nevan,zhong\r\nbéatrice,glougou\r\n"),
985 b',',
986 b'"',
987 );
988
989 let expected = vec![
990 brec!["name", "surname"],
991 brec!["lucy", "john"],
992 brec!["evan", "zhong"],
993 brec!["béatrice", "glougou"],
994 ];
995
996 let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;
997
998 assert_eq!(records, expected);
999
1000 Ok(())
1001 }
1002
1003 #[test]
1004 fn test_quote_always() -> error::Result<()> {
1005 let reader = BufferedReader::new(
1006 Cursor::new("\"name\",\"surname\"\n\"lucy\",\"rose\"\n\"john\",\"mayhew\""),
1007 b',',
1008 b'"',
1009 );
1010
1011 let expected = vec![
1012 brec!["name", "surname"],
1013 brec!["lucy", "rose"],
1014 brec!["john", "mayhew"],
1015 ];
1016
1017 let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;
1018
1019 assert_eq!(records, expected);
1020
1021 Ok(())
1022 }
1023}