#![allow(dead_code)]
use std::fs::File;
use std::io;
use std::os::unix::fs::FileExt;
#[cfg(feature = "linux-direct-io")]
use std::os::unix::io::RawFd;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::mpsc::{Receiver, SyncSender, sync_channel};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use crate::reorder_buffer::ReorderBuffer;
use crate::write::metrics::WRITER_METRICS;
use crate::write::pipeline::{OutputChunk, PipelineItem, WRITE_AHEAD};
/// Number of worker threads in the positioned-write pool.
pub(crate) const POOL_SIZE: usize = 16;
/// Bounded depth of each worker's op queue; a full queue backpressures
/// the dispatcher instead of buffering unboundedly.
pub(crate) const PER_WORKER_CAPACITY: usize = 8;
/// One positioned-I/O operation handed to a pool worker.
enum WriteOp {
    /// pwrite `bytes` at absolute file offset `offset`.
    Write { offset: u64, bytes: Vec<u8> },
    /// In-kernel copy of `len` bytes from `in_fd` at `src_offset` into the
    /// output file at `out_offset` (direct-io builds only).
    #[cfg(feature = "linux-direct-io")]
    CopyRange {
        out_offset: u64,
        in_fd: RawFd,
        src_offset: u64,
        len: u64,
    },
}
/// Shared slot for the first worker I/O error; later errors are dropped
/// (first-error-wins, see `worker_loop`).
type ErrSlot = Arc<Mutex<Option<io::Error>>>;
#[cfg(feature = "test-hooks")]
pub mod test_hooks {
    //! Fault-injection hooks: let tests panic a pool worker at an exact
    //! operation count to exercise the panic-propagation path.
    use std::sync::atomic::{AtomicU64, Ordering};
    /// When `POOL_OP_COUNT` reaches this value, the worker panics.
    /// `u64::MAX` means "never" (the default).
    pub static PANIC_AT_POOL_OP_COUNT: AtomicU64 = AtomicU64::new(u64::MAX);
    /// Running total of ops processed across all pool workers.
    pub static POOL_OP_COUNT: AtomicU64 = AtomicU64::new(0);
    /// Restore both counters to their defaults between tests.
    pub fn reset() {
        PANIC_AT_POOL_OP_COUNT.store(u64::MAX, Ordering::Relaxed);
        POOL_OP_COUNT.store(0, Ordering::Relaxed);
    }
}
/// Thread entry point for the parallel writer.
///
/// Delegates to [`parallel_writer_thread_inner`] and, on failure, logs
/// the terminal error to stderr before handing it to whoever joins the
/// thread (join errors are otherwise easy to lose).
#[allow(clippy::needless_pass_by_value)]
pub(crate) fn parallel_writer_thread(
    rx: Receiver<PipelineItem>,
    path: PathBuf,
    framed_header: Vec<u8>,
    init_tx: SyncSender<io::Result<()>>,
) -> io::Result<()> {
    match parallel_writer_thread_inner(rx, path, framed_header, init_tx) {
        Ok(()) => Ok(()),
        Err(e) => {
            eprintln!("[parallel_writer] thread exit: {e}");
            Err(e)
        }
    }
}
/// Core of the parallel writer: create the output file, write the framed
/// header, report startup success/failure over `init_tx`, then fan
/// in-order chunks out to a pool of positioned-write workers and fsync.
///
/// Shutdown order is deliberate: the worker senders are dropped *before*
/// joining so each worker's `recv` loop terminates; the shared error slot
/// is consulted only after every worker has exited; `sync_all` runs last.
#[allow(clippy::needless_pass_by_value)]
fn parallel_writer_thread_inner(
    rx: Receiver<PipelineItem>,
    path: PathBuf,
    framed_header: Vec<u8>,
    init_tx: SyncSender<io::Result<()>>,
) -> io::Result<()> {
    let file = match File::create(&path) {
        Ok(f) => f,
        Err(e) => {
            // Send a path-annotated copy to the caller; return the raw error.
            let sent = io::Error::new(e.kind(), format!("create {}: {e}", path.display()));
            init_tx.send(Err(sent)).ok();
            return Err(e);
        }
    };
    if let Err(e) = file.write_all_at(&framed_header, 0) {
        init_tx.send(Err(io::Error::new(e.kind(), format!("header write: {e}")))).ok();
        return Err(e);
    }
    // Caller blocks on this handshake before streaming chunks in.
    init_tx.send(Ok(())).ok();
    let shared_file = Arc::new(file);
    let err_slot: ErrSlot = Arc::new(Mutex::new(None));
    let mut worker_txs: Vec<SyncSender<WriteOp>> = Vec::with_capacity(POOL_SIZE);
    let mut worker_handles: Vec<JoinHandle<()>> = Vec::with_capacity(POOL_SIZE);
    for _ in 0..POOL_SIZE {
        // Bounded per-worker queue: a slow disk backpressures the dispatcher.
        let (wtx, wrx) = sync_channel::<WriteOp>(PER_WORKER_CAPACITY);
        worker_txs.push(wtx);
        let file = Arc::clone(&shared_file);
        let err_slot = Arc::clone(&err_slot);
        worker_handles.push(std::thread::spawn(move || worker_loop(wrx, &file, &err_slot)));
    }
    // Data starts at the first byte after the header.
    let result = dispatch_loop(
        &rx,
        &worker_txs,
        &err_slot,
        framed_header.len() as u64,
    );
    // Dropping the senders closes each worker's channel so its loop ends.
    drop(worker_txs);
    for handle in worker_handles {
        if let Err(e) = handle.join() {
            return Err(io::Error::other(format!("parallel writer pool panicked: {e:?}")));
        }
    }
    // Dispatch errors take precedence over a recorded worker I/O error.
    result?;
    if let Some(e) = err_slot
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .take()
    {
        return Err(e);
    }
    // Durability barrier after all workers finished; time it for metrics.
    let t_sync = std::time::Instant::now();
    shared_file.sync_all()?;
    WRITER_METRICS
        .sync_all_ns
        .fetch_add(
            u64::try_from(t_sync.elapsed().as_nanos()).unwrap_or(u64::MAX),
            Ordering::Relaxed,
        );
    Ok(())
}
/// Pull pipeline items off `rx`, restore sequence order through the
/// reorder buffer, and hand each in-order chunk to the worker pool
/// round-robin. Returns the first upstream or dispatch error.
///
/// A non-empty reorder buffer after the channel closes means an earlier
/// sequence number never arrived (an upstream framer died) and is
/// surfaced as an error rather than silently truncating the file.
fn dispatch_loop(
    rx: &Receiver<PipelineItem>,
    worker_txs: &[SyncSender<WriteOp>],
    err_slot: &ErrSlot,
    header_len: u64,
) -> io::Result<()> {
    let mut reorder: ReorderBuffer<io::Result<OutputChunk>> =
        ReorderBuffer::with_capacity(WRITE_AHEAD);
    let mut write_offset: u64 = header_len;
    let mut rr_worker: usize = 0;
    // Iterating a &Receiver blocks on recv and ends when all senders drop.
    for item in rx {
        reorder.push(item.seq, item.data);
        WRITER_METRICS.record_reorder_high_water(reorder.pending_len());
        // Drain every chunk now contiguous with the write head.
        while let Some(ready) = reorder.pop_ready() {
            dispatch_chunk(ready?, worker_txs, &mut write_offset, &mut rr_worker, err_slot)?;
        }
    }
    match reorder.pending_len() {
        0 => Ok(()),
        stuck => Err(io::Error::other(format!(
            "parallel writer: channel closed with {stuck} item(s) still in reorder buffer; \
             an upstream framer dropped without sending an earlier seq",
        ))),
    }
}
/// Route one in-order output chunk to the worker pool, advancing the file
/// offset and the bytes-written metric as a side effect.
///
/// `Framed`/`Raw`/`RawChunks` all become positioned `Write` ops via the
/// shared `dispatch_write_bytes` helper (previously this bookkeeping was
/// triplicated inline); `CopyRange` (direct-io builds) is forwarded for
/// an in-kernel copy.
fn dispatch_chunk(
    chunk: OutputChunk,
    worker_txs: &[SyncSender<WriteOp>],
    current_offset: &mut u64,
    next_worker: &mut usize,
    err_slot: &ErrSlot,
) -> io::Result<()> {
    match chunk {
        OutputChunk::Framed(parts) => dispatch_write_bytes(
            parts.into_vec(),
            worker_txs,
            current_offset,
            next_worker,
            err_slot,
        ),
        OutputChunk::Raw(bytes) => dispatch_write_bytes(
            bytes,
            worker_txs,
            current_offset,
            next_worker,
            err_slot,
        ),
        OutputChunk::RawChunks(chunks) => {
            for c in chunks {
                dispatch_write_bytes(c, worker_txs, current_offset, next_worker, err_slot)?;
            }
            Ok(())
        }
        #[cfg(feature = "linux-direct-io")]
        OutputChunk::CopyRange { in_fd, offset: src_offset, len } => {
            send_to_worker(
                worker_txs,
                next_worker,
                WriteOp::CopyRange {
                    out_offset: *current_offset,
                    in_fd,
                    src_offset,
                    len,
                },
                err_slot,
            )?;
            *current_offset += len;
            WRITER_METRICS.bytes_written.fetch_add(len, Ordering::Relaxed);
            Ok(())
        }
    }
}

/// Shared bookkeeping for every byte-buffer write: send the positioned
/// `Write` op round-robin, then bump the offset and metrics by its length.
fn dispatch_write_bytes(
    bytes: Vec<u8>,
    worker_txs: &[SyncSender<WriteOp>],
    current_offset: &mut u64,
    next_worker: &mut usize,
    err_slot: &ErrSlot,
) -> io::Result<()> {
    // Length must be captured before `bytes` is moved into the op.
    let len = bytes.len() as u64;
    send_to_worker(
        worker_txs,
        next_worker,
        WriteOp::Write { offset: *current_offset, bytes },
        err_slot,
    )?;
    *current_offset += len;
    WRITER_METRICS.bytes_written.fetch_add(len, Ordering::Relaxed);
    Ok(())
}
/// Send `op` to the current round-robin worker and advance the cursor.
///
/// A send only fails when the worker has exited (it drops its receiver
/// after recording an I/O error), so on failure we prefer the worker's
/// recorded error over a generic channel-closed message. The round-robin
/// cursor is advanced only on a successful send.
fn send_to_worker(
    worker_txs: &[SyncSender<WriteOp>],
    next_worker: &mut usize,
    op: WriteOp,
    err_slot: &ErrSlot,
) -> io::Result<()> {
    match worker_txs[*next_worker].send(op) {
        Ok(()) => {
            *next_worker = (*next_worker + 1) % worker_txs.len();
            Ok(())
        }
        Err(_) => {
            let recorded = err_slot
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner)
                .take();
            Err(recorded
                .unwrap_or_else(|| io::Error::other("parallel writer worker channel closed")))
        }
    }
}
/// Worker body: apply each queued op with positioned I/O against the
/// shared file. On the first failure, record the error in the shared
/// slot (first error wins) and exit so the dispatcher observes the
/// closed channel. Returns normally when all senders are dropped.
#[allow(clippy::needless_pass_by_value)]
fn worker_loop(rx: Receiver<WriteOp>, file: &File, err_slot: &ErrSlot) {
    // Iterating the receiver ends cleanly once every sender is dropped.
    for op in rx {
        #[cfg(feature = "test-hooks")]
        {
            let n = test_hooks::POOL_OP_COUNT.fetch_add(1, Ordering::Relaxed) + 1;
            if n == test_hooks::PANIC_AT_POOL_OP_COUNT.load(Ordering::Relaxed) {
                panic!("test-hooks: parallel_writer pool op #{n} panicking");
            }
        }
        let started = std::time::Instant::now();
        let outcome = match op {
            WriteOp::Write { offset, bytes } => file.write_all_at(&bytes, offset),
            #[cfg(feature = "linux-direct-io")]
            WriteOp::CopyRange { out_offset, in_fd, src_offset, len } => {
                copy_range_to_fd(file, in_fd, src_offset, out_offset, len)
            }
        };
        WRITER_METRICS.write_ns.fetch_add(
            u64::try_from(started.elapsed().as_nanos()).unwrap_or(u64::MAX),
            Ordering::Relaxed,
        );
        if let Err(e) = outcome {
            let mut slot = err_slot
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            // Keep only the first error; later ones are dropped.
            if slot.is_none() {
                *slot = Some(e);
            }
            return;
        }
    }
}
/// Copy `remaining` bytes from `in_fd` at `src_offset` into `out` at
/// `out_offset` using `copy_file_range(2)`, looping until complete.
///
/// # Errors
/// - Falls back to a userspace pread/pwrite copy when the kernel reports
///   the operation unsupported: `EXDEV` (cross-filesystem), `ENOSYS`
///   (pre-4.5 kernel), or `EOPNOTSUPP` (filesystem without support).
/// - Retries on `EINTR`, mirroring the EINTR handling in the fallback
///   path (previously an interrupting signal aborted the whole copy).
/// - A zero-byte advance before completion is reported as `UnexpectedEof`.
#[cfg(feature = "linux-direct-io")]
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap, clippy::cast_sign_loss)]
fn copy_range_to_fd(
    out: &File,
    in_fd: RawFd,
    mut src_offset: u64,
    mut out_offset: u64,
    mut remaining: u64,
) -> io::Result<()> {
    use std::os::unix::io::AsRawFd;
    let out_fd = out.as_raw_fd();
    while remaining > 0 {
        let mut src_off_i64 = src_offset as i64;
        let mut out_off_i64 = out_offset as i64;
        // Saturate rather than truncate; the kernel clamps per call anyway.
        let chunk_len = usize::try_from(remaining).unwrap_or(usize::MAX);
        // SAFETY: both fds are valid for the duration of the call and the
        // offset pointers reference live stack variables.
        let ret = unsafe {
            libc::copy_file_range(
                in_fd,
                &mut src_off_i64,
                out_fd,
                &mut out_off_i64,
                chunk_len,
                0,
            )
        };
        if ret < 0 {
            let err = io::Error::last_os_error();
            match err.raw_os_error() {
                // Interrupted by a signal: retry this chunk.
                Some(libc::EINTR) => continue,
                // Unsupported here: do the same copy in userspace.
                Some(libc::EXDEV) | Some(libc::ENOSYS) | Some(libc::EOPNOTSUPP) => {
                    return copy_range_fallback_pwrite(
                        out,
                        in_fd,
                        src_offset,
                        out_offset,
                        remaining,
                    );
                }
                _ => return Err(err),
            }
        }
        if ret == 0 {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "copy_file_range returned 0 before completing",
            ));
        }
        let advanced = u64::try_from(ret).map_err(|_| {
            io::Error::other("copy_file_range returned negative advance")
        })?;
        src_offset += advanced;
        out_offset += advanced;
        remaining -= advanced;
    }
    Ok(())
}
/// Userspace fallback for `copy_range_to_fd`: stream `remaining` bytes
/// from `in_fd` to `out` through a 256 KiB buffer with `pread(2)` and
/// positioned writes. Used when `copy_file_range` is unavailable for the
/// fd pair (e.g. cross-device).
///
/// # Errors
/// Surfaces the first non-`EINTR` pread error or write error; a pread of
/// 0 bytes before `remaining` is exhausted is reported as `UnexpectedEof`.
#[cfg(feature = "linux-direct-io")]
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap, clippy::cast_sign_loss)]
fn copy_range_fallback_pwrite(
    out: &File,
    in_fd: RawFd,
    mut src_offset: u64,
    mut out_offset: u64,
    mut remaining: u64,
) -> io::Result<()> {
    let mut buf = vec![0u8; 256 * 1024];
    while remaining > 0 {
        // Saturate instead of `as`-casting: on 32-bit targets a count
        // above usize::MAX would truncate (possibly to 0, making the
        // zero-length pread below look like a spurious EOF).
        let chunk = buf.len().min(usize::try_from(remaining).unwrap_or(usize::MAX));
        let n = loop {
            // SAFETY: buf holds at least `chunk` writable bytes and in_fd
            // remains open for the duration of the call.
            let ret = unsafe {
                libc::pread(in_fd, buf.as_mut_ptr().cast(), chunk, src_offset as i64)
            };
            if ret < 0 {
                let err = io::Error::last_os_error();
                if err.raw_os_error() == Some(libc::EINTR) {
                    continue;
                }
                return Err(err);
            }
            break ret;
        };
        if n == 0 {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "pread returned 0 during cross-device copy",
            ));
        }
        let n_u = usize::try_from(n).map_err(|_| {
            io::Error::other("pread returned negative")
        })?;
        out.write_all_at(&buf[..n_u], out_offset)?;
        src_offset += n_u as u64;
        out_offset += n_u as u64;
        remaining -= n_u as u64;
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Read;
    use tempfile::TempDir;

    /// End-to-end happy path: 16 in-order raw chunks land after the
    /// header, in sequence order, with the exact expected bytes.
    #[test]
    fn parallel_writer_basic() -> io::Result<()> {
        let dir = TempDir::new().map_err(io::Error::other)?;
        let out = dir.path().join("out.bin");
        let (tx, rx) = sync_channel::<PipelineItem>(WRITE_AHEAD);
        let (init_tx, init_rx) = sync_channel::<io::Result<()>>(1);
        let header = b"HDR".to_vec();
        let path = out.clone();
        let handle = std::thread::spawn(move || parallel_writer_thread(rx, path, header, init_tx));
        // Wait for the init handshake before streaming chunks.
        init_rx
            .recv()
            .map_err(|_| io::Error::other("init channel closed"))??;
        for i in 0..16u32 {
            let payload = format!("item-{i:02}").into_bytes();
            tx.send(PipelineItem {
                seq: i as usize,
                data: Ok(OutputChunk::Raw(payload)),
            })
            .map_err(|_| io::Error::other("send"))?;
        }
        // Closing the channel signals end-of-stream to the writer thread.
        drop(tx);
        handle
            .join()
            .map_err(|_| io::Error::other("writer thread panicked"))??;
        let mut actual = Vec::new();
        std::fs::File::open(&out)?.read_to_end(&mut actual)?;
        let mut expected: Vec<u8> = b"HDR".to_vec();
        for i in 0..16u32 {
            expected.extend_from_slice(&format!("item-{i:02}").into_bytes());
        }
        assert_eq!(actual, expected);
        Ok(())
    }

    /// A gap in the sequence (seq=2 sent, seq=0/1 never arrive) must turn
    /// into an error at channel close rather than a silent short file.
    #[test]
    fn dispatch_loop_surfaces_gap_at_channel_close() {
        use std::sync::{Arc, Mutex};
        let (tx, rx) = sync_channel::<PipelineItem>(4);
        tx.send(PipelineItem {
            seq: 2,
            data: Ok(OutputChunk::Raw(vec![0u8; 8])),
        })
        .expect("send seq=2");
        drop(tx);
        // No workers needed: the out-of-order item never becomes ready.
        let worker_txs: Vec<SyncSender<WriteOp>> = Vec::new();
        let err_slot: ErrSlot = Arc::new(Mutex::new(None));
        let result = dispatch_loop(&rx, &worker_txs, &err_slot, 0);
        let err = result.expect_err("gap at channel close must surface as Err");
        let msg = err.to_string();
        assert!(
            msg.contains("reorder buffer"),
            "error should name the reorder-buffer state, got: {msg}",
        );
    }
}