x11rb_async/rust_connection/
stream.rs1use std::future::Future;
4use std::io;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8#[cfg(unix)]
9use std::os::unix::io::AsFd;
10
11#[cfg(windows)]
12use std::os::windows::io::AsSocket as AsFd;
13
14use async_io::Async;
15use futures_lite::future;
16
17use x11rb::rust_connection::{
18 DefaultStream as X11rbDefaultStream, PollMode, Stream as X11rbStream,
19};
20use x11rb::utils::RawFdContainer;
21
22pub trait StreamBase<'a>: X11rbStream {
24 type Readable: Future<Output = io::Result<()>> + Send + 'a;
26
27 type Writable: Future<Output = io::Result<()>> + Send + 'a;
29
30 fn readable(&'a self) -> Self::Readable;
32
33 fn writable(&'a self) -> Self::Writable;
35}
36
37pub trait Stream: for<'a> StreamBase<'a> {}
39impl<S: for<'a> StreamBase<'a>> Stream for S {}
40
41pub type DefaultStream = StreamAdaptor<X11rbDefaultStream>;
43
44#[derive(Debug)]
46pub struct StreamAdaptor<S> {
47 inner: Async<S>,
48}
49
50impl<S: AsFd> StreamAdaptor<S> {
51 pub fn new(stream: S) -> io::Result<Self> {
53 Async::new(stream).map(|inner| Self { inner })
54 }
55}
56
57#[derive(Debug)]
59pub struct Readable<'a, S>(async_io::Readable<'a, S>);
60
61impl<S> Unpin for Readable<'_, S> {}
62
63impl<S> Future for Readable<'_, S> {
64 type Output = io::Result<()>;
65
66 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
67 Pin::new(&mut self.0).poll(cx)
68 }
69}
70
71#[derive(Debug)]
73pub struct Writable<'a, S>(async_io::Writable<'a, S>);
74
75impl<S> Unpin for Writable<'_, S> {}
76
77impl<S> Future for Writable<'_, S> {
78 type Output = io::Result<()>;
79
80 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
81 Pin::new(&mut self.0).poll(cx)
82 }
83}
84
85impl<'a, S: 'a + X11rbStream + Sync> StreamBase<'a> for StreamAdaptor<S> {
86 type Readable = Readable<'a, S>;
87 type Writable = Writable<'a, S>;
88
89 fn readable(&'a self) -> Self::Readable {
90 Readable(self.inner.readable())
91 }
92
93 fn writable(&'a self) -> Self::Writable {
94 Writable(self.inner.writable())
95 }
96}
97
98impl<S: X11rbStream> X11rbStream for StreamAdaptor<S> {
99 fn poll(&self, mode: PollMode) -> io::Result<()> {
100 use async_io::block_on;
101
102 match mode {
104 PollMode::Readable => block_on(self.inner.readable()),
105 PollMode::Writable => block_on(self.inner.writable()),
106 PollMode::ReadAndWritable => {
107 block_on(future::or(self.inner.readable(), self.inner.writable()))
108 }
109 }
110 }
111
112 fn read(&self, buf: &mut [u8], fd_storage: &mut Vec<RawFdContainer>) -> io::Result<usize> {
113 self.inner.get_ref().read(buf, fd_storage)
114 }
115
116 fn write(&self, buf: &[u8], fds: &mut Vec<RawFdContainer>) -> io::Result<usize> {
117 self.inner.get_ref().write(buf, fds)
118 }
119
120 fn write_vectored(
121 &self,
122 bufs: &[io::IoSlice<'_>],
123 fds: &mut Vec<RawFdContainer>,
124 ) -> io::Result<usize> {
125 self.inner.get_ref().write_vectored(bufs, fds)
126 }
127}