r_extcap/controls/asynchronous/
mod.rs

1//! Tools for handling the Control Pipe with `async` (`tokio`).
2//!
3//! There are three main classes provided in this module:
4//!
5//! * [`ExtcapControlSender`] – Implements the sender side for sending control
6//!   packets from the extcap program you are implementing to Wireshark.
7//! * [`ExtcapControlReader`] – Implements the reader side that receives control
8//!   packets sent from Wireshark.
9//! * [`ChannelExtcapControlReader`] – A wrapper around `ExtcapControlReader`
10//!   that provides simpler, but less flexible, handling of the communication
11//!   using a Tokio channel.
12//!
13//! See Wireshark's [Adding Capture Interfaces And Log Sources Using
14//! Extcap](https://www.wireshark.org/docs/wsdg_html_chunked/ChCaptureExtcap.html#_messages)
15//! section 8.2.3.2.1 for a description of the protocol format.
16
17use async_trait::async_trait;
18use log::debug;
19use nom_derive::Parse;
20use std::path::{Path, PathBuf};
21use thiserror::Error;
22#[cfg(target_os = "windows")]
23use tokio::fs::File;
24#[cfg(not(target_os = "windows"))]
25use tokio::net::unix::pipe::{Receiver, Sender};
26use tokio::{
27    io::{AsyncReadExt, AsyncWriteExt},
28    sync::{
29        mpsc::{self, error::SendError},
30        Mutex,
31    },
32    task::JoinHandle,
33};
34
35pub mod util;
36use util::AsyncReadExt as _;
37
38use crate::controls::{ControlCommand, ControlPacket};
39
40/// Error type returned for control packet read operations.
41#[derive(Debug, Error)]
42pub enum ReadControlError {
43    /// Error reading the incoming control pipe.
44    #[error(transparent)]
45    IoError(#[from] tokio::io::Error),
46
47    /// Error parsing the incoming data into the [`ControlPacket`] format.
48    #[error("Error parsing control packet: {0}")]
49    ParseError(String),
50}
51
52/// Error associated with [`ChannelExtcapControlReader`].
53#[derive(Debug, Error)]
54pub enum ControlChannelError {
55    /// Error returned when the control packet cannot be read. See
56    /// the docs on [`ReadControlError`].
57    #[error(transparent)]
58    ReadControl(#[from] ReadControlError),
59
60    /// Error returned when the control packet cannot be sent on the channel.
61    /// This is caused by an underlying [`SendError`].
62    #[error("Cannot send control packet to channel")]
63    CannotSend,
64}
65
66impl<T> From<SendError<T>> for ControlChannelError {
67    fn from(_: SendError<T>) -> Self {
68        ControlChannelError::CannotSend
69    }
70}
71
72/// A reader for an Extcap Control using a [`Channel`][mpsc::channel]. This is
73/// the easier to use, but higher overhead way to read control packets. When the
74/// reader is spawned, a thread is spawned to continuously read messages and
75/// writes them into a bounded `channel`. This allows reading the control
76/// messages without worrying about spawning async tasks, by calling
77/// [`try_read_packet`][Self::try_read_packet] every once in a while.
78///
79/// Assuming the extcap `capture` implementation uses a loop to read or generate
80/// the packets, it can repeatedly call `try_read_packet` to read and handle the
81/// control packets until there are no more buffered messages before starting
82/// the main capturing logic.
83///
84/// For example:
85/// ```ignore
86/// fn capture(reader: &ChannelExtcapControlReader) -> Result<()> {
87///     let pcap_header = ...;
88///     let mut pcap_writer = PcapWriter::with_header(fifo, pcap_header)?;
89///     loop {
90///         while let Some(packet) = reader.try_read_packet().await {
91///             // Handle the control packet
92///         }
93///         pcap_writer.write_packet(...)?;
94///     }
95///     Ok(())
96/// }
97pub struct ChannelExtcapControlReader {
98    /// The join handle for the spawned thread. In most cases there is no need
99    /// to use this, as the control fifo is expected to run for the whole
100    /// duration of the capture.
101    pub join_handle: JoinHandle<Result<(), ControlChannelError>>,
102    /// The channel to receive control packets from.
103    pub read_channel: mpsc::Receiver<ControlPacket<'static>>,
104}
105
106impl ChannelExtcapControlReader {
107    /// Create a `ChannelExtcapControlReader` and spawns the underlying thread
108    /// it uses to start reading the control packets from the pipe given in
109    /// `in_path`.
110    pub fn spawn(in_path: PathBuf) -> Self {
111        let (tx, rx) = mpsc::channel::<ControlPacket<'static>>(10);
112        let join_handle = tokio::task::spawn(async move {
113            let mut reader = ExtcapControlReader::new(&in_path).await;
114            loop {
115                tx.send(reader.read_control_packet().await?).await?;
116            }
117        });
118        Self {
119            join_handle,
120            read_channel: rx,
121        }
122    }
123
124    /// Try to read a buffered control packet, or return `None` if there are no
125    /// incoming control packets.
126    pub async fn try_read_packet(&mut self) -> Option<ControlPacket<'static>> {
127        self.read_channel.try_recv().ok()
128    }
129
130    /// Reads a control packet. If the incoming channel is empty, this will
131    /// block and wait until an incoming packet comes in. This is typically used
132    /// when the extcap capture starts to wait for the `Initialized` packet from
133    /// the control channel.
134    ///
135    /// If you are only using this method and not using `try_read_packet`,
136    /// consider whether you can use [`ExtcapControlReader`] directly for lower
137    /// overhead.
138    pub async fn read_packet(&mut self) -> Option<ControlPacket<'static>> {
139        self.read_channel.recv().await
140    }
141}
142
143/// A reader for the Extcap control pipe.
144pub struct ExtcapControlReader {
145    /// The file to read the control packets from. This is the fifo passed with
146    /// the `--extcap-control-in` flag.
147    #[cfg(not(target_os = "windows"))]
148    in_file: Receiver,
149    /// The file to read the control packets from. This is the fifo passed with
150    /// the `--extcap-control-in` flag.
151    #[cfg(target_os = "windows")]
152    in_file: File,
153}
154
155impl ExtcapControlReader {
156    /// Creates a new instance of [`ExtcapControlReader`].
157    ///
158    /// * `in_path`: The path of the extcap control pipe passed with
159    ///   `--extcap-control-in`.
160    #[cfg(not(target_os = "windows"))]
161    pub async fn new(in_path: &Path) -> Self {
162        Self {
163            in_file: tokio::net::unix::pipe::OpenOptions::new()
164                .open_receiver(in_path)
165                .unwrap(),
166        }
167    }
168
169    /// Creates a new instance of [`ExtcapControlReader`].
170    ///
171    /// * `in_path`: The path of the extcap control pipe passed with
172    ///   `--extcap-control-in`.
173    #[cfg(target_os = "windows")]
174    pub async fn new(in_path: &Path) -> Self {
175        Self {
176            in_file: File::open(in_path).await.unwrap(),
177        }
178    }
179
180    /// Read one control packet, awaiting until the packet arrives. Since the
181    /// control packet pipe is expected to stay open for the entire duration of
182    /// the extcap program, if the pipe is closed prematurely in this function
183    /// here, `UnexpectedEof` will be returned.
184    pub async fn read_control_packet(
185        &mut self,
186    ) -> Result<ControlPacket<'static>, ReadControlError> {
187        let header_bytes = self
188            .in_file
189            .try_read_exact::<6>()
190            .await?
191            .ok_or_else(|| std::io::Error::from(std::io::ErrorKind::UnexpectedEof))?;
192        debug!(
193            "Read header bytes from incoming control message, now parsing... {:?}",
194            header_bytes
195        );
196        let (_rem, packet) = match ControlPacket::parse(&header_bytes) {
197            Ok((rem, packet)) => (rem, packet.into_owned()),
198            Err(nom::Err::Incomplete(nom::Needed::Size(size))) => {
199                let mut payload_bytes = vec![0_u8; size.get()];
200                self.in_file.read_exact(&mut payload_bytes).await?;
201                let all_bytes = [header_bytes.as_slice(), payload_bytes.as_slice()].concat();
202                ControlPacket::parse(&all_bytes)
203                    .map(|(_, packet)| (&[][..], packet.into_owned()))
204                    .unwrap_or_else(|e| panic!("Unable to parse header packet: {e}"))
205            }
206            Err(e) => Err(ReadControlError::ParseError(e.to_string()))?,
207        };
208        debug!("Parsed incoming control message: {packet:?}");
209        Ok(packet)
210    }
211}
212
213const UNUSED_CONTROL_NUMBER: u8 = 255;
214
215/// Sender for extcap control packets. These control packets controls the UI
216/// generated by Wireshark. This trait also provides convenience functions for
217/// sending control packets formatted for particular usages like `info_message`
218/// and `status_message`. For other functions controlling various toolbar
219/// controls, see the methods in the [`control`][crate::controls] module instead.
220#[async_trait]
221pub trait ExtcapControlSenderTrait: Send + Sync + Sized {
222    /// Sends the given `packet` by writing it to the given output file (or
223    /// fifo).
224    async fn send(self, packet: ControlPacket<'_>) -> Result<(), tokio::io::Error>;
225
226    /// Shows a message in an information dialog popup. The message will show on
227    /// the screen until the user dismisses the popup.
228    async fn info_message(self, message: &str) -> Result<(), tokio::io::Error> {
229        self.send(ControlPacket::new_with_payload(
230            UNUSED_CONTROL_NUMBER,
231            ControlCommand::InformationMessage,
232            message.as_bytes(),
233        ))
234        .await
235    }
236
237    /// Shows a message in a warning dialog popup. The message will show on the
238    /// screen until the user dismisses the popup.
239    async fn warning_message(self, message: &str) -> Result<(), tokio::io::Error> {
240        self.send(ControlPacket::new_with_payload(
241            UNUSED_CONTROL_NUMBER,
242            ControlCommand::WarningMessage,
243            message.as_bytes(),
244        ))
245        .await
246    }
247
248    /// Shows a message in an error dialog popup. The message will show on the
249    /// screen until the user dismisses the popup.
250    async fn error_message(self, message: &str) -> Result<(), tokio::io::Error> {
251        self.send(ControlPacket::new_with_payload(
252            UNUSED_CONTROL_NUMBER,
253            ControlCommand::ErrorMessage,
254            message.as_bytes(),
255        ))
256        .await
257    }
258
259    /// Shows a message in the status bar at the bottom of the Wireshark window.
260    /// When the message is shown, the status bar will also flash yellow to
261    /// bring it to the user's attention. The message will stay on the status
262    /// bar for a few seconds, or until another message overwrites it.
263    async fn status_message(self, message: &str) -> Result<(), tokio::io::Error> {
264        self.send(ControlPacket::new_with_payload(
265            UNUSED_CONTROL_NUMBER,
266            ControlCommand::StatusbarMessage,
267            message.as_bytes(),
268        ))
269        .await
270    }
271}
272
273/// A sender for the extcap control packets. `out_file` should be the file given
274/// by the `--extcap-control-out` flag.
275pub struct ExtcapControlSender {
276    #[cfg(not(target_os = "windows"))]
277    out_file: Sender,
278    #[cfg(target_os = "windows")]
279    out_file: File,
280}
281
282impl ExtcapControlSender {
283    #[cfg(not(target_os = "windows"))]
284    /// Creates a new instance of [`ExtcapControlSender`].
285    ///
286    /// * `out_path`: The path specified by the `--extcap-control-out` flag.
287    pub async fn new(out_path: &Path) -> Self {
288        use std::time::Duration;
289
290        for i in 0..50 {
291            match tokio::net::unix::pipe::OpenOptions::new().open_sender(out_path) {
292                Ok(out_file) => return Self { out_file },
293                Err(e) => {
294                    if let Some(libc::ENXIO) = e.raw_os_error() {
295                        // This seems sketchy, but the docs for pipe::Sender says "The file is a
296                        // FIFO, but no process has it open for reading. Sleep for a while and try
297                        // again"
298                        // https://docs.rs/tokio/latest/tokio/net/unix/pipe/struct.Sender.html
299                        tokio::time::sleep(Duration::from_millis(i * 100)).await;
300                    } else {
301                        panic!("{e:?}");
302                    }
303                }
304            };
305        }
306        panic!("Failed waiting for extcap-control-out to be opened");
307    }
308
309    #[cfg(target_os = "windows")]
310    /// Creates a new instance of [`ExtcapControlSender`].
311    ///
312    /// * `out_path`: The path specified by the `--extcap-control-out` flag.
313    pub async fn new(out_path: &Path) -> Self {
314        Self {
315            out_file: File::create(out_path).await.unwrap(),
316        }
317    }
318}
319
320#[async_trait]
321impl<'a> ExtcapControlSenderTrait for &'a mut ExtcapControlSender {
322    async fn send(self, packet: ControlPacket<'_>) -> Result<(), tokio::io::Error> {
323        debug!("Sending extcap control message: {packet:#?}");
324        self.out_file.write_all(&packet.to_header_bytes()).await?;
325        self.out_file.write_all(&packet.payload).await?;
326        self.out_file.flush().await?;
327        Ok(())
328    }
329}
330
331/// An implementation of ExtcapControlSenderTrait that is no-op when the
332/// `Option` is `None`. Since Wireshark may not include the
333/// `--extcap-control-out` flag (e.g. when no controls are returned during
334/// `--extcap-interfaces`, or when running in tshark), this allows an easier but
335/// less efficient way to say `option_extcap_sender.status_message(...)` without
336/// constantly checking for the option.
337#[async_trait]
338impl<T> ExtcapControlSenderTrait for &mut Option<T>
339where
340    T: Send + Sync,
341    for<'a> &'a mut T: ExtcapControlSenderTrait,
342{
343    async fn send(self, packet: ControlPacket<'_>) -> Result<(), tokio::io::Error> {
344        if let Some(s) = self {
345            s.send(packet).await
346        } else {
347            Ok(())
348        }
349    }
350}
351
352/// Just for syntactic niceness when working with a control sender behind a
353/// mutex. This usage allows the sender to be locked only for the duration of
354/// that one control packet, so it can be interleaved in between other async
355/// function calls.
356#[async_trait]
357impl<T> ExtcapControlSenderTrait for &Mutex<T>
358where
359    T: Send,
360    for<'a> &'a mut T: ExtcapControlSenderTrait,
361{
362    /// Sends a control message to Wireshark.
363    async fn send(self, packet: ControlPacket<'_>) -> Result<(), tokio::io::Error> {
364        self.lock().await.send(packet).await
365    }
366}