wl_client/connection/
flush.rs

1#[expect(unused_imports)]
2use crate::QueueWatcher;
3use {
4    crate::{
5        Connection,
6        connection::data::ConnectionData2,
7        utils::{
8            executor::Executor,
9            poller::{self, Poller},
10        },
11    },
12    parking_lot::Mutex,
13    std::{
14        convert::Infallible,
15        future::poll_fn,
16        io::{self, ErrorKind},
17        sync::Arc,
18        task::{Poll, Waker},
19    },
20};
21
22#[cfg(test)]
23mod tests;
24
25pub(super) struct Flusher {
26    data: Arc<Data1>,
27}
28
29#[derive(Default)]
30struct Data1 {
31    data: Mutex<Data2>,
32}
33
/// Mutable state exchanged between `Connection::flush` and the flush task.
#[derive(Default)]
struct Data2 {
    /// Set by `Connection::flush` to request a flush; cleared by the flush task
    /// when it picks the request up.
    have_request: bool,
    /// Kind of the I/O error that terminated the flush task, if it has terminated.
    last_error: Option<ErrorKind>,
    /// Waker of the flush task while it is parked waiting for the next request.
    waker: Option<Waker>,
}
40
41impl Flusher {
42    pub(super) fn new(poller: &Poller, executor: &Executor, con: &Arc<ConnectionData2>) -> Self {
43        let slf = Self {
44            data: Default::default(),
45        };
46        let con = con.clone();
47        let poll_data = poller.data.clone();
48        let data = slf.data.clone();
49        executor.add(async move {
50            let res: io::Result<Infallible> = async {
51                loop {
52                    while let Err(e) = con.try_flush() {
53                        match e.kind() {
54                            ErrorKind::WouldBlock => {}
55                            ErrorKind::Interrupted => continue,
56                            _ => return Err(e),
57                        }
58                        poller::writable(&poll_data).await?;
59                    }
60                    poll_fn(|ctx| {
61                        let d = &mut *data.data.lock();
62                        if d.have_request {
63                            d.have_request = false;
64                            d.waker = None;
65                            Poll::Ready(())
66                        } else {
67                            d.waker = Some(ctx.waker().clone());
68                            Poll::Pending
69                        }
70                    })
71                    .await;
72                }
73            }
74            .await;
75            let err = res.unwrap_err();
76            let d = &mut *data.data.lock();
77            d.last_error = Some(err.kind());
78        });
79        slf
80    }
81}
82
83impl Connection {
84    /// Schedules outgoing messages to be sent to the compositor.
85    ///
86    /// This function must be used if the application uses a [`QueueWatcher`] to integrate
87    /// the connection into an event loop. The blocking or async integration methods
88    /// perform a flush automatically.
89    ///
90    /// This function never blocks. It only schedules messages to be flushed on another
91    /// thread.
92    ///
93    /// # Example
94    ///
95    /// ```
96    /// # use std::os::fd::AsRawFd;
97    /// # use mio::Interest;
98    /// # use mio::unix::SourceFd;
99    /// # use wl_client::Libwayland;
100    /// # use wl_client::test_protocols::core::wl_display::WlDisplay;
101    /// #
102    /// let lib = Libwayland::open().unwrap();
103    /// let con = lib.connect_to_default_display().unwrap();
104    /// let queue = con.create_queue(c"queue name");
105    /// let watcher = queue.create_watcher().unwrap();
106    /// let token = mio::Token(0);
107    /// let mut events = mio::Events::with_capacity(2);
108    /// let mut poll = mio::Poll::new().unwrap();
109    /// poll
110    ///     .registry()
111    ///     .register(&mut SourceFd(&watcher.as_raw_fd()), token, Interest::READABLE)
112    ///     .unwrap();
113    ///
114    /// // perform requests
115    /// // ...
116    /// # let _sync = queue.display::<WlDisplay>().sync();
117    ///
118    /// // flush the requests
119    /// con.flush().unwrap();
120    ///
121    /// // wait for new events
122    /// poll.poll(&mut events, None).unwrap();
123    /// ```
124    pub fn flush(&self) -> io::Result<()> {
125        let data = &self.data.flusher.data;
126        let d = &mut *data.data.lock();
127        if let Some(err) = d.last_error {
128            return Err(err.into());
129        }
130        d.have_request = true;
131        if let Some(waker) = d.waker.take() {
132            waker.wake();
133        }
134        Ok(())
135    }
136}
137
138impl ConnectionData2 {
139    fn try_flush(&self) -> io::Result<()> {
140        // SAFETY: The display function returns a valid pointer.
141        let ret = unsafe { self.libwayland.wl_display_flush(self.wl_display().as_ptr()) };
142        if ret == -1 {
143            return Err(io::Error::last_os_error());
144        }
145        Ok(())
146    }
147}