1use crate::{
48 asynchronous,
49 handles::{AsRawHandle, RawHandle},
50 FnOnceObject, KillHandle, Object,
51};
52use std::future::Future;
53use std::io::Result;
54use std::pin::pin;
55use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
56
57fn block_on<F: Future>(f: F) -> F::Output {
58 const VTABLE: RawWakerVTable = RawWakerVTable::new(|_| RAW, |_| {}, |_| {}, |_| {});
60 const RAW: RawWaker = RawWaker::new(std::ptr::null(), &VTABLE);
61 let waker = unsafe { Waker::from_raw(RAW) };
62 let mut cx = Context::from_waker(&waker);
63 match pin!(f).poll(&mut cx) {
64 Poll::Ready(value) => value,
65 Poll::Pending => unreachable!(),
66 }
67}
68
69#[derive(Debug, Object)]
71pub struct Blocking(asynchronous::SyncStream);
72
73unsafe impl asynchronous::AsyncStream for Blocking {
74 fn try_new(stream: asynchronous::SyncStream) -> Result<Self> {
75 Ok(Self(stream))
76 }
77
78 fn as_raw_handle(&self) -> RawHandle {
79 self.0.as_raw_handle()
80 }
81
82 #[cfg(unix)]
83 const IS_BLOCKING: bool = true;
84
85 #[cfg(unix)]
86 async fn blocking_write<T>(&self, mut f: impl FnMut() -> Result<T> + Send) -> Result<T> {
87 f()
88 }
89 #[cfg(windows)]
90 async fn write(&mut self, buf: &[u8]) -> Result<()> {
91 use std::io::Write;
92 self.0.write_all(buf)
93 }
94
95 #[cfg(unix)]
96 async fn blocking_read<T>(&self, mut f: impl FnMut() -> Result<T> + Send) -> Result<T> {
97 f()
98 }
99 #[cfg(windows)]
100 async fn read(&mut self, buf: &mut [u8]) -> Result<()> {
101 use std::io::Read;
102 self.0.read_exact(buf)
103 }
104}
105
106#[derive(Debug, Object)]
110pub struct Sender<T: Object>(pub(crate) asynchronous::Sender<Blocking, T>);
111
112#[derive(Debug, Object)]
116pub struct Receiver<T: Object>(pub(crate) asynchronous::Receiver<Blocking, T>);
117
118#[derive(Debug, Object)]
123pub struct Duplex<S: Object, R: Object>(pub(crate) asynchronous::Duplex<Blocking, S, R>);
124
125pub fn channel<T: Object>() -> Result<(Sender<T>, Receiver<T>)> {
127 let (tx, rx) = asynchronous::channel::<Blocking, T>()?;
128 Ok((Sender(tx), Receiver(rx)))
129}
130
131pub fn duplex<A: Object, B: Object>() -> Result<(Duplex<A, B>, Duplex<B, A>)> {
133 let (tx, rx) = asynchronous::duplex::<Blocking, A, B>()?;
134 Ok((Duplex(tx), Duplex(rx)))
135}
136
137impl<T: Object> Sender<T> {
138 pub fn send(&mut self, value: &T) -> Result<()> {
140 block_on(self.0.send(value))
141 }
142}
143
144#[cfg(unix)]
145impl<T: Object> std::os::unix::io::AsRawFd for Sender<T> {
146 fn as_raw_fd(&self) -> RawHandle {
147 self.0.as_raw_handle()
148 }
149}
150#[cfg(windows)]
151impl<T: Object> std::os::windows::io::AsRawHandle for Sender<T> {
152 fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
153 std::os::windows::io::AsRawHandle::as_raw_handle(&self.0)
154 }
155}
156
157#[cfg(unix)]
158impl<T: Object> std::os::unix::io::IntoRawFd for Sender<T> {
159 fn into_raw_fd(self) -> RawHandle {
160 self.0.fd.0.into_raw_fd()
161 }
162}
163#[cfg(windows)]
164impl<T: Object> std::os::windows::io::IntoRawHandle for Sender<T> {
165 fn into_raw_handle(self) -> std::os::windows::io::RawHandle {
166 self.0.fd.0.into_raw_handle()
167 }
168}
169
170#[cfg(unix)]
171impl<T: Object> std::os::unix::io::FromRawFd for Sender<T> {
172 unsafe fn from_raw_fd(fd: RawHandle) -> Self {
173 Self(asynchronous::Sender::from_stream(Blocking(
174 asynchronous::SyncStream::from_raw_fd(fd),
175 )))
176 }
177}
178#[cfg(windows)]
179impl<T: Object> std::os::windows::io::FromRawHandle for Sender<T> {
180 unsafe fn from_raw_handle(fd: std::os::windows::io::RawHandle) -> Self {
181 Self(asynchronous::Sender::from_stream(Blocking(
182 asynchronous::SyncStream::from_raw_handle(fd),
183 )))
184 }
185}
186
187impl<T: Object> Receiver<T> {
188 pub fn recv(&mut self) -> Result<Option<T>> {
192 block_on(self.0.recv())
193 }
194}
195
196#[cfg(unix)]
197impl<T: Object> std::os::unix::io::AsRawFd for Receiver<T> {
198 fn as_raw_fd(&self) -> RawHandle {
199 self.0.as_raw_handle()
200 }
201}
202#[cfg(windows)]
203impl<T: Object> std::os::windows::io::AsRawHandle for Receiver<T> {
204 fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
205 std::os::windows::io::AsRawHandle::as_raw_handle(&self.0)
206 }
207}
208
209#[cfg(unix)]
210impl<T: Object> std::os::unix::io::IntoRawFd for Receiver<T> {
211 fn into_raw_fd(self) -> RawHandle {
212 self.0.fd.0.into_raw_fd()
213 }
214}
215#[cfg(windows)]
216impl<T: Object> std::os::windows::io::IntoRawHandle for Receiver<T> {
217 fn into_raw_handle(self) -> std::os::windows::io::RawHandle {
218 self.0.fd.0.into_raw_handle()
219 }
220}
221
222#[cfg(unix)]
223impl<T: Object> std::os::unix::io::FromRawFd for Receiver<T> {
224 unsafe fn from_raw_fd(fd: RawHandle) -> Self {
225 Self(asynchronous::Receiver::from_stream(Blocking(
226 asynchronous::SyncStream::from_raw_fd(fd),
227 )))
228 }
229}
230#[cfg(windows)]
231impl<T: Object> std::os::windows::io::FromRawHandle for Receiver<T> {
232 unsafe fn from_raw_handle(fd: std::os::windows::io::RawHandle) -> Self {
233 Self(asynchronous::Receiver::from_stream(Blocking(
234 asynchronous::SyncStream::from_raw_handle(fd),
235 )))
236 }
237}
238
239impl<S: Object, R: Object> Duplex<S, R> {
240 pub fn send(&mut self, value: &S) -> Result<()> {
242 block_on(self.0.send(value))
243 }
244
245 pub fn recv(&mut self) -> Result<Option<R>> {
249 block_on(self.0.recv())
250 }
251
252 pub fn request(&mut self, value: &S) -> Result<R> {
256 block_on(self.0.request(value))
257 }
258
259 pub fn into_sender(self) -> Sender<S> {
260 Sender(self.0.into_sender())
261 }
262
263 pub fn into_receiver(self) -> Receiver<R> {
264 Receiver(self.0.into_receiver())
265 }
266}
267
268#[cfg(unix)]
269impl<S: Object, R: Object> std::os::unix::io::AsRawFd for Duplex<S, R> {
270 fn as_raw_fd(&self) -> RawHandle {
271 self.0.as_raw_handle()
272 }
273}
274
275#[cfg(unix)]
276impl<S: Object, R: Object> std::os::unix::io::IntoRawFd for Duplex<S, R> {
277 fn into_raw_fd(self) -> RawHandle {
278 self.0.fd.0.into_raw_fd()
279 }
280}
281
282#[cfg(unix)]
283impl<S: Object, R: Object> std::os::unix::io::FromRawFd for Duplex<S, R> {
284 unsafe fn from_raw_fd(fd: RawHandle) -> Self {
285 Self(asynchronous::Duplex::from_stream(Blocking(
286 asynchronous::SyncStream::from_raw_fd(fd),
287 )))
288 }
289}
290
291#[derive(Debug)]
293pub struct Child<T: Object>(asynchronous::Child<Blocking, T>);
294
295impl<T: Object> Child<T> {
296 pub fn get_kill_handle(&self) -> KillHandle {
298 self.0.get_kill_handle()
299 }
300
301 pub fn id(&self) -> asynchronous::ProcID {
303 self.0.id()
304 }
305
306 pub fn join(self) -> Result<T> {
312 block_on(self.0.join())
313 }
314}
315
316#[doc(hidden)]
317pub unsafe fn spawn<T: Object>(
318 entry: Box<dyn FnOnceObject<(RawHandle,), Output = i32>>,
319) -> Result<Child<T>> {
320 block_on(asynchronous::spawn::<Blocking, T>(entry)).map(Child)
321}