pcap_async/
stream.rs

1use crate::config::Config;
2use crate::errors::Error;
3use crate::handle::Handle;
4use crate::packet::{Packet, PacketFuture};
5use crate::pcap_util;
6
7use futures::stream::{Stream, StreamExt};
8use log::*;
9use pin_project::pin_project;
10use std::future::Future;
11use std::marker::PhantomData;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context, Poll};
15
16pub type StreamItem<E> = Result<Vec<Packet>, E>;
17
18#[pin_project]
19pub struct PacketStream {
20    config: Config,
21    handle: Arc<Handle>,
22    pending: Option<PacketFuture>,
23    complete: bool,
24}
25
26impl PacketStream {
27    pub fn new(config: Config, handle: Arc<Handle>) -> Result<PacketStream, Error> {
28        let live_capture = handle.is_live_capture();
29
30        if live_capture {
31            let h = handle
32                .set_snaplen(config.snaplen())?
33                .set_promiscuous()?
34                .set_buffer_size(config.buffer_size())?
35                .activate()?;
36            if !config.blocking() {
37                h.set_non_block()?;
38            }
39
40            if let Some(bpf) = config.bpf() {
41                let bpf = handle.compile_bpf(bpf)?;
42                handle.set_bpf(bpf)?;
43            }
44        }
45
46        Ok(PacketStream {
47            config: config,
48            handle: handle,
49            pending: None,
50            complete: false,
51        })
52    }
53}
54
55impl Stream for PacketStream {
56    type Item = StreamItem<Error>;
57
58    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
59        let this = self.project();
60
61        if *this.complete {
62            return Poll::Ready(None);
63        }
64
65        let mut f = if let Some(f) = this.pending.take() {
66            f
67        } else {
68            match PacketFuture::new(this.config, this.handle) {
69                Err(e) => {
70                    *this.complete = true;
71                    return Poll::Ready(Some(Err(e)));
72                }
73                Ok(f) => f,
74            }
75        };
76
77        match Pin::new(&mut f).poll(cx) {
78            Poll::Pending => {
79                *this.pending = Some(f);
80                Poll::Pending
81            }
82            Poll::Ready(None) => {
83                debug!("Stream was complete");
84                *this.complete = true;
85                Poll::Ready(None)
86            }
87            Poll::Ready(Some(Err(e))) => {
88                *this.complete = true;
89                Poll::Ready(Some(Err(e)))
90            }
91            Poll::Ready(Some(Ok(packets))) => {
92                trace!("Returning {} packets", packets.len());
93                Poll::Ready(Some(Ok(packets)))
94            }
95        }
96    }
97}
98
#[cfg(test)]
mod tests {
    use super::*;
    use byteorder::{BigEndian, ReadBytesExt};
    use std::io::Cursor;
    use std::path::PathBuf;

    /// Opens the capture file `file_name` from the crate's `resources` directory.
    fn open_resource(file_name: &str) -> Arc<Handle> {
        let pcap_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("resources")
            .join(file_name);

        info!("Testing against {:?}", pcap_path);

        Handle::file_capture(pcap_path.to_str().expect("No path found"))
            .expect("No handle created")
    }

    /// Drains a default-configured stream over `handle` via `collect` and
    /// returns every packet whose captured data length equals its actual length.
    fn collect_complete_packets(handle: Arc<Handle>) -> Vec<Packet> {
        smol::block_on(async move {
            let packet_provider =
                PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");
            let packets: Vec<_> = packet_provider
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .flatten() // drop failed batches
                .flatten() // batches -> individual packets
                .filter(|p| p.data().len() == p.actual_length() as usize)
                .collect();

            handle.interrupt();

            packets
        })
    }

    #[test]
    fn packets_from_file() {
        let _ = env_logger::try_init();

        let packets = collect_complete_packets(open_resource("canary.pcap"));

        assert_eq!(packets.len(), 10);

        // Serialise the first packet as a big-endian pcap record and check the
        // leading header fields.
        let packet = packets.first().cloned().expect("No packets");
        let data = packet
            .into_pcap_record::<BigEndian>()
            .expect("Failed to convert to pcap record");
        let mut cursor = Cursor::new(data);
        let ts_sec = cursor.read_u32::<BigEndian>().expect("Failed to read");
        let ts_usec = cursor.read_u32::<BigEndian>().expect("Failed to read");
        let actual_length = cursor.read_u32::<BigEndian>().expect("Failed to read");
        assert_eq!(
            u64::from(ts_sec) * 1_000_000 + u64::from(ts_usec),
            1513735120021685
        );
        assert_eq!(actual_length, 54);
    }

    #[test]
    fn packets_from_large_file() {
        let _ = env_logger::try_init();

        let packets = collect_complete_packets(open_resource("4SICS-GeekLounge-151020.pcap"));

        assert_eq!(packets.len(), 246137);
    }

    #[test]
    fn packets_from_file_next() {
        let _ = env_logger::try_init();

        let packets = smol::block_on(async move {
            let handle = open_resource("canary.pcap");

            let packet_provider =
                PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");

            // Drive the stream one item at a time through `next` rather than `collect`.
            let mut packet_provider = packet_provider.boxed();
            let mut batches = vec![];
            while let Some(batch) = packet_provider.next().await {
                // Extending with a `Result` keeps the `Ok` batch and skips an `Err`.
                batches.extend(batch);
            }

            let packets = batches
                .into_iter()
                .flatten()
                .filter(|p| p.data().len() == p.actual_length() as _)
                .count();

            handle.interrupt();

            packets
        });

        assert_eq!(packets, 10);
    }

    #[test]
    fn packets_from_lookup() {
        let _ = env_logger::try_init();

        let handle = Handle::lookup().expect("No handle created");

        // `panic!` with a literal format string; `assert!(cond, format!(..))`
        // is rejected by the 2021 edition.
        let mut stream = PacketStream::new(Config::default(), handle)
            .unwrap_or_else(|e| panic!("Could not build stream {}", e));

        smol::block_on(async move { stream.next().await })
            .unwrap()
            .unwrap();
    }

    #[test]
    fn packets_from_lookup_with_bpf() {
        let _ = env_logger::try_init();

        let mut cfg = Config::default();
        cfg.with_bpf(
            "(not (net 172.16.0.0/16 and port 443)) and (not (host 172.17.76.33 and port 443))"
                .to_owned(),
        );
        let handle = Handle::lookup().expect("No handle created");

        if let Err(e) = PacketStream::new(cfg, handle) {
            panic!("Could not build stream {}", e);
        }
    }
}