#![allow(clippy::unused_async)]
use crate::fs::OpenOptions;
use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
use crate::runtime::spawn_blocking_io;
use std::fs::{Metadata, Permissions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
#[derive(Debug)]
pub struct File {
pub(crate) inner: Arc<std::fs::File>,
}
impl File {
async fn with_inner<R, F>(&self, op: F) -> io::Result<R>
where
R: Send + 'static,
F: FnOnce(Arc<std::fs::File>) -> io::Result<R> + Send + 'static,
{
let inner = Arc::clone(&self.inner);
spawn_blocking_io(move || op(inner)).await
}
pub async fn open(path: impl AsRef<Path>) -> io::Result<Self> {
let path = path.as_ref().to_owned();
let file = spawn_blocking_io(move || std::fs::File::open(&path)).await?;
Ok(Self {
inner: Arc::new(file),
})
}
pub async fn create(path: impl AsRef<Path>) -> io::Result<Self> {
let path = path.as_ref().to_owned();
let file = spawn_blocking_io(move || std::fs::File::create(&path)).await?;
Ok(Self {
inner: Arc::new(file),
})
}
pub async fn create_new(path: impl AsRef<Path>) -> io::Result<Self> {
let path = path.as_ref().to_owned();
let file = spawn_blocking_io(move || {
std::fs::OpenOptions::new()
.read(true)
.write(true)
.create_new(true)
.open(&path)
})
.await?;
Ok(Self {
inner: Arc::new(file),
})
}
#[must_use]
pub fn options() -> OpenOptions {
OpenOptions::new()
}
#[must_use]
pub fn from_std(file: std::fs::File) -> Self {
Self {
inner: Arc::new(file),
}
}
pub fn into_std(self) -> io::Result<std::fs::File> {
match Arc::try_unwrap(self.inner) {
Ok(file) => Ok(file),
Err(shared) => shared.try_clone(),
}
}
pub async fn sync_all(&self) -> io::Result<()> {
self.with_inner(|inner| inner.sync_all()).await
}
pub async fn sync_data(&self) -> io::Result<()> {
self.with_inner(|inner| inner.sync_data()).await
}
pub async fn set_len(&self, size: u64) -> io::Result<()> {
self.with_inner(move |inner| inner.set_len(size)).await
}
pub async fn metadata(&self) -> io::Result<Metadata> {
self.with_inner(|inner| inner.metadata()).await
}
pub async fn try_clone(&self) -> io::Result<Self> {
self.with_inner(|inner| inner.try_clone())
.await
.map(Self::from_std)
}
pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
self.with_inner(move |inner| inner.set_permissions(perm))
.await
}
pub async fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.with_inner(move |inner| {
let mut inner_ref: &std::fs::File = &inner;
Seek::seek(&mut inner_ref, pos)
})
.await
}
pub async fn stream_position(&mut self) -> io::Result<u64> {
self.with_inner(move |inner| {
let mut inner_ref: &std::fs::File = &inner;
Seek::stream_position(&mut inner_ref)
})
.await
}
pub async fn rewind(&mut self) -> io::Result<()> {
self.with_inner(move |inner| {
let mut inner_ref: &std::fs::File = &inner;
Seek::rewind(&mut inner_ref)
})
.await
}
pub async fn read_into_vec(&mut self, mut buf: Vec<u8>) -> io::Result<(Vec<u8>, usize)> {
self.with_inner(move |inner| {
let mut inner_ref: &std::fs::File = &inner;
let bytes_read = Read::read(&mut inner_ref, buf.as_mut_slice())?;
Ok((buf, bytes_read))
})
.await
}
}
impl AsyncRead for File {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let mut inner_ref: &std::fs::File = &self.inner;
let n = Read::read(&mut inner_ref, buf.unfilled())?;
buf.advance(n);
Poll::Ready(Ok(()))
}
}
impl AsyncWrite for File {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let mut inner_ref: &std::fs::File = &self.inner;
let n = Write::write(&mut inner_ref, buf)?;
Poll::Ready(Ok(n))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let mut inner_ref: &std::fs::File = &self.inner;
Write::flush(&mut inner_ref)?;
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
impl AsyncSeek for File {
fn poll_seek(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<u64>> {
let mut inner_ref: &std::fs::File = &self.inner;
let n = Seek::seek(&mut inner_ref, pos)?;
Poll::Ready(Ok(n))
}
}
#[cfg(test)]
mod tests {
#![allow(
clippy::pedantic,
clippy::nursery,
clippy::expect_fun_call,
clippy::map_unwrap_or,
clippy::cast_possible_wrap,
clippy::future_not_send
)]
use super::*;
use crate::io::{AsyncReadExt, AsyncWriteExt}; use tempfile::tempdir;
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
#[test]
fn test_file_create_write_read() {
init_test("test_file_create_write_read");
futures_lite::future::block_on(async {
let dir = tempdir().unwrap();
let path = dir.path().join("test.txt");
let mut file = File::create(&path).await.unwrap();
file.write_all(b"hello world").await.unwrap();
file.sync_all().await.unwrap();
drop(file);
let mut file = File::open(&path).await.unwrap();
let mut contents = String::new();
file.read_to_string(&mut contents).await.unwrap();
crate::assert_with_log!(
contents == "hello world",
"contents",
"hello world",
contents
);
});
crate::test_complete!("test_file_create_write_read");
}
#[test]
fn test_file_seek() {
init_test("test_file_seek");
futures_lite::future::block_on(async {
let dir = tempdir().unwrap();
let path = dir.path().join("test_seek.txt");
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&path)
.await
.unwrap();
file.write_all(b"0123456789").await.unwrap();
file.seek(SeekFrom::Start(5)).await.unwrap();
let mut buf = [0u8; 5];
file.read_exact(&mut buf).await.unwrap();
crate::assert_with_log!(&buf == b"56789", "seek contents", b"56789", buf);
});
crate::test_complete!("test_file_seek");
}
#[test]
fn test_file_read_into_vec_reuses_owned_buffer() {
init_test("test_file_read_into_vec_reuses_owned_buffer");
futures_lite::future::block_on(async {
let dir = tempdir().unwrap();
let path = dir.path().join("test_read_into_vec.txt");
std::fs::write(&path, b"abcdefg").unwrap();
let mut file = File::open(&path).await.unwrap();
let buffer = vec![0_u8; 4];
let capacity = buffer.capacity();
let (buffer, bytes_read) = file.read_into_vec(buffer).await.unwrap();
crate::assert_with_log!(bytes_read == 4, "first bytes read", 4usize, bytes_read);
crate::assert_with_log!(
&buffer[..bytes_read] == b"abcd",
"first chunk",
b"abcd",
&buffer[..bytes_read]
);
crate::assert_with_log!(
buffer.capacity() == capacity,
"buffer capacity reused",
capacity,
buffer.capacity()
);
let (buffer, bytes_read) = file.read_into_vec(buffer).await.unwrap();
crate::assert_with_log!(bytes_read == 3, "second bytes read", 3usize, bytes_read);
crate::assert_with_log!(
&buffer[..bytes_read] == b"efg",
"second chunk",
b"efg",
&buffer[..bytes_read]
);
});
crate::test_complete!("test_file_read_into_vec_reuses_owned_buffer");
}
#[test]
fn test_file_create_new_is_exclusive_and_read_write() {
init_test("test_file_create_new_is_exclusive_and_read_write");
futures_lite::future::block_on(async {
let dir = tempdir().unwrap();
let path = dir.path().join("exclusive.txt");
let mut file = File::create_new(&path).await.unwrap();
file.write_all(b"exclusive").await.unwrap();
file.rewind().await.unwrap();
let mut contents = String::new();
file.read_to_string(&mut contents).await.unwrap();
crate::assert_with_log!(
contents == "exclusive",
"create_new file is read-write",
"exclusive",
contents
);
drop(file);
let err = File::create_new(&path)
.await
.expect_err("second create_new must fail");
crate::assert_with_log!(
err.kind() == io::ErrorKind::AlreadyExists,
"create_new existing error kind",
io::ErrorKind::AlreadyExists,
err.kind()
);
});
crate::test_complete!("test_file_create_new_is_exclusive_and_read_write");
}
#[test]
fn test_file_metadata() {
init_test("test_file_metadata");
futures_lite::future::block_on(async {
let dir = tempdir().unwrap();
let path = dir.path().join("test_metadata.txt");
let mut file = File::create(&path).await.unwrap();
file.write_all(b"test content").await.unwrap();
file.sync_all().await.unwrap();
drop(file);
let file = File::open(&path).await.unwrap();
let metadata = file.metadata().await.unwrap();
crate::assert_with_log!(metadata.is_file(), "is_file", true, metadata.is_file());
crate::assert_with_log!(metadata.len() == 12, "file length", 12u64, metadata.len());
});
crate::test_complete!("test_file_metadata");
}
#[test]
fn test_file_set_len() {
init_test("test_file_set_len");
futures_lite::future::block_on(async {
let dir = tempdir().unwrap();
let path = dir.path().join("test_truncate.txt");
let mut file = File::create(&path).await.unwrap();
file.write_all(b"hello world").await.unwrap();
file.sync_all().await.unwrap();
file.set_len(5).await.unwrap();
file.sync_all().await.unwrap();
drop(file);
let mut file = File::open(&path).await.unwrap();
let mut contents = String::new();
file.read_to_string(&mut contents).await.unwrap();
crate::assert_with_log!(contents == "hello", "truncated contents", "hello", contents);
});
crate::test_complete!("test_file_set_len");
}
#[test]
fn test_cancellation_safety_soft_cancel() {
init_test("test_cancellation_safety_soft_cancel");
futures_lite::future::block_on(async {
let dir = tempdir().unwrap();
let path = dir.path().join("test_cancel.txt");
let file = File::create(&path).await.unwrap();
drop(file);
let file = File::open(&path).await.unwrap();
let metadata = file.metadata().await.unwrap();
crate::assert_with_log!(metadata.is_file(), "file exists", true, metadata.is_file());
});
crate::test_complete!("test_cancellation_safety_soft_cancel");
}
#[test]
fn test_file_from_std_into_std_roundtrip() {
init_test("test_file_from_std_into_std_roundtrip");
futures_lite::future::block_on(async {
let dir = tempdir().unwrap();
let path = dir.path().join("std_roundtrip.txt");
let std_file = std::fs::OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.read(true)
.open(&path)
.unwrap();
let file = File::from_std(std_file);
let mut roundtrip = file.into_std().unwrap();
roundtrip.write_all(b"std bridge").unwrap();
roundtrip.sync_all().unwrap();
drop(roundtrip);
let mut file = File::open(&path).await.unwrap();
let mut contents = String::new();
file.read_to_string(&mut contents).await.unwrap();
crate::assert_with_log!(
contents == "std bridge",
"roundtrip contents",
"std bridge",
contents
);
});
crate::test_complete!("test_file_from_std_into_std_roundtrip");
}
#[test]
fn test_file_into_std_when_shared() {
init_test("test_file_into_std_when_shared");
futures_lite::future::block_on(async {
let dir = tempdir().unwrap();
let path = dir.path().join("shared_into_std.txt");
let file = File::create(&path).await.unwrap();
let _other = file.try_clone().await.unwrap();
let std_file = file.into_std().unwrap();
let len = std_file.metadata().unwrap().len();
crate::assert_with_log!(len == 0, "shared into_std len", 0u64, len);
});
crate::test_complete!("test_file_into_std_when_shared");
}
#[test]
fn test_shared_arc_file_handles_support_seek_and_async_read() {
init_test("test_shared_arc_file_handles_support_seek_and_async_read");
futures_lite::future::block_on(async {
let dir = tempdir().unwrap();
let path = dir.path().join("shared_arc_seek_read.txt");
std::fs::write(&path, b"0123456789").unwrap();
let std_file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&path)
.unwrap();
let shared = Arc::new(std_file);
let mut seeker = File {
inner: Arc::clone(&shared),
};
let mut reader = File {
inner: Arc::clone(&shared),
};
seeker.seek(SeekFrom::Start(5)).await.unwrap();
let mut buf = [0u8; 5];
reader.read_exact(&mut buf).await.unwrap();
crate::assert_with_log!(
&buf == b"56789",
"shared handle seek/read contents",
b"56789",
buf
);
});
crate::test_complete!("test_shared_arc_file_handles_support_seek_and_async_read");
}
}