Skip to main content

netgauze_flow_pkt/wire/deserializer/
ipfix.rs

1// Copyright (C) 2023-present The NetGauze Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//    http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12// implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use chrono::{LocalResult, TimeZone, Utc};
17use nom::IResult;
18use nom::error::ErrorKind;
19use nom::number::complete::{be_u8, be_u16, be_u32};
20use serde::{Deserialize, Serialize};
21
22use crate::ipfix::*;
23use crate::wire::deserializer::{FieldSpecifierParsingError, ie};
24use crate::{DATA_SET_MIN_ID, DataSetId};
25use netgauze_parse_utils::{
26    ErrorKindSerdeDeref, ReadablePduWithOneInput, Span, parse_into_located,
27    parse_into_located_one_input, parse_into_located_two_inputs,
28};
29use netgauze_serde_macros::LocatedError;
30
/// Length in octets of the fixed IPFIX message header: a 2-octet version, a
/// 2-octet length, and three 4-octet fields (export time, sequence number and
/// observation domain id).
pub const IPFIX_HEADER_LENGTH: u16 = 2 + 2 + 4 * 3;
34
/// Errors produced while parsing an IPFIX message (header and payload sets).
#[derive(LocatedError, Eq, PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum IpfixPacketParsingError {
    /// A low-level `nom` parser failed (for example, the input ended early).
    #[serde(with = "ErrorKindSerdeDeref")]
    NomError(#[from_nom] ErrorKind),
    /// The version field did not match `IPFIX_VERSION`; carries the value read.
    UnsupportedVersion(u16),
    /// The message length field was smaller than [`IPFIX_HEADER_LENGTH`];
    /// carries the value read.
    InvalidLength(u16),
    /// The export time could not be converted into a single UTC timestamp;
    /// carries the raw value read.
    InvalidExportTime(u32),
    /// Parsing one of the sets in the message payload failed.
    SetParsingError(#[from_located(module = "self")] SetParsingError),
}
44
45impl std::fmt::Display for IpfixPacketParsingError {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        match self {
48            Self::NomError(e) => write!(f, "{}", nom::Err::Error(e)),
49            Self::UnsupportedVersion(version) => write!(f, "unsupported IPFIX version: {version}"),
50            Self::InvalidLength(len) => write!(f, "invalid IPFIX packet length: {len}"),
51            Self::InvalidExportTime(time) => write!(f, "invalid IPFIX export time: {time}"),
52            Self::SetParsingError(e) => write!(f, "Set parsing error: {e}"),
53        }
54    }
55}
56
57impl std::error::Error for IpfixPacketParsingError {
58    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
59        match self {
60            Self::NomError(_err) => None,
61            Self::UnsupportedVersion(_) => None,
62            Self::InvalidLength(_) => None,
63            Self::InvalidExportTime(_) => None,
64            Self::SetParsingError(err) => Some(err),
65        }
66    }
67}
68
69impl<'a> ReadablePduWithOneInput<'a, &mut TemplatesMap, LocatedIpfixPacketParsingError<'a>>
70    for IpfixPacket
71{
72    fn from_wire(
73        buf: Span<'a>,
74        templates_map: &mut TemplatesMap,
75    ) -> IResult<Span<'a>, Self, LocatedIpfixPacketParsingError<'a>> {
76        let input = buf;
77        let (buf, version) = be_u16(buf)?;
78        if version != IPFIX_VERSION {
79            return Err(nom::Err::Error(LocatedIpfixPacketParsingError::new(
80                input,
81                IpfixPacketParsingError::UnsupportedVersion(version),
82            )));
83        }
84        let input = buf;
85        let (buf, length) = be_u16(buf)?;
86        if length < IPFIX_HEADER_LENGTH {
87            return Err(nom::Err::Error(LocatedIpfixPacketParsingError::new(
88                input,
89                IpfixPacketParsingError::InvalidLength(length),
90            )));
91        }
92        let (remainder, buf) = nom::bytes::complete::take(length - 4)(buf)?;
93        let (buf, export_time) = be_u32(buf)?;
94        let export_time = match Utc.timestamp_opt(export_time as i64, 0) {
95            LocalResult::Single(time) => time,
96            _ => {
97                return Err(nom::Err::Error(LocatedIpfixPacketParsingError::new(
98                    input,
99                    IpfixPacketParsingError::InvalidExportTime(export_time),
100                )));
101            }
102        };
103        let (buf, sequence_number) = be_u32(buf)?;
104        let (mut buf, observation_domain_id) = be_u32(buf)?;
105        let mut payload = Vec::new();
106        while !buf.is_empty() {
107            let (tmp, element) = match Set::from_wire(buf, templates_map) {
108                Ok((buf, value)) => Ok((buf, value)),
109                Err(err) => match err {
110                    nom::Err::Incomplete(needed) => Err(nom::Err::Incomplete(needed)),
111                    nom::Err::Error(error) => Err(nom::Err::Error(error.into())),
112                    nom::Err::Failure(failure) => Err(nom::Err::Failure(failure.into())),
113                },
114            }?;
115            payload.push(element);
116            buf = tmp;
117        }
118        Ok((
119            remainder,
120            IpfixPacket::new(
121                export_time,
122                sequence_number,
123                observation_domain_id,
124                payload.into_boxed_slice(),
125            ),
126        ))
127    }
128}
129
/// Errors produced while parsing a single IPFIX Set.
#[derive(LocatedError, Eq, PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum SetParsingError {
    /// A low-level `nom` parser failed (for example, the input ended early).
    #[serde(with = "ErrorKindSerdeDeref")]
    NomError(#[from_nom] ErrorKind),
    /// The set length field was smaller than the 4-octet set header; carries
    /// the value read.
    InvalidLength(u16),
    /// The set id is neither the template set id, the options template set
    /// id, nor at least `DATA_SET_MIN_ID`; carries the value read.
    InvalidSetId(u16),
    /// A data set referenced an id for which the templates map holds no
    /// template; carries that id.
    NoTemplateDefinedFor(u16),
    /// A padding octet was not zero; carries the offending octet.
    InvalidPaddingValue(u8),
    /// Parsing a record inside a template set failed.
    TemplateRecordError(#[from_located(module = "self")] TemplateRecordParsingError),
    /// Parsing a record inside an options template set failed.
    OptionsTemplateRecordError(#[from_located(module = "self")] OptionsTemplateRecordParsingError),
    /// Parsing a record inside a data set failed.
    DataRecordError(#[from_located(module = "self")] DataRecordParsingError),
}
142
143impl std::fmt::Display for SetParsingError {
144    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145        match self {
146            Self::NomError(e) => write!(f, "{}", nom::Err::Error(e)),
147            Self::InvalidLength(len) => write!(f, "invalid Set length: {len}"),
148            Self::InvalidSetId(id) => write!(f, "invalid Set id: {id}"),
149            Self::NoTemplateDefinedFor(id) => write!(f, "no template defined for: {id}"),
150            Self::InvalidPaddingValue(padding) => write!(f, "invalid Padding value: {padding}"),
151            Self::TemplateRecordError(e) => write!(f, "{e}"),
152            Self::OptionsTemplateRecordError(e) => write!(f, "{e}"),
153            Self::DataRecordError(e) => write!(f, "{e}"),
154        }
155    }
156}
157
158impl std::error::Error for SetParsingError {
159    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
160        match self {
161            Self::NomError(_err) => None,
162            Self::InvalidLength(_) => None,
163            Self::InvalidSetId(_) => None,
164            Self::NoTemplateDefinedFor(_) => None,
165            Self::InvalidPaddingValue(_) => None,
166            Self::TemplateRecordError(e) => Some(e),
167            Self::OptionsTemplateRecordError(e) => Some(e),
168            Self::DataRecordError(e) => Some(e),
169        }
170    }
171}
172
173impl<'a> ReadablePduWithOneInput<'a, &mut TemplatesMap, LocatedSetParsingError<'a>> for Set {
174    fn from_wire(
175        buf: Span<'a>,
176        templates_map: &mut TemplatesMap,
177    ) -> IResult<Span<'a>, Self, LocatedSetParsingError<'a>> {
178        let input = buf;
179        let (buf, id) = nom::combinator::map_res(be_u16, |id| {
180            if id != IPFIX_TEMPLATE_SET_ID
181                && id != IPFIX_OPTIONS_TEMPLATE_SET_ID
182                && id < DATA_SET_MIN_ID
183            {
184                Err(SetParsingError::InvalidSetId(id))
185            } else {
186                Ok(id)
187            }
188        })(buf)?;
189        let (buf, length) = nom::combinator::map_res(be_u16, |length| {
190            if length < 4 {
191                Err(SetParsingError::InvalidLength(length))
192            } else {
193                Ok(length)
194            }
195        })(buf)?;
196        let (remainder, mut buf) = nom::bytes::complete::take(length - 4)(buf)?;
197        let set = match id {
198            IPFIX_TEMPLATE_SET_ID => {
199                let mut templates = Vec::new();
200                while !buf.is_empty() {
201                    let (tmp, element) = match TemplateRecord::from_wire(buf, templates_map) {
202                        Ok((buf, value)) => Ok((buf, value)),
203                        Err(err) => match err {
204                            nom::Err::Incomplete(needed) => Err(nom::Err::Incomplete(needed)),
205                            nom::Err::Error(error) => Err(nom::Err::Error(error.into())),
206                            nom::Err::Failure(failure) => Err(nom::Err::Failure(failure.into())),
207                        },
208                    }?;
209                    templates.push(element);
210                    buf = tmp;
211                }
212                Set::Template(templates.into_boxed_slice())
213            }
214            IPFIX_OPTIONS_TEMPLATE_SET_ID => {
215                let mut option_templates = vec![];
216                // THE RFC is not super clear about
217                // length allowed in the Options
218                // Template set. Like Wireshark implementation, we assume anything
219                // less than 4-octets (min field size) is padding
220                while buf.len() > 3 {
221                    // let (t, option_template) =
222                    //     parse_into_located_one_input(buf, templates_map)?;
223                    let (t, option_template) =
224                        match OptionsTemplateRecord::from_wire(buf, templates_map) {
225                            Ok((buf, value)) => Ok((buf, value)),
226                            Err(err) => match err {
227                                nom::Err::Incomplete(needed) => Err(nom::Err::Incomplete(needed)),
228                                nom::Err::Error(error) => Err(nom::Err::Error(error.into())),
229                                nom::Err::Failure(failure) => {
230                                    Err(nom::Err::Failure(failure.into()))
231                                }
232                            },
233                        }?;
234                    buf = t;
235                    option_templates.push(option_template);
236                }
237                // buf could be a non zero value for padding
238                check_padding_value(buf)?;
239                Set::OptionsTemplate(option_templates.into_boxed_slice())
240            }
241            // We don't need to check for valid Set ID again, since we already checked
242            id => {
243                let template = if let Some(fields) = templates_map.get_mut(&id) {
244                    fields
245                } else {
246                    return Err(nom::Err::Error(LocatedSetParsingError::new(
247                        input,
248                        SetParsingError::NoTemplateDefinedFor(id),
249                    )));
250                };
251                // since we could have vlen fields, we can only state a min_record_len here
252                let min_record_length = template
253                    .scope_fields_specs
254                    .iter()
255                    .map(|x| {
256                        if x.length() == 65535 {
257                            1
258                        } else {
259                            x.length() as usize
260                        }
261                    })
262                    .sum::<usize>()
263                    + template
264                        .fields_specs
265                        .iter()
266                        .map(|x| {
267                            if x.length() == 65535 {
268                                1
269                            } else {
270                                x.length() as usize
271                            }
272                        })
273                        .sum::<usize>();
274
275                let mut records = Vec::new();
276
277                while buf.len() >= min_record_length && min_record_length > 0 {
278                    let read_template: &DecodingTemplate = template;
279                    let (t, record): (Span<'_>, DataRecord) =
280                        parse_into_located_one_input(buf, read_template)?;
281                    buf = t;
282                    records.push(record);
283                }
284                template.increment_processed_count();
285                // buf could be a non zero value for padding
286                while buf.len() > 0 && nom::combinator::peek(be_u8)(buf)?.1 == 0 {
287                    let (t, _) = be_u8(buf)?;
288                    buf = t;
289                }
290
291                // We can safely unwrap DataSetId here since we already checked the range
292                Set::Data {
293                    id: DataSetId::new(id).unwrap(),
294                    records: records.into_boxed_slice(),
295                }
296            }
297        };
298        Ok((remainder, set))
299    }
300}
301
302#[inline]
303fn check_padding_value(mut buf: Span<'_>) -> IResult<Span<'_>, (), LocatedSetParsingError<'_>> {
304    while buf.len() > 0 {
305        let (t, padding_value) = be_u8(buf)?;
306        if padding_value != 0 {
307            return Err(nom::Err::Error(LocatedSetParsingError::new(
308                buf,
309                SetParsingError::InvalidPaddingValue(padding_value),
310            )));
311        }
312        buf = t;
313    }
314    Ok((buf, ()))
315}
316
/// Errors produced while parsing an options template record.
#[derive(LocatedError, Eq, PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum OptionsTemplateRecordParsingError {
    /// A low-level `nom` parser failed (for example, the input ended early).
    #[serde(with = "ErrorKindSerdeDeref")]
    NomError(#[from_nom] ErrorKind),
    /// The template id was below 256; carries the value read.
    InvalidTemplateId(u16),
    /// The scope fields count must not exceed the total fields count; carries
    /// the scope fields count read.
    InvalidScopeFieldsCount(u16),
    /// Parsing one of the record's field specifiers failed.
    FieldError(#[from_located(module = "crate::wire::deserializer")] FieldSpecifierParsingError),
}
326
327impl std::fmt::Display for OptionsTemplateRecordParsingError {
328    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
329        match self {
330            Self::NomError(err) => write!(f, "Nom error {}", nom::Err::Error(err)),
331            Self::InvalidTemplateId(id) => write!(f, "invalid template ID {id}"),
332            Self::InvalidScopeFieldsCount(count) => write!(f, "invalid scope {count}"),
333            Self::FieldError(err) => write!(f, "{err}"),
334        }
335    }
336}
337
338impl std::error::Error for OptionsTemplateRecordParsingError {
339    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
340        match self {
341            Self::NomError(_err) => None,
342            Self::InvalidTemplateId(_) => None,
343            Self::InvalidScopeFieldsCount(_) => None,
344            Self::FieldError(err) => Some(err),
345        }
346    }
347}
348
349impl<'a>
350    ReadablePduWithOneInput<'a, &mut TemplatesMap, LocatedOptionsTemplateRecordParsingError<'a>>
351    for OptionsTemplateRecord
352{
353    fn from_wire(
354        buf: Span<'a>,
355        templates_map: &mut TemplatesMap,
356    ) -> IResult<Span<'a>, Self, LocatedOptionsTemplateRecordParsingError<'a>> {
357        let input = buf;
358        let (buf, template_id) = be_u16(buf)?;
359        // from RFC7011: Each Template Record is given a unique Template ID in the range
360        // 256 to 65535.
361        if template_id < 256 {
362            return Err(nom::Err::Error(
363                LocatedOptionsTemplateRecordParsingError::new(
364                    input,
365                    OptionsTemplateRecordParsingError::InvalidTemplateId(template_id),
366                ),
367            ));
368        }
369        let (buf, total_fields_count) = be_u16(buf)?;
370        let input = buf;
371        let (mut buf, scope_fields_count) = be_u16(buf)?;
372        if scope_fields_count > total_fields_count {
373            return Err(nom::Err::Error(
374                LocatedOptionsTemplateRecordParsingError::new(
375                    input,
376                    OptionsTemplateRecordParsingError::InvalidScopeFieldsCount(scope_fields_count),
377                ),
378            ));
379        }
380        let mut scope_fields = Vec::with_capacity(scope_fields_count as usize);
381        for _ in 0..scope_fields_count {
382            let (t, field) = parse_into_located(buf)?;
383            scope_fields.push(field);
384            buf = t;
385        }
386        let fields_count = total_fields_count - scope_fields_count;
387        let mut fields = Vec::with_capacity(fields_count as usize);
388        for _ in 0..fields_count {
389            let (t, field) = parse_into_located(buf)?;
390            fields.push(field);
391            buf = t;
392        }
393        templates_map.insert(
394            template_id,
395            DecodingTemplate::new(
396                scope_fields.clone().into_boxed_slice(),
397                fields.clone().into_boxed_slice(),
398            ),
399        );
400        Ok((
401            buf,
402            OptionsTemplateRecord::new(
403                template_id,
404                scope_fields.into_boxed_slice(),
405                fields.into_boxed_slice(),
406            ),
407        ))
408    }
409}
410
/// Errors produced while parsing a data record.
#[derive(LocatedError, Eq, PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum DataRecordParsingError {
    /// Parsing one of the record's fields failed.
    FieldError(#[from_located(module = "")] ie::FieldParsingError),
}
415
416impl std::fmt::Display for DataRecordParsingError {
417    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
418        match self {
419            Self::FieldError(err) => write!(f, "{err}"),
420        }
421    }
422}
423
424impl std::error::Error for DataRecordParsingError {
425    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
426        match self {
427            Self::FieldError(err) => Some(err),
428        }
429    }
430}
431
432impl<'a> ReadablePduWithOneInput<'a, &DecodingTemplate, LocatedDataRecordParsingError<'a>>
433    for DataRecord
434{
435    fn from_wire(
436        buf: Span<'a>,
437        field_specifiers: &DecodingTemplate,
438    ) -> IResult<Span<'a>, Self, LocatedDataRecordParsingError<'a>> {
439        let mut buf = buf;
440        let mut scope_fields =
441            Vec::<crate::ie::Field>::with_capacity(field_specifiers.scope_fields_specs.len());
442        for spec in &field_specifiers.scope_fields_specs {
443            let (t, scope_field) =
444                parse_into_located_two_inputs(buf, &spec.element_id(), spec.length)?;
445            buf = t;
446            scope_fields.push(scope_field);
447        }
448
449        let mut fields =
450            Vec::<crate::ie::Field>::with_capacity(field_specifiers.fields_specs.len());
451        for spec in &field_specifiers.fields_specs {
452            let (t, field) = parse_into_located_two_inputs(buf, &spec.element_id(), spec.length)?;
453            buf = t;
454            fields.push(field);
455        }
456
457        Ok((
458            buf,
459            DataRecord::new(scope_fields.into_boxed_slice(), fields.into_boxed_slice()),
460        ))
461    }
462}
463
/// Errors produced while parsing a template record.
#[derive(LocatedError, Eq, PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum TemplateRecordParsingError {
    /// A low-level `nom` parser failed (for example, the input ended early).
    #[serde(with = "ErrorKindSerdeDeref")]
    NomError(#[from_nom] ErrorKind),
    /// The template id was below 256; carries the value read.
    InvalidTemplateId(u16),
    /// Parsing one of the record's field specifiers failed.
    FieldSpecifierError(
        #[from_located(module = "crate::wire::deserializer")] FieldSpecifierParsingError,
    ),
}
473
474impl std::fmt::Display for TemplateRecordParsingError {
475    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
476        match self {
477            Self::NomError(err) => write!(f, "Nom error {}", nom::Err::Error(err)),
478            Self::InvalidTemplateId(err) => write!(f, "Invalid template id {err}"),
479            Self::FieldSpecifierError(err) => write!(f, "{err}"),
480        }
481    }
482}
483
484impl std::error::Error for TemplateRecordParsingError {
485    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
486        match self {
487            Self::NomError(_err) => None,
488            Self::InvalidTemplateId(_) => None,
489            Self::FieldSpecifierError(err) => Some(err),
490        }
491    }
492}
493
494impl<'a> ReadablePduWithOneInput<'a, &mut TemplatesMap, LocatedTemplateRecordParsingError<'a>>
495    for TemplateRecord
496{
497    fn from_wire(
498        buf: Span<'a>,
499        templates_map: &mut TemplatesMap,
500    ) -> IResult<Span<'a>, Self, LocatedTemplateRecordParsingError<'a>> {
501        let input = buf;
502        let (buf, template_id) = be_u16(buf)?;
503        // from RFC7011: Each Template Record is given a unique Template ID in the range
504        // 256 to 65535.
505        if template_id < 256 {
506            return Err(nom::Err::Error(LocatedTemplateRecordParsingError::new(
507                input,
508                TemplateRecordParsingError::InvalidTemplateId(template_id),
509            )));
510        }
511        let (mut buf, field_count) = be_u16(buf)?;
512        let mut fields = Vec::with_capacity(field_count as usize);
513        for _ in 0..field_count {
514            let (t, field) = parse_into_located(buf)?;
515            fields.push(field);
516            buf = t;
517        }
518        templates_map.insert(
519            template_id,
520            DecodingTemplate::new(Box::new([]), fields.clone().into_boxed_slice()),
521        );
522        Ok((
523            buf,
524            TemplateRecord::new(template_id, fields.into_boxed_slice()),
525        ))
526    }
527}