Skip to main content

yash_env/system/concurrency/
rw_all.rs

1// This file is part of yash, an extended POSIX shell.
2// Copyright (C) 2026 WATANABE Yuki
3//
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8//
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13//
14// You should have received a copy of the GNU General Public License
15// along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17//! Extension of `Concurrent` for repeating reads and writes until all data is processed
18
19use super::{Concurrent, TemporaryNonBlockingGuard};
20use crate::io::Fd;
21use crate::system::{Errno, Fcntl, Read, Write};
22use std::cell::LazyCell;
23use std::iter::repeat_n;
24
25impl<S> Concurrent<S>
26where
27    S: Fcntl + Read,
28{
29    /// Reads from the file descriptor until EOF is reached, appending the data
30    /// to the provided buffer.
31    ///
32    /// In case of an error, the buffer will contain all data read up to the
33    /// point of failure.
34    ///
35    /// Use [`read_all`](Self::read_all) if you don't have an existing buffer to
36    /// append to.
37    pub async fn read_all_to(&self, fd: Fd, buffer: &mut Vec<u8>) -> Result<(), Errno> {
38        let this = TemporaryNonBlockingGuard::new(self, fd);
39        let waker = LazyCell::default();
40        let mut effective_length = buffer.len();
41        loop {
42            // The `read` method requires an initialized buffer, so we reserve
43            // additional capacity and fill it with zeros.
44            let unused = buffer.capacity() - effective_length;
45            buffer.reserve(0x400_usize.saturating_sub(unused));
46            buffer.extend(repeat_n(0, buffer.capacity() - buffer.len()));
47
48            match this.inner.read(fd, &mut buffer[effective_length..]).await {
49                Ok(0) => {
50                    buffer.truncate(effective_length);
51                    return Ok(());
52                }
53                Ok(n) => {
54                    effective_length += n;
55                }
56
57                // EWOULDBLOCK is unreachable if it has the same value as EAGAIN.
58                #[allow(unreachable_patterns)]
59                Err(Errno::EAGAIN | Errno::EWOULDBLOCK) => this.yield_for_read(fd, &waker).await,
60
61                Err(e) => {
62                    buffer.truncate(effective_length);
63                    return Err(e);
64                }
65            }
66        }
67    }
68
69    /// Reads from the file descriptor until EOF is reached, returning the
70    /// collected data as a `Vec<u8>`.
71    ///
72    /// This is a convenience method that allocates a buffer and calls
73    /// [`read_all_to`](Self::read_all_to).
74    pub async fn read_all(&self, fd: Fd) -> Result<Vec<u8>, Errno> {
75        let mut buffer = Vec::new();
76        self.read_all_to(fd, &mut buffer).await?;
77        Ok(buffer)
78    }
79}
80
81impl<S> Concurrent<S>
82where
83    S: Fcntl + Write,
84{
85    /// Writes all data from the provided buffer to the file descriptor.
86    ///
87    /// This method ensures that all data is written, even if multiple write
88    /// operations are required due to partial writes.
89    ///
90    /// If the data is empty, this method will return immediately without
91    /// performing write operations.
92    pub async fn write_all(&self, fd: Fd, mut data: &[u8]) -> Result<(), Errno> {
93        if data.is_empty() {
94            return Ok(());
95        }
96
97        let this = TemporaryNonBlockingGuard::new(self, fd);
98        let waker = LazyCell::default();
99        loop {
100            match this.inner.write(fd, data).await {
101                // EWOULDBLOCK is unreachable if it has the same value as EAGAIN.
102                #[allow(unreachable_patterns)]
103                Ok(0) | Err(Errno::EAGAIN | Errno::EWOULDBLOCK) => {
104                    this.yield_for_write(fd, &waker).await
105                }
106
107                Ok(n) => {
108                    data = &data[n..];
109                    if data.is_empty() {
110                        return Ok(());
111                    }
112                }
113
114                Err(e) => return Err(e),
115            }
116        }
117    }
118
119    /// Writes the given message to standard error.
120    ///
121    /// This is a convenience method that calls [`write_all`](Self::write_all)
122    /// with [`Fd::STDERR`].
123    #[inline]
124    pub async fn print_error<T: AsRef<[u8]>>(&self, message: T) {
125        _ = self.write_all(Fd::STDERR, message.as_ref()).await
126    }
127}
128
129#[cfg(test)]
130mod tests {
131    use super::*;
132    use crate::system::r#virtual::{PIPE_SIZE, VirtualSystem};
133    use crate::system::{Close as _, Mode, OfdAccess, Open as _, OpenFlag, Pipe as _};
134    use futures_util::FutureExt as _;
135    use std::rc::Rc;
136    use yash_executor::Executor;
137    use yash_executor::forwarder::TryReceiveError;
138
139    #[test]
140    fn read_all_and_write_all_transfer_all_data() {
141        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
142        let (read_fd, write_fd) = system.pipe().unwrap();
143
144        // Prepare a large buffer of data to write
145        let mut source = [0; PIPE_SIZE * 10];
146        for (i, byte) in source.iter_mut().enumerate() {
147            *byte = ((i * 37 + 13) % 256) as u8;
148        }
149
150        let executor = Executor::new();
151        let read = unsafe { executor.spawn(system.read_all(read_fd)) };
152        let write = unsafe {
153            executor.spawn(async {
154                let result = system.write_all(write_fd, &source).await;
155                assert_eq!(result, Ok(()));
156                let result = system.close(write_fd);
157                assert_eq!(result, Ok(()));
158            })
159        };
160
161        // Run both operations concurrently
162        let transferred = loop {
163            executor.run_until_stalled();
164
165            match read.try_receive() {
166                Ok(result) => break result,
167                Err(TryReceiveError::NotSent) => {
168                    // The read operation is not complete yet, so we continue running the executor
169                }
170                Err(e) => panic!("unexpected error: {e:?}"),
171            }
172
173            system.select().now_or_never().unwrap();
174        };
175
176        assert_eq!(transferred.unwrap(), source);
177        assert_eq!(write.try_receive(), Ok(()));
178    }
179
180    #[test]
181    fn read_all_preserves_fd_blocking_mode() {
182        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
183        let fd = system
184            .open(
185                c"/foo",
186                OfdAccess::ReadOnly,
187                OpenFlag::Create.into(),
188                Mode::empty(),
189            )
190            .now_or_never()
191            .unwrap()
192            .unwrap();
193
194        system.read_all(fd).now_or_never().unwrap().unwrap();
195
196        // The file descriptor should have the same blocking mode as before
197        // (which is blocking by default)
198        assert_eq!(system.inner.get_and_set_nonblocking(fd, false), Ok(false));
199
200        system.inner.get_and_set_nonblocking(fd, true).ok();
201        system.read_all(fd).now_or_never().unwrap().unwrap();
202        // The file descriptor should have the same blocking mode as before
203        // (which was set to non-blocking before the read)
204        assert_eq!(system.inner.get_and_set_nonblocking(fd, true), Ok(true));
205    }
206
207    #[test]
208    fn write_all_preserves_fd_blocking_mode() {
209        let system = Rc::new(Concurrent::new(VirtualSystem::new()));
210        let fd = system
211            .open(
212                c"/foo",
213                OfdAccess::WriteOnly,
214                OpenFlag::Create.into(),
215                Mode::empty(),
216            )
217            .now_or_never()
218            .unwrap()
219            .unwrap();
220
221        system
222            .write_all(fd, b"hello")
223            .now_or_never()
224            .unwrap()
225            .unwrap();
226
227        // The file descriptor should have the same blocking mode as before
228        // (which is blocking by default)
229        assert_eq!(system.inner.get_and_set_nonblocking(fd, false), Ok(false));
230
231        system.inner.get_and_set_nonblocking(fd, true).ok();
232        system
233            .write_all(fd, b"world")
234            .now_or_never()
235            .unwrap()
236            .unwrap();
237        // The file descriptor should have the same blocking mode as before
238        // (which was set to non-blocking before the write)
239        assert_eq!(system.inner.get_and_set_nonblocking(fd, true), Ok(true));
240    }
241}