1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
//! Support for asynchronous packet iteration.
//!
//! See [`Capture::stream`](super::Capture::stream).
use super::Activated;
use super::Capture;
use super::Error;
use super::SelectableCapture;
use crate::PacketCodec;
use futures::ready;
use std::io;
use std::marker::Unpin;
use std::pin::Pin;
use std::task::{self, Poll};
use tokio::io::unix::AsyncFd;

/// Asynchronous stream of packets backed by a capture.
///
/// Implements [`futures::Stream`]: every packet read from the capture is
/// handed to the codec `C`, and the codec's output is what the stream yields.
pub struct PacketStream<T, C>
where
    T: Activated + ?Sized,
{
    /// The capture, wrapped in tokio's `AsyncFd` so the stream can wait for
    /// read readiness.
    inner: AsyncFd<SelectableCapture<T>>,
    /// Decodes each raw packet into the item yielded by the stream.
    codec: C,
}

impl<T, C> PacketStream<T, C>
where
    T: Activated + ?Sized,
{
    /// Builds a stream from a selectable capture and the codec used to decode
    /// its packets.
    ///
    /// The capture is registered with tokio through `AsyncFd::with_interest`,
    /// asking for read-readiness notifications only.
    ///
    /// # Errors
    ///
    /// Returns the registration failure reported by `AsyncFd::with_interest`,
    /// converted into [`Error`].
    ///
    /// # Panics
    ///
    /// tokio documents that constructing an `AsyncFd` panics when no reactor
    /// is running, so this must be called from within a tokio runtime with
    /// I/O enabled.
    pub(crate) fn new(capture: SelectableCapture<T>, codec: C) -> Result<Self, Error> {
        let inner = AsyncFd::with_interest(capture, tokio::io::Interest::READABLE)?;
        Ok(Self { inner, codec })
    }

    /// Gives mutable access to the [`Capture`] driving this stream.
    ///
    /// The caller must ensure the capture will not be set to be blocking:
    /// `poll_next` calls `next_packet` directly on the polling task, so a
    /// blocking capture would stall that task.
    pub fn capture_mut(&mut self) -> &mut Capture<T> {
        let selectable = self.inner.get_mut();
        &mut selectable.inner
    }
}

// `PacketStream` is declared `Unpin` for every `T` and `C`: `poll_next` relies
// on this to take `&mut Self` out of the `Pin` with `Pin::into_inner`.
impl<T, C> Unpin for PacketStream<T, C> where T: Activated + ?Sized {}

impl<T: Activated + ?Sized, C: PacketCodec> futures::Stream for PacketStream<T, C> {
    type Item = Result<C::Item, Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
        let stream = Pin::into_inner(self);
        let codec = &mut stream.codec;

        loop {
            let mut guard = ready!(stream.inner.poll_read_ready_mut(cx))?;
            match guard.try_io(|inner| match inner.get_mut().inner.next_packet() {
                Ok(p) => Ok(Ok(codec.decode(p))),
                Err(e @ Error::TimeoutExpired) => Err(io::Error::new(io::ErrorKind::WouldBlock, e)),
                Err(e) => Ok(Err(e)),
            }) {
                Ok(result) => {
                    return Poll::Ready(Some(result?));
                }
                Err(_would_block) => continue,
            }
        }
    }
}