wl_client/connection/flush.rs
1#[expect(unused_imports)]
2use crate::QueueWatcher;
3use {
4 crate::{
5 Connection,
6 connection::data::ConnectionData2,
7 utils::{
8 executor::Executor,
9 poller::{self, Poller},
10 },
11 },
12 parking_lot::Mutex,
13 std::{
14 convert::Infallible,
15 future::poll_fn,
16 io::{self, ErrorKind},
17 sync::Arc,
18 task::{Poll, Waker},
19 },
20};
21
22#[cfg(test)]
23mod tests;
24
25pub(super) struct Flusher {
26 data: Arc<Data1>,
27}
28
29#[derive(Default)]
30struct Data1 {
31 data: Mutex<Data2>,
32}
33
34#[derive(Default)]
35struct Data2 {
36 have_request: bool,
37 last_error: Option<ErrorKind>,
38 waker: Option<Waker>,
39}
40
41impl Flusher {
42 pub(super) fn new(poller: &Poller, executor: &Executor, con: &Arc<ConnectionData2>) -> Self {
43 let slf = Self {
44 data: Default::default(),
45 };
46 let con = con.clone();
47 let poll_data = poller.data.clone();
48 let data = slf.data.clone();
49 executor.add(async move {
50 let res: io::Result<Infallible> = async {
51 loop {
52 while let Err(e) = con.try_flush() {
53 match e.kind() {
54 ErrorKind::WouldBlock => {}
55 ErrorKind::Interrupted => continue,
56 _ => return Err(e),
57 }
58 poller::writable(&poll_data).await?;
59 }
60 poll_fn(|ctx| {
61 let d = &mut *data.data.lock();
62 if d.have_request {
63 d.have_request = false;
64 d.waker = None;
65 Poll::Ready(())
66 } else {
67 d.waker = Some(ctx.waker().clone());
68 Poll::Pending
69 }
70 })
71 .await;
72 }
73 }
74 .await;
75 let err = res.unwrap_err();
76 let d = &mut *data.data.lock();
77 d.last_error = Some(err.kind());
78 });
79 slf
80 }
81}
82
83impl Connection {
84 /// Schedules outgoing messages to be sent to the compositor.
85 ///
86 /// This function must be used if the application uses a [`QueueWatcher`] to integrate
87 /// the connection into an event loop. The blocking or async integration methods
88 /// perform a flush automatically.
89 ///
90 /// This function never blocks. It only schedules messages to be flushed on another
91 /// thread.
92 ///
93 /// # Example
94 ///
95 /// ```
96 /// # use std::os::fd::AsRawFd;
97 /// # use mio::Interest;
98 /// # use mio::unix::SourceFd;
99 /// # use wl_client::Libwayland;
100 /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
101 /// #
102 /// let lib = Libwayland::open().unwrap();
103 /// let con = lib.connect_to_default_display().unwrap();
104 /// let queue = con.create_queue(c"queue name");
105 /// let watcher = queue.create_watcher().unwrap();
106 /// let token = mio::Token(0);
107 /// let mut events = mio::Events::with_capacity(2);
108 /// let mut poll = mio::Poll::new().unwrap();
109 /// poll
110 /// .registry()
111 /// .register(&mut SourceFd(&watcher.as_raw_fd()), token, Interest::READABLE)
112 /// .unwrap();
113 ///
114 /// // perform requests
115 /// // ...
116 /// # let _sync = queue.display::<WlDisplay>().sync();
117 ///
118 /// // flush the requests
119 /// con.flush().unwrap();
120 ///
121 /// // wait for new events
122 /// poll.poll(&mut events, None).unwrap();
123 /// ```
124 pub fn flush(&self) -> io::Result<()> {
125 let data = &self.data.flusher.data;
126 let d = &mut *data.data.lock();
127 if let Some(err) = d.last_error {
128 return Err(err.into());
129 }
130 d.have_request = true;
131 if let Some(waker) = d.waker.take() {
132 waker.wake();
133 }
134 Ok(())
135 }
136}
137
138impl ConnectionData2 {
139 fn try_flush(&self) -> io::Result<()> {
140 // SAFETY: The display function returns a valid pointer.
141 let ret = unsafe { self.libwayland.wl_display_flush(self.wl_display().as_ptr()) };
142 if ret == -1 {
143 return Err(io::Error::last_os_error());
144 }
145 Ok(())
146 }
147}