1use std::io::{Cursor, Read, Seek, SeekFrom};
2use std::ops::Range;
3
4use crate::error::{self, Error, ErrorKind};
5use crate::records::ByteRecord;
6use crate::splitter::Splitter;
7use crate::utils::ReverseReader;
8use crate::zero_copy_reader::{ZeroCopyReader, ZeroCopyReaderBuilder};
9
10#[derive(Debug)]
11struct SeekerSample {
12 headers: ByteRecord,
13 record_count: u64,
14 max_record_size: u64,
15 median_record_size: u64,
16 initial_position: u64,
17 first_record_position: u64,
18 fields_mean_sizes: Vec<f64>,
19 stream_len: u64,
20 has_reached_eof: bool,
21}
22
23impl SeekerSample {
24 fn from_reader<R: Read + Seek>(
25 mut reader: R,
26 csv_reader_builder: &ZeroCopyReaderBuilder,
27 sample_size: u64,
28 ) -> error::Result<Option<Self>> {
29 let initial_position = reader.stream_position()?;
33
34 let mut csv_reader = csv_reader_builder.from_reader(&mut reader);
35
36 let headers = csv_reader.byte_headers()?.clone();
37
38 let first_record_position = if csv_reader.has_headers() {
39 initial_position + csv_reader.position()
40 } else {
41 initial_position
42 };
43
44 let mut i: u64 = 0;
45 let mut record_sizes: Vec<u64> = Vec::new();
46 let mut fields_sizes: Vec<Vec<usize>> = Vec::with_capacity(sample_size as usize);
47
48 while i < sample_size {
49 if let Some(record) = csv_reader.read_byte_record()? {
50 let record_size = record.as_slice().len() as u64 + 1;
52
53 record_sizes.push(record_size);
54 fields_sizes.push(record.iter().map(|cell| cell.len()).collect());
55
56 i += 1;
57 } else {
58 break;
59 }
60 }
61
62 if i == 0 {
64 return Ok(None);
65 }
66
67 let has_reached_eof = csv_reader.read_byte_record()?.is_none();
68 let file_len = reader.seek(SeekFrom::End(0))?;
69 let fields_mean_sizes = (0..headers.len())
70 .map(|i| {
71 fields_sizes.iter().map(|sizes| sizes[i]).sum::<usize>() as f64
72 / fields_sizes.len() as f64
73 })
74 .collect();
75
76 record_sizes.sort();
77
78 Ok(Some(Self {
79 headers,
80 record_count: i,
81 max_record_size: *record_sizes.last().unwrap(),
82 median_record_size: record_sizes[record_sizes.len() / 2],
83 initial_position,
84 first_record_position,
85 fields_mean_sizes,
86 has_reached_eof,
87 stream_len: file_len,
88 }))
89 }
90}
91
/// Cosine similarity between a profile of mean field sizes and the field
/// sizes of a candidate record.
///
/// Both sequences are compared pairwise; if they differ in length, the extra
/// items of the longer one are ignored.
///
/// Returns `0.0` when the similarity is undefined (for instance when either
/// vector has a zero norm, or when both are empty) rather than `NaN`, so that
/// callers comparing two similarities with `>` get a meaningful ordering.
fn cosine(profile: &[f64], other: impl Iterator<Item = usize>) -> f64 {
    let mut profile_norm = 0.0;
    let mut other_norm = 0.0;
    let mut dot_product = 0.0;

    for (a, b) in profile.iter().copied().zip(other.map(|size| size as f64)) {
        profile_norm += a * a;
        other_norm += b * b;
        dot_product += a * b;
    }

    let similarity = dot_product / (profile_norm * other_norm).sqrt();

    // 0/0 (degenerate vectors) yields NaN, which would make every comparison
    // against it false; treat "no information" as "no similarity" instead.
    if similarity.is_finite() {
        similarity
    } else {
        0.0
    }
}
105
/// Configuration used to build a `Seeker` over a seekable CSV stream.
pub struct SeekerBuilder {
    /// Byte separating the cells of a record.
    delimiter: u8,
    /// Byte used to quote cells.
    quote: u8,
    /// Whether the stream starts with a header row.
    has_headers: bool,
    /// Buffer capacity handed to the underlying CSV reader builder.
    buffer_capacity: usize,
    /// Maximum number of records read when sampling the stream.
    sample_size: u64,
    /// Multiplier applied to the largest sampled record size to obtain the
    /// number of bytes read when looking ahead for a record boundary.
    lookahead_factor: u64,
}
115
116impl Default for SeekerBuilder {
117 fn default() -> Self {
118 Self {
119 delimiter: b',',
120 quote: b'"',
121 buffer_capacity: 8192,
122 has_headers: true,
123 sample_size: 128,
124 lookahead_factor: 32,
125 }
126 }
127}
128
129impl SeekerBuilder {
130 pub fn new() -> Self {
132 Self::default()
133 }
134
135 pub fn with_capacity(capacity: usize) -> Self {
137 let mut reader = Self::default();
138 reader.buffer_capacity(capacity);
139 reader
140 }
141
142 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
148 self.delimiter = delimiter;
149 self
150 }
151
152 pub fn quote(&mut self, quote: u8) -> &mut Self {
158 self.quote = quote;
159 self
160 }
161
162 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
164 self.buffer_capacity = capacity;
165 self
166 }
167
168 pub fn sample_size(&mut self, size: u64) -> &mut Self {
174 self.sample_size = size;
175 self
176 }
177
178 pub fn lookahead_factor(&mut self, factor: u64) -> &mut Self {
184 self.lookahead_factor = factor;
185 self
186 }
187
188 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
192 self.has_headers = yes;
193 self
194 }
195
196 pub fn from_reader<R: Read + Seek>(&self, mut reader: R) -> error::Result<Option<Seeker<R>>> {
199 let mut builder = ZeroCopyReaderBuilder::new();
200
201 builder
202 .buffer_capacity(self.buffer_capacity)
203 .delimiter(self.delimiter)
204 .quote(self.quote)
205 .has_headers(self.has_headers);
206
207 match SeekerSample::from_reader(&mut reader, &builder, self.sample_size) {
208 Ok(Some(sample)) => {
209 builder.has_headers(false).flexible(true);
210
211 Ok(Some(Seeker {
212 inner: reader,
213 lookahead_factor: self.lookahead_factor,
214 scratch: Vec::with_capacity(
215 (self.lookahead_factor * sample.max_record_size) as usize,
216 ),
217 sample,
218 builder,
219 has_headers: self.has_headers,
220 }))
221 }
222 Ok(None) => Ok(None),
223 Err(err) => Err(err),
224 }
225 }
226}
227
228fn lookahead<R: Read>(
229 reader: &mut ZeroCopyReader<R>,
230 expected_field_count: usize,
231) -> error::Result<Option<(u64, ByteRecord)>> {
232 let mut i: usize = 0;
233 let mut next_record: Option<(u64, ByteRecord)> = None;
234 let mut field_counts: Vec<usize> = Vec::new();
235 let mut pos: u64 = 0;
236
237 while let Some(record) = reader.read_byte_record()? {
238 if i > 0 {
239 field_counts.push(record.len());
240
241 if i == 1 {
242 next_record = Some((pos, record.to_byte_record()));
243 }
244 }
245
246 pos = reader.position();
247 i += 1;
248 }
249
250 if field_counts.len() < 2
254 || field_counts[..field_counts.len() - 1]
255 .iter()
256 .any(|l| *l != expected_field_count)
257 {
258 Ok(None)
259 } else {
260 Ok(next_record)
261 }
262}
263
264pub struct Seeker<R> {
266 inner: R,
267 sample: SeekerSample,
268 lookahead_factor: u64,
269 scratch: Vec<u8>,
270 builder: ZeroCopyReaderBuilder,
271 has_headers: bool,
272}
273
274impl<R: Read + Seek> Seeker<R> {
275 pub fn has_headers(&self) -> bool {
278 self.has_headers
279 }
280
281 #[inline(always)]
284 pub fn initial_position(&self) -> u64 {
285 self.sample.initial_position
286 }
287
288 #[inline(always)]
291 pub fn first_record_position(&self) -> u64 {
292 self.sample.first_record_position
293 }
294
295 #[inline(always)]
297 pub fn stream_len(&self) -> u64 {
298 self.sample.stream_len
299 }
300
301 #[inline(always)]
305 pub fn lookahead_len(&self) -> u64 {
306 self.lookahead_factor * self.sample.max_record_size
307 }
308
309 #[inline(always)]
311 pub fn range(&self) -> Range<u64> {
312 self.sample.first_record_position..self.sample.stream_len
313 }
314
315 #[inline]
319 pub fn exact_count(&self) -> Option<u64> {
320 self.sample
321 .has_reached_eof
322 .then_some(self.sample.record_count)
323 }
324
325 #[inline]
329 pub fn approx_count(&self) -> u64 {
330 let sample = &self.sample;
331
332 if sample.has_reached_eof {
333 sample.record_count
334 } else {
335 ((sample.stream_len - sample.first_record_position) as f64
336 / sample.median_record_size as f64)
337 .ceil() as u64
338 }
339 }
340
341 pub fn find_record_after(&mut self, from_pos: u64) -> error::Result<Option<(u64, ByteRecord)>> {
369 if from_pos < self.first_record_position() || from_pos >= self.sample.stream_len {
370 return Err(Error::new(ErrorKind::OutOfBounds {
371 pos: from_pos,
372 start: self.first_record_position(),
373 end: self.sample.stream_len,
374 }));
375 }
376
377 self.inner.seek(SeekFrom::Start(from_pos))?;
378
379 if from_pos == self.first_record_position() {
381 let first_record = self
382 .builder
383 .from_reader(&mut self.inner)
384 .read_byte_record()?
385 .unwrap()
386 .to_byte_record();
387
388 return Ok(Some((self.first_record_position(), first_record)));
389 }
390
391 self.scratch.clear();
392 (&mut self.inner)
393 .take(self.lookahead_factor * self.sample.max_record_size)
394 .read_to_end(&mut self.scratch)?;
395
396 let mut unquoted_reader = self.builder.from_reader(self.scratch.as_slice());
397 let mut quoted_reader = self
398 .builder
399 .from_reader(Cursor::new(b"\"").chain(self.scratch.as_slice()));
400
401 let expected_field_count = self.sample.headers.len();
402
403 let unquoted = lookahead(&mut unquoted_reader, expected_field_count)?;
404 let quoted = lookahead(&mut quoted_reader, expected_field_count)?;
405
406 match (unquoted, quoted) {
407 (None, None) => Ok(None),
408 (Some((pos, record)), None) => Ok(Some((from_pos + pos, record))),
409 (None, Some((pos, record))) => Ok(Some((from_pos + pos - 1, record))),
410 (Some((unquoted_pos, unquoted_record)), Some((mut quoted_pos, quoted_record))) => {
411 quoted_pos -= 1;
415
416 if unquoted_pos == quoted_pos {
418 Ok(Some((from_pos + unquoted_pos, unquoted_record)))
419 } else {
420 let unquoted_cosine = cosine(
421 &self.sample.fields_mean_sizes,
422 unquoted_record.iter().map(|cell| cell.len()),
423 );
424 let quoted_cosine = cosine(
425 &self.sample.fields_mean_sizes,
426 quoted_record.iter().map(|cell| cell.len()),
427 );
428
429 if unquoted_cosine > quoted_cosine {
430 Ok(Some((from_pos + unquoted_pos, unquoted_record)))
431 } else {
432 Ok(Some((from_pos + quoted_pos, quoted_record)))
433 }
434 }
435 }
436 }
437 }
438
439 pub fn segments(&mut self, count: usize) -> error::Result<Vec<(u64, u64)>> {
444 let sample = &self.sample;
445 let file_len = sample.stream_len;
446
447 if self.sample.record_count < count as u64 {
449 return Ok(vec![(self.first_record_position(), file_len)]);
450 }
451
452 let adjusted_file_len = file_len - self.first_record_position();
453
454 let count = count
456 .min(
457 (file_len / (sample.max_record_size * self.lookahead_factor)).saturating_sub(1)
458 as usize,
459 )
460 .max(1);
461
462 let mut offsets = vec![self.first_record_position()];
463
464 for i in 1..count {
465 let file_offset = ((i as f64 / count as f64) * adjusted_file_len as f64).floor() as u64
466 + self.first_record_position();
467
468 if let Some((record_offset, _)) = self.find_record_after(file_offset)? {
469 offsets.push(record_offset);
470 } else {
471 break;
472 }
473 }
474
475 offsets.push(file_len);
476
477 Ok(offsets.windows(2).map(|w| (w[0], w[1])).collect())
478 }
479
480 pub fn byte_headers(&self) -> &ByteRecord {
483 &self.sample.headers
484 }
485
486 pub fn first_byte_record(&mut self) -> error::Result<Option<ByteRecord>> {
488 self.inner
489 .seek(SeekFrom::Start(self.first_record_position()))?;
490
491 match self.builder.from_reader(&mut self.inner).read_byte_record() {
492 Ok(Some(record)) => Ok(Some(record.to_byte_record())),
493 Ok(None) => Ok(None),
494 Err(err) => Err(err),
495 }
496 }
497
498 pub fn last_byte_record(&mut self) -> error::Result<Option<ByteRecord>> {
501 let reverse_reader = ReverseReader::new(
502 &mut self.inner,
503 self.sample.stream_len,
504 self.sample.first_record_position,
505 );
506
507 let mut reverse_csv_reader = self.builder.from_reader(reverse_reader);
508
509 reverse_csv_reader
510 .read_byte_record()
511 .map(|record_opt| record_opt.map(|record| record.to_byte_record_in_reverse()))
512 }
513
514 pub fn into_inner(self) -> R {
516 self.inner
517 }
518
519 fn into_zero_copy_reader_from_position(
520 mut self,
521 pos: SeekFrom,
522 ) -> error::Result<ZeroCopyReader<R>> {
523 self.inner.seek(pos)?;
524 self.builder.has_headers(self.has_headers);
525 self.builder.flexible(false);
526
527 Ok(self.builder.from_reader(self.inner))
528 }
529
530 pub fn into_zero_copy_reader(self) -> error::Result<ZeroCopyReader<R>> {
533 let pos = SeekFrom::Start(self.sample.initial_position);
534 self.into_zero_copy_reader_from_position(pos)
535 }
536
537 fn into_splitter_from_position(mut self, pos: SeekFrom) -> error::Result<Splitter<R>> {
538 self.inner.seek(pos)?;
539 self.builder.has_headers(self.has_headers);
540
541 Ok(self.builder.to_splitter_builder().from_reader(self.inner))
542 }
543
544 pub fn into_splitter(self) -> error::Result<Splitter<R>> {
547 let pos = SeekFrom::Start(self.sample.initial_position);
548 self.into_splitter_from_position(pos)
549 }
550}