Skip to main content

simd_csv/
seeker.rs

1use std::io::{Cursor, Read, Seek, SeekFrom};
2use std::ops::Range;
3
4use crate::error::{self, Error, ErrorKind};
5use crate::reader::Reader;
6use crate::records::ByteRecord;
7use crate::splitter::Splitter;
8use crate::utils::ReverseReader;
9use crate::zero_copy_reader::{ZeroCopyReader, ZeroCopyReaderBuilder};
10
11#[derive(Debug)]
12struct SeekerSample {
13    headers: ByteRecord,
14    record_count: u64,
15    max_record_size: u64,
16    median_record_size: u64,
17    initial_position: u64,
18    first_record_position: u64,
19    fields_mean_sizes: Vec<f64>,
20    stream_len: u64,
21    has_reached_eof: bool,
22}
23
24impl SeekerSample {
25    fn from_reader<R: Read + Seek>(
26        mut reader: R,
27        csv_reader_builder: &ZeroCopyReaderBuilder,
28        sample_size: u64,
29    ) -> error::Result<Option<Self>> {
30        // NOTE: the given reader might have already read.
31        // This is for instance the case for CSV-adjacent formats boasting
32        // metadata in a header before tabular records even start.
33        let initial_position = reader.stream_position()?;
34
35        let mut csv_reader = csv_reader_builder.from_reader(&mut reader);
36
37        let headers = csv_reader.byte_headers()?.clone();
38
39        let first_record_position = if csv_reader.has_headers() {
40            initial_position + csv_reader.position()
41        } else {
42            initial_position
43        };
44
45        let mut i: u64 = 0;
46        let mut record_sizes: Vec<u64> = Vec::new();
47        let mut fields_sizes: Vec<Vec<usize>> = Vec::with_capacity(sample_size as usize);
48
49        while i < sample_size {
50            if let Some(record) = csv_reader.read_byte_record()? {
51                // The "+ 1" is taking \n into account for better accuracy
52                let record_size = record.as_slice().len() as u64 + 1;
53
54                record_sizes.push(record_size);
55                fields_sizes.push(record.iter().map(|cell| cell.len()).collect());
56
57                i += 1;
58            } else {
59                break;
60            }
61        }
62
63        // Not enough data to produce decent sample
64        if i == 0 {
65            return Ok(None);
66        }
67
68        let has_reached_eof = csv_reader.read_byte_record()?.is_none();
69        let file_len = reader.seek(SeekFrom::End(0))?;
70        let fields_mean_sizes = (0..headers.len())
71            .map(|i| {
72                fields_sizes.iter().map(|sizes| sizes[i]).sum::<usize>() as f64
73                    / fields_sizes.len() as f64
74            })
75            .collect();
76
77        record_sizes.sort();
78
79        Ok(Some(Self {
80            headers,
81            record_count: i,
82            max_record_size: *record_sizes.last().unwrap(),
83            median_record_size: record_sizes[record_sizes.len() / 2],
84            initial_position,
85            first_record_position,
86            fields_mean_sizes,
87            has_reached_eof,
88            stream_len: file_len,
89        }))
90    }
91}
92
93fn cosine(profile: &[f64], other: impl Iterator<Item = usize>) -> f64 {
94    let mut self_norm = 0.0;
95    let mut other_norm = 0.0;
96    let mut intersection = 0.0;
97
98    for (a, b) in profile.iter().copied().zip(other.map(|i| i as f64)) {
99        self_norm += a * a;
100        other_norm += b * b;
101        intersection += a * b;
102    }
103
104    intersection / (self_norm * other_norm).sqrt()
105}
106
107/// Builds a [`Seeker`] with given configuration.
108pub struct SeekerBuilder {
109    delimiter: u8,
110    quote: u8,
111    has_headers: bool,
112    buffer_capacity: usize,
113    sample_size: u64,
114    lookahead_factor: u64,
115}
116
117impl Default for SeekerBuilder {
118    fn default() -> Self {
119        Self {
120            delimiter: b',',
121            quote: b'"',
122            buffer_capacity: 8192,
123            has_headers: true,
124            sample_size: 128,
125            lookahead_factor: 32,
126        }
127    }
128}
129
130impl SeekerBuilder {
131    /// Create a new [`SeekerBuilder`] with default configuration.
132    pub fn new() -> Self {
133        Self::default()
134    }
135
136    /// Create a new [`SeekerBuilder`] with provided `capacity`.
137    pub fn with_capacity(capacity: usize) -> Self {
138        let mut reader = Self::default();
139        reader.buffer_capacity(capacity);
140        reader
141    }
142
143    /// Set the delimiter to be used by the created [`Seeker`].
144    ///
145    /// This delimiter must be a single byte.
146    ///
147    /// Will default to a comma.
148    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
149        self.delimiter = delimiter;
150        self
151    }
152
153    /// Set the quote char to be used by the created [`Seeker`].
154    ///
155    /// This char must be a single byte.
156    ///
157    /// Will default to a double quote.
158    pub fn quote(&mut self, quote: u8) -> &mut Self {
159        self.quote = quote;
160        self
161    }
162
163    /// Set the capacity of the created [`Seeker`]'s buffered reader.
164    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
165        self.buffer_capacity = capacity;
166        self
167    }
168
169    /// Set the sample size of the seeker, i.e. the maximum number of records
170    /// the seeker will attempt to prebuffer to collect some useful statistics
171    /// about target CSV stream.
172    ///
173    /// Will default to `128`.
174    pub fn sample_size(&mut self, size: u64) -> &mut Self {
175        self.sample_size = size;
176        self
177    }
178
179    /// Set the lookahead factor of the seeker, i.e. an approximate number of
180    /// records the seeker will read ahead when calling
181    /// [`Seeker::find_record_after`].
182    ///
183    /// Will default to `32`.
184    pub fn lookahead_factor(&mut self, factor: u64) -> &mut Self {
185        self.lookahead_factor = factor;
186        self
187    }
188
189    /// Indicate whether first record must be understood as a header.
190    ///
191    /// Will default to `true`.
192    pub fn has_headers(&mut self, yes: bool) -> &mut Self {
193        self.has_headers = yes;
194        self
195    }
196
197    /// Create a new [`Seeker`] using the provided reader implementing
198    /// [`std::io::Read`].
199    pub fn from_reader<R: Read + Seek>(&self, mut reader: R) -> error::Result<Option<Seeker<R>>> {
200        let mut builder = ZeroCopyReaderBuilder::new();
201
202        builder
203            .buffer_capacity(self.buffer_capacity)
204            .delimiter(self.delimiter)
205            .quote(self.quote)
206            .has_headers(self.has_headers);
207
208        match SeekerSample::from_reader(&mut reader, &builder, self.sample_size) {
209            Ok(Some(sample)) => {
210                builder.has_headers(false).flexible(true);
211
212                Ok(Some(Seeker {
213                    inner: reader,
214                    lookahead_factor: self.lookahead_factor,
215                    scratch: Vec::with_capacity(
216                        (self.lookahead_factor * sample.max_record_size) as usize,
217                    ),
218                    sample,
219                    builder,
220                    has_headers: self.has_headers,
221                }))
222            }
223            Ok(None) => Ok(None),
224            Err(err) => Err(err),
225        }
226    }
227}
228
229fn lookahead<R: Read>(
230    reader: &mut ZeroCopyReader<R>,
231    expected_field_count: usize,
232) -> error::Result<Option<(u64, ByteRecord)>> {
233    let mut i: usize = 0;
234    let mut next_record: Option<(u64, ByteRecord)> = None;
235    let mut field_counts: Vec<usize> = Vec::new();
236    let mut pos: u64 = 0;
237
238    while let Some(record) = reader.read_byte_record()? {
239        if i > 0 {
240            field_counts.push(record.len());
241
242            if i == 1 {
243                next_record = Some((pos, record.to_byte_record()));
244            }
245        }
246
247        pos = reader.position();
248        i += 1;
249    }
250
251    // NOTE: if we have less than 2 records beyond the first one, it will be hard to
252    // make a correct decision
253    // NOTE: last record might be unaligned since we artificially clamp the read buffer
254    if field_counts.len() < 2
255        || field_counts[..field_counts.len() - 1]
256            .iter()
257            .any(|l| *l != expected_field_count)
258    {
259        Ok(None)
260    } else {
261        Ok(next_record)
262    }
263}
264
265/// A specialized CSV stream seeker.
266pub struct Seeker<R> {
267    inner: R,
268    sample: SeekerSample,
269    lookahead_factor: u64,
270    scratch: Vec<u8>,
271    builder: ZeroCopyReaderBuilder,
272    has_headers: bool,
273}
274
275impl<R: Read + Seek> Seeker<R> {
276    /// Returns whether this seeker has been configured to interpret the first
277    /// record as a header.
278    pub fn has_headers(&self) -> bool {
279        self.has_headers
280    }
281
282    /// Returns the position the seekable stream was in when instantiating the
283    /// seeker.
284    #[inline(always)]
285    pub fn initial_position(&self) -> u64 {
286        self.sample.initial_position
287    }
288
289    /// Returns the absolute byte offset of the first record (excluding header)
290    /// of the seekable stream.
291    #[inline(always)]
292    pub fn first_record_position(&self) -> u64 {
293        self.sample.first_record_position
294    }
295
296    /// Returns the total number of bytes contained in the seekable stream.
297    #[inline(always)]
298    pub fn stream_len(&self) -> u64 {
299        self.sample.stream_len
300    }
301
302    /// Returns the number of bytes that will be read when performing a
303    /// lookahead in the seekable stream when using
304    /// [`Seeker::find_record_after`].
305    #[inline(always)]
306    pub fn lookahead_len(&self) -> u64 {
307        self.lookahead_factor * self.sample.max_record_size
308    }
309
310    /// Returns the `first_record_position..stream_len` range of the seeker.
311    #[inline(always)]
312    pub fn range(&self) -> Range<u64> {
313        self.sample.first_record_position..self.sample.stream_len
314    }
315
316    /// Returns the exact number of records (header excluded) contained in the
317    /// seekable stream, if the initial sample built when instantiating the
318    /// seeker exhausted the whole stream.
319    #[inline]
320    pub fn exact_count(&self) -> Option<u64> {
321        self.sample
322            .has_reached_eof
323            .then_some(self.sample.record_count)
324    }
325
326    /// Either returns the exact number of records (header excluded) contained
327    /// in the seekable stream or an approximation based on statistics sampled
328    /// from the beginning of the stream and the total stream length.
329    #[inline]
330    pub fn approx_count(&self) -> u64 {
331        let sample = &self.sample;
332
333        if sample.has_reached_eof {
334            sample.record_count
335        } else {
336            ((sample.stream_len - sample.first_record_position) as f64
337                / sample.median_record_size as f64)
338                .ceil() as u64
339        }
340    }
341
342    /// Attempt to find the position, in the seekable stream, of the beginning
343    /// of the CSV record just after the one where `from_pos` would end in.
344    ///
345    /// Beware: if `from_pos` is the exact first byte of a CSV record, this
346    /// method will still return the position of next CSV record because it has
347    /// no way of knowing whether the byte just before `from_pos` is a newline.
348    ///
349    /// This method will return an error if given `from_pos` is out of bounds.
350    ///
351    /// This method will return `None` if it did not succeed in finding  the
352    /// next CSV record starting position. This can typically happen when
353    /// seeking too close to the end of the stream, since this method needs to
354    /// read ahead of the stream to test its heuristics.
355    ///
356    /// ```
357    /// match seeker.find_record_after(1024) {
358    ///     Ok(Some((pos, record))) => {
359    ///         // Everything went fine
360    ///     },
361    ///     Ok(None) => {
362    ///         // Lookahead failed
363    ///     },
364    ///     Err(err) => {
365    ///         // Either `from_pos` was out-of-bounds, or some IO error occurred
366    ///     }
367    /// }
368    /// ```
369    pub fn find_record_after(&mut self, from_pos: u64) -> error::Result<Option<(u64, ByteRecord)>> {
370        if from_pos < self.first_record_position() || from_pos >= self.sample.stream_len {
371            return Err(Error::new(ErrorKind::OutOfBounds {
372                pos: from_pos,
373                start: self.first_record_position(),
374                end: self.sample.stream_len,
375            }));
376        }
377
378        self.inner.seek(SeekFrom::Start(from_pos))?;
379
380        // NOTE: first record does not need to be more complex
381        if from_pos == self.first_record_position() {
382            let first_record = self
383                .builder
384                .from_reader(&mut self.inner)
385                .read_byte_record()?
386                .unwrap()
387                .to_byte_record();
388
389            return Ok(Some((self.first_record_position(), first_record)));
390        }
391
392        self.scratch.clear();
393        (&mut self.inner)
394            .take(self.lookahead_factor * self.sample.max_record_size)
395            .read_to_end(&mut self.scratch)?;
396
397        let mut unquoted_reader = self.builder.from_reader(self.scratch.as_slice());
398        let mut quoted_reader = self
399            .builder
400            .from_reader(Cursor::new(b"\"").chain(self.scratch.as_slice()));
401
402        let expected_field_count = self.sample.headers.len();
403
404        let unquoted = lookahead(&mut unquoted_reader, expected_field_count)?;
405        let quoted = lookahead(&mut quoted_reader, expected_field_count)?;
406
407        match (unquoted, quoted) {
408            (None, None) => Ok(None),
409            (Some((pos, record)), None) => Ok(Some((from_pos + pos, record))),
410            (None, Some((pos, record))) => Ok(Some((from_pos + pos - 1, record))),
411            (Some((unquoted_pos, unquoted_record)), Some((mut quoted_pos, quoted_record))) => {
412                // Sometimes we might fall within a cell whose contents suspiciously yield
413                // the same record structure. In this case we rely on cosine similarity over
414                // record profiles to make sure we select the correct offset.
415                quoted_pos -= 1;
416
417                // A tie in offset pos means we are unquoted
418                if unquoted_pos == quoted_pos {
419                    Ok(Some((from_pos + unquoted_pos, unquoted_record)))
420                } else {
421                    let unquoted_cosine = cosine(
422                        &self.sample.fields_mean_sizes,
423                        unquoted_record.iter().map(|cell| cell.len()),
424                    );
425                    let quoted_cosine = cosine(
426                        &self.sample.fields_mean_sizes,
427                        quoted_record.iter().map(|cell| cell.len()),
428                    );
429
430                    if unquoted_cosine > quoted_cosine {
431                        Ok(Some((from_pos + unquoted_pos, unquoted_record)))
432                    } else {
433                        Ok(Some((from_pos + quoted_pos, quoted_record)))
434                    }
435                }
436            }
437        }
438    }
439
440    /// Split the seekable stream into a maximum of `count` segments.
441    ///
442    /// This method might return less than `count` segments if the stream
443    /// seems too small to safely return that many segments.
444    pub fn segments(&mut self, count: usize) -> error::Result<Vec<(u64, u64)>> {
445        let sample = &self.sample;
446        let file_len = sample.stream_len;
447
448        // File is way too short
449        if self.sample.record_count < count as u64 {
450            return Ok(vec![(self.first_record_position(), file_len)]);
451        }
452
453        let adjusted_file_len = file_len - self.first_record_position();
454
455        // Adjusting chunks
456        let count = count
457            .min(
458                (file_len / (sample.max_record_size * self.lookahead_factor)).saturating_sub(1)
459                    as usize,
460            )
461            .max(1);
462
463        let mut offsets = vec![self.first_record_position()];
464
465        for i in 1..count {
466            let file_offset = ((i as f64 / count as f64) * adjusted_file_len as f64).floor() as u64
467                + self.first_record_position();
468
469            if let Some((record_offset, _)) = self.find_record_after(file_offset)? {
470                offsets.push(record_offset);
471            } else {
472                break;
473            }
474        }
475
476        offsets.push(file_len);
477
478        Ok(offsets.windows(2).map(|w| (w[0], w[1])).collect())
479    }
480
481    /// Returns the headers of the seekable stream, or just the first record the
482    /// seeker was configured thusly.
483    pub fn byte_headers(&self) -> &ByteRecord {
484        &self.sample.headers
485    }
486
487    /// Attempt to read the first record of the seekable stream.
488    pub fn first_byte_record(&mut self) -> error::Result<Option<ByteRecord>> {
489        self.inner
490            .seek(SeekFrom::Start(self.first_record_position()))?;
491
492        match self.builder.from_reader(&mut self.inner).read_byte_record() {
493            Ok(Some(record)) => Ok(Some(record.to_byte_record())),
494            Ok(None) => Ok(None),
495            Err(err) => Err(err),
496        }
497    }
498
499    /// Attempt to read the last record of the seekable stream by reading it in
500    /// reverse.
501    pub fn last_byte_record(&mut self) -> error::Result<Option<ByteRecord>> {
502        let reverse_reader = ReverseReader::new(
503            &mut self.inner,
504            self.sample.stream_len,
505            self.sample.first_record_position,
506        );
507
508        let mut reverse_csv_reader = self.builder.from_reader(reverse_reader);
509
510        reverse_csv_reader
511            .read_byte_record()
512            .map(|record_opt| record_opt.map(|record| record.to_byte_record_in_reverse()))
513    }
514
515    /// Returns the underlying reader without unwinding its position.
516    pub fn into_inner(self) -> R {
517        self.inner
518    }
519
520    /// Create a [`Splitter`] starting from an arbitrary position. This can be useful
521    /// when you want to use the seeker to find a record at a specific position and
522    /// then read the stream from there. Just be aware that the given position must
523    /// be the exact beginning of a CSV record, or yielded records will therefore
524    /// be incorrect.
525    pub fn into_splitter_at_position(mut self, pos: SeekFrom) -> error::Result<Splitter<R>> {
526        self.inner.seek(pos)?;
527        self.builder.has_headers(false);
528
529        Ok(self.builder.to_splitter_builder().from_reader(self.inner))
530    }
531
532    /// Transform the seeker into a [`Splitter`]. Underlying reader will
533    /// be correctly reset to the stream initial position beforehand.
534    pub fn into_splitter(mut self) -> error::Result<Splitter<R>> {
535        let pos = SeekFrom::Start(self.sample.initial_position);
536
537        self.inner.seek(pos)?;
538        self.builder.has_headers(self.has_headers);
539
540        Ok(self.builder.to_splitter_builder().from_reader(self.inner))
541    }
542
543    /// Create a [`ZeroCopyReader`] starting from an arbitrary position. This can be useful
544    /// when you want to use the seeker to find a record at a specific position and
545    /// then read the stream from there. Just be aware that the given position must
546    /// be the exact beginning of a CSV record, or yielded records will therefore
547    /// be incorrect.
548    pub fn into_zero_copy_reader_at_position(
549        mut self,
550        pos: SeekFrom,
551    ) -> error::Result<ZeroCopyReader<R>> {
552        self.inner.seek(pos)?;
553        self.builder.has_headers(false);
554
555        Ok(self.builder.from_reader(self.inner))
556    }
557
558    /// Transform the seeker into a [`ZeroCopyReader`]. Underlying reader will
559    /// be correctly reset to the stream initial position beforehand.
560    pub fn into_zero_copy_reader(mut self) -> error::Result<ZeroCopyReader<R>> {
561        let pos = SeekFrom::Start(self.sample.initial_position);
562
563        self.inner.seek(pos)?;
564        self.builder.has_headers(self.has_headers);
565
566        Ok(self.builder.from_reader(self.inner))
567    }
568
569    /// Create a [`Reader`] starting from an arbitrary position. This can be useful
570    /// when you want to use the seeker to find a record at a specific position and
571    /// then read the stream from there. Just be aware that the given position must
572    /// be the exact beginning of a CSV record, or yielded records will therefore
573    /// be incorrect.
574    pub fn into_reader_at_position(mut self, pos: SeekFrom) -> error::Result<Reader<R>> {
575        self.inner.seek(pos)?;
576        self.builder.has_headers(false);
577
578        Ok(self.builder.to_reader_builder().from_reader(self.inner))
579    }
580
581    /// Transform the seeker into a [`Reader`]. Underlying reader will
582    /// be correctly reset to the stream initial position beforehand.
583    pub fn into_reader(mut self) -> error::Result<Reader<R>> {
584        let pos = SeekFrom::Start(self.sample.initial_position);
585
586        self.inner.seek(pos)?;
587        self.builder.has_headers(self.has_headers);
588
589        Ok(self.builder.to_reader_builder().from_reader(self.inner))
590    }
591}