use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::Stream;
use crate::async_adapters::tokio_adapter::AsyncCapture;
use crate::dedup::Dedup;
use crate::error::Error;
use crate::packet::OwnedPacket;
use crate::traits::PacketSource;
pub struct DedupStream<S>
where
S: PacketSource + std::os::unix::io::AsRawFd,
{
cap: AsyncCapture<S>,
dedup: Dedup,
pending: VecDeque<OwnedPacket>,
}
impl<S> DedupStream<S>
where
S: PacketSource + std::os::unix::io::AsRawFd,
{
pub(crate) fn new(cap: AsyncCapture<S>, dedup: Dedup) -> Self {
Self {
cap,
dedup,
pending: VecDeque::new(),
}
}
pub fn dedup(&self) -> &Dedup {
&self.dedup
}
pub fn dedup_mut(&mut self) -> &mut Dedup {
&mut self.dedup
}
}
impl<S> Stream for DedupStream<S>
where
S: PacketSource + std::os::unix::io::AsRawFd + Unpin,
{
type Item = Result<OwnedPacket, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
if let Some(pkt) = this.pending.pop_front() {
return Poll::Ready(Some(Ok(pkt)));
}
let mut guard = match this.cap.poll_read_ready_mut(cx) {
Poll::Ready(Ok(g)) => g,
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(Error::Io(e)))),
Poll::Pending => return Poll::Pending,
};
let got_batch = {
let inner = guard.get_inner_mut();
if let Some(batch) = inner.next_batch() {
for pkt in &batch {
if this.dedup.keep(&pkt) {
this.pending.push_back(pkt.to_owned());
}
}
drop(batch);
true
} else {
false
}
};
if !got_batch {
guard.clear_ready();
}
}
}
}
impl<S> AsyncCapture<S>
where
S: PacketSource + std::os::unix::io::AsRawFd,
{
pub fn dedup_stream(self, dedup: Dedup) -> DedupStream<S> {
DedupStream::new(self, dedup)
}
}