use std::fs::{self, File};
use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use crate::common::io::FileData;
#[cfg(unix)]
use rayon::prelude::*;
/// Naming scheme for the variable part of each output file name.
#[derive(Debug, Clone, PartialEq)]
pub enum SuffixType {
    /// Lowercase letters `a`..`z` used as base-26 digits (`aa`, `ab`, ...).
    Alphabetic,
    /// Zero-padded decimal numbers counting up from the contained start value.
    Numeric(u64),
    /// Zero-padded lowercase hexadecimal numbers counting up from the contained start value.
    Hex(u64),
}
/// How the input is divided into output pieces.
#[derive(Debug, Clone)]
pub enum SplitMode {
    /// `N` records (separator-terminated lines) per output file.
    Lines(u64),
    /// `N` bytes per output file.
    Bytes(u64),
    /// At most `N` bytes per output file, ending each file after the last
    /// record separator that fits when possible.
    LineBytes(u64),
    /// `N` output files of (nearly) equal byte size.
    Number(u64),
    /// `(K, N)`: write only the `K`-th (1-based) of `N` equal byte chunks to stdout.
    NumberExtract(u64, u64),
    /// `N` output files of roughly equal size, never splitting a record.
    LineChunks(u64),
    /// `(K, N)`: write only the `K`-th (1-based) record-aligned chunk of `N` to stdout.
    LineChunkExtract(u64, u64),
    /// Deal records out to `N` output files in round-robin order.
    RoundRobin(u64),
    /// `(K, N)`: write to stdout only the records round-robin assigns to file `K` (1-based) of `N`.
    RoundRobinExtract(u64, u64),
}
/// Everything that controls one split run.
#[derive(Debug, Clone)]
pub struct SplitConfig {
    /// How the input is divided.
    pub mode: SplitMode,
    /// Suffix alphabet and starting value for output names.
    pub suffix_type: SuffixType,
    /// Number of suffix characters (minimum width for numeric/hex suffixes).
    pub suffix_length: usize,
    /// Text appended after the generated suffix.
    pub additional_suffix: String,
    /// Text placed before the generated suffix.
    pub prefix: String,
    /// Skip creating empty outputs (honored by the chunk and round-robin modes).
    pub elide_empty: bool,
    /// Print `creating file '<path>'` to stderr before each output is opened.
    pub verbose: bool,
    /// Shell command (run via `sh -c`) that receives each chunk on stdin
    /// instead of a file being written; the would-be path is in `$FILE`.
    pub filter: Option<String>,
    /// Record separator byte used by the line-oriented modes.
    pub separator: u8,
}
impl Default for SplitConfig {
fn default() -> Self {
Self {
mode: SplitMode::Lines(1000),
suffix_type: SuffixType::Alphabetic,
suffix_length: 2,
additional_suffix: String::new(),
prefix: "x".to_string(),
elide_empty: false,
verbose: false,
filter: None,
separator: b'\n',
}
}
}
/// Parses a size such as `10`, `4K`, `2MiB` or `3kB` into a byte count.
///
/// The numeric part is an optional leading `+`/`-` followed by ASCII digits.
/// Recognized suffixes: `b` (512), decimal `kB`/`KB`, `MB`, `GB`, `TB`, `PB`,
/// `EB` and binary `k`/`K`/`KiB`, `m`/`M`/`MiB`, ... up to `EiB`. Zetta and
/// yotta suffixes saturate to `u64::MAX` for any non-zero count.
///
/// # Errors
/// Returns a message for empty input, a missing/unparsable number, an
/// unknown suffix, or a product that overflows `u64`.
pub fn parse_size(s: &str) -> Result<u64, String> {
    let s = s.trim();
    if s.is_empty() {
        return Err("empty size".to_string());
    }
    // Optional sign, then a run of ASCII digits; both are single-byte, so the
    // resulting length is always a char boundary.
    let sign_len = usize::from(s.starts_with('+') || s.starts_with('-'));
    let digit_len = s[sign_len..]
        .bytes()
        .take_while(u8::is_ascii_digit)
        .count();
    let num_end = sign_len + digit_len;
    if num_end == 0 {
        return Err(format!("invalid number: '{}'", s));
    }
    let (num_str, suffix) = s.split_at(num_end);
    let num = num_str
        .parse::<u64>()
        .map_err(|_| format!("invalid number: '{}'", num_str))?;
    let multiplier: u64 = match suffix {
        "" => 1,
        "b" => 512,
        "kB" | "KB" => 1000,
        "k" | "K" | "KiB" => 1 << 10,
        "MB" => 1000u64.pow(2),
        "m" | "M" | "MiB" => 1 << 20,
        "GB" => 1000u64.pow(3),
        "g" | "G" | "GiB" => 1 << 30,
        "TB" => 1000u64.pow(4),
        "t" | "T" | "TiB" => 1 << 40,
        "PB" => 1000u64.pow(5),
        "p" | "P" | "PiB" => 1 << 50,
        "EB" => 1000u64.pow(6),
        "e" | "E" | "EiB" => 1 << 60,
        // These multipliers do not fit in u64: saturate instead of failing.
        "ZB" | "z" | "Z" | "ZiB" | "YB" | "y" | "Y" | "YiB" => {
            return Ok(if num == 0 { 0 } else { u64::MAX });
        }
        _ => return Err(format!("invalid suffix in '{}'", s)),
    };
    num.checked_mul(multiplier)
        .ok_or_else(|| format!("number too large: '{}'", s))
}
/// Builds the suffix for the `index`-th (0-based) output file.
///
/// * `Alphabetic`: exactly `suffix_length` base-26 letters, most significant
///   first; higher-order digits of `index` that do not fit are dropped.
/// * `Numeric(start)` / `Hex(start)`: `start + index` in decimal / lowercase
///   hexadecimal, left-padded with `0` to at least `suffix_length` characters.
pub fn generate_suffix(index: u64, suffix_type: &SuffixType, suffix_length: usize) -> String {
    match *suffix_type {
        SuffixType::Numeric(start) => {
            format!("{:0>width$}", start + index, width = suffix_length)
        }
        SuffixType::Hex(start) => {
            format!("{:0>width$x}", start + index, width = suffix_length)
        }
        SuffixType::Alphabetic => {
            // Fill from the least significant (rightmost) letter leftwards.
            let mut letters = vec![b'a'; suffix_length];
            let mut rest = index;
            for slot in letters.iter_mut().rev() {
                *slot = b'a' + (rest % 26) as u8;
                rest /= 26;
            }
            letters.into_iter().map(char::from).collect()
        }
    }
}
/// Returns how many distinct output names (`index` values 0..result) can be
/// generated without exceeding `suffix_length` suffix characters.
///
/// * `Alphabetic`: 26^len.
/// * `Numeric(start)`: 10^len − start, because names count up from `start`.
/// * `Hex(start)`: 16^len − start.
///
/// Saturates instead of overflowing, and yields 0 when `start` is already
/// beyond the representable range.
pub fn max_chunks(suffix_type: &SuffixType, suffix_length: usize) -> u64 {
    // Clamp rather than `as u32`, which would wrap for absurd lengths and
    // report a tiny limit.
    let exponent = suffix_length.min(u32::MAX as usize) as u32;
    let (radix, start): (u64, u64) = match suffix_type {
        SuffixType::Alphabetic => (26, 0),
        SuffixType::Numeric(start) => (10, *start),
        // Each hex digit has 16 values, not 10.
        SuffixType::Hex(start) => (16, *start),
    };
    radix.saturating_pow(exponent).saturating_sub(start)
}
/// Full output file name for chunk `index`: prefix, generated suffix, then
/// the additional suffix.
fn output_path(config: &SplitConfig, index: u64) -> String {
    let mut name = config.prefix.clone();
    name.push_str(&generate_suffix(
        index,
        &config.suffix_type,
        config.suffix_length,
    ));
    name.push_str(&config.additional_suffix);
    name
}
/// Destination for one output chunk: a `Write` sink that must be explicitly
/// completed so that errors occurring at completion time are reported.
trait ChunkWriter: Write {
    /// Completes the chunk (flush / wait for a filter) and reports any error.
    fn finish(&mut self) -> io::Result<()>;
}
/// Writes a chunk to a regular file through a 1 MiB buffer.
struct FileChunkWriter {
    writer: BufWriter<File>,
}

impl FileChunkWriter {
    /// Buffer size used for every output file.
    const BUFFER_CAPACITY: usize = 1024 * 1024;

    /// Creates (or truncates) `path` and wraps it in the buffer.
    fn create(path: &str) -> io::Result<Self> {
        File::create(path).map(|file| Self {
            writer: BufWriter::with_capacity(Self::BUFFER_CAPACITY, file),
        })
    }
}

impl Write for FileChunkWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.writer.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.writer.flush()
    }
}

impl ChunkWriter for FileChunkWriter {
    /// Flushes buffered bytes so write errors surface here instead of being
    /// lost when the `BufWriter` is dropped.
    fn finish(&mut self) -> io::Result<()> {
        Write::flush(self)
    }
}
/// Pipes a chunk into `sh -c <filter>` instead of writing a file.
///
/// The child receives the would-be output path in the `FILE` environment
/// variable; only its stdin is redirected.
struct FilterChunkWriter {
    child: std::process::Child,
    /// Set once the filter stopped reading (a write failed with `BrokenPipe`).
    /// The rest of this chunk is then discarded rather than aborting the whole
    /// split, matching GNU `split --filter`, which treats EPIPE as ignorable.
    pipe_closed: bool,
}

impl FilterChunkWriter {
    /// Spawns the filter command with a piped stdin.
    fn create(filter_cmd: &str, output_path: &str) -> io::Result<Self> {
        let child = Command::new("sh")
            .arg("-c")
            .arg(filter_cmd)
            .env("FILE", output_path)
            .stdin(Stdio::piped())
            .spawn()?;
        Ok(Self {
            child,
            pipe_closed: false,
        })
    }
}

impl Write for FilterChunkWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if self.pipe_closed {
            // The filter is no longer reading; swallow the data.
            return Ok(buf.len());
        }
        let stdin = match self.child.stdin.as_mut() {
            Some(stdin) => stdin,
            None => return Err(io::Error::new(io::ErrorKind::BrokenPipe, "stdin closed")),
        };
        match stdin.write(buf) {
            Err(e) if e.kind() == io::ErrorKind::BrokenPipe => {
                self.pipe_closed = true;
                // Drop our end of the pipe; nothing more will be sent.
                self.child.stdin = None;
                Ok(buf.len())
            }
            result => result,
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        match self.child.stdin.as_mut() {
            Some(stdin) => stdin.flush(),
            None => Ok(()),
        }
    }
}

impl ChunkWriter for FilterChunkWriter {
    /// Closes the filter's stdin (signalling EOF), waits for it to exit, and
    /// fails if it did not exit successfully.
    fn finish(&mut self) -> io::Result<()> {
        drop(self.child.stdin.take());
        let status = self.child.wait()?;
        if !status.success() {
            return Err(io::Error::other(format!(
                "filter command exited with status {}",
                status
            )));
        }
        Ok(())
    }
}
/// Opens the sink for chunk `index`: a filter process when `config.filter`
/// is set, otherwise a buffered output file. Announces the path on stderr
/// first when `config.verbose` is set.
fn create_writer(config: &SplitConfig, index: u64) -> io::Result<Box<dyn ChunkWriter>> {
    let path = output_path(config, index);
    if config.verbose {
        eprintln!("creating file '{}'", path);
    }
    let sink: Box<dyn ChunkWriter> = match config.filter.as_deref() {
        Some(cmd) => Box::new(FilterChunkWriter::create(cmd, &path)?),
        None => Box::new(FileChunkWriter::create(&path)?),
    };
    Ok(sink)
}
/// Streams `reader` into chunks of `lines_per_chunk` records each.
///
/// A record ends with `config.separator`; an unterminated final record goes
/// into the last chunk. A chunk is opened only when there is data for it, so
/// no empty trailing file is created.
///
/// # Errors
/// * `InvalidInput` if `lines_per_chunk` is 0. Without this check every chunk
///   would be finished before receiving data, creating empty files until the
///   suffixes ran out.
/// * "output file suffixes exhausted" when more than `max_chunks` files are needed.
/// * Any read, create, write or filter error.
fn split_by_lines(
    reader: &mut dyn BufRead,
    config: &SplitConfig,
    lines_per_chunk: u64,
) -> io::Result<()> {
    if lines_per_chunk == 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "invalid number of lines: 0",
        ));
    }
    let limit = max_chunks(&config.suffix_type, config.suffix_length);
    let sep = config.separator;
    let mut chunk_index: u64 = 0;
    let mut lines_in_chunk: u64 = 0;
    let mut writer: Option<Box<dyn ChunkWriter>> = None;
    loop {
        let available = match reader.fill_buf() {
            Ok([]) => break,
            Ok(b) => b,
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        let buf_len = available.len();
        let mut pos = 0;
        while pos < buf_len {
            if writer.is_none() {
                if chunk_index >= limit {
                    return Err(io::Error::other("output file suffixes exhausted"));
                }
                writer = Some(create_writer(config, chunk_index)?);
                lines_in_chunk = 0;
            }
            let w = writer.as_mut().expect("writer was opened above");
            let lines_needed = lines_per_chunk - lines_in_chunk;
            let slice = &available[pos..];
            // Count separators until the current chunk is full or the buffer ends.
            let mut found = 0u64;
            let mut last_sep_end = 0;
            for offset in memchr::memchr_iter(sep, slice) {
                found += 1;
                last_sep_end = offset + 1;
                if found >= lines_needed {
                    break;
                }
            }
            if found >= lines_needed {
                w.write_all(&slice[..last_sep_end])?;
                w.finish()?;
                writer = None;
                pos += last_sep_end;
                chunk_index += 1;
            } else {
                // Chunk not yet full: the whole remainder belongs to it.
                w.write_all(slice)?;
                lines_in_chunk += found;
                pos = buf_len;
            }
        }
        reader.consume(buf_len);
    }
    if let Some(mut w) = writer {
        w.finish()?;
    }
    Ok(())
}
/// Streams `reader` into chunks of exactly `bytes_per_chunk` bytes; the last
/// chunk may be shorter.
///
/// A chunk is opened only when at least one byte is available for it.
///
/// # Errors
/// * `InvalidInput` if `bytes_per_chunk` is 0. Otherwise chunks would be
///   closed empty in a loop until the suffixes were exhausted.
/// * "output file suffixes exhausted" when more than `max_chunks` files are needed.
/// * Any read, create, write or filter error.
fn split_by_bytes(
    reader: &mut dyn Read,
    config: &SplitConfig,
    bytes_per_chunk: u64,
) -> io::Result<()> {
    if bytes_per_chunk == 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "invalid number of bytes: 0",
        ));
    }
    let limit = max_chunks(&config.suffix_type, config.suffix_length);
    let mut chunk_index: u64 = 0;
    let mut bytes_in_chunk: u64 = 0;
    let mut writer: Option<Box<dyn ChunkWriter>> = None;
    let mut read_buf = vec![0u8; 1024 * 1024];
    loop {
        let bytes_read = match reader.read(&mut read_buf) {
            Ok(0) => break,
            Ok(n) => n,
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        let mut offset = 0usize;
        while offset < bytes_read {
            if writer.is_none() {
                if chunk_index >= limit {
                    return Err(io::Error::other("output file suffixes exhausted"));
                }
                writer = Some(create_writer(config, chunk_index)?);
                bytes_in_chunk = 0;
            }
            let w = writer.as_mut().expect("writer was opened above");
            // Clamp in u64 before narrowing: a plain `as usize` would truncate
            // chunk sizes above 4 GiB on 32-bit targets.
            let remaining_in_chunk =
                (bytes_per_chunk - bytes_in_chunk).min(usize::MAX as u64) as usize;
            let to_write = remaining_in_chunk.min(bytes_read - offset);
            w.write_all(&read_buf[offset..offset + to_write])?;
            bytes_in_chunk += to_write as u64;
            offset += to_write;
            if bytes_in_chunk >= bytes_per_chunk {
                w.finish()?;
                writer = None;
                chunk_index += 1;
            }
        }
    }
    if let Some(ref mut w) = writer {
        // NOTE(review): a writer is only ever open after at least one byte
        // was written to it, so the elide branch below appears unreachable.
        if config.elide_empty && bytes_in_chunk == 0 {
            w.finish()?;
            let path = output_path(config, chunk_index);
            let _ = fs::remove_file(&path);
        } else {
            w.finish()?;
        }
    }
    Ok(())
}
/// Splits `reader` into chunks of at most `max_bytes` bytes, ending each
/// chunk just after the last `config.separator` that fits.
///
/// A record longer than `max_bytes` is cut at the byte limit. If everything
/// that remains fits in one chunk, it is written as a single chunk. The whole
/// input is read into memory first.
///
/// # Errors
/// * `InvalidInput` if `max_bytes` is 0. Otherwise the loop would never advance.
/// * "output file suffixes exhausted" when more than `max_chunks` files are needed.
/// * Any read, create, write or filter error.
fn split_by_line_bytes(
    reader: &mut dyn Read,
    config: &SplitConfig,
    max_bytes: u64,
) -> io::Result<()> {
    if max_bytes == 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "invalid number of bytes: 0",
        ));
    }
    let limit = max_chunks(&config.suffix_type, config.suffix_length);
    // Clamp before narrowing so huge limits cannot truncate on 32-bit targets.
    let max = max_bytes.min(usize::MAX as u64) as usize;
    let sep = config.separator;
    let mut data = Vec::new();
    reader.read_to_end(&mut data)?;
    let total = data.len();
    let mut chunk_index: u64 = 0;
    let mut offset = 0;
    while offset < total {
        if chunk_index >= limit {
            return Err(io::Error::other("output file suffixes exhausted"));
        }
        let remaining = total - offset;
        let end = if remaining <= max {
            // The rest fits in one chunk. The original `<` test split a tail
            // of exactly `max` bytes at an inner separator for no reason.
            total
        } else {
            let window = &data[offset..offset + max];
            match memchr::memrchr(sep, window) {
                Some(pos) => offset + pos + 1,
                // No separator within the limit: cut the long record.
                None => offset + max,
            }
        };
        let mut writer = create_writer(config, chunk_index)?;
        writer.write_all(&data[offset..end])?;
        writer.finish()?;
        offset = end;
        chunk_index += 1;
    }
    Ok(())
}
/// Splits the whole input into `n_chunks` files of (nearly) equal byte size.
///
/// Every chunk gets `len / n_chunks` bytes, and the first `len % n_chunks`
/// chunks get one extra byte. With `config.elide_empty`, zero-length chunks
/// are not created; their suffix index is still consumed.
///
/// # Errors
/// "output file suffixes exhausted" if `n_chunks` exceeds `max_chunks`;
/// `InvalidInput` for `n_chunks == 0`; read/create/write/filter errors.
fn split_by_number(input_path: &str, config: &SplitConfig, n_chunks: u64) -> io::Result<()> {
    if n_chunks > max_chunks(&config.suffix_type, config.suffix_length) {
        return Err(io::Error::other("output file suffixes exhausted"));
    }
    if n_chunks == 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "invalid number of chunks: 0",
        ));
    }
    let data: crate::common::io::FileData = match input_path {
        "-" => {
            let mut stdin_bytes = Vec::new();
            io::stdin().lock().read_to_end(&mut stdin_bytes)?;
            crate::common::io::FileData::Owned(stdin_bytes)
        }
        path => crate::common::io::read_file(Path::new(path))?,
    };
    let total = data.len() as u64;
    let (base, extra) = (total / n_chunks, total % n_chunks);
    let mut start: u64 = 0;
    for index in 0..n_chunks {
        let len = if index < extra { base + 1 } else { base };
        if len == 0 && config.elide_empty {
            continue;
        }
        let mut writer = create_writer(config, index)?;
        if len != 0 {
            let (from, to) = (start as usize, (start + len) as usize);
            writer.write_all(&data[from..to])?;
        }
        writer.finish()?;
        start += len;
    }
    Ok(())
}
/// Writes only chunk `k` (1-based) of `n` equal-size byte chunks to stdout.
///
/// Chunk sizes match `split_by_number`: `len / n` bytes each, plus one extra
/// byte for each of the first `len % n` chunks.
///
/// # Errors
/// `InvalidInput` if `n` is 0 (previously a division-by-zero panic) or if
/// `k` is not in `1..=n` (previously a silent no-op); read/write errors.
fn split_by_number_extract(input_path: &str, k: u64, n: u64) -> io::Result<()> {
    if n == 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "invalid number of chunks: 0",
        ));
    }
    if k == 0 || k > n {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("invalid chunk number: {}", k),
        ));
    }
    let data: crate::common::io::FileData = if input_path == "-" {
        let mut buf = Vec::new();
        io::stdin().lock().read_to_end(&mut buf)?;
        crate::common::io::FileData::Owned(buf)
    } else {
        crate::common::io::read_file(Path::new(input_path))?
    };
    let total = data.len() as u64;
    let base_chunk_size = total / n;
    let remainder = total % n;
    let idx = k - 1;
    // The `idx` chunks before this one hold `idx * base` bytes, plus one extra
    // byte for each of them that is among the first `remainder` chunks. This
    // avoids a loop that scales with `k`.
    let start = idx * base_chunk_size + idx.min(remainder);
    let chunk_size = base_chunk_size + u64::from(idx < remainder);
    if chunk_size > 0 {
        let stdout = io::stdout();
        let mut out = stdout.lock();
        out.write_all(&data[start as usize..(start + chunk_size) as usize])?;
        out.flush()?;
    }
    Ok(())
}
/// Loads the entire input into memory: stdin when `input_path` is `"-"`,
/// otherwise the named file via `crate::common::io::read_file`.
fn read_input_data(input_path: &str) -> io::Result<FileData> {
    if input_path != "-" {
        let file_data = crate::common::io::read_file(Path::new(input_path))?;
        return Ok(file_data);
    }
    let mut stdin_bytes = Vec::new();
    io::stdin().lock().read_to_end(&mut stdin_bytes)?;
    Ok(FileData::Owned(stdin_bytes))
}
/// Computes the exclusive end offset of each of `n_chunks` record-aligned
/// chunks of `data`.
///
/// Byte targets come from the equal-size split (`len / n_chunks` per chunk,
/// with the first `len % n_chunks` chunks one byte larger). Each chunk ends at
/// the first record end (just past a `sep`, or end of data) at or beyond its
/// cumulative target. One long record can satisfy several targets at once;
/// the later chunks are then empty (their end equals the previous end).
///
/// Returns exactly `n_chunks` non-decreasing offsets whose last value is
/// `data.len()`. Returns an empty vector for `n_chunks == 0` instead of
/// dividing by zero.
fn compute_line_chunk_boundaries(data: &[u8], n_chunks: u64, sep: u8) -> Vec<u64> {
    if n_chunks == 0 {
        return Vec::new();
    }
    let total = data.len() as u64;
    let base_chunk_size = total / n_chunks;
    let remainder = total % n_chunks;
    let mut boundaries = Vec::with_capacity(n_chunks as usize);
    let mut target_end: u64 = 0;
    for i in 0..n_chunks {
        target_end += base_chunk_size + u64::from(i < remainder);
        boundaries.push(target_end);
    }
    // Record ends: one past every separator, plus end-of-data when the final
    // record is unterminated.
    let unterminated_tail =
        (!data.is_empty() && data.last() != Some(&sep)).then_some(total);
    let record_ends = memchr::memchr_iter(sep, data)
        .map(|sep_pos| sep_pos as u64 + 1)
        .chain(unterminated_tail);
    let mut chunk_ends: Vec<u64> = Vec::with_capacity(n_chunks as usize);
    let mut pos: u64 = 0;
    for end in record_ends {
        pos = end;
        while (chunk_ends.len() as u64) < n_chunks && pos >= boundaries[chunk_ends.len()] {
            chunk_ends.push(pos);
        }
    }
    // Only reached with unfilled slots for empty input (all targets are 0).
    chunk_ends.resize(n_chunks as usize, pos);
    chunk_ends
}
/// Splits the whole input into `n_chunks` files of roughly equal size
/// without breaking records (see `compute_line_chunk_boundaries`).
///
/// With `config.elide_empty`, empty chunks are not created; their suffix
/// index is still consumed.
///
/// NOTE(review): unlike `split_by_number`, this does not check `n_chunks`
/// against `max_chunks`. Alphabetic suffixes drop high-order digits, so
/// indices past the limit would reuse earlier names. Confirm that the caller
/// validates this.
///
/// # Errors
/// `InvalidInput` for `n_chunks == 0` (previously a division-by-zero panic);
/// read/create/write/filter errors.
fn split_by_line_chunks(input_path: &str, config: &SplitConfig, n_chunks: u64) -> io::Result<()> {
    if n_chunks == 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "invalid number of chunks: 0",
        ));
    }
    let data = read_input_data(input_path)?;
    let chunk_ends = compute_line_chunk_boundaries(&data, n_chunks, config.separator);
    let mut offset: u64 = 0;
    for (i, &end) in chunk_ends.iter().enumerate() {
        let chunk_size = end - offset;
        if config.elide_empty && chunk_size == 0 {
            continue;
        }
        let mut writer = create_writer(config, i as u64)?;
        if chunk_size > 0 {
            writer.write_all(&data[offset as usize..end as usize])?;
        }
        writer.finish()?;
        offset = end;
    }
    Ok(())
}
/// Writes only record-aligned chunk `k` (1-based) of `n_chunks` to stdout.
///
/// # Errors
/// `InvalidInput` if `n_chunks` is 0 (previously a division-by-zero panic) or
/// if `k` is not in `1..=n_chunks` (previously a silent no-op); read/write errors.
fn split_by_line_chunk_extract(
    input_path: &str,
    config: &SplitConfig,
    k: u64,
    n_chunks: u64,
) -> io::Result<()> {
    if n_chunks == 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "invalid number of chunks: 0",
        ));
    }
    if k == 0 || k > n_chunks {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("invalid chunk number: {}", k),
        ));
    }
    let data = read_input_data(input_path)?;
    let chunk_ends = compute_line_chunk_boundaries(&data, n_chunks, config.separator);
    let idx = (k - 1) as usize;
    // A chunk starts where the previous one ended (the first starts at 0).
    let start = if idx == 0 { 0 } else { chunk_ends[idx - 1] as usize };
    let end = chunk_ends[idx] as usize;
    if end > start {
        let stdout = io::stdout();
        let mut out = stdout.lock();
        out.write_all(&data[start..end])?;
        out.flush()?;
    }
    Ok(())
}
/// Deals records out to `n_chunks` output files in round-robin order: record
/// `i` goes to file `i % n_chunks`.
///
/// All outputs are opened up front. With `config.elide_empty`, files that
/// would receive no record (index ≥ number of records) are not created.
///
/// NOTE(review): like the original, this does not check `n_chunks` against
/// `max_chunks`. Alphabetic suffixes would wrap and reuse names. Confirm that
/// the caller validates this.
///
/// # Errors
/// `InvalidInput` for `n_chunks == 0` (previously a `% 0` panic). Create
/// errors are now propagated instead of panicking via `unwrap`. Read, write
/// and filter errors are also propagated.
fn split_by_round_robin(input_path: &str, config: &SplitConfig, n_chunks: u64) -> io::Result<()> {
    if n_chunks == 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "invalid number of chunks: 0",
        ));
    }
    let data = read_input_data(input_path)?;
    let sep = config.separator;
    let mut lines: Vec<&[u8]> = Vec::new();
    let mut start = 0;
    for pos in memchr::memchr_iter(sep, &data) {
        lines.push(&data[start..=pos]);
        start = pos + 1;
    }
    if start < data.len() {
        lines.push(&data[start..]);
    }
    let line_count = lines.len() as u64;
    let mut writers: Vec<Option<Box<dyn ChunkWriter>>> = (0..n_chunks)
        .map(|i| {
            if config.elide_empty && line_count <= i {
                Ok(None)
            } else {
                create_writer(config, i).map(Some)
            }
        })
        .collect::<io::Result<_>>()?;
    for (idx, line) in lines.iter().enumerate() {
        let chunk_idx = ((idx as u64) % n_chunks) as usize;
        if let Some(writer) = writers[chunk_idx].as_mut() {
            writer.write_all(line)?;
        }
    }
    for slot in &mut writers {
        if let Some(mut w) = slot.take() {
            w.finish()?;
        }
    }
    Ok(())
}
/// Writes to stdout only the records that round-robin assigns to file `k`
/// (1-based) of `n`, i.e. records whose 0-based index `i` satisfies
/// `i % n == k - 1`.
///
/// NOTE(review): records are always split on `b'\n'`. The configured
/// separator is not passed to this function, unlike `split_by_round_robin`.
///
/// # Errors
/// `InvalidInput` if `n` is 0 (previously a `% 0` panic) or if `k` is not in
/// `1..=n` (`k == 0` previously underflowed); read/write errors.
fn split_by_round_robin_extract(input_path: &str, k: u64, n: u64) -> io::Result<()> {
    if n == 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "invalid number of chunks: 0",
        ));
    }
    if k == 0 || k > n {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("invalid chunk number: {}", k),
        ));
    }
    let data = read_input_data(input_path)?;
    let sep = b'\n';
    let target = k - 1;
    let stdout = io::stdout();
    let mut out = stdout.lock();
    let mut start = 0;
    let mut line_idx: u64 = 0;
    for pos in memchr::memchr_iter(sep, &data) {
        if line_idx % n == target {
            out.write_all(&data[start..=pos])?;
        }
        start = pos + 1;
        line_idx += 1;
    }
    // An unterminated final record counts as one more line.
    if start < data.len() && line_idx % n == target {
        out.write_all(&data[start..])?;
    }
    out.flush()
}
/// Unix fast path for `SplitMode::Lines` on a regular file.
///
/// Reads the input with raw `read(2)` into a 256 KiB buffer and writes each
/// chunk with raw `write(2)`, bypassing `BufWriter` and `ChunkWriter`. It
/// therefore writes files directly and must only be used without a filter.
/// Each finished chunk's file is closed by dropping its `File`.
///
/// # Errors
/// `InvalidInput` for `lines_per_chunk == 0` (otherwise empty files would be
/// created until the suffixes ran out); "output file suffixes exhausted";
/// any OS read/create/write error.
#[cfg(unix)]
fn split_lines_streaming_fast(
    file: &File,
    config: &SplitConfig,
    lines_per_chunk: u64,
) -> io::Result<()> {
    use std::os::unix::io::AsRawFd;
    if lines_per_chunk == 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "invalid number of lines: 0",
        ));
    }
    let in_fd = file.as_raw_fd();
    // SAFETY: `in_fd` belongs to `file`, which is borrowed for the whole call.
    // The call is purely advisory, so its return value is ignored.
    #[cfg(target_os = "linux")]
    unsafe {
        libc::posix_fadvise(in_fd, 0, 0, libc::POSIX_FADV_SEQUENTIAL);
    }
    let limit = max_chunks(&config.suffix_type, config.suffix_length);
    let sep = config.separator;
    const BUF_SIZE: usize = 256 * 1024;
    let mut buf = vec![0u8; BUF_SIZE];
    let mut chunk_index: u64 = 0;
    let mut lines_in_chunk: u64 = 0;
    // `out_fd` is the raw descriptor of `_out_file`; -1 means "no chunk open".
    let mut out_fd: i32 = -1;
    let mut _out_file: Option<File> = None;
    let raw_write_all = |fd: i32, mut data: &[u8]| -> io::Result<()> {
        while !data.is_empty() {
            // SAFETY: `data` is a live slice, valid for `data.len()` bytes, and
            // `fd` is an open descriptor owned by `_out_file`.
            let ret =
                unsafe { libc::write(fd, data.as_ptr() as *const libc::c_void, data.len() as _) };
            if ret > 0 {
                data = &data[ret as usize..];
            } else if ret == 0 {
                return Err(io::Error::new(io::ErrorKind::WriteZero, "write returned 0"));
            } else {
                let err = io::Error::last_os_error();
                if err.kind() == io::ErrorKind::Interrupted {
                    continue;
                }
                return Err(err);
            }
        }
        Ok(())
    };
    loop {
        // SAFETY: `buf` is an exclusively owned buffer of `BUF_SIZE` bytes.
        let n = unsafe { libc::read(in_fd, buf.as_mut_ptr() as *mut libc::c_void, BUF_SIZE as _) };
        if n == 0 {
            break;
        }
        if n < 0 {
            let err = io::Error::last_os_error();
            if err.kind() == io::ErrorKind::Interrupted {
                continue;
            }
            return Err(err);
        }
        let n = n as usize;
        let data = &buf[..n];
        let mut pos = 0;
        while pos < n {
            if out_fd < 0 {
                if chunk_index >= limit {
                    return Err(io::Error::other("output file suffixes exhausted"));
                }
                let path = output_path(config, chunk_index);
                if config.verbose {
                    eprintln!("creating file '{}'", path);
                }
                let file = File::create(path)?;
                out_fd = file.as_raw_fd();
                _out_file = Some(file);
                lines_in_chunk = 0;
            }
            let slice = &data[pos..];
            let lines_needed = lines_per_chunk - lines_in_chunk;
            let mut found = 0u64;
            let mut last_sep_end = 0;
            for offset in memchr::memchr_iter(sep, slice) {
                found += 1;
                last_sep_end = offset + 1;
                if found >= lines_needed {
                    break;
                }
            }
            if found >= lines_needed {
                raw_write_all(out_fd, &slice[..last_sep_end])?;
                pos += last_sep_end;
                // Dropping the File closes the finished chunk.
                _out_file = None;
                out_fd = -1;
                chunk_index += 1;
            } else {
                raw_write_all(out_fd, slice)?;
                lines_in_chunk += found;
                pos = n;
            }
        }
    }
    Ok(())
}
/// Linux fast path for `SplitMode::Bytes`.
///
/// Copies each chunk of the input file into its own output file with
/// `copy_file_range(2)`. If that fails with anything other than `EINTR`, it
/// finishes the chunk with `sendfile(2)`. Chunk ranges and paths are planned
/// up front. With 4 or more chunks and `verbose` off, the copies run in
/// parallel on the rayon pool.
///
/// `input_fd` must stay open for the whole call; the caller keeps the owning
/// `File` alive.
///
/// # Errors
/// `InvalidInput` for `bytes_per_chunk == 0` (otherwise the planning loop
/// never advances); "output file suffixes exhausted"; errors from creating
/// output files or from `sendfile`.
#[cfg(target_os = "linux")]
fn split_bytes_zero_copy(
    input_fd: std::os::unix::io::RawFd,
    file_size: u64,
    config: &SplitConfig,
    bytes_per_chunk: u64,
) -> io::Result<()> {
    use std::os::unix::io::AsRawFd;
    if bytes_per_chunk == 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "invalid number of bytes: 0",
        ));
    }
    let limit = max_chunks(&config.suffix_type, config.suffix_length);
    // Clamp before narrowing so oversized chunk sizes cannot truncate.
    let chunk_size = bytes_per_chunk.min(usize::MAX as u64) as usize;
    let total = file_size as usize;
    let mut chunks: Vec<(usize, usize)> = Vec::new();
    let mut off = 0usize;
    while off < total {
        if chunks.len() as u64 >= limit {
            return Err(io::Error::other("output file suffixes exhausted"));
        }
        let remaining = total - off;
        let sz = remaining.min(chunk_size);
        chunks.push((off, sz));
        off += sz;
    }
    if chunks.is_empty() {
        return Ok(());
    }
    let paths: Vec<String> = (0..chunks.len())
        .map(|i| output_path(config, i as u64))
        .collect();
    if config.verbose {
        for path in &paths {
            eprintln!("creating file '{}'", path);
        }
    }
    let copy_chunk =
        |input_fd: i32, chunk_offset: usize, chunk_len: usize, path: &str| -> io::Result<()> {
            let out_file = File::create(path)?;
            let out_fd = out_file.as_raw_fd();
            // Each call carries its own input offset, so the shared input
            // fd's file position is not used and parallel copies don't interfere.
            let mut off_in = chunk_offset as i64;
            let mut copied = 0usize;
            while copied < chunk_len {
                // SAFETY: both descriptors are open for the whole call
                // (`out_file` is alive, the caller owns `input_fd`), and
                // `off_in` is a live local i64 (`off64_t` is i64 on Linux).
                let n = unsafe {
                    libc::copy_file_range(
                        input_fd,
                        &mut off_in as *mut i64 as *mut libc::off64_t,
                        out_fd,
                        std::ptr::null_mut(),
                        chunk_len - copied,
                        0,
                    )
                };
                if n > 0 {
                    copied += n as usize;
                } else if n == 0 {
                    // Input ended early (e.g. the file shrank); keep what was copied.
                    break;
                } else {
                    let err = io::Error::last_os_error();
                    if err.raw_os_error() == Some(libc::EINTR) {
                        continue;
                    }
                    // copy_file_range failed for this pair: finish with sendfile.
                    while copied < chunk_len {
                        // SAFETY: as above. NOTE(review): `off_t` is 64-bit only
                        // on 64-bit targets; on 32-bit Linux this pointer cast
                        // would be wrong — confirm the supported targets.
                        let n = unsafe {
                            libc::sendfile(
                                out_fd,
                                input_fd,
                                &mut off_in as *mut i64 as *mut libc::off_t,
                                chunk_len - copied,
                            )
                        };
                        if n > 0 {
                            copied += n as usize;
                        } else if n == 0 {
                            break;
                        } else {
                            let err2 = io::Error::last_os_error();
                            if err2.raw_os_error() == Some(libc::EINTR) {
                                continue;
                            }
                            return Err(err2);
                        }
                    }
                    break;
                }
            }
            Ok(())
        };
    if chunks.len() >= 4 && !config.verbose {
        chunks.par_iter().zip(paths.par_iter()).try_for_each(
            |(&(chunk_off, chunk_len), path)| copy_chunk(input_fd, chunk_off, chunk_len, path),
        )?;
    } else {
        for (i, &(chunk_off, chunk_len)) in chunks.iter().enumerate() {
            copy_chunk(input_fd, chunk_off, chunk_len, &paths[i])?;
        }
    }
    Ok(())
}
/// Non-Linux Unix fast path for `SplitMode::Bytes` over an in-memory
/// (memory-mapped) input.
///
/// Plans `[start, end)` ranges of `bytes_per_chunk` bytes (the last may be
/// shorter), then writes each range to its own file. The writes run in
/// parallel on the rayon pool when there are 4 or more chunks and `verbose`
/// is off.
///
/// # Errors
/// `InvalidInput` for `bytes_per_chunk == 0` (otherwise the planning loop
/// never advances); "output file suffixes exhausted"; create/write errors.
#[cfg(all(unix, not(target_os = "linux")))]
fn split_bytes_preloaded(
    data: &[u8],
    config: &SplitConfig,
    bytes_per_chunk: u64,
) -> io::Result<()> {
    if bytes_per_chunk == 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "invalid number of bytes: 0",
        ));
    }
    let limit = max_chunks(&config.suffix_type, config.suffix_length);
    // Clamp before narrowing so oversized chunk sizes cannot truncate.
    let chunk_size = bytes_per_chunk.min(usize::MAX as u64) as usize;
    let mut chunks: Vec<(usize, usize)> = Vec::new();
    let mut offset = 0;
    while offset < data.len() {
        if chunks.len() as u64 >= limit {
            return Err(io::Error::other("output file suffixes exhausted"));
        }
        // Written as offset + min(size, remaining) so the sum cannot overflow.
        let end = offset + chunk_size.min(data.len() - offset);
        chunks.push((offset, end));
        offset = end;
    }
    if chunks.is_empty() {
        return Ok(());
    }
    let paths: Vec<String> = (0..chunks.len())
        .map(|i| output_path(config, i as u64))
        .collect();
    if config.verbose {
        for path in &paths {
            eprintln!("creating file '{}'", path);
        }
    }
    if chunks.len() >= 4 && !config.verbose {
        chunks.par_iter().zip(paths.par_iter()).try_for_each(
            |(&(start, end), path)| -> io::Result<()> {
                let mut file = File::create(path)?;
                file.write_all(&data[start..end])?;
                Ok(())
            },
        )?;
    } else {
        for (i, &(start, end)) in chunks.iter().enumerate() {
            let mut file = File::create(&paths[i])?;
            file.write_all(&data[start..end])?;
        }
    }
    Ok(())
}
/// Unix fast path for `SplitMode::LineBytes` over an in-memory
/// (memory-mapped) input.
///
/// Each chunk holds at most `max_bytes` bytes and ends just after the last
/// `config.separator` that fits. A record longer than `max_bytes` is cut at
/// the limit, and a remainder that fits entirely is emitted as one chunk.
/// Writes run in parallel on the rayon pool for 4 or more chunks when
/// `verbose` is off.
///
/// # Errors
/// `InvalidInput` for `max_bytes == 0` (otherwise the loop never advances);
/// "output file suffixes exhausted"; create/write errors.
#[cfg(unix)]
fn split_line_bytes_preloaded(data: &[u8], config: &SplitConfig, max_bytes: u64) -> io::Result<()> {
    if max_bytes == 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "invalid number of bytes: 0",
        ));
    }
    let limit = max_chunks(&config.suffix_type, config.suffix_length);
    // Clamp before narrowing so huge limits cannot truncate on 32-bit targets.
    let max = max_bytes.min(usize::MAX as u64) as usize;
    let sep = config.separator;
    let mut chunks: Vec<(usize, usize)> = Vec::new();
    let mut offset = 0;
    while offset < data.len() {
        if chunks.len() as u64 >= limit {
            return Err(io::Error::other("output file suffixes exhausted"));
        }
        let remaining = data.len() - offset;
        let end = if remaining <= max {
            // The rest fits in one chunk. The original `<` test split a tail
            // of exactly `max` bytes at an inner separator for no reason.
            data.len()
        } else {
            match memchr::memrchr(sep, &data[offset..offset + max]) {
                Some(pos) => offset + pos + 1,
                None => offset + max,
            }
        };
        chunks.push((offset, end));
        offset = end;
    }
    if chunks.is_empty() {
        return Ok(());
    }
    let paths: Vec<String> = (0..chunks.len())
        .map(|i| output_path(config, i as u64))
        .collect();
    if config.verbose {
        for path in &paths {
            eprintln!("creating file '{}'", path);
        }
    }
    if chunks.len() >= 4 && !config.verbose {
        chunks.par_iter().zip(paths.par_iter()).try_for_each(
            |(&(start, end), path)| -> io::Result<()> {
                let mut file = File::create(path)?;
                file.write_all(&data[start..end])?;
                Ok(())
            },
        )?;
    } else {
        for (i, &(start, end)) in chunks.iter().enumerate() {
            let mut file = File::create(&paths[i])?;
            file.write_all(&data[start..end])?;
        }
    }
    Ok(())
}
/// Splits `input_path` (stdin when it is `"-"`) into outputs as selected by
/// `config.mode`.
///
/// Dispatch order:
/// 1. Chunk and round-robin modes, including their extract variants, are
///    handed to routines that read the whole input, and the result is
///    returned immediately.
/// 2. On Unix, `Lines`, `Bytes` and `LineBytes` try a fast path when the
///    input is a named, non-empty regular file and no filter is configured.
///    The mmap-based paths also require the file to be at most 512 MiB. If
///    opening or stat-ing the file fails, the conditions are not met, or
///    mapping fails, control falls through to step 3.
/// 3. Otherwise the input is opened as a generic reader and streamed by
///    `split_by_lines`, `split_by_bytes` or `split_by_line_bytes`.
///
/// # Errors
/// Returns `NotFound` with a "cannot open ... for reading" message when a
/// named input does not exist (step 3), and propagates any error from the
/// selected splitting routine.
pub fn split_file(input_path: &str, config: &SplitConfig) -> io::Result<()> {
    // Whole-input modes: each handler loads the input itself.
    if let SplitMode::Number(n) = config.mode {
        return split_by_number(input_path, config, n);
    }
    if let SplitMode::NumberExtract(k, n) = config.mode {
        return split_by_number_extract(input_path, k, n);
    }
    if let SplitMode::LineChunks(n) = config.mode {
        return split_by_line_chunks(input_path, config, n);
    }
    if let SplitMode::LineChunkExtract(k, n) = config.mode {
        return split_by_line_chunk_extract(input_path, config, k, n);
    }
    if let SplitMode::RoundRobin(n) = config.mode {
        return split_by_round_robin(input_path, config, n);
    }
    if let SplitMode::RoundRobinExtract(k, n) = config.mode {
        return split_by_round_robin_extract(input_path, k, n);
    }
    // Unix fast path for `Lines`: raw read()/write() loop on a non-empty
    // regular file. It writes files directly, hence the no-filter requirement.
    #[cfg(unix)]
    if let SplitMode::Lines(n) = config.mode {
        if input_path != "-" && config.filter.is_none() {
            if let Ok(file) = File::open(input_path) {
                if let Ok(meta) = file.metadata() {
                    if meta.file_type().is_file() && meta.len() > 0 {
                        return split_lines_streaming_fast(&file, config, n);
                    }
                }
            }
        }
    }
    // Linux fast path for `Bytes`: kernel-side copies from the input fd
    // (no mmap, no size cap).
    #[cfg(target_os = "linux")]
    if let SplitMode::Bytes(bytes_per_chunk) = config.mode {
        if input_path != "-" && config.filter.is_none() {
            if let Ok(file) = File::open(input_path) {
                if let Ok(meta) = file.metadata() {
                    if meta.file_type().is_file() && meta.len() > 0 {
                        use std::os::unix::io::AsRawFd;
                        // Advisory read-ahead hint; the return value is ignored.
                        unsafe {
                            libc::posix_fadvise(
                                file.as_raw_fd(),
                                0,
                                0,
                                libc::POSIX_FADV_SEQUENTIAL,
                            );
                        }
                        // `file` outlives this call, so the raw fd stays valid.
                        return split_bytes_zero_copy(
                            file.as_raw_fd(),
                            meta.len(),
                            config,
                            bytes_per_chunk,
                        );
                    }
                }
            }
        }
    }
    // Non-Linux Unix fast path for `Bytes`: memory-map files up to 512 MiB.
    #[cfg(all(unix, not(target_os = "linux")))]
    if let SplitMode::Bytes(bytes_per_chunk) = config.mode {
        if input_path != "-" && config.filter.is_none() {
            const FAST_PATH_LIMIT: u64 = 512 * 1024 * 1024;
            if let Ok(file) = File::open(input_path) {
                if let Ok(meta) = file.metadata() {
                    if meta.file_type().is_file() && meta.len() <= FAST_PATH_LIMIT && meta.len() > 0
                    {
                        // NOTE(review): a mapped file truncated by another
                        // process while being read may fault — confirm acceptable.
                        if let Ok(mmap) = unsafe { memmap2::MmapOptions::new().map(&file) } {
                            // Access-pattern hints are best-effort; errors ignored.
                            let _ = mmap.advise(memmap2::Advice::Sequential);
                            // NOTE(review): this cfg is always true inside a
                            // non-Linux-only block and looks redundant.
                            #[cfg(not(target_os = "linux"))]
                            {
                                let _ = mmap.advise(memmap2::Advice::WillNeed);
                            }
                            return split_bytes_preloaded(&mmap, config, bytes_per_chunk);
                        }
                    }
                }
            }
        }
    }
    // Unix fast path for `LineBytes`: memory-map files up to 512 MiB.
    #[cfg(unix)]
    if let SplitMode::LineBytes(max_bytes) = config.mode {
        if input_path != "-" && config.filter.is_none() {
            const FAST_PATH_LIMIT: u64 = 512 * 1024 * 1024;
            if let Ok(file) = File::open(input_path) {
                if let Ok(meta) = file.metadata() {
                    if meta.file_type().is_file() && meta.len() <= FAST_PATH_LIMIT && meta.len() > 0
                    {
                        if let Ok(mmap) = unsafe { memmap2::MmapOptions::new().map(&file) } {
                            // All advise() calls are best-effort hints; errors ignored.
                            let _ = mmap.advise(memmap2::Advice::Sequential);
                            #[cfg(target_os = "linux")]
                            {
                                let len = mmap.len();
                                // Huge-page hint only for mappings of at least 2 MiB.
                                if len >= 2 * 1024 * 1024 {
                                    let _ = mmap.advise(memmap2::Advice::HugePage);
                                }
                                // Prefault large mappings; fall back to
                                // WillNeed if PopulateRead is rejected.
                                if len >= 4 * 1024 * 1024 {
                                    if mmap.advise(memmap2::Advice::PopulateRead).is_err() {
                                        let _ = mmap.advise(memmap2::Advice::WillNeed);
                                    }
                                } else {
                                    let _ = mmap.advise(memmap2::Advice::WillNeed);
                                }
                            }
                            #[cfg(not(target_os = "linux"))]
                            {
                                let _ = mmap.advise(memmap2::Advice::WillNeed);
                            }
                            return split_line_bytes_preloaded(&mmap, config, max_bytes);
                        }
                    }
                }
            }
        }
    }
    // Generic path: stdin, filters, non-regular or empty files, oversized
    // files for the mmap paths, or any fast path that could not be taken.
    let reader: Box<dyn Read> = if input_path == "-" {
        Box::new(io::stdin().lock())
    } else {
        let path = Path::new(input_path);
        // NOTE(review): `Path::exists` also returns false when the path
        // cannot be stat-ed (e.g. permission denied), which is then reported
        // as "No such file or directory".
        if !path.exists() {
            return Err(io::Error::new(
                io::ErrorKind::NotFound,
                format!(
                    "cannot open '{}' for reading: No such file or directory",
                    input_path
                ),
            ));
        }
        let file = File::open(path)?;
        #[cfg(target_os = "linux")]
        {
            use std::os::unix::io::AsRawFd;
            // Advisory read-ahead hint; the return value is ignored.
            unsafe {
                libc::posix_fadvise(file.as_raw_fd(), 0, 0, libc::POSIX_FADV_SEQUENTIAL);
            }
        }
        Box::new(file)
    };
    match config.mode {
        SplitMode::Lines(n) => {
            // `split_by_lines` needs `BufRead`; use a 1 MiB buffer.
            let mut buf_reader = BufReader::with_capacity(1024 * 1024, reader);
            split_by_lines(&mut buf_reader, config, n)
        }
        SplitMode::Bytes(n) => {
            let mut reader = reader;
            split_by_bytes(&mut reader, config, n)
        }
        SplitMode::LineBytes(n) => {
            let mut reader = reader;
            split_by_line_bytes(&mut reader, config, n)
        }
        // Every other mode returned at the top of this function.
        SplitMode::Number(_)
        | SplitMode::NumberExtract(_, _)
        | SplitMode::LineChunks(_)
        | SplitMode::LineChunkExtract(_, _)
        | SplitMode::RoundRobin(_)
        | SplitMode::RoundRobinExtract(_, _) => unreachable!(),
    }
}
/// Returns the output file names for chunk indices `0..count`, in order.
pub fn output_paths(config: &SplitConfig, count: u64) -> Vec<PathBuf> {
    let mut paths = Vec::new();
    for index in 0..count {
        paths.push(PathBuf::from(output_path(config, index)));
    }
    paths
}