simd_csv/
seeker.rs

1use std::io::{Cursor, Read, Seek, SeekFrom};
2use std::ops::Range;
3
4use crate::error::{self, Error, ErrorKind};
5use crate::records::ByteRecord;
6use crate::splitter::Splitter;
7use crate::utils::ReverseReader;
8use crate::zero_copy_reader::{ZeroCopyReader, ZeroCopyReaderBuilder};
9
10#[derive(Debug)]
11struct SeekerSample {
12    headers: ByteRecord,
13    record_count: u64,
14    max_record_size: u64,
15    median_record_size: u64,
16    initial_position: u64,
17    first_record_position: u64,
18    fields_mean_sizes: Vec<f64>,
19    file_len: u64,
20    has_reached_eof: bool,
21}
22
23impl SeekerSample {
24    fn from_reader<R: Read + Seek>(
25        mut reader: R,
26        csv_reader_builder: &ZeroCopyReaderBuilder,
27        sample_size: u64,
28    ) -> error::Result<Option<Self>> {
29        // NOTE: the given reader might have already read.
30        // This is for instance the case for CSV-adjacent formats boasting
31        // metadata in a header before tabular records even start.
32        let initial_position = reader.stream_position()?;
33
34        let mut csv_reader = csv_reader_builder.from_reader(&mut reader);
35
36        let headers = csv_reader.byte_headers()?.clone();
37
38        let first_record_position = if csv_reader.has_headers() {
39            initial_position + csv_reader.position()
40        } else {
41            initial_position
42        };
43
44        let mut i: u64 = 0;
45        let mut record_sizes: Vec<u64> = Vec::new();
46        let mut fields_sizes: Vec<Vec<usize>> = Vec::with_capacity(sample_size as usize);
47
48        while i < sample_size {
49            if let Some(record) = csv_reader.read_byte_record()? {
50                // The "+ 1" is taking \n into account for better accuracy
51                let record_size = record.as_slice().len() as u64 + 1;
52
53                record_sizes.push(record_size);
54                fields_sizes.push(record.iter().map(|cell| cell.len()).collect());
55
56                i += 1;
57            } else {
58                break;
59            }
60        }
61
62        // Not enough data to produce decent sample
63        if i == 0 {
64            return Ok(None);
65        }
66
67        let has_reached_eof = csv_reader.read_byte_record()?.is_none();
68        let file_len = reader.seek(SeekFrom::End(0))?;
69        let fields_mean_sizes = (0..headers.len())
70            .map(|i| {
71                fields_sizes.iter().map(|sizes| sizes[i]).sum::<usize>() as f64
72                    / fields_sizes.len() as f64
73            })
74            .collect();
75
76        record_sizes.sort();
77
78        Ok(Some(Self {
79            headers,
80            record_count: i,
81            max_record_size: *record_sizes.last().unwrap(),
82            median_record_size: record_sizes[record_sizes.len() / 2],
83            initial_position,
84            first_record_position,
85            fields_mean_sizes,
86            has_reached_eof,
87            file_len,
88        }))
89    }
90}
91
92fn cosine(profile: &[f64], other: impl Iterator<Item = usize>) -> f64 {
93    let mut self_norm = 0.0;
94    let mut other_norm = 0.0;
95    let mut intersection = 0.0;
96
97    for (a, b) in profile.iter().copied().zip(other.map(|i| i as f64)) {
98        self_norm += a * a;
99        other_norm += b * b;
100        intersection += a * b;
101    }
102
103    intersection / (self_norm * other_norm).sqrt()
104}
105
106pub struct SeekerBuilder {
107    delimiter: u8,
108    quote: u8,
109    has_headers: bool,
110    buffer_capacity: usize,
111    sample_size: u64,
112    lookahead_factor: u64,
113}
114
115impl Default for SeekerBuilder {
116    fn default() -> Self {
117        Self {
118            delimiter: b',',
119            quote: b'"',
120            buffer_capacity: 8192,
121            has_headers: true,
122            sample_size: 128,
123            lookahead_factor: 32,
124        }
125    }
126}
127
128impl SeekerBuilder {
129    pub fn new() -> Self {
130        Self::default()
131    }
132
133    pub fn with_capacity(capacity: usize) -> Self {
134        let mut reader = Self::default();
135        reader.buffer_capacity(capacity);
136        reader
137    }
138
139    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
140        self.delimiter = delimiter;
141        self
142    }
143
144    pub fn quote(&mut self, quote: u8) -> &mut Self {
145        self.quote = quote;
146        self
147    }
148
149    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
150        self.buffer_capacity = capacity;
151        self
152    }
153
154    pub fn sample_size(&mut self, size: u64) -> &mut Self {
155        self.sample_size = size;
156        self
157    }
158
159    pub fn lookahead_factor(&mut self, factor: u64) -> &mut Self {
160        self.lookahead_factor = factor;
161        self
162    }
163
164    pub fn has_headers(&mut self, yes: bool) -> &mut Self {
165        self.has_headers = yes;
166        self
167    }
168
169    pub fn from_reader<R: Read + Seek>(&self, mut reader: R) -> error::Result<Option<Seeker<R>>> {
170        let mut builder = ZeroCopyReaderBuilder::new();
171
172        builder
173            .buffer_capacity(self.buffer_capacity)
174            .delimiter(self.delimiter)
175            .quote(self.quote)
176            .has_headers(self.has_headers);
177
178        match SeekerSample::from_reader(&mut reader, &builder, self.sample_size) {
179            Ok(Some(sample)) => {
180                builder.has_headers(false).flexible(true);
181
182                Ok(Some(Seeker {
183                    inner: reader,
184                    lookahead_factor: self.lookahead_factor,
185                    scratch: Vec::with_capacity(
186                        (self.lookahead_factor * sample.max_record_size) as usize,
187                    ),
188                    sample,
189                    builder,
190                    has_headers: self.has_headers,
191                }))
192            }
193            Ok(None) => Ok(None),
194            Err(err) => Err(err),
195        }
196    }
197}
198
199fn lookahead<R: Read>(
200    reader: &mut ZeroCopyReader<R>,
201    expected_field_count: usize,
202) -> error::Result<Option<(u64, ByteRecord)>> {
203    let mut i: usize = 0;
204    let mut next_record: Option<(u64, ByteRecord)> = None;
205    let mut field_counts: Vec<usize> = Vec::new();
206    let mut pos: u64 = 0;
207
208    while let Some(record) = reader.read_byte_record()? {
209        if i > 0 {
210            field_counts.push(record.len());
211
212            if i == 1 {
213                next_record = Some((pos, record.to_byte_record()));
214            }
215        }
216
217        pos = reader.position();
218        i += 1;
219    }
220
221    // NOTE: if we have less than 2 records beyond the first one, it will be hard to
222    // make a correct decision
223    // NOTE: last record might be unaligned since we artificially clamp the read buffer
224    if field_counts.len() < 2
225        || field_counts[..field_counts.len() - 1]
226            .iter()
227            .any(|l| *l != expected_field_count)
228    {
229        Ok(None)
230    } else {
231        Ok(next_record)
232    }
233}
234
235pub struct Seeker<R> {
236    inner: R,
237    sample: SeekerSample,
238    lookahead_factor: u64,
239    scratch: Vec<u8>,
240    builder: ZeroCopyReaderBuilder,
241    has_headers: bool,
242}
243
244impl<R: Read + Seek> Seeker<R> {
245    pub fn has_headers(&self) -> bool {
246        self.has_headers
247    }
248
249    pub fn initial_position(&self) -> u64 {
250        self.sample.initial_position
251    }
252
253    pub fn first_record_position(&self) -> u64 {
254        self.sample.first_record_position
255    }
256
257    pub fn file_len(&self) -> u64 {
258        self.sample.file_len
259    }
260
261    #[inline(always)]
262    pub fn range(&self) -> Range<u64> {
263        self.sample.first_record_position..self.sample.file_len
264    }
265
266    #[inline]
267    pub fn exact_count(&self) -> Option<u64> {
268        self.sample
269            .has_reached_eof
270            .then_some(self.sample.record_count)
271    }
272
273    #[inline]
274    pub fn approx_count(&self) -> u64 {
275        let sample = &self.sample;
276
277        if sample.has_reached_eof {
278            sample.record_count
279        } else {
280            ((sample.file_len - sample.first_record_position) as f64
281                / sample.median_record_size as f64)
282                .ceil() as u64
283        }
284    }
285
286    pub fn seek(&mut self, from_pos: u64) -> error::Result<Option<(u64, ByteRecord)>> {
287        if from_pos < self.first_record_position() || from_pos >= self.sample.file_len {
288            return Err(Error::new(ErrorKind::OutOfBounds {
289                pos: from_pos,
290                start: self.first_record_position(),
291                end: self.sample.file_len,
292            }));
293        }
294
295        self.inner.seek(SeekFrom::Start(from_pos))?;
296
297        // NOTE: first record does not need to be more complex
298        if from_pos == self.first_record_position() {
299            let first_record = self
300                .builder
301                .from_reader(&mut self.inner)
302                .read_byte_record()?
303                .unwrap()
304                .to_byte_record();
305
306            return Ok(Some((self.first_record_position(), first_record)));
307        }
308
309        self.scratch.clear();
310        (&mut self.inner)
311            .take(self.lookahead_factor * self.sample.max_record_size)
312            .read_to_end(&mut self.scratch)?;
313
314        let mut unquoted_reader = self.builder.from_reader(self.scratch.as_slice());
315        let mut quoted_reader = self
316            .builder
317            .from_reader(Cursor::new(b"\"").chain(self.scratch.as_slice()));
318
319        let expected_field_count = self.sample.headers.len();
320
321        let unquoted = lookahead(&mut unquoted_reader, expected_field_count)?;
322        let quoted = lookahead(&mut quoted_reader, expected_field_count)?;
323
324        match (unquoted, quoted) {
325            (None, None) => Ok(None),
326            (Some((pos, record)), None) => Ok(Some((from_pos + pos, record))),
327            (None, Some((pos, record))) => Ok(Some((from_pos + pos - 1, record))),
328            (Some((unquoted_pos, unquoted_record)), Some((mut quoted_pos, quoted_record))) => {
329                // Sometimes we might fall within a cell whose contents suspiciously yield
330                // the same record structure. In this case we rely on cosine similarity over
331                // record profiles to make sure we select the correct offset.
332                quoted_pos -= 1;
333
334                // A tie in offset pos means we are unquoted
335                if unquoted_pos == quoted_pos {
336                    Ok(Some((from_pos + unquoted_pos, unquoted_record)))
337                } else {
338                    let unquoted_cosine = cosine(
339                        &self.sample.fields_mean_sizes,
340                        unquoted_record.iter().map(|cell| cell.len()),
341                    );
342                    let quoted_cosine = cosine(
343                        &self.sample.fields_mean_sizes,
344                        quoted_record.iter().map(|cell| cell.len()),
345                    );
346
347                    if unquoted_cosine > quoted_cosine {
348                        Ok(Some((from_pos + unquoted_pos, unquoted_record)))
349                    } else {
350                        Ok(Some((from_pos + quoted_pos, quoted_record)))
351                    }
352                }
353            }
354        }
355    }
356
357    pub fn segments(&mut self, count: usize) -> error::Result<Vec<(u64, u64)>> {
358        let sample = &self.sample;
359        let file_len = sample.file_len;
360
361        // File is way too short
362        if self.sample.record_count < count as u64 {
363            return Ok(vec![(self.first_record_position(), file_len)]);
364        }
365
366        let adjusted_file_len = file_len - self.first_record_position();
367
368        // Adjusting chunks
369        let count = count
370            .min(
371                (file_len / (sample.max_record_size * self.lookahead_factor)).saturating_sub(1)
372                    as usize,
373            )
374            .max(1);
375
376        let mut offsets = vec![self.first_record_position()];
377
378        for i in 1..count {
379            let file_offset = ((i as f64 / count as f64) * adjusted_file_len as f64).floor() as u64
380                + self.first_record_position();
381
382            if let Some((record_offset, _)) = self.seek(file_offset)? {
383                offsets.push(record_offset);
384            } else {
385                break;
386            }
387        }
388
389        offsets.push(file_len);
390
391        Ok(offsets.windows(2).map(|w| (w[0], w[1])).collect())
392    }
393
394    pub fn byte_headers(&self) -> &ByteRecord {
395        &self.sample.headers
396    }
397
398    pub fn first_byte_record(&mut self) -> error::Result<Option<ByteRecord>> {
399        match self.seek(self.first_record_position()) {
400            Ok(Some((_, record))) => Ok(Some(record)),
401            Ok(None) => Ok(None),
402            Err(err) => Err(err),
403        }
404    }
405
406    pub fn last_byte_record(&mut self) -> error::Result<Option<ByteRecord>> {
407        let reverse_reader = ReverseReader::new(
408            &mut self.inner,
409            self.sample.file_len,
410            self.sample.first_record_position,
411        );
412
413        let mut reverse_csv_reader = self.builder.from_reader(reverse_reader);
414
415        reverse_csv_reader
416            .read_byte_record()
417            .map(|record_opt| record_opt.map(|record| record.to_byte_record_in_reverse()))
418    }
419
420    pub fn into_inner(self) -> R {
421        self.inner
422    }
423
424    pub fn into_zero_copy_reader(mut self) -> error::Result<ZeroCopyReader<R>> {
425        self.inner
426            .seek(SeekFrom::Start(self.sample.initial_position))?;
427        self.builder.has_headers(self.has_headers);
428        self.builder.flexible(false);
429        Ok(self.builder.from_reader(self.inner))
430    }
431
432    pub fn into_splitter(mut self) -> error::Result<Splitter<R>> {
433        self.inner
434            .seek(SeekFrom::Start(self.sample.initial_position))?;
435        self.builder.has_headers(self.has_headers);
436        Ok(self.builder.to_splitter_builder().from_reader(self.inner))
437    }
438}