1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
use super::Activated;
use super::Capture;
use super::Error;
use super::SelectableCapture;
use crate::PacketCodec;
use futures::ready;
use std::io;
use std::marker::Unpin;
use std::pin::Pin;
use std::task::{self, Poll};
use tokio::io::unix::AsyncFd;
pub struct PacketStream<T: Activated + ?Sized, C> {
inner: AsyncFd<SelectableCapture<T>>,
codec: C,
}
impl<T: Activated + ?Sized, C> PacketStream<T, C> {
pub(crate) fn new(capture: SelectableCapture<T>, codec: C) -> Result<Self, Error> {
Ok(PacketStream {
inner: AsyncFd::with_interest(capture, tokio::io::Interest::READABLE)?,
codec,
})
}
pub fn capture_mut(&mut self) -> &mut Capture<T> {
&mut self.inner.get_mut().inner
}
}
impl<T: Activated + ?Sized, C> Unpin for PacketStream<T, C> {}
impl<T: Activated + ?Sized, C: PacketCodec> futures::Stream for PacketStream<T, C> {
type Item = Result<C::Item, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
let stream = Pin::into_inner(self);
let codec = &mut stream.codec;
loop {
let mut guard = ready!(stream.inner.poll_read_ready_mut(cx))?;
match guard.try_io(|inner| match inner.get_mut().inner.next_packet() {
Ok(p) => Ok(Ok(codec.decode(p))),
Err(e @ Error::TimeoutExpired) => Err(io::Error::new(io::ErrorKind::WouldBlock, e)),
Err(e) => Ok(Err(e)),
}) {
Ok(result) => {
return Poll::Ready(Some(result?));
}
Err(_would_block) => continue,
}
}
}
}