1use std::marker::Unpin;
5use std::pin::Pin;
6use std::task::{self, Poll};
7
8use futures::{ready, FutureExt};
9use tokio::task::JoinHandle;
10use windows_sys::Win32::{Foundation::HANDLE, System::Threading::WaitForSingleObject};
11
12use crate::{
13 capture::{Activated, Capture},
14 codec::PacketCodec,
15 Error,
16};
17
18pub struct PacketStream<T: Activated + ?Sized, C> {
20 event_handle: EventHandle,
21 capture: Capture<T>,
22 codec: C,
23}
24
25impl<T: Activated + ?Sized, C> PacketStream<T, C> {
26 pub(crate) fn new(capture: Capture<T>, codec: C) -> Result<Self, Error> {
27 Ok(Self {
28 event_handle: EventHandle::new(&capture),
29 capture,
30 codec,
31 })
32 }
33
34 pub fn capture_mut(&mut self) -> &mut Capture<T> {
38 &mut self.capture
39 }
40}
41
42impl<T: Activated + ?Sized, C> Unpin for PacketStream<T, C> {}
43
44impl<T: Activated + ?Sized, C: PacketCodec> futures::Stream for PacketStream<T, C> {
45 type Item = Result<C::Item, Error>;
46
47 fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
48 let stream = Pin::into_inner(self);
49 let codec = &mut stream.codec;
50
51 loop {
52 ready!(stream.event_handle.poll_ready(cx));
53
54 let res = match stream.capture.next_packet() {
55 Ok(p) => Ok(codec.decode(p)),
56 Err(Error::TimeoutExpired) => {
57 stream.event_handle.clear_ready();
58 continue;
59 }
60 Err(e) => Err(e),
61 };
62 return Poll::Ready(Some(res));
63 }
64 }
65}
66
67struct EventHandle {
72 handle: HANDLE,
73 state: EventHandleState,
74}
75
76enum EventHandleState {
77 Init,
79 Polling(JoinHandle<()>),
81 Ready,
83}
84
85impl EventHandle {
86 pub fn new<T: Activated + ?Sized>(capture: &Capture<T>) -> Self {
87 Self {
88 handle: unsafe {
89 capture.get_event()
92 },
93 state: EventHandleState::Init,
94 }
95 }
96
97 pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<()> {
98 loop {
99 match self.state {
100 EventHandleState::Init => {
101 let handle = self.handle;
102 self.state =
103 EventHandleState::Polling(tokio::task::spawn_blocking(move || {
104 const INFINITE: u32 = !0;
105 unsafe {
106 WaitForSingleObject(handle, INFINITE);
107 }
108 }));
109 }
110 EventHandleState::Polling(ref mut join_handle) => {
111 let _ = ready!(join_handle.poll_unpin(cx));
112 self.state = EventHandleState::Ready;
113 }
114 EventHandleState::Ready => return Poll::Ready(()),
115 }
116 }
117 }
118
119 pub fn clear_ready(&mut self) {
122 self.state = EventHandleState::Init;
123 }
124}