use std::marker::Unpin;
use std::pin::Pin;
use std::task::{self, Poll};
use futures::{ready, FutureExt};
use tokio::task::JoinHandle;
use windows_sys::Win32::{Foundation::HANDLE, System::Threading::WaitForSingleObject};
use crate::{
capture::{Activated, Capture},
codec::PacketCodec,
Error,
};
/// Asynchronous stream of packets read from an activated [`Capture`].
///
/// Each packet obtained from the capture is passed through the codec `C`; the
/// `futures::Stream` implementation yields `Result<C::Item, Error>`.
pub struct PacketStream<T: Activated + ?Sized, C> {
// Tracks readiness of the event handle obtained from `capture` in `new`.
// Declared before `capture`, so it is dropped first (fields drop in
// declaration order).
event_handle: EventHandle,
// The capture packets are pulled from via `next_packet`.
capture: Capture<T>,
// Decoder applied to every packet before it is yielded.
codec: C,
}
impl<T: Activated + ?Sized, C> PacketStream<T, C> {
    /// Wraps `capture` and `codec` into a stream.
    ///
    /// The event handle is looked up from the capture before the capture is
    /// moved into the new value. The visible code path always returns `Ok`;
    /// the `Result` return type is part of the crate-internal signature.
    pub(crate) fn new(capture: Capture<T>, codec: C) -> Result<Self, Error> {
        // Borrow the capture for the handle lookup first; it is moved below.
        let event_handle = EventHandle::new(&capture);
        Ok(Self {
            event_handle,
            capture,
            codec,
        })
    }

    /// Gives mutable access to the capture owned by this stream.
    pub fn capture_mut(&mut self) -> &mut Capture<T> {
        let Self { capture, .. } = self;
        capture
    }
}
// `poll_next` unwraps its `Pin<&mut Self>` with `Pin::into_inner`, which
// requires `Self: Unpin`; this impl provides that for every `T` and `C`.
impl<T: Activated + ?Sized, C> Unpin for PacketStream<T, C> {}
impl<T: Activated + ?Sized, C: PacketCodec> futures::Stream for PacketStream<T, C> {
type Item = Result<C::Item, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
let stream = Pin::into_inner(self);
let codec = &mut stream.codec;
loop {
ready!(stream.event_handle.poll_ready(cx));
let res = match stream.capture.next_packet() {
Ok(p) => Ok(codec.decode(p)),
Err(Error::TimeoutExpired) => {
stream.event_handle.clear_ready();
continue;
}
Err(e) => Err(e),
};
return Poll::Ready(Some(res));
}
}
}
/// Pairs a raw Windows `HANDLE` with the state of the wait performed on it.
struct EventHandle {
// Raw handle returned by `Capture::get_event`; copied into the blocking task
// that calls `WaitForSingleObject`.
handle: HANDLE,
// Where the current wait stands; advanced by `poll_ready`, reset by `clear_ready`.
state: EventHandleState,
}
/// Progress of the wait on the event handle.
enum EventHandleState {
// No wait in flight; the next `poll_ready` spawns one.
Init,
// A blocking task is waiting on the handle; holds that task's join handle.
Polling(JoinHandle<()>),
// The blocking wait has finished; stays here until `clear_ready` is called.
Ready,
}
impl EventHandle {
    /// Creates a handle tracker for `capture`, starting in the `Init` state.
    pub fn new<T: Activated + ?Sized>(capture: &Capture<T>) -> Self {
        // SAFETY: NOTE(review): the contract of `get_event` is not visible in
        // this file. Presumably the returned handle is only valid while
        // `capture` is alive; `PacketStream` stores both side by side.
        let handle = unsafe { capture.get_event() };
        Self {
            handle,
            state: EventHandleState::Init,
        }
    }

    /// Drives the wait on the event handle.
    ///
    /// From `Init` a blocking task is spawned that waits on the handle with no
    /// timeout; while that task runs this returns `Poll::Pending`. When the
    /// task completes (its join result is ignored) the state becomes `Ready`,
    /// and `Poll::Ready(())` is returned until `clear_ready` is called.
    pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<()> {
        loop {
            match &mut self.state {
                EventHandleState::Ready => return Poll::Ready(()),
                EventHandleState::Polling(waiter) => match waiter.poll_unpin(cx) {
                    Poll::Pending => return Poll::Pending,
                    // Whether the task finished normally or not is deliberately ignored.
                    Poll::Ready(_) => self.state = EventHandleState::Ready,
                },
                EventHandleState::Init => {
                    // Copy the raw handle so the closure does not borrow `self`.
                    let handle = self.handle;
                    let waiter = tokio::task::spawn_blocking(move || {
                        // All bits set: wait without a timeout.
                        const INFINITE: u32 = u32::MAX;
                        // SAFETY: NOTE(review): relies on `handle` still being a
                        // valid handle when this task runs — confirm the task
                        // cannot outlive the capture that owns it.
                        unsafe {
                            WaitForSingleObject(handle, INFINITE);
                        }
                    });
                    self.state = EventHandleState::Polling(waiter);
                }
            }
        }
    }

    /// Resets the state to `Init`, so the next `poll_ready` starts a new wait.
    pub fn clear_ready(&mut self) {
        self.state = EventHandleState::Init;
    }
}