1use linux_cec_sys::structs::{cec_caps, cec_msg};
7use nix::poll::PollTimeout;
8use std::ffi::OsString;
9use std::fs::File;
10use std::panic::resume_unwind;
11use std::path::Path;
12use std::sync::mpsc::{channel, Receiver, RecvError, SendError, Sender};
13use std::thread::{self, JoinHandle};
14use tokio::fs::OpenOptions;
15use tokio::sync::oneshot;
16
17use crate::device::{
18 Capabilities, ConnectorInfo, Device, DevicePoller, Envelope, PollResult, PollStatus,
19};
20use crate::message::{Message, Opcode};
21use crate::operand::UiCommand;
22use crate::{
23 Error, FollowerMode, InitiatorMode, LogicalAddress, LogicalAddressType, PhysicalAddress,
24 Result, Timeout, VendorId,
25};
26
27macro_rules! relay {
28 ($self:expr, $message:ident) => {{
29 let (tx, rx) = oneshot::channel();
30 $self.tx
31 .send(DeviceCommand::$message(tx))?;
32 rx.await?
33 }};
34
35 ($self:expr, $message:ident => $($args:expr),*) => {{
36 let (tx, rx) = oneshot::channel();
37 $self.tx
38 .send(DeviceCommand::$message($($args,)* tx))?;
39 rx.await?
40 }};
41}
42
43type ResultChannel<T> = oneshot::Sender<Result<T>>;
44
45enum DeviceCommand {
46 Drop,
47 GetPoller(ResultChannel<AsyncDevicePoller>),
48 SetBlocking(bool, ResultChannel<()>),
49 GetInitiatorMode(ResultChannel<InitiatorMode>),
50 SetInitiatorMode(InitiatorMode, ResultChannel<()>),
51 GetFollowerMode(ResultChannel<FollowerMode>),
52 SetFollowerMode(FollowerMode, ResultChannel<()>),
53 GetRawCapabilities(ResultChannel<cec_caps>),
54 GetCapabilities(ResultChannel<Capabilities>),
55 GetDriverName(ResultChannel<OsString>),
56 GetAdapterName(ResultChannel<OsString>),
57 GetPhysicalAddress(ResultChannel<PhysicalAddress>),
58 SetPhysicalAddress(PhysicalAddress, ResultChannel<()>),
59 GetLogicalAddresses(ResultChannel<Vec<LogicalAddress>>),
60 SetLogicalAddresses(Vec<LogicalAddressType>, ResultChannel<()>),
61 SetLogicalAddress(LogicalAddressType, ResultChannel<()>),
62 ClearLogicalAddresses(ResultChannel<()>),
63 GetOsdName(ResultChannel<OsString>),
64 SetOsdName(String, ResultChannel<()>),
65 GetVendorId(ResultChannel<Option<VendorId>>),
66 SetVendorId(Option<VendorId>, ResultChannel<()>),
67 TransmitMessage(Message, LogicalAddress, ResultChannel<u32>),
68 TransmitRawMessage(cec_msg, ResultChannel<cec_msg>),
69 TransmitReceiveMessage(
70 Message,
71 LogicalAddress,
72 Opcode,
73 Timeout,
74 ResultChannel<Envelope>,
75 ),
76 ReceiveMessage(Timeout, ResultChannel<Envelope>),
77 ReceiveRawMessage(u32, ResultChannel<cec_msg>),
78 PollAddress(LogicalAddress, ResultChannel<()>),
79 HandleStatus(PollStatus, ResultChannel<Vec<PollResult>>),
80 GetConnectorInfo(ResultChannel<ConnectorInfo>),
81 SetActiveSource(Option<PhysicalAddress>, ResultChannel<()>),
82 Wake(bool, bool, ResultChannel<()>),
83 Standby(LogicalAddress, ResultChannel<()>),
84 PressUserControl(UiCommand, LogicalAddress, ResultChannel<()>),
85 ReleaseUserControl(LogicalAddress, ResultChannel<()>),
86}
87
88#[derive(Debug)]
93pub struct AsyncDevice {
94 thread: Option<JoinHandle<Result<()>>>,
95 tx: Sender<DeviceCommand>,
96}
97
98#[derive(Debug)]
99pub struct AsyncDevicePoller {
100 thread: Option<JoinHandle<Result<()>>>,
101 tx: Sender<PollerCommand>,
102}
103
104enum PollerCommand {
105 Drop,
106 Poll(PollTimeout, ResultChannel<PollStatus>),
107}
108
109struct DeviceThread {
110 device: Device,
111 rx: Receiver<DeviceCommand>,
112}
113
114impl AsyncDevice {
115 pub async fn open(path: impl AsRef<Path>) -> Result<AsyncDevice> {
116 let file = OpenOptions::new()
117 .read(true)
118 .write(true)
119 .create(false)
120 .open(path)
121 .await?;
122
123 let file = file.into_std().await;
124 AsyncDevice::try_from(file)
125 }
126
127 pub async fn set_blocking(&self, blocking: bool) -> Result<()> {
128 relay! { self, SetBlocking => blocking }
129 }
130
131 pub async fn get_poller(&self) -> Result<AsyncDevicePoller> {
132 relay! { self, GetPoller }
133 }
134
135 pub async fn poll(&self, timeout: PollTimeout) -> Result<Vec<PollResult>> {
136 let poller = self.get_poller().await?;
137 let status = poller.poll(timeout).await?;
138 self.handle_status(status).await
139 }
140
141 pub async fn get_initiator_mode(&self) -> Result<InitiatorMode> {
142 relay! { self, GetInitiatorMode }
143 }
144
145 pub async fn set_initiator_mode(&self, mode: InitiatorMode) -> Result<()> {
146 relay! { self, SetInitiatorMode => mode }
147 }
148
149 pub async fn get_follower_mode(&self) -> Result<FollowerMode> {
150 relay! { self, GetFollowerMode }
151 }
152
153 pub async fn set_follower_mode(&self, mode: FollowerMode) -> Result<()> {
154 relay! { self, SetFollowerMode => mode }
155 }
156
157 pub async fn get_capabilities(&self) -> Result<Capabilities> {
158 relay! { self, GetCapabilities }
159 }
160
161 pub async fn get_raw_capabilities(&self) -> Result<cec_caps> {
162 relay! { self, GetRawCapabilities }
163 }
164
165 pub async fn get_driver_name(&self) -> Result<OsString> {
166 relay! { self, GetDriverName }
167 }
168
169 pub async fn get_adapter_name(&self) -> Result<OsString> {
170 relay! { self, GetAdapterName }
171 }
172
173 pub async fn get_physical_address(&self) -> Result<PhysicalAddress> {
174 relay! { self, GetPhysicalAddress }
175 }
176
177 pub async fn set_physical_address(&self, phys_addr: PhysicalAddress) -> Result<()> {
178 relay! { self, SetPhysicalAddress => phys_addr }
179 }
180
181 pub async fn get_logical_addresses(&self) -> Result<Vec<LogicalAddress>> {
182 relay! { self, GetLogicalAddresses }
183 }
184
185 pub async fn set_logical_addresses(&self, log_addrs: &[LogicalAddressType]) -> Result<()> {
186 relay! { self, SetLogicalAddresses => Vec::from(log_addrs) }
187 }
188
189 pub async fn set_logical_address(&self, log_addr: LogicalAddressType) -> Result<()> {
190 relay! { self, SetLogicalAddress => log_addr }
191 }
192
193 pub async fn clear_logical_addresses(&self) -> Result<()> {
194 relay! { self, ClearLogicalAddresses }
195 }
196
197 pub async fn get_osd_name(&self) -> Result<OsString> {
198 relay! { self, GetOsdName }
199 }
200
201 pub async fn set_osd_name(&self, name: &str) -> Result<()> {
202 relay! { self, SetOsdName => name.to_string() }
203 }
204
205 pub async fn get_vendor_id(&self) -> Result<Option<VendorId>> {
206 relay! { self, GetVendorId }
207 }
208
209 pub async fn set_vendor_id(&self, vendor_id: Option<VendorId>) -> Result<()> {
210 relay! { self, SetVendorId => vendor_id }
211 }
212
213 pub async fn tx_message(&self, message: &Message, destination: LogicalAddress) -> Result<u32> {
214 relay! { self, TransmitMessage => *message, destination }
215 }
216
217 pub async fn tx_raw_message(&self, message: &mut cec_msg) -> Result<()> {
218 let new_message = relay! { self, TransmitRawMessage => message.clone() }?;
219 *message = new_message;
220 Ok(())
221 }
222
223 pub async fn tx_rx_message(
224 &self,
225 message: &Message,
226 destination: LogicalAddress,
227 opcode: Opcode,
228 timeout: Timeout,
229 ) -> Result<Envelope> {
230 relay! { self, TransmitReceiveMessage => *message, destination, opcode, timeout }
231 }
232
233 pub async fn rx_message(&self, timeout: Timeout) -> Result<Envelope> {
234 relay! { self, ReceiveMessage => timeout }
235 }
236
237 pub async fn rx_raw_message(&self, timeout: u32) -> Result<cec_msg> {
238 relay! { self, ReceiveRawMessage => timeout }
239 }
240
241 pub async fn poll_address(&self, destination: LogicalAddress) -> Result<()> {
242 relay! { self, PollAddress => destination }
243 }
244
245 pub async fn handle_status(&self, status: PollStatus) -> Result<Vec<PollResult>> {
246 relay! { self, HandleStatus => status }
247 }
248
249 pub async fn get_connector_info(&self) -> Result<ConnectorInfo> {
250 relay! { self, GetConnectorInfo }
251 }
252
253 pub async fn set_active_source(&self, address: Option<PhysicalAddress>) -> Result<()> {
254 relay! { self, SetActiveSource => address }
255 }
256
257 pub async fn wake(&self, set_active: bool, text_view: bool) -> Result<()> {
258 relay! { self, Wake => set_active, text_view }
259 }
260
261 pub async fn standby(&self, target: LogicalAddress) -> Result<()> {
262 relay! { self, Standby => target }
263 }
264
265 pub async fn press_user_control(
266 &self,
267 ui_command: UiCommand,
268 target: LogicalAddress,
269 ) -> Result<()> {
270 relay! { self, PressUserControl => ui_command, target }
271 }
272
273 pub async fn release_user_control(&self, target: LogicalAddress) -> Result<()> {
274 relay! { self, ReleaseUserControl => target }
275 }
276
277 pub async fn close(mut self) -> Result<()> {
280 self.tx.send(DeviceCommand::Drop)?;
281 let Some(thread): Option<JoinHandle<Result<()>>> = self.thread.take() else {
282 return Ok(());
283 };
284 match thread.join() {
285 Ok(r) => r,
286 Err(e) => resume_unwind(e),
287 }
288 }
289}
290
291impl From<Device> for AsyncDevice {
292 fn from(device: Device) -> AsyncDevice {
293 let (tx, rx) = channel();
294 let mut thread = DeviceThread { device, rx };
295
296 let thread = thread::spawn(move || thread.run());
297 AsyncDevice {
298 thread: Some(thread),
299 tx,
300 }
301 }
302}
303
304impl TryFrom<File> for AsyncDevice {
305 type Error = Error;
306
307 fn try_from(file: File) -> Result<AsyncDevice> {
308 let device = Device::try_from(file)?;
309 Ok(AsyncDevice::from(device))
310 }
311}
312
313impl Drop for AsyncDevice {
314 fn drop(&mut self) {
315 let Some(thread) = self.thread.take() else {
316 return;
317 };
318 let _ = self.tx.send(DeviceCommand::Drop);
319 let _ = thread.join();
320 }
321}
322
323impl DeviceThread {
324 fn run(&mut self) -> Result<()> {
325 loop {
326 match self.rx.recv()? {
327 DeviceCommand::Drop => break,
328 DeviceCommand::GetPoller(tx) => {
329 let _ = tx.send(self.device.get_poller().map(AsyncDevicePoller::from));
330 }
331 DeviceCommand::SetBlocking(block, tx) => {
332 let _ = tx.send(self.device.set_blocking(block));
333 }
334 DeviceCommand::GetInitiatorMode(tx) => {
335 let _ = tx.send(self.device.get_initiator_mode());
336 }
337 DeviceCommand::SetInitiatorMode(mode, tx) => {
338 let _ = tx.send(self.device.set_initiator_mode(mode));
339 }
340 DeviceCommand::GetFollowerMode(tx) => {
341 let _ = tx.send(self.device.get_follower_mode());
342 }
343 DeviceCommand::SetFollowerMode(mode, tx) => {
344 let _ = tx.send(self.device.set_follower_mode(mode));
345 }
346 DeviceCommand::GetCapabilities(tx) => {
347 let _ = tx.send(self.device.get_capabilities());
348 }
349 DeviceCommand::GetRawCapabilities(tx) => {
350 let _ = tx.send(self.device.get_raw_capabilities());
351 }
352 DeviceCommand::GetDriverName(tx) => {
353 let _ = tx.send(self.device.get_driver_name());
354 }
355 DeviceCommand::GetAdapterName(tx) => {
356 let _ = tx.send(self.device.get_adapter_name());
357 }
358 DeviceCommand::GetPhysicalAddress(tx) => {
359 let _ = tx.send(self.device.get_physical_address());
360 }
361 DeviceCommand::SetPhysicalAddress(phys_addr, tx) => {
362 let _ = tx.send(self.device.set_physical_address(phys_addr));
363 }
364 DeviceCommand::GetLogicalAddresses(tx) => {
365 let _ = tx.send(self.device.get_logical_addresses());
366 }
367 DeviceCommand::SetLogicalAddresses(log_addrs, tx) => {
368 let _ = tx.send(self.device.set_logical_addresses(&log_addrs));
369 }
370 DeviceCommand::SetLogicalAddress(log_addr, tx) => {
371 let _ = tx.send(self.device.set_logical_address(log_addr));
372 }
373 DeviceCommand::ClearLogicalAddresses(tx) => {
374 let _ = tx.send(self.device.clear_logical_addresses());
375 }
376 DeviceCommand::GetOsdName(tx) => {
377 let _ = tx.send(self.device.get_osd_name());
378 }
379 DeviceCommand::SetOsdName(name, tx) => {
380 let _ = tx.send(self.device.set_osd_name(&name));
381 }
382 DeviceCommand::GetVendorId(tx) => {
383 let _ = tx.send(self.device.get_vendor_id());
384 }
385 DeviceCommand::SetVendorId(vendor_id, tx) => {
386 let _ = tx.send(self.device.set_vendor_id(vendor_id));
387 }
388 DeviceCommand::TransmitMessage(message, dest, tx) => {
389 let _ = tx.send(self.device.tx_message(&message, dest));
390 }
391 DeviceCommand::TransmitRawMessage(msg, tx) => {
392 let mut msg = msg.clone();
393 let _ = tx.send(self.device.tx_raw_message(&mut msg).and(Ok(msg)));
394 }
395 DeviceCommand::TransmitReceiveMessage(message, dest, opcode, timeout, tx) => {
396 let _ = tx.send(self.device.tx_rx_message(&message, dest, opcode, timeout));
397 }
398 DeviceCommand::ReceiveMessage(timeout, tx) => {
399 let _ = tx.send(self.device.rx_message(timeout));
400 }
401 DeviceCommand::ReceiveRawMessage(timeout, tx) => {
402 let _ = tx.send(self.device.rx_raw_message(timeout));
403 }
404 DeviceCommand::PollAddress(dest, tx) => {
405 let _ = tx.send(self.device.poll_address(dest));
406 }
407 DeviceCommand::HandleStatus(status, tx) => {
408 let _ = tx.send(self.device.handle_status(status));
409 }
410 DeviceCommand::GetConnectorInfo(tx) => {
411 let _ = tx.send(self.device.get_connector_info());
412 }
413 DeviceCommand::SetActiveSource(address, tx) => {
414 let _ = tx.send(self.device.set_active_source(address));
415 }
416 DeviceCommand::Wake(set_active, text_view, tx) => {
417 let _ = tx.send(self.device.wake(set_active, text_view));
418 }
419 DeviceCommand::Standby(target, tx) => {
420 let _ = tx.send(self.device.standby(target));
421 }
422 DeviceCommand::PressUserControl(ui_command, target, tx) => {
423 let _ = tx.send(self.device.press_user_control(ui_command, target));
424 }
425 DeviceCommand::ReleaseUserControl(target, tx) => {
426 let _ = tx.send(self.device.release_user_control(target));
427 }
428 }
429 }
430 Ok(())
431 }
432}
433
434impl From<RecvError> for Error {
435 fn from(error: RecvError) -> Error {
436 Error::UnknownError(error.to_string())
437 }
438}
439
440impl<T> From<SendError<T>> for Error {
441 fn from(error: SendError<T>) -> Error {
442 Error::UnknownError(error.to_string())
443 }
444}
445
446impl From<oneshot::error::RecvError> for Error {
447 fn from(error: oneshot::error::RecvError) -> Error {
448 Error::UnknownError(error.to_string())
449 }
450}
451
452impl From<DevicePoller> for AsyncDevicePoller {
453 fn from(poller: DevicePoller) -> AsyncDevicePoller {
454 let (tx, rx) = channel();
455
456 let thread = thread::spawn(move || {
457 loop {
458 match rx.recv()? {
459 PollerCommand::Drop => break,
460 PollerCommand::Poll(timeout, tx) => {
461 let _ = tx.send(poller.poll(timeout));
462 }
463 }
464 }
465 Ok(())
466 });
467 AsyncDevicePoller {
468 thread: Some(thread),
469 tx,
470 }
471 }
472}
473
474impl AsyncDevicePoller {
475 pub async fn poll(&self, timeout: PollTimeout) -> Result<PollStatus> {
476 let (tx, rx) = oneshot::channel();
477 self.tx.send(PollerCommand::Poll(timeout, tx))?;
478 rx.await?
479 }
480}
481
482impl Drop for AsyncDevicePoller {
483 fn drop(&mut self) {
484 let _ = self.tx.send(PollerCommand::Drop);
485 let _ = self.thread.take().unwrap().join();
486 }
487}