wl_data_control_protocol_evt/
ext_data_control_stream.rs1use crate::{
2 ExtDataControlConnectError, ExtDataControlReadError,
3 app_connection::AppConnection,
4 epoll::{Epoll, EpollError, EpollResult},
5 reader::{ReadResult, Reader},
6 rw_stream::{ReaderWriterEvent, ReaderWriterStream},
7 wl_events_stream::WlEventsStream,
8 writer::{WriteResult, Writer},
9};
10use rustix::fs::Timespec;
11use std::{
12 collections::HashMap,
13 os::fd::{AsFd, AsRawFd},
14};
15use wayland_client::backend::WaylandError;
16
17pub struct ExtDataControlStream {
19 connection: AppConnection,
20 rw_stream: ReaderWriterStream,
21
22 epoll: Epoll,
23 epoll_events: Vec<rustix::event::epoll::Event>,
24
25 readers: HashMap<i32, Reader>,
26 writers: HashMap<i32, Writer>,
27}
28
29impl ExtDataControlStream {
30 pub fn new() -> Result<Self, ExtDataControlConnectError> {
36 let connection = AppConnection::connect()?;
37 let epoll = Epoll::new(connection.as_fd())?;
38
39 Ok(Self {
40 connection,
41 rw_stream: ReaderWriterStream::new(),
42
43 epoll,
44 epoll_events: Vec::with_capacity(16),
45
46 readers: HashMap::new(),
47 writers: HashMap::new(),
48 })
49 }
50
51 fn remove_reader(&mut self, fd: i32) -> Result<(), EpollError> {
52 let Some(reader) = self.readers.remove(&fd) else {
53 return Ok(());
54 };
55 self.epoll.delete(reader.as_fd())?;
56 reader.destroy();
57 Ok(())
58 }
59
60 fn remove_writer(&mut self, fd: i32) -> Result<(), EpollError> {
61 let Some(writer) = self.writers.remove(&fd) else {
62 return Ok(());
63 };
64 self.epoll.delete(writer.as_fd())?;
65 Ok(())
66 }
67
68 pub fn offer_text(&mut self, text: impl Into<String>) -> Result<(), WaylandError> {
74 let source = self
75 .connection
76 .offer_text(self.rw_stream.mime_type_mask())?;
77 self.rw_stream.save_offer(text, source);
78 Ok(())
79 }
80
81 fn process_epoll_result(
82 &mut self,
83 epoll_result: EpollResult,
84 events: &mut Vec<ExtDataControlEvent>,
85 ) -> Result<(), ExtDataControlReadError> {
86 let EpollResult {
87 wl_is_readable,
88 readers,
89 writers,
90 } = epoll_result;
91
92 if wl_is_readable {
93 self.read_from_wl_socket_until_blocked(events)?;
94 }
95
96 for fd in readers.dead {
97 self.remove_reader(fd)?;
98 }
99
100 for fd in writers.dead {
101 self.remove_writer(fd)?;
102 }
103
104 for fd in readers.ready {
105 self.read_offer(fd, events)?;
106 }
107
108 for fd in writers.ready {
109 self.write_source(fd)?;
110 }
111
112 Ok(())
113 }
114
115 fn read_from_wl_socket_until_blocked(
116 &mut self,
117 events: &mut Vec<ExtDataControlEvent>,
118 ) -> Result<(), ExtDataControlReadError> {
119 for event in self.rw_stream.read_until_blocked(&mut self.connection)? {
120 match event {
121 ReaderWriterEvent::NewReader(reader) => {
122 self.epoll.register_readable(reader.as_fd())?;
123 self.readers.insert(reader.as_raw_fd(), *reader);
124 }
125 ReaderWriterEvent::NewWriter(writer) => {
126 self.epoll.register_writable(writer.as_fd())?;
127 self.writers.insert(writer.as_raw_fd(), writer);
128 }
129 ReaderWriterEvent::Finished => events.push(ExtDataControlEvent::Finished),
130 }
131 }
132
133 Ok(())
134 }
135
136 fn read_offer(
137 &mut self,
138 fd: i32,
139 events: &mut Vec<ExtDataControlEvent>,
140 ) -> Result<(), ExtDataControlReadError> {
141 if let Some(reader) = self.readers.get_mut(&fd) {
142 match reader.read() {
143 Ok(ReadResult::Done(text)) => {
144 self.remove_reader(fd)?;
145 events.push(ExtDataControlEvent::Received(text));
146 }
147 Ok(ReadResult::Pending) => {}
148 Err(err) => {
149 log::error!("reader {fd:?} returned error {err:?}");
150 self.remove_reader(fd)?;
151 }
152 }
153 }
154
155 Ok(())
156 }
157
158 fn write_source(&mut self, fd: i32) -> Result<(), ExtDataControlReadError> {
159 if let Some(writer) = self.writers.get_mut(&fd) {
160 match writer.write() {
161 Ok(WriteResult::Done) => {
162 self.remove_writer(fd)?;
163 }
164 Ok(WriteResult::Pending) => {}
165 Err(err) => {
166 log::error!("writer {fd:?} returned error {err:?}");
167 self.remove_writer(fd)?;
168 }
169 }
170 }
171 Ok(())
172 }
173
174 fn epoll_wait(
175 &mut self,
176 timeout: Option<&Timespec>,
177 ) -> Result<Option<EpollResult>, ExtDataControlReadError> {
178 let epoll_result = self.epoll.wait(
179 &mut self.epoll_events,
180 timeout,
181 self.connection.as_fd(),
182 &self.readers,
183 &self.writers,
184 )?;
185 self.epoll_events.clear();
186 Ok(epoll_result)
187 }
188
189 const ZERO_TIMEOUT: Option<&Timespec> = Some(&Timespec {
190 tv_sec: 0,
191 tv_nsec: 0,
192 });
193
194 pub fn drain(&mut self) -> Result<Vec<ExtDataControlEvent>, ExtDataControlReadError> {
204 let mut events = vec![];
205
206 while let Some(epoll_result) = self.epoll_wait(Self::ZERO_TIMEOUT)? {
207 self.process_epoll_result(epoll_result, &mut events)?;
208
209 self.connection.queue.flush()?;
210 self.connection
211 .queue
212 .dispatch_pending(&mut WlEventsStream)?;
213 }
214
215 Ok(events)
216 }
217}
218
219impl AsFd for ExtDataControlStream {
220 fn as_fd(&self) -> std::os::unix::prelude::BorrowedFd<'_> {
221 self.epoll.as_fd()
222 }
223}
224
225impl AsRawFd for ExtDataControlStream {
226 fn as_raw_fd(&self) -> std::os::unix::prelude::RawFd {
227 self.epoll.as_raw_fd()
228 }
229}
230
231impl Drop for ExtDataControlStream {
232 fn drop(&mut self) {
233 for reader in self.readers.values() {
234 reader.destroy();
235 }
236 self.rw_stream.cleanup();
237 self.connection.cleanup_and_flush();
238 }
239}
240
/// Event returned from `ExtDataControlStream::drain`.
#[derive(Debug)]
pub enum ExtDataControlEvent {
    /// A reader finished; carries the text it produced.
    Received(String),
    /// The reader/writer stream reported `ReaderWriterEvent::Finished`.
    Finished,
}