use alloc::boxed::Box;
use alloc::string::{String, ToString};
use alloc::sync::Arc;
use alloc::vec::Vec;
use alloc::{format, vec};
use core::cell::UnsafeCell;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::{AsyncRead, AsyncWrite};
use crate::linux::io_uring::{
AT_FDCWD, AT_STATX_SYNC_AS_STAT, Fd, IoUring, O_APPEND, O_CLOEXEC, O_CREAT, O_RDONLY, O_RDWR,
O_TRUNC, O_WRONLY, S_IRUSR, S_IWUSR, STATX_BASIC_STATS, Statx,
};
use crate::linux::sys::Errno;
use crate::{fs, linux};
/// A file handle whose I/O is submitted through an io_uring instance.
///
/// Values are produced by the `open` and `create` trait methods in this file.
/// The `UnsafeCell` fields are only touched by the `poll_*` methods, which all
/// receive `Pin<&mut Self>`.
pub struct File {
/// Ring every operation on this file is submitted to; `open`/`create` build
/// a fresh one (capacity 256) per file.
ring: Arc<IoUring>,
/// Descriptor returned by the ring's `openat` call.
fd: Fd,
/// Path exactly as passed to `open`/`create`; `metadata` stats this path again.
path: String,
/// Byte position used for the next read or write; advanced by the number of
/// bytes transferred when an operation completes. Shared by reads and writes.
offset: UnsafeCell<u64>,
/// Read operation currently being driven by `poll_read`; reset to `None`
/// when it completes.
current_read: UnsafeCell<Option<ReadFuture>>,
/// Write operation currently being driven by `poll_write`; reset to `None`
/// when it completes.
current_write: UnsafeCell<Option<WriteFuture>>,
/// Close operation currently being driven by `poll_close`; reset to `None`
/// when it completes.
current_close: UnsafeCell<Option<CloseFuture>>,
}
// SAFETY (as far as the code visible in this file shows): the `UnsafeCell`
// fields of `File` are only read or written inside `poll_read`, `poll_write`
// and `poll_close`, all of which take `Pin<&mut Self>` and therefore have
// exclusive access. The `&self` methods (`metadata`, `sync_all`, `sync_data`,
// `set_len`) touch only `ring`, `fd` and `path`.
// NOTE(review): this additionally relies on `IoUring` and `Fd` being safe to
// share and send across threads, which is not visible here — TODO confirm.
unsafe impl Send for File {}
unsafe impl Sync for File {}
/// Bookkeeping for one read request issued by `poll_read`.
///
/// Despite the name, no `Future` impl for this type is visible in this file;
/// the actual future lives in `state`.
struct ReadFuture {
/// Ring the read is submitted to.
ring: Arc<IoUring>,
/// Descriptor to read from.
fd: Fd,
/// Length of the caller's buffer when the request was created; `poll_read`
/// allocates a scratch buffer of this size.
buf_len: usize,
/// File offset the read is issued at.
offset: u64,
/// Boxed future performing the read, created lazily by `poll_read`.
/// It resolves to the scratch buffer together with the byte count reported
/// by the ring.
state: UnsafeCell<
Option<
Pin<Box<dyn Future<Output = crate::linux::io_uring::Result<(Vec<u8>, usize)>> + Send>>,
>,
>,
}
/// Bookkeeping for one write request issued by `poll_write`.
///
/// Despite the name, no `Future` impl for this type is visible in this file;
/// the actual future lives in `state`.
struct WriteFuture {
/// Ring the write is submitted to.
ring: Arc<IoUring>,
/// Descriptor to write to.
fd: Fd,
/// Owned copy of the caller's bytes (`poll_write` fills it with `buf.to_vec()`).
buf: Vec<u8>,
/// File offset the write is issued at.
offset: u64,
/// Boxed future performing the write, created lazily by `poll_write`.
/// It resolves to the byte count reported by the ring.
state: UnsafeCell<
Option<Pin<Box<dyn Future<Output = crate::linux::io_uring::Result<usize>> + Send>>>,
>,
}
/// Bookkeeping for the close request issued by `poll_close`.
///
/// Despite the name, no `Future` impl for this type is visible in this file;
/// the actual future lives in `state`.
struct CloseFuture {
/// Ring the close is submitted to.
ring: Arc<IoUring>,
/// Descriptor being closed.
fd: Fd,
/// Boxed future performing the close, created lazily by `poll_close`.
state: UnsafeCell<
Option<Pin<Box<dyn Future<Output = crate::linux::io_uring::Result<()>> + Send>>>,
>,
}
/// Builds the dedicated io_uring instance (capacity 256) that backs one [`File`].
fn new_file_ring() -> Result<Arc<IoUring>, fs::FileError> {
    IoUring::with_capacity(256)
        .map(Arc::new)
        .map_err(|e| fs::FileError::Io(format!("Failed to create io_uring: {}", e)))
}

/// Copies `path` into a fresh buffer terminated by a single NUL byte.
///
/// A path that already contains a NUL byte is rejected: the kernel would stop
/// reading at that byte and silently operate on a shorter, different path.
fn nul_terminated_path(path: &str) -> Result<Vec<u8>, fs::FileError> {
    let raw = path.as_bytes();
    if raw.contains(&0) {
        return Err(fs::FileError::Io(format!(
            "Path contains an interior NUL byte: {:?}",
            path
        )));
    }
    let mut bytes = Vec::with_capacity(raw.len() + 1);
    bytes.extend_from_slice(raw);
    bytes.push(0);
    Ok(bytes)
}

impl fs::file::File<linux::runtime::Runtime, linux::runtime::Share> for File {
    /// Opens an existing file read-only (`O_RDONLY | O_CLOEXEC`) relative to the
    /// current working directory.
    ///
    /// # Errors
    /// `NotFound`, `PermissionDenied` and `IsADirectory` are reported for the
    /// matching errno values; every other failure becomes `FileError::Io`.
    fn open(path: &str) -> impl Future<Output = fs::Result<Self>>
    where
        Self: Sized,
    {
        async move {
            let ring = new_file_ring()?;
            let c_path = nul_terminated_path(path)?;

            let fd = ring
                .openat(AT_FDCWD, &c_path, O_RDONLY | O_CLOEXEC, 0)
                .await
                .await
                .map_err(|e| match e {
                    crate::linux::io_uring::IoUringError::System(errno) => {
                        let Some(kind) = Errno::from_raw(errno) else {
                            return fs::FileError::Io("Failed to parse file errno".to_string());
                        };
                        match kind {
                            Errno::NoEnt => fs::FileError::NotFound {
                                path: path.to_string(),
                            },
                            Errno::Acces => fs::FileError::PermissionDenied {
                                path: path.to_string(),
                            },
                            Errno::IsDir => fs::FileError::IsADirectory {
                                path: path.to_string(),
                            },
                            _ => fs::FileError::Io(format!(
                                "Failed to open file: errno {}",
                                errno
                            )),
                        }
                    }
                    _ => fs::FileError::Io(format!("Failed to open file: {}", e)),
                })?;

            Ok(File {
                ring,
                fd,
                path: path.to_string(),
                offset: UnsafeCell::new(0),
                current_read: UnsafeCell::new(None),
                current_write: UnsafeCell::new(None),
                current_close: UnsafeCell::new(None),
            })
        }
    }

    /// Creates the file if needed, truncates it, and opens it read-write
    /// (`O_RDWR | O_CREAT | O_TRUNC | O_CLOEXEC`) with owner read/write
    /// permissions.
    ///
    /// # Errors
    /// `AlreadyExists`, `PermissionDenied`, `DiskFull` and `NotADirectory` are
    /// reported for the matching errno values; every other failure becomes
    /// `FileError::Io`.
    fn create(path: &str) -> impl Future<Output = fs::Result<Self>>
    where
        Self: Sized,
    {
        async move {
            let ring = new_file_ring()?;
            let c_path = nul_terminated_path(path)?;

            let fd = ring
                .openat(
                    AT_FDCWD,
                    &c_path,
                    O_RDWR | O_CREAT | O_TRUNC | O_CLOEXEC,
                    S_IRUSR | S_IWUSR,
                )
                .await
                .await
                .map_err(|e| match e {
                    crate::linux::io_uring::IoUringError::System(errno) => {
                        let Some(kind) = Errno::from_raw(errno) else {
                            return fs::FileError::Io("Failed to parse file errno".to_string());
                        };
                        match kind {
                            Errno::Exist => fs::FileError::AlreadyExists {
                                path: path.to_string(),
                            },
                            Errno::Acces => fs::FileError::PermissionDenied {
                                path: path.to_string(),
                            },
                            Errno::NoSpc => fs::FileError::DiskFull,
                            Errno::NotDir => fs::FileError::NotADirectory {
                                path: path.to_string(),
                            },
                            _ => fs::FileError::Io(format!(
                                "Failed to determine file errno {errno}"
                            )),
                        }
                    }
                    _ => fs::FileError::Io(format!("Failed to create file: {}", e)),
                })?;

            Ok(File {
                ring,
                fd,
                path: path.to_string(),
                offset: UnsafeCell::new(0),
                current_read: UnsafeCell::new(None),
                current_write: UnsafeCell::new(None),
                current_close: UnsafeCell::new(None),
            })
        }
    }

    /// Queries size, file type and timestamps via `statx`.
    ///
    /// NOTE(review): the lookup goes through the stored path relative to the
    /// current working directory, not through the open descriptor. If the file
    /// was renamed or removed, or the working directory changed since opening,
    /// this reports on whatever now lives at that path. A descriptor-based
    /// lookup would avoid that but needs flags not visible in this file.
    fn metadata(&self) -> impl Future<Output = fs::Result<fs::file::Metadata>> {
        async move {
            // SAFETY: presumably `Statx` is a plain-data mirror of the kernel
            // structure for which the all-zero bit pattern is valid — TODO confirm.
            let mut statx_buf = unsafe { core::mem::zeroed::<Statx>() };
            let c_path = nul_terminated_path(&self.path)?;

            self.ring
                .statx(
                    AT_FDCWD,
                    &c_path,
                    AT_STATX_SYNC_AS_STAT,
                    STATX_BASIC_STATS,
                    &mut statx_buf,
                )
                .await
                .await
                .map_err(|e| fs::FileError::Io(format!("Failed to get metadata: {}", e)))?;

            // 0o170000 masks the file-type bits of the mode; 0o100000 marks a
            // regular file and 0o040000 a directory.
            let file_type = statx_buf.stx_mode & 0o170000;

            Ok(fs::file::Metadata {
                len: statx_buf.stx_size,
                is_file: file_type == 0o100000,
                is_dir: file_type == 0o040000,
                // NOTE(review): under statx(2) `stx_ctime` is the last
                // status-change time, not the creation (birth) time — confirm
                // this is what `created` is meant to carry.
                created: if statx_buf.stx_mask & crate::linux::io_uring::STATX_CTIME != 0 {
                    Some(statx_buf.stx_ctime.tv_sec as u64)
                } else {
                    None
                },
                modified: if statx_buf.stx_mask & crate::linux::io_uring::STATX_MTIME != 0 {
                    Some(statx_buf.stx_mtime.tv_sec as u64)
                } else {
                    None
                },
                accessed: if statx_buf.stx_mask & crate::linux::io_uring::STATX_ATIME != 0 {
                    Some(statx_buf.stx_atime.tv_sec as u64)
                } else {
                    None
                },
            })
        }
    }

    /// Submits an `fsync` with flags `0` for this descriptor and waits for it.
    fn sync_all(&self) -> impl Future<Output = fs::Result<()>> {
        async move {
            self.ring
                .fsync(self.fd, 0)
                .await
                .await
                .map_err(|e| fs::FileError::Io(format!("Failed to sync file: {}", e)))?;
            Ok(())
        }
    }

    /// Submits an `fsync` with flags `1` for this descriptor and waits for it.
    ///
    /// Presumably flag `1` selects the data-only (datasync) variant — TODO
    /// confirm against the ring's `fsync` definition.
    fn sync_data(&self) -> impl Future<Output = fs::Result<()>> {
        async move {
            self.ring
                .fsync(self.fd, 1)
                .await
                .await
                .map_err(|e| fs::FileError::Io(format!("Failed to sync data: {}", e)))?;
            Ok(())
        }
    }

    /// Resizes the file to `size` bytes with a direct `ftruncate` call.
    ///
    /// Unlike the other methods this does not go through the ring; the call is
    /// made synchronously inside the returned future.
    ///
    /// # Errors
    /// Returns `FileError::Io` if `size` cannot be represented as a signed
    /// 64-bit length or if `ftruncate` fails.
    fn set_len(&self, size: u64) -> impl Future<Output = fs::Result<()>> {
        async move {
            // A plain `as i64` cast would wrap oversized values into negative
            // lengths, so reject them up front.
            if size > i64::MAX as u64 {
                return Err(fs::FileError::Io(format!(
                    "Failed to set file length: {} does not fit in a signed 64-bit length",
                    size
                )));
            }
            let len = size as i64;

            // SAFETY: NOTE(review) — `ftruncate` is declared unsafe elsewhere;
            // presumably it only requires a valid open descriptor, which
            // `self.fd` is for the lifetime of `self` — confirm.
            unsafe { linux::sys::ftruncate(*self.fd, len) }
                .map_err(|e| fs::FileError::Io(format!("Failed to set file length: {}", e)))?;
            Ok(())
        }
    }
}
impl AsyncRead for File {
fn poll_read(
self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
buf: &mut [u8],
) -> core::task::Poll<futures::io::Result<usize>> {
unsafe {
let this = self.get_unchecked_mut();
let current_read = &mut *this.current_read.get();
let offset = *this.offset.get();
let read_future = ReadFuture {
ring: this.ring.clone(),
fd: this.fd,
buf_len: buf.len(),
offset,
state: UnsafeCell::new(None),
};
*current_read = Some(read_future);
if let Some(read_op) = current_read {
let state = &mut *read_op.state.get();
if state.is_none() {
let ring = read_op.ring.clone();
let fd = read_op.fd;
let offset = read_op.offset;
let buf_len = read_op.buf_len;
let mut read_buf = vec![0u8; buf_len];
let fut = Box::pin(async move {
let result = ring.read(fd, &mut read_buf, offset).await.await;
result.map(|n| (read_buf, n))
});
*state = Some(fut);
}
match state.as_mut().unwrap().as_mut().poll(cx) {
Poll::Ready(Ok((read_buf, n))) => {
buf[..n].copy_from_slice(&read_buf[..n]);
*this.offset.get() = offset + n as u64;
*current_read = None; Poll::Ready(Ok(n))
}
Poll::Ready(Err(e)) => {
*current_read = None; Poll::Ready(Err(futures::io::Error::new(
futures::io::ErrorKind::Other,
format!("io_uring read error: {}", e),
)))
}
Poll::Pending => Poll::Pending,
}
} else {
unreachable!("Just created read operation");
}
}
}
}
impl AsyncWrite for File {
fn poll_write(
self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
buf: &[u8],
) -> core::task::Poll<futures::io::Result<usize>> {
unsafe {
let this = self.get_unchecked_mut();
let current_write = &mut *this.current_write.get();
let offset = *this.offset.get();
let write_future = WriteFuture {
ring: this.ring.clone(),
fd: this.fd,
buf: buf.to_vec(),
offset,
state: UnsafeCell::new(None),
};
*current_write = Some(write_future);
if let Some(write_op) = current_write {
let state = &mut *write_op.state.get();
if state.is_none() {
let ring = write_op.ring.clone();
let fd = write_op.fd;
let offset = write_op.offset;
let buf = write_op.buf.clone();
let fut = Box::pin(async move { ring.write(fd, &buf, offset).await.await });
*state = Some(fut);
}
match state.as_mut().unwrap().as_mut().poll(cx) {
Poll::Ready(Ok(n)) => {
*this.offset.get() = offset + n as u64;
*current_write = None; Poll::Ready(Ok(n))
}
Poll::Ready(Err(e)) => {
*current_write = None; Poll::Ready(Err(futures::io::Error::new(
futures::io::ErrorKind::Other,
format!("io_uring write error: {}", e),
)))
}
Poll::Pending => Poll::Pending,
}
} else {
unreachable!("Just created write operation");
}
}
}
fn poll_flush(
self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<futures::io::Result<()>> {
let this = unsafe { self.get_unchecked_mut() };
let ring = this.ring.clone();
let fd = this.fd;
let fut = async move {
ring.fsync(fd, 0).await.await.map_err(|e| {
futures::io::Error::new(
futures::io::ErrorKind::Other,
format!("io_uring fsync error: {}", e),
)
})
};
let mut pinned = Box::pin(fut);
pinned.as_mut().poll(cx)
}
fn poll_close(
self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<futures::io::Result<()>> {
unsafe {
let this = self.get_unchecked_mut();
let current_close = &mut *this.current_close.get();
if current_close.is_none() {
*current_close = Some(CloseFuture {
ring: this.ring.clone(),
fd: this.fd,
state: UnsafeCell::new(None),
});
}
if let Some(close_op) = current_close {
let state = &mut *close_op.state.get();
if state.is_none() {
let ring = close_op.ring.clone();
let fd = close_op.fd;
let fut = Box::pin(async move { ring.close(fd).await.await });
*state = Some(fut);
}
match state.as_mut().unwrap().as_mut().poll(cx) {
Poll::Ready(Ok(())) => {
*current_close = None;
Poll::Ready(Ok(()))
}
Poll::Ready(Err(e)) => {
*current_close = None;
Poll::Ready(Err(futures::io::Error::new(
futures::io::ErrorKind::Other,
format!("io_uring close error: {}", e),
)))
}
Poll::Pending => Poll::Pending,
}
} else {
unreachable!("Just created close operation");
}
}
}
}