wl_client/connection/flush.rs
1#[expect(unused_imports)]
2use crate::QueueWatcher;
3use {
4 crate::{
5 Connection,
6 connection::data::ConnectionData2,
7 utils::{
8 executor::Executor,
9 os_error::OsError,
10 poller::{self, Poller},
11 },
12 },
13 parking_lot::Mutex,
14 std::{
15 convert::Infallible,
16 future::poll_fn,
17 io::{self, ErrorKind},
18 sync::Arc,
19 task::{Poll, Waker},
20 },
21};
22
23#[cfg(test)]
24mod tests;
25
26pub(super) struct Flusher {
27 data: Arc<Data1>,
28}
29
30#[derive(Default)]
31struct Data1 {
32 data: Mutex<Data2>,
33}
34
35#[derive(Default)]
36struct Data2 {
37 have_request: bool,
38 last_error: Option<OsError>,
39 waker: Option<Waker>,
40}
41
42impl Flusher {
43 pub(super) fn new(poller: &Poller, executor: &Executor, con: &Arc<ConnectionData2>) -> Self {
44 let slf = Self {
45 data: Default::default(),
46 };
47 let con = con.clone();
48 let poll_data = poller.data.clone();
49 let data = slf.data.clone();
50 executor.add(async move {
51 let res: io::Result<Infallible> = async {
52 loop {
53 while let Err(e) = con.try_flush() {
54 match e.kind() {
55 ErrorKind::WouldBlock => {}
56 ErrorKind::Interrupted => continue,
57 _ => return Err(e),
58 }
59 poller::writable(&poll_data).await?;
60 }
61 poll_fn(|ctx| {
62 let d = &mut *data.data.lock();
63 if d.have_request {
64 d.have_request = false;
65 d.waker = None;
66 Poll::Ready(())
67 } else {
68 d.waker = Some(ctx.waker().clone());
69 Poll::Pending
70 }
71 })
72 .await;
73 }
74 }
75 .await;
76 let err = res.unwrap_err();
77 let d = &mut *data.data.lock();
78 d.last_error = Some(err.into());
79 });
80 slf
81 }
82}
83
84impl Connection {
85 /// Schedules outgoing messages to be sent to the compositor.
86 ///
87 /// This function must be used if the application uses a [`QueueWatcher`] to integrate
88 /// the connection into an event loop. The blocking or async integration methods
89 /// perform a flush automatically.
90 ///
91 /// This function never blocks. It only schedules messages to be flushed on another
92 /// thread.
93 ///
94 /// # Example
95 ///
96 /// ```
97 /// # use std::os::fd::AsRawFd;
98 /// # use mio::Interest;
99 /// # use mio::unix::SourceFd;
100 /// # use wl_client::Libwayland;
101 /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
102 /// #
103 /// let lib = Libwayland::open().unwrap();
104 /// let con = lib.connect_to_default_display().unwrap();
105 /// let queue = con.create_queue(c"queue name");
106 /// let watcher = queue.create_watcher().unwrap();
107 /// let token = mio::Token(0);
108 /// let mut events = mio::Events::with_capacity(2);
109 /// let mut poll = mio::Poll::new().unwrap();
110 /// poll
111 /// .registry()
112 /// .register(&mut SourceFd(&watcher.as_raw_fd()), token, Interest::READABLE)
113 /// .unwrap();
114 ///
115 /// // perform requests
116 /// // ...
117 /// # let _sync = queue.display::<WlDisplay>().sync();
118 ///
119 /// // flush the requests
120 /// con.flush().unwrap();
121 ///
122 /// // wait for new events
123 /// poll.poll(&mut events, None).unwrap();
124 /// ```
125 pub fn flush(&self) -> io::Result<()> {
126 let data = &self.data.flusher.data;
127 let d = &mut *data.data.lock();
128 if let Some(err) = d.last_error {
129 return Err(err.into());
130 }
131 d.have_request = true;
132 if let Some(waker) = d.waker.take() {
133 waker.wake();
134 }
135 Ok(())
136 }
137}
138
139impl ConnectionData2 {
140 fn try_flush(&self) -> io::Result<()> {
141 // SAFETY: The display function returns a valid pointer.
142 let ret = unsafe { self.libwayland.wl_display_flush(self.wl_display().as_ptr()) };
143 if ret == -1 {
144 return Err(io::Error::last_os_error());
145 }
146 Ok(())
147 }
148}