1use crate::connection::{Connection, Fut, RequestConnection};
6use crate::errors::{ConnectError, ConnectionError, ParseError, ReplyOrIdError};
7use crate::x11_utils::X11Error;
8use crate::SequenceNumber;
9
10use std::future::Future;
11use std::io::IoSlice;
12use std::mem;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use x11rb::connection::{Connection as BlConnection, ReplyOrError, RequestKind};
17use x11rb::rust_connection::{RustConnection, Stream};
18
19#[cfg(feature = "allow-unsafe-code")]
20use std::ffi::CStr;
21#[cfg(feature = "allow-unsafe-code")]
22use x11rb::xcb_ffi::XCBConnection;
23
24use x11rb_protocol::DiscardMode;
25
26#[derive(Debug)]
41pub struct BlockingConnection<C> {
42 inner: Arc<C>,
43}
44
45impl<C: BlConnection + Send + Sync + 'static> BlockingConnection<C> {
46 pub fn new(conn: Arc<C>) -> Self {
48 Self { inner: conn }
49 }
50
51 fn with_conn<T, F>(&self, f: F) -> blocking::Task<T>
53 where
54 F: FnOnce(&C) -> T + Send + 'static,
55 T: Send + 'static,
56 {
57 let inner = self.inner.clone();
58 blocking::unblock(move || f(&inner))
59 }
60
61 pub fn get_ref(&self) -> &C {
63 &self.inner
64 }
65}
66
67impl BlockingConnection<RustConnection> {
68 pub async fn connect(display: Option<&str>) -> Result<(Self, usize), ConnectError> {
70 let display = display.map(|s| s.to_string());
71
72 let (inner, screen) =
73 blocking::unblock(move || RustConnection::connect(display.as_deref())).await?;
74
75 Ok((Self::new(Arc::new(inner)), screen))
76 }
77}
78
79impl<S: Stream + Send + Sync + 'static> BlockingConnection<RustConnection<S>> {
80 pub async fn connect_to_stream(stream: S, screen: usize) -> Result<Self, ConnectError> {
82 let inner =
83 blocking::unblock(move || RustConnection::connect_to_stream(stream, screen)).await?;
84
85 Ok(Self::new(Arc::new(inner)))
86 }
87
88 pub async fn connect_to_stream_with_auth_info(
90 stream: S,
91 screen: usize,
92 auth_name: Vec<u8>,
93 auth_data: Vec<u8>,
94 ) -> Result<Self, ConnectError> {
95 let inner = blocking::unblock(move || {
96 RustConnection::connect_to_stream_with_auth_info(stream, screen, auth_name, auth_data)
97 })
98 .await?;
99
100 Ok(Self::new(Arc::new(inner)))
101 }
102}
103
104#[cfg(feature = "allow-unsafe-code")]
105impl BlockingConnection<XCBConnection> {
106 pub async fn connect(display: Option<&CStr>) -> Result<(Self, usize), ConnectError> {
108 let display = display.map(|s| s.to_owned());
109
110 let (inner, screen) =
111 blocking::unblock(move || XCBConnection::connect(display.as_deref())).await?;
112
113 Ok((Self::new(Arc::new(inner)), screen))
114 }
115}
116
117impl<C: BlConnection + Send + Sync + 'static> RequestConnection for BlockingConnection<C> {
118 type Buf = C::Buf;
119
120 fn check_for_raw_error(
121 &self,
122 sequence: SequenceNumber,
123 ) -> Fut<'_, Option<Self::Buf>, ConnectionError> {
124 Box::pin(self.with_conn(move |conn| conn.check_for_raw_error(sequence)))
125 }
126
127 fn discard_reply(&self, sequence: SequenceNumber, kind: RequestKind, mode: DiscardMode) {
128 self.inner.discard_reply(sequence, kind, mode);
130 }
131
132 fn extension_information(
133 &self,
134 name: &'static str,
135 ) -> Fut<'_, Option<x11rb::x11_utils::ExtensionInformation>, ConnectionError> {
136 Box::pin(self.with_conn(move |conn| conn.extension_information(name)))
137 }
138
139 fn prefetch_extension_information(&self, name: &'static str) -> Fut<'_, (), ConnectionError> {
140 Box::pin(self.with_conn(move |conn| conn.prefetch_extension_information(name)))
141 }
142
143 fn maximum_request_bytes(&self) -> Pin<Box<dyn Future<Output = usize> + Send + '_>> {
144 Box::pin(self.with_conn(|conn| conn.maximum_request_bytes()))
145 }
146
147 fn prefetch_maximum_request_bytes(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
148 Box::pin(self.with_conn(|conn| conn.prefetch_maximum_request_bytes()))
149 }
150
151 fn parse_error(&self, error: &[u8]) -> Result<X11Error, ParseError> {
152 self.inner.parse_error(error)
154 }
155
156 fn parse_event(&self, event: &[u8]) -> Result<x11rb::protocol::Event, ParseError> {
157 self.inner.parse_event(event)
159 }
160
161 fn send_request_with_reply<'this, 'bufs, 'sl, 're, 'future, R>(
162 &'this self,
163 bufs: &'bufs [IoSlice<'sl>],
164 fds: Vec<x11rb_protocol::RawFdContainer>,
165 ) -> Fut<'future, crate::Cookie<'this, Self, R>, ConnectionError>
166 where
167 'this: 'future,
168 'bufs: 'future,
169 'sl: 'future,
170 're: 'future,
171 R: x11rb::x11_utils::TryParse + Send + 're,
172 {
173 let mut buf = Vec::with_capacity(bufs.iter().map(|b| b.len()).sum());
174 for b in bufs {
175 buf.extend_from_slice(b);
176 }
177
178 Box::pin(async move {
179 let res = self
180 .with_conn(move |conn| {
181 let slices = [IoSlice::new(&buf)];
182 let cookie = conn.send_request_with_reply::<R>(&slices, fds)?;
183
184 let sequence = {
185 let sequence = cookie.sequence_number();
186 mem::forget(cookie);
187 sequence
188 };
189
190 Ok::<_, ConnectionError>(sequence)
191 })
192 .await?;
193
194 Ok(crate::Cookie::new(self, res))
195 })
196 }
197
198 fn send_request_with_reply_with_fds<'this, 'bufs, 'sl, 're, 'future, R>(
199 &'this self,
200 bufs: &'bufs [IoSlice<'sl>],
201 fds: Vec<x11rb_protocol::RawFdContainer>,
202 ) -> Fut<'future, crate::CookieWithFds<'this, Self, R>, ConnectionError>
203 where
204 'this: 'future,
205 'bufs: 'future,
206 'sl: 'future,
207 're: 'future,
208 R: x11rb::x11_utils::TryParseFd + Send + 're,
209 {
210 let mut buf = Vec::with_capacity(bufs.iter().map(|b| b.len()).sum());
211 for b in bufs {
212 buf.extend_from_slice(b);
213 }
214
215 Box::pin(async move {
216 let res = self
217 .with_conn(move |conn| {
218 let slices = [IoSlice::new(&buf)];
219 let cookie = conn.send_request_with_reply_with_fds::<R>(&slices, fds)?;
220
221 let sequence = {
222 let sequence = cookie.sequence_number();
223 mem::forget(cookie);
224 sequence
225 };
226
227 Ok::<_, ConnectionError>(sequence)
228 })
229 .await?;
230
231 Ok(crate::CookieWithFds::new(self, res))
232 })
233 }
234
235 fn send_request_without_reply<'this, 'bufs, 'sl, 'future>(
236 &'this self,
237 bufs: &'bufs [IoSlice<'sl>],
238 fds: Vec<x11rb_protocol::RawFdContainer>,
239 ) -> Fut<'future, crate::VoidCookie<'this, Self>, ConnectionError>
240 where
241 'this: 'future,
242 'bufs: 'future,
243 'sl: 'future,
244 {
245 let mut buf = Vec::with_capacity(bufs.iter().map(|b| b.len()).sum());
246 for b in bufs {
247 buf.extend_from_slice(b);
248 }
249
250 Box::pin(async move {
251 let res = self
252 .with_conn(move |conn| {
253 let slices = [IoSlice::new(&buf)];
254 let cookie = conn.send_request_without_reply(&slices, fds)?;
255
256 let sequence = {
257 let sequence = cookie.sequence_number();
258 mem::forget(cookie);
259 sequence
260 };
261
262 Ok::<_, ConnectionError>(sequence)
263 })
264 .await?;
265
266 Ok(crate::VoidCookie::new(self, res))
267 })
268 }
269
270 fn wait_for_reply(
271 &self,
272 sequence: SequenceNumber,
273 ) -> Fut<'_, Option<Self::Buf>, ConnectionError> {
274 Box::pin(self.with_conn(move |conn| conn.wait_for_reply(sequence)))
275 }
276
277 fn wait_for_reply_or_raw_error(
278 &self,
279 sequence: SequenceNumber,
280 ) -> Fut<'_, ReplyOrError<Self::Buf>, ConnectionError> {
281 Box::pin(self.with_conn(move |conn| conn.wait_for_reply_or_raw_error(sequence)))
282 }
283
284 fn wait_for_reply_with_fds_raw(
285 &self,
286 sequence: SequenceNumber,
287 ) -> Fut<'_, ReplyOrError<x11rb::connection::BufWithFds<Self::Buf>, Self::Buf>, ConnectionError>
288 {
289 Box::pin(self.with_conn(move |conn| conn.wait_for_reply_with_fds_raw(sequence)))
290 }
291}
292
293impl<C: BlConnection + Send + Sync + 'static> Connection for BlockingConnection<C> {
294 fn poll_for_raw_event_with_sequence(
295 &self,
296 ) -> Result<Option<x11rb_protocol::RawEventAndSeqNumber<Self::Buf>>, ConnectionError> {
297 self.inner.poll_for_raw_event_with_sequence()
299 }
300
301 fn wait_for_raw_event_with_sequence(
302 &self,
303 ) -> Fut<'_, x11rb_protocol::RawEventAndSeqNumber<Self::Buf>, ConnectionError> {
304 Box::pin(self.with_conn(|conn| conn.wait_for_raw_event_with_sequence()))
305 }
306
307 fn generate_id(&self) -> Fut<'_, u32, ReplyOrIdError> {
308 Box::pin(self.with_conn(|conn| conn.generate_id()))
309 }
310
311 fn flush(&self) -> Fut<'_, (), ConnectionError> {
312 Box::pin(self.with_conn(|conn| conn.flush()))
313 }
314
315 fn setup(&self) -> &x11rb::protocol::xproto::Setup {
316 self.inner.setup()
317 }
318}