Skip to main content

linux_cec/
async_support.rs

1/*
2 * Copyright © 2024 Valve Software
3 * SPDX-License-Identifier: LGPL-2.1-or-later
4 */
5
6use linux_cec_sys::structs::{cec_caps, cec_msg};
7use nix::poll::PollTimeout;
8use std::ffi::OsString;
9use std::fs::File;
10use std::panic::resume_unwind;
11use std::path::Path;
12use std::sync::mpsc::{channel, Receiver, RecvError, SendError, Sender};
13use std::thread::{self, JoinHandle};
14use tokio::fs::OpenOptions;
15use tokio::sync::oneshot;
16
17use crate::device::{
18    Capabilities, ConnectorInfo, Device, DevicePoller, Envelope, PollResult, PollStatus,
19};
20use crate::message::{Message, Opcode};
21use crate::operand::UiCommand;
22use crate::{
23    Error, FollowerMode, InitiatorMode, LogicalAddress, LogicalAddressType, PhysicalAddress,
24    Result, Timeout, VendorId,
25};
26
/// Forwards one request to the device thread and evaluates to its reply.
///
/// The expansion creates a oneshot reply channel, builds
/// `DeviceCommand::$message` from the expressions listed after `=>` (if any)
/// followed by the reply sender, sends it over `$self.tx`, and awaits the
/// answer. Both the send and the await use `?`, so the macro can only be used
/// inside an `async` function whose error type converts from those failures.
macro_rules! relay {
    ($self:expr, $message:ident $(=> $($args:expr),*)?) => {{
        let (reply_tx, reply_rx) = oneshot::channel();
        let command = DeviceCommand::$message($($($args,)*)? reply_tx);
        $self.tx.send(command)?;
        reply_rx.await?
    }};
}
42
/// Sending half of the oneshot channel on which a worker thread returns the
/// `Result` of a single command to the task that requested it.
type ResultChannel<T> = oneshot::Sender<Result<T>>;
44
45enum DeviceCommand {
46    Drop,
47    GetPoller(ResultChannel<AsyncDevicePoller>),
48    SetBlocking(bool, ResultChannel<()>),
49    GetInitiatorMode(ResultChannel<InitiatorMode>),
50    SetInitiatorMode(InitiatorMode, ResultChannel<()>),
51    GetFollowerMode(ResultChannel<FollowerMode>),
52    SetFollowerMode(FollowerMode, ResultChannel<()>),
53    GetRawCapabilities(ResultChannel<cec_caps>),
54    GetCapabilities(ResultChannel<Capabilities>),
55    GetDriverName(ResultChannel<OsString>),
56    GetAdapterName(ResultChannel<OsString>),
57    GetPhysicalAddress(ResultChannel<PhysicalAddress>),
58    SetPhysicalAddress(PhysicalAddress, ResultChannel<()>),
59    GetLogicalAddresses(ResultChannel<Vec<LogicalAddress>>),
60    SetLogicalAddresses(Vec<LogicalAddressType>, ResultChannel<()>),
61    SetLogicalAddress(LogicalAddressType, ResultChannel<()>),
62    ClearLogicalAddresses(ResultChannel<()>),
63    GetOsdName(ResultChannel<OsString>),
64    SetOsdName(String, ResultChannel<()>),
65    GetVendorId(ResultChannel<Option<VendorId>>),
66    SetVendorId(Option<VendorId>, ResultChannel<()>),
67    TransmitMessage(Message, LogicalAddress, ResultChannel<u32>),
68    TransmitRawMessage(cec_msg, ResultChannel<cec_msg>),
69    TransmitReceiveMessage(
70        Message,
71        LogicalAddress,
72        Opcode,
73        Timeout,
74        ResultChannel<Envelope>,
75    ),
76    ReceiveMessage(Timeout, ResultChannel<Envelope>),
77    ReceiveRawMessage(u32, ResultChannel<cec_msg>),
78    PollAddress(LogicalAddress, ResultChannel<()>),
79    HandleStatus(PollStatus, ResultChannel<Vec<PollResult>>),
80    GetConnectorInfo(ResultChannel<ConnectorInfo>),
81    SetActiveSource(Option<PhysicalAddress>, ResultChannel<()>),
82    Wake(bool, bool, ResultChannel<()>),
83    Standby(LogicalAddress, ResultChannel<()>),
84    PressUserControl(UiCommand, LogicalAddress, ResultChannel<()>),
85    ReleaseUserControl(LogicalAddress, ResultChannel<()>),
86}
87
/// An asynchronous version of [`Device`].
///
/// As this is simply an asynchronous interface for the standard [`Device`] struct,
/// refer to the documentation there for information about the individual methods.
///
/// The wrapped [`Device`] is owned by a dedicated worker thread; every method
/// sends a command to that thread and awaits the reply.
#[derive(Debug)]
pub struct AsyncDevice {
    /// Handle of the worker thread. `close` and `Drop` take it (leaving
    /// `None`) when they join the thread.
    thread: Option<JoinHandle<Result<()>>>,
    /// Sending half of the command channel read by the worker thread.
    tx: Sender<DeviceCommand>,
}
97
/// An asynchronous wrapper around a [`DevicePoller`].
///
/// The poller is moved onto its own thread when this value is created from a
/// [`DevicePoller`]; `poll` requests are sent to that thread over a channel.
#[derive(Debug)]
pub struct AsyncDevicePoller {
    /// Handle of the polling thread, joined when the poller is dropped.
    thread: Option<JoinHandle<Result<()>>>,
    /// Sending half of the command channel read by the polling thread.
    tx: Sender<PollerCommand>,
}
103
/// Requests sent from an [`AsyncDevicePoller`] to its polling thread.
enum PollerCommand {
    /// Ask the polling loop to stop.
    Drop,
    /// Poll with the given timeout and send the outcome back on the channel.
    Poll(PollTimeout, ResultChannel<PollStatus>),
}
108
/// State owned by the worker thread that backs an [`AsyncDevice`].
struct DeviceThread {
    /// The blocking device every command is executed against.
    device: Device,
    /// Receiving half of the command channel fed by the [`AsyncDevice`].
    rx: Receiver<DeviceCommand>,
}
113
114impl AsyncDevice {
115    pub async fn open(path: impl AsRef<Path>) -> Result<AsyncDevice> {
116        let file = OpenOptions::new()
117            .read(true)
118            .write(true)
119            .create(false)
120            .open(path)
121            .await?;
122
123        let file = file.into_std().await;
124        AsyncDevice::try_from(file)
125    }
126
127    pub async fn set_blocking(&self, blocking: bool) -> Result<()> {
128        relay! { self, SetBlocking => blocking }
129    }
130
131    pub async fn get_poller(&self) -> Result<AsyncDevicePoller> {
132        relay! { self, GetPoller }
133    }
134
135    pub async fn poll(&self, timeout: PollTimeout) -> Result<Vec<PollResult>> {
136        let poller = self.get_poller().await?;
137        let status = poller.poll(timeout).await?;
138        self.handle_status(status).await
139    }
140
141    pub async fn get_initiator_mode(&self) -> Result<InitiatorMode> {
142        relay! { self, GetInitiatorMode }
143    }
144
145    pub async fn set_initiator_mode(&self, mode: InitiatorMode) -> Result<()> {
146        relay! { self, SetInitiatorMode => mode }
147    }
148
149    pub async fn get_follower_mode(&self) -> Result<FollowerMode> {
150        relay! { self, GetFollowerMode }
151    }
152
153    pub async fn set_follower_mode(&self, mode: FollowerMode) -> Result<()> {
154        relay! { self, SetFollowerMode => mode }
155    }
156
157    pub async fn get_capabilities(&self) -> Result<Capabilities> {
158        relay! { self, GetCapabilities }
159    }
160
161    pub async fn get_raw_capabilities(&self) -> Result<cec_caps> {
162        relay! { self, GetRawCapabilities }
163    }
164
165    pub async fn get_driver_name(&self) -> Result<OsString> {
166        relay! { self, GetDriverName }
167    }
168
169    pub async fn get_adapter_name(&self) -> Result<OsString> {
170        relay! { self, GetAdapterName }
171    }
172
173    pub async fn get_physical_address(&self) -> Result<PhysicalAddress> {
174        relay! { self, GetPhysicalAddress }
175    }
176
177    pub async fn set_physical_address(&self, phys_addr: PhysicalAddress) -> Result<()> {
178        relay! { self, SetPhysicalAddress => phys_addr }
179    }
180
181    pub async fn get_logical_addresses(&self) -> Result<Vec<LogicalAddress>> {
182        relay! { self, GetLogicalAddresses }
183    }
184
185    pub async fn set_logical_addresses(&self, log_addrs: &[LogicalAddressType]) -> Result<()> {
186        relay! { self, SetLogicalAddresses => Vec::from(log_addrs) }
187    }
188
189    pub async fn set_logical_address(&self, log_addr: LogicalAddressType) -> Result<()> {
190        relay! { self, SetLogicalAddress => log_addr }
191    }
192
193    pub async fn clear_logical_addresses(&self) -> Result<()> {
194        relay! { self, ClearLogicalAddresses }
195    }
196
197    pub async fn get_osd_name(&self) -> Result<OsString> {
198        relay! { self, GetOsdName }
199    }
200
201    pub async fn set_osd_name(&self, name: &str) -> Result<()> {
202        relay! { self, SetOsdName => name.to_string() }
203    }
204
205    pub async fn get_vendor_id(&self) -> Result<Option<VendorId>> {
206        relay! { self, GetVendorId }
207    }
208
209    pub async fn set_vendor_id(&self, vendor_id: Option<VendorId>) -> Result<()> {
210        relay! { self, SetVendorId => vendor_id }
211    }
212
213    pub async fn tx_message(&self, message: &Message, destination: LogicalAddress) -> Result<u32> {
214        relay! { self, TransmitMessage => *message, destination }
215    }
216
217    pub async fn tx_raw_message(&self, message: &mut cec_msg) -> Result<()> {
218        let new_message = relay! { self, TransmitRawMessage => message.clone() }?;
219        *message = new_message;
220        Ok(())
221    }
222
223    pub async fn tx_rx_message(
224        &self,
225        message: &Message,
226        destination: LogicalAddress,
227        opcode: Opcode,
228        timeout: Timeout,
229    ) -> Result<Envelope> {
230        relay! { self, TransmitReceiveMessage => *message, destination, opcode, timeout }
231    }
232
233    pub async fn rx_message(&self, timeout: Timeout) -> Result<Envelope> {
234        relay! { self, ReceiveMessage => timeout }
235    }
236
237    pub async fn rx_raw_message(&self, timeout: u32) -> Result<cec_msg> {
238        relay! { self, ReceiveRawMessage => timeout }
239    }
240
241    pub async fn poll_address(&self, destination: LogicalAddress) -> Result<()> {
242        relay! { self, PollAddress => destination }
243    }
244
245    pub async fn handle_status(&self, status: PollStatus) -> Result<Vec<PollResult>> {
246        relay! { self, HandleStatus => status }
247    }
248
249    pub async fn get_connector_info(&self) -> Result<ConnectorInfo> {
250        relay! { self, GetConnectorInfo }
251    }
252
253    pub async fn set_active_source(&self, address: Option<PhysicalAddress>) -> Result<()> {
254        relay! { self, SetActiveSource => address }
255    }
256
257    pub async fn wake(&self, set_active: bool, text_view: bool) -> Result<()> {
258        relay! { self, Wake => set_active, text_view }
259    }
260
261    pub async fn standby(&self, target: LogicalAddress) -> Result<()> {
262        relay! { self, Standby => target }
263    }
264
265    pub async fn press_user_control(
266        &self,
267        ui_command: UiCommand,
268        target: LogicalAddress,
269    ) -> Result<()> {
270        relay! { self, PressUserControl => ui_command, target }
271    }
272
273    pub async fn release_user_control(&self, target: LogicalAddress) -> Result<()> {
274        relay! { self, ReleaseUserControl => target }
275    }
276
277    /// Clean up the underlying [`Device`] struct. This is safer than just dropping it,
278    /// as you can handle any errors that may occur.
279    pub async fn close(mut self) -> Result<()> {
280        self.tx.send(DeviceCommand::Drop)?;
281        let Some(thread): Option<JoinHandle<Result<()>>> = self.thread.take() else {
282            return Ok(());
283        };
284        match thread.join() {
285            Ok(r) => r,
286            Err(e) => resume_unwind(e),
287        }
288    }
289}
290
291impl From<Device> for AsyncDevice {
292    fn from(device: Device) -> AsyncDevice {
293        let (tx, rx) = channel();
294        let mut thread = DeviceThread { device, rx };
295
296        let thread = thread::spawn(move || thread.run());
297        AsyncDevice {
298            thread: Some(thread),
299            tx,
300        }
301    }
302}
303
304impl TryFrom<File> for AsyncDevice {
305    type Error = Error;
306
307    fn try_from(file: File) -> Result<AsyncDevice> {
308        let device = Device::try_from(file)?;
309        Ok(AsyncDevice::from(device))
310    }
311}
312
313impl Drop for AsyncDevice {
314    fn drop(&mut self) {
315        let Some(thread) = self.thread.take() else {
316            return;
317        };
318        let _ = self.tx.send(DeviceCommand::Drop);
319        let _ = thread.join();
320    }
321}
322
323impl DeviceThread {
324    fn run(&mut self) -> Result<()> {
325        loop {
326            match self.rx.recv()? {
327                DeviceCommand::Drop => break,
328                DeviceCommand::GetPoller(tx) => {
329                    let _ = tx.send(self.device.get_poller().map(AsyncDevicePoller::from));
330                }
331                DeviceCommand::SetBlocking(block, tx) => {
332                    let _ = tx.send(self.device.set_blocking(block));
333                }
334                DeviceCommand::GetInitiatorMode(tx) => {
335                    let _ = tx.send(self.device.get_initiator_mode());
336                }
337                DeviceCommand::SetInitiatorMode(mode, tx) => {
338                    let _ = tx.send(self.device.set_initiator_mode(mode));
339                }
340                DeviceCommand::GetFollowerMode(tx) => {
341                    let _ = tx.send(self.device.get_follower_mode());
342                }
343                DeviceCommand::SetFollowerMode(mode, tx) => {
344                    let _ = tx.send(self.device.set_follower_mode(mode));
345                }
346                DeviceCommand::GetCapabilities(tx) => {
347                    let _ = tx.send(self.device.get_capabilities());
348                }
349                DeviceCommand::GetRawCapabilities(tx) => {
350                    let _ = tx.send(self.device.get_raw_capabilities());
351                }
352                DeviceCommand::GetDriverName(tx) => {
353                    let _ = tx.send(self.device.get_driver_name());
354                }
355                DeviceCommand::GetAdapterName(tx) => {
356                    let _ = tx.send(self.device.get_adapter_name());
357                }
358                DeviceCommand::GetPhysicalAddress(tx) => {
359                    let _ = tx.send(self.device.get_physical_address());
360                }
361                DeviceCommand::SetPhysicalAddress(phys_addr, tx) => {
362                    let _ = tx.send(self.device.set_physical_address(phys_addr));
363                }
364                DeviceCommand::GetLogicalAddresses(tx) => {
365                    let _ = tx.send(self.device.get_logical_addresses());
366                }
367                DeviceCommand::SetLogicalAddresses(log_addrs, tx) => {
368                    let _ = tx.send(self.device.set_logical_addresses(&log_addrs));
369                }
370                DeviceCommand::SetLogicalAddress(log_addr, tx) => {
371                    let _ = tx.send(self.device.set_logical_address(log_addr));
372                }
373                DeviceCommand::ClearLogicalAddresses(tx) => {
374                    let _ = tx.send(self.device.clear_logical_addresses());
375                }
376                DeviceCommand::GetOsdName(tx) => {
377                    let _ = tx.send(self.device.get_osd_name());
378                }
379                DeviceCommand::SetOsdName(name, tx) => {
380                    let _ = tx.send(self.device.set_osd_name(&name));
381                }
382                DeviceCommand::GetVendorId(tx) => {
383                    let _ = tx.send(self.device.get_vendor_id());
384                }
385                DeviceCommand::SetVendorId(vendor_id, tx) => {
386                    let _ = tx.send(self.device.set_vendor_id(vendor_id));
387                }
388                DeviceCommand::TransmitMessage(message, dest, tx) => {
389                    let _ = tx.send(self.device.tx_message(&message, dest));
390                }
391                DeviceCommand::TransmitRawMessage(msg, tx) => {
392                    let mut msg = msg.clone();
393                    let _ = tx.send(self.device.tx_raw_message(&mut msg).and(Ok(msg)));
394                }
395                DeviceCommand::TransmitReceiveMessage(message, dest, opcode, timeout, tx) => {
396                    let _ = tx.send(self.device.tx_rx_message(&message, dest, opcode, timeout));
397                }
398                DeviceCommand::ReceiveMessage(timeout, tx) => {
399                    let _ = tx.send(self.device.rx_message(timeout));
400                }
401                DeviceCommand::ReceiveRawMessage(timeout, tx) => {
402                    let _ = tx.send(self.device.rx_raw_message(timeout));
403                }
404                DeviceCommand::PollAddress(dest, tx) => {
405                    let _ = tx.send(self.device.poll_address(dest));
406                }
407                DeviceCommand::HandleStatus(status, tx) => {
408                    let _ = tx.send(self.device.handle_status(status));
409                }
410                DeviceCommand::GetConnectorInfo(tx) => {
411                    let _ = tx.send(self.device.get_connector_info());
412                }
413                DeviceCommand::SetActiveSource(address, tx) => {
414                    let _ = tx.send(self.device.set_active_source(address));
415                }
416                DeviceCommand::Wake(set_active, text_view, tx) => {
417                    let _ = tx.send(self.device.wake(set_active, text_view));
418                }
419                DeviceCommand::Standby(target, tx) => {
420                    let _ = tx.send(self.device.standby(target));
421                }
422                DeviceCommand::PressUserControl(ui_command, target, tx) => {
423                    let _ = tx.send(self.device.press_user_control(ui_command, target));
424                }
425                DeviceCommand::ReleaseUserControl(target, tx) => {
426                    let _ = tx.send(self.device.release_user_control(target));
427                }
428            }
429        }
430        Ok(())
431    }
432}
433
434impl From<RecvError> for Error {
435    fn from(error: RecvError) -> Error {
436        Error::UnknownError(error.to_string())
437    }
438}
439
440impl<T> From<SendError<T>> for Error {
441    fn from(error: SendError<T>) -> Error {
442        Error::UnknownError(error.to_string())
443    }
444}
445
446impl From<oneshot::error::RecvError> for Error {
447    fn from(error: oneshot::error::RecvError) -> Error {
448        Error::UnknownError(error.to_string())
449    }
450}
451
452impl From<DevicePoller> for AsyncDevicePoller {
453    fn from(poller: DevicePoller) -> AsyncDevicePoller {
454        let (tx, rx) = channel();
455
456        let thread = thread::spawn(move || {
457            loop {
458                match rx.recv()? {
459                    PollerCommand::Drop => break,
460                    PollerCommand::Poll(timeout, tx) => {
461                        let _ = tx.send(poller.poll(timeout));
462                    }
463                }
464            }
465            Ok(())
466        });
467        AsyncDevicePoller {
468            thread: Some(thread),
469            tx,
470        }
471    }
472}
473
474impl AsyncDevicePoller {
475    pub async fn poll(&self, timeout: PollTimeout) -> Result<PollStatus> {
476        let (tx, rx) = oneshot::channel();
477        self.tx.send(PollerCommand::Poll(timeout, tx))?;
478        rx.await?
479    }
480}
481
482impl Drop for AsyncDevicePoller {
483    fn drop(&mut self) {
484        let _ = self.tx.send(PollerCommand::Drop);
485        let _ = self.thread.take().unwrap().join();
486    }
487}