use super::{Concurrent, TemporaryNonBlockingGuard};
use crate::io::Fd;
use crate::system::{Errno, Fcntl, Read, Write};
use std::cell::LazyCell;
use std::iter::repeat_n;
impl<S> Concurrent<S>
where
S: Fcntl + Read,
{
pub async fn read_all_to(&self, fd: Fd, buffer: &mut Vec<u8>) -> Result<(), Errno> {
let this = TemporaryNonBlockingGuard::new(self, fd);
let waker = LazyCell::default();
let mut effective_length = buffer.len();
loop {
let unused = buffer.capacity() - effective_length;
buffer.reserve(0x400_usize.saturating_sub(unused));
buffer.extend(repeat_n(0, buffer.capacity() - buffer.len()));
match this.inner.read(fd, &mut buffer[effective_length..]).await {
Ok(0) => {
buffer.truncate(effective_length);
return Ok(());
}
Ok(n) => {
effective_length += n;
}
#[allow(unreachable_patterns)]
Err(Errno::EAGAIN | Errno::EWOULDBLOCK) => this.yield_for_read(fd, &waker).await,
Err(e) => {
buffer.truncate(effective_length);
return Err(e);
}
}
}
}
pub async fn read_all(&self, fd: Fd) -> Result<Vec<u8>, Errno> {
let mut buffer = Vec::new();
self.read_all_to(fd, &mut buffer).await?;
Ok(buffer)
}
}
impl<S> Concurrent<S>
where
S: Fcntl + Write,
{
pub async fn write_all(&self, fd: Fd, mut data: &[u8]) -> Result<(), Errno> {
if data.is_empty() {
return Ok(());
}
let this = TemporaryNonBlockingGuard::new(self, fd);
let waker = LazyCell::default();
loop {
match this.inner.write(fd, data).await {
#[allow(unreachable_patterns)]
Ok(0) | Err(Errno::EAGAIN | Errno::EWOULDBLOCK) => {
this.yield_for_write(fd, &waker).await
}
Ok(n) => {
data = &data[n..];
if data.is_empty() {
return Ok(());
}
}
Err(e) => return Err(e),
}
}
}
#[inline]
pub async fn print_error<T: AsRef<[u8]>>(&self, message: T) {
_ = self.write_all(Fd::STDERR, message.as_ref()).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::system::r#virtual::{PIPE_SIZE, VirtualSystem};
use crate::system::{Close as _, Mode, OfdAccess, Open as _, OpenFlag, Pipe as _};
use futures_util::FutureExt as _;
use std::rc::Rc;
use yash_executor::Executor;
use yash_executor::forwarder::TryReceiveError;
#[test]
fn read_all_and_write_all_transfer_all_data() {
let system = Rc::new(Concurrent::new(VirtualSystem::new()));
let (read_fd, write_fd) = system.pipe().unwrap();
let mut source = [0; PIPE_SIZE * 10];
for (i, byte) in source.iter_mut().enumerate() {
*byte = ((i * 37 + 13) % 256) as u8;
}
let executor = Executor::new();
let read = unsafe { executor.spawn(system.read_all(read_fd)) };
let write = unsafe {
executor.spawn(async {
let result = system.write_all(write_fd, &source).await;
assert_eq!(result, Ok(()));
let result = system.close(write_fd);
assert_eq!(result, Ok(()));
})
};
let transferred = loop {
executor.run_until_stalled();
match read.try_receive() {
Ok(result) => break result,
Err(TryReceiveError::NotSent) => {
}
Err(e) => panic!("unexpected error: {e:?}"),
}
system.select().now_or_never().unwrap();
};
assert_eq!(transferred.unwrap(), source);
assert_eq!(write.try_receive(), Ok(()));
}
#[test]
fn read_all_preserves_fd_blocking_mode() {
let system = Rc::new(Concurrent::new(VirtualSystem::new()));
let fd = system
.open(
c"/foo",
OfdAccess::ReadOnly,
OpenFlag::Create.into(),
Mode::empty(),
)
.now_or_never()
.unwrap()
.unwrap();
system.read_all(fd).now_or_never().unwrap().unwrap();
assert_eq!(system.inner.get_and_set_nonblocking(fd, false), Ok(false));
system.inner.get_and_set_nonblocking(fd, true).ok();
system.read_all(fd).now_or_never().unwrap().unwrap();
assert_eq!(system.inner.get_and_set_nonblocking(fd, true), Ok(true));
}
#[test]
fn write_all_preserves_fd_blocking_mode() {
let system = Rc::new(Concurrent::new(VirtualSystem::new()));
let fd = system
.open(
c"/foo",
OfdAccess::WriteOnly,
OpenFlag::Create.into(),
Mode::empty(),
)
.now_or_never()
.unwrap()
.unwrap();
system
.write_all(fd, b"hello")
.now_or_never()
.unwrap()
.unwrap();
assert_eq!(system.inner.get_and_set_nonblocking(fd, false), Ok(false));
system.inner.get_and_set_nonblocking(fd, true).ok();
system
.write_all(fd, b"world")
.now_or_never()
.unwrap()
.unwrap();
assert_eq!(system.inner.get_and_set_nonblocking(fd, true), Ok(true));
}
}