use async_trait::async_trait;
use log::debug;
use nom_derive::Parse;
use std::path::{Path, PathBuf};
use thiserror::Error;
#[cfg(target_os = "windows")]
use tokio::fs::File;
#[cfg(not(target_os = "windows"))]
use tokio::net::unix::pipe::{Receiver, Sender};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::{
mpsc::{self, error::SendError},
Mutex,
},
task::JoinHandle,
};
pub mod util;
use util::AsyncReadExt as _;
use crate::controls::{ControlCommand, ControlPacket};
/// Errors that can occur while reading a single control packet from the
/// incoming control pipe.
#[derive(Debug, Error)]
pub enum ReadControlError {
/// An I/O error raised while reading from the pipe. Displayed transparently
/// and convertible from `tokio::io::Error` via `?`.
#[error(transparent)]
IoError(#[from] tokio::io::Error),
/// The bytes that were read could not be parsed as a control packet. Carries
/// the parser's error rendered as text.
#[error("Error parsing control packet: {0}")]
ParseError(String),
}
/// Errors that can end the background task spawned by
/// `ChannelExtcapControlReader::spawn`.
#[derive(Debug, Error)]
pub enum ControlChannelError {
/// Reading a control packet failed; wraps the underlying `ReadControlError`
/// and is displayed transparently.
#[error(transparent)]
ReadControl(#[from] ReadControlError),
/// A packet could not be sent into the channel. Produced from a tokio mpsc
/// `SendError` by the `From` impl below this enum.
#[error("Cannot send control packet to channel")]
CannotSend,
}
impl<T> From<SendError<T>> for ControlChannelError {
fn from(_: SendError<T>) -> Self {
ControlChannelError::CannotSend
}
}
/// Handle to a background task that reads control packets and forwards them
/// through a tokio mpsc channel. Created with `ChannelExtcapControlReader::spawn`.
pub struct ChannelExtcapControlReader {
/// Join handle of the spawned reader task; resolves to the error that made
/// the task stop.
pub join_handle: JoinHandle<Result<(), ControlChannelError>>,
/// Receiving half of the channel the reader task sends parsed packets into.
pub read_channel: mpsc::Receiver<ControlPacket<'static>>,
}
impl ChannelExtcapControlReader {
    /// Spawns a Tokio task that opens the control pipe at `in_path`, reads
    /// control packets from it in a loop and forwards each one into a bounded
    /// channel (capacity 10).
    ///
    /// The task only stops when reading a packet or sending it into the
    /// channel fails; the resulting error is available through `join_handle`.
    pub fn spawn(in_path: PathBuf) -> Self {
        let (packet_tx, packet_rx) = mpsc::channel::<ControlPacket<'static>>(10);
        let reader_task = tokio::task::spawn(async move {
            let mut control_reader = ExtcapControlReader::new(&in_path).await;
            loop {
                // Read first, then forward; either failure ends the task via `?`.
                let packet = control_reader.read_control_packet().await?;
                packet_tx.send(packet).await?;
            }
        });
        Self {
            join_handle: reader_task,
            read_channel: packet_rx,
        }
    }

    /// Returns a packet if one is already waiting in the channel, without
    /// waiting for one to arrive. Any `try_recv` error is reported as `None`.
    pub async fn try_read_packet(&mut self) -> Option<ControlPacket<'static>> {
        match self.read_channel.try_recv() {
            Ok(packet) => Some(packet),
            Err(_) => None,
        }
    }

    /// Waits for the next packet from the channel; yields whatever the
    /// channel's `recv` yields.
    pub async fn read_packet(&mut self) -> Option<ControlPacket<'static>> {
        self.read_channel.recv().await
    }
}
/// Reader for the incoming extcap control pipe; parses the bytes it reads
/// into `ControlPacket`s.
pub struct ExtcapControlReader {
/// On non-Windows targets: the receiving end of a tokio unix pipe.
#[cfg(not(target_os = "windows"))]
in_file: Receiver,
/// On Windows: the pipe path opened as a regular tokio `File`.
#[cfg(target_os = "windows")]
in_file: File,
}
impl ExtcapControlReader {
    /// Opens the control pipe at `in_path` for reading.
    ///
    /// # Panics
    ///
    /// Panics if the pipe cannot be opened. The signature is infallible, so
    /// the failure is reported through the panic message, which names the path.
    #[cfg(not(target_os = "windows"))]
    pub async fn new(in_path: &Path) -> Self {
        let in_file = tokio::net::unix::pipe::OpenOptions::new()
            .open_receiver(in_path)
            .unwrap_or_else(|e| {
                panic!(
                    "Failed to open extcap control-in pipe {}: {e}",
                    in_path.display()
                )
            });
        Self { in_file }
    }

    /// Opens the control pipe at `in_path` for reading.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be opened. The signature is infallible, so
    /// the failure is reported through the panic message, which names the path.
    #[cfg(target_os = "windows")]
    pub async fn new(in_path: &Path) -> Self {
        let in_file = File::open(in_path).await.unwrap_or_else(|e| {
            panic!(
                "Failed to open extcap control-in pipe {}: {e}",
                in_path.display()
            )
        });
        Self { in_file }
    }

    /// Reads one control packet from the pipe.
    ///
    /// A 6-byte header is read and parsed first. If the parser reports that it
    /// needs a known number of additional bytes, exactly that many payload
    /// bytes are read and header + payload are parsed together.
    ///
    /// # Errors
    ///
    /// * [`ReadControlError::IoError`] if reading fails, or with
    ///   `UnexpectedEof` when `try_read_exact` yields no header.
    /// * [`ReadControlError::ParseError`] if the header, or the header plus
    ///   payload, cannot be parsed. Malformed input is reported to the caller
    ///   rather than panicking.
    pub async fn read_control_packet(
        &mut self,
    ) -> Result<ControlPacket<'static>, ReadControlError> {
        let header_bytes = self
            .in_file
            .try_read_exact::<6>()
            .await?
            .ok_or_else(|| std::io::Error::from(std::io::ErrorKind::UnexpectedEof))?;
        debug!(
            "Read header bytes from incoming control message, now parsing... {:?}",
            header_bytes
        );
        let packet = match ControlPacket::parse(&header_bytes) {
            // The header alone was a complete packet.
            Ok((_rem, packet)) => packet.into_owned(),
            // The header announces a payload: fetch it and parse the whole packet.
            Err(nom::Err::Incomplete(nom::Needed::Size(size))) => {
                let mut payload_bytes = vec![0_u8; size.get()];
                self.in_file.read_exact(&mut payload_bytes).await?;
                let all_bytes = [header_bytes.as_slice(), payload_bytes.as_slice()].concat();
                match ControlPacket::parse(&all_bytes) {
                    Ok((_rem, packet)) => packet.into_owned(),
                    Err(e) => return Err(ReadControlError::ParseError(e.to_string())),
                }
            }
            Err(e) => return Err(ReadControlError::ParseError(e.to_string())),
        };
        debug!("Parsed incoming control message: {packet:?}");
        Ok(packet)
    }
}
/// Control number placed in the packets built by the default message methods
/// of `ExtcapControlSenderTrait` (info, warning, error and status-bar messages).
// NOTE(review): presumably 255 is used because these messages are not tied to
// a specific control; confirm against the extcap control protocol.
const UNUSED_CONTROL_NUMBER: u8 = 255;
/// Something that can send extcap control packets.
///
/// Every method takes `self` by value; the implementations in this file are
/// provided for borrowed handles (`&mut ExtcapControlSender`,
/// `&mut Option<T>` and `&Mutex<T>`), so calling a method consumes only the
/// borrow.
#[async_trait]
pub trait ExtcapControlSenderTrait: Send + Sync + Sized {
    /// Sends `packet` over the control channel.
    async fn send(self, packet: ControlPacket<'_>) -> Result<(), tokio::io::Error>;

    /// Sends `message` as an `InformationMessage` control packet.
    async fn info_message(self, message: &str) -> Result<(), tokio::io::Error> {
        let packet = ControlPacket::new_with_payload(
            UNUSED_CONTROL_NUMBER,
            ControlCommand::InformationMessage,
            message.as_bytes(),
        );
        self.send(packet).await
    }

    /// Sends `message` as a `WarningMessage` control packet.
    async fn warning_message(self, message: &str) -> Result<(), tokio::io::Error> {
        let packet = ControlPacket::new_with_payload(
            UNUSED_CONTROL_NUMBER,
            ControlCommand::WarningMessage,
            message.as_bytes(),
        );
        self.send(packet).await
    }

    /// Sends `message` as an `ErrorMessage` control packet.
    async fn error_message(self, message: &str) -> Result<(), tokio::io::Error> {
        let packet = ControlPacket::new_with_payload(
            UNUSED_CONTROL_NUMBER,
            ControlCommand::ErrorMessage,
            message.as_bytes(),
        );
        self.send(packet).await
    }

    /// Sends `message` as a `StatusbarMessage` control packet.
    async fn status_message(self, message: &str) -> Result<(), tokio::io::Error> {
        let packet = ControlPacket::new_with_payload(
            UNUSED_CONTROL_NUMBER,
            ControlCommand::StatusbarMessage,
            message.as_bytes(),
        );
        self.send(packet).await
    }
}
/// Writer for the outgoing extcap control pipe.
pub struct ExtcapControlSender {
/// On non-Windows targets: the sending end of a tokio unix pipe.
#[cfg(not(target_os = "windows"))]
out_file: Sender,
/// On Windows: the pipe path created/opened as a regular tokio `File`.
#[cfg(target_os = "windows")]
out_file: File,
}
impl ExtcapControlSender {
    /// Opens the control pipe at `out_path` for writing.
    ///
    /// Opening is attempted up to 50 times. While the open fails with `ENXIO`
    /// the call backs off linearly (0 ms, 100 ms, 200 ms, ...) before trying
    /// again.
    ///
    /// # Panics
    ///
    /// Panics if opening fails with any error other than `ENXIO`, or if the
    /// pipe still cannot be opened after the last attempt.
    #[cfg(not(target_os = "windows"))]
    pub async fn new(out_path: &Path) -> Self {
        use std::time::Duration;

        const MAX_ATTEMPTS: u64 = 50;

        for attempt in 0..MAX_ATTEMPTS {
            match tokio::net::unix::pipe::OpenOptions::new().open_sender(out_path) {
                Ok(out_file) => return Self { out_file },
                // ENXIO is the one failure worth retrying; per the tokio docs it
                // means no process has the FIFO open for reading yet.
                Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {
                    // Sleeping after the final attempt would only delay the
                    // panic below, so skip it.
                    if attempt + 1 < MAX_ATTEMPTS {
                        tokio::time::sleep(Duration::from_millis(attempt * 100)).await;
                    }
                }
                Err(e) => panic!(
                    "Failed to open extcap control-out pipe {}: {e:?}",
                    out_path.display()
                ),
            }
        }
        panic!("Failed waiting for extcap-control-out to be opened");
    }

    /// Creates/opens the control pipe at `out_path` for writing.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be created. The signature is infallible, so
    /// the failure is reported through the panic message, which names the path.
    #[cfg(target_os = "windows")]
    pub async fn new(out_path: &Path) -> Self {
        let out_file = File::create(out_path).await.unwrap_or_else(|e| {
            panic!(
                "Failed to open extcap control-out pipe {}: {e:?}",
                out_path.display()
            )
        });
        Self { out_file }
    }
}
/// Sending through a mutable borrow of the sender writes straight to its pipe.
#[async_trait]
impl<'a> ExtcapControlSenderTrait for &'a mut ExtcapControlSender {
    /// Writes the packet's header bytes, then its payload, then flushes the
    /// output. The first I/O error aborts the sequence and is returned.
    async fn send(self, packet: ControlPacket<'_>) -> Result<(), tokio::io::Error> {
        debug!("Sending extcap control message: {packet:#?}");
        let header = packet.to_header_bytes();
        self.out_file.write_all(&header).await?;
        self.out_file.write_all(&packet.payload).await?;
        self.out_file.flush().await
    }
}
/// An optional sender: forwards to the inner sender when present and is a
/// successful no-op when absent.
#[async_trait]
impl<T> ExtcapControlSenderTrait for &mut Option<T>
where
    T: Send + Sync,
    for<'a> &'a mut T: ExtcapControlSenderTrait,
{
    /// Sends `packet` through the contained sender, or returns `Ok(())`
    /// without doing anything if the option is `None`.
    async fn send(self, packet: ControlPacket<'_>) -> Result<(), tokio::io::Error> {
        match self {
            Some(sender) => sender.send(packet).await,
            None => Ok(()),
        }
    }
}
/// A sender behind a tokio `Mutex`: the lock is taken for the duration of
/// each send.
#[async_trait]
impl<T> ExtcapControlSenderTrait for &Mutex<T>
where
    T: Send,
    for<'a> &'a mut T: ExtcapControlSenderTrait,
{
    /// Acquires the mutex, sends `packet` through the guarded sender and
    /// releases the lock once the send has finished.
    async fn send(self, packet: ControlPacket<'_>) -> Result<(), tokio::io::Error> {
        let mut guard = self.lock().await;
        guard.send(packet).await
    }
}