yash_env/system/concurrency/
rw_all.rs1use super::{Concurrent, TemporaryNonBlockingGuard};
20use crate::io::Fd;
21use crate::system::{Errno, Fcntl, Read, Write};
22use std::cell::LazyCell;
23use std::iter::repeat_n;
24
25impl<S> Concurrent<S>
26where
27 S: Fcntl + Read,
28{
29 pub async fn read_all_to(&self, fd: Fd, buffer: &mut Vec<u8>) -> Result<(), Errno> {
38 let this = TemporaryNonBlockingGuard::new(self, fd);
39 let waker = LazyCell::default();
40 let mut effective_length = buffer.len();
41 loop {
42 let unused = buffer.capacity() - effective_length;
45 buffer.reserve(0x400_usize.saturating_sub(unused));
46 buffer.extend(repeat_n(0, buffer.capacity() - buffer.len()));
47
48 match this.inner.read(fd, &mut buffer[effective_length..]).await {
49 Ok(0) => {
50 buffer.truncate(effective_length);
51 return Ok(());
52 }
53 Ok(n) => {
54 effective_length += n;
55 }
56
57 #[allow(unreachable_patterns)]
59 Err(Errno::EAGAIN | Errno::EWOULDBLOCK) => this.yield_for_read(fd, &waker).await,
60
61 Err(e) => {
62 buffer.truncate(effective_length);
63 return Err(e);
64 }
65 }
66 }
67 }
68
69 pub async fn read_all(&self, fd: Fd) -> Result<Vec<u8>, Errno> {
75 let mut buffer = Vec::new();
76 self.read_all_to(fd, &mut buffer).await?;
77 Ok(buffer)
78 }
79}
80
81impl<S> Concurrent<S>
82where
83 S: Fcntl + Write,
84{
85 pub async fn write_all(&self, fd: Fd, mut data: &[u8]) -> Result<(), Errno> {
93 if data.is_empty() {
94 return Ok(());
95 }
96
97 let this = TemporaryNonBlockingGuard::new(self, fd);
98 let waker = LazyCell::default();
99 loop {
100 match this.inner.write(fd, data).await {
101 #[allow(unreachable_patterns)]
103 Ok(0) | Err(Errno::EAGAIN | Errno::EWOULDBLOCK) => {
104 this.yield_for_write(fd, &waker).await
105 }
106
107 Ok(n) => {
108 data = &data[n..];
109 if data.is_empty() {
110 return Ok(());
111 }
112 }
113
114 Err(e) => return Err(e),
115 }
116 }
117 }
118
119 #[inline]
124 pub async fn print_error<T: AsRef<[u8]>>(&self, message: T) {
125 _ = self.write_all(Fd::STDERR, message.as_ref()).await
126 }
127}
128
#[cfg(test)]
mod tests {
    use super::*;
    use crate::system::r#virtual::{PIPE_SIZE, VirtualSystem};
    use crate::system::{Close as _, Mode, OfdAccess, Open as _, OpenFlag, Pipe as _};
    use futures_util::FutureExt as _;
    use std::rc::Rc;
    use yash_executor::Executor;
    use yash_executor::forwarder::TryReceiveError;

    // Pushes ten pipe-fulls of data through a pipe with a concurrent reader
    // and writer, and checks that the reader receives exactly what was sent.
    #[test]
    fn read_all_and_write_all_transfer_all_data() {
        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
        let (read_fd, write_fd) = system.pipe().unwrap();

        // Fill the payload with a non-trivial byte pattern so that reordered
        // or dropped chunks would be detected by the final comparison.
        let mut source = [0; PIPE_SIZE * 10];
        source
            .iter_mut()
            .enumerate()
            .for_each(|(index, byte)| *byte = ((index * 37 + 13) % 256) as u8);

        let executor = Executor::new();
        // NOTE(review): `spawn` is unsafe; its contract is not visible in this
        // file. Presumably it concerns the spawned futures borrowing `system`
        // and `source`, which are declared before (and so outlive) `executor`
        // — confirm against the yash_executor documentation.
        let reader = unsafe { executor.spawn(system.read_all(read_fd)) };
        let writer = unsafe {
            executor.spawn(async {
                assert_eq!(system.write_all(write_fd, &source).await, Ok(()));
                // Closing the writing end lets the reader observe the end of
                // the data.
                assert_eq!(system.close(write_fd), Ok(()));
            })
        };

        let transferred = loop {
            // Let both tasks run as far as they can.
            executor.run_until_stalled();

            match reader.try_receive() {
                Ok(outcome) => break outcome,
                // The reader has not finished yet; keep going.
                Err(TryReceiveError::NotSent) => {}
                Err(e) => panic!("unexpected error: {e:?}"),
            }

            // Both tasks are stalled: run one round of `select`, which is
            // expected to complete without pending.
            system.select().now_or_never().unwrap();
        };

        assert_eq!(transferred.unwrap(), source);
        assert_eq!(writer.try_receive(), Ok(()));
    }

    // `read_all` must leave the descriptor's blocking mode as it found it,
    // whichever mode that was.
    #[test]
    fn read_all_preserves_fd_blocking_mode() {
        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
        let opened = system
            .open(
                c"/foo",
                OfdAccess::ReadOnly,
                OpenFlag::Create.into(),
                Mode::empty(),
            )
            .now_or_never()
            .unwrap();
        let fd = opened.unwrap();

        // First pass: the descriptor is used as it was opened.
        let first = system.read_all(fd).now_or_never().unwrap();
        first.unwrap();
        // The value reported here is expected to be `false`, i.e. the
        // non-blocking flag was not left set.
        assert_eq!(system.inner.get_and_set_nonblocking(fd, false), Ok(false));

        // Second pass: the non-blocking flag is set beforehand.
        let _ = system.inner.get_and_set_nonblocking(fd, true);
        let second = system.read_all(fd).now_or_never().unwrap();
        second.unwrap();
        // The flag is expected to be still set afterwards.
        assert_eq!(system.inner.get_and_set_nonblocking(fd, true), Ok(true));
    }

    // `write_all` must leave the descriptor's blocking mode as it found it,
    // whichever mode that was.
    #[test]
    fn write_all_preserves_fd_blocking_mode() {
        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
        let opened = system
            .open(
                c"/foo",
                OfdAccess::WriteOnly,
                OpenFlag::Create.into(),
                Mode::empty(),
            )
            .now_or_never()
            .unwrap();
        let fd = opened.unwrap();

        // First pass: the descriptor is used as it was opened.
        let first = system.write_all(fd, b"hello").now_or_never().unwrap();
        first.unwrap();
        // The value reported here is expected to be `false`, i.e. the
        // non-blocking flag was not left set.
        assert_eq!(system.inner.get_and_set_nonblocking(fd, false), Ok(false));

        // Second pass: the non-blocking flag is set beforehand.
        let _ = system.inner.get_and_set_nonblocking(fd, true);
        let second = system.write_all(fd, b"world").now_or_never().unwrap();
        second.unwrap();
        // The flag is expected to be still set afterwards.
        assert_eq!(system.inner.get_and_set_nonblocking(fd, true), Ok(true));
    }
}