pako_core/
analyzer.rs

1use std::{cmp::min, net::IpAddr, ops::DerefMut, sync::Arc};
2
3use log::{debug, trace, warn};
4use pako_tools::*;
5use pcap_parser::{
6    data::{get_packetdata_raw, PacketData},
7    Linktype,
8};
9use pnet_packet::{
10    ethernet::{EtherType, EtherTypes, EthernetPacket},
11    gre::GrePacket,
12    icmp::IcmpPacket,
13    icmpv6::Icmpv6Packet,
14    ip::{IpNextHeaderProtocol, IpNextHeaderProtocols},
15    ipv4::{Ipv4Flags, Ipv4Packet},
16    ipv6::{ExtensionPacket, FragmentPacket, Ipv6Packet},
17    tcp::TcpPacket,
18    udp::UdpPacket,
19    vlan::VlanPacket,
20    Packet as PnetPacket, PacketSize,
21};
22
23use crate::{
24    erspan::ERSPANPacket,
25    flow_map::FlowMap,
26    geneve::*,
27    ip_defrag::{DefragEngine, Fragment, IPDefragEngine},
28    layers::LinkLayerType,
29    mpls::*,
30    packet_info::PacketInfo,
31    plugin::*,
32    plugin_registry::*,
33    ppp::{PppPacket, PppProtocolTypes},
34    pppoe::PppoeSessionPacket,
35    tcp_reassembly::{finalize_tcp_streams, TcpStreamError, TcpStreamReassembly},
36    vxlan::*,
37};
38
39#[derive(Clone, Debug, Default)]
40pub struct L3Info {
41    /// Layer 4 protocol (e.g TCP, UDP, ICMP)
42    pub l4_proto: u8,
43    pub three_tuple: ThreeTuple,
44}
45
46/// Pcap/Pcap-ng analyzer
47///
48/// Read input pcap/pcap-ng data, parse it and call plugin callbacks
49/// for each ISO layer (L2 if available, L3 and L4).
50/// Flows are created for L4 sessions. Events are sent when plugins
51/// are created or destroyed.
52///
53/// The number of worker threads can be configured from the `num_threads`
54/// configuration variable. By default, it is 0 (auto-detect the number
55/// of cores and create the same number of threads).
56///
57/// All callbacks for a single ISO layer will be called concurrently before
58/// calling the next level callbacks.
59pub struct Analyzer {
60    pub(crate) registry: Arc<PluginRegistry>,
61
62    pub(crate) flows: FlowMap,
63
64    ipv4_defrag: Box<dyn DefragEngine>,
65    ipv6_defrag: Box<dyn DefragEngine>,
66    pub(crate) tcp_defrag: TcpStreamReassembly,
67
68    defrag_count: usize,
69    do_checksums: bool,
70    skip_index: usize,
71    output_dir: Option<String>,
72}
73
74impl Analyzer {
75    pub fn new(registry: Arc<PluginRegistry>, config: &Config) -> Analyzer {
76        let do_checksums = config.get_bool("do_checksums").unwrap_or(true);
77        let skip_index = config.get_usize("skip_index").unwrap_or(0);
78        if skip_index > 0 {
79            debug!("Will skip to index {}", skip_index);
80        }
81        let output_dir = config.get("output_dir").map(|s| s.to_owned());
82        Analyzer {
83            registry,
84            flows: FlowMap::default(),
85            ipv4_defrag: Box::new(IPDefragEngine::new()),
86            ipv6_defrag: Box::new(IPDefragEngine::new()),
87            tcp_defrag: TcpStreamReassembly::default(),
88            defrag_count: 0,
89            do_checksums,
90            skip_index,
91            output_dir,
92        }
93    }
94
95    /// Get a reference to plugin registry
96    pub fn registry(&self) -> &PluginRegistry {
97        &self.registry
98    }
99
100    #[inline]
101    fn handle_l2(&mut self, packet: &Packet, ctx: &ParseContext, data: &[u8]) -> Result<(), Error> {
102        handle_l2(packet, ctx, data, self)
103    }
104
105    /// Use deterministic values for random numbers (for ex. flow IDs)
106    ///
107    /// This option is intended for use in testing
108    pub fn with_deterministic_rng(mut self) -> Self {
109        self.flows = self.flows.with_rng_seed(0);
110        self
111    }
112}
113
114pub(crate) fn handle_l2(
115    packet: &Packet,
116    ctx: &ParseContext,
117    data: &[u8],
118    analyzer: &mut Analyzer,
119) -> Result<(), Error> {
120    trace!("handle_l2 (idx={})", ctx.pcap_index);
121
122    // resize slice to remove padding
123    let datalen = min(packet.caplen as usize, data.len());
124    let data = &data[..datalen];
125
126    // let start = ::std::time::Instant::now();
127    run_plugins_v2_physical(packet, ctx, data, analyzer)?;
128    // let elapsed = start.elapsed();
129    // debug!("Time to run l2 plugins: {}.{}", elapsed.as_secs(), elapsed.as_millis());
130
131    match EthernetPacket::new(data) {
132        Some(eth) => {
133            // debug!("    source: {}", eth.get_source());
134            // debug!("    dest  : {}", eth.get_destination());
135            let dest = eth.get_destination();
136            if dest.is_multicast() {
137                match &data[..6] {
138                    [0x01, 0x00, 0x0c, 0xcc, 0xcc, 0xcc] => {
139                        debug!("Cisco CDP/VTP/UDLD - ignoring");
140                        // the 'ethertype' field is used for length
141                        return Ok(());
142                    }
143                    [0x01, 0x00, 0x0c, 0xcd, 0xcd, 0xd0] => {
144                        debug!("Cisco Multicast address - ignoring");
145                        return Ok(());
146                    }
147                    _ => {
148                        trace!("Ethernet broadcast (unknown type) (idx={})", ctx.pcap_index);
149                    }
150                }
151            }
152            let ethertype = eth.get_ethertype();
153            // detect if 802.3 or Ethernet II framing (https://en.wikipedia.org/wiki/Ethernet_frame#Ethernet_II)
154            match ethertype.0 {
155                0..=1500 => {
156                    // IEEE 802.3 frame
157                    // field is not an ethertype, but a length
158                    // next layer is a 802.2 LLC Header
159                    // [DSAP] [SSAP] [Control]
160                    // is SSAP is 0xAA, then this is a SNAP frame
161                    // see also https://www.cisco.com/c/en/us/support/docs/ibm-technologies/logical-link-control-llc/12247-45.html
162                    // and https://arxiv.org/pdf/1610.00635.pdf
163                    let payload = eth.payload();
164                    if payload.len() < 3 {
165                        warn!("Incomplete 802.3 frame (idx={})", ctx.pcap_index);
166                        return Ok(());
167                    }
168                    // if payload[1] == 0xAA {
169                    //     unimplemented!("802.3 with SNAP frame not implemented yet");
170                    // }
171                    //
172                    // LSAP values: https://en.wikipedia.org/wiki/IEEE_802.2#LSAP_values
173                    // value 6 is internet protocol
174                    // match payload[0] {
175                    //     _ => (),
176                    // }
177                    trace!("IEEE 802.3 frame, ignoring");
178                    return Ok(());
179                }
180                1501..=1536 => {
181                    warn!(
182                        "Undefined value in ethernet type/length field (idx={})",
183                        ctx.pcap_index
184                    );
185                }
186                _ => (),
187            }
188            let payload = eth.payload();
189            trace!("    ethertype: 0x{:x}", ethertype.0);
190            run_plugins_v2_link(packet, ctx, LinkLayerType::Ethernet, payload, analyzer)?;
191            handle_l3(packet, ctx, payload, ethertype, analyzer)
192        }
193        None => {
194            // packet too small to be ethernet
195            Ok(())
196        }
197    }
198}
199
200pub(crate) fn handle_l3(
201    packet: &Packet,
202    ctx: &ParseContext,
203    data: &[u8],
204    ethertype: EtherType,
205    analyzer: &mut Analyzer,
206) -> Result<(), Error> {
207    if data.is_empty() {
208        return Ok(());
209    }
210
211    // see https://www.iana.org/assignments/ieee-802-numbers/ieee-802-numbers.xhtml
212    match ethertype {
213        // Transparent Ethernet Bridging / Generic Routing Encapsulation (RFC 1701)
214        EtherType(0x6558) => handle_l2(packet, ctx, data, analyzer),
215        EtherTypes::Ipv4 => handle_l3_ipv4(packet, ctx, data, analyzer),
216        EtherTypes::Ipv6 => handle_l3_ipv6(packet, ctx, data, analyzer),
217        EtherTypes::Vlan => handle_l3_vlan_801q(packet, ctx, data, analyzer),
218        // ignore ARP packets
219        EtherTypes::Arp => Ok(()),
220        // 0x880b: PPP (rfc7042)
221        EtherType(0x880b) => handle_l3_ppp(packet, ctx, data, analyzer),
222        // 0x8847: MPLS (RFC5332)
223        // 0x8848: MPLS with upstream-assigned label (RFC5332)
224        EtherTypes::Mpls | EtherTypes::MplsMcast => handle_l3_mpls(packet, ctx, data, analyzer),
225        EtherType(0x88be) => handle_l3_erspan(packet, ctx, data, analyzer),
226        EtherTypes::PppoeSession => handle_l3_pppoesession(packet, ctx, data, analyzer),
227
228        e => {
229            trace!(
230                "unsupported ethertype {} (0x{:x}) (idx={})",
231                e,
232                e.0,
233                ctx.pcap_index
234            );
235            Ok(())
236        }
237    }
238}
239
240fn handle_l3_ipv4(
241    packet: &Packet,
242    ctx: &ParseContext,
243    data: &[u8],
244    analyzer: &mut Analyzer,
245) -> Result<(), Error> {
246    trace!("handle_l3_ipv4 (idx={})", ctx.pcap_index);
247    let ipv4 = Ipv4Packet::new(data).ok_or("Could not build IPv4 packet from data")?;
248    // eprintln!("ABORT pkt {:?}", ipv4);
249    let orig_len = data.len();
250
251    let ip_len = ipv4.get_total_length() as usize;
252
253    // remove padding
254    let (data, ipv4) = {
255        if ip_len < data.len() && ip_len > 0 {
256            let d = &data[..ip_len];
257            let ipv4 = Ipv4Packet::new(d).ok_or("Could not build IPv4 packet from data")?;
258            (d, ipv4)
259        } else {
260            (data, ipv4)
261        }
262    };
263
264    let l4_proto = ipv4.get_next_level_protocol().0;
265    let t3 = ThreeTuple {
266        src: IpAddr::V4(ipv4.get_source()),
267        dst: IpAddr::V4(ipv4.get_destination()),
268        l4_proto,
269    };
270
271    if analyzer.do_checksums {
272        let cksum = ::pnet_packet::ipv4::checksum(&ipv4);
273        if cksum != ipv4.get_checksum() {
274            warn!("IPv4: invalid checksum");
275        }
276    }
277
278    // if get_total_length is 0, assume TSO offloading and no padding
279    let payload = if ip_len == 0 {
280        warn!(
281            "IPv4: packet reported length is 0. Assuming TSO (idx={})",
282            ctx.pcap_index
283        );
284        // the payload() function from pnet will fail
285        let start = ipv4.get_header_length() as usize * 4;
286        if start > data.len() {
287            warn!("IPv4: ip_len == 0 and ipv4.get_header_length is invalid!");
288            return Ok(());
289        }
290        &data[start..]
291    } else {
292        ipv4.payload()
293    };
294
295    // check IP fragmentation before calling handle_l4
296    let frag_offset = (ipv4.get_fragment_offset() * 8) as usize;
297    let more_fragments = ipv4.get_flags() & Ipv4Flags::MoreFragments != 0;
298    let defrag = analyzer.ipv4_defrag.update(
299        ipv4.get_identification().into(),
300        frag_offset,
301        more_fragments,
302        payload,
303    );
304    let payload = match defrag {
305        Fragment::NoFrag(d) => {
306            debug_assert!(d.len() < orig_len);
307            d
308        }
309        Fragment::Complete(ref v) => {
310            warn!("IPv4 defrag done, using defrag buffer len={}", v.len());
311            v
312        }
313        Fragment::Incomplete => {
314            debug!("IPv4 defragmentation incomplete");
315            return Ok(());
316        }
317        Fragment::Error => {
318            warn!("IPv4 defragmentation error");
319            return Ok(());
320        }
321    };
322
323    // TODO check if   ip_len - ipv4.get_options_raw().len() - 20 > payload.len()
324    // if yes, capture may be truncated
325
326    run_plugins_v2_network(packet, ctx, payload, &t3, analyzer)?;
327
328    let l3_info = L3Info {
329        three_tuple: t3,
330        l4_proto,
331    };
332    handle_l3_common(packet, ctx, payload, &l3_info, analyzer)
333}
334
335fn is_ipv6_opt(opt: IpNextHeaderProtocol) -> bool {
336    matches!(
337        opt,
338        IpNextHeaderProtocols::Hopopt
339            | IpNextHeaderProtocols::Ipv6Opts
340            | IpNextHeaderProtocols::Ipv6Route
341            | IpNextHeaderProtocols::Ipv6Frag
342            | IpNextHeaderProtocols::Esp
343            | IpNextHeaderProtocols::Ah
344            | IpNextHeaderProtocols::MobilityHeader
345    )
346}
347
348fn handle_l3_ipv6(
349    packet: &Packet,
350    ctx: &ParseContext,
351    data: &[u8],
352    analyzer: &mut Analyzer,
353) -> Result<(), Error> {
354    trace!("handle_l3_ipv6 (idx={})", ctx.pcap_index);
355    let ipv6 = Ipv6Packet::new(data).ok_or("Could not build IPv6 packet from data")?;
356
357    let mut payload = ipv6.payload();
358    let mut l4_proto = ipv6.get_next_header();
359
360    if payload.is_empty() {
361        // jumbogram ? (rfc2675)
362        trace!("IPv6 length is 0. Jumbogram?");
363        if data.len() >= 40 {
364            payload = &data[40..];
365        } else {
366            warn!(
367                "IPv6 length is 0, but frame is too short for an IPv6 header (idx={})",
368                ctx.pcap_index
369            );
370            return Ok(());
371        }
372    }
373
374    // XXX remove padding ?
375
376    let mut extensions = Vec::new();
377    let mut frag_ext = None;
378
379    // skip all extensions (keep them ?)
380    while is_ipv6_opt(l4_proto) {
381        let ext = ExtensionPacket::new(payload)
382            .ok_or("Could not build IPv6 Extension packet from payload")?;
383        let next_header = ext.get_next_header();
384        trace!("option header: {}", l4_proto);
385        if l4_proto == IpNextHeaderProtocols::Ipv6Frag {
386            if frag_ext.is_some() {
387                warn!("multiple IPv6Frag extensions idx={}", ctx.pcap_index);
388                return Ok(());
389            }
390            frag_ext = FragmentPacket::new(payload);
391        }
392        // XXX fixup wrong extension size calculation in pnet
393        let offset = if l4_proto != IpNextHeaderProtocols::Ah {
394            ext.packet_size()
395        } else {
396            // https://en.wikipedia.org/wiki/IPsec#Authentication_Header
397            // The length of this Authentication Header in 4-octet units, minus 2. For example, an
398            // AH value of 4 equals 3×(32-bit fixed-length AH fields) + 3×(32-bit ICV fields) − 2
399            // and thus an AH value of 4 means 24 octets. Although the size is measured in 4-octet
400            // units, the length of this header needs to be a multiple of 8 octets if carried in an
401            // IPv6 packet. This restriction does not apply to an Authentication Header carried in
402            // an IPv4 packet.
403            let l1 = (payload[1] - 1) as usize;
404            let val = l1 * 4 + l1 * 4 - 2;
405            (val + 7) & (!7)
406        };
407        extensions.push((l4_proto, ext));
408        l4_proto = next_header;
409        payload = &payload[offset..];
410    }
411
412    let t3 = ThreeTuple {
413        src: IpAddr::V6(ipv6.get_source()),
414        dst: IpAddr::V6(ipv6.get_destination()),
415        l4_proto: l4_proto.0,
416    };
417
418    run_plugins_v2_network(packet, ctx, payload, &t3, analyzer)?;
419
420    if l4_proto == IpNextHeaderProtocols::Ipv6NoNxt {
421        // usually the case for IPv6 mobility
422        // XXX header data could be inspected?
423        trace!("No next header");
424        if !payload.is_empty() {
425            warn!(
426                "No next header, but data is present (len={})",
427                payload.len()
428            );
429        }
430        return Ok(());
431    }
432
433    let l3_info = L3Info {
434        three_tuple: t3,
435        l4_proto: l4_proto.0,
436    };
437
438    if let Some(frag_info) = frag_ext {
439        handle_l4_ipv6frag(
440            packet, ctx, &frag_info, payload, &l3_info, l4_proto, analyzer,
441        )
442    } else {
443        handle_l3_common(packet, ctx, payload, &l3_info, analyzer)
444    }
445}
446
447fn handle_l3_vlan_801q(
448    packet: &Packet,
449    ctx: &ParseContext,
450    data: &[u8],
451    analyzer: &mut Analyzer,
452) -> Result<(), Error> {
453    trace!("handle_l3_vlan_801q (idx={})", ctx.pcap_index);
454    let vlan = VlanPacket::new(data).ok_or("Could not build 802.1Q Vlan packet from data")?;
455    let next_ethertype = vlan.get_ethertype();
456    trace!("    802.1q: VLAN id={}", vlan.get_vlan_identifier());
457
458    handle_l3(packet, ctx, vlan.payload(), next_ethertype, analyzer)
459}
460
461fn handle_l3_erspan(
462    packet: &Packet,
463    ctx: &ParseContext,
464    data: &[u8],
465    analyzer: &mut Analyzer,
466) -> Result<(), Error> {
467    trace!("handle_l3_erspan (idx={})", ctx.pcap_index);
468    let erspan = ERSPANPacket::new(data).ok_or("Could not build Erspan packet from data")?;
469    trace!(
470        "    erspan: VLAN id={} span ID={}",
471        erspan.get_vlan(),
472        erspan.get_span_id()
473    );
474    handle_l2(packet, ctx, erspan.payload(), analyzer)
475}
476
477fn handle_l3_mpls(
478    packet: &Packet,
479    ctx: &ParseContext,
480    data: &[u8],
481    analyzer: &mut Analyzer,
482) -> Result<(), Error> {
483    trace!("handle_l2_mpls (idx={})", ctx.pcap_index);
484    let mpls = MPLSPacket::new(data).ok_or("Could not build MPLS packet from data")?;
485
486    let payload = mpls.payload();
487    trace!("    MPLS # labels: {}", mpls.get_num_labels());
488    trace!("    MPLS top label: {}", mpls.get_top_label().get_label());
489
490    // MPLS does not have a next header field. Try to guess possible values from
491    // (IPv4, IPv6, Ethernet)
492    if payload.is_empty() {
493        warn!("MPLS packet but no data");
494        return Ok(());
495    }
496    let first_nibble = payload[0] >> 4;
497    match first_nibble {
498        4 => handle_l3_ipv4(packet, ctx, payload, analyzer),
499        6 => handle_l3_ipv6(packet, ctx, payload, analyzer),
500        _ => handle_l2(packet, ctx, payload, analyzer),
501    }
502    // store top label / decoder association?
503}
504
505fn handle_l3_pppoesession(
506    packet: &Packet,
507    ctx: &ParseContext,
508    data: &[u8],
509    analyzer: &mut Analyzer,
510) -> Result<(), Error> {
511    trace!("handle_l3_pppoesession (idx={})", ctx.pcap_index);
512    let session =
513        PppoeSessionPacket::new(data).ok_or("Could not build PppoeSession packet from data")?;
514    trace!(
515        "    pppoesession: version={} type={} code={}",
516        session.get_version(),
517        session.get_type(),
518        session.get_code(),
519    );
520    let ppp_data = session.payload();
521    handle_l3_ppp(packet, ctx, ppp_data, analyzer)
522}
523
524fn handle_l3_ppp(
525    packet: &Packet,
526    ctx: &ParseContext,
527    data: &[u8],
528    analyzer: &mut Analyzer,
529) -> Result<(), Error> {
530    trace!("handle_l3_ppp (idx={})", ctx.pcap_index);
531    let ppp = PppPacket::new(data).ok_or("Could not build Ppp packet from data")?;
532    let proto = ppp.get_protocol();
533    let payload = ppp.payload();
534    trace!("    ppp: protocol=0x{:02x}", proto.0,);
535    match proto {
536        PppProtocolTypes::Ipv4 => handle_l3_ipv4(packet, ctx, payload, analyzer),
537        PppProtocolTypes::Ipv6 => handle_l3_ipv6(packet, ctx, payload, analyzer),
538        _ => {
539            warn!("Unsupported PPP protocol 0x{:02x}", proto.0);
540            Ok(())
541        }
542    }
543}
544
545fn handle_l3_common(
546    packet: &Packet,
547    ctx: &ParseContext,
548    data: &[u8],
549    l3_info: &L3Info,
550    analyzer: &mut Analyzer,
551) -> Result<(), Error> {
552    match IpNextHeaderProtocol(l3_info.l4_proto) {
553        IpNextHeaderProtocols::Tcp => handle_l4_tcp(packet, ctx, data, l3_info, analyzer),
554        IpNextHeaderProtocols::Udp => handle_l4_udp(packet, ctx, data, l3_info, analyzer),
555        IpNextHeaderProtocols::Icmp => handle_l4_icmp(packet, ctx, data, l3_info, analyzer),
556        IpNextHeaderProtocols::Icmpv6 => handle_l4_icmpv6(packet, ctx, data, l3_info, analyzer),
557        IpNextHeaderProtocols::Esp => handle_l4_generic(packet, ctx, data, l3_info, analyzer),
558        IpNextHeaderProtocols::Gre => handle_l4_gre(packet, ctx, data, l3_info, analyzer),
559        IpNextHeaderProtocols::Ipv4 => handle_l3(packet, ctx, data, EtherTypes::Ipv4, analyzer),
560        IpNextHeaderProtocols::Ipv6 => handle_l3(packet, ctx, data, EtherTypes::Ipv6, analyzer),
561        p => {
562            warn!("Unsupported L4 proto {} (idx={})", p, ctx.pcap_index);
563            handle_l4_generic(packet, ctx, data, l3_info, analyzer)
564        }
565    }
566}
567
568fn handle_l4_tcp(
569    packet: &Packet,
570    ctx: &ParseContext,
571    l4_data: &[u8],
572    l3_info: &L3Info,
573    analyzer: &mut Analyzer,
574) -> Result<(), Error> {
575    trace!("handle_l4_tcp (idx={})", ctx.pcap_index);
576    trace!("    l4_data len: {}", l4_data.len());
577    let tcp = TcpPacket::new(l4_data).ok_or("Could not build TCP packet from data")?;
578
579    let src_port = tcp.get_source();
580    let dst_port = tcp.get_destination();
581
582    // XXX begin copy/paste of handle_l4_common
583    let five_tuple = FiveTuple::from_three_tuple(&l3_info.three_tuple, src_port, dst_port);
584    trace!("5-t: {}", five_tuple);
585    let now = packet.ts;
586
587    let flow_id = {
588        // flows modification section
589        let flows = &mut analyzer.flows;
590        // lookup flow
591        let flow_id = match flows.lookup_flow(&five_tuple) {
592            Some(id) => id,
593            None => {
594                let flow = Flow::new(&five_tuple, packet.ts.secs, packet.ts.micros);
595                gen_event_new_flow(&flow, &analyzer.registry);
596                flows.insert_flow(five_tuple.clone(), flow)
597            }
598        };
599
600        // update flow
601        flows.entry(flow_id).and_modify(|flow| {
602            flow.flow_id = flow_id;
603            flow.last_seen = now;
604        });
605        flow_id
606    };
607
608    // get a read-only reference to flow
609    let flow = analyzer
610        .flows
611        .get_flow(flow_id)
612        .expect("could not get flow from ID")
613        .clone();
614
615    let to_server = flow.five_tuple == five_tuple;
616
617    // XXX end copy/paste
618
619    let res = analyzer
620        .tcp_defrag
621        .update(&flow, &tcp, to_server, ctx.pcap_index);
622    match res {
623        Ok(Some(segments)) => {
624            // merge into one buffer
625            let mut new_vec = Vec::new();
626            let buffer = match segments.len() {
627                0 => {
628                    return Ok(());
629                }
630                1 => &segments[0].data,
631                _ => {
632                    segments
633                        .iter()
634                        .for_each(|s| new_vec.extend_from_slice(&s.data));
635                    &new_vec
636                }
637            };
638            let pcap_index = segments[0].pcap_index;
639            // send to upper layer and call plugins
640            // since this is ACK'ed data, data origin is the current destination
641            let t5 = five_tuple.get_reverse();
642            let origin_addr = t5.src;
643            let origin_port = t5.src_port;
644            trace!(
645                "Sending reassembled data from {}:{} (len={}, first pcap_index={})",
646                origin_addr,
647                origin_port,
648                buffer.len(),
649                pcap_index,
650            );
651            // XXX build a dummy packet
652            let l4_payload = buffer;
653            let dummy_packet = Packet {
654                interface: packet.interface,
655                caplen: 0,
656                origlen: 0,
657                ts: packet.ts, // this is the timestamp of ACK, not data
658                link_type: packet.link_type,
659                data: PacketData::L4(t5.proto, &[]),
660                pcap_index,
661            };
662            let packet_info = PacketInfo {
663                five_tuple: &t5,
664                to_server: !to_server,
665                l3_type: l3_info.three_tuple.l3_proto(),
666                l4_data: &[], // reassembled, so no L4 data
667                l4_type: t5.proto,
668                l4_payload: Some(l4_payload),
669                flow: Some(&flow),
670                pcap_index,
671            };
672            // let start = ::std::time::Instant::now();
673            run_plugins_v2_transport(&dummy_packet, ctx, &packet_info, analyzer)?;
674            // let elapsed = start.elapsed();
675            // debug!("Time to run l4 plugins: {}.{}", elapsed.as_secs(), elapsed.as_millis());
676        }
677        Ok(_) => (),
678        Err(TcpStreamError::Inverted) => {
679            analyzer.flows.entry(flow_id).and_modify(|f| {
680                f.five_tuple = f.five_tuple.get_reverse();
681            });
682        }
683        Err(e) => {
684            warn!("Tcp steam reassembly error: {:?}", e);
685        }
686    }
687
688    // check if TCP streams did timeout or expire
689    // TODO do the check only every nth packet/second?
690    //    warn!("now: {:?}", now);
691    analyzer.defrag_count += 1;
692    if analyzer.defrag_count > 1000 {
693        analyzer.tcp_defrag.check_expired_connections(now);
694        analyzer.defrag_count = 0;
695    }
696
697    // handle_l4_common(
698    //     packet, ctx, data, l3_info, src_port, dst_port, l4_payload, analyzer,
699    // )
700    Ok(())
701}
702
703fn handle_l4_udp(
704    packet: &Packet,
705    ctx: &ParseContext,
706    data: &[u8],
707    l3_info: &L3Info,
708    analyzer: &mut Analyzer,
709) -> Result<(), Error> {
710    trace!("handle_l4_udp (idx={})", ctx.pcap_index);
711    trace!("    l4_data len: {}", data.len());
712    let udp = UdpPacket::new(data).ok_or("Could not build UDP packet from data")?;
713
714    let l4_payload = Some(udp.payload());
715    let src_port = udp.get_source();
716    let dst_port = udp.get_destination();
717
718    // if sport/dport == 4789, this could be VXLAN
719    // XXX l4 plugins will not be called
720    if src_port == 4789 || dst_port == 4789 {
721        return handle_l4_vxlan(packet, ctx, data, l3_info, udp.payload(), analyzer);
722    }
723
724    // if sport/dport == 6081, this could be GENEVE
725    // XXX l4 plugins will not be called
726    if src_port == 6081 || dst_port == 6081 {
727        return handle_l4_geneve(packet, ctx, data, l3_info, udp.payload(), analyzer);
728    }
729
730    handle_l4_common(
731        packet, ctx, data, l3_info, src_port, dst_port, l4_payload, analyzer,
732    )
733}
734
735fn handle_l4_icmp(
736    packet: &Packet,
737    ctx: &ParseContext,
738    data: &[u8],
739    l3_info: &L3Info,
740    analyzer: &mut Analyzer,
741) -> Result<(), Error> {
742    trace!("handle_l4_icmp (idx={})", ctx.pcap_index);
743    let icmp = IcmpPacket::new(data).ok_or("Could not build ICMP packet from data")?;
744    trace!(
745        "ICMP type={:?} code={:?}",
746        icmp.get_icmp_type(),
747        icmp.get_icmp_code()
748    );
749
750    let l4_payload = Some(icmp.payload());
751    let src_port = u16::from(icmp.get_icmp_type().0);
752    let dst_port = u16::from(icmp.get_icmp_code().0);
753
754    if analyzer.do_checksums {
755        let cksum = ::pnet_packet::icmp::checksum(&icmp);
756        if cksum != icmp.get_checksum() {
757            warn!("ICMP: invalid checksum");
758        }
759    }
760
761    handle_l4_common(
762        packet, ctx, data, l3_info, src_port, dst_port, l4_payload, analyzer,
763    )
764}
765
766fn handle_l4_icmpv6(
767    packet: &Packet,
768    ctx: &ParseContext,
769    data: &[u8],
770    l3_info: &L3Info,
771    analyzer: &mut Analyzer,
772) -> Result<(), Error> {
773    trace!("handle_l4_icmpv6 (idx={})", ctx.pcap_index);
774    let icmpv6 = Icmpv6Packet::new(data).ok_or("Could not build ICMPv6 packet from data")?;
775    trace!(
776        "ICMPv6 type={:?} code={:?}",
777        icmpv6.get_icmpv6_type(),
778        icmpv6.get_icmpv6_code()
779    );
780
781    let l4_payload = Some(icmpv6.payload());
782    let src_port = 0;
783    let dst_port = 0;
784
785    if let (IpAddr::V6(src), IpAddr::V6(dst)) = (l3_info.three_tuple.src, l3_info.three_tuple.dst) {
786        let cksum = ::pnet_packet::icmpv6::checksum(&icmpv6, &src, &dst);
787        if cksum != icmpv6.get_checksum() {
788            warn!("ICMPv6: invalid checksum");
789        }
790    }
791
792    handle_l4_common(
793        packet, ctx, data, l3_info, src_port, dst_port, l4_payload, analyzer,
794    )
795}
796
797// Geneve: Generic Network Virtualization Encapsulation
798// https://tools.ietf.org/html/draft-ietf-nvo3-geneve-16
799fn handle_l4_geneve(
800    packet: &Packet,
801    ctx: &ParseContext,
802    _data: &[u8],
803    _l3_info: &L3Info,
804    l4_data: &[u8],
805    analyzer: &mut Analyzer,
806) -> Result<(), Error> {
807    trace!("handle_l4_geneve (idx={})", ctx.pcap_index);
808    let geneve = GENEVEPacket::new(l4_data).ok_or("Could not build GENEVE packet from data")?;
809    let payload = geneve.payload();
810    let next_proto = geneve.get_protocol_type();
811
812    trace!(
813        "    Geneve: proto=0x{:x} VNI=0x{:x}",
814        next_proto,
815        geneve.get_virtual_network_identifier()
816    );
817    // ignore geneve options
818
819    if next_proto == 0x6558 {
820        handle_l2(packet, ctx, payload, analyzer)
821    } else {
822        handle_l3(packet, ctx, payload, EtherType(next_proto), analyzer)
823    }
824}
825
826fn handle_l4_gre(
827    packet: &Packet,
828    ctx: &ParseContext,
829    data: &[u8],
830    _l3_info: &L3Info,
831    analyzer: &mut Analyzer,
832) -> Result<(), Error> {
833    trace!("handle_l4_gre (idx={})", ctx.pcap_index);
834    let l3_data = data;
835
836    let gre = GrePacket::new(l3_data).ok_or("Could not build GRE packet from data")?;
837
838    let next_proto = gre.get_protocol_type();
839    // XXX can panic: 'Source routed GRE packets not supported' in gre_routing_length()
840    // if gre.get_routing_present() != 1 {
841    //     warn!("Source routed GRE packets not supported");
842    //     return Ok(());
843    // }
844    let data = if next_proto == 0x880b {
845        // PPTP GRE is slightly different, and pnet_packet offset is wrong
846        // See https://en.wikipedia.org/wiki/Generic_Routing_Encapsulation
847        let mut offset = 8;
848        if gre.get_sequence_present() != 0 {
849            offset += 4;
850        }
851        if l3_data[1] >> 7 != 0 {
852            // there is an acknowledge number
853            offset += 4;
854        }
855        debug_assert!(offset <= l3_data.len());
856        &l3_data[offset..]
857    } else {
858        gre.payload()
859    };
860    trace!("GRE: type=0x{:x}", next_proto);
861
862    handle_l3(packet, ctx, data, EtherType(next_proto), analyzer)
863}
864
865fn handle_l4_vxlan(
866    packet: &Packet,
867    ctx: &ParseContext,
868    _data: &[u8],
869    _l3_info: &L3Info,
870    l4_data: &[u8],
871    analyzer: &mut Analyzer,
872) -> Result<(), Error> {
873    trace!("handle_l4_vxlan (idx={})", ctx.pcap_index);
874    let vxlan = VxlanPacket::new(l4_data).ok_or("Could not build Vxlan packet from data")?;
875    let payload = vxlan.payload();
876
877    trace!("    Vxlan: VLAN id={}", vxlan.get_vlan_identifier());
878
879    handle_l2(packet, ctx, payload, analyzer)
880}
881
882fn handle_l4_ipv6frag(
883    packet: &Packet,
884    ctx: &ParseContext,
885    frag_info: &FragmentPacket,
886    data: &[u8],
887    l3_info: &L3Info,
888    l4_proto: IpNextHeaderProtocol,
889    analyzer: &mut Analyzer,
890) -> Result<(), Error> {
891    trace!("handle_l3_ipv6frag (idx={})", ctx.pcap_index);
892    let frag_offset = frag_info.get_fragment_offset() as usize;
893    let frag_id = frag_info.get_id();
894    let last_fragment = frag_info.is_last_fragment();
895    trace!(
896        "IPv6 Fragment frag_offset={} id={} last_fragment={}",
897        frag_offset,
898        frag_id,
899        last_fragment
900    );
901
902    let defrag = {
903        // check IP fragmentation before calling handle_l4
904        let more_fragments = !last_fragment;
905        analyzer
906            .ipv6_defrag
907            .update(frag_id, frag_offset, more_fragments, data)
908    };
909    let data = match defrag {
910        Fragment::NoFrag(d) => d,
911        Fragment::Complete(ref v) => {
912            warn!(
913                "IPv6Fragment defrag done, using defrag buffer len={}",
914                v.len()
915            );
916            v
917        }
918        Fragment::Incomplete => {
919            trace!("IPv6Fragment defragmentation incomplete");
920            return Ok(());
921        }
922        Fragment::Error => {
923            warn!("IPv6Fragment defragmentation error");
924            return Ok(());
925        }
926    };
927
928    match l4_proto {
929        IpNextHeaderProtocols::Tcp => handle_l4_tcp(packet, ctx, data, l3_info, analyzer),
930        IpNextHeaderProtocols::Udp => handle_l4_udp(packet, ctx, data, l3_info, analyzer),
931        IpNextHeaderProtocols::Icmp => handle_l4_icmp(packet, ctx, data, l3_info, analyzer),
932        _ => {
933            warn!("IPv6Fragment: Unsupported L4 proto {}", l4_proto);
934            handle_l4_generic(packet, ctx, data, l3_info, analyzer)
935        }
936    }
937}
938
939fn handle_l4_generic(
940    packet: &Packet,
941    ctx: &ParseContext,
942    data: &[u8],
943    l3_info: &L3Info,
944    analyzer: &mut Analyzer,
945) -> Result<(), Error> {
946    trace!(
947        "handle_l4_generic (idx={}, l4_proto={})",
948        ctx.pcap_index,
949        l3_info.three_tuple.l4_proto
950    );
951    // in generic function, we don't know how to get l4_payload
952    let l4_payload = None;
953    let src_port = 0;
954    let dst_port = 0;
955
956    handle_l4_common(
957        packet, ctx, data, l3_info, src_port, dst_port, l4_payload, analyzer,
958    )
959}
960
961#[allow(clippy::too_many_arguments)]
962fn handle_l4_common(
963    packet: &Packet,
964    ctx: &ParseContext,
965    l4_data: &[u8],
966    l3_info: &L3Info,
967    src_port: u16,
968    dst_port: u16,
969    l4_payload: Option<&[u8]>,
970    analyzer: &mut Analyzer,
971) -> Result<(), Error> {
972    let five_tuple = FiveTuple::from_three_tuple(&l3_info.three_tuple, src_port, dst_port);
973    trace!("5-t: {}", five_tuple);
974    let now = packet.ts;
975
976    let flow_id = {
977        // flows modification section
978        let flows = &mut analyzer.flows;
979        // lookup flow
980        let flow_id = match flows.lookup_flow(&five_tuple) {
981            Some(id) => id,
982            None => {
983                let flow = Flow::new(&five_tuple, packet.ts.secs, packet.ts.micros);
984                gen_event_new_flow(&flow, &analyzer.registry);
985                flows.insert_flow(five_tuple.clone(), flow)
986            }
987        };
988
989        // update flow
990        flows.entry(flow_id).and_modify(|flow| {
991            flow.flow_id = flow_id;
992            flow.last_seen = now;
993        });
994        flow_id
995    };
996
997    // get a read-only reference to flow
998    let flow = analyzer
999        .flows
1000        .get_flow(flow_id)
1001        .expect("could not get flow from ID")
1002        .clone(); // clone because run_plugins_v2_transport borrows analyzer
1003
1004    let to_server = flow.five_tuple == five_tuple;
1005
1006    let pinfo = PacketInfo {
1007        five_tuple: &five_tuple,
1008        to_server,
1009        l3_type: l3_info.three_tuple.l3_proto(),
1010        l4_data,
1011        l4_type: five_tuple.proto,
1012        l4_payload,
1013        flow: Some(&flow),
1014        pcap_index: ctx.pcap_index,
1015    };
1016    // let start = ::std::time::Instant::now();
1017    run_plugins_v2_transport(packet, ctx, &pinfo, analyzer)?;
1018    // let elapsed = start.elapsed();
1019    // debug!("Time to run l4 plugins: {}.{}", elapsed.as_secs(), elapsed.as_millis());
1020
1021    // XXX do other stuff
1022
1023    // XXX check session expiration
1024    // const FLOW_EXPIRATION: u32 = 100;
1025    // for (flow_id, flow) in self.flows.iter() {
1026    //     if (now - flow.last_seen).secs > FLOW_EXPIRATION {
1027    //         warn!(
1028    //             "Flow {} candidate for expiration (delay: {} secs)",
1029    //             flow_id,
1030    //             (now - flow.last_seen).secs
1031    //         );
1032    //     }
1033    // }
1034
1035    Ok(())
1036}
1037
1038fn run_plugins_v2<'i, F>(
1039    packet: &Packet,
1040    ctx: &ParseContext,
1041    layer: u8,
1042    layer_filter: u16,
1043    cb: F,
1044    analyzer: &mut Analyzer,
1045) -> Result<(), Error>
1046where
1047    F: for<'p> Fn(&'p mut dyn Plugin) -> PluginResult<'i>,
1048{
1049    trace!(
1050        "running plugins for layer={} filter=0x{:04x}",
1051        layer,
1052        layer_filter
1053    );
1054    // clone the registry (which is an Arc)
1055    // so analyzer is not borrowed for the plugins loop
1056    let registry = analyzer.registry.clone();
1057    let empty_vec = vec![];
1058    // get plugins for this specific filter
1059    let l1 = registry
1060        .get_plugins_for_layer(layer, layer_filter)
1061        .unwrap_or(&empty_vec)
1062        .as_slice();
1063    // get catch-all plugins (filter == 0)
1064    let l2 = registry
1065        .get_plugins_for_layer(layer, 0)
1066        .unwrap_or(&empty_vec)
1067        .as_slice();
1068    for plugin in l1.iter().chain(l2) {
1069        let r = {
1070            // limit duration of lock to vallback
1071            let mut p = plugin.lock().expect("locking plugin failed (recursion ?)");
1072            cb(p.deref_mut())
1073        };
1074        match r {
1075            PluginResult::None => continue,
1076            PluginResult::Error(e) => {
1077                // XXX ignore error in plugins ? just log ?
1078                warn!("Plugin returned error {:?}", e);
1079                continue;
1080            }
1081            PluginResult::L2(e, payload) => {
1082                handle_l3(packet, ctx, payload, EtherType(e), analyzer)?
1083            }
1084            PluginResult::L3(l3, payload) => handle_l3_common(packet, ctx, payload, l3, analyzer)?,
1085            PluginResult::L4(t5, payload) => {
1086                let l3_info = L3Info::default(); // XXX
1087                handle_l4_common(
1088                    packet,
1089                    ctx,
1090                    &[],
1091                    &l3_info,
1092                    t5.src_port,
1093                    t5.dst_port,
1094                    Some(payload),
1095                    analyzer,
1096                )?;
1097            }
1098        }
1099    }
1100    Ok(())
1101}
1102
1103/// Run plugins attached to the physical layer
1104pub(crate) fn run_plugins_v2_physical(
1105    packet: &Packet,
1106    ctx: &ParseContext,
1107    data: &[u8],
1108    analyzer: &mut Analyzer,
1109) -> Result<(), Error> {
1110    let cb = move |p: &mut dyn Plugin| p.handle_layer_physical(packet, data);
1111    let layer = 1;
1112    let layer_filter = 0;
1113    run_plugins_v2(packet, ctx, layer, layer_filter, cb, analyzer)
1114}
1115
1116/// Run plugins attached to the link layer (ethernet, etc.)
1117pub(crate) fn run_plugins_v2_link(
1118    packet: &Packet,
1119    ctx: &ParseContext,
1120    linktype: LinkLayerType,
1121    l2_payload: &[u8],
1122    analyzer: &mut Analyzer,
1123) -> Result<(), Error> {
1124    let cb = move |p: &mut dyn Plugin| p.handle_layer_link(packet, linktype as u16, l2_payload);
1125    let layer = 2;
1126    let layer_filter = linktype as u16;
1127    run_plugins_v2(packet, ctx, layer, layer_filter, cb, analyzer)
1128}
1129
1130/// Run plugins attached to the network layer (IPv4, IPv6, Arp, IPsec, etc.)
1131fn run_plugins_v2_network(
1132    packet: &Packet,
1133    ctx: &ParseContext,
1134    l3_payload: &[u8],
1135    three_tuple: &ThreeTuple,
1136    analyzer: &mut Analyzer,
1137) -> Result<(), Error> {
1138    let cb = move |p: &mut dyn Plugin| p.handle_layer_network(packet, l3_payload, three_tuple);
1139    let layer = 3;
1140    let layer_filter = three_tuple.l3_proto();
1141    run_plugins_v2(packet, ctx, layer, layer_filter, cb, analyzer)
1142}
1143
1144/// Run plugins attached to the transport layer (TCP, UDP, etc.)
1145fn run_plugins_v2_transport(
1146    packet: &Packet,
1147    ctx: &ParseContext,
1148    pinfo: &PacketInfo,
1149    analyzer: &mut Analyzer,
1150) -> Result<(), Error> {
1151    let cb = move |p: &mut dyn Plugin| p.handle_layer_transport(packet, pinfo);
1152    let layer = 4;
1153    let layer_filter = pinfo.l4_type as u16;
1154    run_plugins_v2(packet, ctx, layer, layer_filter, cb, analyzer)
1155}
1156
1157pub(crate) fn gen_event_new_flow(flow: &Flow, registry: &PluginRegistry) {
1158    // let start = ::std::time::Instant::now();
1159    registry.run_plugins(
1160        |p| p.plugin_type() & PLUGIN_FLOW_NEW != 0,
1161        |p| p.flow_created(flow),
1162    );
1163    // let elapsed = start.elapsed();
1164    // debug!("Time to run flow_created: {}.{}", elapsed.as_secs(), elapsed.as_millis());
1165}
1166
1167impl PcapAnalyzer for Analyzer {
1168    /// Initialize all plugins
1169    fn init(&mut self) -> Result<(), Error> {
1170        self.registry.run_plugins(|_| true, |p| p.pre_process());
1171        Ok(())
1172    }
1173
1174    /// Dispatch function: given a packet, use link type to get the real data, and
1175    /// call the matching handling function (some pcap blocks encode ethernet, or IPv4 etc.)
1176    fn handle_packet(&mut self, packet: &Packet, ctx: &ParseContext) -> Result<(), Error> {
1177        if ctx.pcap_index < self.skip_index {
1178            return Ok(());
1179        }
1180        match packet.data {
1181            PacketData::L2(data) => self.handle_l2(packet, ctx, data),
1182            PacketData::L3(ethertype, data) => {
1183                handle_l3(packet, ctx, data, EtherType(ethertype), self)
1184            }
1185            PacketData::L4(_, _) => unimplemented!(), // XXX
1186            PacketData::Unsupported(raw) => {
1187                // fixups
1188                if packet.link_type == Linktype(12) {
1189                    // defined as DLT_RAW in libpcap/dlt.h
1190                    if let Some(PacketData::L3(ethertype, packet_data)) =
1191                        get_packetdata_raw(raw, packet.caplen as usize)
1192                    {
1193                        return handle_l3(packet, ctx, packet_data, EtherType(ethertype), self);
1194                    }
1195                }
1196                warn!(
1197                    "Unsupported data format (unknown linktype {}) idx={}",
1198                    packet.link_type, ctx.pcap_index
1199                );
1200                Ok(())
1201            }
1202        }
1203    }
1204
1205    /// Finalize analysis and notify plugins
1206    fn teardown(&mut self) {
1207        {
1208            // expire all TCP connections in reassembly engine
1209            finalize_tcp_streams(self);
1210            // expire remaining flows
1211            let flows = &self.flows;
1212            trace!("{} flows remaining in table", flows.len());
1213            // let start = ::std::time::Instant::now();
1214            self.registry.run_plugins(
1215                |p| p.plugin_type() & PLUGIN_FLOW_DEL != 0,
1216                |p| {
1217                    flows.values().for_each(|flow| {
1218                        p.flow_destroyed(flow);
1219                    });
1220                },
1221            );
1222            // let elapsed = start.elapsed();
1223            // debug!("Time to run flow_destroyed {}.{}", elapsed.as_secs(), elapsed.as_millis());
1224            self.flows.clear();
1225
1226            self.registry.run_plugins(|_| true, |p| p.post_process());
1227
1228            if let Some(output_dir) = &self.output_dir {
1229                self.registry.run_plugins(
1230                    |_| true,
1231                    |p| {
1232                        let res = p.save_results(output_dir);
1233                        if let Err(e) = res {
1234                            warn!("error while saving results for {}: {}", p.name(), e);
1235                        }
1236                    },
1237                );
1238            }
1239        };
1240    }
1241}
1242
1243impl SafePcapAnalyzer for Analyzer {}