1use super::data_number::FieldValue;
10use crate::variable_versions::ipfix_lookup::IPFixField;
11use crate::{NetflowPacket, NetflowParseError, ParsedNetflow, PartialParse};
12
13use nom::IResult;
14use nom::bytes::complete::take;
15use nom::combinator::complete;
16use nom::combinator::map_res;
17use nom::multi::{count, many0};
18use nom::number::complete::{be_u8, be_u16};
19use nom_derive::{Nom, Parse};
20use serde::Serialize;
21
22use crate::variable_versions::v9::{
23 DATA_TEMPLATE_V9_ID, Data as V9Data, OPTIONS_TEMPLATE_V9_ID, OptionsData as V9OptionsData,
24 OptionsTemplate as V9OptionsTemplate, ScopeDataField as V9ScopeDataField,
25 Template as V9Template,
26};
27
28use std::collections::HashMap;
29
/// Set ID marking a set that carries IPFIX data template records.
const DATA_TEMPLATE_IPFIX_ID: u16 = 2;
/// Set ID marking a set that carries IPFIX options template records.
const OPTIONS_TEMPLATE_IPFIX_ID: u16 = 3;

/// Key type of the template caches held by `IPFixParser`.
type TemplateId = u16;
/// A decoded field: the field identifier paired with its parsed value.
pub type IPFixFieldPair = (IPFixField, FieldValue);
/// One flow record: its decoded fields, in the order the template lists them.
pub type IpFixFlowRecord = Vec<IPFixFieldPair>;
36
/// Returns the zero bytes needed to round `content_size` up to a multiple of four.
///
/// The result is empty when `content_size` is already 4-byte aligned and holds
/// one to three `0u8` values otherwise.
fn calculate_padding(content_size: usize) -> Vec<u8> {
    let missing = match content_size % 4 {
        0 => 0,
        filled => 4 - filled,
    };
    std::iter::repeat(0u8).take(missing).collect()
}
44
/// Stateful IPFIX parser that remembers the templates it has seen, keyed by template ID.
///
/// Template sets decoded during `parse` are stored in these maps and looked up again
/// when a later set carries an ID that is not one of the template set IDs.
#[derive(Debug, PartialEq, Clone, Serialize)]
pub struct IPFixParser {
    /// IPFIX data templates.
    pub templates: HashMap<TemplateId, Template>,
    /// NetFlow v9 data templates that arrived inside an IPFIX message.
    pub v9_templates: HashMap<TemplateId, V9Template>,
    /// IPFIX options templates.
    pub ipfix_options_templates: HashMap<TemplateId, OptionsTemplate>,
    /// NetFlow v9 options templates that arrived inside an IPFIX message.
    pub v9_options_templates: HashMap<TemplateId, V9OptionsTemplate>,
    /// Upper bound, in bytes, on the packet sample copied into a parse error.
    pub max_error_sample_size: usize,
}
55
56impl Default for IPFixParser {
57 fn default() -> Self {
58 Self {
59 templates: HashMap::default(),
60 v9_templates: HashMap::default(),
61 ipfix_options_templates: HashMap::default(),
62 v9_options_templates: HashMap::default(),
63 max_error_sample_size: 256,
64 }
65 }
66}
67
68impl IPFixParser {
69 pub fn parse<'a>(&mut self, packet: &'a [u8]) -> ParsedNetflow<'a> {
70 match IPFix::parse(packet, self) {
71 Ok((remaining, ipfix)) => ParsedNetflow::Success {
72 packet: NetflowPacket::IPFix(ipfix),
73 remaining,
74 },
75 Err(e) => {
76 let remaining_sample = if packet.len() > self.max_error_sample_size {
78 packet[..self.max_error_sample_size].to_vec()
79 } else {
80 packet.to_vec()
81 };
82 ParsedNetflow::Error {
83 error: NetflowParseError::Partial(PartialParse {
84 version: 10,
85 error: e.to_string(),
86 remaining: remaining_sample,
87 }),
88 }
89 }
90 }
91 }
92
93 fn add_ipfix_templates(&mut self, templates: &[Template]) {
95 for t in templates {
96 self.templates.insert(t.template_id, t.clone());
97 }
98 }
99
100 fn add_ipfix_options_templates(&mut self, templates: &[OptionsTemplate]) {
101 for t in templates {
102 self.ipfix_options_templates
103 .insert(t.template_id, t.clone());
104 }
105 }
106
107 fn add_v9_templates(&mut self, templates: &[V9Template]) {
108 for t in templates {
109 self.v9_templates.insert(t.template_id, t.clone());
110 }
111 }
112
113 fn add_v9_options_templates(&mut self, templates: &[V9OptionsTemplate]) {
114 for t in templates {
115 self.v9_options_templates.insert(t.template_id, t.clone());
116 }
117 }
118}
119
/// A complete IPFIX message: the message header followed by its sets.
///
/// Parsing takes the `IPFixParser` as an extra argument so template sets can be cached
/// and data sets can be decoded against previously seen templates.
#[derive(Nom, Debug, PartialEq, Clone, Serialize)]
#[nom(ExtraArgs(parser: &mut IPFixParser))]
pub struct IPFix {
    /// Message header.
    pub header: Header,
    /// Sets decoded from the `header.length - 16` bytes that follow the header
    /// (the subtraction saturates at zero).
    // Sets are collected with `many0(complete(..))` inside that window; whatever input
    // is left in the window once collection stops is discarded by the final `.map`.
    #[nom(
        PreExec = "let length = header.length.saturating_sub(16);",
        Parse = "map_res(take(length), |i| {
            many0(complete(|i| FlowSet::parse(i, parser)
                .map(|(i, flow_set)| (i, flow_set))
            ))(i)
            .map(|(_, flow_sets)| flow_sets) // Extract the Vec<FlowSet>
        })"
    )]
    pub flowsets: Vec<FlowSet>,
}
137
/// Decoded content of a single set.
///
/// Template-carrying variants come in a singular form (the set held exactly one
/// template record) and a plural form (it held several).
#[derive(Debug, PartialEq, Clone, Serialize)]
pub enum FlowSetBody {
    /// A single IPFIX data template.
    Template(Template),
    /// Several IPFIX data templates from one set.
    Templates(Vec<Template>),
    /// A single NetFlow v9 data template.
    V9Template(V9Template),
    /// Several NetFlow v9 data templates from one set.
    V9Templates(Vec<V9Template>),
    /// A single IPFIX options template.
    OptionsTemplate(OptionsTemplate),
    /// Several IPFIX options templates from one set.
    OptionsTemplates(Vec<OptionsTemplate>),
    /// A single NetFlow v9 options template.
    V9OptionsTemplate(V9OptionsTemplate),
    /// Several NetFlow v9 options templates from one set.
    V9OptionsTemplates(Vec<V9OptionsTemplate>),
    /// Records decoded with a cached IPFIX data template.
    Data(Data),
    /// Records decoded with a cached IPFIX options template.
    OptionsData(OptionsData),
    /// Records decoded with a cached NetFlow v9 data template.
    V9Data(V9Data),
    /// Records decoded with a cached NetFlow v9 options template.
    V9OptionsData(V9OptionsData),
    /// Raw body bytes of a set with an ID above 255 for which no template is cached.
    NoTemplate(Vec<u8>),
    /// The cached IPFIX template for this set has no fields, so nothing was decoded.
    Empty,
}
155
156impl FlowSetBody {
190 fn parse_templates<'a, T, F>(
191 i: &'a [u8],
192 parser: &mut IPFixParser,
193 parse_fn: F,
194 single_variant: fn(T) -> FlowSetBody,
195 multi_variant: fn(Vec<T>) -> FlowSetBody,
196 validate: fn(&T) -> bool,
197 add_templates: fn(&mut IPFixParser, &[T]),
198 ) -> IResult<&'a [u8], FlowSetBody>
199 where
200 T: Clone,
201 F: Fn(&'a [u8]) -> IResult<&'a [u8], T>,
202 {
203 let (i, templates) = many0(complete(parse_fn))(i)?;
204 if templates.is_empty() || templates.iter().any(|t| !validate(t)) {
205 return Err(nom::Err::Error(nom::error::Error::new(
206 i,
207 nom::error::ErrorKind::Verify,
208 )));
209 }
210 add_templates(parser, &templates);
212 match templates.len() {
213 1 => {
214 if let Some(template) = templates.into_iter().next() {
215 Ok((i, single_variant(template)))
216 } else {
217 Err(nom::Err::Error(nom::error::Error::new(
218 i,
219 nom::error::ErrorKind::Verify,
220 )))
221 }
222 }
223 _ => Ok((i, multi_variant(templates))),
224 }
225 }
226
227 fn parse<'a>(
228 i: &'a [u8],
229 parser: &mut IPFixParser,
230 id: u16,
231 ) -> IResult<&'a [u8], FlowSetBody> {
232 match id {
233 DATA_TEMPLATE_IPFIX_ID => Self::parse_templates(
234 i,
235 parser,
236 Template::parse,
237 FlowSetBody::Template,
238 FlowSetBody::Templates,
239 |t: &Template| t.is_valid(),
240 |parser, templates| parser.add_ipfix_templates(templates),
241 ),
242 DATA_TEMPLATE_V9_ID => Self::parse_templates(
243 i,
244 parser,
245 V9Template::parse,
246 FlowSetBody::V9Template,
247 FlowSetBody::V9Templates,
248 |_t: &V9Template| true,
249 |parser, templates| parser.add_v9_templates(templates),
250 ),
251 OPTIONS_TEMPLATE_V9_ID => Self::parse_templates(
252 i,
253 parser,
254 V9OptionsTemplate::parse,
255 FlowSetBody::V9OptionsTemplate,
256 FlowSetBody::V9OptionsTemplates,
257 |_t: &V9OptionsTemplate| true,
258 |parser, templates| parser.add_v9_options_templates(templates),
259 ),
260 OPTIONS_TEMPLATE_IPFIX_ID => Self::parse_templates(
261 i,
262 parser,
263 OptionsTemplate::parse,
264 FlowSetBody::OptionsTemplate,
265 FlowSetBody::OptionsTemplates,
266 |t: &OptionsTemplate| t.is_valid(),
267 |parser, templates| parser.add_ipfix_options_templates(templates),
268 ),
269 _ => {
271 if let Some(template) = parser.templates.get(&id) {
272 if template.get_fields().is_empty() {
273 Ok((i, FlowSetBody::Empty))
274 } else {
275 let (i, data) = Data::parse(i, template)?;
276 Ok((i, FlowSetBody::Data(data)))
277 }
278 } else if let Some(options_template) = parser.ipfix_options_templates.get(&id) {
279 if options_template.get_fields().is_empty() {
280 Ok((i, FlowSetBody::Empty))
281 } else {
282 let (i, data) = OptionsData::parse(i, options_template)?;
283 Ok((i, FlowSetBody::OptionsData(data)))
284 }
285 } else if let Some(v9_template) = parser.v9_templates.get(&id) {
286 let (i, data) = V9Data::parse(i, v9_template)?;
287 Ok((i, FlowSetBody::V9Data(data)))
288 } else if let Some(v9_options_template) = parser.v9_options_templates.get(&id) {
289 let (i, data) = V9OptionsData::parse(i, v9_options_template)?;
290 Ok((i, FlowSetBody::V9OptionsData(data)))
291 } else if id > 255 {
292 Ok((i, FlowSetBody::NoTemplate(i.to_vec())))
293 } else {
294 Err(nom::Err::Error(nom::error::Error::new(
295 i,
296 nom::error::ErrorKind::Verify,
297 )))
298 }
299 }
300 }
301 }
302}
303
/// IPFIX message header.
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Nom)]
pub struct Header {
    /// Always 10: the value comes from the `Value` attribute, not from the input.
    // NOTE(review): presumably the caller has already consumed the version word before
    // this struct is parsed — confirm against the dispatching code.
    #[nom(Value = "10")]
    pub version: u16,
    /// Length field of the header; `IPFix` subtracts 16 from it to size the set area.
    pub length: u16,
    /// Export time field of the header.
    // NOTE(review): presumably seconds since the UNIX epoch, per the IPFIX spec — confirm.
    pub export_time: u32,
    /// Sequence number field of the header.
    pub sequence_number: u32,
    /// Observation domain ID field of the header.
    pub observation_domain_id: u32,
}
334
/// One set of an IPFIX message: a set header plus its decoded body.
#[derive(Debug, PartialEq, Clone, Serialize, Nom)]
#[nom(ExtraArgs(parser: &mut IPFixParser))]
pub struct FlowSet {
    /// Set header carrying the set ID and the set length.
    pub header: FlowSetHeader,
    /// Body decoded from the `header.length - 4` bytes after the set header (the
    /// subtraction saturates at zero); `header.header_id` selects the interpretation.
    // Input that `FlowSetBody::parse` leaves unconsumed inside that window is dropped.
    #[nom(
        PreExec = "let length = header.length.saturating_sub(4);",
        Parse = "map_res(take(length),
            |i| FlowSetBody::parse(i, parser, header.header_id)
                .map(|(_, flow_set)| flow_set))"
    )]
    pub body: FlowSetBody,
}
347
/// Header that precedes every set.
#[derive(Debug, PartialEq, Clone, Serialize, Nom)]
pub struct FlowSetHeader {
    /// Set ID: either one of the template set IDs or the ID of the template that
    /// describes the records in this set.
    pub header_id: u16,
    /// Length field of the set; `FlowSet` subtracts 4 from it to size the body.
    pub length: u16,
}
360
361#[derive(Debug, PartialEq, Clone, Serialize, Nom)]
362#[nom(ExtraArgs(template: &Template))]
363pub struct Data {
364 #[nom(
365 ErrorIf = "template.get_fields().is_empty() ",
366 Parse = "{ |i| FieldParser::parse::<Template>(i, template) }"
367 )]
368 pub fields: Vec<IpFixFlowRecord>,
369 #[serde(skip_serializing)]
370 pub padding: Vec<u8>,
371}
372
373impl Data {
374 pub fn new(fields: Vec<IpFixFlowRecord>) -> Self {
378 Self {
379 fields,
380 padding: vec![],
381 }
382 }
383}
384
385#[derive(Debug, PartialEq, Clone, Serialize, Nom)]
386#[nom(ExtraArgs(template: &OptionsTemplate))]
387pub struct OptionsData {
388 #[nom(
389 ErrorIf = "template.get_fields().is_empty() ",
390 Parse = "{ |i| FieldParser::parse::<OptionsTemplate>(i, template) }"
391 )]
392 pub fields: Vec<Vec<IPFixFieldPair>>,
393 #[serde(skip_serializing)]
394 pub padding: Vec<u8>,
395}
396
397impl OptionsData {
398 pub fn new(fields: Vec<Vec<IPFixFieldPair>>) -> Self {
402 Self {
403 fields,
404 padding: vec![],
405 }
406 }
407}
408
/// An IPFIX options template record.
#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Nom)]
pub struct OptionsTemplate {
    /// ID that later data sets use to refer to this template.
    pub template_id: u16,
    /// Field count as read from the record.
    pub field_count: u16,
    /// Scope field count as read from the record.
    pub scope_field_count: u16,
    /// Field specifiers, scope and non-scope alike.
    // The number parsed is `scope_field_count + (field_count - scope_field_count)`. If the
    // subtraction would underflow, `field_count` is used in its place; the addition
    // saturates at `u16::MAX`.
    #[nom(
        PreExec = "let combined_count = usize::from(scope_field_count.saturating_add(
            field_count.checked_sub(scope_field_count).unwrap_or(field_count)));",
        Parse = "count(TemplateField::parse, combined_count)"
    )]
    pub fields: Vec<TemplateField>,
}
421
/// An IPFIX data template record.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Nom, Default)]
pub struct Template {
    /// ID that later data sets use to refer to this template.
    pub template_id: u16,
    /// Number of field specifiers that follow.
    pub field_count: u16,
    /// Field specifiers; exactly `field_count` of them are parsed.
    #[nom(Count = "field_count")]
    pub fields: Vec<TemplateField>,
}
429
/// One field specifier of a template.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Nom)]
pub struct TemplateField {
    /// Field identifier with the top bit removed.
    // A raw value above 32767 marks an enterprise-specific field: 32768 is subtracted
    // and `is_enterprise` is set, which gates parsing of `enterprise_number` below.
    #[nom(
        PostExec = "let (field_type_number, is_enterprise) = if field_type_number > 32767 {
            (field_type_number.overflowing_sub(32768).0, true)
        } else { (field_type_number, false) };"
    )]
    pub field_type_number: u16,
    /// Length of the field value; 65535 means the length is encoded in front of each value.
    pub field_length: u16,
    /// Enterprise number; read only for enterprise-specific fields, `None` otherwise.
    #[nom(Cond = "is_enterprise")]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub enterprise_number: Option<u32>,
    /// Field identity derived from `field_type_number` and `enterprise_number`; consumes no input.
    #[nom(Value(IPFixField::new(field_type_number, enterprise_number)))]
    pub field_type: IPFixField,
}
445
446trait CommonTemplate {
448 fn get_fields(&self) -> &Vec<TemplateField>;
449
450 fn is_valid(&self) -> bool {
451 !self.get_fields().is_empty() && self.get_fields().iter().any(|f| f.field_length > 0)
452 }
453}
454
455impl CommonTemplate for Template {
456 fn get_fields(&self) -> &Vec<TemplateField> {
457 &self.fields
458 }
459}
460
461impl CommonTemplate for OptionsTemplate {
462 fn get_fields(&self) -> &Vec<TemplateField> {
463 &self.fields
464 }
465}
466
467pub struct FieldParser;
468
469impl<'a> FieldParser {
470 fn parse<T: CommonTemplate>(
474 mut i: &'a [u8],
475 template: &T,
476 ) -> IResult<&'a [u8], Vec<Vec<IPFixFieldPair>>> {
477 let template_fields = template.get_fields();
478 if template_fields.is_empty() {
479 return Ok((i, Vec::new()));
480 }
481
482 let template_size: usize = template_fields
484 .iter()
485 .map(|f| usize::from(f.field_length))
486 .sum();
487 let estimated_records = if template_size > 0 {
488 i.len() / template_size
489 } else {
490 0
491 };
492 let mut res = Vec::with_capacity(estimated_records);
493
494 while !i.is_empty() {
496 let mut vec = Vec::with_capacity(template_fields.len());
497 for field in template_fields.iter() {
498 let field_res = field.parse_as_field_value(i);
499 if field_res.is_err() {
500 return Ok((i, res));
501 }
502 let (remaining, field_value) = field_res.unwrap();
503 vec.push((field.field_type, field_value));
504 i = remaining;
505 }
506 res.push(vec);
507 }
508 Ok((i, res))
509 }
510}
511
512impl TemplateField {
513 fn parse_field_length<'a>(&self, i: &'a [u8]) -> IResult<&'a [u8], u16> {
518 match self.field_length {
519 65535 => {
520 let (i, length) = be_u8(i)?;
521 if length == 255 {
522 let (i, full_length) = be_u16(i)?;
523 if (full_length as usize) > i.len() {
525 return Err(nom::Err::Error(nom::error::Error::new(
526 i,
527 nom::error::ErrorKind::Eof,
528 )));
529 }
530 Ok((i, full_length))
531 } else {
532 if (length as usize) > i.len() {
534 return Err(nom::Err::Error(nom::error::Error::new(
535 i,
536 nom::error::ErrorKind::Eof,
537 )));
538 }
539 Ok((i, u16::from(length)))
540 }
541 }
542 length => Ok((i, length)),
543 }
544 }
545
546 fn parse_as_field_value<'a>(&self, i: &'a [u8]) -> IResult<&'a [u8], FieldValue> {
547 let (i, length) = self.parse_field_length(i)?;
548 FieldValue::from_field_type(i, self.field_type.into(), length)
549 }
550}
551
552impl IPFix {
553 pub fn to_be_bytes(&self) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
555 let mut result = Vec::new();
556
557 result.extend_from_slice(&self.header.version.to_be_bytes());
558 result.extend_from_slice(&self.header.length.to_be_bytes());
559 result.extend_from_slice(&self.header.export_time.to_be_bytes());
560 result.extend_from_slice(&self.header.sequence_number.to_be_bytes());
561 result.extend_from_slice(&self.header.observation_domain_id.to_be_bytes());
562
563 for flow in &self.flowsets {
564 result.extend_from_slice(&flow.header.header_id.to_be_bytes());
565 result.extend_from_slice(&flow.header.length.to_be_bytes());
566
567 let mut result_flowset = vec![];
568
569 if let FlowSetBody::Template(template) = &flow.body {
570 result_flowset.extend_from_slice(&template.template_id.to_be_bytes());
571 result_flowset.extend_from_slice(&template.field_count.to_be_bytes());
572
573 for field in template.fields.iter() {
574 result_flowset.extend_from_slice(&field.field_type_number.to_be_bytes());
575 result_flowset.extend_from_slice(&field.field_length.to_be_bytes());
576 if let Some(enterprise) = field.enterprise_number {
577 result_flowset.extend_from_slice(&enterprise.to_be_bytes());
578 }
579 }
580 }
581
582 if let FlowSetBody::Templates(templates) = &flow.body {
583 for template in templates.iter() {
584 result_flowset.extend_from_slice(&template.template_id.to_be_bytes());
585 result_flowset.extend_from_slice(&template.field_count.to_be_bytes());
586
587 for field in template.fields.iter() {
588 result_flowset
589 .extend_from_slice(&field.field_type_number.to_be_bytes());
590 result_flowset.extend_from_slice(&field.field_length.to_be_bytes());
591 if let Some(enterprise) = field.enterprise_number {
592 result_flowset.extend_from_slice(&enterprise.to_be_bytes());
593 }
594 }
595 }
596 }
597
598 if let FlowSetBody::V9Template(template) = &flow.body {
599 result.extend_from_slice(&template.template_id.to_be_bytes());
600 result.extend_from_slice(&template.field_count.to_be_bytes());
601 for field in template.fields.iter() {
602 result.extend_from_slice(&field.field_type_number.to_be_bytes());
603 result.extend_from_slice(&field.field_length.to_be_bytes());
604 }
605 }
606
607 if let FlowSetBody::OptionsTemplate(options_template) = &flow.body {
608 result_flowset.extend_from_slice(&options_template.template_id.to_be_bytes());
609 result_flowset.extend_from_slice(&options_template.field_count.to_be_bytes());
610 result_flowset
611 .extend_from_slice(&options_template.scope_field_count.to_be_bytes());
612
613 for field in options_template.fields.iter() {
614 result_flowset.extend_from_slice(&field.field_type_number.to_be_bytes());
615 result_flowset.extend_from_slice(&field.field_length.to_be_bytes());
616 if let Some(enterprise) = field.enterprise_number {
617 result_flowset.extend_from_slice(&enterprise.to_be_bytes());
618 }
619 }
620 }
621
622 if let FlowSetBody::V9OptionsTemplate(template) = &flow.body {
623 result.extend_from_slice(&template.template_id.to_be_bytes());
624 result.extend_from_slice(&template.options_scope_length.to_be_bytes());
625 result.extend_from_slice(&template.options_length.to_be_bytes());
626 for field in template.scope_fields.iter() {
627 result.extend_from_slice(&field.field_type_number.to_be_bytes());
628 result.extend_from_slice(&field.field_length.to_be_bytes());
629 }
630 for field in template.option_fields.iter() {
631 result.extend_from_slice(&field.field_type_number.to_be_bytes());
632 result.extend_from_slice(&field.field_length.to_be_bytes());
633 }
634 }
635
636 if let FlowSetBody::Data(data) = &flow.body {
637 let mut data_content = Vec::new();
638 for item in data.fields.iter() {
639 for (_, v) in item.iter() {
640 data_content.extend_from_slice(&v.to_be_bytes()?);
641 }
642 }
643 result_flowset.extend_from_slice(&data_content);
644
645 let padding = if data.padding.is_empty() {
647 calculate_padding(data_content.len())
648 } else {
649 data.padding.clone()
650 };
651 result_flowset.extend_from_slice(&padding);
652 }
653
654 if let FlowSetBody::OptionsData(data) = &flow.body {
655 let mut options_data_content = Vec::new();
656 for item in data.fields.iter() {
657 for (_, v) in item.iter() {
658 options_data_content.extend_from_slice(&v.to_be_bytes()?);
659 }
660 }
661 result_flowset.extend_from_slice(&options_data_content);
662
663 let padding = if data.padding.is_empty() {
665 calculate_padding(options_data_content.len())
666 } else {
667 data.padding.clone()
668 };
669 result_flowset.extend_from_slice(&padding);
670 }
671
672 if let FlowSetBody::V9Data(data) = &flow.body {
673 for item in data.fields.iter() {
674 for (_, v) in item.iter() {
675 result_flowset.extend_from_slice(&v.to_be_bytes()?);
676 }
677 }
678 }
679
680 if let FlowSetBody::V9OptionsData(options_data) = &flow.body {
681 for options_data_field in options_data.fields.iter() {
682 for field in options_data_field.scope_fields.iter() {
683 match field {
684 V9ScopeDataField::System(value) => {
685 result.extend_from_slice(value);
686 }
687 V9ScopeDataField::Interface(value) => {
688 result.extend_from_slice(value);
689 }
690 V9ScopeDataField::LineCard(value) => {
691 result.extend_from_slice(value);
692 }
693 V9ScopeDataField::NetFlowCache(value) => {
694 result.extend_from_slice(value);
695 }
696 V9ScopeDataField::Template(value) => {
697 result.extend_from_slice(value);
698 }
699 }
700 }
701 for options_field in options_data_field.options_fields.iter() {
702 for (_field_type, field_value) in options_field.iter() {
703 result.extend_from_slice(&field_value.to_be_bytes()?);
704 }
705 }
706 }
707 }
708
709 result.append(&mut result_flowset);
710 }
711
712 Ok(result)
713 }
714}