breadx_blocking/
blocking_display.rs

1// MIT/Apache2 License
2
3use super::spawn_blocking;
4use breadx::{
5    display::{AsyncDisplay, Display, DisplayBase, PendingItem, PollOr, RequestInfo, StaticSetup},
6    event::Event,
7    XID,
8};
9use std::{
10    future::Future,
11    num::NonZeroU32,
12    pin::Pin,
13    task::{Context, Poll, Waker},
14};
15
16/// An `AsyncDisplay` that sends operations for the `Display` onto a blocking thread-pool.
17pub struct BlockingDisplay<T> {
18    display: Option<T>,
19
20    // cached futures for wait() and send_request()
21    wait: Option<Pin<Box<dyn Future<Output = (breadx::Result, T)> + Send + Sync + 'static>>>,
22    send_request:
23        Option<Pin<Box<dyn Future<Output = (breadx::Result<u16>, T)> + Send + Sync + 'static>>>,
24
25    // wakers waiting on begin_send_request_info()
26    send_pending_request_wakers: Vec<Waker>,
27}
28
29impl<T> BlockingDisplay<T> {
30    /// Create a new `BlockingDisplay`.
31    #[inline]
32    pub fn new(display: T) -> BlockingDisplay<T> {
33        BlockingDisplay {
34            display: Some(display),
35            wait: None,
36            send_request: None,
37            send_pending_request_wakers: Vec::new(),
38        }
39    }
40
41    /// Get the inner display. This function will need to wait until it can cancel all of the ongoing wait or
42    /// send request operations until it returns.
43    #[inline]
44    pub async fn into_inner(mut self) -> T {
45        if let Some(display) = self.display.take() {
46            display
47        } else if let Some(wait) = self.wait.take() {
48            let (_, display) = wait.await;
49            display
50        } else {
51            let send_request = self
52                .send_request
53                .take()
54                .unwrap_or_else(|| panic!("Invalid state"));
55            let (_, display) = send_request.await;
56            display
57        }
58    }
59
60    /// Get a mutable reference to the inner display. This function will need to wait until it can cancel all of
61    /// the ongoing wait or send request operations until it returns.
62    #[inline]
63    pub async fn get_mut(&mut self) -> &mut T {
64        if let Some(ref mut display) = self.display {
65            return display;
66        }
67
68        if let Some(wait) = self.wait.take() {
69            let (_, display) = wait.await;
70            self.display.insert(display)
71        } else {
72            let send_request = self
73                .send_request
74                .take()
75                .unwrap_or_else(|| panic!("Invalid state"));
76            let (_, display) = send_request.await;
77            self.display.insert(display)
78        }
79    }
80
81    #[inline]
82    fn inner(&self) -> &T {
83        self.display.as_ref().expect("Invalid state")
84    }
85
86    #[inline]
87    fn inner_mut(&mut self) -> &mut T {
88        self.display.as_mut().expect("Invalid state")
89    }
90}
91
92// Note: functions in DisplayBase should be non-blocking, so we can just forward to the inner impl.
93impl<T: DisplayBase> DisplayBase for BlockingDisplay<T> {
94    #[inline]
95    fn setup(&self) -> &StaticSetup {
96        self.inner().setup()
97    }
98
99    #[inline]
100    fn default_screen_index(&self) -> usize {
101        self.inner().default_screen_index()
102    }
103
104    #[inline]
105    fn next_request_number(&mut self) -> u64 {
106        self.inner_mut().next_request_number()
107    }
108
109    #[inline]
110    fn generate_xid(&mut self) -> Option<XID> {
111        self.inner_mut().generate_xid()
112    }
113
114    #[inline]
115    fn add_pending_item(&mut self, req_id: u16, item: PendingItem) {
116        self.inner_mut().add_pending_item(req_id, item)
117    }
118
119    #[inline]
120    fn get_pending_item(&mut self, req_id: u16) -> Option<PendingItem> {
121        self.inner_mut().get_pending_item(req_id)
122    }
123
124    #[inline]
125    fn take_pending_item(&mut self, req_id: u16) -> Option<PendingItem> {
126        self.inner_mut().take_pending_item(req_id)
127    }
128
129    #[inline]
130    fn has_pending_event(&self) -> bool {
131        self.inner().has_pending_event()
132    }
133
134    #[inline]
135    fn push_event(&mut self, event: Event) {
136        self.inner_mut().push_event(event)
137    }
138
139    #[inline]
140    fn pop_event(&mut self) -> Option<Event> {
141        self.inner_mut().pop_event()
142    }
143
144    #[inline]
145    fn create_special_event_queue(&mut self, xid: XID) {
146        self.inner_mut().create_special_event_queue(xid)
147    }
148
149    #[inline]
150    fn push_special_event(&mut self, xid: XID, event: Event) -> Result<(), Event> {
151        self.inner_mut().push_special_event(xid, event)
152    }
153
154    #[inline]
155    fn pop_special_event(&mut self, xid: XID) -> Option<Event> {
156        self.inner_mut().pop_special_event(xid)
157    }
158
159    #[inline]
160    fn delete_special_event_queue(&mut self, xid: XID) {
161        self.inner_mut().delete_special_event_queue(xid)
162    }
163
164    #[inline]
165    fn checked(&self) -> bool {
166        self.inner().checked()
167    }
168
169    #[inline]
170    fn set_checked(&mut self, checked: bool) {
171        self.inner_mut().set_checked(checked)
172    }
173
174    #[inline]
175    fn bigreq_enabled(&self) -> bool {
176        self.inner().bigreq_enabled()
177    }
178
179    #[inline]
180    fn max_request_len(&self) -> usize {
181        self.inner().max_request_len()
182    }
183
184    #[inline]
185    fn get_extension_opcode(&mut self, key: &[u8; 24]) -> Option<u8> {
186        self.inner_mut().get_extension_opcode(key)
187    }
188
189    #[inline]
190    fn set_extension_opcode(&mut self, key: [u8; 24], opcode: u8) {
191        self.inner_mut().set_extension_opcode(key, opcode)
192    }
193
194    #[inline]
195    fn wm_protocols_atom(&self) -> Option<NonZeroU32> {
196        self.inner().wm_protocols_atom()
197    }
198
199    #[inline]
200    fn set_wm_protocols_atom(&mut self, a: NonZeroU32) {
201        self.inner_mut().set_wm_protocols_atom(a)
202    }
203}
204
205impl<T: Display + Send + Sync + 'static> AsyncDisplay for BlockingDisplay<T> {
206    #[inline]
207    fn poll_wait(&mut self, cx: &mut Context<'_>) -> Poll<breadx::Result> {
208        // start polling for wait if we haven't already
209        let wait = match &mut self.wait {
210            Some(wait) => wait,
211            None => {
212                let mut display = self.display.take().expect("Invalid state");
213                let wait = spawn_blocking(move || {
214                    let res = display.wait();
215                    (res, display)
216                });
217
218                self.wait.insert(Box::pin(wait))
219            }
220        };
221
222        match wait.as_mut().poll(cx) {
223            Poll::Ready((res, display)) => {
224                self.display = Some(display);
225                self.wait = None;
226                Poll::Ready(res)
227            }
228            Poll::Pending => Poll::Pending,
229        }
230    }
231
232    #[inline]
233    fn begin_send_request_raw(
234        &mut self,
235        req: RequestInfo,
236        cx: &mut Context<'_>,
237    ) -> PollOr<(), RequestInfo> {
238        if self.send_request.is_none() {
239            let mut display = self.display.take().expect("Invalid state");
240            let send_request = spawn_blocking(move || {
241                let res = display.send_request_raw(req);
242                (res, display)
243            });
244
245            self.send_request = Some(Box::pin(send_request));
246            PollOr::Ready(())
247        } else {
248            self.send_pending_request_wakers.push(cx.waker().clone());
249            PollOr::Pending(req)
250        }
251    }
252
253    #[inline]
254    fn poll_send_request_raw(&mut self, cx: &mut Context<'_>) -> Poll<breadx::Result<u16>> {
255        let send_request = self.send_request.as_mut().expect("Invalid state");
256
257        match send_request.as_mut().poll(cx) {
258            Poll::Ready((res, display)) => {
259                self.display = Some(display);
260                self.send_request = None;
261                // wake any waker that was waiting on this completing
262                self.send_pending_request_wakers
263                    .drain(..)
264                    .for_each(|waker| waker.wake());
265                Poll::Ready(res)
266            }
267            Poll::Pending => Poll::Pending,
268        }
269    }
270}