x11rb_async/
blocking.rs

1// This code is dual licensed under MIT OR Apache 2.0.
2
3//! A `Connection` implementation that uses a threadpool to handle requests.
4
5use crate::connection::{Connection, Fut, RequestConnection};
6use crate::errors::{ConnectError, ConnectionError, ParseError, ReplyOrIdError};
7use crate::x11_utils::X11Error;
8use crate::SequenceNumber;
9
10use std::future::Future;
11use std::io::IoSlice;
12use std::mem;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use x11rb::connection::{Connection as BlConnection, ReplyOrError, RequestKind};
17use x11rb::rust_connection::{RustConnection, Stream};
18
19#[cfg(feature = "allow-unsafe-code")]
20use std::ffi::CStr;
21#[cfg(feature = "allow-unsafe-code")]
22use x11rb::xcb_ffi::XCBConnection;
23
24use x11rb_protocol::DiscardMode;
25
26/// A `Connection` implementation that uses a threadpool to handle requests.
27///
28/// This type wraps around an existing `x11rb` [`Connection`](x11rb::connection::Connection) type,
29/// and makes it non-blocking by pushing all operations to a threadpool. This is good if, for instance,
30/// you have a `Connection` type that can't trivially be integrated into a async runtime.
31///
32/// However, if you have the option of using a `Connection` type that is integrated into a real async
33/// reactor, you should use that instead.
34///
35/// # Implementation
36///
37/// The [`blocking`] threadpool is used to handle all requests.
38///
39/// [`blocking`]: https://docs.rs/blocking
40#[derive(Debug)]
41pub struct BlockingConnection<C> {
42    inner: Arc<C>,
43}
44
45impl<C: BlConnection + Send + Sync + 'static> BlockingConnection<C> {
46    /// Create a new `BlockingConnection` from an existing `Connection`.
47    pub fn new(conn: Arc<C>) -> Self {
48        Self { inner: conn }
49    }
50
51    /// Run the closure with a reference to the underlying connection.
52    fn with_conn<T, F>(&self, f: F) -> blocking::Task<T>
53    where
54        F: FnOnce(&C) -> T + Send + 'static,
55        T: Send + 'static,
56    {
57        let inner = self.inner.clone();
58        blocking::unblock(move || f(&inner))
59    }
60
61    /// Get a reference to the underlying connection.
62    pub fn get_ref(&self) -> &C {
63        &self.inner
64    }
65}
66
67impl BlockingConnection<RustConnection> {
68    /// Connect to the X11 server using this connection.
69    pub async fn connect(display: Option<&str>) -> Result<(Self, usize), ConnectError> {
70        let display = display.map(|s| s.to_string());
71
72        let (inner, screen) =
73            blocking::unblock(move || RustConnection::connect(display.as_deref())).await?;
74
75        Ok((Self::new(Arc::new(inner)), screen))
76    }
77}
78
79impl<S: Stream + Send + Sync + 'static> BlockingConnection<RustConnection<S>> {
80    /// Establish a connection over the given stream.
81    pub async fn connect_to_stream(stream: S, screen: usize) -> Result<Self, ConnectError> {
82        let inner =
83            blocking::unblock(move || RustConnection::connect_to_stream(stream, screen)).await?;
84
85        Ok(Self::new(Arc::new(inner)))
86    }
87
88    /// Establish a connection over the given stream using the given auth info
89    pub async fn connect_to_stream_with_auth_info(
90        stream: S,
91        screen: usize,
92        auth_name: Vec<u8>,
93        auth_data: Vec<u8>,
94    ) -> Result<Self, ConnectError> {
95        let inner = blocking::unblock(move || {
96            RustConnection::connect_to_stream_with_auth_info(stream, screen, auth_name, auth_data)
97        })
98        .await?;
99
100        Ok(Self::new(Arc::new(inner)))
101    }
102}
103
104#[cfg(feature = "allow-unsafe-code")]
105impl BlockingConnection<XCBConnection> {
106    /// Connect to the X11 server using this connection.
107    pub async fn connect(display: Option<&CStr>) -> Result<(Self, usize), ConnectError> {
108        let display = display.map(|s| s.to_owned());
109
110        let (inner, screen) =
111            blocking::unblock(move || XCBConnection::connect(display.as_deref())).await?;
112
113        Ok((Self::new(Arc::new(inner)), screen))
114    }
115}
116
117impl<C: BlConnection + Send + Sync + 'static> RequestConnection for BlockingConnection<C> {
118    type Buf = C::Buf;
119
120    fn check_for_raw_error(
121        &self,
122        sequence: SequenceNumber,
123    ) -> Fut<'_, Option<Self::Buf>, ConnectionError> {
124        Box::pin(self.with_conn(move |conn| conn.check_for_raw_error(sequence)))
125    }
126
127    fn discard_reply(&self, sequence: SequenceNumber, kind: RequestKind, mode: DiscardMode) {
128        // Doesn't block.
129        self.inner.discard_reply(sequence, kind, mode);
130    }
131
132    fn extension_information(
133        &self,
134        name: &'static str,
135    ) -> Fut<'_, Option<x11rb::x11_utils::ExtensionInformation>, ConnectionError> {
136        Box::pin(self.with_conn(move |conn| conn.extension_information(name)))
137    }
138
139    fn prefetch_extension_information(&self, name: &'static str) -> Fut<'_, (), ConnectionError> {
140        Box::pin(self.with_conn(move |conn| conn.prefetch_extension_information(name)))
141    }
142
143    fn maximum_request_bytes(&self) -> Pin<Box<dyn Future<Output = usize> + Send + '_>> {
144        Box::pin(self.with_conn(|conn| conn.maximum_request_bytes()))
145    }
146
147    fn prefetch_maximum_request_bytes(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
148        Box::pin(self.with_conn(|conn| conn.prefetch_maximum_request_bytes()))
149    }
150
151    fn parse_error(&self, error: &[u8]) -> Result<X11Error, ParseError> {
152        // Doesn't block.
153        self.inner.parse_error(error)
154    }
155
156    fn parse_event(&self, event: &[u8]) -> Result<x11rb::protocol::Event, ParseError> {
157        // Doesn't block.
158        self.inner.parse_event(event)
159    }
160
161    fn send_request_with_reply<'this, 'bufs, 'sl, 're, 'future, R>(
162        &'this self,
163        bufs: &'bufs [IoSlice<'sl>],
164        fds: Vec<x11rb_protocol::RawFdContainer>,
165    ) -> Fut<'future, crate::Cookie<'this, Self, R>, ConnectionError>
166    where
167        'this: 'future,
168        'bufs: 'future,
169        'sl: 'future,
170        're: 'future,
171        R: x11rb::x11_utils::TryParse + Send + 're,
172    {
173        let mut buf = Vec::with_capacity(bufs.iter().map(|b| b.len()).sum());
174        for b in bufs {
175            buf.extend_from_slice(b);
176        }
177
178        Box::pin(async move {
179            let res = self
180                .with_conn(move |conn| {
181                    let slices = [IoSlice::new(&buf)];
182                    let cookie = conn.send_request_with_reply::<R>(&slices, fds)?;
183
184                    let sequence = {
185                        let sequence = cookie.sequence_number();
186                        mem::forget(cookie);
187                        sequence
188                    };
189
190                    Ok::<_, ConnectionError>(sequence)
191                })
192                .await?;
193
194            Ok(crate::Cookie::new(self, res))
195        })
196    }
197
198    fn send_request_with_reply_with_fds<'this, 'bufs, 'sl, 're, 'future, R>(
199        &'this self,
200        bufs: &'bufs [IoSlice<'sl>],
201        fds: Vec<x11rb_protocol::RawFdContainer>,
202    ) -> Fut<'future, crate::CookieWithFds<'this, Self, R>, ConnectionError>
203    where
204        'this: 'future,
205        'bufs: 'future,
206        'sl: 'future,
207        're: 'future,
208        R: x11rb::x11_utils::TryParseFd + Send + 're,
209    {
210        let mut buf = Vec::with_capacity(bufs.iter().map(|b| b.len()).sum());
211        for b in bufs {
212            buf.extend_from_slice(b);
213        }
214
215        Box::pin(async move {
216            let res = self
217                .with_conn(move |conn| {
218                    let slices = [IoSlice::new(&buf)];
219                    let cookie = conn.send_request_with_reply_with_fds::<R>(&slices, fds)?;
220
221                    let sequence = {
222                        let sequence = cookie.sequence_number();
223                        mem::forget(cookie);
224                        sequence
225                    };
226
227                    Ok::<_, ConnectionError>(sequence)
228                })
229                .await?;
230
231            Ok(crate::CookieWithFds::new(self, res))
232        })
233    }
234
235    fn send_request_without_reply<'this, 'bufs, 'sl, 'future>(
236        &'this self,
237        bufs: &'bufs [IoSlice<'sl>],
238        fds: Vec<x11rb_protocol::RawFdContainer>,
239    ) -> Fut<'future, crate::VoidCookie<'this, Self>, ConnectionError>
240    where
241        'this: 'future,
242        'bufs: 'future,
243        'sl: 'future,
244    {
245        let mut buf = Vec::with_capacity(bufs.iter().map(|b| b.len()).sum());
246        for b in bufs {
247            buf.extend_from_slice(b);
248        }
249
250        Box::pin(async move {
251            let res = self
252                .with_conn(move |conn| {
253                    let slices = [IoSlice::new(&buf)];
254                    let cookie = conn.send_request_without_reply(&slices, fds)?;
255
256                    let sequence = {
257                        let sequence = cookie.sequence_number();
258                        mem::forget(cookie);
259                        sequence
260                    };
261
262                    Ok::<_, ConnectionError>(sequence)
263                })
264                .await?;
265
266            Ok(crate::VoidCookie::new(self, res))
267        })
268    }
269
270    fn wait_for_reply(
271        &self,
272        sequence: SequenceNumber,
273    ) -> Fut<'_, Option<Self::Buf>, ConnectionError> {
274        Box::pin(self.with_conn(move |conn| conn.wait_for_reply(sequence)))
275    }
276
277    fn wait_for_reply_or_raw_error(
278        &self,
279        sequence: SequenceNumber,
280    ) -> Fut<'_, ReplyOrError<Self::Buf>, ConnectionError> {
281        Box::pin(self.with_conn(move |conn| conn.wait_for_reply_or_raw_error(sequence)))
282    }
283
284    fn wait_for_reply_with_fds_raw(
285        &self,
286        sequence: SequenceNumber,
287    ) -> Fut<'_, ReplyOrError<x11rb::connection::BufWithFds<Self::Buf>, Self::Buf>, ConnectionError>
288    {
289        Box::pin(self.with_conn(move |conn| conn.wait_for_reply_with_fds_raw(sequence)))
290    }
291}
292
293impl<C: BlConnection + Send + Sync + 'static> Connection for BlockingConnection<C> {
294    fn poll_for_raw_event_with_sequence(
295        &self,
296    ) -> Result<Option<x11rb_protocol::RawEventAndSeqNumber<Self::Buf>>, ConnectionError> {
297        // Doesn't block.
298        self.inner.poll_for_raw_event_with_sequence()
299    }
300
301    fn wait_for_raw_event_with_sequence(
302        &self,
303    ) -> Fut<'_, x11rb_protocol::RawEventAndSeqNumber<Self::Buf>, ConnectionError> {
304        Box::pin(self.with_conn(|conn| conn.wait_for_raw_event_with_sequence()))
305    }
306
307    fn generate_id(&self) -> Fut<'_, u32, ReplyOrIdError> {
308        Box::pin(self.with_conn(|conn| conn.generate_id()))
309    }
310
311    fn flush(&self) -> Fut<'_, (), ConnectionError> {
312        Box::pin(self.with_conn(|conn| conn.flush()))
313    }
314
315    fn setup(&self) -> &x11rb::protocol::xproto::Setup {
316        self.inner.setup()
317    }
318}