1use std::io::{Cursor, Read, Seek, SeekFrom};
2use std::ops::Range;
3
4use crate::error::{self, Error, ErrorKind};
5use crate::records::ByteRecord;
6use crate::splitter::Splitter;
7use crate::utils::ReverseReader;
8use crate::zero_copy_reader::{ZeroCopyReader, ZeroCopyReaderBuilder};
9
/// Summary statistics computed from the first records of a CSV stream.
///
/// Gathered once when a seeker is built and later used to size lookahead
/// windows, estimate record counts and score candidate record boundaries.
#[derive(Debug)]
struct SeekerSample {
    /// Header row as returned by the reader's `byte_headers` (presumably the
    /// first record when headers are disabled — TODO confirm).
    headers: ByteRecord,
    /// Number of records actually sampled (at most the requested sample size).
    record_count: u64,
    /// Largest sampled record size in bytes (raw record slice length + 1).
    max_record_size: u64,
    /// Median sampled record size in bytes (upper median for even counts).
    median_record_size: u64,
    /// Stream position when sampling started.
    initial_position: u64,
    /// Byte offset of the first data record (past the headers, if any).
    first_record_position: u64,
    /// Mean cell size of each header column across the sampled records.
    fields_mean_sizes: Vec<f64>,
    /// Total stream length in bytes, measured by seeking to the end.
    stream_len: u64,
    /// Whether the sample covered every record of the stream.
    has_reached_eof: bool,
}
22
23impl SeekerSample {
24 fn from_reader<R: Read + Seek>(
25 mut reader: R,
26 csv_reader_builder: &ZeroCopyReaderBuilder,
27 sample_size: u64,
28 ) -> error::Result<Option<Self>> {
29 let initial_position = reader.stream_position()?;
33
34 let mut csv_reader = csv_reader_builder.from_reader(&mut reader);
35
36 let headers = csv_reader.byte_headers()?.clone();
37
38 let first_record_position = if csv_reader.has_headers() {
39 initial_position + csv_reader.position()
40 } else {
41 initial_position
42 };
43
44 let mut i: u64 = 0;
45 let mut record_sizes: Vec<u64> = Vec::new();
46 let mut fields_sizes: Vec<Vec<usize>> = Vec::with_capacity(sample_size as usize);
47
48 while i < sample_size {
49 if let Some(record) = csv_reader.read_byte_record()? {
50 let record_size = record.as_slice().len() as u64 + 1;
52
53 record_sizes.push(record_size);
54 fields_sizes.push(record.iter().map(|cell| cell.len()).collect());
55
56 i += 1;
57 } else {
58 break;
59 }
60 }
61
62 if i == 0 {
64 return Ok(None);
65 }
66
67 let has_reached_eof = csv_reader.read_byte_record()?.is_none();
68 let file_len = reader.seek(SeekFrom::End(0))?;
69 let fields_mean_sizes = (0..headers.len())
70 .map(|i| {
71 fields_sizes.iter().map(|sizes| sizes[i]).sum::<usize>() as f64
72 / fields_sizes.len() as f64
73 })
74 .collect();
75
76 record_sizes.sort();
77
78 Ok(Some(Self {
79 headers,
80 record_count: i,
81 max_record_size: *record_sizes.last().unwrap(),
82 median_record_size: record_sizes[record_sizes.len() / 2],
83 initial_position,
84 first_record_position,
85 fields_mean_sizes,
86 has_reached_eof,
87 stream_len: file_len,
88 }))
89 }
90}
91
/// Cosine similarity between a reference profile and a vector of sizes.
///
/// `profile` holds per-column mean cell sizes and `other` yields the cell
/// sizes of a candidate record. The two are zipped, so trailing elements of
/// the longer sequence are ignored.
///
/// Returns `0.0` when either vector has zero norm (including empty input)
/// instead of the `NaN` produced by `0.0 / 0.0`. `NaN` compares as `false`
/// against everything, which would let a degenerate candidate win a `>`
/// comparison against a valid one.
fn cosine(profile: &[f64], other: impl Iterator<Item = usize>) -> f64 {
    let (dot, profile_norm_sq, other_norm_sq) = profile
        .iter()
        .copied()
        .zip(other.map(|size| size as f64))
        .fold((0.0, 0.0, 0.0), |(dot, pn, on), (a, b)| {
            (dot + a * b, pn + a * a, on + b * b)
        });

    let denominator = (profile_norm_sq * other_norm_sq).sqrt();

    if denominator > 0.0 {
        dot / denominator
    } else {
        0.0
    }
}
105
/// Configuration used to sample a CSV stream and build a seeker over it.
#[derive(Debug, Clone)]
pub struct SeekerBuilder {
    /// Byte separating fields.
    delimiter: u8,
    /// Byte used to quote fields.
    quote: u8,
    /// Whether the first record of the stream is a header row.
    has_headers: bool,
    /// Buffer capacity handed to the underlying CSV reader builder.
    buffer_capacity: usize,
    /// Maximum number of records read when sampling the stream.
    sample_size: u64,
    /// Multiplier applied to the largest sampled record size to obtain the
    /// number of bytes scanned when looking for a record boundary.
    lookahead_factor: u64,
}
115
116impl Default for SeekerBuilder {
117 fn default() -> Self {
118 Self {
119 delimiter: b',',
120 quote: b'"',
121 buffer_capacity: 8192,
122 has_headers: true,
123 sample_size: 128,
124 lookahead_factor: 32,
125 }
126 }
127}
128
129impl SeekerBuilder {
130 pub fn new() -> Self {
131 Self::default()
132 }
133
134 pub fn with_capacity(capacity: usize) -> Self {
135 let mut reader = Self::default();
136 reader.buffer_capacity(capacity);
137 reader
138 }
139
140 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
141 self.delimiter = delimiter;
142 self
143 }
144
145 pub fn quote(&mut self, quote: u8) -> &mut Self {
146 self.quote = quote;
147 self
148 }
149
150 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
151 self.buffer_capacity = capacity;
152 self
153 }
154
155 pub fn sample_size(&mut self, size: u64) -> &mut Self {
156 self.sample_size = size;
157 self
158 }
159
160 pub fn lookahead_factor(&mut self, factor: u64) -> &mut Self {
161 self.lookahead_factor = factor;
162 self
163 }
164
165 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
166 self.has_headers = yes;
167 self
168 }
169
170 pub fn from_reader<R: Read + Seek>(&self, mut reader: R) -> error::Result<Option<Seeker<R>>> {
171 let mut builder = ZeroCopyReaderBuilder::new();
172
173 builder
174 .buffer_capacity(self.buffer_capacity)
175 .delimiter(self.delimiter)
176 .quote(self.quote)
177 .has_headers(self.has_headers);
178
179 match SeekerSample::from_reader(&mut reader, &builder, self.sample_size) {
180 Ok(Some(sample)) => {
181 builder.has_headers(false).flexible(true);
182
183 Ok(Some(Seeker {
184 inner: reader,
185 lookahead_factor: self.lookahead_factor,
186 scratch: Vec::with_capacity(
187 (self.lookahead_factor * sample.max_record_size) as usize,
188 ),
189 sample,
190 builder,
191 has_headers: self.has_headers,
192 }))
193 }
194 Ok(None) => Ok(None),
195 Err(err) => Err(err),
196 }
197 }
198}
199
200fn lookahead<R: Read>(
201 reader: &mut ZeroCopyReader<R>,
202 expected_field_count: usize,
203) -> error::Result<Option<(u64, ByteRecord)>> {
204 let mut i: usize = 0;
205 let mut next_record: Option<(u64, ByteRecord)> = None;
206 let mut field_counts: Vec<usize> = Vec::new();
207 let mut pos: u64 = 0;
208
209 while let Some(record) = reader.read_byte_record()? {
210 if i > 0 {
211 field_counts.push(record.len());
212
213 if i == 1 {
214 next_record = Some((pos, record.to_byte_record()));
215 }
216 }
217
218 pos = reader.position();
219 i += 1;
220 }
221
222 if field_counts.len() < 2
226 || field_counts[..field_counts.len() - 1]
227 .iter()
228 .any(|l| *l != expected_field_count)
229 {
230 Ok(None)
231 } else {
232 Ok(next_record)
233 }
234}
235
/// Random-access helper over a seekable CSV stream, built by [`SeekerBuilder`].
///
/// Keeps a sample of the stream's first records and uses it to guess where
/// record boundaries lie at arbitrary byte offsets.
pub struct Seeker<R> {
    /// The underlying seekable stream.
    inner: R,
    /// Statistics gathered from the first records of the stream.
    sample: SeekerSample,
    /// Multiplier applied to the largest sampled record size to size lookahead windows.
    lookahead_factor: u64,
    /// Reusable buffer holding the bytes of the current lookahead window.
    scratch: Vec<u8>,
    /// Reader configuration used when parsing from arbitrary offsets.
    builder: ZeroCopyReaderBuilder,
    /// Whether the stream starts with a header row.
    has_headers: bool,
}
245
246impl<R: Read + Seek> Seeker<R> {
247 pub fn has_headers(&self) -> bool {
250 self.has_headers
251 }
252
253 #[inline(always)]
256 pub fn initial_position(&self) -> u64 {
257 self.sample.initial_position
258 }
259
260 #[inline(always)]
263 pub fn first_record_position(&self) -> u64 {
264 self.sample.first_record_position
265 }
266
267 #[inline(always)]
269 pub fn stream_len(&self) -> u64 {
270 self.sample.stream_len
271 }
272
273 #[inline(always)]
277 pub fn lookahead_len(&self) -> u64 {
278 self.lookahead_factor * self.sample.max_record_size
279 }
280
281 #[inline(always)]
283 pub fn range(&self) -> Range<u64> {
284 self.sample.first_record_position..self.sample.stream_len
285 }
286
287 #[inline]
291 pub fn exact_count(&self) -> Option<u64> {
292 self.sample
293 .has_reached_eof
294 .then_some(self.sample.record_count)
295 }
296
297 #[inline]
301 pub fn approx_count(&self) -> u64 {
302 let sample = &self.sample;
303
304 if sample.has_reached_eof {
305 sample.record_count
306 } else {
307 ((sample.stream_len - sample.first_record_position) as f64
308 / sample.median_record_size as f64)
309 .ceil() as u64
310 }
311 }
312
313 pub fn find_record_after(&mut self, from_pos: u64) -> error::Result<Option<(u64, ByteRecord)>> {
341 if from_pos < self.first_record_position() || from_pos >= self.sample.stream_len {
342 return Err(Error::new(ErrorKind::OutOfBounds {
343 pos: from_pos,
344 start: self.first_record_position(),
345 end: self.sample.stream_len,
346 }));
347 }
348
349 self.inner.seek(SeekFrom::Start(from_pos))?;
350
351 if from_pos == self.first_record_position() {
353 let first_record = self
354 .builder
355 .from_reader(&mut self.inner)
356 .read_byte_record()?
357 .unwrap()
358 .to_byte_record();
359
360 return Ok(Some((self.first_record_position(), first_record)));
361 }
362
363 self.scratch.clear();
364 (&mut self.inner)
365 .take(self.lookahead_factor * self.sample.max_record_size)
366 .read_to_end(&mut self.scratch)?;
367
368 let mut unquoted_reader = self.builder.from_reader(self.scratch.as_slice());
369 let mut quoted_reader = self
370 .builder
371 .from_reader(Cursor::new(b"\"").chain(self.scratch.as_slice()));
372
373 let expected_field_count = self.sample.headers.len();
374
375 let unquoted = lookahead(&mut unquoted_reader, expected_field_count)?;
376 let quoted = lookahead(&mut quoted_reader, expected_field_count)?;
377
378 match (unquoted, quoted) {
379 (None, None) => Ok(None),
380 (Some((pos, record)), None) => Ok(Some((from_pos + pos, record))),
381 (None, Some((pos, record))) => Ok(Some((from_pos + pos - 1, record))),
382 (Some((unquoted_pos, unquoted_record)), Some((mut quoted_pos, quoted_record))) => {
383 quoted_pos -= 1;
387
388 if unquoted_pos == quoted_pos {
390 Ok(Some((from_pos + unquoted_pos, unquoted_record)))
391 } else {
392 let unquoted_cosine = cosine(
393 &self.sample.fields_mean_sizes,
394 unquoted_record.iter().map(|cell| cell.len()),
395 );
396 let quoted_cosine = cosine(
397 &self.sample.fields_mean_sizes,
398 quoted_record.iter().map(|cell| cell.len()),
399 );
400
401 if unquoted_cosine > quoted_cosine {
402 Ok(Some((from_pos + unquoted_pos, unquoted_record)))
403 } else {
404 Ok(Some((from_pos + quoted_pos, quoted_record)))
405 }
406 }
407 }
408 }
409 }
410
411 pub fn segments(&mut self, count: usize) -> error::Result<Vec<(u64, u64)>> {
416 let sample = &self.sample;
417 let file_len = sample.stream_len;
418
419 if self.sample.record_count < count as u64 {
421 return Ok(vec![(self.first_record_position(), file_len)]);
422 }
423
424 let adjusted_file_len = file_len - self.first_record_position();
425
426 let count = count
428 .min(
429 (file_len / (sample.max_record_size * self.lookahead_factor)).saturating_sub(1)
430 as usize,
431 )
432 .max(1);
433
434 let mut offsets = vec![self.first_record_position()];
435
436 for i in 1..count {
437 let file_offset = ((i as f64 / count as f64) * adjusted_file_len as f64).floor() as u64
438 + self.first_record_position();
439
440 if let Some((record_offset, _)) = self.find_record_after(file_offset)? {
441 offsets.push(record_offset);
442 } else {
443 break;
444 }
445 }
446
447 offsets.push(file_len);
448
449 Ok(offsets.windows(2).map(|w| (w[0], w[1])).collect())
450 }
451
452 pub fn byte_headers(&self) -> &ByteRecord {
455 &self.sample.headers
456 }
457
458 pub fn first_byte_record(&mut self) -> error::Result<Option<ByteRecord>> {
460 self.inner
461 .seek(SeekFrom::Start(self.first_record_position()))?;
462
463 match self.builder.from_reader(&mut self.inner).read_byte_record() {
464 Ok(Some(record)) => Ok(Some(record.to_byte_record())),
465 Ok(None) => Ok(None),
466 Err(err) => Err(err),
467 }
468 }
469
470 pub fn last_byte_record(&mut self) -> error::Result<Option<ByteRecord>> {
473 let reverse_reader = ReverseReader::new(
474 &mut self.inner,
475 self.sample.stream_len,
476 self.sample.first_record_position,
477 );
478
479 let mut reverse_csv_reader = self.builder.from_reader(reverse_reader);
480
481 reverse_csv_reader
482 .read_byte_record()
483 .map(|record_opt| record_opt.map(|record| record.to_byte_record_in_reverse()))
484 }
485
486 pub fn into_inner(self) -> R {
488 self.inner
489 }
490
491 fn into_zero_copy_reader_from_position(
492 mut self,
493 pos: SeekFrom,
494 ) -> error::Result<ZeroCopyReader<R>> {
495 self.inner.seek(pos)?;
496 self.builder.has_headers(self.has_headers);
497 self.builder.flexible(false);
498
499 Ok(self.builder.from_reader(self.inner))
500 }
501
502 pub fn into_zero_copy_reader(self) -> error::Result<ZeroCopyReader<R>> {
505 let pos = SeekFrom::Start(self.sample.initial_position);
506 self.into_zero_copy_reader_from_position(pos)
507 }
508
509 fn into_splitter_from_position(mut self, pos: SeekFrom) -> error::Result<Splitter<R>> {
510 self.inner.seek(pos)?;
511 self.builder.has_headers(self.has_headers);
512
513 Ok(self.builder.to_splitter_builder().from_reader(self.inner))
514 }
515
516 pub fn into_splitter(self) -> error::Result<Splitter<R>> {
519 let pos = SeekFrom::Start(self.sample.initial_position);
520 self.into_splitter_from_position(pos)
521 }
522}