pcap/stream/
windows.rs

1//! Support for asynchronous packet iteration.
2//!
3//! See [`Capture::stream`](super::Capture::stream).
4use std::marker::Unpin;
5use std::pin::Pin;
6use std::task::{self, Poll};
7
8use futures::{ready, FutureExt};
9use tokio::task::JoinHandle;
10use windows_sys::Win32::{Foundation::HANDLE, System::Threading::WaitForSingleObject};
11
12use crate::{
13    capture::{Activated, Capture},
14    codec::PacketCodec,
15    Error,
16};
17
18/// Implement Stream for async use of pcap
19pub struct PacketStream<T: Activated + ?Sized, C> {
20    event_handle: EventHandle,
21    capture: Capture<T>,
22    codec: C,
23}
24
25impl<T: Activated + ?Sized, C> PacketStream<T, C> {
26    pub(crate) fn new(capture: Capture<T>, codec: C) -> Result<Self, Error> {
27        Ok(Self {
28            event_handle: EventHandle::new(&capture),
29            capture,
30            codec,
31        })
32    }
33
34    /// Returns a mutable reference to the inner [`Capture`].
35    ///
36    /// The caller must ensure the capture will not be set to be blocking.
37    pub fn capture_mut(&mut self) -> &mut Capture<T> {
38        &mut self.capture
39    }
40}
41
42impl<T: Activated + ?Sized, C> Unpin for PacketStream<T, C> {}
43
44impl<T: Activated + ?Sized, C: PacketCodec> futures::Stream for PacketStream<T, C> {
45    type Item = Result<C::Item, Error>;
46
47    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
48        let stream = Pin::into_inner(self);
49        let codec = &mut stream.codec;
50
51        loop {
52            ready!(stream.event_handle.poll_ready(cx));
53
54            let res = match stream.capture.next_packet() {
55                Ok(p) => Ok(codec.decode(p)),
56                Err(Error::TimeoutExpired) => {
57                    stream.event_handle.clear_ready();
58                    continue;
59                }
60                Err(e) => Err(e),
61            };
62            return Poll::Ready(Some(res));
63        }
64    }
65}
66
67/// A wrapper around a HANDLE that can be used to call `WaitForSingleObject`
68/// from an asynchronous context. Once the call to `WaitForSingleObject`
69/// completes, the handle is considered ready and will keep returning `Ready`
70/// until it's reset.
71struct EventHandle {
72    handle: HANDLE,
73    state: EventHandleState,
74}
75
76enum EventHandleState {
77    /// We haven't started waiting for an event yet.
78    Init,
79    /// We're currently waiting for an event.
80    Polling(JoinHandle<()>),
81    /// We waited for an event.
82    Ready,
83}
84
85impl EventHandle {
86    pub fn new<T: Activated + ?Sized>(capture: &Capture<T>) -> Self {
87        Self {
88            handle: unsafe {
89                // SAFETY: PacketStream stores the handle before the capture,
90                // so the handle will be dropped before the capture.
91                capture.get_event()
92            },
93            state: EventHandleState::Init,
94        }
95    }
96
97    pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<()> {
98        loop {
99            match self.state {
100                EventHandleState::Init => {
101                    let handle = self.handle;
102                    self.state =
103                        EventHandleState::Polling(tokio::task::spawn_blocking(move || {
104                            const INFINITE: u32 = !0;
105                            unsafe {
106                                WaitForSingleObject(handle, INFINITE);
107                            }
108                        }));
109                }
110                EventHandleState::Polling(ref mut join_handle) => {
111                    let _ = ready!(join_handle.poll_unpin(cx));
112                    self.state = EventHandleState::Ready;
113                }
114                EventHandleState::Ready => return Poll::Ready(()),
115            }
116        }
117    }
118
119    /// Reset the internal state. This will trigger a call to
120    /// `WaitForSingleObject` the next time `poll_ready` is called.
121    pub fn clear_ready(&mut self) {
122        self.state = EventHandleState::Init;
123    }
124}