compio_process/
unix.rs

1use std::{io, panic::resume_unwind, process};
2
3use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
4use compio_driver::{
5    ToSharedFd,
6    op::{BufResultExt, Recv, RecvManaged, ResultTakeBuffer, Send},
7};
8use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite};
9use compio_runtime::{BorrowedBuffer, BufferPool};
10
11use crate::{ChildStderr, ChildStdin, ChildStdout};
12
13pub async fn child_wait(mut child: process::Child) -> io::Result<process::ExitStatus> {
14    compio_runtime::spawn_blocking(move || child.wait())
15        .await
16        .unwrap_or_else(|e| resume_unwind(e))
17}
18
19impl AsyncRead for ChildStdout {
20    async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
21        let fd = self.to_shared_fd();
22        let op = Recv::new(fd, buffer);
23        compio_runtime::submit(op).await.into_inner().map_advanced()
24    }
25}
26
27impl AsyncReadManaged for ChildStdout {
28    type Buffer<'a> = BorrowedBuffer<'a>;
29    type BufferPool = BufferPool;
30
31    async fn read_managed<'a>(
32        &mut self,
33        buffer_pool: &'a Self::BufferPool,
34        len: usize,
35    ) -> io::Result<Self::Buffer<'a>> {
36        let fd = self.to_shared_fd();
37        let buffer_pool = buffer_pool.try_inner()?;
38        let op = RecvManaged::new(fd, buffer_pool, len)?;
39        compio_runtime::submit_with_flags(op)
40            .await
41            .take_buffer(buffer_pool)
42    }
43}
44
45impl AsyncRead for ChildStderr {
46    async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
47        let fd = self.to_shared_fd();
48        let op = Recv::new(fd, buffer);
49        compio_runtime::submit(op).await.into_inner().map_advanced()
50    }
51}
52
53impl AsyncReadManaged for ChildStderr {
54    type Buffer<'a> = BorrowedBuffer<'a>;
55    type BufferPool = BufferPool;
56
57    async fn read_managed<'a>(
58        &mut self,
59        buffer_pool: &'a Self::BufferPool,
60        len: usize,
61    ) -> io::Result<Self::Buffer<'a>> {
62        let fd = self.to_shared_fd();
63        let buffer_pool = buffer_pool.try_inner()?;
64        let op = RecvManaged::new(fd, buffer_pool, len)?;
65        compio_runtime::submit_with_flags(op)
66            .await
67            .take_buffer(buffer_pool)
68    }
69}
70
71impl AsyncWrite for ChildStdin {
72    async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
73        let fd = self.to_shared_fd();
74        let op = Send::new(fd, buffer);
75        compio_runtime::submit(op).await.into_inner()
76    }
77
78    async fn flush(&mut self) -> io::Result<()> {
79        Ok(())
80    }
81
82    async fn shutdown(&mut self) -> io::Result<()> {
83        Ok(())
84    }
85}