evolution_slicer/slicer.rs
1//
2// MIT License
3//
4// Copyright (c) 2024 Firelink Data
5//
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to deal
8// in the Software without restriction, including without limitation the rights
9// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10// copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12//
13// The above copyright notice and this permission notice shall be included in all
14// copies or substantial portions of the Software.
15//
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22// SOFTWARE.
23//
24// File created: 2023-12-11
25// Last updated: 2024-10-13
26//
27
28use evolution_common::error::{ExecutionError, Result};
29use evolution_common::NUM_BYTES_FOR_NEWLINE;
30use log::warn;
31
32use std::fs::{File, OpenOptions};
33use std::io::{BufReader, ErrorKind, Read};
34use std::path::PathBuf;
35use std::sync::Arc;
36
/// Behaviour shared by every slicer: the ability to report completion.
///
/// The trait is object safe, so implementors can be used behind `dyn Slicer`.
pub trait Slicer {
    /// Returns `true` once this slicer considers its work finished.
    fn is_done(&self) -> bool;
}
41
42///
43pub type SlicerRef = Arc<dyn Slicer>;
44
/// A slicer that reads its input from a file through a buffered reader and
/// keeps a handful of byte counters describing the progress made so far.
pub struct FileSlicer {
    /// Buffered reader wrapping the opened input file.
    inner: BufReader<File>,
    /// Total number of bytes to read (the size of the file).
    bytes_to_read: usize,
    /// Number of bytes that still remain to be read.
    remaining_bytes: usize,
    /// Number of bytes that have been processed so far.
    bytes_processed: usize,
    /// Number of bytes that have been overlapped (due to the sliding window).
    bytes_overlapped: usize,
}
53
54impl FileSlicer {
55 /// Try creating a new [`FileSlicer`] from a relative or absolute path
56 /// to the fixed-length file that is to be sliced.
57 ///
58 /// # Errors
59 /// This function can return an error for the following reasons:
60 /// * Any I/O error was returned when trying to open the path as a file.
61 /// * Could not read the metadata of the file at the path.
62 pub fn try_from_path(in_path: PathBuf) -> Result<Self> {
63 let file: File = OpenOptions::new().read(true).open(in_path)?;
64
65 let bytes_to_read: usize = file.metadata()?.len() as usize;
66 let remaining_bytes: usize = bytes_to_read;
67 let bytes_processed: usize = 0;
68 let bytes_overlapped: usize = 0;
69
70 let inner: BufReader<File> = BufReader::new(file);
71
72 Ok(FileSlicer {
73 inner,
74 bytes_to_read,
75 remaining_bytes,
76 bytes_processed,
77 bytes_overlapped,
78 })
79 }
80
81 /// Create a new [`FixedLengthFileSlicer`] from a relative or absolute path to
82 /// the fixed-length file that is to be sliced.
83 ///
84 /// # Panics
85 /// This function can panic for the following reasons:
86 /// * Any I/O error was returned when trying to open the path as a file.
87 /// * Could not read the metadata of the file at the path.
88 pub fn from_path(in_path: PathBuf) -> Self {
89 FileSlicer::try_from_path(in_path).unwrap()
90 }
91
92 /// Get the total number of bytes to read.
93 pub fn bytes_to_read(&self) -> usize {
94 self.bytes_to_read
95 }
96
97 /// Get the number of remaining bytes to read.
98 pub fn remaining_bytes(&self) -> usize {
99 self.remaining_bytes
100 }
101
102 /// Set the number of remaining bytes to read.
103 pub fn set_remaining_bytes(&mut self, remaining_bytes: usize) {
104 self.remaining_bytes = remaining_bytes;
105 }
106
107 /// Get the total number of processed bytes.
108 pub fn bytes_processed(&self) -> usize {
109 self.bytes_processed
110 }
111
112 /// Set the total number of processed bytes.
113 pub fn set_bytes_processed(&mut self, bytes_processed: usize) {
114 self.bytes_processed = bytes_processed;
115 }
116
117 /// Get the total number of overlapped bytes (due to sliding window).
118 pub fn bytes_overlapped(&self) -> usize {
119 self.bytes_overlapped
120 }
121
122 /// Set the total number of overlapped bytes.
123 pub fn set_bytes_overlapped(&mut self, bytes_overlapped: usize) {
124 self.bytes_overlapped = bytes_overlapped;
125 }
126
127 /// Try and read from the buffered reader into the provided buffer. This function
128 /// reads enough bytes to fill the buffer, hence, it is up to the caller to
129 /// ensure that the buffer has the correct and/or wanted capacity.
130 ///
131 /// # Errors
132 /// If the buffered reader encounters an EOF before completely filling the buffer.
133 pub fn try_read_to_buffer(&mut self, buffer: &mut [u8]) -> Result<()> {
134 match self.inner.read_exact(buffer) {
135 Ok(()) => Ok(()),
136 Err(e) => match e.kind() {
137 ErrorKind::UnexpectedEof => {
138 warn!("EOF reached, this should be the last time reading from the file.");
139 Ok(())
140 }
141 _ => Err(Box::new(e)),
142 },
143 }
144 }
145
146 /// Try and evenly distribute the buffer into uniformly sized chunks for each worker thread.
147 /// This function expects a [`Vec`] of usize tuples, representing the start and end byte
148 /// indices for each worker threads chunk.
149 ///
150 /// # Note
151 /// This function is optimized to spend as little time as possible looking for valid chunks, i.e.,
152 /// where there are line breaks, and will not look through the entire buffer. This can have an
153 /// effect on the CPU cache hit-rate, however, this depends on the size of the buffer.
154 ///
155 /// # Errors
156 /// This function might return an error for the following reasons:
157 /// * If the buffer was empty.
158 /// * If there were no line breaks in the buffer.
159 pub fn try_distribute_buffer_chunks_on_workers(
160 &self,
161 buffer: &[u8],
162 thread_workloads: &mut Vec<(usize, usize)>,
163 ) -> Result<()> {
164 let n_bytes_total: usize = buffer.len();
165 let n_worker_threads: usize = thread_workloads.capacity();
166
167 let n_bytes_per_thread: usize = n_bytes_total / n_worker_threads;
168 let n_bytes_remaining: usize = n_bytes_total - n_bytes_per_thread * n_worker_threads;
169
170 let mut prev_byte_idx: usize = 0;
171 for _ in 0..(n_worker_threads - 1) {
172 let next_byte_idx: usize = n_bytes_per_thread + prev_byte_idx;
173 thread_workloads.push((prev_byte_idx, next_byte_idx));
174 prev_byte_idx = next_byte_idx;
175 }
176
177 thread_workloads.push((
178 prev_byte_idx,
179 prev_byte_idx + n_bytes_per_thread + n_bytes_remaining,
180 ));
181
182 let mut n_bytes_to_offset_start: usize = 0;
183 for t_idx in 0..n_worker_threads {
184 let (mut start_byte_idx, mut end_byte_idx) = thread_workloads[t_idx];
185 start_byte_idx -= n_bytes_to_offset_start;
186 let n_bytes_to_offset_end: usize = (end_byte_idx - start_byte_idx)
187 - self.try_find_last_line_break(&buffer[start_byte_idx..end_byte_idx])?;
188 end_byte_idx -= n_bytes_to_offset_end;
189 thread_workloads[t_idx].0 = start_byte_idx;
190 thread_workloads[t_idx].1 = end_byte_idx;
191 n_bytes_to_offset_start = n_bytes_to_offset_end - NUM_BYTES_FOR_NEWLINE;
192 }
193
194 Ok(())
195 }
196
197 /// Read from the buffered reader into the provided buffer. This function reads
198 /// enough bytes to fill the buffer, hence, it is up to the caller to ensure that
199 /// that buffer has the correct and/or wanted capacity.
200 ///
201 /// # Panics
202 /// If the buffered reader encounters an EOF before completely filling the buffer.
203 pub fn read_to_buffer(&mut self, buffer: &mut [u8]) {
204 self.inner.read_exact(buffer).unwrap();
205 }
206
207 /// Try and find the last linebreak character in a byte slice and return the index
208 /// of the character. The function looks specifically for two character, the
209 /// carriage-return (CR) and line-feed (LF) characters, represented as the character
210 /// sequence '\r\n' on Windows systems.
211 ///
212 /// # Errors
213 /// If either the byte slice to search through was empty, or there existed no linebreak
214 /// character in the byte slice.
215 #[cfg(target_os = "windows")]
216 pub fn try_find_last_line_break(&self, bytes: &[u8]) -> Result<usize> {
217 if bytes.is_empty() {
218 return Err(Box::new(ExecutionError::new(
219 "Byte slice to find newlines in was empty, exiting...",
220 )));
221 };
222
223 let mut idx: usize = bytes.len() - 1;
224
225 while idx > 1 {
226 if (bytes[idx - 1] == 0x0d) && (bytes[idx] == 0x0a) {
227 return Ok(idx - 1);
228 };
229
230 idx -= 1;
231 }
232
233 Err(Box::new(ExecutionError::new(
234 "Could not find any newlines in byte slice, exiting...",
235 )))
236 }
237
238 /// Try and find the last linebreak character in a byte slice and return the index
239 /// of the character. The function looks specifically for a line-feed (LF) character,
240 /// represented as '\n' on Unix systems.
241 ///
242 /// # Errors
243 /// If either the byte slice to search through was empty, or there existed no linebreak
244 /// character in the byte slice.
245 #[cfg(not(target_os = "windows"))]
246 pub fn try_find_last_line_break(&self, bytes: &[u8]) -> Result<usize> {
247 if bytes.is_empty() {
248 return Err(Box::new(ExecutionError::new(
249 "Byte slice to find newlines in was empty, exiting...",
250 )));
251 };
252
253 let mut idx: usize = bytes.len() - 1;
254
255 while idx > 0 {
256 if bytes[idx] == 0x0a {
257 return Ok(idx);
258 };
259
260 idx -= 1;
261 }
262
263 Err(Box::new(ExecutionError::new(
264 "Could not find any newlines in byte slice, exiting...",
265 )))
266 }
267
268 /// Try and find all occurances of linebreak characters in a byte slice and push
269 /// the index of the byte to a provided buffer. The function looks specifically
270 /// for two characters, the carriage-return (CR) and line-feed (LF) characters,
271 /// represented as the character sequence '\r\n' on Windows systems.
272 ///
273 /// # Errors
274 /// If the byte slice to search through was empty.
275 #[cfg(target_os = "windows")]
276 pub fn try_find_line_breaks(
277 &self,
278 bytes: &[u8],
279 buffer: &mut Vec<usize>,
280 add_starting_idx: bool,
281 ) -> Result<()> {
282 if bytes.is_empty() {
283 return Err(Box::new(ExecutionError::new(
284 "Byte slice to find newlines in was empty, exiting...",
285 )));
286 };
287
288 // We need to also set the starting position of the current buffer, which is on index 0.
289 // This is needed for multitthreading when threads need to know the byte indices of their slice.
290 if add_starting_idx {
291 buffer.push(0);
292 }
293
294 (1..bytes.len()).for_each(|idx| {
295 if (bytes[idx - 1] == 0x0d) && (bytes[idx] == 0x0a) {
296 buffer.push(idx - 1);
297 };
298 });
299
300 Ok(())
301 }
302 /// Try and find all occurances of linebreak characters in a byte slice and push
303 /// the index of the byte to a provided buffer. The function looks specifically
304 /// for a line-feed (LF) character, represented as '\n' on Unix systems.
305 ///
306 /// # Errors
307 /// If the byte slice to search through was empty.
308 #[cfg(not(target_os = "windows"))]
309 pub fn try_find_line_breaks(
310 &self,
311 bytes: &[u8],
312 buffer: &mut Vec<usize>,
313 add_starting_idx: bool,
314 ) -> Result<()> {
315 if bytes.is_empty() {
316 return Err(Box::new(ExecutionError::new(
317 "Byte slice to find newlines in was empty, exiting...",
318 )));
319 };
320
321 // We need to also set the starting position of the current buffer, which is on index 0.
322 // This is needed for multitthreading when threads need to know the byte indices of their slice.
323 if add_starting_idx {
324 buffer.push(0);
325 }
326
327 (0..bytes.len()).for_each(|idx| {
328 if bytes[idx] == 0x0a {
329 buffer.push(idx);
330 };
331 });
332
333 Ok(())
334 }
335
336 /// Try and seek relative to the current position in the buffered reader.
337 ///
338 /// # Errors
339 /// Seeking to a negative offset will return an error.
340 pub fn try_seek_relative(&mut self, bytes_to_seek: i64) -> Result<()> {
341 self.inner.seek_relative(bytes_to_seek)?;
342 Ok(())
343 }
344
345 /// Seek relative to the current position in the buffered reader.
346 ///
347 /// # Panics
348 /// Seeking to a negative offset will cause the program to panic.
349 pub fn seek_relative(&mut self, bytes_to_seek: i64) {
350 self.try_seek_relative(bytes_to_seek).unwrap()
351 }
352}
353
354impl Slicer for FileSlicer {
355 /// Get whether or not this [`Slicer`] is done reading the input file.
356 fn is_done(&self) -> bool {
357 self.bytes_processed >= self.bytes_to_read
358 }
359}