1use std::io::{Cursor, Read, Seek, SeekFrom};
2use std::ops::Range;
3
4use crate::error::{self, Error, ErrorKind};
5use crate::reader::Reader;
6use crate::records::ByteRecord;
7use crate::splitter::Splitter;
8use crate::utils::ReverseReader;
9use crate::zero_copy_reader::{ZeroCopyReader, ZeroCopyReaderBuilder};
10
/// Statistics gathered by reading the first records of a stream
/// (built by `SeekerSample::from_reader`).
///
/// All positions and lengths are in bytes.
#[derive(Debug)]
struct SeekerSample {
    /// Value returned by the CSV reader's `byte_headers()` when sampling started.
    headers: ByteRecord,
    /// Number of records actually read (at most the requested sample size).
    record_count: u64,
    /// Largest sampled record size, measured as raw record length + 1.
    max_record_size: u64,
    /// Median sampled record size, measured as raw record length + 1.
    median_record_size: u64,
    /// Position of the underlying reader when sampling started.
    initial_position: u64,
    /// Position of the first data record: right after the header row when headers are
    /// enabled, otherwise equal to `initial_position`.
    first_record_position: u64,
    /// Mean byte length of each column (one entry per header field) over the sampled records.
    fields_mean_sizes: Vec<f64>,
    /// Stream length, obtained by seeking to the end of the stream.
    stream_len: u64,
    /// `true` when no record followed the sampled ones, i.e. the sample covers the whole stream.
    has_reached_eof: bool,
}
23
24impl SeekerSample {
25 fn from_reader<R: Read + Seek>(
26 mut reader: R,
27 csv_reader_builder: &ZeroCopyReaderBuilder,
28 sample_size: u64,
29 ) -> error::Result<Option<Self>> {
30 let initial_position = reader.stream_position()?;
34
35 let mut csv_reader = csv_reader_builder.from_reader(&mut reader);
36
37 let headers = csv_reader.byte_headers()?.clone();
38
39 let first_record_position = if csv_reader.has_headers() {
40 initial_position + csv_reader.position()
41 } else {
42 initial_position
43 };
44
45 let mut i: u64 = 0;
46 let mut record_sizes: Vec<u64> = Vec::new();
47 let mut fields_sizes: Vec<Vec<usize>> = Vec::with_capacity(sample_size as usize);
48
49 while i < sample_size {
50 if let Some(record) = csv_reader.read_byte_record()? {
51 let record_size = record.as_slice().len() as u64 + 1;
53
54 record_sizes.push(record_size);
55 fields_sizes.push(record.iter().map(|cell| cell.len()).collect());
56
57 i += 1;
58 } else {
59 break;
60 }
61 }
62
63 if i == 0 {
65 return Ok(None);
66 }
67
68 let has_reached_eof = csv_reader.read_byte_record()?.is_none();
69 let file_len = reader.seek(SeekFrom::End(0))?;
70 let fields_mean_sizes = (0..headers.len())
71 .map(|i| {
72 fields_sizes.iter().map(|sizes| sizes[i]).sum::<usize>() as f64
73 / fields_sizes.len() as f64
74 })
75 .collect();
76
77 record_sizes.sort();
78
79 Ok(Some(Self {
80 headers,
81 record_count: i,
82 max_record_size: *record_sizes.last().unwrap(),
83 median_record_size: record_sizes[record_sizes.len() / 2],
84 initial_position,
85 first_record_position,
86 fields_mean_sizes,
87 has_reached_eof,
88 stream_len: file_len,
89 }))
90 }
91}
92
/// Cosine similarity between `profile` and the vector of lengths yielded by `other`.
///
/// The two sequences are zipped, so only their common prefix is compared.
///
/// Returns `0.0` when either compared vector has zero norm (all components zero, or no
/// components at all). Without this guard the result would be `0.0 / 0.0 = NaN`. A `NaN`
/// score compares `false` against every other score, which would let a degenerate
/// candidate win or lose a comparison arbitrarily.
fn cosine(profile: &[f64], other: impl Iterator<Item = usize>) -> f64 {
    let mut self_norm = 0.0;
    let mut other_norm = 0.0;
    let mut intersection = 0.0;

    for (a, b) in profile.iter().copied().zip(other.map(|i| i as f64)) {
        self_norm += a * a;
        other_norm += b * b;
        intersection += a * b;
    }

    let denominator = (self_norm * other_norm).sqrt();

    // `> 0.0` also rejects a NaN denominator (e.g. from an overflowing norm times zero).
    if denominator > 0.0 {
        intersection / denominator
    } else {
        0.0
    }
}
106
/// Configuration used to sample a seekable CSV stream and build a `Seeker` over it.
#[derive(Debug, Clone)]
pub struct SeekerBuilder {
    /// Byte separating fields.
    delimiter: u8,
    /// Byte used to quote fields.
    quote: u8,
    /// Whether the stream starts with a header row.
    has_headers: bool,
    /// Buffer capacity forwarded to the underlying CSV reader builder.
    buffer_capacity: usize,
    /// Maximum number of records read when sampling the stream.
    sample_size: u64,
    /// Multiplier applied to the largest sampled record size to size lookahead windows.
    lookahead_factor: u64,
}
116
117impl Default for SeekerBuilder {
118 fn default() -> Self {
119 Self {
120 delimiter: b',',
121 quote: b'"',
122 buffer_capacity: 8192,
123 has_headers: true,
124 sample_size: 128,
125 lookahead_factor: 32,
126 }
127 }
128}
129
130impl SeekerBuilder {
131 pub fn new() -> Self {
133 Self::default()
134 }
135
136 pub fn with_capacity(capacity: usize) -> Self {
138 let mut reader = Self::default();
139 reader.buffer_capacity(capacity);
140 reader
141 }
142
143 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
149 self.delimiter = delimiter;
150 self
151 }
152
153 pub fn quote(&mut self, quote: u8) -> &mut Self {
159 self.quote = quote;
160 self
161 }
162
163 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
165 self.buffer_capacity = capacity;
166 self
167 }
168
169 pub fn sample_size(&mut self, size: u64) -> &mut Self {
175 self.sample_size = size;
176 self
177 }
178
179 pub fn lookahead_factor(&mut self, factor: u64) -> &mut Self {
185 self.lookahead_factor = factor;
186 self
187 }
188
189 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
193 self.has_headers = yes;
194 self
195 }
196
197 pub fn from_reader<R: Read + Seek>(&self, mut reader: R) -> error::Result<Option<Seeker<R>>> {
200 let mut builder = ZeroCopyReaderBuilder::new();
201
202 builder
203 .buffer_capacity(self.buffer_capacity)
204 .delimiter(self.delimiter)
205 .quote(self.quote)
206 .has_headers(self.has_headers);
207
208 match SeekerSample::from_reader(&mut reader, &builder, self.sample_size) {
209 Ok(Some(sample)) => {
210 builder.has_headers(false).flexible(true);
211
212 Ok(Some(Seeker {
213 inner: reader,
214 lookahead_factor: self.lookahead_factor,
215 scratch: Vec::with_capacity(
216 (self.lookahead_factor * sample.max_record_size) as usize,
217 ),
218 sample,
219 builder,
220 has_headers: self.has_headers,
221 }))
222 }
223 Ok(None) => Ok(None),
224 Err(err) => Err(err),
225 }
226 }
227}
228
229fn lookahead<R: Read>(
230 reader: &mut ZeroCopyReader<R>,
231 expected_field_count: usize,
232) -> error::Result<Option<(u64, ByteRecord)>> {
233 let mut i: usize = 0;
234 let mut next_record: Option<(u64, ByteRecord)> = None;
235 let mut field_counts: Vec<usize> = Vec::new();
236 let mut pos: u64 = 0;
237
238 while let Some(record) = reader.read_byte_record()? {
239 if i > 0 {
240 field_counts.push(record.len());
241
242 if i == 1 {
243 next_record = Some((pos, record.to_byte_record()));
244 }
245 }
246
247 pos = reader.position();
248 i += 1;
249 }
250
251 if field_counts.len() < 2
255 || field_counts[..field_counts.len() - 1]
256 .iter()
257 .any(|l| *l != expected_field_count)
258 {
259 Ok(None)
260 } else {
261 Ok(next_record)
262 }
263}
264
265pub struct Seeker<R> {
267 inner: R,
268 sample: SeekerSample,
269 lookahead_factor: u64,
270 scratch: Vec<u8>,
271 builder: ZeroCopyReaderBuilder,
272 has_headers: bool,
273}
274
275impl<R: Read + Seek> Seeker<R> {
276 pub fn has_headers(&self) -> bool {
279 self.has_headers
280 }
281
282 #[inline(always)]
285 pub fn initial_position(&self) -> u64 {
286 self.sample.initial_position
287 }
288
289 #[inline(always)]
292 pub fn first_record_position(&self) -> u64 {
293 self.sample.first_record_position
294 }
295
296 #[inline(always)]
298 pub fn stream_len(&self) -> u64 {
299 self.sample.stream_len
300 }
301
302 #[inline(always)]
306 pub fn lookahead_len(&self) -> u64 {
307 self.lookahead_factor * self.sample.max_record_size
308 }
309
310 #[inline(always)]
312 pub fn range(&self) -> Range<u64> {
313 self.sample.first_record_position..self.sample.stream_len
314 }
315
316 #[inline]
320 pub fn exact_count(&self) -> Option<u64> {
321 self.sample
322 .has_reached_eof
323 .then_some(self.sample.record_count)
324 }
325
326 #[inline]
330 pub fn approx_count(&self) -> u64 {
331 let sample = &self.sample;
332
333 if sample.has_reached_eof {
334 sample.record_count
335 } else {
336 ((sample.stream_len - sample.first_record_position) as f64
337 / sample.median_record_size as f64)
338 .ceil() as u64
339 }
340 }
341
342 pub fn find_record_after(&mut self, from_pos: u64) -> error::Result<Option<(u64, ByteRecord)>> {
370 if from_pos < self.first_record_position() || from_pos >= self.sample.stream_len {
371 return Err(Error::new(ErrorKind::OutOfBounds {
372 pos: from_pos,
373 start: self.first_record_position(),
374 end: self.sample.stream_len,
375 }));
376 }
377
378 self.inner.seek(SeekFrom::Start(from_pos))?;
379
380 if from_pos == self.first_record_position() {
382 let first_record = self
383 .builder
384 .from_reader(&mut self.inner)
385 .read_byte_record()?
386 .unwrap()
387 .to_byte_record();
388
389 return Ok(Some((self.first_record_position(), first_record)));
390 }
391
392 self.scratch.clear();
393 (&mut self.inner)
394 .take(self.lookahead_factor * self.sample.max_record_size)
395 .read_to_end(&mut self.scratch)?;
396
397 let mut unquoted_reader = self.builder.from_reader(self.scratch.as_slice());
398 let mut quoted_reader = self
399 .builder
400 .from_reader(Cursor::new(b"\"").chain(self.scratch.as_slice()));
401
402 let expected_field_count = self.sample.headers.len();
403
404 let unquoted = lookahead(&mut unquoted_reader, expected_field_count)?;
405 let quoted = lookahead(&mut quoted_reader, expected_field_count)?;
406
407 match (unquoted, quoted) {
408 (None, None) => Ok(None),
409 (Some((pos, record)), None) => Ok(Some((from_pos + pos, record))),
410 (None, Some((pos, record))) => Ok(Some((from_pos + pos - 1, record))),
411 (Some((unquoted_pos, unquoted_record)), Some((mut quoted_pos, quoted_record))) => {
412 quoted_pos -= 1;
416
417 if unquoted_pos == quoted_pos {
419 Ok(Some((from_pos + unquoted_pos, unquoted_record)))
420 } else {
421 let unquoted_cosine = cosine(
422 &self.sample.fields_mean_sizes,
423 unquoted_record.iter().map(|cell| cell.len()),
424 );
425 let quoted_cosine = cosine(
426 &self.sample.fields_mean_sizes,
427 quoted_record.iter().map(|cell| cell.len()),
428 );
429
430 if unquoted_cosine > quoted_cosine {
431 Ok(Some((from_pos + unquoted_pos, unquoted_record)))
432 } else {
433 Ok(Some((from_pos + quoted_pos, quoted_record)))
434 }
435 }
436 }
437 }
438 }
439
440 pub fn segments(&mut self, count: usize) -> error::Result<Vec<(u64, u64)>> {
445 let sample = &self.sample;
446 let file_len = sample.stream_len;
447
448 if self.sample.record_count < count as u64 {
450 return Ok(vec![(self.first_record_position(), file_len)]);
451 }
452
453 let adjusted_file_len = file_len - self.first_record_position();
454
455 let count = count
457 .min(
458 (file_len / (sample.max_record_size * self.lookahead_factor)).saturating_sub(1)
459 as usize,
460 )
461 .max(1);
462
463 let mut offsets = vec![self.first_record_position()];
464
465 for i in 1..count {
466 let file_offset = ((i as f64 / count as f64) * adjusted_file_len as f64).floor() as u64
467 + self.first_record_position();
468
469 if let Some((record_offset, _)) = self.find_record_after(file_offset)? {
470 offsets.push(record_offset);
471 } else {
472 break;
473 }
474 }
475
476 offsets.push(file_len);
477
478 Ok(offsets.windows(2).map(|w| (w[0], w[1])).collect())
479 }
480
481 pub fn byte_headers(&self) -> &ByteRecord {
484 &self.sample.headers
485 }
486
487 pub fn first_byte_record(&mut self) -> error::Result<Option<ByteRecord>> {
489 self.inner
490 .seek(SeekFrom::Start(self.first_record_position()))?;
491
492 match self.builder.from_reader(&mut self.inner).read_byte_record() {
493 Ok(Some(record)) => Ok(Some(record.to_byte_record())),
494 Ok(None) => Ok(None),
495 Err(err) => Err(err),
496 }
497 }
498
499 pub fn last_byte_record(&mut self) -> error::Result<Option<ByteRecord>> {
502 let reverse_reader = ReverseReader::new(
503 &mut self.inner,
504 self.sample.stream_len,
505 self.sample.first_record_position,
506 );
507
508 let mut reverse_csv_reader = self.builder.from_reader(reverse_reader);
509
510 reverse_csv_reader
511 .read_byte_record()
512 .map(|record_opt| record_opt.map(|record| record.to_byte_record_in_reverse()))
513 }
514
515 pub fn into_inner(self) -> R {
517 self.inner
518 }
519
520 pub fn into_splitter_at_position(mut self, pos: SeekFrom) -> error::Result<Splitter<R>> {
526 self.inner.seek(pos)?;
527 self.builder.has_headers(false);
528
529 Ok(self.builder.to_splitter_builder().from_reader(self.inner))
530 }
531
532 pub fn into_splitter(mut self) -> error::Result<Splitter<R>> {
535 let pos = SeekFrom::Start(self.sample.initial_position);
536
537 self.inner.seek(pos)?;
538 self.builder.has_headers(self.has_headers);
539
540 Ok(self.builder.to_splitter_builder().from_reader(self.inner))
541 }
542
543 pub fn into_zero_copy_reader_at_position(
549 mut self,
550 pos: SeekFrom,
551 ) -> error::Result<ZeroCopyReader<R>> {
552 self.inner.seek(pos)?;
553 self.builder.has_headers(false);
554
555 Ok(self.builder.from_reader(self.inner))
556 }
557
558 pub fn into_zero_copy_reader(mut self) -> error::Result<ZeroCopyReader<R>> {
561 let pos = SeekFrom::Start(self.sample.initial_position);
562
563 self.inner.seek(pos)?;
564 self.builder.has_headers(self.has_headers);
565
566 Ok(self.builder.from_reader(self.inner))
567 }
568
569 pub fn into_reader_at_position(mut self, pos: SeekFrom) -> error::Result<Reader<R>> {
575 self.inner.seek(pos)?;
576 self.builder.has_headers(false);
577
578 Ok(self.builder.to_reader_builder().from_reader(self.inner))
579 }
580
581 pub fn into_reader(mut self) -> error::Result<Reader<R>> {
584 let pos = SeekFrom::Start(self.sample.initial_position);
585
586 self.inner.seek(pos)?;
587 self.builder.has_headers(self.has_headers);
588
589 Ok(self.builder.to_reader_builder().from_reader(self.inner))
590 }
591}