pcap_async/
bridge_stream.rs

1use std::cmp::Ordering;
2use std::collections::VecDeque;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7use std::thread::current;
8use std::time::{Duration, SystemTime};
9
10use futures::future::Pending;
11use futures::stream::{Stream, StreamExt};
12use log::*;
13
14use futures::stream::FuturesUnordered;
15use pin_project::pin_project;
16
17use crate::config::Config;
18use crate::errors::Error;
19use crate::handle::Handle;
20use crate::packet::Packet;
21use crate::pcap_util;
22use crate::stream::StreamItem;
23
24#[pin_project]
25struct CallbackFuture<E, T>
26where
27    E: Sync + Send,
28    T: Stream<Item = StreamItem<E>> + Sized + Unpin,
29{
30    idx: usize,
31    stream: Option<T>,
32}
33
34impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Future
35    for CallbackFuture<E, T>
36{
37    type Output = (usize, Option<(T, StreamItem<E>)>);
38
39    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
40        let this = self.project();
41        let stream: &mut Option<T> = this.stream;
42        let idx: usize = *this.idx;
43        if let Some(mut stream) = stream.take() {
44            let polled = Pin::new(&mut stream).poll_next(cx);
45            match polled {
46                Poll::Pending => {
47                    let _old_stream = std::mem::replace(this.stream, Some(stream));
48                    return Poll::Pending;
49                }
50                Poll::Ready(Some(t)) => {
51                    return Poll::Ready((idx, Some((stream, t))));
52                }
53                _ => {
54                    return Poll::Ready((idx, None));
55                }
56            }
57        } else {
58            panic!("Should not not have a stream!")
59        }
60    }
61}
62
63struct BridgeStreamState<E, T>
64where
65    E: Sync + Send,
66    T: Stream<Item = StreamItem<E>> + Sized + Unpin,
67{
68    stream: Option<T>,
69    current: Vec<Vec<Packet>>,
70    complete: bool,
71}
72
73impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> BridgeStreamState<E, T> {
74    fn is_complete(&self) -> bool {
75        self.complete && self.current.is_empty()
76    }
77
78    fn spread(&self) -> Duration {
79        let min = self.current.first().map(|s| s.first()).flatten();
80
81        let max = self.current.last().map(|s| s.last()).flatten();
82
83        match (min, max) {
84            (Some(min), Some(max)) => {
85                let since = max.timestamp().duration_since(*min.timestamp());
86                if let Ok(since) = since {
87                    return since;
88                } else {
89                    Duration::from_millis(0)
90                }
91            }
92            _ => Duration::from_millis(0),
93        }
94    }
95}
96
// The BridgeStream attempts to time-order packets coming from several downstream streams.
// It does this by buffering at least `min_states_needed` packet batches from every stream that
// has not yet completed, and then sorting them.
// `max_buffer_time` acts as a fallback in case we have one slow stream and one fast stream:
// it checks the spread of buffered packets, and if the spread is too large the bridge sorts
// what it has buffered and passes it on without waiting any longer.

102#[pin_project]
103pub struct BridgeStream<E: Sync + Send, T>
104where
105    T: Stream<Item = StreamItem<E>> + Sized + Unpin,
106{
107    stream_states: VecDeque<BridgeStreamState<E, T>>,
108    max_buffer_time: Duration,
109    min_states_needed: usize,
110    poll_queue: FuturesUnordered<CallbackFuture<E, T>>,
111}
112
113impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> BridgeStream<E, T> {
114    pub fn new(
115        streams: Vec<T>,
116        max_buffer_time: Duration,
117        min_states_needed: usize,
118    ) -> Result<BridgeStream<E, T>, Error> {
119        let poll_queue = FuturesUnordered::new();
120        let mut stream_states = VecDeque::with_capacity(streams.len());
121        for (idx, stream) in streams.into_iter().enumerate() {
122            let new_state = BridgeStreamState {
123                stream: None,
124                current: vec![],
125                complete: false,
126            };
127            let fut = CallbackFuture {
128                idx,
129                stream: Some(stream),
130            };
131            poll_queue.push(fut);
132            stream_states.push_back(new_state);
133        }
134
135        Ok(BridgeStream {
136            stream_states: stream_states,
137            max_buffer_time,
138            min_states_needed: min_states_needed,
139            poll_queue,
140        })
141    }
142}
143
144fn gather_packets<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin>(
145    stream_states: &mut VecDeque<BridgeStreamState<E, T>>,
146) -> Vec<Packet> {
147    let mut result = vec![];
148    let mut gather_to: Option<SystemTime> = None;
149
150    for s in stream_states.iter() {
151        let last_time = s
152            .current
153            .last()
154            .iter()
155            .flat_map(|p| p.last())
156            .last()
157            .map(|p| *p.timestamp());
158
159        if let Some(last_time) = last_time {
160            gather_to = gather_to
161                .map(|prev| prev.min(last_time))
162                .or(Some(last_time));
163        }
164    }
165
166    if let Some(gather_to) = gather_to {
167        for s in stream_states.iter_mut() {
168            let current = std::mem::take(&mut s.current);
169            let (to_send, to_keep) = current
170                .into_iter()
171                .flat_map(|ps| ps.into_iter())
172                .partition(|p| p.timestamp() <= &gather_to);
173
174            let to_keep: Vec<Packet> = to_keep;
175            if !to_keep.is_empty() {
176                s.current.push(to_keep);
177            }
178            result.extend(to_send)
179        }
180    } else {
181    }
182    result.sort_by_key(|p| *p.timestamp()); // todo convert
183    result
184}
185
186impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Stream
187    for BridgeStream<E, T>
188{
189    type Item = StreamItem<E>;
190
191    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
192        let this = self.project();
193        trace!(
194            "Interfaces: {:?} poll_queue {}",
195            this.stream_states.len(),
196            this.poll_queue.len()
197        );
198        let states: &mut VecDeque<BridgeStreamState<E, T>> = this.stream_states;
199        let min_states_needed: usize = *this.min_states_needed;
200        let max_buffer_time = this.max_buffer_time;
201        let mut max_time_spread: Duration = Duration::from_millis(0);
202        let mut not_pending: usize = 0;
203        let mut poll_queue: &mut FuturesUnordered<CallbackFuture<E, T>> = this.poll_queue;
204
205        loop {
206            match Pin::new(&mut poll_queue).poll_next(cx) {
207                Poll::Ready(Some((_, Some((_, Err(err)))))) => {
208                    trace!("got a error, passing upstream");
209                    return Poll::Ready(Some(Err(err)));
210                }
211                Poll::Ready(Some((idx, Some((stream, Ok(item)))))) => {
212                    //When the future gives us a result we are given a index, that we use to locate an existing State, and re-add the stream.
213                    //For that reason the order must never change!
214                    trace!("Got Ready");
215                    not_pending += 1;
216                    if let Some(state) = states.get_mut(idx) {
217                        trace!("Appending results");
218                        max_time_spread = state.spread().max(max_time_spread);
219                        state.stream = Some(stream);
220                        state.current.push(item);
221                    }
222                }
223                Poll::Ready(Some((idx, None))) => {
224                    if let Some(state) = states.get_mut(idx) {
225                        trace!("Interface {} has completed", idx);
226                        state.complete = true;
227                        continue;
228                    }
229                }
230                Poll::Pending => {
231                    trace!("Got Pending");
232                    break;
233                }
234                Poll::Ready(None) => {
235                    trace!("Reached the end.");
236                    break;
237                }
238            }
239        }
240
241        for (idx, state) in states.iter_mut().enumerate() {
242            if let Some(stream) = state.stream.take() {
243                //readded = true;
244                trace!("re-adding stream to poll queue {}", idx);
245                let f = CallbackFuture {
246                    idx,
247                    stream: Some(stream),
248                };
249                poll_queue.push(f);
250            }
251        }
252
253        let one_buffer_is_over = max_time_spread > *max_buffer_time;
254
255        let ready_count = states
256            .iter()
257            .filter(|s| s.current.len() >= min_states_needed || s.complete)
258            .count();
259
260        let enough_state = ready_count == states.len();
261
262        let res = if enough_state || one_buffer_is_over {
263            trace!("Reporting");
264            gather_packets(states)
265        } else {
266            trace!("Not reporting {} {}", enough_state, one_buffer_is_over);
267            vec![]
268        };
269
270        let completed_count = states.iter().filter(|s| s.complete).count();
271
272        if res.is_empty() && completed_count == states.len() {
273            trace!("All ifaces are complete.");
274            return Poll::Ready(None);
275        } else if res.is_empty() && not_pending == 0 && !states.is_empty() {
276            trace!("All ifaces are delayed.");
277            return Poll::Pending;
278        } else {
279            trace!("Returning results {}", res.len());
280            return Poll::Ready(Some(Ok(res)));
281        }
282    }
283}
284
#[cfg(test)]
mod tests {
    use std::io::Cursor;
    use std::ops::Range;
    use std::path::PathBuf;

    use byteorder::{ByteOrder, ReadBytesExt};
    use futures::stream;
    use futures::{Future, Stream};
    use rand;

    use crate::PacketStream;

    use super::*;

    fn make_packet(ts: usize) -> Packet {
        Packet {
            timestamp: SystemTime::UNIX_EPOCH + Duration::from_millis(ts as _),
            actual_length: 0,
            original_length: 0,
            data: vec![],
        }
    }

    #[test]
    fn packets_from_file() {
        let _ = env_logger::try_init();

        let pcap_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("resources")
            .join("canary.pcap");

        info!("Testing against {:?}", pcap_path);

        let handle = Handle::file_capture(pcap_path.to_str().expect("No path found"))
            .expect("No handle created");

        let packet_stream =
            PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");

        let packet_provider = BridgeStream::new(vec![packet_stream], Duration::from_millis(100), 2)
            .expect("Failed to build");

        let packets = smol::block_on(async move {
            let fut_packets = packet_provider.collect::<Vec<_>>();
            let packets: Vec<_> = fut_packets
                .await
                .into_iter()
                .flatten()
                .flatten()
                .filter(|p| p.data().len() == p.actual_length() as usize)
                .collect();

            handle.interrupt();

            packets
        });

        assert_eq!(packets.len(), 10);

        let packet = packets.first().cloned().expect("No packets");
        let data = packet
            .into_pcap_record::<byteorder::BigEndian>()
            .expect("Failed to convert to pcap record");
        let mut cursor = Cursor::new(data);
        let ts_sec = cursor
            .read_u32::<byteorder::BigEndian>()
            .expect("Failed to read");
        let ts_usec = cursor
            .read_u32::<byteorder::BigEndian>()
            .expect("Failed to read");
        let actual_length = cursor
            .read_u32::<byteorder::BigEndian>()
            .expect("Failed to read");
        assert_eq!(
            ts_sec as u64 * 1_000_000 as u64 + ts_usec as u64,
            1513735120021685
        );
        assert_eq!(actual_length, 54);
    }

    #[test]
    fn packets_from_file_next_bridge() {
        let _ = env_logger::try_init();

        let pcap_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("resources")
            .join("canary.pcap");

        info!("Testing against {:?}", pcap_path);

        let handle = Handle::file_capture(pcap_path.to_str().expect("No path found"))
            .expect("No handle created");

        let packet_stream =
            PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");

        let packet_provider = BridgeStream::new(vec![packet_stream], Duration::from_millis(100), 2)
            .expect("Failed to build");

        let packets = smol::block_on(async move {
            let fut_packets = async move {
                let mut packet_provider = packet_provider.boxed();
                let mut packets = vec![];
                while let Some(p) = packet_provider.next().await {
                    info!("packets returned {:?}", p);
                    packets.extend(p);
                }
                packets
            };
            let packets = fut_packets
                .await
                .into_iter()
                .flatten()
                .filter(|p| p.data().len() == p.actual_length() as _)
                .count();

            handle.interrupt();

            packets
        });

        assert_eq!(packets, 10);
    }

    #[test]
    fn packets_from_lookup_bridge() {
        let _ = env_logger::try_init();

        let handle = Handle::lookup().expect("No handle created");
        let packet_stream =
            PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");

        let stream = BridgeStream::new(vec![packet_stream], Duration::from_millis(100), 2);

        // A literal format string is required for panic messages in edition 2021.
        if let Err(e) = stream {
            panic!("Could not build stream {}", e);
        }
    }

    #[test]
    fn packets_from_lookup_with_bpf() {
        let _ = env_logger::try_init();

        let mut cfg = Config::default();
        cfg.with_bpf(
            "(not (net 172.16.0.0/16 and port 443)) and (not (host 172.17.76.33 and port 443))"
                .to_owned(),
        );
        let handle = Handle::lookup().expect("No handle created");
        // Build from the BPF-configured `cfg`. Using `Config::default()` here would silently
        // skip the filter this test is meant to exercise.
        let packet_stream = PacketStream::new(cfg, Arc::clone(&handle)).expect("Failed to build");

        let stream = BridgeStream::new(vec![packet_stream], Duration::from_millis(100), 2);

        if let Err(e) = stream {
            panic!("Could not build stream {}", e);
        }
    }

    #[test]
    fn packets_come_out_time_ordered() {
        let mut packets1 = vec![];
        let mut packets2 = vec![];

        let base_time = std::time::SystemTime::UNIX_EPOCH;

        for s in 0..20 {
            let d = base_time + std::time::Duration::from_secs(s);
            let p = Packet::new(d, 0, 0, vec![]);
            packets1.push(p)
        }

        for s in 5..15 {
            let d = base_time + std::time::Duration::from_secs(s);
            let p = Packet::new(d, 0, 0, vec![]);
            packets2.push(p)
        }

        let item1: StreamItem<Error> = Ok(packets1.clone());
        let item2: StreamItem<Error> = Ok(packets2.clone());

        let stream1 = futures::stream::iter(vec![item1]);
        let stream2 = futures::stream::iter(vec![item2]);

        let result = smol::block_on(async move {
            let bridge = BridgeStream::new(vec![stream1, stream2], Duration::from_millis(100), 0);

            let result = bridge
                .expect("Unable to create BridgeStream")
                .collect::<Vec<StreamItem<Error>>>()
                .await;
            result
                .into_iter()
                .map(|r| r.unwrap())
                .flatten()
                .collect::<Vec<Packet>>()
        });
        info!("Result {:?}", result);

        let mut expected = vec![packets1, packets2]
            .into_iter()
            .flatten()
            .collect::<Vec<Packet>>();
        expected.sort_by_key(|p| p.timestamp().clone());
        let expected_time = expected.iter().map(|p| p.timestamp()).collect::<Vec<_>>();
        let result_time = result.iter().map(|p| p.timestamp()).collect::<Vec<_>>();
        assert_eq!(result.len(), expected.len());
        assert_eq!(result_time, expected_time);

        info!("result: {:?}", result);
        info!("expected: {:?}", expected);
    }
}