use std::collections::VecDeque;
use std::fs::File;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
/// Number of most recent WAIT_AFTER latency samples kept for adaptive chunk sizing.
const ADAPTIVE_WINDOW: usize = 16;
/// Floor applied when the adaptive logic halves `chunk_bytes` (4 MiB).
const CHUNK_BYTES_MIN: u64 = 4 << 20;
/// Cap applied when the adaptive logic doubles `chunk_bytes` (256 MiB).
const CHUNK_BYTES_MAX: u64 = 256 << 20;
/// p95 WAIT_AFTER latency, in milliseconds, above which `chunk_bytes` is doubled.
const ADAPTIVE_GROW_MS: u64 = 200;
/// p95 WAIT_AFTER latency, in milliseconds, below which `chunk_bytes` is halved.
const ADAPTIVE_SHRINK_MS: u64 = 20;
/// A debug-level size summary is emitted once every this many chunks.
const SIZE_LOG_INTERVAL: u64 = 32;
/// Longest a single WAIT_AFTER may block before the pipeline is marked degraded.
const WAIT_AFTER_TIMEOUT: Duration = Duration::from_secs(30);
/// Drives incremental kernel writeback for a file whose write position is
/// reported through `note_progress`.
///
/// Each completed chunk is queued with `sync_file_range(SYNC_FILE_RANGE_WRITE)`.
/// The previously queued chunk is then waited on (`SYNC_FILE_RANGE_WAIT_AFTER`)
/// and its pages are dropped with `POSIX_FADV_DONTNEED`. The wait step is
/// skipped on NFS, and also once a wait has timed out.
pub(crate) struct WritebackPipeline {
    // --- file identity and mode ---
    /// Raw descriptor of the file; borrowed from the `File` passed to `new`, not owned.
    fd: RawFd,
    /// Result of the `fstatfs` NFS check performed at construction.
    is_nfs: bool,
    /// Set after a WAIT_AFTER timeout; from then on waiting and DONTNEED are skipped.
    degraded: Arc<AtomicBool>,

    // --- chunking state ---
    /// Current flush granularity in bytes (adapted by `record_wait`).
    chunk_bytes: u64,
    /// Number of chunks handed to writeback so far.
    chunk_count: u64,
    /// Offset up to which data has already been queued for writeback.
    last_flush_pos: u64,
    /// `(offset, length)` of the chunk queued last and not yet waited on.
    pending: Option<(u64, u64)>,
    /// Sliding window of recent WAIT_AFTER latencies in milliseconds.
    wait_after_window: VecDeque<u64>,
}
impl WritebackPipeline {
    /// Creates a pipeline that drives writeback for `file`.
    ///
    /// - `start_pos` is the offset where the first chunk begins.
    /// - `chunk_bytes` is the initial flush granularity, which `record_wait` later adapts.
    ///
    /// Only the raw descriptor of `file` is stored. The caller must keep `file`
    /// open for as long as the pipeline is used.
    pub(crate) fn new(file: &File, start_pos: u64, chunk_bytes: u64) -> Self {
        let fd = file.as_raw_fd();
        let is_nfs = detect_nfs(fd);
        tracing::info!(
            target: "mux",
            "WritebackPipeline fd={fd} is_nfs={is_nfs} chunk_bytes={chunk_bytes} strategy={}",
            if is_nfs { "nfs-skip-wait" } else { "wait+dontneed" }
        );
        Self {
            fd,
            chunk_bytes,
            last_flush_pos: start_pos,
            pending: None,
            wait_after_window: VecDeque::with_capacity(ADAPTIVE_WINDOW),
            chunk_count: 0,
            is_nfs,
            degraded: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Whether the WAIT_AFTER + DONTNEED step should be skipped.
    ///
    /// It is always skipped on NFS. It is also skipped permanently once a
    /// WAIT_AFTER has timed out, which sets `degraded`.
    #[inline]
    fn skip_wait(&self) -> bool {
        self.is_nfs || self.degraded.load(Ordering::Relaxed)
    }

    /// Reports that the file has been written up to offset `pos`.
    ///
    /// Once at least `chunk_bytes` have accumulated past the last flush point:
    /// 1. `[last_flush_pos, pos)` is queued with `SYNC_FILE_RANGE_WRITE`.
    /// 2. The previously queued chunk is waited on with
    ///    `SYNC_FILE_RANGE_WAIT_AFTER` (bounded by `WAIT_AFTER_TIMEOUT`) and
    ///    dropped from the page cache.
    ///
    /// As a result, one chunk stays in flight while the next one is written.
    pub(crate) fn note_progress(&mut self, pos: u64) {
        // Not enough new data yet. The first check also guarantees a non-empty
        // range, which matters when `chunk_bytes` is 0: a length of 0 means
        // "through end of file" to both sync_file_range and posix_fadvise, which
        // would flush/evict far more than this chunk.
        if pos <= self.last_flush_pos
            || pos < self.last_flush_pos.saturating_add(self.chunk_bytes)
        {
            return;
        }
        let chunk_off = self.last_flush_pos;
        let chunk_len = pos - chunk_off;
        let mut wait_ms: u64 = 0;
        let mut fadvise_ms: u64 = 0;

        // Start asynchronous writeback of the new chunk. This is a best-effort
        // hint, so the return value is intentionally not checked.
        // SAFETY: sync_file_range takes only integer arguments; no memory is shared.
        unsafe {
            libc::sync_file_range(
                self.fd,
                chunk_off as i64,
                chunk_len as i64,
                libc::SYNC_FILE_RANGE_WRITE,
            );
        }

        if let Some((prev_off, prev_len)) = self.pending.take() {
            if !self.skip_wait() {
                match wait_after_with_timeout(self.fd, prev_off, prev_len) {
                    Some(ms) => {
                        wait_ms = ms;
                        fadvise_ms = self.drop_cached(prev_off, prev_len);
                        self.record_wait(wait_ms);
                    }
                    None => {
                        self.degraded.store(true, Ordering::Relaxed);
                        tracing::error!(
                            target: "mux",
                            "WritebackPipeline WAIT_AFTER timed out after {}s on chunk off={} len={}, marking writeback degraded (subsequent chunks will skip WAIT_AFTER + DONTNEED)",
                            WAIT_AFTER_TIMEOUT.as_secs(),
                            prev_off,
                            prev_len
                        );
                    }
                }
            }
        }

        self.pending = Some((chunk_off, chunk_len));
        self.last_flush_pos = pos;
        self.chunk_count += 1;
        tracing::trace!(
            target: "mux",
            "WritebackPipeline chunk off={} len={} sync_file_range_ms={wait_ms} fadvise_ms={fadvise_ms} chunk_bytes={} skip_wait={}",
            chunk_off,
            chunk_len,
            self.chunk_bytes,
            self.skip_wait(),
        );
        if self.chunk_count % SIZE_LOG_INTERVAL == 0 {
            tracing::debug!(
                target: "mux",
                "WritebackPipeline chunk_bytes={} after {} chunks is_nfs={} degraded={}",
                self.chunk_bytes,
                self.chunk_count,
                self.is_nfs,
                self.degraded.load(Ordering::Relaxed),
            );
        }
    }

    /// Advises the kernel to drop cached pages for `[off, off + len)`.
    ///
    /// Returns how long the advice call took, in milliseconds.
    fn drop_cached(&self, off: u64, len: u64) -> u64 {
        let started = Instant::now();
        // Best-effort hint; the return value is intentionally not checked.
        // SAFETY: posix_fadvise takes only integer arguments; no memory is shared.
        unsafe {
            libc::posix_fadvise(self.fd, off as i64, len as i64, libc::POSIX_FADV_DONTNEED);
        }
        started.elapsed().as_millis() as u64
    }

    /// Records one WAIT_AFTER latency sample and adapts `chunk_bytes`.
    ///
    /// The window keeps the last `ADAPTIVE_WINDOW` samples. Once it is full:
    /// - if the p95 exceeds `ADAPTIVE_GROW_MS`, `chunk_bytes` is doubled
    ///   (capped at `CHUNK_BYTES_MAX`);
    /// - if the p95 is below `ADAPTIVE_SHRINK_MS`, it is halved
    ///   (floored at `CHUNK_BYTES_MIN`).
    fn record_wait(&mut self, wait_ms: u64) {
        if self.wait_after_window.len() == ADAPTIVE_WINDOW {
            self.wait_after_window.pop_front();
        }
        self.wait_after_window.push_back(wait_ms);
        if self.wait_after_window.len() < ADAPTIVE_WINDOW {
            return;
        }
        let mut sorted: Vec<u64> = self.wait_after_window.iter().copied().collect();
        sorted.sort_unstable();
        // Derive the percentile index from the window length instead of
        // hard-coding it, so changing ADAPTIVE_WINDOW cannot index out of range.
        // For a window of 16 this is index 14, the same as before.
        let p95 = sorted[(sorted.len() - 1) * 95 / 100];
        let old = self.chunk_bytes;
        let new = if p95 > ADAPTIVE_GROW_MS && self.chunk_bytes < CHUNK_BYTES_MAX {
            (self.chunk_bytes * 2).min(CHUNK_BYTES_MAX)
        } else if p95 < ADAPTIVE_SHRINK_MS && self.chunk_bytes > CHUNK_BYTES_MIN {
            (self.chunk_bytes / 2).max(CHUNK_BYTES_MIN)
        } else {
            self.chunk_bytes
        };
        if new != old {
            self.chunk_bytes = new;
            tracing::info!(
                target: "mux",
                "WritebackPipeline adaptive chunk_bytes {} -> {} p95_ms={p95}",
                old,
                new
            );
        }
    }

    /// Handles a seek of the write position to `new_pos`.
    ///
    /// The pending chunk is finalized and chunk tracking restarts at `new_pos`.
    /// Bytes written after the last flush point but before the seek are not
    /// queued for writeback here.
    pub(crate) fn handle_seek(&mut self, new_pos: u64) {
        self.finalize();
        self.last_flush_pos = new_pos;
    }

    /// Waits for the pending chunk, if any, and drops it from the page cache.
    ///
    /// The wait is skipped on NFS or when writeback is degraded. A timeout marks
    /// writeback as degraded. Bytes written since the last flush point are not
    /// queued here.
    pub(crate) fn finalize(&mut self) {
        if let Some((prev_off, prev_len)) = self.pending.take() {
            tracing::debug!(
                target: "mux",
                "WritebackPipeline finalize chunk off={prev_off} len={prev_len} skip_wait={} is_nfs={} degraded={}",
                self.skip_wait(),
                self.is_nfs,
                self.degraded.load(Ordering::Relaxed),
            );
            if self.skip_wait() {
                return;
            }
            match wait_after_with_timeout(self.fd, prev_off, prev_len) {
                Some(_ms) => {
                    self.drop_cached(prev_off, prev_len);
                }
                None => {
                    self.degraded.store(true, Ordering::Relaxed);
                    tracing::error!(
                        target: "mux",
                        "WritebackPipeline finalize WAIT_AFTER timed out after {}s on chunk off={prev_off} len={prev_len}, marking writeback degraded",
                        WAIT_AFTER_TIMEOUT.as_secs(),
                    );
                }
            }
        }
    }
}
/// Reports whether `fd` refers to a file on an NFS mount.
///
/// The check compares the `fstatfs` filesystem type with `NFS_SUPER_MAGIC`. If
/// `fstatfs` fails, a warning is logged and `false` is returned.
fn detect_nfs(fd: RawFd) -> bool {
    // SAFETY: `statfs` is a plain C struct for which all-zero bytes is a valid value.
    let mut fs_info: libc::statfs = unsafe { std::mem::zeroed() };
    // SAFETY: `fs_info` is a valid, writable `statfs` that outlives the call.
    if unsafe { libc::fstatfs(fd, &mut fs_info) } == 0 {
        // The types of `f_type` and `NFS_SUPER_MAGIC` vary by target, so both
        // are widened to i64 before comparing. On targets where they already
        // are i64 the casts are no-ops, hence the clippy allowance.
        #[allow(clippy::unnecessary_cast)]
        let on_nfs = fs_info.f_type as i64 == libc::NFS_SUPER_MAGIC as i64;
        return on_nfs;
    }
    let errno = std::io::Error::last_os_error();
    tracing::warn!(
        target: "mux",
        "WritebackPipeline fstatfs(fd={fd}) failed: {errno} — defaulting is_nfs=false",
    );
    false
}
/// Waits for writeback of `[off, off + len)` to finish, giving up after
/// `WAIT_AFTER_TIMEOUT`.
///
/// The `SYNC_FILE_RANGE_WAIT_AFTER` call runs through
/// `crate::io::bounded::bounded_syscall`. Results:
/// - `Some(ms)` — the call finished; `ms` is the elapsed wall time.
/// - `None` — the bounded call timed out or was halted.
/// - `Some(0)` — the worker was lost.
///
/// NOTE(review): on worker loss it is unknown whether the wait completed.
/// Reporting `Some(0)` feeds a 0 ms sample into the adaptive window; confirm
/// this is intended.
fn wait_after_with_timeout(fd: RawFd, off: u64, len: u64) -> Option<u64> {
    use crate::io::bounded::{bounded_syscall, BoundedError};

    let begin = Instant::now();
    let outcome = bounded_syscall(None, WAIT_AFTER_TIMEOUT, move || {
        // SAFETY: sync_file_range takes only integer arguments; no memory is shared.
        unsafe {
            libc::sync_file_range(fd, off as i64, len as i64, libc::SYNC_FILE_RANGE_WAIT_AFTER);
        }
    });
    match outcome {
        Ok(()) => Some(begin.elapsed().as_millis() as u64),
        Err(BoundedError::WorkerLost) => Some(0),
        Err(BoundedError::Timeout | BoundedError::Halted) => None,
    }
}