use crate::config::Config;
use crate::errors::Error;
use crate::handle::Handle;
use crate::packet::Packet;
use crate::packet_future::PacketFuture;
use crate::pcap_util;
use crate::stream::StreamItem;
use futures::future::Pending;
use futures::stream::{Stream, StreamExt};
use log::*;
use pin_project::pin_project;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::SystemTime;
use tokio::time::Delay;
use failure::Fail;
/// Book-keeping kept by the bridge for one inner stream.
struct BridgeStreamState<E: Fail + Sync + Send, T>
where
    T: Stream<Item = StreamItem<E>> + Sized + Unpin,
{
    /// The inner stream being bridged.
    stream: T,
    /// Packets held back by an earlier gather; they are emitted by the next gather.
    existing: Vec<Packet>,
    /// Packets received from `stream` that have not been gathered yet.
    current: Vec<Packet>,
    /// Back-off timer armed after `stream` yielded an empty batch; while it is
    /// pending the stream itself is not polled.
    delaying: Option<Delay>,
    /// Set once `stream` has yielded `None`.
    complete: bool,
}
/// Stream adapter that polls several inner packet streams and yields their
/// packets merged into batches sorted by timestamp.
#[pin_project]
pub struct BridgeStream<E, T>
where
    E: Fail + Sync + Send,
    T: Stream<Item = StreamItem<E>> + Sized + Unpin,
{
    /// How long an inner stream is left alone after it yielded an empty batch.
    retry_after: std::time::Duration,
    /// One entry per inner stream that is still being tracked.
    stream_states: VecDeque<BridgeStreamState<E, T>>,
}
impl<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> BridgeStream<E, T> {
pub fn new(retry_after: std::time::Duration, streams: Vec<T>) -> Result<BridgeStream<E, T>, Error> {
let mut stream_states = VecDeque::with_capacity(streams.len());
for stream in streams {
let new_state = BridgeStreamState {
stream: stream,
existing: Vec::new(),
current: Vec::new(),
delaying: None,
complete: false,
};
stream_states.push_back(new_state);
}
Ok(BridgeStream {
retry_after: retry_after,
stream_states: stream_states,
})
}
}
/// Collects the packets that may be emitted now and returns them sorted by
/// timestamp.
///
/// * Everything in each state's `existing` buffer is always emitted.
/// * With `gather_to = Some(cutoff)`, packets in `current` whose timestamp is
///   strictly before `cutoff` are emitted as well; the rest become the state's
///   new `existing` buffer.
/// * With `gather_to = None`, each state's `current` buffer is swapped into
///   `existing` (which has just been emptied) so it is emitted by the next call.
///
/// The sort is stable, so packets with equal timestamps keep their collection
/// order: held-back packets of all states first, then newly released packets.
fn gather_packets<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin>(
    stream_states: &mut VecDeque<BridgeStreamState<E, T>>,
    gather_to: Option<SystemTime>,
) -> Vec<Packet> {
    let mut merged: Vec<Packet> = Vec::new();

    // First pass: flush what earlier calls held back.
    for state in stream_states.iter_mut() {
        merged.append(&mut state.existing);
    }
    trace!("Have {} existing packets", merged.len());

    // Second pass: decide what happens to the freshly received packets.
    match gather_to {
        Some(cutoff) => {
            for state in stream_states.iter_mut() {
                let (ready, held): (Vec<Packet>, Vec<Packet>) = state
                    .current
                    .drain(..)
                    .partition(|pkt| *pkt.timestamp() < cutoff);
                trace!(
                    "Adding {} packets based on timestamp, {} packets adding to existing",
                    ready.len(),
                    held.len()
                );
                merged.extend(ready);
                state.existing = held;
            }
        }
        None => {
            for state in stream_states.iter_mut() {
                trace!("Moving {} packets into existing", state.current.len());
                std::mem::swap(&mut state.existing, &mut state.current);
            }
        }
    }

    merged.sort_by_key(|pkt| *pkt.timestamp());
    merged
}
impl<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Stream for BridgeStream<E, T> {
    type Item = StreamItem<E>;

    /// Polls every tracked inner stream once and yields the packets that can be
    /// released, sorted by timestamp.
    ///
    /// Per inner stream:
    /// * a stream that already completed is not polled again;
    /// * a stream whose back-off timer is still pending is skipped;
    /// * an empty batch arms a back-off timer of `retry_after`;
    /// * a non-empty batch is buffered, and the timestamp of its last packet
    ///   lowers the release cutoff (the minimum over all batches received in
    ///   this call);
    /// * an error is returned immediately; packets buffered so far stay
    ///   buffered for the next call.
    ///
    /// Returns `Ready(None)` once nothing was released and no stream is tracked
    /// any more; otherwise `Ready(Some(Ok(batch)))`.
    ///
    /// NOTE(review): `batch` may be empty (e.g. when every inner stream is
    /// pending); this method never returns `Poll::Pending` itself, so a consumer
    /// that loops on `next()` will be re-polled immediately. Confirm whether
    /// callers rely on these empty batches before changing it.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        trace!("Interfaces: {:?}", this.stream_states.len());
        let states: &mut VecDeque<BridgeStreamState<E, T>> = this.stream_states;
        let retry_after: std::time::Duration = *this.retry_after;
        let mut gather_to: Option<SystemTime> = None;

        for state in states.iter_mut() {
            // A completed state can still be in the deque when an earlier call
            // returned early with an error, or while it drains buffered packets.
            // Its stream already yielded `None` and must not be polled again.
            if state.complete {
                continue;
            }

            if let Some(mut existing_delay) = state.delaying.take() {
                if let Poll::Pending = Pin::new(&mut existing_delay).poll(cx) {
                    trace!("Delaying");
                    state.delaying = Some(existing_delay);
                    continue;
                }
            }

            match Pin::new(&mut state.stream).poll_next(cx) {
                Poll::Pending => {
                    trace!("Pending");
                }
                Poll::Ready(Some(Err(e))) => {
                    return Poll::Ready(Some(Err(e)));
                }
                Poll::Ready(None) => {
                    trace!("Interface has completed");
                    state.complete = true;
                }
                Poll::Ready(Some(Ok(v))) => {
                    // The last packet's timestamp is taken as the newest of the
                    // batch (presumably batches arrive in timestamp order).
                    let newest = match v.last() {
                        Some(p) => *p.timestamp(),
                        None => {
                            // Nothing available right now: back off before retrying.
                            state.delaying = Some(tokio::time::delay_for(retry_after));
                            continue;
                        }
                    };
                    gather_to = Some(gather_to.map_or(newest, |ts| std::cmp::min(ts, newest)));
                    trace!("Adding {} packets to current", v.len());
                    state.current.extend(v);
                }
            }
        }

        let res = gather_packets(states, gather_to);

        // Drop finished interfaces, but only once their buffers are drained so
        // that no packet is lost together with the state.
        states.retain(|iface| {
            !(iface.complete && iface.existing.is_empty() && iface.current.is_empty())
        });

        if res.is_empty() && states.is_empty() {
            Poll::Ready(None)
        } else {
            Poll::Ready(Some(Ok(res)))
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::PacketStream;
    use byteorder::{ByteOrder, ReadBytesExt};
    use futures::stream;
    use futures::{Future, Stream};
    use std::io::Cursor;
    use std::path::PathBuf;

    /// Reads `resources/canary.pcap` through a single-stream bridge and checks
    /// the packet count plus the pcap record header of the first packet.
    #[tokio::test]
    async fn packets_from_file() {
        let _ = env_logger::try_init();

        let pcap_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("resources")
            .join("canary.pcap");

        info!("Testing against {:?}", pcap_path);

        let handle = Handle::file_capture(pcap_path.to_str().expect("No path found"))
            .expect("No handle created");

        let packet_stream =
            PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");

        let packet_provider =
            BridgeStream::new(Config::default().retry_after().clone(), vec![packet_stream])
                .expect("Failed to build");

        let fut_packets = packet_provider.collect::<Vec<_>>();
        let packets: Vec<_> = fut_packets
            .await
            .into_iter()
            .flatten()
            .flatten()
            .filter(|p| p.data().len() == p.actual_length() as _)
            .collect();

        handle.interrupt();

        assert_eq!(packets.len(), 10);

        let packet = packets.first().cloned().expect("No packets");
        let data = packet
            .into_pcap_record::<byteorder::BigEndian>()
            .expect("Failed to convert to pcap record");
        let mut cursor = Cursor::new(data);
        let ts_sec = cursor
            .read_u32::<byteorder::BigEndian>()
            .expect("Failed to read");
        let ts_usec = cursor
            .read_u32::<byteorder::BigEndian>()
            .expect("Failed to read");
        let actual_length = cursor
            .read_u32::<byteorder::BigEndian>()
            .expect("Failed to read");
        assert_eq!(
            u64::from(ts_sec) * 1_000_000 + u64::from(ts_usec),
            1513735120021685
        );
        assert_eq!(actual_length, 54);
    }

    /// Same capture file as above, but driven batch by batch through `next()`.
    #[tokio::test]
    async fn packets_from_file_next_bridge() {
        let _ = env_logger::try_init();

        let pcap_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("resources")
            .join("canary.pcap");

        info!("Testing against {:?}", pcap_path);

        let handle = Handle::file_capture(pcap_path.to_str().expect("No path found"))
            .expect("No handle created");

        let packet_stream =
            PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");

        let packet_provider =
            BridgeStream::new(Config::default().retry_after().clone(), vec![packet_stream])
                .expect("Failed to build");

        let fut_packets = async move {
            let mut packet_provider = packet_provider.boxed();
            let mut packets = vec![];
            while let Some(p) = packet_provider.next().await {
                println!("packets returned {:?}", p);
                packets.extend(p);
            }

            packets
        };
        let packets = fut_packets
            .await
            .into_iter()
            .flatten()
            .filter(|p| p.data().len() == p.actual_length() as _)
            .count();

        handle.interrupt();

        assert_eq!(packets, 10);
    }

    /// A bridge can be built over a stream created from the looked-up handle.
    #[test]
    fn packets_from_lookup_bridge() {
        let _ = env_logger::try_init();

        let handle = Handle::lookup().expect("No handle created");
        let packet_stream =
            PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");

        let stream = BridgeStream::new(Config::default().retry_after().clone(), vec![packet_stream]);

        // Format arguments are passed directly: a `format!` value as the panic
        // message is rejected by newer editions.
        assert!(
            stream.is_ok(),
            "Could not build stream {}",
            stream.err().unwrap()
        );
    }

    /// A bridge can be built over a stream configured with a BPF filter.
    #[test]
    fn packets_from_lookup_with_bpf() {
        let _ = env_logger::try_init();

        let mut cfg = Config::default();
        cfg.with_bpf(
            "(not (net 172.16.0.0/16 and port 443)) and (not (host 172.17.76.33 and port 443))"
                .to_owned(),
        );
        // Read the retry interval before `cfg` is handed to the packet stream.
        let retry_after = cfg.retry_after().clone();

        let handle = Handle::lookup().expect("No handle created");
        // Use the filtered config here; building from `Config::default()` would
        // leave the BPF expression above unused.
        let packet_stream = PacketStream::new(cfg, Arc::clone(&handle)).expect("Failed to build");

        let stream = BridgeStream::new(retry_after, vec![packet_stream]);

        assert!(
            stream.is_ok(),
            "Could not build stream {}",
            stream.err().unwrap()
        );
    }

    /// Two overlapping in-memory streams come out as two batches whose packets
    /// are ordered by timestamp, and no packet is lost.
    #[tokio::test]
    async fn packets_come_out_time_ordered() {
        let mut packets1 = vec![];
        let mut packets2 = vec![];

        let base_time = std::time::SystemTime::UNIX_EPOCH;
        let cfg = Config::default();

        for s in 0..20 {
            let d = base_time + std::time::Duration::from_secs(s);
            let p = Packet::new(d, 0, 0, vec![]);
            packets1.push(p)
        }

        for s in 5..15 {
            let d = base_time + std::time::Duration::from_secs(s);
            let p = Packet::new(d, 0, 0, vec![]);
            packets2.push(p)
        }

        let item1: StreamItem<Error> = Ok(packets1);
        let item2: StreamItem<Error> = Ok(packets2);

        let stream1 = futures::stream::iter(vec![item1]);
        let stream2 = futures::stream::iter(vec![item2]);

        let bridge = BridgeStream::new(cfg.retry_after().clone(), vec![stream1, stream2]);

        let result = bridge
            .expect("Unable to create BridgeStream")
            .collect::<Vec<StreamItem<Error>>>()
            .await;
        assert_eq!(result.len(), 2);

        let batch1 = result
            .first()
            .expect("Expected value")
            .as_ref()
            .expect("Err not expected");
        let batch2 = result
            .last()
            .expect("Expected value")
            .as_ref()
            .expect("Err not expected");

        // Whole seconds between `base_time` and the packet's timestamp.
        let secs_since_base = |p: Option<&Packet>| -> u64 {
            p.unwrap()
                .timestamp()
                .duration_since(base_time)
                .unwrap()
                .as_secs()
        };

        // The first batch stops before the smaller of the two "last" timestamps (14).
        assert_eq!(secs_since_base(batch1.first()), 0);
        assert_eq!(secs_since_base(batch1.last()), 13);
        // The second batch carries everything that was held back.
        assert_eq!(secs_since_base(batch2.first()), 14);
        assert_eq!(secs_since_base(batch2.last()), 19);

        let flat_result: Vec<Packet> = result.into_iter().flat_map(|r| r.unwrap()).collect();
        assert_eq!(flat_result.len(), 30);
        // Print the flattened packets; the batch vector has been consumed above.
        println!("Results: {:?}", flat_result);
    }
}