1use std::fs::{self, File};
2use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
3use std::path::{Path, PathBuf};
4use std::process::{Command, Stdio};
5
6use crate::common::io::FileData;
7
8#[cfg(unix)]
9use rayon::prelude::*;
10
/// Alphabet used to render the per-chunk suffix of an output file name.
#[derive(Debug, Clone, PartialEq)]
pub enum SuffixType {
    /// Lowercase letters `a`-`z`, rendered as a fixed-width base-26 number.
    Alphabetic,
    /// Zero-padded decimal digits; the payload is added to the chunk index
    /// before formatting (i.e. it is the value of the first suffix).
    Numeric(u64),
    /// Zero-padded lowercase hexadecimal digits; the payload is added to the
    /// chunk index before formatting.
    Hex(u64),
}
21
/// Strategy used to cut the input into chunks.
///
/// Variants carrying `(k, n)` select chunk `k` (1-based) out of `n` and write
/// it to standard output instead of creating output files.
#[derive(Debug, Clone)]
pub enum SplitMode {
    /// Put this many separator-terminated records into each output file.
    Lines(u64),
    /// Put this many bytes into each output file.
    Bytes(u64),
    /// Put at most this many bytes of whole records into each output file.
    LineBytes(u64),
    /// Divide the input into this many files by byte count.
    Number(u64),
    /// Write byte-wise chunk `k` of `n` to standard output.
    NumberExtract(u64, u64),
    /// Divide the input into this many files without breaking records.
    LineChunks(u64),
    /// Write record-aligned chunk `k` of `n` to standard output.
    LineChunkExtract(u64, u64),
    /// Deal records out to this many files in round-robin order.
    RoundRobin(u64),
    /// Write every record belonging to round-robin slot `k` of `n` to
    /// standard output.
    RoundRobinExtract(u64, u64),
}
44
45#[derive(Clone, Debug)]
47pub struct SplitConfig {
48 pub mode: SplitMode,
49 pub suffix_type: SuffixType,
50 pub suffix_length: usize,
51 pub additional_suffix: String,
52 pub prefix: String,
53 pub elide_empty: bool,
54 pub verbose: bool,
55 pub filter: Option<String>,
56 pub separator: u8,
57}
58
59impl Default for SplitConfig {
60 fn default() -> Self {
61 Self {
62 mode: SplitMode::Lines(1000),
63 suffix_type: SuffixType::Alphabetic,
64 suffix_length: 2,
65 additional_suffix: String::new(),
66 prefix: "x".to_string(),
67 elide_empty: false,
68 verbose: false,
69 filter: None,
70 separator: b'\n',
71 }
72 }
73}
74
/// Parses a size argument such as `10`, `4K`, `2MB` or `1GiB`.
///
/// Surrounding whitespace is ignored. The numeric part is a run of ASCII
/// digits, optionally preceded by a single `+` or `-` (a `-` is accepted by
/// the scanner but then rejected by the unsigned parse). The remainder must
/// be empty or one of the recognised unit suffixes:
///
/// * `b` = 512
/// * `kB`/`KB`, `MB`, `GB`, `TB`, `PB`, `EB` = powers of 1000
/// * `k`/`K`/`KiB`, `m`/`M`/`MiB`, ... `e`/`E`/`EiB` = powers of 1024
/// * zetta/yotta suffixes do not fit in a `u64`: they yield `u64::MAX` for
///   any non-zero number and `0` for zero.
///
/// # Errors
///
/// Returns a human-readable message when the input is empty, the number is
/// malformed, the suffix is unknown, or the product overflows `u64`.
pub fn parse_size(s: &str) -> Result<u64, String> {
    let text = s.trim();
    let Some(&first) = text.as_bytes().first() else {
        return Err("empty size".to_string());
    };

    // A sign is only recognised in the very first position; the digits that
    // follow it complete the numeric part.
    let sign_len = usize::from(first == b'+' || first == b'-');
    let digit_len = text.as_bytes()[sign_len..]
        .iter()
        .take_while(|byte| byte.is_ascii_digit())
        .count();
    let split_at = sign_len + digit_len;
    if split_at == 0 {
        return Err(format!("invalid number: '{}'", text));
    }

    // Only ASCII bytes precede `split_at`, so it is a valid char boundary.
    let (digits, unit) = text.split_at(split_at);
    let value = digits
        .parse::<u64>()
        .map_err(|_| format!("invalid number: '{}'", digits))?;

    let factor: u64 = match unit {
        "" => 1,
        "b" => 512,
        "kB" | "KB" => 1000,
        "k" | "K" | "KiB" => 1 << 10,
        "MB" => 1_000_000,
        "m" | "M" | "MiB" => 1 << 20,
        "GB" => 1_000_000_000,
        "g" | "G" | "GiB" => 1 << 30,
        "TB" => 1_000_000_000_000,
        "t" | "T" | "TiB" => 1 << 40,
        "PB" => 1_000_000_000_000_000,
        "p" | "P" | "PiB" => 1 << 50,
        "EB" => 1_000_000_000_000_000_000,
        "e" | "E" | "EiB" => 1 << 60,
        "ZB" | "z" | "Z" | "ZiB" | "YB" | "y" | "Y" | "YiB" => {
            // These multipliers exceed u64, so the result is clamped.
            return Ok(if value == 0 { 0 } else { u64::MAX });
        }
        _ => return Err(format!("invalid suffix in '{}'", text)),
    };

    match value.checked_mul(factor) {
        Some(size) => Ok(size),
        None => Err(format!("number too large: '{}'", text)),
    }
}
133
134pub fn generate_suffix(index: u64, suffix_type: &SuffixType, suffix_length: usize) -> String {
136 match suffix_type {
137 SuffixType::Alphabetic => {
138 let mut result = Vec::with_capacity(suffix_length);
139 let mut remaining = index;
140 for _ in 0..suffix_length {
141 result.push(b'a' + (remaining % 26) as u8);
142 remaining /= 26;
143 }
144 result.reverse();
145 String::from_utf8(result).unwrap()
146 }
147 SuffixType::Numeric(start) => {
148 let val = start + index;
149 format!("{:0>width$}", val, width = suffix_length)
150 }
151 SuffixType::Hex(start) => {
152 let val = start + index;
153 format!("{:0>width$x}", val, width = suffix_length)
154 }
155 }
156}
157
158pub fn max_chunks(suffix_type: &SuffixType, suffix_length: usize) -> u64 {
160 match suffix_type {
161 SuffixType::Alphabetic => 26u64.saturating_pow(suffix_length as u32),
162 SuffixType::Numeric(_) | SuffixType::Hex(_) => 10u64.saturating_pow(suffix_length as u32),
163 }
164}
165
166fn output_path(config: &SplitConfig, index: u64) -> String {
168 let suffix = generate_suffix(index, &config.suffix_type, config.suffix_length);
169 format!("{}{}{}", config.prefix, suffix, config.additional_suffix)
170}
171
/// A byte sink for one output chunk.
///
/// On top of [`Write`], implementors provide an explicit completion step so
/// that errors surfacing when the chunk is closed can be reported to the
/// caller.
trait ChunkWriter: Write {
    /// Completes the chunk; call once after the last byte has been written.
    fn finish(&mut self) -> io::Result<()>;
}
176
177struct FileChunkWriter {
179 writer: BufWriter<File>,
180}
181
182impl FileChunkWriter {
183 fn create(path: &str) -> io::Result<Self> {
184 let file = File::create(path)?;
185 Ok(Self {
186 writer: BufWriter::with_capacity(1024 * 1024, file), })
188 }
189}
190
191impl Write for FileChunkWriter {
192 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
193 self.writer.write(buf)
194 }
195
196 fn flush(&mut self) -> io::Result<()> {
197 self.writer.flush()
198 }
199}
200
201impl ChunkWriter for FileChunkWriter {
202 fn finish(&mut self) -> io::Result<()> {
203 self.writer.flush()
204 }
205}
206
207struct FilterChunkWriter {
209 child: std::process::Child,
210 _stdin_taken: bool,
211}
212
213impl FilterChunkWriter {
214 fn create(filter_cmd: &str, output_path: &str) -> io::Result<Self> {
215 let child = Command::new("sh")
216 .arg("-c")
217 .arg(filter_cmd)
218 .env("FILE", output_path)
219 .stdin(Stdio::piped())
220 .spawn()?;
221 Ok(Self {
222 child,
223 _stdin_taken: false,
224 })
225 }
226}
227
228impl Write for FilterChunkWriter {
229 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
230 if let Some(ref mut stdin) = self.child.stdin {
231 stdin.write(buf)
232 } else {
233 Err(io::Error::new(io::ErrorKind::BrokenPipe, "stdin closed"))
234 }
235 }
236
237 fn flush(&mut self) -> io::Result<()> {
238 if let Some(ref mut stdin) = self.child.stdin {
239 stdin.flush()
240 } else {
241 Ok(())
242 }
243 }
244}
245
246impl ChunkWriter for FilterChunkWriter {
247 fn finish(&mut self) -> io::Result<()> {
248 self.child.stdin.take();
250 let status = self.child.wait()?;
251 if !status.success() {
252 return Err(io::Error::other(format!(
253 "filter command exited with status {}",
254 status
255 )));
256 }
257 Ok(())
258 }
259}
260
261fn create_writer(config: &SplitConfig, index: u64) -> io::Result<Box<dyn ChunkWriter>> {
263 let path = output_path(config, index);
264 if config.verbose {
265 eprintln!("creating file '{}'", path);
266 }
267 if let Some(ref filter_cmd) = config.filter {
268 Ok(Box::new(FilterChunkWriter::create(filter_cmd, &path)?))
269 } else {
270 Ok(Box::new(FileChunkWriter::create(&path)?))
271 }
272}
273
274fn split_by_lines(
278 reader: &mut dyn BufRead,
279 config: &SplitConfig,
280 lines_per_chunk: u64,
281) -> io::Result<()> {
282 let limit = max_chunks(&config.suffix_type, config.suffix_length);
283 let mut chunk_index: u64 = 0;
284 let mut lines_in_chunk: u64 = 0;
285 let mut writer: Option<Box<dyn ChunkWriter>> = None;
286 let sep = config.separator;
287
288 loop {
289 let available = match reader.fill_buf() {
290 Ok([]) => break,
291 Ok(b) => b,
292 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
293 Err(e) => return Err(e),
294 };
295
296 let mut pos = 0;
297 let buf_len = available.len();
298
299 while pos < buf_len {
300 if writer.is_none() {
301 if chunk_index >= limit {
302 return Err(io::Error::other("output file suffixes exhausted"));
303 }
304 writer = Some(create_writer(config, chunk_index)?);
305 lines_in_chunk = 0;
306 }
307
308 let lines_needed = lines_per_chunk - lines_in_chunk;
310 let slice = &available[pos..];
311
312 let mut found = 0u64;
315 let mut last_sep_end = 0;
316
317 for offset in memchr::memchr_iter(sep, slice) {
318 found += 1;
319 last_sep_end = offset + 1;
320 if found >= lines_needed {
321 break;
322 }
323 }
324
325 if found >= lines_needed {
326 writer.as_mut().unwrap().write_all(&slice[..last_sep_end])?;
328 pos += last_sep_end;
329 writer.as_mut().unwrap().finish()?;
331 writer = None;
332 chunk_index += 1;
333 } else {
334 writer.as_mut().unwrap().write_all(slice)?;
336 lines_in_chunk += found;
337 pos = buf_len;
338 }
339 }
340
341 let consumed = buf_len;
342 reader.consume(consumed);
343 }
344
345 if let Some(ref mut w) = writer {
347 w.finish()?;
348 }
349
350 Ok(())
351}
352
353fn split_by_bytes(
355 reader: &mut dyn Read,
356 config: &SplitConfig,
357 bytes_per_chunk: u64,
358) -> io::Result<()> {
359 let limit = max_chunks(&config.suffix_type, config.suffix_length);
360 let mut chunk_index: u64 = 0;
361 let mut bytes_in_chunk: u64 = 0;
362 let mut writer: Option<Box<dyn ChunkWriter>> = None;
363
364 let mut read_buf = vec![0u8; 1024 * 1024]; loop {
366 let bytes_read = match reader.read(&mut read_buf) {
367 Ok(0) => break,
368 Ok(n) => n,
369 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
370 Err(e) => return Err(e),
371 };
372
373 let mut offset = 0usize;
374 while offset < bytes_read {
375 if writer.is_none() {
376 if chunk_index >= limit {
377 return Err(io::Error::other("output file suffixes exhausted"));
378 }
379 writer = Some(create_writer(config, chunk_index)?);
380 bytes_in_chunk = 0;
381 }
382
383 let remaining_in_chunk = (bytes_per_chunk - bytes_in_chunk) as usize;
384 let remaining_in_buf = bytes_read - offset;
385 let to_write = remaining_in_chunk.min(remaining_in_buf);
386
387 writer
388 .as_mut()
389 .unwrap()
390 .write_all(&read_buf[offset..offset + to_write])?;
391 bytes_in_chunk += to_write as u64;
392 offset += to_write;
393
394 if bytes_in_chunk >= bytes_per_chunk {
395 writer.as_mut().unwrap().finish()?;
396 writer = None;
397 chunk_index += 1;
398 }
399 }
400 }
401
402 if let Some(ref mut w) = writer {
403 if config.elide_empty && bytes_in_chunk == 0 {
404 w.finish()?;
405 let path = output_path(config, chunk_index);
407 let _ = fs::remove_file(&path);
408 } else {
409 w.finish()?;
410 }
411 }
412
413 Ok(())
414}
415
416fn split_by_line_bytes(
421 reader: &mut dyn Read,
422 config: &SplitConfig,
423 max_bytes: u64,
424) -> io::Result<()> {
425 let limit = max_chunks(&config.suffix_type, config.suffix_length);
426 let max = max_bytes as usize;
427 let sep = config.separator;
428
429 let mut data = Vec::new();
431 reader.read_to_end(&mut data)?;
432
433 if data.is_empty() {
434 return Ok(());
435 }
436
437 let total = data.len();
438 let mut chunk_index: u64 = 0;
439 let mut offset = 0;
440
441 while offset < total {
442 if chunk_index >= limit {
443 return Err(io::Error::other("output file suffixes exhausted"));
444 }
445
446 let remaining = total - offset;
447 let window = remaining.min(max);
448 let slice = &data[offset..offset + window];
449
450 let end = if remaining < max {
456 offset + window
457 } else if let Some(pos) = memchr::memrchr(sep, slice) {
458 offset + pos + 1
460 } else {
461 offset + window
463 };
464
465 let chunk_data = &data[offset..end];
466
467 let mut writer = create_writer(config, chunk_index)?;
468 writer.write_all(chunk_data)?;
469 writer.finish()?;
470
471 offset = end;
472 chunk_index += 1;
473 }
474
475 Ok(())
476}
477
478fn split_by_number(input_path: &str, config: &SplitConfig, n_chunks: u64) -> io::Result<()> {
481 let limit = max_chunks(&config.suffix_type, config.suffix_length);
482 if n_chunks > limit {
483 return Err(io::Error::other("output file suffixes exhausted"));
484 }
485 if n_chunks == 0 {
486 return Err(io::Error::new(
487 io::ErrorKind::InvalidInput,
488 "invalid number of chunks: 0",
489 ));
490 }
491
492 let data: crate::common::io::FileData = if input_path == "-" {
494 let mut buf = Vec::new();
495 io::stdin().lock().read_to_end(&mut buf)?;
496 crate::common::io::FileData::Owned(buf)
497 } else {
498 crate::common::io::read_file(Path::new(input_path))?
499 };
500
501 let total = data.len() as u64;
502 let base_chunk_size = total / n_chunks;
503 let remainder = total % n_chunks;
504
505 let mut offset: u64 = 0;
506 for i in 0..n_chunks {
507 let chunk_size = base_chunk_size + if i < remainder { 1 } else { 0 };
509
510 if config.elide_empty && chunk_size == 0 {
511 continue;
512 }
513
514 let mut writer = create_writer(config, i)?;
515 if chunk_size > 0 {
516 let start = offset as usize;
517 let end = start + chunk_size as usize;
518 writer.write_all(&data[start..end])?;
519 }
520 writer.finish()?;
521 offset += chunk_size;
522 }
523
524 Ok(())
525}
526
527fn split_by_number_extract(input_path: &str, k: u64, n: u64) -> io::Result<()> {
529 let data: crate::common::io::FileData = if input_path == "-" {
530 let mut buf = Vec::new();
531 io::stdin().lock().read_to_end(&mut buf)?;
532 crate::common::io::FileData::Owned(buf)
533 } else {
534 crate::common::io::read_file(Path::new(input_path))?
535 };
536
537 let total = data.len() as u64;
538 let base_chunk_size = total / n;
539 let remainder = total % n;
540
541 let mut offset: u64 = 0;
542 for i in 0..n {
543 let chunk_size = base_chunk_size + if i < remainder { 1 } else { 0 };
544 if i + 1 == k {
545 if chunk_size > 0 {
546 let start = offset as usize;
547 let end = start + chunk_size as usize;
548 let stdout = io::stdout();
549 let mut out = stdout.lock();
550 out.write_all(&data[start..end])?;
551 }
552 return Ok(());
553 }
554 offset += chunk_size;
555 }
556 Ok(())
557}
558
559fn read_input_data(input_path: &str) -> io::Result<FileData> {
561 if input_path == "-" {
562 let mut buf = Vec::new();
563 io::stdin().lock().read_to_end(&mut buf)?;
564 Ok(FileData::Owned(buf))
565 } else {
566 let data = crate::common::io::read_file(Path::new(input_path))?;
567 Ok(data)
568 }
569}
570
571fn compute_line_chunk_boundaries(data: &[u8], n_chunks: u64, sep: u8) -> Vec<u64> {
576 let total = data.len() as u64;
577 let base_chunk_size = total / n_chunks;
578 let remainder = total % n_chunks;
579
580 let mut boundaries = Vec::with_capacity(n_chunks as usize);
582 let mut target_end: u64 = 0;
583 for i in 0..n_chunks {
584 target_end += base_chunk_size + if i < remainder { 1 } else { 0 };
585 boundaries.push(target_end);
586 }
587
588 let mut chunk_ends = Vec::with_capacity(n_chunks as usize);
590 let mut pos: u64 = 0;
591 let mut chunk_idx: u64 = 0;
592
593 for sep_pos in memchr::memchr_iter(sep, data) {
594 let line_end = sep_pos as u64 + 1; pos = line_end;
596
597 while chunk_idx < n_chunks && pos >= boundaries[chunk_idx as usize] {
599 chunk_ends.push(pos);
600 chunk_idx += 1;
601 }
602 }
603
604 if pos < total {
606 pos = total;
607 while chunk_idx < n_chunks && pos >= boundaries[chunk_idx as usize] {
608 chunk_ends.push(pos);
609 chunk_idx += 1;
610 }
611 }
612
613 while (chunk_ends.len() as u64) < n_chunks {
615 chunk_ends.push(pos);
616 }
617
618 chunk_ends
619}
620
621fn split_by_line_chunks(input_path: &str, config: &SplitConfig, n_chunks: u64) -> io::Result<()> {
623 let data = read_input_data(input_path)?;
624 let sep = config.separator;
625
626 let chunk_ends = compute_line_chunk_boundaries(&data, n_chunks, sep);
627
628 let mut offset: u64 = 0;
629 for i in 0..n_chunks {
630 let end = chunk_ends[i as usize];
631 let chunk_size = end - offset;
632
633 if config.elide_empty && chunk_size == 0 {
634 continue;
635 }
636
637 let mut writer = create_writer(config, i)?;
638 if chunk_size > 0 {
639 writer.write_all(&data[offset as usize..end as usize])?;
640 }
641 writer.finish()?;
642 offset = end;
643 }
644 Ok(())
645}
646
647fn split_by_line_chunk_extract(
649 input_path: &str,
650 config: &SplitConfig,
651 k: u64,
652 n_chunks: u64,
653) -> io::Result<()> {
654 let data = read_input_data(input_path)?;
655 let sep = config.separator;
656
657 let chunk_ends = compute_line_chunk_boundaries(&data, n_chunks, sep);
658
659 let mut offset: u64 = 0;
660 for i in 0..n_chunks {
661 let end = chunk_ends[i as usize];
662 if i + 1 == k {
663 let chunk_size = end - offset;
664 if chunk_size > 0 {
665 let stdout = io::stdout();
666 let mut out = stdout.lock();
667 out.write_all(&data[offset as usize..end as usize])?;
668 }
669 return Ok(());
670 }
671 offset = end;
672 }
673 Ok(())
674}
675
676fn split_by_round_robin(input_path: &str, config: &SplitConfig, n_chunks: u64) -> io::Result<()> {
678 let data = read_input_data(input_path)?;
679 let sep = config.separator;
680
681 let mut lines: Vec<&[u8]> = Vec::new();
683 let mut start = 0;
684 for pos in memchr::memchr_iter(sep, &data) {
685 lines.push(&data[start..=pos]);
686 start = pos + 1;
687 }
688 if start < data.len() {
689 lines.push(&data[start..]);
690 }
691
692 let mut writers: Vec<Option<Box<dyn ChunkWriter>>> = (0..n_chunks)
694 .map(|i| {
695 if config.elide_empty && lines.len() as u64 <= i {
696 None
697 } else {
698 Some(create_writer(config, i).unwrap())
699 }
700 })
701 .collect();
702
703 for (idx, line) in lines.iter().enumerate() {
705 let chunk_idx = (idx as u64) % n_chunks;
706 if let Some(ref mut writer) = writers[chunk_idx as usize] {
707 writer.write_all(line)?;
708 }
709 }
710
711 for writer in &mut writers {
713 if let Some(mut w) = writer.take() {
714 w.finish()?;
715 }
716 }
717
718 Ok(())
719}
720
721fn split_by_round_robin_extract(input_path: &str, k: u64, n: u64) -> io::Result<()> {
723 let data = read_input_data(input_path)?;
724 let sep = b'\n';
725
726 let stdout = io::stdout();
727 let mut out = stdout.lock();
728
729 let mut start = 0;
730 let mut line_idx: u64 = 0;
731 for pos in memchr::memchr_iter(sep, &data) {
732 if line_idx % n == k - 1 {
733 out.write_all(&data[start..=pos])?;
734 }
735 start = pos + 1;
736 line_idx += 1;
737 }
738 if start < data.len() && line_idx % n == k - 1 {
739 out.write_all(&data[start..])?;
740 }
741
742 Ok(())
743}
744
745#[cfg(unix)]
749fn split_lines_streaming_fast(
750 file: &File,
751 config: &SplitConfig,
752 lines_per_chunk: u64,
753) -> io::Result<()> {
754 use std::os::unix::io::AsRawFd;
755
756 let in_fd = file.as_raw_fd();
757 #[cfg(target_os = "linux")]
759 unsafe {
760 libc::posix_fadvise(in_fd, 0, 0, libc::POSIX_FADV_SEQUENTIAL);
761 }
762
763 let limit = max_chunks(&config.suffix_type, config.suffix_length);
764 let sep = config.separator;
765
766 const BUF_SIZE: usize = 256 * 1024; let mut buf = vec![0u8; BUF_SIZE];
768 let mut chunk_index: u64 = 0;
769 let mut lines_in_chunk: u64 = 0;
770 let mut out_fd: i32 = -1;
771 let mut _out_file: Option<File> = None; let raw_write_all = |fd: i32, mut data: &[u8]| -> io::Result<()> {
775 while !data.is_empty() {
776 let ret =
777 unsafe { libc::write(fd, data.as_ptr() as *const libc::c_void, data.len() as _) };
778 if ret > 0 {
779 data = &data[ret as usize..];
780 } else if ret == 0 {
781 return Err(io::Error::new(io::ErrorKind::WriteZero, "write returned 0"));
782 } else {
783 let err = io::Error::last_os_error();
784 if err.kind() == io::ErrorKind::Interrupted {
785 continue;
786 }
787 return Err(err);
788 }
789 }
790 Ok(())
791 };
792
793 loop {
794 let n = unsafe { libc::read(in_fd, buf.as_mut_ptr() as *mut libc::c_void, BUF_SIZE as _) };
795 if n == 0 {
796 break;
797 }
798 if n < 0 {
799 let err = io::Error::last_os_error();
800 if err.kind() == io::ErrorKind::Interrupted {
801 continue;
802 }
803 return Err(err);
804 }
805 let n = n as usize;
806
807 let data = &buf[..n];
808 let mut pos = 0;
809
810 while pos < n {
811 if out_fd < 0 {
812 if chunk_index >= limit {
813 return Err(io::Error::other("output file suffixes exhausted"));
814 }
815 let path = output_path(config, chunk_index);
816 if config.verbose {
817 eprintln!("creating file '{}'", path);
818 }
819 let file = File::create(path)?;
820 out_fd = file.as_raw_fd();
821 _out_file = Some(file);
822 lines_in_chunk = 0;
823 }
824
825 let slice = &data[pos..];
826 let lines_needed = lines_per_chunk - lines_in_chunk;
827 let mut found = 0u64;
828 let mut last_sep_end = 0;
829
830 for offset in memchr::memchr_iter(sep, slice) {
831 found += 1;
832 last_sep_end = offset + 1;
833 if found >= lines_needed {
834 break;
835 }
836 }
837
838 if found >= lines_needed {
839 raw_write_all(out_fd, &slice[..last_sep_end])?;
840 pos += last_sep_end;
841 _out_file = None;
842 out_fd = -1;
843 chunk_index += 1;
844 } else {
845 raw_write_all(out_fd, slice)?;
846 lines_in_chunk += found;
847 pos = n;
848 }
849 }
850 }
851
852 Ok(())
853}
854
855#[cfg(target_os = "linux")]
859fn split_bytes_zero_copy(
860 input_fd: std::os::unix::io::RawFd,
861 file_size: u64,
862 config: &SplitConfig,
863 bytes_per_chunk: u64,
864) -> io::Result<()> {
865 use std::os::unix::io::AsRawFd;
866 let limit = max_chunks(&config.suffix_type, config.suffix_length);
867 let chunk_size = bytes_per_chunk as usize;
868 let total = file_size as usize;
869
870 let mut chunks: Vec<(usize, usize)> = Vec::new();
872 let mut off = 0usize;
873 while off < total {
874 if chunks.len() as u64 >= limit {
875 return Err(io::Error::other("output file suffixes exhausted"));
876 }
877 let remaining = total - off;
878 let sz = remaining.min(chunk_size);
879 chunks.push((off, sz));
880 off += sz;
881 }
882
883 if chunks.is_empty() {
884 return Ok(());
885 }
886
887 let paths: Vec<String> = (0..chunks.len())
889 .map(|i| output_path(config, i as u64))
890 .collect();
891
892 if config.verbose {
893 for path in &paths {
894 eprintln!("creating file '{}'", path);
895 }
896 }
897
898 let copy_chunk =
900 |input_fd: i32, chunk_offset: usize, chunk_len: usize, path: &str| -> io::Result<()> {
901 let out_file = File::create(path)?;
902 let out_fd = out_file.as_raw_fd();
903
904 let mut off_in = chunk_offset as i64;
905 let mut copied = 0usize;
906 while copied < chunk_len {
907 let n = unsafe {
908 libc::copy_file_range(
909 input_fd,
910 &mut off_in as *mut i64 as *mut libc::off64_t,
911 out_fd,
912 std::ptr::null_mut(),
913 chunk_len - copied,
914 0,
915 )
916 };
917 if n > 0 {
918 copied += n as usize;
919 } else if n == 0 {
920 break;
921 } else {
922 let err = io::Error::last_os_error();
923 if err.raw_os_error() == Some(libc::EINTR) {
924 continue;
925 }
926 while copied < chunk_len {
928 let n = unsafe {
929 libc::sendfile(
930 out_fd,
931 input_fd,
932 &mut off_in as *mut i64 as *mut libc::off_t,
933 chunk_len - copied,
934 )
935 };
936 if n > 0 {
937 copied += n as usize;
938 } else if n == 0 {
939 break;
940 } else {
941 let err2 = io::Error::last_os_error();
942 if err2.raw_os_error() == Some(libc::EINTR) {
943 continue;
944 }
945 return Err(err2);
946 }
947 }
948 break;
949 }
950 }
951 Ok(())
952 };
953
954 if chunks.len() >= 4 && !config.verbose {
956 chunks.par_iter().zip(paths.par_iter()).try_for_each(
957 |(&(chunk_off, chunk_len), path)| copy_chunk(input_fd, chunk_off, chunk_len, path),
958 )?;
959 } else {
960 for (i, &(chunk_off, chunk_len)) in chunks.iter().enumerate() {
961 copy_chunk(input_fd, chunk_off, chunk_len, &paths[i])?;
962 }
963 }
964
965 Ok(())
966}
967
/// Byte splitter for input that is already fully in memory.
///
/// Cuts `data` into consecutive pieces of `bytes_per_chunk` bytes (the last
/// one may be shorter) and writes each to its own file. With four or more
/// chunks (and verbose output off) the files are written in parallel.
/// Empty input produces no files.
///
/// # Errors
///
/// * `InvalidInput` when `bytes_per_chunk` is zero.
/// * "output file suffixes exhausted" when more chunks are needed than the
///   configured suffix can express (checked before any file is created).
/// * Any I/O error from creating or writing an output file.
#[cfg(all(unix, not(target_os = "linux")))]
fn split_bytes_preloaded(
    data: &[u8],
    config: &SplitConfig,
    bytes_per_chunk: u64,
) -> io::Result<()> {
    if bytes_per_chunk == 0 {
        // A zero chunk size would never advance through the data.
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "invalid number of bytes: 0",
        ));
    }

    let limit = max_chunks(&config.suffix_type, config.suffix_length);
    // Clamp rather than truncate if usize is narrower than u64.
    let chunk_size = bytes_per_chunk.min(usize::MAX as u64) as usize;

    // Plan (start, end) for every chunk.
    let mut chunks: Vec<(usize, usize)> = Vec::new();
    let mut offset = 0;
    while offset < data.len() {
        if chunks.len() as u64 >= limit {
            return Err(io::Error::other("output file suffixes exhausted"));
        }
        let end = offset.saturating_add(chunk_size).min(data.len());
        chunks.push((offset, end));
        offset = end;
    }

    if chunks.is_empty() {
        return Ok(());
    }

    let paths: Vec<String> = (0..chunks.len())
        .map(|i| output_path(config, i as u64))
        .collect();

    if config.verbose {
        for path in &paths {
            eprintln!("creating file '{}'", path);
        }
    }

    // Shared by the parallel and the sequential path.
    let write_chunk = |(start, end): (usize, usize), path: &str| -> io::Result<()> {
        let mut file = File::create(path)?;
        file.write_all(&data[start..end])
    };

    if chunks.len() >= 4 && !config.verbose {
        chunks
            .par_iter()
            .zip(paths.par_iter())
            .try_for_each(|(&range, path)| write_chunk(range, path.as_str()))?;
    } else {
        for (i, &range) in chunks.iter().enumerate() {
            write_chunk(range, paths[i].as_str())?;
        }
    }

    Ok(())
}
1027
1028#[cfg(unix)]
1030fn split_line_bytes_preloaded(data: &[u8], config: &SplitConfig, max_bytes: u64) -> io::Result<()> {
1031 let limit = max_chunks(&config.suffix_type, config.suffix_length);
1032 let max = max_bytes as usize;
1033 let sep = config.separator;
1034
1035 let mut chunks: Vec<(usize, usize)> = Vec::new();
1036 let mut offset = 0;
1037
1038 while offset < data.len() {
1039 if chunks.len() as u64 >= limit {
1040 return Err(io::Error::other("output file suffixes exhausted"));
1041 }
1042 let remaining = data.len() - offset;
1043 let window = remaining.min(max);
1044 let slice = &data[offset..offset + window];
1045
1046 let end = if remaining < max {
1047 offset + window
1049 } else if let Some(pos) = memchr::memrchr(sep, slice) {
1050 offset + pos + 1
1051 } else {
1052 offset + window
1053 };
1054
1055 chunks.push((offset, end));
1056 offset = end;
1057 }
1058
1059 if chunks.is_empty() {
1060 return Ok(());
1061 }
1062
1063 let paths: Vec<String> = chunks
1064 .iter()
1065 .enumerate()
1066 .map(|(i, _)| output_path(config, i as u64))
1067 .collect();
1068
1069 if config.verbose {
1070 for path in &paths {
1071 eprintln!("creating file '{}'", path);
1072 }
1073 }
1074
1075 if chunks.len() >= 4 && !config.verbose {
1076 chunks.par_iter().zip(paths.par_iter()).try_for_each(
1077 |(&(start, end), path)| -> io::Result<()> {
1078 let mut file = File::create(path)?;
1079 file.write_all(&data[start..end])?;
1080 Ok(())
1081 },
1082 )?;
1083 } else {
1084 for (i, &(start, end)) in chunks.iter().enumerate() {
1085 let mut file = File::create(&paths[i])?;
1086 file.write_all(&data[start..end])?;
1087 }
1088 }
1089
1090 Ok(())
1091}
1092
1093pub fn split_file(input_path: &str, config: &SplitConfig) -> io::Result<()> {
1096 if let SplitMode::Number(n) = config.mode {
1098 return split_by_number(input_path, config, n);
1099 }
1100 if let SplitMode::NumberExtract(k, n) = config.mode {
1101 return split_by_number_extract(input_path, k, n);
1102 }
1103 if let SplitMode::LineChunks(n) = config.mode {
1104 return split_by_line_chunks(input_path, config, n);
1105 }
1106 if let SplitMode::LineChunkExtract(k, n) = config.mode {
1107 return split_by_line_chunk_extract(input_path, config, k, n);
1108 }
1109 if let SplitMode::RoundRobin(n) = config.mode {
1110 return split_by_round_robin(input_path, config, n);
1111 }
1112 if let SplitMode::RoundRobinExtract(k, n) = config.mode {
1113 return split_by_round_robin_extract(input_path, k, n);
1114 }
1115
1116 #[cfg(unix)]
1120 if let SplitMode::Lines(n) = config.mode {
1121 if input_path != "-" && config.filter.is_none() {
1122 if let Ok(file) = File::open(input_path) {
1123 if let Ok(meta) = file.metadata() {
1124 if meta.file_type().is_file() && meta.len() > 0 {
1125 return split_lines_streaming_fast(&file, config, n);
1126 }
1127 }
1128 }
1129 }
1130 }
1131
1132 #[cfg(target_os = "linux")]
1135 if let SplitMode::Bytes(bytes_per_chunk) = config.mode {
1136 if input_path != "-" && config.filter.is_none() {
1137 if let Ok(file) = File::open(input_path) {
1138 if let Ok(meta) = file.metadata() {
1139 if meta.file_type().is_file() && meta.len() > 0 {
1140 use std::os::unix::io::AsRawFd;
1141 unsafe {
1142 libc::posix_fadvise(
1143 file.as_raw_fd(),
1144 0,
1145 0,
1146 libc::POSIX_FADV_SEQUENTIAL,
1147 );
1148 }
1149 return split_bytes_zero_copy(
1150 file.as_raw_fd(),
1151 meta.len(),
1152 config,
1153 bytes_per_chunk,
1154 );
1155 }
1156 }
1157 }
1158 }
1159 }
1160
1161 #[cfg(all(unix, not(target_os = "linux")))]
1164 if let SplitMode::Bytes(bytes_per_chunk) = config.mode {
1165 if input_path != "-" && config.filter.is_none() {
1166 const FAST_PATH_LIMIT: u64 = 512 * 1024 * 1024;
1167 if let Ok(file) = File::open(input_path) {
1168 if let Ok(meta) = file.metadata() {
1169 if meta.file_type().is_file() && meta.len() <= FAST_PATH_LIMIT && meta.len() > 0
1170 {
1171 if let Ok(mmap) = unsafe { memmap2::MmapOptions::new().map(&file) } {
1172 let _ = mmap.advise(memmap2::Advice::Sequential);
1173 #[cfg(not(target_os = "linux"))]
1174 {
1175 let _ = mmap.advise(memmap2::Advice::WillNeed);
1176 }
1177 return split_bytes_preloaded(&mmap, config, bytes_per_chunk);
1178 }
1179 }
1180 }
1181 }
1182 }
1183 }
1184
1185 #[cfg(unix)]
1187 if let SplitMode::LineBytes(max_bytes) = config.mode {
1188 if input_path != "-" && config.filter.is_none() {
1189 const FAST_PATH_LIMIT: u64 = 512 * 1024 * 1024;
1190 if let Ok(file) = File::open(input_path) {
1191 if let Ok(meta) = file.metadata() {
1192 if meta.file_type().is_file() && meta.len() <= FAST_PATH_LIMIT && meta.len() > 0
1193 {
1194 if let Ok(mmap) = unsafe { memmap2::MmapOptions::new().map(&file) } {
1195 let _ = mmap.advise(memmap2::Advice::Sequential);
1196 #[cfg(target_os = "linux")]
1197 {
1198 let len = mmap.len();
1199 if len >= 2 * 1024 * 1024 {
1200 let _ = mmap.advise(memmap2::Advice::HugePage);
1201 }
1202 if len >= 4 * 1024 * 1024 {
1203 if mmap.advise(memmap2::Advice::PopulateRead).is_err() {
1204 let _ = mmap.advise(memmap2::Advice::WillNeed);
1205 }
1206 } else {
1207 let _ = mmap.advise(memmap2::Advice::WillNeed);
1208 }
1209 }
1210 #[cfg(not(target_os = "linux"))]
1211 {
1212 let _ = mmap.advise(memmap2::Advice::WillNeed);
1213 }
1214 return split_line_bytes_preloaded(&mmap, config, max_bytes);
1215 }
1216 }
1217 }
1218 }
1219 }
1220 }
1221
1222 let reader: Box<dyn Read> = if input_path == "-" {
1224 Box::new(io::stdin().lock())
1225 } else {
1226 let path = Path::new(input_path);
1227 if !path.exists() {
1228 return Err(io::Error::new(
1229 io::ErrorKind::NotFound,
1230 format!(
1231 "cannot open '{}' for reading: No such file or directory",
1232 input_path
1233 ),
1234 ));
1235 }
1236 let file = File::open(path)?;
1237 #[cfg(target_os = "linux")]
1239 {
1240 use std::os::unix::io::AsRawFd;
1241 unsafe {
1242 libc::posix_fadvise(file.as_raw_fd(), 0, 0, libc::POSIX_FADV_SEQUENTIAL);
1243 }
1244 }
1245 Box::new(file)
1246 };
1247
1248 match config.mode {
1249 SplitMode::Lines(n) => {
1250 let mut buf_reader = BufReader::with_capacity(1024 * 1024, reader);
1251 split_by_lines(&mut buf_reader, config, n)
1252 }
1253 SplitMode::Bytes(n) => {
1254 let mut reader = reader;
1255 split_by_bytes(&mut reader, config, n)
1256 }
1257 SplitMode::LineBytes(n) => {
1258 let mut reader = reader;
1259 split_by_line_bytes(&mut reader, config, n)
1260 }
1261 SplitMode::Number(_)
1262 | SplitMode::NumberExtract(_, _)
1263 | SplitMode::LineChunks(_)
1264 | SplitMode::LineChunkExtract(_, _)
1265 | SplitMode::RoundRobin(_)
1266 | SplitMode::RoundRobinExtract(_, _) => unreachable!(),
1267 }
1268}
1269
1270pub fn output_paths(config: &SplitConfig, count: u64) -> Vec<PathBuf> {
1272 (0..count)
1273 .map(|i| PathBuf::from(output_path(config, i)))
1274 .collect()
1275}