splitx/
lib.rs

1use std::ffi::OsStr;
2use std::fs;
3use std::fs::metadata;
4use std::fs::File;
5use std::io::{self, BufRead, Write};
6use std::path::Path;
7use std::path::PathBuf;
8
9/// Splits a text file into pieces with the size of each piece below a specified maximum number of bytes.
10///
11/// # Arguments
12///
13/// * file_path - the path of the file to be split.
14/// * max_file_size_bytes - the maximum size of each piece of the file in bytes after splitting.
15/// * num_header_lines - how many lines are the file's header. If no header lines, use 0. Header lines will be kept in each of the pieces.
16/// * output_dir - where to write the pieces of the file.
17pub fn split<P>(
18    file_path: P,
19    max_file_size_bytes: u64,
20    num_header_lines: u8,
21    output_dir: P,
22) -> Result<(), Box<dyn std::error::Error>>
23where
24    P: AsRef<Path> + std::fmt::Display + Clone,
25{
26    let o_path = output_dir.as_ref();
27    if !o_path.exists() {
28        let _ = fs::create_dir_all(o_path);
29    }
30
31    let (max_chunk_bytes, header) =
32        estimate_chunk_size(file_path.clone(), max_file_size_bytes, num_header_lines)?;
33    let file = File::open(file_path.clone())?;
34    let reader = io::BufReader::new(file);
35
36    let mut lines = reader.lines();
37    let mut linex: String;
38
39    let mut file_index = 0;
40    let mut buffer = Vec::new();
41    let mut remainder: Option<Vec<String>>;
42
43    let mut chunk_bytes: u64 = 0;
44
45    loop {
46        match lines.next() {
47            Some(line) => {
48                linex = line?;
49                let line_num_bytes = linex.as_bytes().len() as u64 + NEW_LINE_BYTES as u64;
50                if chunk_bytes + line_num_bytes > max_chunk_bytes {
51                    (remainder, file_index) = write_buffer_to_file(
52                        &buffer[..],
53                        output_dir.clone(),
54                        file_path.clone(),
55                        file_index,
56                        max_file_size_bytes,
57                        max_chunk_bytes,
58                        &header[..],
59                        false,
60                    )?;
61                    buffer.clear();
62                    chunk_bytes = line_num_bytes;
63                    if let Some(r) = &remainder {
64                        buffer.extend_from_slice(&r[..]);
65                        chunk_bytes += get_slice_bytes(&r[..]);
66                    }
67                    buffer.push(linex);
68                } else {
69                    chunk_bytes += line_num_bytes;
70                    buffer.push(linex);
71                }
72            }
73            None => {
74                if !buffer.is_empty() {
75                    let _ = write_buffer_to_file(
76                        &buffer[..],
77                        output_dir,
78                        file_path,
79                        file_index,
80                        max_file_size_bytes,
81                        max_chunk_bytes,
82                        &header[..],
83                        true,
84                    );
85                }
86                break;
87            }
88        }
89    }
90
91    Ok(())
92}
93
/// Number of bytes taken by the `"\n"` line terminator.
const NEW_LINE_BYTES: usize = "\n".len();
95
/// Converts an optional OS string into an optional owned `String`.
///
/// The conversion is lossy: data that is not valid Unicode is replaced as described by
/// `OsStr::to_string_lossy`. `None` stays `None`.
fn format_os_str(os_str: Option<&OsStr>) -> Option<String> {
    match os_str {
        Some(raw) => Some(raw.to_string_lossy().into_owned()),
        None => None,
    }
}
99
100fn get_file_name_and_extension<P>(file_path: P) -> (Option<String>, Option<String>)
101where
102    P: AsRef<Path>,
103{
104    let path = file_path.as_ref();
105    let file_stem = format_os_str(path.file_stem());
106    let extension = format_os_str(path.extension());
107    (file_stem, extension)
108}
109
110fn compose_file_path<P>(directory: P, file: P, file_index: u32) -> PathBuf
111where
112    P: AsRef<Path> + std::fmt::Display,
113{
114    let (fname, ext) = get_file_name_and_extension(file);
115
116    let mut path = PathBuf::new();
117    path.push(directory);
118    let mut buf = String::new();
119    if let Some(n) = fname {
120        buf.push_str(&n);
121        buf.push_str("_");
122    };
123    buf.push_str(format!("{file_index:09}").as_str());
124    if let Some(x) = ext {
125        buf.push_str(format!(".{x}").as_str());
126    }
127    path.push(buf);
128    path
129}
130
131fn write_lines_to_file<P>(buffer: &[String], file_path: P) -> io::Result<u64>
132where
133    P: AsRef<Path> + std::fmt::Debug,
134{
135    {
136        let mut file = File::create(&file_path)?;
137        for line in buffer {
138            writeln!(file, "{}", line)?;
139        }
140    }
141    get_file_size(file_path)
142}
143
144fn write_buffer_to_file<P>(
145    buffer: &[String],
146    output_dir: P,
147    file: P,
148    mut file_index: u32,
149    max_size: u64,
150    max_chunk_memory_bytes: u64,
151    header: &[String],
152    is_end_of_file: bool,
153) -> io::Result<(Option<Vec<String>>, u32)>
154where
155    P: AsRef<Path> + std::fmt::Display,
156{
157    let f = compose_file_path(&output_dir, &file, file_index);
158    let mut size = write_lines_to_file(&buffer, &f)?;
159    let mut remainder: Option<Vec<String>> = None;
160    let mut first_part = &buffer[..];
161    let len = buffer.len();
162    let header_len = header.len();
163    let mut remainder_bytes = 0;
164
165    // The following while loop should never be executed based on tests so far.
166    // It serves as a safeguard against unseen situations where the calculated
167    // memroy size does not match the disk size.
168    while size > max_size {
169        println!("wasted write (should not happen) at file_index: {file_index}; size: {size}; max_size: {max_size}");
170        let split_point = (first_part.len() as f32 * 0.8) as usize;
171        first_part = &buffer[..split_point];
172        if len > split_point {
173            let mut r: Vec<String> = Vec::with_capacity(len - split_point + header_len);
174            if header_len > 0 {
175                r.extend_from_slice(&header[..]);
176            }
177            r.extend_from_slice(&buffer[split_point..]);
178            remainder_bytes = get_slice_bytes(&r[..]);
179            remainder = Some(r);
180        }
181        size = write_lines_to_file(first_part, &f)?;
182    }
183    file_index += 1;
184
185    // The following (remainder_bytes as u64 > max_chunk_memory_bytes) condition should never be
186    // true based on tests so far. It serves as a safeguard against unseen situations where the
187    // calculated memroy size does not match the disk size.
188    if (remainder_bytes as u64 > max_chunk_memory_bytes) || is_end_of_file {
189        if let Some(r) = remainder {
190            (remainder, file_index) = write_buffer_to_file(
191                &r[..],
192                output_dir,
193                file,
194                file_index,
195                max_size,
196                max_chunk_memory_bytes,
197                &header[..],
198                is_end_of_file,
199            )?;
200        }
201    }
202
203    if !is_end_of_file && remainder == None && header_len > 0 {
204        let mut r = Vec::with_capacity(header_len);
205        r.extend_from_slice(&header[..]);
206        remainder = Some(r);
207    }
208
209    Ok((remainder, file_index))
210}
211
/// Returns the size in bytes that the file system reports for `file_path`.
///
/// # Errors
///
/// Returns the I/O error raised when the metadata of `file_path` cannot be read.
fn get_file_size<P>(file_path: P) -> io::Result<u64>
where
    P: AsRef<Path>,
{
    metadata(file_path).map(|info| info.len())
}
219
220fn estimate_chunk_size<P>(
221    file_path: P,
222    max_file_size_bytes: u64,
223    num_header_lines: u8,
224) -> io::Result<(u64, Vec<String>)>
225where
226    P: AsRef<Path>,
227{
228    let file_disk_size = get_file_size(&file_path)?;
229    let mut file_memory_size = 0;
230    let mut header = Vec::with_capacity(num_header_lines as usize);
231    let file = File::open(&file_path)?;
232    let reader = io::BufReader::new(file);
233    let mut num_lines: u64 = 0;
234    let mut header_done = false;
235
236    for line in reader.lines() {
237        num_lines += 1;
238        let line = line?;
239        let line_size = line.as_bytes().len() + NEW_LINE_BYTES;
240        file_memory_size += line_size;
241        if !header_done {
242            if num_lines <= num_header_lines as u64 {
243                header.push(line);
244            } else {
245                header_done = true;
246            }
247        }
248    }
249
250    let memory_over_disk_size_ratio = file_memory_size as f64 / file_disk_size as f64;
251    // memory_over_disk_size_ratio should be 1, but not guaranteed
252    Ok((
253        (max_file_size_bytes as f64 * memory_over_disk_size_ratio) as u64,
254        header,
255    ))
256}
257
258fn get_slice_bytes(s: &[String]) -> u64 {
259    let mut slice_bytes: u64 = 0;
260    for line in s {
261        slice_bytes += line.as_bytes().len() as u64;
262        slice_bytes += NEW_LINE_BYTES as u64;
263    }
264    slice_bytes
265}
266
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test on the fixture file: it only guards against panics, the result of the
    /// split is not inspected.
    #[test]
    fn test_split() {
        let input_file = "./test/test.csv";
        let output_dir = "./test/results";
        let num_header_lines = 1;
        let max_file_size_bytes = 4000;
        let _ = split(
            input_file,
            max_file_size_bytes,
            num_header_lines,
            output_dir,
        );
    }

    /// Splits a generated file and checks that every piece respects the size limit, starts
    /// with the header, and that the pieces together hold every row once, in order.
    #[test]
    fn test_split_pieces_keep_header_and_respect_max_size() {
        let work_dir =
            std::env::temp_dir().join(format!("splitx_test_split_{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&work_dir);
        std::fs::create_dir_all(&work_dir).unwrap();

        let header = "id,value";
        let rows: Vec<String> = (0..200).map(|i| format!("{i},value_{i}")).collect();
        let mut content = String::from(header);
        content.push('\n');
        for row in &rows {
            content.push_str(row);
            content.push('\n');
        }
        let input_file = work_dir.join("input.csv");
        std::fs::write(&input_file, &content).unwrap();
        let output_dir = work_dir.join("pieces");

        let max_file_size_bytes = 500;
        split(
            input_file.to_string_lossy().into_owned(),
            max_file_size_bytes,
            1,
            output_dir.to_string_lossy().into_owned(),
        )
        .unwrap();

        // Piece names end with a zero-padded index, so sorting the paths restores the order.
        let mut pieces: Vec<std::path::PathBuf> = std::fs::read_dir(&output_dir)
            .unwrap()
            .map(|entry| entry.unwrap().path())
            .collect();
        pieces.sort();
        assert!(pieces.len() > 1);

        let mut seen_rows: Vec<String> = Vec::new();
        for piece in &pieces {
            assert!(std::fs::metadata(piece).unwrap().len() <= max_file_size_bytes);
            let text = std::fs::read_to_string(piece).unwrap();
            let mut lines = text.lines();
            assert_eq!(lines.next(), Some(header));
            seen_rows.extend(lines.map(str::to_owned));
        }
        assert_eq!(seen_rows, rows);

        std::fs::remove_dir_all(&work_dir).unwrap();
    }
}