#[cfg(target_os = "linux")]
use std::collections::VecDeque;
use std::io;
use std::net::SocketAddr;
use std::os::fd::RawFd;
use std::path::PathBuf;
#[cfg(target_os = "linux")]
use std::sync::Mutex;
#[cfg(target_os = "linux")]
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
/// A single I/O request that can be handed to `CompletionRing::submit`.
///
/// Every variant owns its data (raw descriptors, byte vectors, paths, socket
/// addresses), and the type derives `Clone`, `Debug`, `Eq` and `PartialEq`.
/// How a backend executes each variant is not visible in this part of the
/// file; the per-variant notes below are read off the variant and field names
/// — TODO confirm against the backend implementations.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum IoOp {
/// Read request on `fd`. `buf_len` is presumably the maximum number of bytes
/// wanted and `offset` the file position to read from — confirm.
Read {
fd: RawFd,
buf_len: usize,
offset: u64,
},
/// Write request on `fd` carrying the owned bytes in `data`. `offset` is
/// presumably the file position to write at — confirm.
Write {
fd: RawFd,
data: Vec<u8>,
offset: u64,
},
/// Accept request on the listening descriptor `listener_fd`.
Accept { listener_fd: RawFd },
/// Connect request for `fd` targeting the socket address `addr`.
Connect { fd: RawFd, addr: SocketAddr },
/// Close request for `fd`.
Close { fd: RawFd },
/// Fsync request for `fd`.
Fsync { fd: RawFd },
/// Open request for `path`, with a directory descriptor, flags and mode.
/// NOTE(review): the field set looks like the arguments of `openat(2)` —
/// confirm how `dir_fd`, `flags` and `mode` are interpreted.
Openat {
dir_fd: RawFd,
path: PathBuf,
flags: i32,
mode: u32,
},
/// Metadata request for `path`, with a directory descriptor, flags and mask.
/// NOTE(review): the field set looks like the arguments of `statx(2)` —
/// confirm how `dir_fd`, `flags` and `mask` are interpreted.
Statx {
dir_fd: RawFd,
path: PathBuf,
flags: i32,
mask: u32,
},
/// Directory-listing request for `path`.
ListDir { path: PathBuf },
/// Directory-creation request for `path`.
MakeDir { path: PathBuf },
/// File-deletion request for `path`.
DelFile { path: PathBuf },
/// Directory-deletion request for `path`.
DelDir { path: PathBuf },
/// Rename request from `source` to `destination`.
Rename {
source: PathBuf,
destination: PathBuf,
},
/// Message-send request on `fd` carrying `data` and a socket address
/// (presumably the destination — confirm).
SendMsg {
fd: RawFd,
data: Vec<u8>,
addr: SocketAddr,
},
/// Message-receive request on `fd`. `buf_len` is presumably the maximum
/// number of bytes to receive — confirm.
RecvMsg { fd: RawFd, buf_len: usize },
/// Carries no data; presumably a no-op request — confirm.
Nop,
}
/// Plain record of file-metadata values, carried by `IoResult::StatResult`.
///
/// All fields are integers, so the derived `Default` yields a record with
/// every field set to zero.
/// NOTE(review): the field names resemble those of the Linux `statx`
/// structure; the exact mapping and units are not visible here — confirm
/// against the code that fills this struct.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct StatxData {
// Presumably the bitmask of fields that were actually filled in — confirm.
pub mask: u32,
// Presumably file type and permission bits — confirm.
pub mode: u32,
// Presumably the size in bytes — confirm.
pub size: u64,
// Presumably the number of allocated blocks; block unit not shown here.
pub blocks: u64,
// Device number, split into major and minor parts.
pub dev_major: u32,
pub dev_minor: u32,
pub inode: u64,
pub nlink: u64,
pub uid: u32,
pub gid: u32,
// Timestamps with seconds granularity only; the epoch is not stated here
// (presumably Unix time — confirm).
pub atime_sec: i64,
pub mtime_sec: i64,
pub ctime_sec: i64,
}
/// Success payload stored in `IoCompletion::result`.
///
/// Which `IoOp` variant yields which result variant is not visible in this
/// part of the file; the pairings suggested below are read off the names —
/// TODO confirm against the backend implementations.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum IoResult {
/// A count together with a byte buffer; presumably the number of bytes read
/// and the data read — confirm how the count relates to the buffer length.
BytesRead(usize, Vec<u8>),
/// A count; presumably the number of bytes written.
BytesWritten(usize),
/// A descriptor and a socket address; presumably the accepted connection
/// and its peer address — confirm.
Accepted(RawFd, SocketAddr),
/// No payload; presumably the outcome of `IoOp::Connect`.
Connected,
/// No payload; presumably the outcome of `IoOp::Close`.
Closed,
/// No payload; presumably the outcome of `IoOp::Fsync`.
Synced,
/// A descriptor; presumably the newly opened file from `IoOp::Openat`.
Opened(RawFd),
/// File metadata as a `StatxData` record.
StatResult(StatxData),
/// A count; presumably the number of bytes sent by `IoOp::SendMsg`.
DatagramSent(usize),
/// A received message: a byte count, the data and a socket address
/// (presumably the sender — confirm).
DatagramReceived {
bytes: usize,
data: Vec<u8>,
addr: SocketAddr,
},
/// A list of raw byte strings; presumably directory entry names from
/// `IoOp::ListDir` — confirm encoding and whether `.`/`..` are included.
DirList(Vec<Vec<u8>>),
/// No payload; presumably used for operations without a dedicated result
/// variant — confirm.
Completed,
}
/// One finished operation, as returned by `CompletionRing::poll_completions`.
///
/// Only `Debug` is derived: `io::Result` holds an `io::Error`, which is neither
/// `Clone` nor `PartialEq`.
#[derive(Debug)]
pub struct IoCompletion {
/// Identifier of the operation. The `FailedRing` implementation in this
/// file sets it to the value its `submit` returned for the same operation.
pub op_id: u64,
/// Outcome of the operation: an `IoResult` payload or the I/O error.
pub result: io::Result<IoResult>,
}
/// Interface for submitting `IoOp` requests and collecting `IoCompletion`s.
///
/// Implementors must be `Send + Sync`, and every method takes `&self`, so an
/// implementation needs interior mutability for any state it changes.
/// The precise contract of each method is not spelled out in this part of the
/// file; the notes below describe the signatures and hedge the rest.
pub trait CompletionRing: Send + Sync {
/// Submits `op` and returns a `u64` identifier for it. In the `FailedRing`
/// implementation in this file, that identifier is repeated in the matching
/// `IoCompletion::op_id`; presumably all implementations do the same — confirm.
fn submit(&self, op: IoOp) -> u64;
/// Returns the completions currently available. `timeout` is presumably the
/// maximum time to wait for at least one completion — confirm; the
/// `FailedRing` implementation in this file ignores it.
fn poll_completions(&self, timeout: Duration) -> Vec<IoCompletion>;
/// Returns a count of pending operations. NOTE(review): whether completed
/// but not yet polled operations count as pending is not defined here.
fn pending_count(&self) -> usize;
/// Shuts the ring down. The effect on operations still in flight is not
/// defined in this part of the file.
fn shutdown(&self);
}
/// Linux-only `CompletionRing` implementation that never performs I/O: every
/// submitted operation is completed with an error built from the stored
/// `error_kind` and `error_message`.
#[cfg(target_os = "linux")]
pub(crate) struct FailedRing {
// Source of operation ids; `FailedRing::new` starts it at 1 and `submit`
// increments it once per call.
next_op_id: AtomicU64,
// Kind of the `io::Error` given to `FailedRing::new`.
error_kind: io::ErrorKind,
// `to_string()` rendering of the `io::Error` given to `FailedRing::new`.
error_message: String,
// Error completions produced by `submit` and not yet handed out by
// `poll_completions`, in submission order.
completions: Mutex<VecDeque<IoCompletion>>,
}
#[cfg(target_os = "linux")]
impl FailedRing {
pub(crate) fn new(error: io::Error) -> Self {
Self {
next_op_id: AtomicU64::new(1),
error_kind: error.kind(),
error_message: error.to_string(),
completions: Mutex::new(VecDeque::new()),
}
}
}
#[cfg(target_os = "linux")]
impl CompletionRing for FailedRing {
    /// Fails `_op` immediately without inspecting it.
    ///
    /// Allocates the next operation id, queues an error completion for it
    /// (same `ErrorKind` as the stored error, message prefixed with
    /// `"completion ring backend unavailable: "`), and returns the id. The
    /// completion is delivered by the next call to `poll_completions`.
    fn submit(&self, _op: IoOp) -> u64 {
        // Relaxed is enough: the counter only has to hand out distinct ids;
        // the queue itself is synchronized by the mutex.
        let op_id = self.next_op_id.fetch_add(1, Ordering::Relaxed);
        let completion = IoCompletion {
            op_id,
            result: Err(io::Error::new(
                self.error_kind,
                format!(
                    "completion ring backend unavailable: {}",
                    self.error_message
                ),
            )),
        };
        // A poisoned lock must not swallow the completion: the caller has been
        // given `op_id` and would otherwise wait for a result that never
        // arrives. The queue holds plain values with no cross-element
        // invariant, so reusing it after another thread panicked is safe.
        self.completions
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .push_back(completion);
        op_id
    }

    /// Returns every queued completion in submission order and empties the
    /// queue. Never blocks waiting for work, so `_timeout` is ignored.
    fn poll_completions(&self, _timeout: Duration) -> Vec<IoCompletion> {
        // Recover from poisoning for the same reason as in `submit`: queued
        // completions must still reach the caller.
        let mut queue = self
            .completions
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        queue.drain(..).collect()
    }

    /// Always 0: every operation is completed (with an error) inside `submit`,
    /// so none is ever in flight. Completions that are queued but not yet
    /// polled are not counted.
    /// NOTE(review): confirm callers do not use this count to decide whether
    /// polling is still needed.
    fn pending_count(&self) -> usize {
        0
    }

    /// No-op: this ring owns no backend resources to release.
    fn shutdown(&self) {}
}