Skip to main content

compio_process/
unix.rs

1use std::{io, process};
2
3use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
4use compio_driver::{
5    BufferRef, ResultTakeBuffer, ToSharedFd,
6    op::{BufResultExt, Read, ReadManaged, Write},
7};
8use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite};
9use compio_runtime::{ResumeUnwind, Runtime};
10
11use crate::{ChildStderr, ChildStdin, ChildStdout};
12
13pub async fn child_wait(mut child: process::Child) -> io::Result<process::ExitStatus> {
14    compio_runtime::spawn_blocking(move || child.wait())
15        .await
16        .resume_unwind()
17        .expect("shouldn't be cancelled")
18}
19
20impl AsyncRead for ChildStdout {
21    async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
22        let fd = self.to_shared_fd();
23        let op = Read::new(fd, buffer);
24        let res = compio_runtime::submit(op).await.into_inner();
25        unsafe { res.map_advanced() }
26    }
27}
28
29impl AsyncReadManaged for ChildStdout {
30    type Buffer = BufferRef;
31
32    async fn read_managed(&mut self, len: usize) -> io::Result<Option<Self::Buffer>> {
33        let fd = self.to_shared_fd();
34        let res = Runtime::with_current(|rt| {
35            let buffer_pool = rt.buffer_pool()?;
36            let op = ReadManaged::new(fd, &buffer_pool, len)?;
37            io::Result::Ok(rt.submit(op))
38        })?
39        .await;
40        unsafe { res.take_buffer() }
41    }
42}
43
44impl AsyncRead for ChildStderr {
45    async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
46        let fd = self.to_shared_fd();
47        let op = Read::new(fd, buffer);
48        let res = compio_runtime::submit(op).await.into_inner();
49        unsafe { res.map_advanced() }
50    }
51}
52
53impl AsyncReadManaged for ChildStderr {
54    type Buffer = BufferRef;
55
56    async fn read_managed(&mut self, len: usize) -> io::Result<Option<Self::Buffer>> {
57        let fd = self.to_shared_fd();
58        let res = Runtime::with_current(|rt| {
59            let buffer_pool = rt.buffer_pool()?;
60            let op = ReadManaged::new(fd, &buffer_pool, len)?;
61            io::Result::Ok(rt.submit(op))
62        })?
63        .await;
64        unsafe { res.take_buffer() }
65    }
66}
67
68impl AsyncWrite for ChildStdin {
69    async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
70        let fd = self.to_shared_fd();
71        let op = Write::new(fd, buffer);
72        compio_runtime::submit(op).await.into_inner()
73    }
74
75    async fn flush(&mut self) -> io::Result<()> {
76        Ok(())
77    }
78
79    async fn shutdown(&mut self) -> io::Result<()> {
80        Ok(())
81    }
82}