1use std::cmp::Ordering;
2use std::collections::VecDeque;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7use std::thread::current;
8use std::time::{Duration, SystemTime};
9
10use futures::future::Pending;
11use futures::stream::{Stream, StreamExt};
12use log::*;
13
14use futures::stream::FuturesUnordered;
15use pin_project::pin_project;
16
17use crate::config::Config;
18use crate::errors::Error;
19use crate::handle::Handle;
20use crate::packet::Packet;
21use crate::pcap_util;
22use crate::stream::StreamItem;
23
24#[pin_project]
25struct CallbackFuture<E, T>
26where
27 E: Sync + Send,
28 T: Stream<Item = StreamItem<E>> + Sized + Unpin,
29{
30 idx: usize,
31 stream: Option<T>,
32}
33
34impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Future
35 for CallbackFuture<E, T>
36{
37 type Output = (usize, Option<(T, StreamItem<E>)>);
38
39 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
40 let this = self.project();
41 let stream: &mut Option<T> = this.stream;
42 let idx: usize = *this.idx;
43 if let Some(mut stream) = stream.take() {
44 let polled = Pin::new(&mut stream).poll_next(cx);
45 match polled {
46 Poll::Pending => {
47 let _old_stream = std::mem::replace(this.stream, Some(stream));
48 return Poll::Pending;
49 }
50 Poll::Ready(Some(t)) => {
51 return Poll::Ready((idx, Some((stream, t))));
52 }
53 _ => {
54 return Poll::Ready((idx, None));
55 }
56 }
57 } else {
58 panic!("Should not not have a stream!")
59 }
60 }
61}
62
63struct BridgeStreamState<E, T>
64where
65 E: Sync + Send,
66 T: Stream<Item = StreamItem<E>> + Sized + Unpin,
67{
68 stream: Option<T>,
69 current: Vec<Vec<Packet>>,
70 complete: bool,
71}
72
73impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> BridgeStreamState<E, T> {
74 fn is_complete(&self) -> bool {
75 self.complete && self.current.is_empty()
76 }
77
78 fn spread(&self) -> Duration {
79 let min = self.current.first().map(|s| s.first()).flatten();
80
81 let max = self.current.last().map(|s| s.last()).flatten();
82
83 match (min, max) {
84 (Some(min), Some(max)) => {
85 let since = max.timestamp().duration_since(*min.timestamp());
86 if let Ok(since) = since {
87 return since;
88 } else {
89 Duration::from_millis(0)
90 }
91 }
92 _ => Duration::from_millis(0),
93 }
94 }
95}
96
97#[pin_project]
103pub struct BridgeStream<E: Sync + Send, T>
104where
105 T: Stream<Item = StreamItem<E>> + Sized + Unpin,
106{
107 stream_states: VecDeque<BridgeStreamState<E, T>>,
108 max_buffer_time: Duration,
109 min_states_needed: usize,
110 poll_queue: FuturesUnordered<CallbackFuture<E, T>>,
111}
112
113impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> BridgeStream<E, T> {
114 pub fn new(
115 streams: Vec<T>,
116 max_buffer_time: Duration,
117 min_states_needed: usize,
118 ) -> Result<BridgeStream<E, T>, Error> {
119 let poll_queue = FuturesUnordered::new();
120 let mut stream_states = VecDeque::with_capacity(streams.len());
121 for (idx, stream) in streams.into_iter().enumerate() {
122 let new_state = BridgeStreamState {
123 stream: None,
124 current: vec![],
125 complete: false,
126 };
127 let fut = CallbackFuture {
128 idx,
129 stream: Some(stream),
130 };
131 poll_queue.push(fut);
132 stream_states.push_back(new_state);
133 }
134
135 Ok(BridgeStream {
136 stream_states: stream_states,
137 max_buffer_time,
138 min_states_needed: min_states_needed,
139 poll_queue,
140 })
141 }
142}
143
144fn gather_packets<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin>(
145 stream_states: &mut VecDeque<BridgeStreamState<E, T>>,
146) -> Vec<Packet> {
147 let mut result = vec![];
148 let mut gather_to: Option<SystemTime> = None;
149
150 for s in stream_states.iter() {
151 let last_time = s
152 .current
153 .last()
154 .iter()
155 .flat_map(|p| p.last())
156 .last()
157 .map(|p| *p.timestamp());
158
159 if let Some(last_time) = last_time {
160 gather_to = gather_to
161 .map(|prev| prev.min(last_time))
162 .or(Some(last_time));
163 }
164 }
165
166 if let Some(gather_to) = gather_to {
167 for s in stream_states.iter_mut() {
168 let current = std::mem::take(&mut s.current);
169 let (to_send, to_keep) = current
170 .into_iter()
171 .flat_map(|ps| ps.into_iter())
172 .partition(|p| p.timestamp() <= &gather_to);
173
174 let to_keep: Vec<Packet> = to_keep;
175 if !to_keep.is_empty() {
176 s.current.push(to_keep);
177 }
178 result.extend(to_send)
179 }
180 } else {
181 }
182 result.sort_by_key(|p| *p.timestamp()); result
184}
185
186impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Stream
187 for BridgeStream<E, T>
188{
189 type Item = StreamItem<E>;
190
191 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
192 let this = self.project();
193 trace!(
194 "Interfaces: {:?} poll_queue {}",
195 this.stream_states.len(),
196 this.poll_queue.len()
197 );
198 let states: &mut VecDeque<BridgeStreamState<E, T>> = this.stream_states;
199 let min_states_needed: usize = *this.min_states_needed;
200 let max_buffer_time = this.max_buffer_time;
201 let mut max_time_spread: Duration = Duration::from_millis(0);
202 let mut not_pending: usize = 0;
203 let mut poll_queue: &mut FuturesUnordered<CallbackFuture<E, T>> = this.poll_queue;
204
205 loop {
206 match Pin::new(&mut poll_queue).poll_next(cx) {
207 Poll::Ready(Some((_, Some((_, Err(err)))))) => {
208 trace!("got a error, passing upstream");
209 return Poll::Ready(Some(Err(err)));
210 }
211 Poll::Ready(Some((idx, Some((stream, Ok(item)))))) => {
212 trace!("Got Ready");
215 not_pending += 1;
216 if let Some(state) = states.get_mut(idx) {
217 trace!("Appending results");
218 max_time_spread = state.spread().max(max_time_spread);
219 state.stream = Some(stream);
220 state.current.push(item);
221 }
222 }
223 Poll::Ready(Some((idx, None))) => {
224 if let Some(state) = states.get_mut(idx) {
225 trace!("Interface {} has completed", idx);
226 state.complete = true;
227 continue;
228 }
229 }
230 Poll::Pending => {
231 trace!("Got Pending");
232 break;
233 }
234 Poll::Ready(None) => {
235 trace!("Reached the end.");
236 break;
237 }
238 }
239 }
240
241 for (idx, state) in states.iter_mut().enumerate() {
242 if let Some(stream) = state.stream.take() {
243 trace!("re-adding stream to poll queue {}", idx);
245 let f = CallbackFuture {
246 idx,
247 stream: Some(stream),
248 };
249 poll_queue.push(f);
250 }
251 }
252
253 let one_buffer_is_over = max_time_spread > *max_buffer_time;
254
255 let ready_count = states
256 .iter()
257 .filter(|s| s.current.len() >= min_states_needed || s.complete)
258 .count();
259
260 let enough_state = ready_count == states.len();
261
262 let res = if enough_state || one_buffer_is_over {
263 trace!("Reporting");
264 gather_packets(states)
265 } else {
266 trace!("Not reporting {} {}", enough_state, one_buffer_is_over);
267 vec![]
268 };
269
270 let completed_count = states.iter().filter(|s| s.complete).count();
271
272 if res.is_empty() && completed_count == states.len() {
273 trace!("All ifaces are complete.");
274 return Poll::Ready(None);
275 } else if res.is_empty() && not_pending == 0 && !states.is_empty() {
276 trace!("All ifaces are delayed.");
277 return Poll::Pending;
278 } else {
279 trace!("Returning results {}", res.len());
280 return Poll::Ready(Some(Ok(res)));
281 }
282 }
283}
284
#[cfg(test)]
mod tests {
    use std::io::Cursor;
    use std::ops::Range;
    use std::path::PathBuf;

    use byteorder::{ByteOrder, ReadBytesExt};
    use futures::stream;
    use futures::{Future, Stream};
    use rand;

    use crate::PacketStream;

    use super::*;

    /// Builds an empty packet stamped `ts` milliseconds after the Unix epoch.
    fn make_packet(ts: usize) -> Packet {
        Packet {
            timestamp: SystemTime::UNIX_EPOCH + Duration::from_millis(ts as _),
            actual_length: 0,
            original_length: 0,
            data: vec![],
        }
    }

    /// Collects every packet of the canary capture through a single-stream
    /// bridge and checks the count and the first record's header fields.
    #[test]
    fn packets_from_file() {
        let _ = env_logger::try_init();

        let pcap_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("resources")
            .join("canary.pcap");

        info!("Testing against {:?}", pcap_path);

        let handle = Handle::file_capture(pcap_path.to_str().expect("No path found"))
            .expect("No handle created");

        let packet_stream =
            PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");

        let packet_provider = BridgeStream::new(vec![packet_stream], Duration::from_millis(100), 2)
            .expect("Failed to build");

        let packets = smol::block_on(async move {
            let fut_packets = packet_provider.collect::<Vec<_>>();
            let packets: Vec<_> = fut_packets
                .await
                .into_iter()
                .flatten()
                .flatten()
                .filter(|p| p.data().len() == p.actual_length() as usize)
                .collect();

            handle.interrupt();

            packets
        });

        assert_eq!(packets.len(), 10);

        let packet = packets.first().cloned().expect("No packets");
        let data = packet
            .into_pcap_record::<byteorder::BigEndian>()
            .expect("Failed to convert to pcap record");
        let mut cursor = Cursor::new(data);
        let ts_sec = cursor
            .read_u32::<byteorder::BigEndian>()
            .expect("Failed to read");
        let ts_usec = cursor
            .read_u32::<byteorder::BigEndian>()
            .expect("Failed to read");
        let actual_length = cursor
            .read_u32::<byteorder::BigEndian>()
            .expect("Failed to read");
        assert_eq!(
            ts_sec as u64 * 1_000_000 as u64 + ts_usec as u64,
            1513735120021685
        );
        assert_eq!(actual_length, 54);
    }

    /// Same capture as `packets_from_file`, but drained item by item with `next()`.
    #[test]
    fn packets_from_file_next_bridge() {
        let _ = env_logger::try_init();

        let pcap_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("resources")
            .join("canary.pcap");

        info!("Testing against {:?}", pcap_path);

        let handle = Handle::file_capture(pcap_path.to_str().expect("No path found"))
            .expect("No handle created");

        let packet_stream =
            PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");

        let packet_provider = BridgeStream::new(vec![packet_stream], Duration::from_millis(100), 2)
            .expect("Failed to build");

        let packets = smol::block_on(async move {
            let fut_packets = async move {
                let mut packet_provider = packet_provider.boxed();
                let mut packets = vec![];
                while let Some(p) = packet_provider.next().await {
                    info!("packets returned {:?}", p);
                    packets.extend(p);
                }
                packets
            };
            let packets = fut_packets
                .await
                .into_iter()
                .flatten()
                .filter(|p| p.data().len() == p.actual_length() as usize)
                .count();

            handle.interrupt();

            packets
        });

        assert_eq!(packets, 10);
    }

    /// A bridge can be built on top of the default lookup device.
    #[test]
    fn packets_from_lookup_bridge() {
        let _ = env_logger::try_init();

        let handle = Handle::lookup().expect("No handle created");
        let packet_stream =
            PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");

        let stream = BridgeStream::new(vec![packet_stream], Duration::from_millis(100), 2);

        // Format arguments are passed to `assert!` directly; a pre-built
        // `format!` string is not a valid panic message in edition 2021.
        assert!(
            stream.is_ok(),
            "Could not build stream {}",
            stream.err().unwrap()
        );
    }

    /// A bridge can be built on a lookup device configured with a BPF filter.
    #[test]
    fn packets_from_lookup_with_bpf() {
        let _ = env_logger::try_init();

        let mut cfg = Config::default();
        cfg.with_bpf(
            "(not (net 172.16.0.0/16 and port 443)) and (not (host 172.17.76.33 and port 443))"
                .to_owned(),
        );
        let handle = Handle::lookup().expect("No handle created");
        // Pass the filtered config; a default config would leave the BPF untested.
        let packet_stream = PacketStream::new(cfg, Arc::clone(&handle)).expect("Failed to build");

        let stream = BridgeStream::new(vec![packet_stream], Duration::from_millis(100), 2);

        assert!(
            stream.is_ok(),
            "Could not build stream {}",
            stream.err().unwrap()
        );
    }

    /// Two overlapping in-memory streams come out merged in timestamp order.
    #[test]
    fn packets_come_out_time_ordered() {
        let mut packets1 = vec![];
        let mut packets2 = vec![];

        let base_time = std::time::SystemTime::UNIX_EPOCH;

        for s in 0..20 {
            let d = base_time + std::time::Duration::from_secs(s);
            let p = Packet::new(d, 0, 0, vec![]);
            packets1.push(p)
        }

        for s in 5..15 {
            let d = base_time + std::time::Duration::from_secs(s);
            let p = Packet::new(d, 0, 0, vec![]);
            packets2.push(p)
        }

        let item1: StreamItem<Error> = Ok(packets1.clone());
        let item2: StreamItem<Error> = Ok(packets2.clone());

        let stream1 = futures::stream::iter(vec![item1]);
        let stream2 = futures::stream::iter(vec![item2]);

        let result = smol::block_on(async move {
            let bridge = BridgeStream::new(vec![stream1, stream2], Duration::from_millis(100), 0);

            let result = bridge
                .expect("Unable to create BridgeStream")
                .collect::<Vec<StreamItem<Error>>>()
                .await;
            result
                .into_iter()
                .flat_map(|r| r.unwrap())
                .collect::<Vec<Packet>>()
        });
        info!("Result {:?}", result);

        let mut expected = vec![packets1, packets2]
            .into_iter()
            .flatten()
            .collect::<Vec<Packet>>();
        expected.sort_by_key(|p| *p.timestamp());
        let expected_time = expected.iter().map(|p| p.timestamp()).collect::<Vec<_>>();
        let result_time = result.iter().map(|p| p.timestamp()).collect::<Vec<_>>();
        assert_eq!(result.len(), expected.len());
        assert_eq!(result_time, expected_time);

        info!("result: {:?}", result);
        info!("expected: {:?}", expected);
    }
}