1use std::{cmp::min, net::IpAddr, ops::DerefMut, sync::Arc};
2
3use log::{debug, trace, warn};
4use pako_tools::*;
5use pcap_parser::{
6 data::{get_packetdata_raw, PacketData},
7 Linktype,
8};
9use pnet_packet::{
10 ethernet::{EtherType, EtherTypes, EthernetPacket},
11 gre::GrePacket,
12 icmp::IcmpPacket,
13 icmpv6::Icmpv6Packet,
14 ip::{IpNextHeaderProtocol, IpNextHeaderProtocols},
15 ipv4::{Ipv4Flags, Ipv4Packet},
16 ipv6::{ExtensionPacket, FragmentPacket, Ipv6Packet},
17 tcp::TcpPacket,
18 udp::UdpPacket,
19 vlan::VlanPacket,
20 Packet as PnetPacket, PacketSize,
21};
22
23use crate::{
24 erspan::ERSPANPacket,
25 flow_map::FlowMap,
26 geneve::*,
27 ip_defrag::{DefragEngine, Fragment, IPDefragEngine},
28 layers::LinkLayerType,
29 mpls::*,
30 packet_info::PacketInfo,
31 plugin::*,
32 plugin_registry::*,
33 ppp::{PppPacket, PppProtocolTypes},
34 pppoe::PppoeSessionPacket,
35 tcp_reassembly::{finalize_tcp_streams, TcpStreamError, TcpStreamReassembly},
36 vxlan::*,
37};
38
39#[derive(Clone, Debug, Default)]
40pub struct L3Info {
41 pub l4_proto: u8,
43 pub three_tuple: ThreeTuple,
44}
45
46pub struct Analyzer {
60 pub(crate) registry: Arc<PluginRegistry>,
61
62 pub(crate) flows: FlowMap,
63
64 ipv4_defrag: Box<dyn DefragEngine>,
65 ipv6_defrag: Box<dyn DefragEngine>,
66 pub(crate) tcp_defrag: TcpStreamReassembly,
67
68 defrag_count: usize,
69 do_checksums: bool,
70 skip_index: usize,
71 output_dir: Option<String>,
72}
73
74impl Analyzer {
75 pub fn new(registry: Arc<PluginRegistry>, config: &Config) -> Analyzer {
76 let do_checksums = config.get_bool("do_checksums").unwrap_or(true);
77 let skip_index = config.get_usize("skip_index").unwrap_or(0);
78 if skip_index > 0 {
79 debug!("Will skip to index {}", skip_index);
80 }
81 let output_dir = config.get("output_dir").map(|s| s.to_owned());
82 Analyzer {
83 registry,
84 flows: FlowMap::default(),
85 ipv4_defrag: Box::new(IPDefragEngine::new()),
86 ipv6_defrag: Box::new(IPDefragEngine::new()),
87 tcp_defrag: TcpStreamReassembly::default(),
88 defrag_count: 0,
89 do_checksums,
90 skip_index,
91 output_dir,
92 }
93 }
94
95 pub fn registry(&self) -> &PluginRegistry {
97 &self.registry
98 }
99
100 #[inline]
101 fn handle_l2(&mut self, packet: &Packet, ctx: &ParseContext, data: &[u8]) -> Result<(), Error> {
102 handle_l2(packet, ctx, data, self)
103 }
104
105 pub fn with_deterministic_rng(mut self) -> Self {
109 self.flows = self.flows.with_rng_seed(0);
110 self
111 }
112}
113
114pub(crate) fn handle_l2(
115 packet: &Packet,
116 ctx: &ParseContext,
117 data: &[u8],
118 analyzer: &mut Analyzer,
119) -> Result<(), Error> {
120 trace!("handle_l2 (idx={})", ctx.pcap_index);
121
122 let datalen = min(packet.caplen as usize, data.len());
124 let data = &data[..datalen];
125
126 run_plugins_v2_physical(packet, ctx, data, analyzer)?;
128 match EthernetPacket::new(data) {
132 Some(eth) => {
133 let dest = eth.get_destination();
136 if dest.is_multicast() {
137 match &data[..6] {
138 [0x01, 0x00, 0x0c, 0xcc, 0xcc, 0xcc] => {
139 debug!("Cisco CDP/VTP/UDLD - ignoring");
140 return Ok(());
142 }
143 [0x01, 0x00, 0x0c, 0xcd, 0xcd, 0xd0] => {
144 debug!("Cisco Multicast address - ignoring");
145 return Ok(());
146 }
147 _ => {
148 trace!("Ethernet broadcast (unknown type) (idx={})", ctx.pcap_index);
149 }
150 }
151 }
152 let ethertype = eth.get_ethertype();
153 match ethertype.0 {
155 0..=1500 => {
156 let payload = eth.payload();
164 if payload.len() < 3 {
165 warn!("Incomplete 802.3 frame (idx={})", ctx.pcap_index);
166 return Ok(());
167 }
168 trace!("IEEE 802.3 frame, ignoring");
178 return Ok(());
179 }
180 1501..=1536 => {
181 warn!(
182 "Undefined value in ethernet type/length field (idx={})",
183 ctx.pcap_index
184 );
185 }
186 _ => (),
187 }
188 let payload = eth.payload();
189 trace!(" ethertype: 0x{:x}", ethertype.0);
190 run_plugins_v2_link(packet, ctx, LinkLayerType::Ethernet, payload, analyzer)?;
191 handle_l3(packet, ctx, payload, ethertype, analyzer)
192 }
193 None => {
194 Ok(())
196 }
197 }
198}
199
200pub(crate) fn handle_l3(
201 packet: &Packet,
202 ctx: &ParseContext,
203 data: &[u8],
204 ethertype: EtherType,
205 analyzer: &mut Analyzer,
206) -> Result<(), Error> {
207 if data.is_empty() {
208 return Ok(());
209 }
210
211 match ethertype {
213 EtherType(0x6558) => handle_l2(packet, ctx, data, analyzer),
215 EtherTypes::Ipv4 => handle_l3_ipv4(packet, ctx, data, analyzer),
216 EtherTypes::Ipv6 => handle_l3_ipv6(packet, ctx, data, analyzer),
217 EtherTypes::Vlan => handle_l3_vlan_801q(packet, ctx, data, analyzer),
218 EtherTypes::Arp => Ok(()),
220 EtherType(0x880b) => handle_l3_ppp(packet, ctx, data, analyzer),
222 EtherTypes::Mpls | EtherTypes::MplsMcast => handle_l3_mpls(packet, ctx, data, analyzer),
225 EtherType(0x88be) => handle_l3_erspan(packet, ctx, data, analyzer),
226 EtherTypes::PppoeSession => handle_l3_pppoesession(packet, ctx, data, analyzer),
227
228 e => {
229 trace!(
230 "unsupported ethertype {} (0x{:x}) (idx={})",
231 e,
232 e.0,
233 ctx.pcap_index
234 );
235 Ok(())
236 }
237 }
238}
239
240fn handle_l3_ipv4(
241 packet: &Packet,
242 ctx: &ParseContext,
243 data: &[u8],
244 analyzer: &mut Analyzer,
245) -> Result<(), Error> {
246 trace!("handle_l3_ipv4 (idx={})", ctx.pcap_index);
247 let ipv4 = Ipv4Packet::new(data).ok_or("Could not build IPv4 packet from data")?;
248 let orig_len = data.len();
250
251 let ip_len = ipv4.get_total_length() as usize;
252
253 let (data, ipv4) = {
255 if ip_len < data.len() && ip_len > 0 {
256 let d = &data[..ip_len];
257 let ipv4 = Ipv4Packet::new(d).ok_or("Could not build IPv4 packet from data")?;
258 (d, ipv4)
259 } else {
260 (data, ipv4)
261 }
262 };
263
264 let l4_proto = ipv4.get_next_level_protocol().0;
265 let t3 = ThreeTuple {
266 src: IpAddr::V4(ipv4.get_source()),
267 dst: IpAddr::V4(ipv4.get_destination()),
268 l4_proto,
269 };
270
271 if analyzer.do_checksums {
272 let cksum = ::pnet_packet::ipv4::checksum(&ipv4);
273 if cksum != ipv4.get_checksum() {
274 warn!("IPv4: invalid checksum");
275 }
276 }
277
278 let payload = if ip_len == 0 {
280 warn!(
281 "IPv4: packet reported length is 0. Assuming TSO (idx={})",
282 ctx.pcap_index
283 );
284 let start = ipv4.get_header_length() as usize * 4;
286 if start > data.len() {
287 warn!("IPv4: ip_len == 0 and ipv4.get_header_length is invalid!");
288 return Ok(());
289 }
290 &data[start..]
291 } else {
292 ipv4.payload()
293 };
294
295 let frag_offset = (ipv4.get_fragment_offset() * 8) as usize;
297 let more_fragments = ipv4.get_flags() & Ipv4Flags::MoreFragments != 0;
298 let defrag = analyzer.ipv4_defrag.update(
299 ipv4.get_identification().into(),
300 frag_offset,
301 more_fragments,
302 payload,
303 );
304 let payload = match defrag {
305 Fragment::NoFrag(d) => {
306 debug_assert!(d.len() < orig_len);
307 d
308 }
309 Fragment::Complete(ref v) => {
310 warn!("IPv4 defrag done, using defrag buffer len={}", v.len());
311 v
312 }
313 Fragment::Incomplete => {
314 debug!("IPv4 defragmentation incomplete");
315 return Ok(());
316 }
317 Fragment::Error => {
318 warn!("IPv4 defragmentation error");
319 return Ok(());
320 }
321 };
322
323 run_plugins_v2_network(packet, ctx, payload, &t3, analyzer)?;
327
328 let l3_info = L3Info {
329 three_tuple: t3,
330 l4_proto,
331 };
332 handle_l3_common(packet, ctx, payload, &l3_info, analyzer)
333}
334
335fn is_ipv6_opt(opt: IpNextHeaderProtocol) -> bool {
336 matches!(
337 opt,
338 IpNextHeaderProtocols::Hopopt
339 | IpNextHeaderProtocols::Ipv6Opts
340 | IpNextHeaderProtocols::Ipv6Route
341 | IpNextHeaderProtocols::Ipv6Frag
342 | IpNextHeaderProtocols::Esp
343 | IpNextHeaderProtocols::Ah
344 | IpNextHeaderProtocols::MobilityHeader
345 )
346}
347
348fn handle_l3_ipv6(
349 packet: &Packet,
350 ctx: &ParseContext,
351 data: &[u8],
352 analyzer: &mut Analyzer,
353) -> Result<(), Error> {
354 trace!("handle_l3_ipv6 (idx={})", ctx.pcap_index);
355 let ipv6 = Ipv6Packet::new(data).ok_or("Could not build IPv6 packet from data")?;
356
357 let mut payload = ipv6.payload();
358 let mut l4_proto = ipv6.get_next_header();
359
360 if payload.is_empty() {
361 trace!("IPv6 length is 0. Jumbogram?");
363 if data.len() >= 40 {
364 payload = &data[40..];
365 } else {
366 warn!(
367 "IPv6 length is 0, but frame is too short for an IPv6 header (idx={})",
368 ctx.pcap_index
369 );
370 return Ok(());
371 }
372 }
373
374 let mut extensions = Vec::new();
377 let mut frag_ext = None;
378
379 while is_ipv6_opt(l4_proto) {
381 let ext = ExtensionPacket::new(payload)
382 .ok_or("Could not build IPv6 Extension packet from payload")?;
383 let next_header = ext.get_next_header();
384 trace!("option header: {}", l4_proto);
385 if l4_proto == IpNextHeaderProtocols::Ipv6Frag {
386 if frag_ext.is_some() {
387 warn!("multiple IPv6Frag extensions idx={}", ctx.pcap_index);
388 return Ok(());
389 }
390 frag_ext = FragmentPacket::new(payload);
391 }
392 let offset = if l4_proto != IpNextHeaderProtocols::Ah {
394 ext.packet_size()
395 } else {
396 let l1 = (payload[1] - 1) as usize;
404 let val = l1 * 4 + l1 * 4 - 2;
405 (val + 7) & (!7)
406 };
407 extensions.push((l4_proto, ext));
408 l4_proto = next_header;
409 payload = &payload[offset..];
410 }
411
412 let t3 = ThreeTuple {
413 src: IpAddr::V6(ipv6.get_source()),
414 dst: IpAddr::V6(ipv6.get_destination()),
415 l4_proto: l4_proto.0,
416 };
417
418 run_plugins_v2_network(packet, ctx, payload, &t3, analyzer)?;
419
420 if l4_proto == IpNextHeaderProtocols::Ipv6NoNxt {
421 trace!("No next header");
424 if !payload.is_empty() {
425 warn!(
426 "No next header, but data is present (len={})",
427 payload.len()
428 );
429 }
430 return Ok(());
431 }
432
433 let l3_info = L3Info {
434 three_tuple: t3,
435 l4_proto: l4_proto.0,
436 };
437
438 if let Some(frag_info) = frag_ext {
439 handle_l4_ipv6frag(
440 packet, ctx, &frag_info, payload, &l3_info, l4_proto, analyzer,
441 )
442 } else {
443 handle_l3_common(packet, ctx, payload, &l3_info, analyzer)
444 }
445}
446
447fn handle_l3_vlan_801q(
448 packet: &Packet,
449 ctx: &ParseContext,
450 data: &[u8],
451 analyzer: &mut Analyzer,
452) -> Result<(), Error> {
453 trace!("handle_l3_vlan_801q (idx={})", ctx.pcap_index);
454 let vlan = VlanPacket::new(data).ok_or("Could not build 802.1Q Vlan packet from data")?;
455 let next_ethertype = vlan.get_ethertype();
456 trace!(" 802.1q: VLAN id={}", vlan.get_vlan_identifier());
457
458 handle_l3(packet, ctx, vlan.payload(), next_ethertype, analyzer)
459}
460
461fn handle_l3_erspan(
462 packet: &Packet,
463 ctx: &ParseContext,
464 data: &[u8],
465 analyzer: &mut Analyzer,
466) -> Result<(), Error> {
467 trace!("handle_l3_erspan (idx={})", ctx.pcap_index);
468 let erspan = ERSPANPacket::new(data).ok_or("Could not build Erspan packet from data")?;
469 trace!(
470 " erspan: VLAN id={} span ID={}",
471 erspan.get_vlan(),
472 erspan.get_span_id()
473 );
474 handle_l2(packet, ctx, erspan.payload(), analyzer)
475}
476
477fn handle_l3_mpls(
478 packet: &Packet,
479 ctx: &ParseContext,
480 data: &[u8],
481 analyzer: &mut Analyzer,
482) -> Result<(), Error> {
483 trace!("handle_l2_mpls (idx={})", ctx.pcap_index);
484 let mpls = MPLSPacket::new(data).ok_or("Could not build MPLS packet from data")?;
485
486 let payload = mpls.payload();
487 trace!(" MPLS # labels: {}", mpls.get_num_labels());
488 trace!(" MPLS top label: {}", mpls.get_top_label().get_label());
489
490 if payload.is_empty() {
493 warn!("MPLS packet but no data");
494 return Ok(());
495 }
496 let first_nibble = payload[0] >> 4;
497 match first_nibble {
498 4 => handle_l3_ipv4(packet, ctx, payload, analyzer),
499 6 => handle_l3_ipv6(packet, ctx, payload, analyzer),
500 _ => handle_l2(packet, ctx, payload, analyzer),
501 }
502 }
504
505fn handle_l3_pppoesession(
506 packet: &Packet,
507 ctx: &ParseContext,
508 data: &[u8],
509 analyzer: &mut Analyzer,
510) -> Result<(), Error> {
511 trace!("handle_l3_pppoesession (idx={})", ctx.pcap_index);
512 let session =
513 PppoeSessionPacket::new(data).ok_or("Could not build PppoeSession packet from data")?;
514 trace!(
515 " pppoesession: version={} type={} code={}",
516 session.get_version(),
517 session.get_type(),
518 session.get_code(),
519 );
520 let ppp_data = session.payload();
521 handle_l3_ppp(packet, ctx, ppp_data, analyzer)
522}
523
524fn handle_l3_ppp(
525 packet: &Packet,
526 ctx: &ParseContext,
527 data: &[u8],
528 analyzer: &mut Analyzer,
529) -> Result<(), Error> {
530 trace!("handle_l3_ppp (idx={})", ctx.pcap_index);
531 let ppp = PppPacket::new(data).ok_or("Could not build Ppp packet from data")?;
532 let proto = ppp.get_protocol();
533 let payload = ppp.payload();
534 trace!(" ppp: protocol=0x{:02x}", proto.0,);
535 match proto {
536 PppProtocolTypes::Ipv4 => handle_l3_ipv4(packet, ctx, payload, analyzer),
537 PppProtocolTypes::Ipv6 => handle_l3_ipv6(packet, ctx, payload, analyzer),
538 _ => {
539 warn!("Unsupported PPP protocol 0x{:02x}", proto.0);
540 Ok(())
541 }
542 }
543}
544
545fn handle_l3_common(
546 packet: &Packet,
547 ctx: &ParseContext,
548 data: &[u8],
549 l3_info: &L3Info,
550 analyzer: &mut Analyzer,
551) -> Result<(), Error> {
552 match IpNextHeaderProtocol(l3_info.l4_proto) {
553 IpNextHeaderProtocols::Tcp => handle_l4_tcp(packet, ctx, data, l3_info, analyzer),
554 IpNextHeaderProtocols::Udp => handle_l4_udp(packet, ctx, data, l3_info, analyzer),
555 IpNextHeaderProtocols::Icmp => handle_l4_icmp(packet, ctx, data, l3_info, analyzer),
556 IpNextHeaderProtocols::Icmpv6 => handle_l4_icmpv6(packet, ctx, data, l3_info, analyzer),
557 IpNextHeaderProtocols::Esp => handle_l4_generic(packet, ctx, data, l3_info, analyzer),
558 IpNextHeaderProtocols::Gre => handle_l4_gre(packet, ctx, data, l3_info, analyzer),
559 IpNextHeaderProtocols::Ipv4 => handle_l3(packet, ctx, data, EtherTypes::Ipv4, analyzer),
560 IpNextHeaderProtocols::Ipv6 => handle_l3(packet, ctx, data, EtherTypes::Ipv6, analyzer),
561 p => {
562 warn!("Unsupported L4 proto {} (idx={})", p, ctx.pcap_index);
563 handle_l4_generic(packet, ctx, data, l3_info, analyzer)
564 }
565 }
566}
567
568fn handle_l4_tcp(
569 packet: &Packet,
570 ctx: &ParseContext,
571 l4_data: &[u8],
572 l3_info: &L3Info,
573 analyzer: &mut Analyzer,
574) -> Result<(), Error> {
575 trace!("handle_l4_tcp (idx={})", ctx.pcap_index);
576 trace!(" l4_data len: {}", l4_data.len());
577 let tcp = TcpPacket::new(l4_data).ok_or("Could not build TCP packet from data")?;
578
579 let src_port = tcp.get_source();
580 let dst_port = tcp.get_destination();
581
582 let five_tuple = FiveTuple::from_three_tuple(&l3_info.three_tuple, src_port, dst_port);
584 trace!("5-t: {}", five_tuple);
585 let now = packet.ts;
586
587 let flow_id = {
588 let flows = &mut analyzer.flows;
590 let flow_id = match flows.lookup_flow(&five_tuple) {
592 Some(id) => id,
593 None => {
594 let flow = Flow::new(&five_tuple, packet.ts.secs, packet.ts.micros);
595 gen_event_new_flow(&flow, &analyzer.registry);
596 flows.insert_flow(five_tuple.clone(), flow)
597 }
598 };
599
600 flows.entry(flow_id).and_modify(|flow| {
602 flow.flow_id = flow_id;
603 flow.last_seen = now;
604 });
605 flow_id
606 };
607
608 let flow = analyzer
610 .flows
611 .get_flow(flow_id)
612 .expect("could not get flow from ID")
613 .clone();
614
615 let to_server = flow.five_tuple == five_tuple;
616
617 let res = analyzer
620 .tcp_defrag
621 .update(&flow, &tcp, to_server, ctx.pcap_index);
622 match res {
623 Ok(Some(segments)) => {
624 let mut new_vec = Vec::new();
626 let buffer = match segments.len() {
627 0 => {
628 return Ok(());
629 }
630 1 => &segments[0].data,
631 _ => {
632 segments
633 .iter()
634 .for_each(|s| new_vec.extend_from_slice(&s.data));
635 &new_vec
636 }
637 };
638 let pcap_index = segments[0].pcap_index;
639 let t5 = five_tuple.get_reverse();
642 let origin_addr = t5.src;
643 let origin_port = t5.src_port;
644 trace!(
645 "Sending reassembled data from {}:{} (len={}, first pcap_index={})",
646 origin_addr,
647 origin_port,
648 buffer.len(),
649 pcap_index,
650 );
651 let l4_payload = buffer;
653 let dummy_packet = Packet {
654 interface: packet.interface,
655 caplen: 0,
656 origlen: 0,
657 ts: packet.ts, link_type: packet.link_type,
659 data: PacketData::L4(t5.proto, &[]),
660 pcap_index,
661 };
662 let packet_info = PacketInfo {
663 five_tuple: &t5,
664 to_server: !to_server,
665 l3_type: l3_info.three_tuple.l3_proto(),
666 l4_data: &[], l4_type: t5.proto,
668 l4_payload: Some(l4_payload),
669 flow: Some(&flow),
670 pcap_index,
671 };
672 run_plugins_v2_transport(&dummy_packet, ctx, &packet_info, analyzer)?;
674 }
677 Ok(_) => (),
678 Err(TcpStreamError::Inverted) => {
679 analyzer.flows.entry(flow_id).and_modify(|f| {
680 f.five_tuple = f.five_tuple.get_reverse();
681 });
682 }
683 Err(e) => {
684 warn!("Tcp steam reassembly error: {:?}", e);
685 }
686 }
687
688 analyzer.defrag_count += 1;
692 if analyzer.defrag_count > 1000 {
693 analyzer.tcp_defrag.check_expired_connections(now);
694 analyzer.defrag_count = 0;
695 }
696
697 Ok(())
701}
702
703fn handle_l4_udp(
704 packet: &Packet,
705 ctx: &ParseContext,
706 data: &[u8],
707 l3_info: &L3Info,
708 analyzer: &mut Analyzer,
709) -> Result<(), Error> {
710 trace!("handle_l4_udp (idx={})", ctx.pcap_index);
711 trace!(" l4_data len: {}", data.len());
712 let udp = UdpPacket::new(data).ok_or("Could not build UDP packet from data")?;
713
714 let l4_payload = Some(udp.payload());
715 let src_port = udp.get_source();
716 let dst_port = udp.get_destination();
717
718 if src_port == 4789 || dst_port == 4789 {
721 return handle_l4_vxlan(packet, ctx, data, l3_info, udp.payload(), analyzer);
722 }
723
724 if src_port == 6081 || dst_port == 6081 {
727 return handle_l4_geneve(packet, ctx, data, l3_info, udp.payload(), analyzer);
728 }
729
730 handle_l4_common(
731 packet, ctx, data, l3_info, src_port, dst_port, l4_payload, analyzer,
732 )
733}
734
735fn handle_l4_icmp(
736 packet: &Packet,
737 ctx: &ParseContext,
738 data: &[u8],
739 l3_info: &L3Info,
740 analyzer: &mut Analyzer,
741) -> Result<(), Error> {
742 trace!("handle_l4_icmp (idx={})", ctx.pcap_index);
743 let icmp = IcmpPacket::new(data).ok_or("Could not build ICMP packet from data")?;
744 trace!(
745 "ICMP type={:?} code={:?}",
746 icmp.get_icmp_type(),
747 icmp.get_icmp_code()
748 );
749
750 let l4_payload = Some(icmp.payload());
751 let src_port = u16::from(icmp.get_icmp_type().0);
752 let dst_port = u16::from(icmp.get_icmp_code().0);
753
754 if analyzer.do_checksums {
755 let cksum = ::pnet_packet::icmp::checksum(&icmp);
756 if cksum != icmp.get_checksum() {
757 warn!("ICMP: invalid checksum");
758 }
759 }
760
761 handle_l4_common(
762 packet, ctx, data, l3_info, src_port, dst_port, l4_payload, analyzer,
763 )
764}
765
766fn handle_l4_icmpv6(
767 packet: &Packet,
768 ctx: &ParseContext,
769 data: &[u8],
770 l3_info: &L3Info,
771 analyzer: &mut Analyzer,
772) -> Result<(), Error> {
773 trace!("handle_l4_icmpv6 (idx={})", ctx.pcap_index);
774 let icmpv6 = Icmpv6Packet::new(data).ok_or("Could not build ICMPv6 packet from data")?;
775 trace!(
776 "ICMPv6 type={:?} code={:?}",
777 icmpv6.get_icmpv6_type(),
778 icmpv6.get_icmpv6_code()
779 );
780
781 let l4_payload = Some(icmpv6.payload());
782 let src_port = 0;
783 let dst_port = 0;
784
785 if let (IpAddr::V6(src), IpAddr::V6(dst)) = (l3_info.three_tuple.src, l3_info.three_tuple.dst) {
786 let cksum = ::pnet_packet::icmpv6::checksum(&icmpv6, &src, &dst);
787 if cksum != icmpv6.get_checksum() {
788 warn!("ICMPv6: invalid checksum");
789 }
790 }
791
792 handle_l4_common(
793 packet, ctx, data, l3_info, src_port, dst_port, l4_payload, analyzer,
794 )
795}
796
797fn handle_l4_geneve(
800 packet: &Packet,
801 ctx: &ParseContext,
802 _data: &[u8],
803 _l3_info: &L3Info,
804 l4_data: &[u8],
805 analyzer: &mut Analyzer,
806) -> Result<(), Error> {
807 trace!("handle_l4_geneve (idx={})", ctx.pcap_index);
808 let geneve = GENEVEPacket::new(l4_data).ok_or("Could not build GENEVE packet from data")?;
809 let payload = geneve.payload();
810 let next_proto = geneve.get_protocol_type();
811
812 trace!(
813 " Geneve: proto=0x{:x} VNI=0x{:x}",
814 next_proto,
815 geneve.get_virtual_network_identifier()
816 );
817 if next_proto == 0x6558 {
820 handle_l2(packet, ctx, payload, analyzer)
821 } else {
822 handle_l3(packet, ctx, payload, EtherType(next_proto), analyzer)
823 }
824}
825
826fn handle_l4_gre(
827 packet: &Packet,
828 ctx: &ParseContext,
829 data: &[u8],
830 _l3_info: &L3Info,
831 analyzer: &mut Analyzer,
832) -> Result<(), Error> {
833 trace!("handle_l4_gre (idx={})", ctx.pcap_index);
834 let l3_data = data;
835
836 let gre = GrePacket::new(l3_data).ok_or("Could not build GRE packet from data")?;
837
838 let next_proto = gre.get_protocol_type();
839 let data = if next_proto == 0x880b {
845 let mut offset = 8;
848 if gre.get_sequence_present() != 0 {
849 offset += 4;
850 }
851 if l3_data[1] >> 7 != 0 {
852 offset += 4;
854 }
855 debug_assert!(offset <= l3_data.len());
856 &l3_data[offset..]
857 } else {
858 gre.payload()
859 };
860 trace!("GRE: type=0x{:x}", next_proto);
861
862 handle_l3(packet, ctx, data, EtherType(next_proto), analyzer)
863}
864
865fn handle_l4_vxlan(
866 packet: &Packet,
867 ctx: &ParseContext,
868 _data: &[u8],
869 _l3_info: &L3Info,
870 l4_data: &[u8],
871 analyzer: &mut Analyzer,
872) -> Result<(), Error> {
873 trace!("handle_l4_vxlan (idx={})", ctx.pcap_index);
874 let vxlan = VxlanPacket::new(l4_data).ok_or("Could not build Vxlan packet from data")?;
875 let payload = vxlan.payload();
876
877 trace!(" Vxlan: VLAN id={}", vxlan.get_vlan_identifier());
878
879 handle_l2(packet, ctx, payload, analyzer)
880}
881
882fn handle_l4_ipv6frag(
883 packet: &Packet,
884 ctx: &ParseContext,
885 frag_info: &FragmentPacket,
886 data: &[u8],
887 l3_info: &L3Info,
888 l4_proto: IpNextHeaderProtocol,
889 analyzer: &mut Analyzer,
890) -> Result<(), Error> {
891 trace!("handle_l3_ipv6frag (idx={})", ctx.pcap_index);
892 let frag_offset = frag_info.get_fragment_offset() as usize;
893 let frag_id = frag_info.get_id();
894 let last_fragment = frag_info.is_last_fragment();
895 trace!(
896 "IPv6 Fragment frag_offset={} id={} last_fragment={}",
897 frag_offset,
898 frag_id,
899 last_fragment
900 );
901
902 let defrag = {
903 let more_fragments = !last_fragment;
905 analyzer
906 .ipv6_defrag
907 .update(frag_id, frag_offset, more_fragments, data)
908 };
909 let data = match defrag {
910 Fragment::NoFrag(d) => d,
911 Fragment::Complete(ref v) => {
912 warn!(
913 "IPv6Fragment defrag done, using defrag buffer len={}",
914 v.len()
915 );
916 v
917 }
918 Fragment::Incomplete => {
919 trace!("IPv6Fragment defragmentation incomplete");
920 return Ok(());
921 }
922 Fragment::Error => {
923 warn!("IPv6Fragment defragmentation error");
924 return Ok(());
925 }
926 };
927
928 match l4_proto {
929 IpNextHeaderProtocols::Tcp => handle_l4_tcp(packet, ctx, data, l3_info, analyzer),
930 IpNextHeaderProtocols::Udp => handle_l4_udp(packet, ctx, data, l3_info, analyzer),
931 IpNextHeaderProtocols::Icmp => handle_l4_icmp(packet, ctx, data, l3_info, analyzer),
932 _ => {
933 warn!("IPv6Fragment: Unsupported L4 proto {}", l4_proto);
934 handle_l4_generic(packet, ctx, data, l3_info, analyzer)
935 }
936 }
937}
938
939fn handle_l4_generic(
940 packet: &Packet,
941 ctx: &ParseContext,
942 data: &[u8],
943 l3_info: &L3Info,
944 analyzer: &mut Analyzer,
945) -> Result<(), Error> {
946 trace!(
947 "handle_l4_generic (idx={}, l4_proto={})",
948 ctx.pcap_index,
949 l3_info.three_tuple.l4_proto
950 );
951 let l4_payload = None;
953 let src_port = 0;
954 let dst_port = 0;
955
956 handle_l4_common(
957 packet, ctx, data, l3_info, src_port, dst_port, l4_payload, analyzer,
958 )
959}
960
961#[allow(clippy::too_many_arguments)]
962fn handle_l4_common(
963 packet: &Packet,
964 ctx: &ParseContext,
965 l4_data: &[u8],
966 l3_info: &L3Info,
967 src_port: u16,
968 dst_port: u16,
969 l4_payload: Option<&[u8]>,
970 analyzer: &mut Analyzer,
971) -> Result<(), Error> {
972 let five_tuple = FiveTuple::from_three_tuple(&l3_info.three_tuple, src_port, dst_port);
973 trace!("5-t: {}", five_tuple);
974 let now = packet.ts;
975
976 let flow_id = {
977 let flows = &mut analyzer.flows;
979 let flow_id = match flows.lookup_flow(&five_tuple) {
981 Some(id) => id,
982 None => {
983 let flow = Flow::new(&five_tuple, packet.ts.secs, packet.ts.micros);
984 gen_event_new_flow(&flow, &analyzer.registry);
985 flows.insert_flow(five_tuple.clone(), flow)
986 }
987 };
988
989 flows.entry(flow_id).and_modify(|flow| {
991 flow.flow_id = flow_id;
992 flow.last_seen = now;
993 });
994 flow_id
995 };
996
997 let flow = analyzer
999 .flows
1000 .get_flow(flow_id)
1001 .expect("could not get flow from ID")
1002 .clone(); let to_server = flow.five_tuple == five_tuple;
1005
1006 let pinfo = PacketInfo {
1007 five_tuple: &five_tuple,
1008 to_server,
1009 l3_type: l3_info.three_tuple.l3_proto(),
1010 l4_data,
1011 l4_type: five_tuple.proto,
1012 l4_payload,
1013 flow: Some(&flow),
1014 pcap_index: ctx.pcap_index,
1015 };
1016 run_plugins_v2_transport(packet, ctx, &pinfo, analyzer)?;
1018 Ok(())
1036}
1037
1038fn run_plugins_v2<'i, F>(
1039 packet: &Packet,
1040 ctx: &ParseContext,
1041 layer: u8,
1042 layer_filter: u16,
1043 cb: F,
1044 analyzer: &mut Analyzer,
1045) -> Result<(), Error>
1046where
1047 F: for<'p> Fn(&'p mut dyn Plugin) -> PluginResult<'i>,
1048{
1049 trace!(
1050 "running plugins for layer={} filter=0x{:04x}",
1051 layer,
1052 layer_filter
1053 );
1054 let registry = analyzer.registry.clone();
1057 let empty_vec = vec![];
1058 let l1 = registry
1060 .get_plugins_for_layer(layer, layer_filter)
1061 .unwrap_or(&empty_vec)
1062 .as_slice();
1063 let l2 = registry
1065 .get_plugins_for_layer(layer, 0)
1066 .unwrap_or(&empty_vec)
1067 .as_slice();
1068 for plugin in l1.iter().chain(l2) {
1069 let r = {
1070 let mut p = plugin.lock().expect("locking plugin failed (recursion ?)");
1072 cb(p.deref_mut())
1073 };
1074 match r {
1075 PluginResult::None => continue,
1076 PluginResult::Error(e) => {
1077 warn!("Plugin returned error {:?}", e);
1079 continue;
1080 }
1081 PluginResult::L2(e, payload) => {
1082 handle_l3(packet, ctx, payload, EtherType(e), analyzer)?
1083 }
1084 PluginResult::L3(l3, payload) => handle_l3_common(packet, ctx, payload, l3, analyzer)?,
1085 PluginResult::L4(t5, payload) => {
1086 let l3_info = L3Info::default(); handle_l4_common(
1088 packet,
1089 ctx,
1090 &[],
1091 &l3_info,
1092 t5.src_port,
1093 t5.dst_port,
1094 Some(payload),
1095 analyzer,
1096 )?;
1097 }
1098 }
1099 }
1100 Ok(())
1101}
1102
1103pub(crate) fn run_plugins_v2_physical(
1105 packet: &Packet,
1106 ctx: &ParseContext,
1107 data: &[u8],
1108 analyzer: &mut Analyzer,
1109) -> Result<(), Error> {
1110 let cb = move |p: &mut dyn Plugin| p.handle_layer_physical(packet, data);
1111 let layer = 1;
1112 let layer_filter = 0;
1113 run_plugins_v2(packet, ctx, layer, layer_filter, cb, analyzer)
1114}
1115
1116pub(crate) fn run_plugins_v2_link(
1118 packet: &Packet,
1119 ctx: &ParseContext,
1120 linktype: LinkLayerType,
1121 l2_payload: &[u8],
1122 analyzer: &mut Analyzer,
1123) -> Result<(), Error> {
1124 let cb = move |p: &mut dyn Plugin| p.handle_layer_link(packet, linktype as u16, l2_payload);
1125 let layer = 2;
1126 let layer_filter = linktype as u16;
1127 run_plugins_v2(packet, ctx, layer, layer_filter, cb, analyzer)
1128}
1129
1130fn run_plugins_v2_network(
1132 packet: &Packet,
1133 ctx: &ParseContext,
1134 l3_payload: &[u8],
1135 three_tuple: &ThreeTuple,
1136 analyzer: &mut Analyzer,
1137) -> Result<(), Error> {
1138 let cb = move |p: &mut dyn Plugin| p.handle_layer_network(packet, l3_payload, three_tuple);
1139 let layer = 3;
1140 let layer_filter = three_tuple.l3_proto();
1141 run_plugins_v2(packet, ctx, layer, layer_filter, cb, analyzer)
1142}
1143
1144fn run_plugins_v2_transport(
1146 packet: &Packet,
1147 ctx: &ParseContext,
1148 pinfo: &PacketInfo,
1149 analyzer: &mut Analyzer,
1150) -> Result<(), Error> {
1151 let cb = move |p: &mut dyn Plugin| p.handle_layer_transport(packet, pinfo);
1152 let layer = 4;
1153 let layer_filter = pinfo.l4_type as u16;
1154 run_plugins_v2(packet, ctx, layer, layer_filter, cb, analyzer)
1155}
1156
1157pub(crate) fn gen_event_new_flow(flow: &Flow, registry: &PluginRegistry) {
1158 registry.run_plugins(
1160 |p| p.plugin_type() & PLUGIN_FLOW_NEW != 0,
1161 |p| p.flow_created(flow),
1162 );
1163 }
1166
1167impl PcapAnalyzer for Analyzer {
1168 fn init(&mut self) -> Result<(), Error> {
1170 self.registry.run_plugins(|_| true, |p| p.pre_process());
1171 Ok(())
1172 }
1173
1174 fn handle_packet(&mut self, packet: &Packet, ctx: &ParseContext) -> Result<(), Error> {
1177 if ctx.pcap_index < self.skip_index {
1178 return Ok(());
1179 }
1180 match packet.data {
1181 PacketData::L2(data) => self.handle_l2(packet, ctx, data),
1182 PacketData::L3(ethertype, data) => {
1183 handle_l3(packet, ctx, data, EtherType(ethertype), self)
1184 }
1185 PacketData::L4(_, _) => unimplemented!(), PacketData::Unsupported(raw) => {
1187 if packet.link_type == Linktype(12) {
1189 if let Some(PacketData::L3(ethertype, packet_data)) =
1191 get_packetdata_raw(raw, packet.caplen as usize)
1192 {
1193 return handle_l3(packet, ctx, packet_data, EtherType(ethertype), self);
1194 }
1195 }
1196 warn!(
1197 "Unsupported data format (unknown linktype {}) idx={}",
1198 packet.link_type, ctx.pcap_index
1199 );
1200 Ok(())
1201 }
1202 }
1203 }
1204
1205 fn teardown(&mut self) {
1207 {
1208 finalize_tcp_streams(self);
1210 let flows = &self.flows;
1212 trace!("{} flows remaining in table", flows.len());
1213 self.registry.run_plugins(
1215 |p| p.plugin_type() & PLUGIN_FLOW_DEL != 0,
1216 |p| {
1217 flows.values().for_each(|flow| {
1218 p.flow_destroyed(flow);
1219 });
1220 },
1221 );
1222 self.flows.clear();
1225
1226 self.registry.run_plugins(|_| true, |p| p.post_process());
1227
1228 if let Some(output_dir) = &self.output_dir {
1229 self.registry.run_plugins(
1230 |_| true,
1231 |p| {
1232 let res = p.save_results(output_dir);
1233 if let Err(e) = res {
1234 warn!("error while saving results for {}: {}", p.name(), e);
1235 }
1236 },
1237 );
1238 }
1239 };
1240 }
1241}
1242
1243impl SafePcapAnalyzer for Analyzer {}