// Source file: wasmcloud_component/wrappers/io.rs

1use std::io::{Read, Write};
2
3pub struct InputStreamReader<'a> {
4    stream: &'a mut ::wasi::io::streams::InputStream,
5}
6
7impl<'a> From<&'a mut ::wasi::io::streams::InputStream> for InputStreamReader<'a> {
8    fn from(stream: &'a mut ::wasi::io::streams::InputStream) -> Self {
9        Self { stream }
10    }
11}
12
13impl std::io::Read for InputStreamReader<'_> {
14    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
15        self.stream.read(buf)
16    }
17}
18
19pub struct OutputStreamWriter<'a> {
20    stream: &'a mut ::wasi::io::streams::OutputStream,
21}
22
23impl<'a> From<&'a mut ::wasi::io::streams::OutputStream> for OutputStreamWriter<'a> {
24    fn from(stream: &'a mut ::wasi::io::streams::OutputStream) -> Self {
25        Self { stream }
26    }
27}
28
29impl std::io::Write for OutputStreamWriter<'_> {
30    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
31        self.stream.write(buf)
32    }
33
34    fn flush(&mut self) -> std::io::Result<()> {
35        self.stream.flush()
36    }
37}
38
/// Combined handle over the process's standard input and standard output.
///
/// Both halves are held as *locked* handles, so the locks stay acquired for as long
/// as the value is alive.
pub struct StdioStream<'a> {
    /// Locked standard input; the source for reads.
    stdin: std::io::StdinLock<'a>,
    /// Locked standard output; the sink for writes and flushes.
    stdout: std::io::StdoutLock<'a>,
}
43
44impl StdioStream<'_> {
45    #[must_use]
46    pub fn new() -> Self {
47        Self::default()
48    }
49}
50
51impl Read for StdioStream<'_> {
52    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
53        self.stdin.read(buf)
54    }
55}
56
57impl Write for StdioStream<'_> {
58    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
59        self.stdout.write(buf)
60    }
61
62    fn flush(&mut self) -> std::io::Result<()> {
63        self.stdout.flush()
64    }
65}
66
67impl Default for StdioStream<'_> {
68    fn default() -> Self {
69        Self {
70            stdin: std::io::stdin().lock(),
71            stdout: std::io::stdout().lock(),
72        }
73    }
74}
75
76/// Similar to [`crate::wasi::io::poll::poll`], but polls all `pollables` until they are all ready.
77///
78/// Poll for completion on a set of pollables.
79///
80/// This function takes a list of pollables, which identify I/O sources of interest, and waits until all of the events are ready for I/O.
81pub fn join(pollables: &[&crate::wasi::io::poll::Pollable]) {
82    let mut pollables = pollables.to_vec();
83    while !pollables.is_empty() {
84        let ready_indices = crate::wasi::io::poll::poll(&pollables);
85        ready_indices.iter().rev().for_each(|&i| {
86            pollables.swap_remove(i as usize);
87        });
88    }
89}
90
#[cfg(feature = "futures")]
impl futures::AsyncRead for StdioStream<'_> {
    /// Performs one read from the locked standard input and reports it as ready.
    ///
    /// The read is executed inline, so this never returns `Poll::Pending` and the
    /// task context is not used.
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
        buf: &mut [u8],
    ) -> std::task::Poll<std::io::Result<usize>> {
        let outcome = self.stdin.read(buf);
        std::task::Poll::Ready(outcome)
    }
}
101
#[cfg(feature = "futures")]
impl futures::AsyncWrite for StdioStream<'_> {
    /// Performs one write to the locked standard output and reports it as ready.
    ///
    /// The write is executed inline, so this never returns `Poll::Pending` and the
    /// task context is not used.
    fn poll_write(
        mut self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<std::io::Result<usize>> {
        let outcome = self.stdout.write(buf);
        std::task::Poll::Ready(outcome)
    }

    /// Flushes the locked standard output inline and reports the result as ready.
    fn poll_flush(
        mut self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        let outcome = self.stdout.flush();
        std::task::Poll::Ready(outcome)
    }

    /// Closing is implemented as a flush; nothing else is released here.
    fn poll_close(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        <Self as futures::AsyncWrite>::poll_flush(self, cx)
    }
}
126
#[cfg(feature = "tokio")]
impl tokio::io::AsyncRead for StdioStream<'_> {
    /// Performs one read from the locked standard input into the unfilled part of `buf`.
    ///
    /// The read is executed inline, so this never returns `Poll::Pending` and the task
    /// context is not used. On success the filled region of `buf` grows by the number of
    /// bytes read; on failure the I/O error is returned and `buf` is left as it was.
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        // Nothing can be stored, so do not touch stdin at all.
        if buf.remaining() == 0 {
            return std::task::Poll::Ready(Ok(()));
        }

        // Read straight into the unfilled tail of `buf`. Sizing a scratch buffer by
        // `capacity()` would ignore bytes already present in `buf`, so a read could
        // return more than fits and the data pulled from stdin would be lost in a panic.
        let n = self.stdin.read(buf.initialize_unfilled())?;
        buf.advance(n);
        std::task::Poll::Ready(Ok(()))
    }
}
142
#[cfg(feature = "tokio")]
impl tokio::io::AsyncWrite for StdioStream<'_> {
    /// Performs one write to the locked standard output and reports it as ready.
    ///
    /// The write is executed inline, so this never returns `Poll::Pending` and the
    /// task context is not used.
    fn poll_write(
        mut self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<Result<usize, std::io::Error>> {
        let outcome = self.stdout.write(buf);
        std::task::Poll::Ready(outcome)
    }

    /// Flushes the locked standard output inline and reports the result as ready.
    fn poll_flush(
        mut self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), std::io::Error>> {
        let outcome = self.stdout.flush();
        std::task::Poll::Ready(outcome)
    }

    /// Shutdown is implemented as a flush; nothing else is released here.
    fn poll_shutdown(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), std::io::Error>> {
        <Self as tokio::io::AsyncWrite>::poll_flush(self, cx)
    }
}
166}