x11rb_async/rust_connection/
stream.rs

1//! Implements the `Stream` trait for `RustConnection`.
2
3use std::future::Future;
4use std::io;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8#[cfg(unix)]
9use std::os::unix::io::AsFd;
10
11#[cfg(windows)]
12use std::os::windows::io::AsSocket as AsFd;
13
14use async_io::Async;
15use futures_lite::future;
16
17use x11rb::rust_connection::{
18    DefaultStream as X11rbDefaultStream, PollMode, Stream as X11rbStream,
19};
20use x11rb::utils::RawFdContainer;
21
22/// A stream that bytes can be read from or written to.
23pub trait StreamBase<'a>: X11rbStream {
24    /// The future returned by `readable`.
25    type Readable: Future<Output = io::Result<()>> + Send + 'a;
26
27    /// The future returned by `writable`.
28    type Writable: Future<Output = io::Result<()>> + Send + 'a;
29
30    /// Wait until the stream is readable.
31    fn readable(&'a self) -> Self::Readable;
32
33    /// Wait until the stream is writable.
34    fn writable(&'a self) -> Self::Writable;
35}
36
37/// A stream that bytes can be read from or written to.
38pub trait Stream: for<'a> StreamBase<'a> {}
39impl<S: for<'a> StreamBase<'a>> Stream for S {}
40
41/// The default stream type.
42pub type DefaultStream = StreamAdaptor<X11rbDefaultStream>;
43
44/// An adaptor that implements a `Stream` for a type that implements `X11rbStream`.
45#[derive(Debug)]
46pub struct StreamAdaptor<S> {
47    inner: Async<S>,
48}
49
50impl<S: AsFd> StreamAdaptor<S> {
51    /// Create a new `StreamAdaptor` from a stream.
52    pub fn new(stream: S) -> io::Result<Self> {
53        Async::new(stream).map(|inner| Self { inner })
54    }
55}
56
57/// A future for reading from a [`StreamAdaptor`].
58#[derive(Debug)]
59pub struct Readable<'a, S>(async_io::Readable<'a, S>);
60
61impl<S> Unpin for Readable<'_, S> {}
62
63impl<S> Future for Readable<'_, S> {
64    type Output = io::Result<()>;
65
66    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
67        Pin::new(&mut self.0).poll(cx)
68    }
69}
70
71/// A future for writing to a [`StreamAdaptor`].
72#[derive(Debug)]
73pub struct Writable<'a, S>(async_io::Writable<'a, S>);
74
75impl<S> Unpin for Writable<'_, S> {}
76
77impl<S> Future for Writable<'_, S> {
78    type Output = io::Result<()>;
79
80    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
81        Pin::new(&mut self.0).poll(cx)
82    }
83}
84
85impl<'a, S: 'a + X11rbStream + Sync> StreamBase<'a> for StreamAdaptor<S> {
86    type Readable = Readable<'a, S>;
87    type Writable = Writable<'a, S>;
88
89    fn readable(&'a self) -> Self::Readable {
90        Readable(self.inner.readable())
91    }
92
93    fn writable(&'a self) -> Self::Writable {
94        Writable(self.inner.writable())
95    }
96}
97
98impl<S: X11rbStream> X11rbStream for StreamAdaptor<S> {
99    fn poll(&self, mode: PollMode) -> io::Result<()> {
100        use async_io::block_on;
101
102        // Block on the necessary futures.
103        match mode {
104            PollMode::Readable => block_on(self.inner.readable()),
105            PollMode::Writable => block_on(self.inner.writable()),
106            PollMode::ReadAndWritable => {
107                block_on(future::or(self.inner.readable(), self.inner.writable()))
108            }
109        }
110    }
111
112    fn read(&self, buf: &mut [u8], fd_storage: &mut Vec<RawFdContainer>) -> io::Result<usize> {
113        self.inner.get_ref().read(buf, fd_storage)
114    }
115
116    fn write(&self, buf: &[u8], fds: &mut Vec<RawFdContainer>) -> io::Result<usize> {
117        self.inner.get_ref().write(buf, fds)
118    }
119
120    fn write_vectored(
121        &self,
122        bufs: &[io::IoSlice<'_>],
123        fds: &mut Vec<RawFdContainer>,
124    ) -> io::Result<usize> {
125        self.inner.get_ref().write_vectored(bufs, fds)
126    }
127}