1use std::fs::{self, File};
2use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
3use std::path::{Path, PathBuf};
4use std::process::{Command, Stdio};
5
6#[cfg(unix)]
7use rayon::prelude::*;
8
/// Alphabet used for the generated part of an output file name.
///
/// For the numeric variants the payload is the starting value: it is added to
/// the chunk index before formatting (see `generate_suffix`).
#[derive(Clone, Debug, PartialEq)]
pub enum SuffixType {
    /// Lowercase letters `a`..`z`, used as base-26 digits.
    Alphabetic,
    /// Zero-padded decimal number, offset by the contained start value.
    Numeric(u64),
    /// Zero-padded lowercase hexadecimal number, offset by the contained start value.
    Hex(u64),
}
19
/// How the input is divided; `split_file` dispatches on this value.
///
/// In the `*Extract(k, n)` variants `k` is the 1-based chunk to print to
/// standard output and `n` is the total number of chunks.
#[derive(Clone, Debug)]
pub enum SplitMode {
    /// Start a new output after this many separator-terminated records.
    Lines(u64),
    /// Start a new output after this many bytes.
    Bytes(u64),
    /// At most this many bytes per output, cutting after the last separator
    /// found inside the window when there is one.
    LineBytes(u64),
    /// Divide the input into this many chunks by byte count.
    Number(u64),
    /// Print chunk `k` of `n` byte-count chunks to standard output.
    NumberExtract(u64, u64),
    /// Divide the input into this many chunks, ending chunks after a separator.
    LineChunks(u64),
    /// Print chunk `k` of `n` separator-aligned chunks to standard output.
    LineChunkExtract(u64, u64),
    /// Deal records to this many outputs in rotation.
    RoundRobin(u64),
    /// Print the records that round-robin distribution assigns to output `k` of `n`.
    RoundRobinExtract(u64, u64),
}
42
/// Options that control how the input is split and how outputs are named.
///
/// An output name is `prefix`, then the generated suffix, then
/// `additional_suffix` (see `output_path`).
#[derive(Clone, Debug)]
pub struct SplitConfig {
    /// Splitting strategy.
    pub mode: SplitMode,
    /// Alphabet of the generated suffix.
    pub suffix_type: SuffixType,
    /// Width of the generated suffix in characters.
    pub suffix_length: usize,
    /// Text appended after the generated suffix.
    pub additional_suffix: String,
    /// Text placed before the generated suffix.
    pub prefix: String,
    /// When set, the chunk-count modes skip chunks that would be empty.
    pub elide_empty: bool,
    /// When set, `creating file '<name>'` is printed to stderr for each output.
    pub verbose: bool,
    /// Optional shell command run via `sh -c` for each chunk; it receives the
    /// chunk on stdin and the output name in the `FILE` environment variable.
    pub filter: Option<String>,
    /// Byte that terminates a record.
    pub separator: u8,
}
56
57impl Default for SplitConfig {
58 fn default() -> Self {
59 Self {
60 mode: SplitMode::Lines(1000),
61 suffix_type: SuffixType::Alphabetic,
62 suffix_length: 2,
63 additional_suffix: String::new(),
64 prefix: "x".to_string(),
65 elide_empty: false,
66 verbose: false,
67 filter: None,
68 separator: b'\n',
69 }
70 }
71}
72
/// Parses a size such as `10`, `4K`, `2MB` or `1GiB` into a byte count.
///
/// The input is trimmed, then split into a leading run of ASCII digits (an
/// optional sign is accepted in the first position only) and a unit suffix.
///
/// # Errors
///
/// Returns a message when the input is empty, has no leading number, the
/// number does not parse as `u64`, the suffix is unknown, or the product does
/// not fit in `u64`. Zetta/yotta suffixes never fail: they yield `u64::MAX`
/// for a non-zero count and `0` otherwise.
pub fn parse_size(s: &str) -> Result<u64, String> {
    let s = s.trim();
    if s.is_empty() {
        return Err("empty size".to_string());
    }

    // Index of the first character that does not belong to the number.
    let digits_len = s
        .char_indices()
        .find(|&(i, c)| !(c.is_ascii_digit() || (i == 0 && matches!(c, '+' | '-'))))
        .map_or(s.len(), |(i, _)| i);
    if digits_len == 0 {
        return Err(format!("invalid number: '{}'", s));
    }

    let (digits, unit) = s.split_at(digits_len);
    let value = match digits.parse::<u64>() {
        Ok(v) => v,
        Err(_) => return Err(format!("invalid number: '{}'", digits)),
    };

    let factor: u64 = match unit {
        "" => 1,
        "b" => 512,
        "kB" | "KB" => 1000,
        "k" | "K" | "KiB" => 1 << 10,
        "MB" => 1000u64.pow(2),
        "m" | "M" | "MiB" => 1 << 20,
        "GB" => 1000u64.pow(3),
        "g" | "G" | "GiB" => 1 << 30,
        "TB" => 1000u64.pow(4),
        "t" | "T" | "TiB" => 1 << 40,
        "PB" => 1000u64.pow(5),
        "p" | "P" | "PiB" => 1 << 50,
        "EB" => 1000u64.pow(6),
        "e" | "E" | "EiB" => 1 << 60,
        // These multipliers do not fit in u64, so the result saturates.
        "ZB" | "z" | "Z" | "ZiB" | "YB" | "y" | "Y" | "YiB" => {
            return Ok(if value > 0 { u64::MAX } else { 0 });
        }
        _ => return Err(format!("invalid suffix in '{}'", s)),
    };

    match value.checked_mul(factor) {
        Some(bytes) => Ok(bytes),
        None => Err(format!("number too large: '{}'", s)),
    }
}
131
132pub fn generate_suffix(index: u64, suffix_type: &SuffixType, suffix_length: usize) -> String {
134 match suffix_type {
135 SuffixType::Alphabetic => {
136 let mut result = Vec::with_capacity(suffix_length);
137 let mut remaining = index;
138 for _ in 0..suffix_length {
139 result.push(b'a' + (remaining % 26) as u8);
140 remaining /= 26;
141 }
142 result.reverse();
143 String::from_utf8(result).unwrap()
144 }
145 SuffixType::Numeric(start) => {
146 let val = start + index;
147 format!("{:0>width$}", val, width = suffix_length)
148 }
149 SuffixType::Hex(start) => {
150 let val = start + index;
151 format!("{:0>width$x}", val, width = suffix_length)
152 }
153 }
154}
155
156pub fn max_chunks(suffix_type: &SuffixType, suffix_length: usize) -> u64 {
158 match suffix_type {
159 SuffixType::Alphabetic => 26u64.saturating_pow(suffix_length as u32),
160 SuffixType::Numeric(_) | SuffixType::Hex(_) => 10u64.saturating_pow(suffix_length as u32),
161 }
162}
163
164fn output_path(config: &SplitConfig, index: u64) -> String {
166 let suffix = generate_suffix(index, &config.suffix_type, config.suffix_length);
167 format!("{}{}{}", config.prefix, suffix, config.additional_suffix)
168}
169
/// A destination for one output chunk.
///
/// In addition to `Write`, implementors provide `finish`, which the split
/// routines call once after the last byte of a chunk has been written.
trait ChunkWriter: Write {
    /// Completes the chunk and reports any error that occurs while doing so.
    fn finish(&mut self) -> io::Result<()>;
}
174
175struct FileChunkWriter {
177 writer: BufWriter<File>,
178}
179
180impl FileChunkWriter {
181 fn create(path: &str) -> io::Result<Self> {
182 let file = File::create(path)?;
183 Ok(Self {
184 writer: BufWriter::with_capacity(1024 * 1024, file), })
186 }
187}
188
189impl Write for FileChunkWriter {
190 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
191 self.writer.write(buf)
192 }
193
194 fn flush(&mut self) -> io::Result<()> {
195 self.writer.flush()
196 }
197}
198
199impl ChunkWriter for FileChunkWriter {
200 fn finish(&mut self) -> io::Result<()> {
201 self.writer.flush()
202 }
203}
204
205struct FilterChunkWriter {
207 child: std::process::Child,
208 _stdin_taken: bool,
209}
210
211impl FilterChunkWriter {
212 fn create(filter_cmd: &str, output_path: &str) -> io::Result<Self> {
213 let child = Command::new("sh")
214 .arg("-c")
215 .arg(filter_cmd)
216 .env("FILE", output_path)
217 .stdin(Stdio::piped())
218 .spawn()?;
219 Ok(Self {
220 child,
221 _stdin_taken: false,
222 })
223 }
224}
225
226impl Write for FilterChunkWriter {
227 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
228 if let Some(ref mut stdin) = self.child.stdin {
229 stdin.write(buf)
230 } else {
231 Err(io::Error::new(io::ErrorKind::BrokenPipe, "stdin closed"))
232 }
233 }
234
235 fn flush(&mut self) -> io::Result<()> {
236 if let Some(ref mut stdin) = self.child.stdin {
237 stdin.flush()
238 } else {
239 Ok(())
240 }
241 }
242}
243
244impl ChunkWriter for FilterChunkWriter {
245 fn finish(&mut self) -> io::Result<()> {
246 self.child.stdin.take();
248 let status = self.child.wait()?;
249 if !status.success() {
250 return Err(io::Error::other(format!(
251 "filter command exited with status {}",
252 status
253 )));
254 }
255 Ok(())
256 }
257}
258
259fn create_writer(config: &SplitConfig, index: u64) -> io::Result<Box<dyn ChunkWriter>> {
261 let path = output_path(config, index);
262 if config.verbose {
263 eprintln!("creating file '{}'", path);
264 }
265 if let Some(ref filter_cmd) = config.filter {
266 Ok(Box::new(FilterChunkWriter::create(filter_cmd, &path)?))
267 } else {
268 Ok(Box::new(FileChunkWriter::create(&path)?))
269 }
270}
271
272fn split_by_lines(
276 reader: &mut dyn BufRead,
277 config: &SplitConfig,
278 lines_per_chunk: u64,
279) -> io::Result<()> {
280 let limit = max_chunks(&config.suffix_type, config.suffix_length);
281 let mut chunk_index: u64 = 0;
282 let mut lines_in_chunk: u64 = 0;
283 let mut writer: Option<Box<dyn ChunkWriter>> = None;
284 let sep = config.separator;
285
286 loop {
287 let available = match reader.fill_buf() {
288 Ok([]) => break,
289 Ok(b) => b,
290 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
291 Err(e) => return Err(e),
292 };
293
294 let mut pos = 0;
295 let buf_len = available.len();
296
297 while pos < buf_len {
298 if writer.is_none() {
299 if chunk_index >= limit {
300 return Err(io::Error::other("output file suffixes exhausted"));
301 }
302 writer = Some(create_writer(config, chunk_index)?);
303 lines_in_chunk = 0;
304 }
305
306 let lines_needed = lines_per_chunk - lines_in_chunk;
308 let slice = &available[pos..];
309
310 let mut found = 0u64;
313 let mut last_sep_end = 0;
314
315 for offset in memchr::memchr_iter(sep, slice) {
316 found += 1;
317 last_sep_end = offset + 1;
318 if found >= lines_needed {
319 break;
320 }
321 }
322
323 if found >= lines_needed {
324 writer.as_mut().unwrap().write_all(&slice[..last_sep_end])?;
326 pos += last_sep_end;
327 writer.as_mut().unwrap().finish()?;
329 writer = None;
330 chunk_index += 1;
331 } else {
332 writer.as_mut().unwrap().write_all(slice)?;
334 lines_in_chunk += found;
335 pos = buf_len;
336 }
337 }
338
339 let consumed = buf_len;
340 reader.consume(consumed);
341 }
342
343 if let Some(ref mut w) = writer {
345 w.finish()?;
346 }
347
348 Ok(())
349}
350
351fn split_by_bytes(
353 reader: &mut dyn Read,
354 config: &SplitConfig,
355 bytes_per_chunk: u64,
356) -> io::Result<()> {
357 let limit = max_chunks(&config.suffix_type, config.suffix_length);
358 let mut chunk_index: u64 = 0;
359 let mut bytes_in_chunk: u64 = 0;
360 let mut writer: Option<Box<dyn ChunkWriter>> = None;
361
362 let mut read_buf = vec![0u8; 1024 * 1024]; loop {
364 let bytes_read = match reader.read(&mut read_buf) {
365 Ok(0) => break,
366 Ok(n) => n,
367 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
368 Err(e) => return Err(e),
369 };
370
371 let mut offset = 0usize;
372 while offset < bytes_read {
373 if writer.is_none() {
374 if chunk_index >= limit {
375 return Err(io::Error::other("output file suffixes exhausted"));
376 }
377 writer = Some(create_writer(config, chunk_index)?);
378 bytes_in_chunk = 0;
379 }
380
381 let remaining_in_chunk = (bytes_per_chunk - bytes_in_chunk) as usize;
382 let remaining_in_buf = bytes_read - offset;
383 let to_write = remaining_in_chunk.min(remaining_in_buf);
384
385 writer
386 .as_mut()
387 .unwrap()
388 .write_all(&read_buf[offset..offset + to_write])?;
389 bytes_in_chunk += to_write as u64;
390 offset += to_write;
391
392 if bytes_in_chunk >= bytes_per_chunk {
393 writer.as_mut().unwrap().finish()?;
394 writer = None;
395 chunk_index += 1;
396 }
397 }
398 }
399
400 if let Some(ref mut w) = writer {
401 if config.elide_empty && bytes_in_chunk == 0 {
402 w.finish()?;
403 let path = output_path(config, chunk_index);
405 let _ = fs::remove_file(&path);
406 } else {
407 w.finish()?;
408 }
409 }
410
411 Ok(())
412}
413
414fn split_by_line_bytes(
419 reader: &mut dyn Read,
420 config: &SplitConfig,
421 max_bytes: u64,
422) -> io::Result<()> {
423 let limit = max_chunks(&config.suffix_type, config.suffix_length);
424 let max = max_bytes as usize;
425 let sep = config.separator;
426
427 let mut data = Vec::new();
429 reader.read_to_end(&mut data)?;
430
431 if data.is_empty() {
432 return Ok(());
433 }
434
435 let total = data.len();
436 let mut chunk_index: u64 = 0;
437 let mut offset = 0;
438
439 while offset < total {
440 if chunk_index >= limit {
441 return Err(io::Error::other("output file suffixes exhausted"));
442 }
443
444 let remaining = total - offset;
445 let window = remaining.min(max);
446 let slice = &data[offset..offset + window];
447
448 let end = if remaining < max {
454 offset + window
455 } else if let Some(pos) = memchr::memrchr(sep, slice) {
456 offset + pos + 1
458 } else {
459 offset + window
461 };
462
463 let chunk_data = &data[offset..end];
464
465 let mut writer = create_writer(config, chunk_index)?;
466 writer.write_all(chunk_data)?;
467 writer.finish()?;
468
469 offset = end;
470 chunk_index += 1;
471 }
472
473 Ok(())
474}
475
476fn split_by_number(input_path: &str, config: &SplitConfig, n_chunks: u64) -> io::Result<()> {
479 let limit = max_chunks(&config.suffix_type, config.suffix_length);
480 if n_chunks > limit {
481 return Err(io::Error::other("output file suffixes exhausted"));
482 }
483 if n_chunks == 0 {
484 return Err(io::Error::new(
485 io::ErrorKind::InvalidInput,
486 "invalid number of chunks: 0",
487 ));
488 }
489
490 let data: crate::common::io::FileData = if input_path == "-" {
492 let mut buf = Vec::new();
493 io::stdin().lock().read_to_end(&mut buf)?;
494 crate::common::io::FileData::Owned(buf)
495 } else {
496 crate::common::io::read_file(Path::new(input_path))?
497 };
498
499 let total = data.len() as u64;
500 let base_chunk_size = total / n_chunks;
501 let remainder = total % n_chunks;
502
503 let mut offset: u64 = 0;
504 for i in 0..n_chunks {
505 let chunk_size = base_chunk_size + if i < remainder { 1 } else { 0 };
507
508 if config.elide_empty && chunk_size == 0 {
509 continue;
510 }
511
512 let mut writer = create_writer(config, i)?;
513 if chunk_size > 0 {
514 let start = offset as usize;
515 let end = start + chunk_size as usize;
516 writer.write_all(&data[start..end])?;
517 }
518 writer.finish()?;
519 offset += chunk_size;
520 }
521
522 Ok(())
523}
524
525fn split_by_number_extract(input_path: &str, k: u64, n: u64) -> io::Result<()> {
527 let data: crate::common::io::FileData = if input_path == "-" {
528 let mut buf = Vec::new();
529 io::stdin().lock().read_to_end(&mut buf)?;
530 crate::common::io::FileData::Owned(buf)
531 } else {
532 crate::common::io::read_file(Path::new(input_path))?
533 };
534
535 let total = data.len() as u64;
536 let base_chunk_size = total / n;
537 let remainder = total % n;
538
539 let mut offset: u64 = 0;
540 for i in 0..n {
541 let chunk_size = base_chunk_size + if i < remainder { 1 } else { 0 };
542 if i + 1 == k {
543 if chunk_size > 0 {
544 let start = offset as usize;
545 let end = start + chunk_size as usize;
546 let stdout = io::stdout();
547 let mut out = stdout.lock();
548 out.write_all(&data[start..end])?;
549 }
550 return Ok(());
551 }
552 offset += chunk_size;
553 }
554 Ok(())
555}
556
557fn read_input_data(input_path: &str) -> io::Result<Vec<u8>> {
559 if input_path == "-" {
560 let mut buf = Vec::new();
561 io::stdin().lock().read_to_end(&mut buf)?;
562 Ok(buf)
563 } else {
564 let data = crate::common::io::read_file(Path::new(input_path))?;
565 Ok(data.to_vec())
566 }
567}
568
569fn compute_line_chunk_boundaries(data: &[u8], n_chunks: u64, sep: u8) -> Vec<u64> {
574 let total = data.len() as u64;
575 let base_chunk_size = total / n_chunks;
576 let remainder = total % n_chunks;
577
578 let mut boundaries = Vec::with_capacity(n_chunks as usize);
580 let mut target_end: u64 = 0;
581 for i in 0..n_chunks {
582 target_end += base_chunk_size + if i < remainder { 1 } else { 0 };
583 boundaries.push(target_end);
584 }
585
586 let mut chunk_ends = Vec::with_capacity(n_chunks as usize);
588 let mut pos: u64 = 0;
589 let mut chunk_idx: u64 = 0;
590
591 for sep_pos in memchr::memchr_iter(sep, data) {
592 let line_end = sep_pos as u64 + 1; pos = line_end;
594
595 while chunk_idx < n_chunks && pos >= boundaries[chunk_idx as usize] {
597 chunk_ends.push(pos);
598 chunk_idx += 1;
599 }
600 }
601
602 if pos < total {
604 pos = total;
605 while chunk_idx < n_chunks && pos >= boundaries[chunk_idx as usize] {
606 chunk_ends.push(pos);
607 chunk_idx += 1;
608 }
609 }
610
611 while (chunk_ends.len() as u64) < n_chunks {
613 chunk_ends.push(pos);
614 }
615
616 chunk_ends
617}
618
619fn split_by_line_chunks(input_path: &str, config: &SplitConfig, n_chunks: u64) -> io::Result<()> {
621 let data = read_input_data(input_path)?;
622 let sep = config.separator;
623
624 let chunk_ends = compute_line_chunk_boundaries(&data, n_chunks, sep);
625
626 let mut offset: u64 = 0;
627 for i in 0..n_chunks {
628 let end = chunk_ends[i as usize];
629 let chunk_size = end - offset;
630
631 if config.elide_empty && chunk_size == 0 {
632 continue;
633 }
634
635 let mut writer = create_writer(config, i)?;
636 if chunk_size > 0 {
637 writer.write_all(&data[offset as usize..end as usize])?;
638 }
639 writer.finish()?;
640 offset = end;
641 }
642 Ok(())
643}
644
645fn split_by_line_chunk_extract(
647 input_path: &str,
648 config: &SplitConfig,
649 k: u64,
650 n_chunks: u64,
651) -> io::Result<()> {
652 let data = read_input_data(input_path)?;
653 let sep = config.separator;
654
655 let chunk_ends = compute_line_chunk_boundaries(&data, n_chunks, sep);
656
657 let mut offset: u64 = 0;
658 for i in 0..n_chunks {
659 let end = chunk_ends[i as usize];
660 if i + 1 == k {
661 let chunk_size = end - offset;
662 if chunk_size > 0 {
663 let stdout = io::stdout();
664 let mut out = stdout.lock();
665 out.write_all(&data[offset as usize..end as usize])?;
666 }
667 return Ok(());
668 }
669 offset = end;
670 }
671 Ok(())
672}
673
674fn split_by_round_robin(input_path: &str, config: &SplitConfig, n_chunks: u64) -> io::Result<()> {
676 let data = read_input_data(input_path)?;
677 let sep = config.separator;
678
679 let mut lines: Vec<&[u8]> = Vec::new();
681 let mut start = 0;
682 for pos in memchr::memchr_iter(sep, &data) {
683 lines.push(&data[start..=pos]);
684 start = pos + 1;
685 }
686 if start < data.len() {
687 lines.push(&data[start..]);
688 }
689
690 let mut writers: Vec<Option<Box<dyn ChunkWriter>>> = (0..n_chunks)
692 .map(|i| {
693 if config.elide_empty && lines.len() as u64 <= i {
694 None
695 } else {
696 Some(create_writer(config, i).unwrap())
697 }
698 })
699 .collect();
700
701 for (idx, line) in lines.iter().enumerate() {
703 let chunk_idx = (idx as u64) % n_chunks;
704 if let Some(ref mut writer) = writers[chunk_idx as usize] {
705 writer.write_all(line)?;
706 }
707 }
708
709 for writer in &mut writers {
711 if let Some(mut w) = writer.take() {
712 w.finish()?;
713 }
714 }
715
716 Ok(())
717}
718
719fn split_by_round_robin_extract(input_path: &str, k: u64, n: u64) -> io::Result<()> {
721 let data = read_input_data(input_path)?;
722 let sep = b'\n';
723
724 let stdout = io::stdout();
725 let mut out = stdout.lock();
726
727 let mut start = 0;
728 let mut line_idx: u64 = 0;
729 for pos in memchr::memchr_iter(sep, &data) {
730 if line_idx % n == k - 1 {
731 out.write_all(&data[start..=pos])?;
732 }
733 start = pos + 1;
734 line_idx += 1;
735 }
736 if start < data.len() && line_idx % n == k - 1 {
737 out.write_all(&data[start..])?;
738 }
739
740 Ok(())
741}
742
/// Line-count split of an already opened file using raw `read`/`write` calls.
///
/// Reads the file in 256 KiB blocks straight from its file descriptor, counts
/// `config.separator` bytes and writes each block's pieces directly to the
/// current output file's descriptor. A new output is created lazily when the
/// first byte for it is available and closed once it holds `lines_per_chunk`
/// separators. Outputs are always plain files here; `split_file` only takes
/// this path when no filter is configured.
///
/// # Errors
///
/// Returns the OS error of a failed `read`, `write` or file creation, a
/// `WriteZero` error if `write` reports 0 bytes, and
/// `output file suffixes exhausted` when another output is needed but the
/// suffix space is used up.
#[cfg(unix)]
fn split_lines_streaming_fast(
    file: &File,
    config: &SplitConfig,
    lines_per_chunk: u64,
) -> io::Result<()> {
    use std::os::unix::io::AsRawFd;

    let in_fd = file.as_raw_fd();
    // Access-pattern hint only; the return value is deliberately ignored.
    #[cfg(target_os = "linux")]
    unsafe {
        libc::posix_fadvise(in_fd, 0, 0, libc::POSIX_FADV_SEQUENTIAL);
    }

    let limit = max_chunks(&config.suffix_type, config.suffix_length);
    let sep = config.separator;

    const BUF_SIZE: usize = 256 * 1024;
    let mut buf = vec![0u8; BUF_SIZE];
    let mut chunk_index: u64 = 0;
    let mut lines_in_chunk: u64 = 0;
    // `out_fd < 0` means "no output currently open".
    let mut out_fd: i32 = -1;
    // Owns the current output so that `out_fd` stays valid; assigning `None`
    // drops the `File` and thereby closes the descriptor.
    let mut _out_file: Option<File> = None;
    // Writes all of `data` to `fd`, retrying after short writes and EINTR.
    let raw_write_all = |fd: i32, mut data: &[u8]| -> io::Result<()> {
        while !data.is_empty() {
            let ret =
                unsafe { libc::write(fd, data.as_ptr() as *const libc::c_void, data.len() as _) };
            if ret > 0 {
                data = &data[ret as usize..];
            } else if ret == 0 {
                return Err(io::Error::new(io::ErrorKind::WriteZero, "write returned 0"));
            } else {
                let err = io::Error::last_os_error();
                if err.kind() == io::ErrorKind::Interrupted {
                    continue;
                }
                return Err(err);
            }
        }
        Ok(())
    };

    loop {
        // Fills at most BUF_SIZE bytes, which is exactly the length of `buf`.
        let n = unsafe { libc::read(in_fd, buf.as_mut_ptr() as *mut libc::c_void, BUF_SIZE as _) };
        if n == 0 {
            break;
        }
        if n < 0 {
            let err = io::Error::last_os_error();
            if err.kind() == io::ErrorKind::Interrupted {
                continue;
            }
            return Err(err);
        }
        let n = n as usize;

        let data = &buf[..n];
        let mut pos = 0;

        while pos < n {
            if out_fd < 0 {
                if chunk_index >= limit {
                    return Err(io::Error::other("output file suffixes exhausted"));
                }
                let path = output_path(config, chunk_index);
                if config.verbose {
                    eprintln!("creating file '{}'", path);
                }
                let file = File::create(path)?;
                out_fd = file.as_raw_fd();
                _out_file = Some(file);
                lines_in_chunk = 0;
            }

            let slice = &data[pos..];
            let lines_needed = lines_per_chunk - lines_in_chunk;
            let mut found = 0u64;
            // Offset just past the last separator counted so far.
            let mut last_sep_end = 0;

            for offset in memchr::memchr_iter(sep, slice) {
                found += 1;
                last_sep_end = offset + 1;
                if found >= lines_needed {
                    break;
                }
            }

            if found >= lines_needed {
                // The output is complete: write up to the separator and close it.
                raw_write_all(out_fd, &slice[..last_sep_end])?;
                pos += last_sep_end;
                _out_file = None;
                out_fd = -1;
                chunk_index += 1;
            } else {
                // Block exhausted before the output filled up; keep it open.
                raw_write_all(out_fd, slice)?;
                lines_in_chunk += found;
                pos = n;
            }
        }
    }

    Ok(())
}
852
/// Byte-count split of a file that copies ranges with `copy_file_range`,
/// falling back to `sendfile`.
///
/// The full list of `(offset, length)` chunks is computed from `file_size`
/// before any output is created, so running out of suffixes is reported
/// without writing anything. Output names are announced up front when
/// `config.verbose` is set. With four or more chunks and verbose off, chunks
/// are copied in parallel through rayon; otherwise sequentially.
///
/// # Errors
///
/// Returns `output file suffixes exhausted`, an output-creation error, or the
/// OS error of a failed `sendfile` fallback.
///
/// NOTE(review): when either syscall returns 0 before the whole chunk has
/// been copied, the copy loop stops and `Ok(())` is still returned, so a
/// short copy is not reported.
#[cfg(target_os = "linux")]
fn split_bytes_zero_copy(
    input_fd: std::os::unix::io::RawFd,
    file_size: u64,
    config: &SplitConfig,
    bytes_per_chunk: u64,
) -> io::Result<()> {
    use std::os::unix::io::AsRawFd;
    let limit = max_chunks(&config.suffix_type, config.suffix_length);
    let chunk_size = bytes_per_chunk as usize;
    let total = file_size as usize;

    // (offset, length) of every chunk, in output order.
    let mut chunks: Vec<(usize, usize)> = Vec::new();
    let mut off = 0usize;
    while off < total {
        if chunks.len() as u64 >= limit {
            return Err(io::Error::other("output file suffixes exhausted"));
        }
        let remaining = total - off;
        let sz = remaining.min(chunk_size);
        chunks.push((off, sz));
        off += sz;
    }

    if chunks.is_empty() {
        return Ok(());
    }

    let paths: Vec<String> = (0..chunks.len())
        .map(|i| output_path(config, i as u64))
        .collect();

    if config.verbose {
        for path in &paths {
            eprintln!("creating file '{}'", path);
        }
    }

    // Creates `path` and copies `chunk_len` bytes starting at `chunk_offset`
    // of the input into it.
    let copy_chunk =
        |input_fd: i32, chunk_offset: usize, chunk_len: usize, path: &str| -> io::Result<()> {
            let out_file = File::create(path)?;
            let out_fd = out_file.as_raw_fd();

            // Explicit input offset, passed by pointer to both syscalls, so
            // the shared descriptor's own file position is not used.
            let mut off_in = chunk_offset as i64;
            let mut copied = 0usize;
            while copied < chunk_len {
                let n = unsafe {
                    libc::copy_file_range(
                        input_fd,
                        &mut off_in as *mut i64 as *mut libc::off64_t,
                        out_fd,
                        std::ptr::null_mut(),
                        chunk_len - copied,
                        0,
                    )
                };
                if n > 0 {
                    copied += n as usize;
                } else if n == 0 {
                    break;
                } else {
                    let err = io::Error::last_os_error();
                    if err.raw_os_error() == Some(libc::EINTR) {
                        continue;
                    }
                    // Any other copy_file_range failure: finish the remainder
                    // of this chunk with sendfile, continuing from `off_in`.
                    while copied < chunk_len {
                        let n = unsafe {
                            libc::sendfile(
                                out_fd,
                                input_fd,
                                &mut off_in as *mut i64 as *mut libc::off_t,
                                chunk_len - copied,
                            )
                        };
                        if n > 0 {
                            copied += n as usize;
                        } else if n == 0 {
                            break;
                        } else {
                            let err2 = io::Error::last_os_error();
                            if err2.raw_os_error() == Some(libc::EINTR) {
                                continue;
                            }
                            return Err(err2);
                        }
                    }
                    break;
                }
            }
            Ok(())
        };

    if chunks.len() >= 4 && !config.verbose {
        chunks.par_iter().zip(paths.par_iter()).try_for_each(
            |(&(chunk_off, chunk_len), path)| copy_chunk(input_fd, chunk_off, chunk_len, path),
        )?;
    } else {
        for (i, &(chunk_off, chunk_len)) in chunks.iter().enumerate() {
            copy_chunk(input_fd, chunk_off, chunk_len, &paths[i])?;
        }
    }

    Ok(())
}
965
/// Byte-count split of input that is already in memory.
///
/// All chunk ranges are computed before any file is created, so running out
/// of suffixes is reported without writing anything. Output names are
/// announced up front when `config.verbose` is set. With four or more chunks
/// and verbose off, files are written in parallel through rayon; otherwise
/// one after another.
#[cfg(all(unix, not(target_os = "linux")))]
fn split_bytes_preloaded(
    data: &[u8],
    config: &SplitConfig,
    bytes_per_chunk: u64,
) -> io::Result<()> {
    /// Creates `path` and writes `bytes` to it.
    fn write_range(path: &str, bytes: &[u8]) -> io::Result<()> {
        File::create(path)?.write_all(bytes)
    }

    let max_files = max_chunks(&config.suffix_type, config.suffix_length);
    let step = bytes_per_chunk as usize;

    // Half-open (start, stop) byte ranges, in output order.
    let mut ranges: Vec<(usize, usize)> = Vec::new();
    let mut start = 0;
    while start < data.len() {
        if ranges.len() as u64 >= max_files {
            return Err(io::Error::other("output file suffixes exhausted"));
        }
        let stop = data.len().min(start + step);
        ranges.push((start, stop));
        start = stop;
    }

    if ranges.is_empty() {
        return Ok(());
    }

    let names: Vec<String> = (0..ranges.len())
        .map(|i| output_path(config, i as u64))
        .collect();

    if config.verbose {
        names
            .iter()
            .for_each(|name| eprintln!("creating file '{}'", name));
    }

    let run_parallel = ranges.len() >= 4 && !config.verbose;
    if run_parallel {
        ranges
            .par_iter()
            .zip(names.par_iter())
            .try_for_each(|(&(from, to), name)| write_range(name, &data[from..to]))?;
    } else {
        for (&(from, to), name) in ranges.iter().zip(names.iter()) {
            write_range(name, &data[from..to])?;
        }
    }

    Ok(())
}
1025
1026#[cfg(unix)]
1028fn split_line_bytes_preloaded(data: &[u8], config: &SplitConfig, max_bytes: u64) -> io::Result<()> {
1029 let limit = max_chunks(&config.suffix_type, config.suffix_length);
1030 let max = max_bytes as usize;
1031 let sep = config.separator;
1032
1033 let mut chunks: Vec<(usize, usize)> = Vec::new();
1034 let mut offset = 0;
1035
1036 while offset < data.len() {
1037 if chunks.len() as u64 >= limit {
1038 return Err(io::Error::other("output file suffixes exhausted"));
1039 }
1040 let remaining = data.len() - offset;
1041 let window = remaining.min(max);
1042 let slice = &data[offset..offset + window];
1043
1044 let end = if remaining < max {
1045 offset + window
1047 } else if let Some(pos) = memchr::memrchr(sep, slice) {
1048 offset + pos + 1
1049 } else {
1050 offset + window
1051 };
1052
1053 chunks.push((offset, end));
1054 offset = end;
1055 }
1056
1057 if chunks.is_empty() {
1058 return Ok(());
1059 }
1060
1061 let paths: Vec<String> = chunks
1062 .iter()
1063 .enumerate()
1064 .map(|(i, _)| output_path(config, i as u64))
1065 .collect();
1066
1067 if config.verbose {
1068 for path in &paths {
1069 eprintln!("creating file '{}'", path);
1070 }
1071 }
1072
1073 if chunks.len() >= 4 && !config.verbose {
1074 chunks.par_iter().zip(paths.par_iter()).try_for_each(
1075 |(&(start, end), path)| -> io::Result<()> {
1076 let mut file = File::create(path)?;
1077 file.write_all(&data[start..end])?;
1078 Ok(())
1079 },
1080 )?;
1081 } else {
1082 for (i, &(start, end)) in chunks.iter().enumerate() {
1083 let mut file = File::create(&paths[i])?;
1084 file.write_all(&data[start..end])?;
1085 }
1086 }
1087
1088 Ok(())
1089}
1090
/// Splits `input_path` (`-` for standard input) according to `config`.
///
/// Dispatch order:
/// 1. The chunk-count modes (`Number`, `LineChunks`, `RoundRobin` and their
///    `*Extract` variants) go straight to their dedicated functions.
/// 2. For `Lines`, `Bytes` and `LineBytes`, a platform-specific fast path is
///    tried when the input is a non-empty regular file and no filter is
///    configured. Failure to open, stat or map the file is not an error at
///    that point; control simply falls through.
/// 3. Otherwise the input is read through the generic streaming
///    implementations.
///
/// # Errors
///
/// Returns `NotFound` with a `cannot open ... for reading` message when the
/// input path does not exist, or whatever error the selected splitting
/// routine reports.
pub fn split_file(input_path: &str, config: &SplitConfig) -> io::Result<()> {
    if let SplitMode::Number(n) = config.mode {
        return split_by_number(input_path, config, n);
    }
    if let SplitMode::NumberExtract(k, n) = config.mode {
        return split_by_number_extract(input_path, k, n);
    }
    if let SplitMode::LineChunks(n) = config.mode {
        return split_by_line_chunks(input_path, config, n);
    }
    if let SplitMode::LineChunkExtract(k, n) = config.mode {
        return split_by_line_chunk_extract(input_path, config, k, n);
    }
    if let SplitMode::RoundRobin(n) = config.mode {
        return split_by_round_robin(input_path, config, n);
    }
    if let SplitMode::RoundRobinExtract(k, n) = config.mode {
        // Note: `config` (and therefore the separator) is not passed here.
        return split_by_round_robin_extract(input_path, k, n);
    }

    // Unix fast path for line-count splitting: raw read/write on the file.
    #[cfg(unix)]
    if let SplitMode::Lines(n) = config.mode {
        if input_path != "-" && config.filter.is_none() {
            if let Ok(file) = File::open(input_path) {
                if let Ok(meta) = file.metadata() {
                    if meta.file_type().is_file() && meta.len() > 0 {
                        return split_lines_streaming_fast(&file, config, n);
                    }
                }
            }
        }
    }

    // Linux fast path for byte-count splitting: in-kernel range copies.
    #[cfg(target_os = "linux")]
    if let SplitMode::Bytes(bytes_per_chunk) = config.mode {
        if input_path != "-" && config.filter.is_none() {
            if let Ok(file) = File::open(input_path) {
                if let Ok(meta) = file.metadata() {
                    if meta.file_type().is_file() && meta.len() > 0 {
                        use std::os::unix::io::AsRawFd;
                        // Access-pattern hint only; the result is ignored.
                        unsafe {
                            libc::posix_fadvise(
                                file.as_raw_fd(),
                                0,
                                0,
                                libc::POSIX_FADV_SEQUENTIAL,
                            );
                        }
                        // `file` stays alive until the call returns, so the
                        // raw descriptor remains valid for its duration.
                        return split_bytes_zero_copy(
                            file.as_raw_fd(),
                            meta.len(),
                            config,
                            bytes_per_chunk,
                        );
                    }
                }
            }
        }
    }

    // Non-Linux Unix fast path for byte-count splitting: memory-map files of
    // up to 512 MiB and write the chunks from the mapping.
    #[cfg(all(unix, not(target_os = "linux")))]
    if let SplitMode::Bytes(bytes_per_chunk) = config.mode {
        if input_path != "-" && config.filter.is_none() {
            const FAST_PATH_LIMIT: u64 = 512 * 1024 * 1024;
            if let Ok(file) = File::open(input_path) {
                if let Ok(meta) = file.metadata() {
                    if meta.file_type().is_file() && meta.len() <= FAST_PATH_LIMIT && meta.len() > 0
                    {
                        if let Ok(mmap) = unsafe { memmap2::MmapOptions::new().map(&file) } {
                            let _ = mmap.advise(memmap2::Advice::Sequential);
                            #[cfg(not(target_os = "linux"))]
                            {
                                let _ = mmap.advise(memmap2::Advice::WillNeed);
                            }
                            return split_bytes_preloaded(&mmap, config, bytes_per_chunk);
                        }
                    }
                }
            }
        }
    }

    // Unix fast path for line-bytes splitting: memory-map files of up to
    // 512 MiB. All `advise` calls are hints whose failures are ignored.
    #[cfg(unix)]
    if let SplitMode::LineBytes(max_bytes) = config.mode {
        if input_path != "-" && config.filter.is_none() {
            const FAST_PATH_LIMIT: u64 = 512 * 1024 * 1024;
            if let Ok(file) = File::open(input_path) {
                if let Ok(meta) = file.metadata() {
                    if meta.file_type().is_file() && meta.len() <= FAST_PATH_LIMIT && meta.len() > 0
                    {
                        if let Ok(mmap) = unsafe { memmap2::MmapOptions::new().map(&file) } {
                            let _ = mmap.advise(memmap2::Advice::Sequential);
                            #[cfg(target_os = "linux")]
                            {
                                let len = mmap.len();
                                if len >= 2 * 1024 * 1024 {
                                    let _ = mmap.advise(memmap2::Advice::HugePage);
                                }
                                if len >= 4 * 1024 * 1024 {
                                    // Fall back to WillNeed when PopulateRead
                                    // is rejected.
                                    if mmap.advise(memmap2::Advice::PopulateRead).is_err() {
                                        let _ = mmap.advise(memmap2::Advice::WillNeed);
                                    }
                                } else {
                                    let _ = mmap.advise(memmap2::Advice::WillNeed);
                                }
                            }
                            #[cfg(not(target_os = "linux"))]
                            {
                                let _ = mmap.advise(memmap2::Advice::WillNeed);
                            }
                            return split_line_bytes_preloaded(&mmap, config, max_bytes);
                        }
                    }
                }
            }
        }
    }

    // Generic path: stdin, non-regular or empty files, filters, and inputs
    // for which no fast path applied.
    let reader: Box<dyn Read> = if input_path == "-" {
        Box::new(io::stdin().lock())
    } else {
        let path = Path::new(input_path);
        if !path.exists() {
            return Err(io::Error::new(
                io::ErrorKind::NotFound,
                format!(
                    "cannot open '{}' for reading: No such file or directory",
                    input_path
                ),
            ));
        }
        let file = File::open(path)?;
        #[cfg(target_os = "linux")]
        {
            use std::os::unix::io::AsRawFd;
            // Access-pattern hint only; the result is ignored.
            unsafe {
                libc::posix_fadvise(file.as_raw_fd(), 0, 0, libc::POSIX_FADV_SEQUENTIAL);
            }
        }
        Box::new(file)
    };

    match config.mode {
        SplitMode::Lines(n) => {
            let mut buf_reader = BufReader::with_capacity(1024 * 1024, reader);
            split_by_lines(&mut buf_reader, config, n)
        }
        SplitMode::Bytes(n) => {
            let mut reader = reader;
            split_by_bytes(&mut reader, config, n)
        }
        SplitMode::LineBytes(n) => {
            let mut reader = reader;
            split_by_line_bytes(&mut reader, config, n)
        }
        // Every one of these modes returned at the top of the function.
        SplitMode::Number(_)
        | SplitMode::NumberExtract(_, _)
        | SplitMode::LineChunks(_)
        | SplitMode::LineChunkExtract(_, _)
        | SplitMode::RoundRobin(_)
        | SplitMode::RoundRobinExtract(_, _) => unreachable!(),
    }
}
1267
1268pub fn output_paths(config: &SplitConfig, count: u64) -> Vec<PathBuf> {
1270 (0..count)
1271 .map(|i| PathBuf::from(output_path(config, i)))
1272 .collect()
1273}