use std::collections::VecDeque;
use std::fs::File;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
const ADAPTIVE_WINDOW: usize = 16;
const CHUNK_BYTES_MIN: u64 = 4 * 1024 * 1024;
const CHUNK_BYTES_MAX: u64 = 256 * 1024 * 1024;
const ADAPTIVE_GROW_MS: u64 = 200;
const ADAPTIVE_SHRINK_MS: u64 = 20;
const SIZE_LOG_INTERVAL: u64 = 32;
const WAIT_AFTER_TIMEOUT: Duration = Duration::from_secs(30);
pub(crate) struct WritebackPipeline {
fd: RawFd,
chunk_bytes: u64,
last_flush_pos: u64,
pending: Option<(u64, u64)>,
wait_after_window: VecDeque<u64>,
chunk_count: u64,
is_nfs: bool,
degraded: Arc<AtomicBool>,
}
impl WritebackPipeline {
pub(crate) fn new(file: &File, start_pos: u64, chunk_bytes: u64) -> Self {
let fd = file.as_raw_fd();
let is_nfs = detect_nfs(fd);
tracing::info!(
target: "mux",
"WritebackPipeline fd={fd} is_nfs={is_nfs} chunk_bytes={chunk_bytes} strategy={}",
if is_nfs { "nfs-skip-wait" } else { "wait+dontneed" }
);
Self {
fd,
chunk_bytes,
last_flush_pos: start_pos,
pending: None,
wait_after_window: VecDeque::with_capacity(ADAPTIVE_WINDOW),
chunk_count: 0,
is_nfs,
degraded: Arc::new(AtomicBool::new(false)),
}
}
#[inline]
fn skip_wait(&self) -> bool {
self.is_nfs || self.degraded.load(Ordering::Relaxed)
}
pub(crate) fn note_progress(&mut self, pos: u64) {
if pos < self.last_flush_pos.saturating_add(self.chunk_bytes) {
return;
}
let chunk_off = self.last_flush_pos as i64;
let chunk_len = (pos - self.last_flush_pos) as i64;
let mut wait_ms: u64 = 0;
let mut fadvise_ms: u64 = 0;
unsafe {
libc::sync_file_range(self.fd, chunk_off, chunk_len, libc::SYNC_FILE_RANGE_WRITE);
}
if let Some((prev_off, prev_len)) = self.pending.take() {
if self.skip_wait() {
} else {
match wait_after_with_timeout(self.fd, prev_off, prev_len) {
Some(ms) => {
wait_ms = ms;
let t_fadv = Instant::now();
unsafe {
libc::posix_fadvise(
self.fd,
prev_off as i64,
prev_len as i64,
libc::POSIX_FADV_DONTNEED,
);
}
fadvise_ms = t_fadv.elapsed().as_millis() as u64;
self.record_wait(wait_ms);
}
None => {
self.degraded.store(true, Ordering::Relaxed);
tracing::error!(
target: "mux",
"WritebackPipeline WAIT_AFTER timed out after {}s on chunk off={} len={}, marking writeback degraded (subsequent chunks will skip WAIT_AFTER + DONTNEED)",
WAIT_AFTER_TIMEOUT.as_secs(),
prev_off,
prev_len
);
}
}
}
}
self.pending = Some((chunk_off as u64, chunk_len as u64));
self.last_flush_pos = pos;
self.chunk_count += 1;
tracing::trace!(
target: "mux",
"WritebackPipeline chunk off={} len={} sync_file_range_ms={wait_ms} fadvise_ms={fadvise_ms} chunk_bytes={} skip_wait={}",
chunk_off,
chunk_len,
self.chunk_bytes,
self.skip_wait(),
);
if self.chunk_count % SIZE_LOG_INTERVAL == 0 {
tracing::debug!(
target: "mux",
"WritebackPipeline chunk_bytes={} after {} chunks is_nfs={} degraded={}",
self.chunk_bytes,
self.chunk_count,
self.is_nfs,
self.degraded.load(Ordering::Relaxed),
);
}
}
fn record_wait(&mut self, wait_ms: u64) {
if self.wait_after_window.len() == ADAPTIVE_WINDOW {
self.wait_after_window.pop_front();
}
self.wait_after_window.push_back(wait_ms);
if self.wait_after_window.len() < ADAPTIVE_WINDOW {
return;
}
let mut sorted: Vec<u64> = self.wait_after_window.iter().copied().collect();
sorted.sort_unstable();
let p95 = sorted[14];
let old = self.chunk_bytes;
let new = if p95 > ADAPTIVE_GROW_MS && self.chunk_bytes < CHUNK_BYTES_MAX {
(self.chunk_bytes * 2).min(CHUNK_BYTES_MAX)
} else if p95 < ADAPTIVE_SHRINK_MS && self.chunk_bytes > CHUNK_BYTES_MIN {
(self.chunk_bytes / 2).max(CHUNK_BYTES_MIN)
} else {
self.chunk_bytes
};
if new != old {
self.chunk_bytes = new;
tracing::info!(
target: "mux",
"WritebackPipeline adaptive chunk_bytes {} -> {} p95_ms={p95}",
old,
new
);
}
}
pub(crate) fn handle_seek(&mut self, new_pos: u64) {
self.finalize();
self.last_flush_pos = new_pos;
}
pub(crate) fn finalize(&mut self) {
if let Some((prev_off, prev_len)) = self.pending.take() {
tracing::debug!(
target: "mux",
"WritebackPipeline finalize chunk off={prev_off} len={prev_len} skip_wait={} is_nfs={} degraded={}",
self.skip_wait(),
self.is_nfs,
self.degraded.load(Ordering::Relaxed),
);
if self.skip_wait() {
return;
}
match wait_after_with_timeout(self.fd, prev_off, prev_len) {
Some(_ms) => unsafe {
libc::posix_fadvise(
self.fd,
prev_off as i64,
prev_len as i64,
libc::POSIX_FADV_DONTNEED,
);
},
None => {
self.degraded.store(true, Ordering::Relaxed);
tracing::error!(
target: "mux",
"WritebackPipeline finalize WAIT_AFTER timed out after {}s on chunk off={prev_off} len={prev_len}, marking writeback degraded",
WAIT_AFTER_TIMEOUT.as_secs(),
);
}
}
}
}
}
fn detect_nfs(fd: RawFd) -> bool {
matches!(
crate::platform::fs_type::detect_fd(fd),
crate::platform::fs_type::FsType::Nfs
)
}
fn wait_after_with_timeout(fd: RawFd, off: u64, len: u64) -> Option<u64> {
let started = Instant::now();
match crate::io::bounded::bounded_syscall(None, WAIT_AFTER_TIMEOUT, move || unsafe {
libc::sync_file_range(fd, off as i64, len as i64, libc::SYNC_FILE_RANGE_WAIT_AFTER);
}) {
Ok(()) => Some(started.elapsed().as_millis() as u64),
Err(crate::io::bounded::BoundedError::Timeout)
| Err(crate::io::bounded::BoundedError::Halted) => None,
Err(crate::io::bounded::BoundedError::WorkerLost) => {
Some(0)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::NamedTempFile;
fn local_pipeline(chunk_bytes: u64) -> (NamedTempFile, WritebackPipeline) {
let f = NamedTempFile::new().expect("tempfile create");
let pipeline = WritebackPipeline::new(f.as_file(), 0, chunk_bytes);
(f, pipeline)
}
#[test]
fn new_pipeline_starts_active() {
let (_f, p) = local_pipeline(32 * 1024 * 1024);
assert!(!p.is_nfs, "local tempfile must not classify as NFS");
assert!(!p.degraded.load(Ordering::Relaxed));
assert!(!p.skip_wait(), "fresh local pipeline must not skip wait");
}
#[test]
fn degraded_flag_short_circuits_wait() {
let (_f, p) = local_pipeline(32 * 1024 * 1024);
assert!(!p.skip_wait());
p.degraded.store(true, Ordering::Relaxed);
assert!(
p.skip_wait(),
"degraded flag must force the wait+dontneed bypass"
);
}
#[test]
fn record_wait_grows_chunk_on_high_p95() {
let (_f, mut p) = local_pipeline(16 * 1024 * 1024);
for _ in 0..ADAPTIVE_WINDOW {
p.record_wait(ADAPTIVE_GROW_MS + 50);
}
assert!(
p.chunk_bytes > 16 * 1024 * 1024,
"chunk should have grown; got {}",
p.chunk_bytes
);
assert!(p.chunk_bytes <= CHUNK_BYTES_MAX);
}
#[test]
fn record_wait_shrinks_chunk_on_low_p95() {
let (_f, mut p) = local_pipeline(64 * 1024 * 1024);
for _ in 0..ADAPTIVE_WINDOW {
p.record_wait(1); }
assert!(
p.chunk_bytes < 64 * 1024 * 1024,
"chunk should have shrunk; got {}",
p.chunk_bytes
);
assert!(p.chunk_bytes >= CHUNK_BYTES_MIN);
}
#[test]
fn record_wait_no_op_below_window_fill() {
let (_f, mut p) = local_pipeline(16 * 1024 * 1024);
let initial = p.chunk_bytes;
for _ in 0..(ADAPTIVE_WINDOW - 1) {
p.record_wait(ADAPTIVE_GROW_MS + 100);
}
assert_eq!(
p.chunk_bytes, initial,
"chunk must not change before window is full"
);
}
#[test]
fn record_wait_clamps_to_chunk_bounds() {
let (_f, mut p) = local_pipeline(CHUNK_BYTES_MAX);
for _ in 0..ADAPTIVE_WINDOW {
p.record_wait(ADAPTIVE_GROW_MS + 1000);
}
assert_eq!(p.chunk_bytes, CHUNK_BYTES_MAX, "must clamp to MAX");
let (_f, mut p) = local_pipeline(CHUNK_BYTES_MIN);
for _ in 0..ADAPTIVE_WINDOW {
p.record_wait(0);
}
assert_eq!(p.chunk_bytes, CHUNK_BYTES_MIN, "must clamp to MIN");
}
#[test]
fn detect_nfs_local_file_is_false() {
let f = NamedTempFile::new().expect("tempfile create");
use std::os::unix::io::AsRawFd;
assert!(!detect_nfs(f.as_file().as_raw_fd()));
}
#[test]
fn note_progress_below_chunk_is_noop() {
let (_f, mut p) = local_pipeline(32 * 1024 * 1024);
let before = p.chunk_count;
p.note_progress(1024); assert_eq!(p.chunk_count, before);
assert!(p.pending.is_none());
}
}