use std::fs::{File, OpenOptions};
use std::io::{self, Seek, SeekFrom, Write};
use std::path::Path;
use super::writeback::WritebackPipeline;
/// Chunk size in bytes (32 MiB) handed to `WritebackPipeline::new` whenever a
/// `WritebackFile` is constructed.
const WRITEBACK_CHUNK_BYTES: u64 = 32 << 20;
/// A `File` wrapper that tracks the current write position and reports it to
/// a `WritebackPipeline`.
///
/// Writes advance the tracked position and notify the pipeline of progress;
/// seeks that land on a different position notify the pipeline of the jump.
/// The pipeline is finalized by `sync_all` and again when the value is dropped.
pub(crate) struct WritebackFile {
// Underlying file; all writes, seeks and syncs are delegated to it.
file: File,
// Pipeline that is told about write progress and seeks on `file`.
pipeline: WritebackPipeline,
// Tracked position in `file`: read from the stream position at construction,
// advanced by each write and replaced by the result of each seek.
pos: u64,
}
impl WritebackFile {
    /// Wraps an already-open `file`, starting position tracking at the file's
    /// current stream position.
    ///
    /// # Errors
    ///
    /// Returns the error produced while querying the stream position.
    pub(crate) fn new(mut file: File) -> io::Result<Self> {
        let pos = file.stream_position()?;
        let pipeline = WritebackPipeline::new(&file, pos, WRITEBACK_CHUNK_BYTES);
        Ok(Self {
            file,
            pipeline,
            pos,
        })
    }

    /// Creates (or truncates) the file at `path` and wraps it.
    ///
    /// # Errors
    ///
    /// Returns any error from `File::create` or from [`WritebackFile::new`].
    // NOTE(review): `dead_code` is presumably allowed because this constructor
    // is not used in every build configuration — confirm and document why.
    #[allow(dead_code)]
    pub(crate) fn create(path: &Path) -> io::Result<Self> {
        let file = File::create(path)?;
        Self::new(file)
    }

    /// Creates (or truncates) the file at `path`, and on Linux asks the kernel
    /// to preallocate `size_bytes` of space for it before wrapping it.
    ///
    /// Preallocation uses `fallocate` with `FALLOC_FL_KEEP_SIZE` and is strictly
    /// best-effort: the outcome is only logged and never affects the result.
    /// A hint of zero, or one that does not fit in `off_t`, cannot be expressed
    /// as a valid `fallocate` length, so the call is skipped in those cases.
    /// On other targets the hint is only logged.
    ///
    /// # Errors
    ///
    /// Returns any error from `File::create` or from [`WritebackFile::new`].
    pub(crate) fn create_with_size_hint(path: &Path, size_bytes: u64) -> io::Result<Self> {
        let file = File::create(path)?;
        #[cfg(target_os = "linux")]
        {
            use std::os::unix::io::AsRawFd;
            // `fallocate` rejects a length <= 0, and an unchecked cast would
            // wrap large hints to a negative length, so validate the range first.
            if size_bytes == 0 || size_bytes > libc::off_t::MAX as u64 {
                tracing::debug!(
                    target: "mux",
                    "WritebackFile fallocate size_hint={size_bytes} skipped (hint is zero or exceeds off_t)"
                );
            } else {
                // Lossless: the range check above guarantees the value fits.
                let len = size_bytes as libc::off_t;
                // SAFETY: `file` is open and owned by this function for the whole
                // call, so the descriptor is valid; `fallocate` takes only integer
                // arguments and touches no Rust-managed memory.
                let rc = unsafe {
                    libc::fallocate(file.as_raw_fd(), libc::FALLOC_FL_KEEP_SIZE, 0, len)
                };
                // Capture errno immediately, before logging can overwrite it.
                let failure = (rc != 0).then(io::Error::last_os_error);
                tracing::debug!(
                    target: "mux",
                    "WritebackFile fallocate size_hint={size_bytes} rc={rc} ok={}",
                    rc == 0
                );
                if let Some(err) = failure {
                    tracing::debug!(
                        target: "mux",
                        "WritebackFile fallocate failed (best-effort, continuing): {err}"
                    );
                }
            }
        }
        #[cfg(not(target_os = "linux"))]
        {
            tracing::debug!(
                target: "mux",
                "WritebackFile fallocate size_hint={size_bytes} skipped (non-linux)"
            );
        }
        Self::new(file)
    }

    /// Opens the existing file at `path` for writing and wraps it.
    ///
    /// # Errors
    ///
    /// Returns any error from opening the file or from [`WritebackFile::new`].
    pub(crate) fn open(path: &Path) -> io::Result<Self> {
        let file = OpenOptions::new().write(true).open(path)?;
        Self::new(file)
    }

    /// Finalizes the writeback pipeline, then syncs the file to storage.
    ///
    /// On Unix the `fsync` is issued through `bounded_syscall` with a 60-second
    /// limit. A timeout is logged and reported as success; the `Halted` and
    /// `WorkerLost` outcomes are reported as success without logging. On other
    /// platforms this delegates to `File::sync_all`.
    ///
    /// # Errors
    ///
    /// On Unix, returns the OS error when `fsync` itself fails. Elsewhere,
    /// returns whatever `File::sync_all` returns.
    pub(crate) fn sync_all(&mut self) -> io::Result<()> {
        self.pipeline.finalize();
        #[cfg(unix)]
        {
            use std::os::unix::io::AsRawFd;
            use std::time::Duration;
            let fd = self.file.as_raw_fd();
            match crate::io::bounded::bounded_syscall(
                None,
                Duration::from_secs(60),
                move || -> io::Result<()> {
                    // SAFETY: `fsync` takes only an integer descriptor and touches
                    // no Rust-managed memory; `fd` was read from `self.file` above.
                    // NOTE(review): the closure holds the raw fd by value — confirm
                    // it cannot outlive `self.file` (e.g. after a timeout) and act
                    // on a descriptor that has since been closed and reused.
                    let rc = unsafe { libc::fsync(fd) };
                    if rc == 0 {
                        Ok(())
                    } else {
                        Err(io::Error::last_os_error())
                    }
                },
            ) {
                // The syscall completed: surface its own success or failure.
                Ok(inner) => inner,
                Err(crate::io::bounded::BoundedError::Timeout) => {
                    tracing::error!(
                        target: "mux",
                        "WritebackFile::sync_all fsync timed out after 60s; kernel will flush on close (best-effort)"
                    );
                    Ok(())
                }
                // Deliberately treated as success, like the timeout case.
                // NOTE(review): these outcomes are not logged — confirm intended.
                Err(crate::io::bounded::BoundedError::Halted)
                | Err(crate::io::bounded::BoundedError::WorkerLost) => Ok(()),
            }
        }
        #[cfg(not(unix))]
        {
            self.file.sync_all()
        }
    }
}
impl Write for WritebackFile {
    /// Writes `buf` to the underlying file, advances the tracked position by
    /// the number of bytes accepted, and reports the new position to the
    /// pipeline.
    ///
    /// # Errors
    ///
    /// Returns the error from the underlying write; the tracked position is
    /// left unchanged in that case.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = self.file.write(buf)?;
        self.pos += n as u64;
        self.pipeline.note_progress(self.pos);
        Ok(n)
    }

    /// Writes all of `buf`, keeping the tracked position in step with every
    /// chunk the file accepts, then reports the final position to the pipeline.
    ///
    /// The position is advanced per accepted chunk rather than once at the
    /// end, so that a failure part-way through does not leave the tracked
    /// position behind the real file cursor.
    ///
    /// # Errors
    ///
    /// Returns the first non-`Interrupted` error from the underlying write, or
    /// a `WriteZero` error if the file accepts zero bytes while data remains.
    /// Bytes accepted before the failure are still counted and reported.
    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
        let mut remaining = buf;
        let mut outcome = Ok(());
        while !remaining.is_empty() {
            match self.file.write(remaining) {
                Ok(0) => {
                    outcome = Err(io::Error::new(
                        io::ErrorKind::WriteZero,
                        "failed to write whole buffer",
                    ));
                    break;
                }
                Ok(n) => {
                    self.pos += n as u64;
                    remaining = &remaining[n..];
                }
                // Interrupted writes are retried, matching `Write::write_all`.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
                Err(e) => {
                    outcome = Err(e);
                    break;
                }
            }
        }
        self.pipeline.note_progress(self.pos);
        outcome
    }

    /// Flushes the underlying file.
    fn flush(&mut self) -> io::Result<()> {
        self.file.flush()
    }
}
impl Seek for WritebackFile {
    /// Seeks the underlying file and returns the resulting position.
    ///
    /// When the resulting position differs from the tracked one, the jump is
    /// logged, the pipeline is told about the new position, and the tracked
    /// position is updated. A seek that lands on the tracked position leaves
    /// the pipeline untouched.
    ///
    /// # Errors
    ///
    /// Returns the error from the underlying seek; no state changes then.
    fn seek(&mut self, from: SeekFrom) -> io::Result<u64> {
        let landed = self.file.seek(from)?;
        // Nothing moved relative to what is tracked: no pipeline notification.
        if landed == self.pos {
            return Ok(landed);
        }

        let (from_pos, to_pos) = (self.pos, landed);
        // Signed distance for the log line only; wrapping keeps it panic-free.
        let delta: i64 = (to_pos as i64).wrapping_sub(from_pos as i64);
        tracing::debug!(
            target: "mux",
            "WritebackFile seek from={from_pos} to={to_pos} delta={delta}"
        );

        self.pipeline.handle_seek(landed);
        self.pos = landed;
        Ok(landed)
    }
}
impl Drop for WritebackFile {
    /// Finalizes the writeback pipeline when the wrapper goes out of scope.
    ///
    /// `sync_all` finalizes the pipeline as well, so a value that was synced
    /// before being dropped has `finalize` called on it a second time here.
    fn drop(&mut self) {
        let Self { pipeline, .. } = self;
        pipeline.finalize();
    }
}