wl_client/connection/
flush.rs

1#[expect(unused_imports)]
2use crate::QueueWatcher;
3use {
4    crate::{
5        Connection,
6        connection::data::ConnectionData2,
7        utils::{
8            executor::Executor,
9            os_error::OsError,
10            poller::{self, Poller},
11        },
12    },
13    parking_lot::Mutex,
14    std::{
15        convert::Infallible,
16        future::poll_fn,
17        io::{self, ErrorKind},
18        sync::Arc,
19        task::{Poll, Waker},
20    },
21};
22
23#[cfg(test)]
24mod tests;
25
26pub(super) struct Flusher {
27    data: Arc<Data1>,
28}
29
30#[derive(Default)]
31struct Data1 {
32    data: Mutex<Data2>,
33}
34
35#[derive(Default)]
36struct Data2 {
37    have_request: bool,
38    last_error: Option<OsError>,
39    waker: Option<Waker>,
40}
41
42impl Flusher {
43    pub(super) fn new(poller: &Poller, executor: &Executor, con: &Arc<ConnectionData2>) -> Self {
44        let slf = Self {
45            data: Default::default(),
46        };
47        let con = con.clone();
48        let poll_data = poller.data.clone();
49        let data = slf.data.clone();
50        executor.add(async move {
51            let res: io::Result<Infallible> = async {
52                loop {
53                    while let Err(e) = con.try_flush() {
54                        match e.kind() {
55                            ErrorKind::WouldBlock => {}
56                            ErrorKind::Interrupted => continue,
57                            _ => return Err(e),
58                        }
59                        poller::writable(&poll_data).await?;
60                    }
61                    poll_fn(|ctx| {
62                        let d = &mut *data.data.lock();
63                        if d.have_request {
64                            d.have_request = false;
65                            d.waker = None;
66                            Poll::Ready(())
67                        } else {
68                            d.waker = Some(ctx.waker().clone());
69                            Poll::Pending
70                        }
71                    })
72                    .await;
73                }
74            }
75            .await;
76            let err = res.unwrap_err();
77            let d = &mut *data.data.lock();
78            d.last_error = Some(err.into());
79        });
80        slf
81    }
82}
83
84impl Connection {
85    /// Schedules outgoing messages to be sent to the compositor.
86    ///
87    /// This function must be used if the application uses a [`QueueWatcher`] to integrate
88    /// the connection into an event loop. The blocking or async integration methods
89    /// perform a flush automatically.
90    ///
91    /// This function never blocks. It only schedules messages to be flushed on another
92    /// thread.
93    ///
94    /// # Example
95    ///
96    /// ```
97    /// # use std::os::fd::AsRawFd;
98    /// # use mio::Interest;
99    /// # use mio::unix::SourceFd;
100    /// # use wl_client::Libwayland;
101    /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
102    /// #
103    /// let lib = Libwayland::open().unwrap();
104    /// let con = lib.connect_to_default_display().unwrap();
105    /// let queue = con.create_queue(c"queue name");
106    /// let watcher = queue.create_watcher().unwrap();
107    /// let token = mio::Token(0);
108    /// let mut events = mio::Events::with_capacity(2);
109    /// let mut poll = mio::Poll::new().unwrap();
110    /// poll
111    ///     .registry()
112    ///     .register(&mut SourceFd(&watcher.as_raw_fd()), token, Interest::READABLE)
113    ///     .unwrap();
114    ///
115    /// // perform requests
116    /// // ...
117    /// # let _sync = queue.display::<WlDisplay>().sync();
118    ///
119    /// // flush the requests
120    /// con.flush().unwrap();
121    ///
122    /// // wait for new events
123    /// poll.poll(&mut events, None).unwrap();
124    /// ```
125    pub fn flush(&self) -> io::Result<()> {
126        let data = &self.data.flusher.data;
127        let d = &mut *data.data.lock();
128        if let Some(err) = d.last_error {
129            return Err(err.into());
130        }
131        d.have_request = true;
132        if let Some(waker) = d.waker.take() {
133            waker.wake();
134        }
135        Ok(())
136    }
137}
138
139impl ConnectionData2 {
140    fn try_flush(&self) -> io::Result<()> {
141        // SAFETY: The display function returns a valid pointer.
142        let ret = unsafe { self.libwayland.wl_display_flush(self.wl_display().as_ptr()) };
143        if ret == -1 {
144            return Err(io::Error::last_os_error());
145        }
146        Ok(())
147    }
148}