1use std::ffi::OsStr;
2use std::fs;
3use std::fs::metadata;
4use std::fs::File;
5use std::io::{self, BufRead, Write};
6use std::path::Path;
7use std::path::PathBuf;
8
9pub fn split<P>(
18 file_path: P,
19 max_file_size_bytes: u64,
20 num_header_lines: u8,
21 output_dir: P,
22) -> Result<(), Box<dyn std::error::Error>>
23where
24 P: AsRef<Path> + std::fmt::Display + Clone,
25{
26 let o_path = output_dir.as_ref();
27 if !o_path.exists() {
28 let _ = fs::create_dir_all(o_path);
29 }
30
31 let (max_chunk_bytes, header) =
32 estimate_chunk_size(file_path.clone(), max_file_size_bytes, num_header_lines)?;
33 let file = File::open(file_path.clone())?;
34 let reader = io::BufReader::new(file);
35
36 let mut lines = reader.lines();
37 let mut linex: String;
38
39 let mut file_index = 0;
40 let mut buffer = Vec::new();
41 let mut remainder: Option<Vec<String>>;
42
43 let mut chunk_bytes: u64 = 0;
44
45 loop {
46 match lines.next() {
47 Some(line) => {
48 linex = line?;
49 let line_num_bytes = linex.as_bytes().len() as u64 + NEW_LINE_BYTES as u64;
50 if chunk_bytes + line_num_bytes > max_chunk_bytes {
51 (remainder, file_index) = write_buffer_to_file(
52 &buffer[..],
53 output_dir.clone(),
54 file_path.clone(),
55 file_index,
56 max_file_size_bytes,
57 max_chunk_bytes,
58 &header[..],
59 false,
60 )?;
61 buffer.clear();
62 chunk_bytes = line_num_bytes;
63 if let Some(r) = &remainder {
64 buffer.extend_from_slice(&r[..]);
65 chunk_bytes += get_slice_bytes(&r[..]);
66 }
67 buffer.push(linex);
68 } else {
69 chunk_bytes += line_num_bytes;
70 buffer.push(linex);
71 }
72 }
73 None => {
74 if !buffer.is_empty() {
75 let _ = write_buffer_to_file(
76 &buffer[..],
77 output_dir,
78 file_path,
79 file_index,
80 max_file_size_bytes,
81 max_chunk_bytes,
82 &header[..],
83 true,
84 );
85 }
86 break;
87 }
88 }
89 }
90
91 Ok(())
92}
93
/// Number of bytes taken by the `'\n'` terminator appended after every line that is written.
const NEW_LINE_BYTES: usize = "\n".len();
95
/// Converts an optional OS string into an owned `String`.
///
/// Invalid UTF-8 sequences are replaced with U+FFFD (via `to_string_lossy`); `None` is
/// passed through unchanged.
fn format_os_str(os_str: Option<&OsStr>) -> Option<String> {
    let raw = os_str?;
    Some(String::from(raw.to_string_lossy()))
}
99
100fn get_file_name_and_extension<P>(file_path: P) -> (Option<String>, Option<String>)
101where
102 P: AsRef<Path>,
103{
104 let path = file_path.as_ref();
105 let file_stem = format_os_str(path.file_stem());
106 let extension = format_os_str(path.extension());
107 (file_stem, extension)
108}
109
110fn compose_file_path<P>(directory: P, file: P, file_index: u32) -> PathBuf
111where
112 P: AsRef<Path> + std::fmt::Display,
113{
114 let (fname, ext) = get_file_name_and_extension(file);
115
116 let mut path = PathBuf::new();
117 path.push(directory);
118 let mut buf = String::new();
119 if let Some(n) = fname {
120 buf.push_str(&n);
121 buf.push_str("_");
122 };
123 buf.push_str(format!("{file_index:09}").as_str());
124 if let Some(x) = ext {
125 buf.push_str(format!(".{x}").as_str());
126 }
127 path.push(buf);
128 path
129}
130
/// Writes every entry of `buffer` to `file_path`, each followed by `'\n'`, replacing any
/// existing file, and returns the resulting file size in bytes.
///
/// # Errors
/// Returns any I/O error from creating, writing, flushing, or querying the file.
fn write_lines_to_file<P>(buffer: &[String], file_path: P) -> io::Result<u64>
where
    P: AsRef<Path> + std::fmt::Debug,
{
    // Buffer the output: writing straight to a `File` costs at least one syscall per line.
    let mut writer = io::BufWriter::new(File::create(&file_path)?);
    for line in buffer {
        writeln!(writer, "{line}")?;
    }
    // Flush explicitly: dropping a `BufWriter` swallows any error from the final write.
    writer.flush()?;
    // Ask the already-open handle for its size instead of re-resolving the path.
    Ok(writer.get_ref().metadata()?.len())
}
143
144fn write_buffer_to_file<P>(
145 buffer: &[String],
146 output_dir: P,
147 file: P,
148 mut file_index: u32,
149 max_size: u64,
150 max_chunk_memory_bytes: u64,
151 header: &[String],
152 is_end_of_file: bool,
153) -> io::Result<(Option<Vec<String>>, u32)>
154where
155 P: AsRef<Path> + std::fmt::Display,
156{
157 let f = compose_file_path(&output_dir, &file, file_index);
158 let mut size = write_lines_to_file(&buffer, &f)?;
159 let mut remainder: Option<Vec<String>> = None;
160 let mut first_part = &buffer[..];
161 let len = buffer.len();
162 let header_len = header.len();
163 let mut remainder_bytes = 0;
164
165 while size > max_size {
169 println!("wasted write (should not happen) at file_index: {file_index}; size: {size}; max_size: {max_size}");
170 let split_point = (first_part.len() as f32 * 0.8) as usize;
171 first_part = &buffer[..split_point];
172 if len > split_point {
173 let mut r: Vec<String> = Vec::with_capacity(len - split_point + header_len);
174 if header_len > 0 {
175 r.extend_from_slice(&header[..]);
176 }
177 r.extend_from_slice(&buffer[split_point..]);
178 remainder_bytes = get_slice_bytes(&r[..]);
179 remainder = Some(r);
180 }
181 size = write_lines_to_file(first_part, &f)?;
182 }
183 file_index += 1;
184
185 if (remainder_bytes as u64 > max_chunk_memory_bytes) || is_end_of_file {
189 if let Some(r) = remainder {
190 (remainder, file_index) = write_buffer_to_file(
191 &r[..],
192 output_dir,
193 file,
194 file_index,
195 max_size,
196 max_chunk_memory_bytes,
197 &header[..],
198 is_end_of_file,
199 )?;
200 }
201 }
202
203 if !is_end_of_file && remainder == None && header_len > 0 {
204 let mut r = Vec::with_capacity(header_len);
205 r.extend_from_slice(&header[..]);
206 remainder = Some(r);
207 }
208
209 Ok((remainder, file_index))
210}
211
/// Returns the size in bytes that the filesystem reports for `file_path`.
///
/// # Errors
/// Returns the I/O error from `metadata` when the path cannot be queried, for example
/// because it does not exist.
fn get_file_size<P>(file_path: P) -> io::Result<u64>
where
    P: AsRef<Path>,
{
    metadata(file_path).map(|meta| meta.len())
}
219
220fn estimate_chunk_size<P>(
221 file_path: P,
222 max_file_size_bytes: u64,
223 num_header_lines: u8,
224) -> io::Result<(u64, Vec<String>)>
225where
226 P: AsRef<Path>,
227{
228 let file_disk_size = get_file_size(&file_path)?;
229 let mut file_memory_size = 0;
230 let mut header = Vec::with_capacity(num_header_lines as usize);
231 let file = File::open(&file_path)?;
232 let reader = io::BufReader::new(file);
233 let mut num_lines: u64 = 0;
234 let mut header_done = false;
235
236 for line in reader.lines() {
237 num_lines += 1;
238 let line = line?;
239 let line_size = line.as_bytes().len() + NEW_LINE_BYTES;
240 file_memory_size += line_size;
241 if !header_done {
242 if num_lines <= num_header_lines as u64 {
243 header.push(line);
244 } else {
245 header_done = true;
246 }
247 }
248 }
249
250 let memory_over_disk_size_ratio = file_memory_size as f64 / file_disk_size as f64;
251 Ok((
253 (max_file_size_bytes as f64 * memory_over_disk_size_ratio) as u64,
254 header,
255 ))
256}
257
258fn get_slice_bytes(s: &[String]) -> u64 {
259 let mut slice_bytes: u64 = 0;
260 for line in s {
261 slice_bytes += line.as_bytes().len() as u64;
262 slice_bytes += NEW_LINE_BYTES as u64;
263 }
264 slice_bytes
265}
266
#[cfg(test)]
mod tests {
    use super::*;

    /// Splits a generated CSV and checks three things:
    /// - every output file respects the size limit;
    /// - every output file starts with the header;
    /// - the data lines come back complete and in order.
    #[test]
    fn test_split() {
        let base = std::env::temp_dir().join(format!("split_test_{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&base);
        std::fs::create_dir_all(&base).unwrap();

        let input_file = base.join("input.csv");
        let output_dir = base.join("results");
        let header = "id,name";
        let data: Vec<String> = (0..200).map(|i| format!("{i},name_{i}")).collect();
        let mut contents = String::from(header);
        contents.push('\n');
        for line in &data {
            contents.push_str(line);
            contents.push('\n');
        }
        std::fs::write(&input_file, contents).unwrap();

        let max_file_size_bytes = 400;
        // `split` needs a `Display` path type, which `PathBuf` is not, so pass `String`s.
        split(
            input_file.to_string_lossy().into_owned(),
            max_file_size_bytes,
            1,
            output_dir.to_string_lossy().into_owned(),
        )
        .unwrap();

        // Zero-padded indices make lexical order equal chunk order.
        let mut outputs: Vec<_> = std::fs::read_dir(&output_dir)
            .unwrap()
            .map(|entry| entry.unwrap().path())
            .collect();
        outputs.sort();
        assert!(outputs.len() > 1);

        let mut recovered = Vec::new();
        for path in &outputs {
            assert!(std::fs::metadata(path).unwrap().len() <= max_file_size_bytes);
            let text = std::fs::read_to_string(path).unwrap();
            let mut lines = text.lines();
            assert_eq!(lines.next(), Some(header));
            recovered.extend(lines.map(str::to_owned));
        }
        assert_eq!(recovered, data);

        let _ = std::fs::remove_dir_all(&base);
    }
}