simd_csv/
seeker.rs

1use std::io::{Cursor, Read, Seek, SeekFrom};
2use std::ops::Range;
3
4use crate::error::{self, Error, ErrorKind};
5use crate::records::ByteRecord;
6use crate::splitter::Splitter;
7use crate::utils::ReverseReader;
8use crate::zero_copy_reader::{ZeroCopyReader, ZeroCopyReaderBuilder};
9
/// Statistics gathered by reading a sample of records from the beginning of a
/// seekable CSV stream. These statistics drive the seeking heuristics.
#[derive(Debug)]
struct SeekerSample {
    // Headers of the stream (or its first record when the reader was
    // configured without headers).
    headers: ByteRecord,
    // Number of records actually read into the sample.
    record_count: u64,
    // Byte size (including the trailing \n) of the largest sampled record.
    max_record_size: u64,
    // Median byte size (including the trailing \n) of the sampled records.
    median_record_size: u64,
    // Stream position when sampling started (might not be 0, e.g. when some
    // metadata header was read beforehand).
    initial_position: u64,
    // Absolute byte offset of the first non-header record.
    first_record_position: u64,
    // Mean byte size of each field/column over the sample.
    fields_mean_sizes: Vec<f64>,
    // Total length of the stream, in bytes.
    stream_len: u64,
    // Whether sampling consumed the whole stream (record_count is then exact).
    has_reached_eof: bool,
}
22
23impl SeekerSample {
24    fn from_reader<R: Read + Seek>(
25        mut reader: R,
26        csv_reader_builder: &ZeroCopyReaderBuilder,
27        sample_size: u64,
28    ) -> error::Result<Option<Self>> {
29        // NOTE: the given reader might have already read.
30        // This is for instance the case for CSV-adjacent formats boasting
31        // metadata in a header before tabular records even start.
32        let initial_position = reader.stream_position()?;
33
34        let mut csv_reader = csv_reader_builder.from_reader(&mut reader);
35
36        let headers = csv_reader.byte_headers()?.clone();
37
38        let first_record_position = if csv_reader.has_headers() {
39            initial_position + csv_reader.position()
40        } else {
41            initial_position
42        };
43
44        let mut i: u64 = 0;
45        let mut record_sizes: Vec<u64> = Vec::new();
46        let mut fields_sizes: Vec<Vec<usize>> = Vec::with_capacity(sample_size as usize);
47
48        while i < sample_size {
49            if let Some(record) = csv_reader.read_byte_record()? {
50                // The "+ 1" is taking \n into account for better accuracy
51                let record_size = record.as_slice().len() as u64 + 1;
52
53                record_sizes.push(record_size);
54                fields_sizes.push(record.iter().map(|cell| cell.len()).collect());
55
56                i += 1;
57            } else {
58                break;
59            }
60        }
61
62        // Not enough data to produce decent sample
63        if i == 0 {
64            return Ok(None);
65        }
66
67        let has_reached_eof = csv_reader.read_byte_record()?.is_none();
68        let file_len = reader.seek(SeekFrom::End(0))?;
69        let fields_mean_sizes = (0..headers.len())
70            .map(|i| {
71                fields_sizes.iter().map(|sizes| sizes[i]).sum::<usize>() as f64
72                    / fields_sizes.len() as f64
73            })
74            .collect();
75
76        record_sizes.sort();
77
78        Ok(Some(Self {
79            headers,
80            record_count: i,
81            max_record_size: *record_sizes.last().unwrap(),
82            median_record_size: record_sizes[record_sizes.len() / 2],
83            initial_position,
84            first_record_position,
85            fields_mean_sizes,
86            has_reached_eof,
87            stream_len: file_len,
88        }))
89    }
90}
91
/// Computes the cosine similarity between a profile of mean field sizes and
/// another record's field sizes given as an iterator.
///
/// Pairs are consumed lockstep, so extra items on the longer side are
/// ignored; a zero-norm input yields NaN, as with the plain pairwise
/// accumulation.
fn cosine(profile: &[f64], other: impl Iterator<Item = usize>) -> f64 {
    let (self_norm, other_norm, intersection) = profile
        .iter()
        .copied()
        .zip(other.map(|size| size as f64))
        .fold((0.0f64, 0.0f64, 0.0f64), |(sn, on, dot), (a, b)| {
            (sn + a * a, on + b * b, dot + a * b)
        });

    intersection / (self_norm * other_norm).sqrt()
}
105
/// Builds a [`Seeker`] with given configuration.
pub struct SeekerBuilder {
    // Field delimiter byte (e.g. b',').
    delimiter: u8,
    // Quote byte (e.g. b'"').
    quote: u8,
    // Whether the first record must be interpreted as headers.
    has_headers: bool,
    // Capacity of the underlying read buffer, in bytes.
    buffer_capacity: usize,
    // Maximum number of records read to build the initial sample.
    sample_size: u64,
    // Multiplier of the sampled max record size bounding lookahead reads.
    lookahead_factor: u64,
}
115
116impl Default for SeekerBuilder {
117    fn default() -> Self {
118        Self {
119            delimiter: b',',
120            quote: b'"',
121            buffer_capacity: 8192,
122            has_headers: true,
123            sample_size: 128,
124            lookahead_factor: 32,
125        }
126    }
127}
128
129impl SeekerBuilder {
130    pub fn new() -> Self {
131        Self::default()
132    }
133
134    pub fn with_capacity(capacity: usize) -> Self {
135        let mut reader = Self::default();
136        reader.buffer_capacity(capacity);
137        reader
138    }
139
140    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
141        self.delimiter = delimiter;
142        self
143    }
144
145    pub fn quote(&mut self, quote: u8) -> &mut Self {
146        self.quote = quote;
147        self
148    }
149
150    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
151        self.buffer_capacity = capacity;
152        self
153    }
154
155    pub fn sample_size(&mut self, size: u64) -> &mut Self {
156        self.sample_size = size;
157        self
158    }
159
160    pub fn lookahead_factor(&mut self, factor: u64) -> &mut Self {
161        self.lookahead_factor = factor;
162        self
163    }
164
165    pub fn has_headers(&mut self, yes: bool) -> &mut Self {
166        self.has_headers = yes;
167        self
168    }
169
170    pub fn from_reader<R: Read + Seek>(&self, mut reader: R) -> error::Result<Option<Seeker<R>>> {
171        let mut builder = ZeroCopyReaderBuilder::new();
172
173        builder
174            .buffer_capacity(self.buffer_capacity)
175            .delimiter(self.delimiter)
176            .quote(self.quote)
177            .has_headers(self.has_headers);
178
179        match SeekerSample::from_reader(&mut reader, &builder, self.sample_size) {
180            Ok(Some(sample)) => {
181                builder.has_headers(false).flexible(true);
182
183                Ok(Some(Seeker {
184                    inner: reader,
185                    lookahead_factor: self.lookahead_factor,
186                    scratch: Vec::with_capacity(
187                        (self.lookahead_factor * sample.max_record_size) as usize,
188                    ),
189                    sample,
190                    builder,
191                    has_headers: self.has_headers,
192                }))
193            }
194            Ok(None) => Ok(None),
195            Err(err) => Err(err),
196        }
197    }
198}
199
/// Scans records from an arbitrary (likely mid-record) position and attempts
/// to find the byte offset of the first trustworthy record boundary.
///
/// The given reader is expected to be clamped (e.g. via `Read::take`) so the
/// scan is bounded. The first record read is always discarded since, starting
/// mid-record, its contents are unreliable. Every subsequent record except
/// the last (which may be cut short by the clamp) must have
/// `expected_field_count` fields for the candidate offset to be trusted.
///
/// Returns the offset, relative to the start of the clamped reader, of the
/// first full record along with a copy of that record, or `None` when the
/// heuristic could not validate the alignment.
fn lookahead<R: Read>(
    reader: &mut ZeroCopyReader<R>,
    expected_field_count: usize,
) -> error::Result<Option<(u64, ByteRecord)>> {
    // Number of records read so far.
    let mut i: usize = 0;
    // Candidate result: offset & contents of the record after the discarded one.
    let mut next_record: Option<(u64, ByteRecord)> = None;
    // Field counts of every record except the first (discarded) one.
    let mut field_counts: Vec<usize> = Vec::new();
    // End position of the previously-read record.
    let mut pos: u64 = 0;

    while let Some(record) = reader.read_byte_record()? {
        if i > 0 {
            field_counts.push(record.len());

            if i == 1 {
                // `pos` still holds the end of the first (discarded) record,
                // i.e. exactly where this record begins.
                next_record = Some((pos, record.to_byte_record()));
            }
        }

        pos = reader.position();
        i += 1;
    }

    // NOTE: if we have less than 2 records beyond the first one, it will be hard to
    // make a correct decision
    // NOTE: last record might be unaligned since we artificially clamp the read buffer
    if field_counts.len() < 2
        || field_counts[..field_counts.len() - 1]
            .iter()
            .any(|l| *l != expected_field_count)
    {
        Ok(None)
    } else {
        Ok(next_record)
    }
}
235
/// A specialized CSV stream seeker.
pub struct Seeker<R> {
    // Underlying seekable reader.
    inner: R,
    // Statistics sampled from the beginning of the stream at build time.
    sample: SeekerSample,
    // Multiplier of the sampled max record size bounding lookahead reads.
    lookahead_factor: u64,
    // Reusable buffer holding the bytes of a lookahead window.
    scratch: Vec<u8>,
    // Builder used to spawn throwaway CSV readers (headerless & flexible).
    builder: ZeroCopyReaderBuilder,
    // Original header configuration, restored when converting the seeker
    // back into a sequential reader/splitter.
    has_headers: bool,
}
245
impl<R: Read + Seek> Seeker<R> {
    /// Returns whether this seeker has been configured to interpret the first
    /// record as a header.
    pub fn has_headers(&self) -> bool {
        self.has_headers
    }

    /// Returns the position the seekable stream was in when instantiating the
    /// seeker.
    #[inline(always)]
    pub fn initial_position(&self) -> u64 {
        self.sample.initial_position
    }

    /// Returns the absolute byte offset of the first record (excluding header)
    /// of the seekable stream.
    #[inline(always)]
    pub fn first_record_position(&self) -> u64 {
        self.sample.first_record_position
    }

    /// Returns the total number of bytes contained in the seekable stream.
    #[inline(always)]
    pub fn stream_len(&self) -> u64 {
        self.sample.stream_len
    }

    /// Returns the number of bytes that will be read when performing a
    /// lookahead in the seekable stream when using
    /// [`Seeker::find_record_after`].
    #[inline(always)]
    pub fn lookahead_len(&self) -> u64 {
        self.lookahead_factor * self.sample.max_record_size
    }

    /// Returns the `first_record_position..stream_len` range of the seeker.
    #[inline(always)]
    pub fn range(&self) -> Range<u64> {
        self.sample.first_record_position..self.sample.stream_len
    }

    /// Returns the exact number of records (header excluded) contained in the
    /// seekable stream, if the initial sample built when instantiating the
    /// seeker exhausted the whole stream. Returns `None` otherwise.
    #[inline]
    pub fn exact_count(&self) -> Option<u64> {
        self.sample
            .has_reached_eof
            .then_some(self.sample.record_count)
    }

    /// Either returns the exact number of records (header excluded) contained
    /// in the seekable stream or an approximation based on statistics sampled
    /// from the beginning of the stream and the total stream length.
    #[inline]
    pub fn approx_count(&self) -> u64 {
        let sample = &self.sample;

        if sample.has_reached_eof {
            sample.record_count
        } else {
            // Extrapolate: remaining bytes divided by the median record size.
            ((sample.stream_len - sample.first_record_position) as f64
                / sample.median_record_size as f64)
                .ceil() as u64
        }
    }

    /// Attempt to find the position, in the seekable stream, of the beginning
    /// of the CSV record just after the one where `from_pos` would end in.
    ///
    /// Beware: if `from_pos` is the exact first byte of a CSV record, this
    /// method will still return the position of next CSV record because it has
    /// no way of knowing whether the byte just before `from_pos` is a newline.
    ///
    /// This method will return an error if given `from_pos` is out of bounds.
    ///
    /// This method will return `None` if it did not succeed in finding the
    /// next CSV record starting position. This can typically happen when
    /// seeking too close to the end of the stream, since this method needs to
    /// read ahead of the stream to test its heuristics.
    ///
    /// ```
    /// match seeker.find_record_after(1024) {
    ///     Ok(Some((pos, record))) => {
    ///         // Everything went fine
    ///     },
    ///     Ok(None) => {
    ///         // Lookahead failed
    ///     },
    ///     Err(err) => {
    ///         // Either `from_pos` was out-of-bounds, or some IO error occurred
    ///     }
    /// }
    /// ```
    pub fn find_record_after(&mut self, from_pos: u64) -> error::Result<Option<(u64, ByteRecord)>> {
        if from_pos < self.first_record_position() || from_pos >= self.sample.stream_len {
            return Err(Error::new(ErrorKind::OutOfBounds {
                pos: from_pos,
                start: self.first_record_position(),
                end: self.sample.stream_len,
            }));
        }

        self.inner.seek(SeekFrom::Start(from_pos))?;

        // NOTE: first record does not need to be more complex
        // (a record is known to exist there since sampling succeeded).
        if from_pos == self.first_record_position() {
            let first_record = self
                .builder
                .from_reader(&mut self.inner)
                .read_byte_record()?
                .unwrap()
                .to_byte_record();

            return Ok(Some((self.first_record_position(), first_record)));
        }

        // Fill the scratch buffer with a clamped lookahead window.
        self.scratch.clear();
        (&mut self.inner)
            .take(self.lookahead_factor * self.sample.max_record_size)
            .read_to_end(&mut self.scratch)?;

        // Parse the window twice: once as-is, and once pretending we landed
        // inside a quoted cell (by prepending a quote byte), then let the
        // heuristics decide which interpretation is correct.
        let mut unquoted_reader = self.builder.from_reader(self.scratch.as_slice());
        let mut quoted_reader = self
            .builder
            .from_reader(Cursor::new(b"\"").chain(self.scratch.as_slice()));

        let expected_field_count = self.sample.headers.len();

        let unquoted = lookahead(&mut unquoted_reader, expected_field_count)?;
        let quoted = lookahead(&mut quoted_reader, expected_field_count)?;

        match (unquoted, quoted) {
            (None, None) => Ok(None),
            (Some((pos, record)), None) => Ok(Some((from_pos + pos, record))),
            // The "- 1" compensates for the artificially prepended quote byte.
            (None, Some((pos, record))) => Ok(Some((from_pos + pos - 1, record))),
            (Some((unquoted_pos, unquoted_record)), Some((mut quoted_pos, quoted_record))) => {
                // Sometimes we might fall within a cell whose contents suspiciously yield
                // the same record structure. In this case we rely on cosine similarity over
                // record profiles to make sure we select the correct offset.
                quoted_pos -= 1;

                // A tie in offset pos means we are unquoted
                if unquoted_pos == quoted_pos {
                    Ok(Some((from_pos + unquoted_pos, unquoted_record)))
                } else {
                    let unquoted_cosine = cosine(
                        &self.sample.fields_mean_sizes,
                        unquoted_record.iter().map(|cell| cell.len()),
                    );
                    let quoted_cosine = cosine(
                        &self.sample.fields_mean_sizes,
                        quoted_record.iter().map(|cell| cell.len()),
                    );

                    if unquoted_cosine > quoted_cosine {
                        Ok(Some((from_pos + unquoted_pos, unquoted_record)))
                    } else {
                        Ok(Some((from_pos + quoted_pos, quoted_record)))
                    }
                }
            }
        }
    }

    /// Split the seekable stream into a maximum of `count` segments.
    ///
    /// This method might return less than `count` segments if the stream
    /// seems too small to safely return that many segments.
    pub fn segments(&mut self, count: usize) -> error::Result<Vec<(u64, u64)>> {
        let sample = &self.sample;
        let file_len = sample.stream_len;

        // File is way too short
        if self.sample.record_count < count as u64 {
            return Ok(vec![(self.first_record_position(), file_len)]);
        }

        let adjusted_file_len = file_len - self.first_record_position();

        // Adjusting chunks: each segment must be able to host at least one
        // full lookahead window, else finding its boundary could fail.
        let count = count
            .min(
                (file_len / (sample.max_record_size * self.lookahead_factor)).saturating_sub(1)
                    as usize,
            )
            .max(1);

        let mut offsets = vec![self.first_record_position()];

        for i in 1..count {
            // Evenly-spaced target offset, snapped to the next record start.
            let file_offset = ((i as f64 / count as f64) * adjusted_file_len as f64).floor() as u64
                + self.first_record_position();

            if let Some((record_offset, _)) = self.find_record_after(file_offset)? {
                offsets.push(record_offset);
            } else {
                break;
            }
        }

        offsets.push(file_len);

        // Consecutive offsets become (start, end) segment pairs.
        Ok(offsets.windows(2).map(|w| (w[0], w[1])).collect())
    }

    /// Returns the headers of the seekable stream, or just its first record
    /// if the seeker was configured without headers.
    pub fn byte_headers(&self) -> &ByteRecord {
        &self.sample.headers
    }

    /// Attempt to read the first record of the seekable stream.
    pub fn first_byte_record(&mut self) -> error::Result<Option<ByteRecord>> {
        self.inner
            .seek(SeekFrom::Start(self.first_record_position()))?;

        match self.builder.from_reader(&mut self.inner).read_byte_record() {
            Ok(Some(record)) => Ok(Some(record.to_byte_record())),
            Ok(None) => Ok(None),
            Err(err) => Err(err),
        }
    }

    /// Attempt to read the last record of the seekable stream by reading it in
    /// reverse.
    pub fn last_byte_record(&mut self) -> error::Result<Option<ByteRecord>> {
        // Reading backwards from the end of the stream down to the first
        // record position; the parsed record comes out reversed.
        let reverse_reader = ReverseReader::new(
            &mut self.inner,
            self.sample.stream_len,
            self.sample.first_record_position,
        );

        let mut reverse_csv_reader = self.builder.from_reader(reverse_reader);

        reverse_csv_reader
            .read_byte_record()
            .map(|record_opt| record_opt.map(|record| record.to_byte_record_in_reverse()))
    }

    /// Returns the underlying reader without unwinding its position.
    pub fn into_inner(self) -> R {
        self.inner
    }

    // Seeks to `pos` then converts into a sequential reader, restoring the
    // original header configuration and disabling flexible mode.
    fn into_zero_copy_reader_from_position(
        mut self,
        pos: SeekFrom,
    ) -> error::Result<ZeroCopyReader<R>> {
        self.inner.seek(pos)?;
        self.builder.has_headers(self.has_headers);
        self.builder.flexible(false);

        Ok(self.builder.from_reader(self.inner))
    }

    /// Transform the seeker into a [`ZeroCopyReader`]. Underlying reader will
    /// be correctly reset to the stream initial position beforehand.
    pub fn into_zero_copy_reader(self) -> error::Result<ZeroCopyReader<R>> {
        let pos = SeekFrom::Start(self.sample.initial_position);
        self.into_zero_copy_reader_from_position(pos)
    }

    // Seeks to `pos` then converts into a splitter, restoring the original
    // header configuration.
    fn into_splitter_from_position(mut self, pos: SeekFrom) -> error::Result<Splitter<R>> {
        self.inner.seek(pos)?;
        self.builder.has_headers(self.has_headers);

        Ok(self.builder.to_splitter_builder().from_reader(self.inner))
    }

    /// Transform the seeker into a [`Splitter`]. Underlying reader will
    /// be correctly reset to the stream initial position beforehand.
    pub fn into_splitter(self) -> error::Result<Splitter<R>> {
        let pos = SeekFrom::Start(self.sample.initial_position);
        self.into_splitter_from_position(pos)
    }
}