evolution_slicer/
slicer.rs

1//
2// MIT License
3//
4// Copyright (c) 2024 Firelink Data
5//
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to deal
8// in the Software without restriction, including without limitation the rights
9// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10// copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12//
13// The above copyright notice and this permission notice shall be included in all
14// copies or substantial portions of the Software.
15//
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22// SOFTWARE.
23//
24// File created: 2023-12-11
25// Last updated: 2024-10-13
26//
27
28use evolution_common::error::{ExecutionError, Result};
29use evolution_common::NUM_BYTES_FOR_NEWLINE;
30use log::warn;
31
32use std::fs::{File, OpenOptions};
33use std::io::{BufReader, ErrorKind, Read};
34use std::path::PathBuf;
35use std::sync::Arc;
36
/// Common interface for slicers, i.e. types that read an input in consecutive slices.
pub trait Slicer {
    /// Returns `true` once the slicer has finished reading its input.
    fn is_done(&self) -> bool;
}
41
42///
43pub type SlicerRef = Arc<dyn Slicer>;
44
/// Slicer over a (fixed-length) file on disk.
///
/// Reads the file through a buffered reader and keeps counters for how many bytes of the
/// file remain, have been processed, and have been overlapped (read more than once because
/// of a sliding window).
#[derive(Debug)]
pub struct FileSlicer {
    /// Buffered handle to the file being sliced.
    inner: BufReader<File>,
    /// Total size of the file in bytes, taken from the file metadata on creation.
    bytes_to_read: usize,
    /// Number of bytes still left to read; updated through `set_remaining_bytes`.
    remaining_bytes: usize,
    /// Total number of bytes processed so far; updated through `set_bytes_processed`.
    bytes_processed: usize,
    /// Total number of overlapped bytes (due to sliding window); updated through
    /// `set_bytes_overlapped`.
    bytes_overlapped: usize,
}
53
54impl FileSlicer {
55    /// Try creating a new [`FileSlicer`] from a relative or absolute path
56    /// to the fixed-length file that is to be sliced.
57    ///
58    /// # Errors
59    /// This function can return an error for the following reasons:
60    /// * Any I/O error was returned when trying to open the path as a file.
61    /// * Could not read the metadata of the file at the path.
62    pub fn try_from_path(in_path: PathBuf) -> Result<Self> {
63        let file: File = OpenOptions::new().read(true).open(in_path)?;
64
65        let bytes_to_read: usize = file.metadata()?.len() as usize;
66        let remaining_bytes: usize = bytes_to_read;
67        let bytes_processed: usize = 0;
68        let bytes_overlapped: usize = 0;
69
70        let inner: BufReader<File> = BufReader::new(file);
71
72        Ok(FileSlicer {
73            inner,
74            bytes_to_read,
75            remaining_bytes,
76            bytes_processed,
77            bytes_overlapped,
78        })
79    }
80
81    /// Create a new [`FixedLengthFileSlicer`] from a relative or absolute path to
82    /// the fixed-length file that is to be sliced.
83    ///
84    /// # Panics
85    /// This function can panic for the following reasons:
86    /// * Any I/O error was returned when trying to open the path as a file.
87    /// * Could not read the metadata of the file at the path.
88    pub fn from_path(in_path: PathBuf) -> Self {
89        FileSlicer::try_from_path(in_path).unwrap()
90    }
91
92    /// Get the total number of bytes to read.
93    pub fn bytes_to_read(&self) -> usize {
94        self.bytes_to_read
95    }
96
97    /// Get the number of remaining bytes to read.
98    pub fn remaining_bytes(&self) -> usize {
99        self.remaining_bytes
100    }
101
102    /// Set the number of remaining bytes to read.
103    pub fn set_remaining_bytes(&mut self, remaining_bytes: usize) {
104        self.remaining_bytes = remaining_bytes;
105    }
106
107    /// Get the total number of processed bytes.
108    pub fn bytes_processed(&self) -> usize {
109        self.bytes_processed
110    }
111
112    /// Set the total number of processed bytes.
113    pub fn set_bytes_processed(&mut self, bytes_processed: usize) {
114        self.bytes_processed = bytes_processed;
115    }
116
117    /// Get the total number of overlapped bytes (due to sliding window).
118    pub fn bytes_overlapped(&self) -> usize {
119        self.bytes_overlapped
120    }
121
122    /// Set the total number of overlapped bytes.
123    pub fn set_bytes_overlapped(&mut self, bytes_overlapped: usize) {
124        self.bytes_overlapped = bytes_overlapped;
125    }
126
127    /// Try and read from the buffered reader into the provided buffer. This function
128    /// reads enough bytes to fill the buffer, hence, it is up to the caller to
129    /// ensure that the buffer has the correct and/or wanted capacity.
130    ///
131    /// # Errors
132    /// If the buffered reader encounters an EOF before completely filling the buffer.
133    pub fn try_read_to_buffer(&mut self, buffer: &mut [u8]) -> Result<()> {
134        match self.inner.read_exact(buffer) {
135            Ok(()) => Ok(()),
136            Err(e) => match e.kind() {
137                ErrorKind::UnexpectedEof => {
138                    warn!("EOF reached, this should be the last time reading from the file.");
139                    Ok(())
140                }
141                _ => Err(Box::new(e)),
142            },
143        }
144    }
145
146    /// Try and evenly distribute the buffer into uniformly sized chunks for each worker thread.
147    /// This function expects a [`Vec`] of usize tuples, representing the start and end byte
148    /// indices for each worker threads chunk.
149    ///
150    /// # Note
151    /// This function is optimized to spend as little time as possible looking for valid chunks, i.e.,
152    /// where there are line breaks, and will not look through the entire buffer. This can have an
153    /// effect on the CPU cache hit-rate, however, this depends on the size of the buffer.
154    ///
155    /// # Errors
156    /// This function might return an error for the following reasons:
157    /// * If the buffer was empty.
158    /// * If there were no line breaks in the buffer.
159    pub fn try_distribute_buffer_chunks_on_workers(
160        &self,
161        buffer: &[u8],
162        thread_workloads: &mut Vec<(usize, usize)>,
163    ) -> Result<()> {
164        let n_bytes_total: usize = buffer.len();
165        let n_worker_threads: usize = thread_workloads.capacity();
166
167        let n_bytes_per_thread: usize = n_bytes_total / n_worker_threads;
168        let n_bytes_remaining: usize = n_bytes_total - n_bytes_per_thread * n_worker_threads;
169
170        let mut prev_byte_idx: usize = 0;
171        for _ in 0..(n_worker_threads - 1) {
172            let next_byte_idx: usize = n_bytes_per_thread + prev_byte_idx;
173            thread_workloads.push((prev_byte_idx, next_byte_idx));
174            prev_byte_idx = next_byte_idx;
175        }
176
177        thread_workloads.push((
178            prev_byte_idx,
179            prev_byte_idx + n_bytes_per_thread + n_bytes_remaining,
180        ));
181
182        let mut n_bytes_to_offset_start: usize = 0;
183        for t_idx in 0..n_worker_threads {
184            let (mut start_byte_idx, mut end_byte_idx) = thread_workloads[t_idx];
185            start_byte_idx -= n_bytes_to_offset_start;
186            let n_bytes_to_offset_end: usize = (end_byte_idx - start_byte_idx)
187                - self.try_find_last_line_break(&buffer[start_byte_idx..end_byte_idx])?;
188            end_byte_idx -= n_bytes_to_offset_end;
189            thread_workloads[t_idx].0 = start_byte_idx;
190            thread_workloads[t_idx].1 = end_byte_idx;
191            n_bytes_to_offset_start = n_bytes_to_offset_end - NUM_BYTES_FOR_NEWLINE;
192        }
193
194        Ok(())
195    }
196
197    /// Read from the buffered reader into the provided buffer. This function reads
198    /// enough bytes to fill the buffer, hence, it is up to the caller to ensure that
199    /// that buffer has the correct and/or wanted capacity.
200    ///
201    /// # Panics
202    /// If the buffered reader encounters an EOF before completely filling the buffer.
203    pub fn read_to_buffer(&mut self, buffer: &mut [u8]) {
204        self.inner.read_exact(buffer).unwrap();
205    }
206
207    /// Try and find the last linebreak character in a byte slice and return the index
208    /// of the character. The function looks specifically for two character, the
209    /// carriage-return (CR) and line-feed (LF) characters, represented as the character
210    /// sequence '\r\n' on Windows systems.
211    ///
212    /// # Errors
213    /// If either the byte slice to search through was empty, or there existed no linebreak
214    /// character in the byte slice.
215    #[cfg(target_os = "windows")]
216    pub fn try_find_last_line_break(&self, bytes: &[u8]) -> Result<usize> {
217        if bytes.is_empty() {
218            return Err(Box::new(ExecutionError::new(
219                "Byte slice to find newlines in was empty, exiting...",
220            )));
221        };
222
223        let mut idx: usize = bytes.len() - 1;
224
225        while idx > 1 {
226            if (bytes[idx - 1] == 0x0d) && (bytes[idx] == 0x0a) {
227                return Ok(idx - 1);
228            };
229
230            idx -= 1;
231        }
232
233        Err(Box::new(ExecutionError::new(
234            "Could not find any newlines in byte slice, exiting...",
235        )))
236    }
237
238    /// Try and find the last linebreak character in a byte slice and return the index
239    /// of the character. The function looks specifically for a line-feed (LF) character,
240    /// represented as '\n' on Unix systems.
241    ///
242    /// # Errors
243    /// If either the byte slice to search through was empty, or there existed no linebreak
244    /// character in the byte slice.
245    #[cfg(not(target_os = "windows"))]
246    pub fn try_find_last_line_break(&self, bytes: &[u8]) -> Result<usize> {
247        if bytes.is_empty() {
248            return Err(Box::new(ExecutionError::new(
249                "Byte slice to find newlines in was empty, exiting...",
250            )));
251        };
252
253        let mut idx: usize = bytes.len() - 1;
254
255        while idx > 0 {
256            if bytes[idx] == 0x0a {
257                return Ok(idx);
258            };
259
260            idx -= 1;
261        }
262
263        Err(Box::new(ExecutionError::new(
264            "Could not find any newlines in byte slice, exiting...",
265        )))
266    }
267
268    /// Try and find all occurances of linebreak characters in a byte slice and push
269    /// the index of the byte to a provided buffer. The function looks specifically
270    /// for two characters, the carriage-return (CR) and line-feed (LF) characters,
271    /// represented as the character sequence '\r\n' on Windows systems.
272    ///
273    /// # Errors
274    /// If the byte slice to search through was empty.
275    #[cfg(target_os = "windows")]
276    pub fn try_find_line_breaks(
277        &self,
278        bytes: &[u8],
279        buffer: &mut Vec<usize>,
280        add_starting_idx: bool,
281    ) -> Result<()> {
282        if bytes.is_empty() {
283            return Err(Box::new(ExecutionError::new(
284                "Byte slice to find newlines in was empty, exiting...",
285            )));
286        };
287
288        // We need to also set the starting position of the current buffer, which is on index 0.
289        // This is needed for multitthreading when threads need to know the byte indices of their slice.
290        if add_starting_idx {
291            buffer.push(0);
292        }
293
294        (1..bytes.len()).for_each(|idx| {
295            if (bytes[idx - 1] == 0x0d) && (bytes[idx] == 0x0a) {
296                buffer.push(idx - 1);
297            };
298        });
299
300        Ok(())
301    }
302    /// Try and find all occurances of linebreak characters in a byte slice and push
303    /// the index of the byte to a provided buffer. The function looks specifically
304    /// for a line-feed (LF) character, represented as '\n' on Unix systems.
305    ///
306    /// # Errors
307    /// If the byte slice to search through was empty.
308    #[cfg(not(target_os = "windows"))]
309    pub fn try_find_line_breaks(
310        &self,
311        bytes: &[u8],
312        buffer: &mut Vec<usize>,
313        add_starting_idx: bool,
314    ) -> Result<()> {
315        if bytes.is_empty() {
316            return Err(Box::new(ExecutionError::new(
317                "Byte slice to find newlines in was empty, exiting...",
318            )));
319        };
320
321        // We need to also set the starting position of the current buffer, which is on index 0.
322        // This is needed for multitthreading when threads need to know the byte indices of their slice.
323        if add_starting_idx {
324            buffer.push(0);
325        }
326
327        (0..bytes.len()).for_each(|idx| {
328            if bytes[idx] == 0x0a {
329                buffer.push(idx);
330            };
331        });
332
333        Ok(())
334    }
335
336    /// Try and seek relative to the current position in the buffered reader.
337    ///
338    /// # Errors
339    /// Seeking to a negative offset will return an error.
340    pub fn try_seek_relative(&mut self, bytes_to_seek: i64) -> Result<()> {
341        self.inner.seek_relative(bytes_to_seek)?;
342        Ok(())
343    }
344
345    /// Seek relative to the current position in the buffered reader.
346    ///
347    /// # Panics
348    /// Seeking to a negative offset will cause the program to panic.
349    pub fn seek_relative(&mut self, bytes_to_seek: i64) {
350        self.try_seek_relative(bytes_to_seek).unwrap()
351    }
352}
353
354impl Slicer for FileSlicer {
355    /// Get whether or not this [`Slicer`] is done reading the input file.
356    fn is_done(&self) -> bool {
357        self.bytes_processed >= self.bytes_to_read
358    }
359}