1use std::io::{Cursor, Read, Seek, SeekFrom};
2use std::ops::Range;
3
4use crate::error::{self, Error, ErrorKind};
5use crate::records::ByteRecord;
6use crate::splitter::Splitter;
7use crate::utils::ReverseReader;
8use crate::zero_copy_reader::{ZeroCopyReader, ZeroCopyReaderBuilder};
9
/// Statistics gathered by reading the first records of a CSV stream.
///
/// Built by `SeekerSample::from_reader` and later used to size lookahead reads
/// and to score candidate records when seeking to an arbitrary byte offset.
#[derive(Debug)]
struct SeekerSample {
    /// Copy of the header record reported by the CSV reader.
    headers: ByteRecord,
    /// Number of records that were read while sampling.
    record_count: u64,
    /// Largest sampled record size, in bytes (each size is the record's slice length plus one).
    max_record_size: u64,
    /// Sampled record size found at index `len / 2` of the sorted sizes.
    median_record_size: u64,
    /// Stream position of the underlying reader when sampling started.
    initial_position: u64,
    /// Stream position of the first data record: right after the headers when the
    /// reader has headers, `initial_position` otherwise.
    first_record_position: u64,
    /// Mean byte length of each column over the sampled records.
    fields_mean_sizes: Vec<f64>,
    /// Offset returned by seeking to the end of the underlying reader.
    file_len: u64,
    /// Whether no further record could be read right after the sample was taken.
    has_reached_eof: bool,
}
22
23impl SeekerSample {
24 fn from_reader<R: Read + Seek>(
25 mut reader: R,
26 csv_reader_builder: &ZeroCopyReaderBuilder,
27 sample_size: u64,
28 ) -> error::Result<Option<Self>> {
29 let initial_position = reader.stream_position()?;
33
34 let mut csv_reader = csv_reader_builder.from_reader(&mut reader);
35
36 let headers = csv_reader.byte_headers()?.clone();
37
38 let first_record_position = if csv_reader.has_headers() {
39 initial_position + csv_reader.position()
40 } else {
41 initial_position
42 };
43
44 let mut i: u64 = 0;
45 let mut record_sizes: Vec<u64> = Vec::new();
46 let mut fields_sizes: Vec<Vec<usize>> = Vec::with_capacity(sample_size as usize);
47
48 while i < sample_size {
49 if let Some(record) = csv_reader.read_byte_record()? {
50 let record_size = record.as_slice().len() as u64 + 1;
52
53 record_sizes.push(record_size);
54 fields_sizes.push(record.iter().map(|cell| cell.len()).collect());
55
56 i += 1;
57 } else {
58 break;
59 }
60 }
61
62 if i == 0 {
64 return Ok(None);
65 }
66
67 let has_reached_eof = csv_reader.read_byte_record()?.is_none();
68 let file_len = reader.seek(SeekFrom::End(0))?;
69 let fields_mean_sizes = (0..headers.len())
70 .map(|i| {
71 fields_sizes.iter().map(|sizes| sizes[i]).sum::<usize>() as f64
72 / fields_sizes.len() as f64
73 })
74 .collect();
75
76 record_sizes.sort();
77
78 Ok(Some(Self {
79 headers,
80 record_count: i,
81 max_record_size: *record_sizes.last().unwrap(),
82 median_record_size: record_sizes[record_sizes.len() / 2],
83 initial_position,
84 first_record_position,
85 fields_mean_sizes,
86 has_reached_eof,
87 file_len,
88 }))
89 }
90}
91
/// Cosine similarity between `profile` and the values yielded by `other`.
///
/// Values are paired positionally with `zip`, so surplus items on either side
/// are ignored.
///
/// Returns `0.0` when the similarity is undefined, i.e. when either side has a
/// zero norm (including empty input). Without this guard the division yields
/// NaN, and since every ordered comparison against NaN is false, a caller
/// picking the best of two candidates would silently always take the same one.
fn cosine(profile: &[f64], other: impl Iterator<Item = usize>) -> f64 {
    let mut self_norm = 0.0;
    let mut other_norm = 0.0;
    let mut intersection = 0.0;

    for (a, b) in profile.iter().copied().zip(other.map(|i| i as f64)) {
        self_norm += a * a;
        other_norm += b * b;
        intersection += a * b;
    }

    let denominator = (self_norm * other_norm).sqrt();

    // `>` is also false for a NaN denominator, so that case maps to 0.0 as well.
    if denominator > 0.0 {
        intersection / denominator
    } else {
        0.0
    }
}
105
/// Configures and creates a [`Seeker`].
pub struct SeekerBuilder {
    /// Field delimiter byte handed to the underlying CSV reader builder.
    delimiter: u8,
    /// Quote byte handed to the underlying CSV reader builder.
    quote: u8,
    /// Whether the data is declared as starting with a header record.
    has_headers: bool,
    /// Buffer capacity handed to the underlying CSV reader builder.
    buffer_capacity: usize,
    /// Maximum number of records read when sampling the data.
    sample_size: u64,
    /// Multiplier applied to the largest sampled record size to size lookahead reads.
    lookahead_factor: u64,
}
114
115impl Default for SeekerBuilder {
116 fn default() -> Self {
117 Self {
118 delimiter: b',',
119 quote: b'"',
120 buffer_capacity: 8192,
121 has_headers: true,
122 sample_size: 128,
123 lookahead_factor: 32,
124 }
125 }
126}
127
128impl SeekerBuilder {
129 pub fn new() -> Self {
130 Self::default()
131 }
132
133 pub fn with_capacity(capacity: usize) -> Self {
134 let mut reader = Self::default();
135 reader.buffer_capacity(capacity);
136 reader
137 }
138
139 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
140 self.delimiter = delimiter;
141 self
142 }
143
144 pub fn quote(&mut self, quote: u8) -> &mut Self {
145 self.quote = quote;
146 self
147 }
148
149 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
150 self.buffer_capacity = capacity;
151 self
152 }
153
154 pub fn sample_size(&mut self, size: u64) -> &mut Self {
155 self.sample_size = size;
156 self
157 }
158
159 pub fn lookahead_factor(&mut self, factor: u64) -> &mut Self {
160 self.lookahead_factor = factor;
161 self
162 }
163
164 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
165 self.has_headers = yes;
166 self
167 }
168
169 pub fn from_reader<R: Read + Seek>(&self, mut reader: R) -> error::Result<Option<Seeker<R>>> {
170 let mut builder = ZeroCopyReaderBuilder::new();
171
172 builder
173 .buffer_capacity(self.buffer_capacity)
174 .delimiter(self.delimiter)
175 .quote(self.quote)
176 .has_headers(self.has_headers);
177
178 match SeekerSample::from_reader(&mut reader, &builder, self.sample_size) {
179 Ok(Some(sample)) => {
180 builder.has_headers(false).flexible(true);
181
182 Ok(Some(Seeker {
183 inner: reader,
184 lookahead_factor: self.lookahead_factor,
185 scratch: Vec::with_capacity(
186 (self.lookahead_factor * sample.max_record_size) as usize,
187 ),
188 sample,
189 builder,
190 has_headers: self.has_headers,
191 }))
192 }
193 Ok(None) => Ok(None),
194 Err(err) => Err(err),
195 }
196 }
197}
198
199fn lookahead<R: Read>(
200 reader: &mut ZeroCopyReader<R>,
201 expected_field_count: usize,
202) -> error::Result<Option<(u64, ByteRecord)>> {
203 let mut i: usize = 0;
204 let mut next_record: Option<(u64, ByteRecord)> = None;
205 let mut field_counts: Vec<usize> = Vec::new();
206 let mut pos: u64 = 0;
207
208 while let Some(record) = reader.read_byte_record()? {
209 if i > 0 {
210 field_counts.push(record.len());
211
212 if i == 1 {
213 next_record = Some((pos, record.to_byte_record()));
214 }
215 }
216
217 pos = reader.position();
218 i += 1;
219 }
220
221 if field_counts.len() < 2
225 || field_counts[..field_counts.len() - 1]
226 .iter()
227 .any(|l| *l != expected_field_count)
228 {
229 Ok(None)
230 } else {
231 Ok(next_record)
232 }
233}
234
/// Locates record boundaries at arbitrary byte offsets of a seekable CSV stream,
/// relying on statistics sampled from its first records.
pub struct Seeker<R> {
    /// The wrapped reader.
    inner: R,
    /// Statistics gathered from the first records of `inner`.
    sample: SeekerSample,
    /// Multiplier applied to the largest sampled record size to size lookahead reads.
    lookahead_factor: u64,
    /// Reusable buffer holding the bytes read ahead of a seek position.
    scratch: Vec<u8>,
    /// Reader builder used to parse the chunks read while seeking.
    builder: ZeroCopyReaderBuilder,
    /// Whether the data was declared as starting with a header record.
    has_headers: bool,
}
243
244impl<R: Read + Seek> Seeker<R> {
245 pub fn has_headers(&self) -> bool {
246 self.has_headers
247 }
248
249 pub fn initial_position(&self) -> u64 {
250 self.sample.initial_position
251 }
252
253 pub fn first_record_position(&self) -> u64 {
254 self.sample.first_record_position
255 }
256
257 pub fn file_len(&self) -> u64 {
258 self.sample.file_len
259 }
260
261 #[inline(always)]
262 pub fn range(&self) -> Range<u64> {
263 self.sample.first_record_position..self.sample.file_len
264 }
265
266 #[inline]
267 pub fn exact_count(&self) -> Option<u64> {
268 self.sample
269 .has_reached_eof
270 .then_some(self.sample.record_count)
271 }
272
273 #[inline]
274 pub fn approx_count(&self) -> u64 {
275 let sample = &self.sample;
276
277 if sample.has_reached_eof {
278 sample.record_count
279 } else {
280 ((sample.file_len - sample.first_record_position) as f64
281 / sample.median_record_size as f64)
282 .ceil() as u64
283 }
284 }
285
286 pub fn seek(&mut self, from_pos: u64) -> error::Result<Option<(u64, ByteRecord)>> {
287 if from_pos < self.first_record_position() || from_pos >= self.sample.file_len {
288 return Err(Error::new(ErrorKind::OutOfBounds {
289 pos: from_pos,
290 start: self.first_record_position(),
291 end: self.sample.file_len,
292 }));
293 }
294
295 self.inner.seek(SeekFrom::Start(from_pos))?;
296
297 if from_pos == self.first_record_position() {
299 let first_record = self
300 .builder
301 .from_reader(&mut self.inner)
302 .read_byte_record()?
303 .unwrap()
304 .to_byte_record();
305
306 return Ok(Some((self.first_record_position(), first_record)));
307 }
308
309 self.scratch.clear();
310 (&mut self.inner)
311 .take(self.lookahead_factor * self.sample.max_record_size)
312 .read_to_end(&mut self.scratch)?;
313
314 let mut unquoted_reader = self.builder.from_reader(self.scratch.as_slice());
315 let mut quoted_reader = self
316 .builder
317 .from_reader(Cursor::new(b"\"").chain(self.scratch.as_slice()));
318
319 let expected_field_count = self.sample.headers.len();
320
321 let unquoted = lookahead(&mut unquoted_reader, expected_field_count)?;
322 let quoted = lookahead(&mut quoted_reader, expected_field_count)?;
323
324 match (unquoted, quoted) {
325 (None, None) => Ok(None),
326 (Some((pos, record)), None) => Ok(Some((from_pos + pos, record))),
327 (None, Some((pos, record))) => Ok(Some((from_pos + pos - 1, record))),
328 (Some((unquoted_pos, unquoted_record)), Some((mut quoted_pos, quoted_record))) => {
329 quoted_pos -= 1;
333
334 if unquoted_pos == quoted_pos {
336 Ok(Some((from_pos + unquoted_pos, unquoted_record)))
337 } else {
338 let unquoted_cosine = cosine(
339 &self.sample.fields_mean_sizes,
340 unquoted_record.iter().map(|cell| cell.len()),
341 );
342 let quoted_cosine = cosine(
343 &self.sample.fields_mean_sizes,
344 quoted_record.iter().map(|cell| cell.len()),
345 );
346
347 if unquoted_cosine > quoted_cosine {
348 Ok(Some((from_pos + unquoted_pos, unquoted_record)))
349 } else {
350 Ok(Some((from_pos + quoted_pos, quoted_record)))
351 }
352 }
353 }
354 }
355 }
356
357 pub fn segments(&mut self, count: usize) -> error::Result<Vec<(u64, u64)>> {
358 let sample = &self.sample;
359 let file_len = sample.file_len;
360
361 if self.sample.record_count < count as u64 {
363 return Ok(vec![(self.first_record_position(), file_len)]);
364 }
365
366 let adjusted_file_len = file_len - self.first_record_position();
367
368 let count = count
370 .min(
371 (file_len / (sample.max_record_size * self.lookahead_factor)).saturating_sub(1)
372 as usize,
373 )
374 .max(1);
375
376 let mut offsets = vec![self.first_record_position()];
377
378 for i in 1..count {
379 let file_offset = ((i as f64 / count as f64) * adjusted_file_len as f64).floor() as u64
380 + self.first_record_position();
381
382 if let Some((record_offset, _)) = self.seek(file_offset)? {
383 offsets.push(record_offset);
384 } else {
385 break;
386 }
387 }
388
389 offsets.push(file_len);
390
391 Ok(offsets.windows(2).map(|w| (w[0], w[1])).collect())
392 }
393
394 pub fn byte_headers(&self) -> &ByteRecord {
395 &self.sample.headers
396 }
397
398 pub fn first_byte_record(&mut self) -> error::Result<Option<ByteRecord>> {
399 match self.seek(self.first_record_position()) {
400 Ok(Some((_, record))) => Ok(Some(record)),
401 Ok(None) => Ok(None),
402 Err(err) => Err(err),
403 }
404 }
405
406 pub fn last_byte_record(&mut self) -> error::Result<Option<ByteRecord>> {
407 let reverse_reader = ReverseReader::new(
408 &mut self.inner,
409 self.sample.file_len,
410 self.sample.first_record_position,
411 );
412
413 let mut reverse_csv_reader = self.builder.from_reader(reverse_reader);
414
415 reverse_csv_reader
416 .read_byte_record()
417 .map(|record_opt| record_opt.map(|record| record.to_byte_record_in_reverse()))
418 }
419
420 pub fn into_inner(self) -> R {
421 self.inner
422 }
423
424 pub fn into_zero_copy_reader(mut self) -> error::Result<ZeroCopyReader<R>> {
425 self.inner
426 .seek(SeekFrom::Start(self.sample.initial_position))?;
427 self.builder.has_headers(self.has_headers);
428 self.builder.flexible(false);
429 Ok(self.builder.from_reader(self.inner))
430 }
431
432 pub fn into_splitter(mut self) -> error::Result<Splitter<R>> {
433 self.inner
434 .seek(SeekFrom::Start(self.sample.initial_position))?;
435 self.builder.has_headers(self.has_headers);
436 Ok(self.builder.to_splitter_builder().from_reader(self.inner))
437 }
438}