1use crate::config::Config;
2use crate::errors::Error;
3use crate::handle::Handle;
4use crate::packet::{Packet, PacketFuture};
5use crate::pcap_util;
6
7use futures::stream::{Stream, StreamExt};
8use log::*;
9use pin_project::pin_project;
10use std::future::Future;
11use std::marker::PhantomData;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context, Poll};
15
16pub type StreamItem<E> = Result<Vec<Packet>, E>;
17
18#[pin_project]
19pub struct PacketStream {
20 config: Config,
21 handle: Arc<Handle>,
22 pending: Option<PacketFuture>,
23 complete: bool,
24}
25
26impl PacketStream {
27 pub fn new(config: Config, handle: Arc<Handle>) -> Result<PacketStream, Error> {
28 let live_capture = handle.is_live_capture();
29
30 if live_capture {
31 let h = handle
32 .set_snaplen(config.snaplen())?
33 .set_promiscuous()?
34 .set_buffer_size(config.buffer_size())?
35 .activate()?;
36 if !config.blocking() {
37 h.set_non_block()?;
38 }
39
40 if let Some(bpf) = config.bpf() {
41 let bpf = handle.compile_bpf(bpf)?;
42 handle.set_bpf(bpf)?;
43 }
44 }
45
46 Ok(PacketStream {
47 config: config,
48 handle: handle,
49 pending: None,
50 complete: false,
51 })
52 }
53}
54
55impl Stream for PacketStream {
56 type Item = StreamItem<Error>;
57
58 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
59 let this = self.project();
60
61 if *this.complete {
62 return Poll::Ready(None);
63 }
64
65 let mut f = if let Some(f) = this.pending.take() {
66 f
67 } else {
68 match PacketFuture::new(this.config, this.handle) {
69 Err(e) => {
70 *this.complete = true;
71 return Poll::Ready(Some(Err(e)));
72 }
73 Ok(f) => f,
74 }
75 };
76
77 match Pin::new(&mut f).poll(cx) {
78 Poll::Pending => {
79 *this.pending = Some(f);
80 Poll::Pending
81 }
82 Poll::Ready(None) => {
83 debug!("Stream was complete");
84 *this.complete = true;
85 Poll::Ready(None)
86 }
87 Poll::Ready(Some(Err(e))) => {
88 *this.complete = true;
89 Poll::Ready(Some(Err(e)))
90 }
91 Poll::Ready(Some(Ok(packets))) => {
92 trace!("Returning {} packets", packets.len());
93 Poll::Ready(Some(Ok(packets)))
94 }
95 }
96 }
97}
98
99#[cfg(test)]
100mod tests {
101 use super::*;
102 use byteorder::{ByteOrder, ReadBytesExt};
103 use futures::{Future, Stream};
104 use std::io::Cursor;
105 use std::path::PathBuf;
106
107 #[test]
108 fn packets_from_file() {
109 let _ = env_logger::try_init();
110
111 let pcap_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
112 .join("resources")
113 .join("canary.pcap");
114
115 info!("Testing against {:?}", pcap_path);
116
117 let handle = Handle::file_capture(pcap_path.to_str().expect("No path found"))
118 .expect("No handle created");
119
120 let packets = smol::block_on(async move {
121 let packet_provider =
122 PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");
123 let fut_packets = packet_provider.collect::<Vec<_>>();
124 let packets: Vec<_> = fut_packets
125 .await
126 .into_iter()
127 .flatten()
128 .flatten()
129 .filter(|p| p.data().len() == p.actual_length() as usize)
130 .collect();
131
132 handle.interrupt();
133
134 packets
135 });
136
137 assert_eq!(packets.len(), 10);
138
139 let packet = packets.first().cloned().expect("No packets");
140 let data = packet
141 .into_pcap_record::<byteorder::BigEndian>()
142 .expect("Failed to convert to pcap record");
143 let mut cursor = Cursor::new(data);
144 let ts_sec = cursor
145 .read_u32::<byteorder::BigEndian>()
146 .expect("Failed to read");
147 let ts_usec = cursor
148 .read_u32::<byteorder::BigEndian>()
149 .expect("Failed to read");
150 let actual_length = cursor
151 .read_u32::<byteorder::BigEndian>()
152 .expect("Failed to read");
153 assert_eq!(
154 ts_sec as u64 * 1_000_000 as u64 + ts_usec as u64,
155 1513735120021685
156 );
157 assert_eq!(actual_length, 54);
158 }
159
160 #[test]
161 fn packets_from_large_file() {
162 let _ = env_logger::try_init();
163
164 let pcap_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
165 .join("resources")
166 .join("4SICS-GeekLounge-151020.pcap");
167
168 info!("Testing against {:?}", pcap_path);
169
170 let handle = Handle::file_capture(pcap_path.to_str().expect("No path found"))
171 .expect("No handle created");
172
173 let packets = smol::block_on(async move {
174 let packet_provider =
175 PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");
176 let fut_packets = packet_provider.collect::<Vec<_>>();
177 let packets: Vec<_> = fut_packets
178 .await
179 .into_iter()
180 .flatten()
181 .flatten()
182 .filter(|p| p.data().len() == p.actual_length() as usize)
183 .collect();
184
185 handle.interrupt();
186
187 packets
188 });
189
190 assert_eq!(packets.len(), 246137);
191 }
192
193 #[test]
194 fn packets_from_file_next() {
195 let _ = env_logger::try_init();
196
197 let pcap_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
198 .join("resources")
199 .join("canary.pcap");
200
201 info!("Testing against {:?}", pcap_path);
202
203 let packets = smol::block_on(async move {
204 let handle = Handle::file_capture(pcap_path.to_str().expect("No path found"))
205 .expect("No handle created");
206
207 let packet_provider =
208 PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");
209 let fut_packets = async move {
210 let mut packet_provider = packet_provider.boxed();
211 let mut packets = vec![];
212 while let Some(p) = packet_provider.next().await {
213 packets.extend(p);
214 }
215 packets
216 };
217 let packets = fut_packets
218 .await
219 .into_iter()
220 .flatten()
221 .filter(|p| p.data().len() == p.actual_length() as _)
222 .count();
223
224 handle.interrupt();
225
226 packets
227 });
228
229 assert_eq!(packets, 10);
230 }
231
232 #[test]
233 fn packets_from_lookup() {
234 let _ = env_logger::try_init();
235
236 let handle = Handle::lookup().expect("No handle created");
237
238 let stream = PacketStream::new(Config::default(), handle);
239
240 assert!(
241 stream.is_ok(),
242 format!("Could not build stream {}", stream.err().unwrap())
243 );
244
245 let mut stream = stream.unwrap();
246
247 smol::block_on(async move { stream.next().await })
248 .unwrap()
249 .unwrap();
250 }
251
252 #[test]
253 fn packets_from_lookup_with_bpf() {
254 let _ = env_logger::try_init();
255
256 let mut cfg = Config::default();
257 cfg.with_bpf(
258 "(not (net 172.16.0.0/16 and port 443)) and (not (host 172.17.76.33 and port 443))"
259 .to_owned(),
260 );
261 let handle = Handle::lookup().expect("No handle created");
262
263 let stream = PacketStream::new(cfg, handle);
264
265 assert!(
266 stream.is_ok(),
267 format!("Could not build stream {}", stream.err().unwrap())
268 );
269 }
270}