1use super::{
8 CommonTemplate, DATA_TEMPLATE_IPFIX_ID, DEFAULT_MAX_TEMPLATE_CACHE_SIZE, Data, FieldParser,
9 FlowSet, FlowSetBody, FlowSetHeader, IPFix, IPFixFieldPair, IPFixParser, MAX_FIELD_COUNT,
10 NoTemplateInfo, OPTIONS_TEMPLATE_IPFIX_ID, OptionsData, OptionsTemplate, Template,
11 TemplateField,
12};
13use crate::variable_versions::config::DEFAULT_MAX_RECORDS_PER_FLOWSET;
14use crate::variable_versions::enterprise_registry::EnterpriseFieldRegistry;
15use crate::variable_versions::field_value::FieldValue;
16use crate::variable_versions::metrics::CacheMetricsInner;
17use crate::variable_versions::ttl::{TemplateWithTtl, TtlConfig};
18use crate::variable_versions::v9::{
19 DATA_TEMPLATE_V9_ID, Data as V9Data, OPTIONS_TEMPLATE_V9_ID, OptionsData as V9OptionsData,
20 OptionsTemplate as V9OptionsTemplate, Template as V9Template,
21};
22use crate::variable_versions::{
23 Config, ConfigError, ParserConfig, ParserFields, PendingFlowCache, PendingFlowEntry,
24 PendingFlowsConfig,
25};
26use crate::{NetflowError, NetflowPacket, ParsedNetflow};
27
28use crate::variable_versions::fast_parse::{parse_u8, parse_u16_be};
29use lru::LruCache;
30use nom::IResult;
31use nom::combinator::complete;
32use nom::multi::many0;
33use nom_derive::Parse;
34use std::num::NonZeroUsize;
35use std::sync::Arc;
36
37impl Default for IPFixParser {
38 fn default() -> Self {
39 let config = Config {
41 max_template_cache_size: DEFAULT_MAX_TEMPLATE_CACHE_SIZE,
42 max_field_count: MAX_FIELD_COUNT,
43 max_template_total_size: usize::from(u16::MAX),
44 max_error_sample_size: 256,
45 max_records_per_flowset: DEFAULT_MAX_RECORDS_PER_FLOWSET,
46 ttl_config: None,
47 enterprise_registry: Arc::new(EnterpriseFieldRegistry::new()),
48 pending_flows_config: None,
49 };
50
51 match Self::try_new(config) {
52 Ok(parser) => parser,
53 Err(e) => unreachable!("hardcoded default config must be valid: {e}"),
54 }
55 }
56}
57
58impl IPFixParser {
59 pub fn validate_config(config: &Config) -> Result<(), ConfigError> {
61 config.validate()
62 }
63
64 pub fn try_new(config: Config) -> Result<Self, ConfigError> {
72 let cache_size = NonZeroUsize::new(config.max_template_cache_size).ok_or(
73 ConfigError::InvalidCacheSize(config.max_template_cache_size),
74 )?;
75
76 let pending_flows = config
77 .pending_flows_config
78 .map(PendingFlowCache::new)
79 .transpose()?;
80
81 Ok(Self {
82 templates: LruCache::new(cache_size),
83 v9_templates: LruCache::new(cache_size),
84 ipfix_options_templates: LruCache::new(cache_size),
85 v9_options_templates: LruCache::new(cache_size),
86 ttl_config: config.ttl_config,
87 max_template_cache_size: config.max_template_cache_size,
88 max_field_count: config.max_field_count,
89 max_template_total_size: config.max_template_total_size,
90 max_error_sample_size: config.max_error_sample_size,
91 max_records_per_flowset: config.max_records_per_flowset,
92 enterprise_registry: config.enterprise_registry,
93 metrics: CacheMetricsInner::new(),
94 pending_flows,
95 })
96 }
97}
98
99impl ParserFields for IPFixParser {
100 fn set_max_template_cache_size_field(&mut self, size: usize) {
101 self.max_template_cache_size = size;
102 }
103 fn set_max_field_count_field(&mut self, count: usize) {
104 self.max_field_count = count;
105 }
106 fn set_max_template_total_size_field(&mut self, size: usize) {
107 self.max_template_total_size = size;
108 }
109 fn set_max_error_sample_size_field(&mut self, size: usize) {
110 self.max_error_sample_size = size;
111 }
112 fn set_max_records_per_flowset_field(&mut self, count: usize) {
113 self.max_records_per_flowset = count;
114 }
115 fn set_ttl_config_field(&mut self, config: Option<TtlConfig>) {
116 self.ttl_config = config;
117 }
118 fn set_enterprise_registry(&mut self, registry: Arc<EnterpriseFieldRegistry>) {
119 self.enterprise_registry = registry;
120 }
121 fn pending_flows(&self) -> &Option<PendingFlowCache> {
122 &self.pending_flows
123 }
124 fn pending_flows_mut(&mut self) -> &mut Option<PendingFlowCache> {
125 &mut self.pending_flows
126 }
127 fn metrics_mut(&mut self) -> &mut CacheMetricsInner {
128 &mut self.metrics
129 }
130}
131
132impl ParserConfig for IPFixParser {
133 fn set_pending_flows_config(
134 &mut self,
135 config: Option<PendingFlowsConfig>,
136 ) -> Result<(), ConfigError> {
137 match config {
138 Some(pf_config) => {
139 if let Some(ref mut cache) = self.pending_flows {
140 cache.resize(pf_config, &mut self.metrics)?;
141 } else {
142 self.pending_flows = Some(PendingFlowCache::new(pf_config)?);
143 }
144 }
145 None => {
146 if let Some(ref cache) = self.pending_flows {
148 let count = cache.count();
149 if count > 0 {
150 self.metrics.record_pending_dropped_n(count as u64);
151 }
152 }
153 self.pending_flows = None;
154 }
155 }
156 Ok(())
157 }
158
159 fn resize_template_caches(&mut self, cache_size: NonZeroUsize) {
160 self.templates.resize(cache_size);
161 self.v9_templates.resize(cache_size);
162 self.ipfix_options_templates.resize(cache_size);
163 self.v9_options_templates.resize(cache_size);
164 }
165}
166
167impl IPFixParser {
168 pub(crate) fn parse<'a>(&mut self, packet: &'a [u8]) -> ParsedNetflow<'a> {
170 match IPFix::parse(packet, self) {
171 Ok((remaining, mut ipfix)) => {
172 self.process_pending_flows(&mut ipfix);
173 ParsedNetflow::Success {
174 packet: NetflowPacket::IPFix(ipfix),
175 remaining,
176 }
177 }
178 Err(e) => ParsedNetflow::Error {
179 error: NetflowError::Partial {
180 message: format!("IPFIX parse error: {}", e),
181 },
182 },
183 }
184 }
185
186 fn process_pending_flows(&mut self, ipfix: &mut IPFix) {
187 let Some(mut pending_cache) = self.pending_flows.take() else {
188 return;
189 };
190 let learned = Self::cache_notemplate_ipfix_flowsets(
191 ipfix,
192 &mut pending_cache,
193 &mut self.metrics,
194 self.max_error_sample_size,
195 );
196 self.replay_ipfix_pending_flows(ipfix, &mut pending_cache, &learned);
197 self.pending_flows = Some(pending_cache);
198 }
199
200 fn cache_notemplate_ipfix_flowsets(
203 ipfix: &mut IPFix,
204 cache: &mut PendingFlowCache,
205 metrics: &mut CacheMetricsInner,
206 max_error_sample_size: usize,
207 ) -> Vec<u16> {
208 let mut learned_template_ids: Vec<u16> = Vec::new();
209 let mut remove_mask: Vec<bool> = vec![false; ipfix.flowsets.len()];
210 for (i, flowset) in ipfix.flowsets.iter_mut().enumerate() {
211 match &mut flowset.body {
212 FlowSetBody::NoTemplate(info) => {
213 if flowset.header.length < 4 {
217 metrics.record_pending_dropped();
218 continue;
219 }
220 let body_len = (flowset.header.length as usize) - 4;
221 if info.raw_data.len() < body_len {
222 metrics.record_pending_dropped();
223 continue;
224 }
225 let raw_data = std::mem::take(&mut info.raw_data);
226 if let Some(mut returned) = cache.cache(info.template_id, raw_data, metrics)
227 {
228 let full_len = returned.len();
232 returned.truncate(max_error_sample_size);
233 if returned.len() < full_len {
234 info.truncated = true;
235 }
236 info.raw_data = returned;
237 } else {
238 remove_mask[i] = true;
239 }
240 }
241 FlowSetBody::Template(t) => {
242 learned_template_ids.push(t.template_id);
243 }
244 FlowSetBody::Templates(ts) => {
245 for t in ts.iter() {
246 learned_template_ids.push(t.template_id);
247 }
248 }
249 FlowSetBody::V9Template(t) => {
250 learned_template_ids.push(t.template_id);
251 }
252 FlowSetBody::V9Templates(ts) => {
253 for t in ts.iter() {
254 learned_template_ids.push(t.template_id);
255 }
256 }
257 FlowSetBody::OptionsTemplate(t) => {
258 learned_template_ids.push(t.template_id);
259 }
260 FlowSetBody::OptionsTemplates(ts) => {
261 for t in ts.iter() {
262 learned_template_ids.push(t.template_id);
263 }
264 }
265 FlowSetBody::V9OptionsTemplate(t) => {
266 learned_template_ids.push(t.template_id);
267 }
268 FlowSetBody::V9OptionsTemplates(ts) => {
269 for t in ts.iter() {
270 learned_template_ids.push(t.template_id);
271 }
272 }
273 _ => {}
274 }
275 }
276 let mut mask_iter = remove_mask.into_iter();
278 ipfix
279 .flowsets
280 .retain(|_| !mask_iter.next().unwrap_or(false));
281 let body_len: u16 = ipfix
284 .flowsets
285 .iter()
286 .fold(0u16, |acc, fs| acc.saturating_add(fs.header.length));
287 ipfix.header.length = 16u16.saturating_add(body_len);
289 learned_template_ids
290 }
291
292 fn replay_ipfix_pending_flows(
294 &mut self,
295 ipfix: &mut IPFix,
296 cache: &mut PendingFlowCache,
297 learned: &[u16],
298 ) {
299 for &template_id in learned {
300 let entries = cache.drain(template_id, &mut self.metrics);
301 let total_entries = entries.len();
302 for (processed, entry) in entries.iter().enumerate() {
303 if ipfix.flowsets.len() >= u16::MAX as usize {
305 let remaining = (total_entries - processed) as u64;
306 self.metrics.record_pending_replay_failed_n(remaining);
307 break;
308 }
309 let flowset_length =
310 u16::try_from(entry.raw_data.len().saturating_add(4)).unwrap_or(u16::MAX);
311 let Some(new_header_length) = ipfix.header.length.checked_add(flowset_length)
312 else {
313 let remaining = (total_entries - processed) as u64;
315 self.metrics.record_pending_replay_failed_n(remaining);
316 break;
317 };
318 if self.try_replay_ipfix_flow(&mut ipfix.flowsets, template_id, entry) {
319 self.metrics.record_pending_replayed();
320 ipfix.header.length = new_header_length;
321 } else {
322 self.metrics.record_pending_replay_failed();
323 }
324 }
325 }
326 }
327
328 fn try_replay_ipfix_flow(
330 &mut self,
331 flowsets: &mut Vec<FlowSet>,
332 template_id: u16,
333 entry: &PendingFlowEntry,
334 ) -> bool {
335 if let Some(template) = crate::variable_versions::peek_valid_template(
340 &mut self.templates,
341 &template_id,
342 &self.ttl_config,
343 &mut self.metrics,
344 ) && let Ok((_, data)) = Data::parse_with_registry(
345 &entry.raw_data,
346 &template,
347 &self.enterprise_registry,
348 self.max_records_per_flowset,
349 ) {
350 self.templates.promote(&template_id);
353 flowsets.push(FlowSet {
354 header: FlowSetHeader {
355 header_id: template_id,
356 length: u16::try_from(entry.raw_data.len().saturating_add(4))
357 .unwrap_or(u16::MAX),
358 },
359 body: FlowSetBody::Data(data),
360 });
361 return true;
362 }
363
364 if let Some(template) = crate::variable_versions::peek_valid_template(
366 &mut self.ipfix_options_templates,
367 &template_id,
368 &self.ttl_config,
369 &mut self.metrics,
370 ) && let Ok((_, data)) = OptionsData::parse_with_registry(
371 &entry.raw_data,
372 &template,
373 &self.enterprise_registry,
374 self.max_records_per_flowset,
375 ) {
376 self.ipfix_options_templates.promote(&template_id);
377 flowsets.push(FlowSet {
378 header: FlowSetHeader {
379 header_id: template_id,
380 length: u16::try_from(entry.raw_data.len().saturating_add(4))
381 .unwrap_or(u16::MAX),
382 },
383 body: FlowSetBody::OptionsData(data),
384 });
385 return true;
386 }
387
388 if let Some(template) = crate::variable_versions::peek_valid_template(
390 &mut self.v9_templates,
391 &template_id,
392 &self.ttl_config,
393 &mut self.metrics,
394 ) && let Ok((_, data)) =
395 V9Data::parse_with_limit(&entry.raw_data, &template, self.max_records_per_flowset)
396 {
397 self.v9_templates.promote(&template_id);
398 flowsets.push(FlowSet {
399 header: FlowSetHeader {
400 header_id: template_id,
401 length: u16::try_from(entry.raw_data.len().saturating_add(4))
402 .unwrap_or(u16::MAX),
403 },
404 body: FlowSetBody::V9Data(data),
405 });
406 return true;
407 }
408
409 if let Some(template) = crate::variable_versions::peek_valid_template(
411 &mut self.v9_options_templates,
412 &template_id,
413 &self.ttl_config,
414 &mut self.metrics,
415 ) && let Ok((_, data)) = V9OptionsData::parse_with_limit(
416 &entry.raw_data,
417 &template,
418 self.max_records_per_flowset,
419 ) {
420 self.v9_options_templates.promote(&template_id);
421 flowsets.push(FlowSet {
422 header: FlowSetHeader {
423 header_id: template_id,
424 length: u16::try_from(entry.raw_data.len().saturating_add(4))
425 .unwrap_or(u16::MAX),
426 },
427 body: FlowSetBody::V9OptionsData(data),
428 });
429 return true;
430 }
431
432 false
433 }
434
435 pub fn available_template_ids(&self) -> Vec<u16> {
437 let mut ids: Vec<u16> = self
438 .templates
439 .iter()
440 .map(|(&id, _)| id)
441 .chain(self.v9_templates.iter().map(|(&id, _)| id))
442 .chain(self.ipfix_options_templates.iter().map(|(&id, _)| id))
443 .chain(self.v9_options_templates.iter().map(|(&id, _)| id))
444 .collect();
445 ids.sort_unstable();
446 ids.dedup();
447 ids
448 }
449}
450
451trait HasTemplateId: Clone + PartialEq {
458 fn template_id(&self) -> u16;
459 fn field_count(&self) -> u16;
462}
463
464impl HasTemplateId for Template {
465 fn template_id(&self) -> u16 {
466 self.template_id
467 }
468 fn field_count(&self) -> u16 {
469 self.field_count
470 }
471}
472
473impl HasTemplateId for OptionsTemplate {
474 fn template_id(&self) -> u16 {
475 self.template_id
476 }
477 fn field_count(&self) -> u16 {
478 self.field_count
479 }
480}
481
482impl HasTemplateId for V9Template {
483 fn template_id(&self) -> u16 {
484 self.template_id
485 }
486 fn field_count(&self) -> u16 {
487 self.field_count
488 }
489}
490
491impl HasTemplateId for V9OptionsTemplate {
492 fn template_id(&self) -> u16 {
493 self.template_id
494 }
495 fn field_count(&self) -> u16 {
496 let scope = self.options_scope_length / 4;
499 let option = self.options_length / 4;
500 scope.saturating_add(option)
501 }
502}
503
504fn insert_templates<T: HasTemplateId>(
506 cache: &mut LruCache<u16, TemplateWithTtl<Arc<T>>>,
507 templates: &[T],
508 ttl_enabled: bool,
509 metrics: &mut CacheMetricsInner,
510) {
511 for t in templates {
512 let arc_template = Arc::new(t.clone());
513 let wrapped = TemplateWithTtl::new(arc_template, ttl_enabled);
514 if let Some(existing) = cache.peek(&t.template_id())
515 && existing.template.as_ref() != t
516 {
517 metrics.record_collision();
518 }
519 if let Some((evicted_key, _evicted)) = cache.push(t.template_id(), wrapped)
523 && evicted_key != t.template_id()
524 {
525 metrics.record_eviction();
526 }
527 metrics.record_insertion();
528 }
529}
530
531impl IPFixParser {
532 fn add_ipfix_templates(&mut self, templates: &[Template]) {
534 let ttl_enabled = self.ttl_config.is_some();
535 insert_templates(
536 &mut self.templates,
537 templates,
538 ttl_enabled,
539 &mut self.metrics,
540 );
541 }
542
543 fn add_ipfix_options_templates(&mut self, templates: &[OptionsTemplate]) {
544 let ttl_enabled = self.ttl_config.is_some();
545 insert_templates(
546 &mut self.ipfix_options_templates,
547 templates,
548 ttl_enabled,
549 &mut self.metrics,
550 );
551 }
552
553 fn add_v9_templates(&mut self, templates: &[V9Template]) {
554 let ttl_enabled = self.ttl_config.is_some();
555 insert_templates(
556 &mut self.v9_templates,
557 templates,
558 ttl_enabled,
559 &mut self.metrics,
560 );
561 }
562
563 fn add_v9_options_templates(&mut self, templates: &[V9OptionsTemplate]) {
564 let ttl_enabled = self.ttl_config.is_some();
565 insert_templates(
566 &mut self.v9_options_templates,
567 templates,
568 ttl_enabled,
569 &mut self.metrics,
570 );
571 }
572
573 fn withdraw_ipfix_template(&mut self, template_id: u16) {
581 if template_id == DATA_TEMPLATE_IPFIX_ID {
582 let ids: Vec<u16> = self.templates.iter().map(|(&id, _)| id).collect();
587 for id in &ids {
588 self.templates.pop(id);
589 }
590 if let Some(ref mut cache) = self.pending_flows {
591 for &id in &ids {
592 let drained = cache.drain(id, &mut self.metrics);
593 let n = drained.len() as u64;
594 if n > 0 {
595 self.metrics.record_pending_dropped_n(n);
596 }
597 }
598 }
599 } else {
600 self.templates.pop(&template_id);
601 if let Some(ref mut cache) = self.pending_flows {
602 let drained = cache.drain(template_id, &mut self.metrics);
603 let n = drained.len() as u64;
604 if n > 0 {
605 self.metrics.record_pending_dropped_n(n);
606 }
607 }
608 }
609 }
610
611 fn withdraw_ipfix_options_template(&mut self, template_id: u16) {
617 if template_id == OPTIONS_TEMPLATE_IPFIX_ID {
618 let ids: Vec<u16> = self
622 .ipfix_options_templates
623 .iter()
624 .map(|(&id, _)| id)
625 .collect();
626 for id in &ids {
627 self.ipfix_options_templates.pop(id);
628 }
629 if let Some(ref mut cache) = self.pending_flows {
630 for &id in &ids {
631 let drained = cache.drain(id, &mut self.metrics);
632 let n = drained.len() as u64;
633 if n > 0 {
634 self.metrics.record_pending_dropped_n(n);
635 }
636 }
637 }
638 } else {
639 self.ipfix_options_templates.pop(&template_id);
640 if let Some(ref mut cache) = self.pending_flows {
641 let drained = cache.drain(template_id, &mut self.metrics);
642 let n = drained.len() as u64;
643 if n > 0 {
644 self.metrics.record_pending_dropped_n(n);
645 }
646 }
647 }
648 }
649}
650
651impl FlowSetBody {
652 #[allow(clippy::too_many_arguments)]
653 fn parse_templates<'a, T, F>(
654 i: &'a [u8],
655 parser: &mut IPFixParser,
656 parse_fn: F,
657 single_variant: fn(T) -> FlowSetBody,
658 multi_variant: fn(Vec<T>) -> FlowSetBody,
659 validate: fn(&T, &IPFixParser) -> bool,
660 add_templates: fn(&mut IPFixParser, &[T]),
661 withdraw_template: Option<fn(&mut IPFixParser, u16)>,
662 ) -> IResult<&'a [u8], FlowSetBody>
663 where
664 T: Clone + HasTemplateId,
665 F: Fn(&'a [u8]) -> IResult<&'a [u8], T>,
666 {
667 let (i, templates) = many0(complete(parse_fn))(i)?;
668
669 let mut had_withdrawals = false;
675 if let Some(withdraw_fn) = withdraw_template {
676 for t in &templates {
677 if t.field_count() == 0 {
678 let id = t.template_id();
679 let has_redefinition = id != DATA_TEMPLATE_IPFIX_ID
682 && id != OPTIONS_TEMPLATE_IPFIX_ID
683 && templates
684 .iter()
685 .any(|other| other.template_id() == id && other.field_count() > 0);
686 if !has_redefinition {
687 withdraw_fn(parser, id);
688 }
689 had_withdrawals = true;
690 }
691 }
692 }
693
694 let valid_templates: Vec<_> = templates
697 .into_iter()
698 .filter(|t| validate(t, parser))
699 .collect();
700 if valid_templates.is_empty() {
701 if had_withdrawals {
703 return Ok((i, FlowSetBody::Empty));
704 }
705 return Err(nom::Err::Error(nom::error::Error::new(
706 i,
707 nom::error::ErrorKind::Verify,
708 )));
709 }
710 add_templates(parser, &valid_templates);
712 match valid_templates.len() {
713 1 => {
714 if let Some(template) = valid_templates.into_iter().next() {
715 Ok((i, single_variant(template)))
716 } else {
717 Err(nom::Err::Error(nom::error::Error::new(
718 i,
719 nom::error::ErrorKind::Verify,
720 )))
721 }
722 }
723 _ => Ok((i, multi_variant(valid_templates))),
724 }
725 }
726
727 pub(super) fn parse<'a>(
728 i: &'a [u8],
729 parser: &mut IPFixParser,
730 id: u16,
731 ) -> IResult<&'a [u8], FlowSetBody> {
732 match id {
733 DATA_TEMPLATE_IPFIX_ID => Self::parse_templates(
734 i,
735 parser,
736 Template::parse,
737 FlowSetBody::Template,
738 FlowSetBody::Templates,
739 |t: &Template, p: &IPFixParser| t.is_valid(p),
740 |parser, templates| parser.add_ipfix_templates(templates),
741 Some(|parser: &mut IPFixParser, id| parser.withdraw_ipfix_template(id)),
742 ),
743 DATA_TEMPLATE_V9_ID => Self::parse_templates(
744 i,
745 parser,
746 V9Template::parse,
747 FlowSetBody::V9Template,
748 FlowSetBody::V9Templates,
749 |t: &V9Template, p: &IPFixParser| {
750 usize::from(t.field_count) <= p.max_field_count
754 && !t.fields.is_empty()
755 && t.fields
756 .iter()
757 .all(|f| f.field_length > 0 && f.field_length != 65535)
758 && usize::from(t.get_total_size()) <= p.max_template_total_size
759 && !t.has_duplicate_fields()
760 },
761 |parser, templates| parser.add_v9_templates(templates),
762 None, ),
764 OPTIONS_TEMPLATE_V9_ID => Self::parse_templates(
765 i,
766 parser,
767 V9OptionsTemplate::parse,
768 FlowSetBody::V9OptionsTemplate,
769 FlowSetBody::V9OptionsTemplates,
770 |t: &V9OptionsTemplate, p: &IPFixParser| {
771 let scope_count = usize::from(t.options_scope_length / 4);
772 let option_count = usize::from(t.options_length / 4);
773 t.options_scope_length.is_multiple_of(4)
774 && t.options_length.is_multiple_of(4)
775 && scope_count > 0
776 && scope_count <= p.max_field_count
777 && option_count <= p.max_field_count
778 && scope_count.saturating_add(option_count) <= p.max_field_count
779 && usize::from(t.get_total_size()) <= p.max_template_total_size
780 && !t.has_duplicate_scope_fields()
781 && !t.has_duplicate_option_fields()
782 && t.scope_fields.iter().all(|f| f.field_length > 0 && f.field_length != 65535)
786 && t.option_fields.iter().all(|f| f.field_length > 0 && f.field_length != 65535)
787 },
788 |parser, templates| parser.add_v9_options_templates(templates),
789 None, ),
791 OPTIONS_TEMPLATE_IPFIX_ID => Self::parse_templates(
792 i,
793 parser,
794 OptionsTemplate::parse,
795 FlowSetBody::OptionsTemplate,
796 FlowSetBody::OptionsTemplates,
797 |t: &OptionsTemplate, p: &IPFixParser| t.is_valid(p),
798 |parser, templates| parser.add_ipfix_options_templates(templates),
799 Some(|parser: &mut IPFixParser, id| parser.withdraw_ipfix_options_template(id)),
800 ),
801 _ => {
803 if let Some(template) = crate::variable_versions::get_valid_template(
813 &mut parser.templates,
814 &id,
815 &parser.ttl_config,
816 &mut parser.metrics,
817 ) {
818 parser.metrics.record_hit();
819 if template.get_fields().is_empty() {
820 return Ok((i, FlowSetBody::Empty));
821 }
822 let (i, data) = Data::parse_with_registry(
823 i,
824 &template,
825 &parser.enterprise_registry,
826 parser.max_records_per_flowset,
827 )?;
828 return Ok((i, FlowSetBody::Data(data)));
829 }
830
831 if let Some(template) = crate::variable_versions::get_valid_template(
833 &mut parser.ipfix_options_templates,
834 &id,
835 &parser.ttl_config,
836 &mut parser.metrics,
837 ) {
838 parser.metrics.record_hit();
839 if template.get_fields().is_empty() {
840 return Ok((i, FlowSetBody::Empty));
841 }
842 let (i, data) = OptionsData::parse_with_registry(
843 i,
844 &template,
845 &parser.enterprise_registry,
846 parser.max_records_per_flowset,
847 )?;
848 return Ok((i, FlowSetBody::OptionsData(data)));
849 }
850
851 if let Some(template) = crate::variable_versions::get_valid_template(
853 &mut parser.v9_templates,
854 &id,
855 &parser.ttl_config,
856 &mut parser.metrics,
857 ) {
858 parser.metrics.record_hit();
859 let (i, data) =
860 V9Data::parse_with_limit(i, &template, parser.max_records_per_flowset)?;
861 return Ok((i, FlowSetBody::V9Data(data)));
862 }
863
864 if let Some(template) = crate::variable_versions::get_valid_template(
866 &mut parser.v9_options_templates,
867 &id,
868 &parser.ttl_config,
869 &mut parser.metrics,
870 ) {
871 parser.metrics.record_hit();
872 let (i, data) = V9OptionsData::parse_with_limit(
873 i,
874 &template,
875 parser.max_records_per_flowset,
876 )?;
877 return Ok((i, FlowSetBody::V9OptionsData(data)));
878 }
879
880 parser.metrics.record_miss();
883 if id > 255 {
884 let (raw_data, truncated) = if parser
890 .pending_flows
891 .as_ref()
892 .is_some_and(|c| c.would_accept(id, i.len()))
893 {
894 (i.to_vec(), false)
895 } else {
896 let limit = i.len().min(parser.max_error_sample_size);
897 (i[..limit].to_vec(), limit < i.len())
898 };
899 let info = NoTemplateInfo {
900 template_id: id,
901 raw_data,
902 truncated,
903 };
904 Ok((&[] as &[u8], FlowSetBody::NoTemplate(info)))
905 } else {
906 Ok((&[] as &[u8], FlowSetBody::Empty))
908 }
909 }
910 }
911 }
912}
913
914impl Template {
915 pub fn is_valid(&self, parser: &IPFixParser) -> bool {
917 <Self as CommonTemplate>::is_valid(self, parser)
918 }
919}
920
921impl OptionsTemplate {
922 pub fn is_valid(&self, parser: &IPFixParser) -> bool {
924 <Self as CommonTemplate>::is_valid(self, parser)
925 }
926}
927
928fn collect_varlen_field_lengths(fields: &[TemplateField]) -> Vec<u16> {
930 if fields.iter().any(|f| f.field_length == 65535) {
931 fields.iter().map(|f| f.field_length).collect()
932 } else {
933 Vec::new()
934 }
935}
936
937impl Data {
938 pub(super) fn parse_with_registry<'a>(
940 i: &'a [u8],
941 template: &Template,
942 registry: &EnterpriseFieldRegistry,
943 max_records: usize,
944 ) -> IResult<&'a [u8], Self> {
945 let template_field_lengths = collect_varlen_field_lengths(template.get_fields());
946 let (i, fields) = FieldParser::parse_with_registry(i, template, registry, max_records)?;
947 Ok((
948 i,
949 Self {
950 fields,
951 padding: vec![],
952 template_field_lengths,
953 },
954 ))
955 }
956}
957
958impl OptionsData {
959 pub(super) fn parse_with_registry<'a>(
961 i: &'a [u8],
962 template: &OptionsTemplate,
963 registry: &EnterpriseFieldRegistry,
964 max_records: usize,
965 ) -> IResult<&'a [u8], Self> {
966 let template_field_lengths = collect_varlen_field_lengths(template.get_fields());
967 let (i, fields) = FieldParser::parse_with_registry(i, template, registry, max_records)?;
968 Ok((
969 i,
970 Self {
971 fields,
972 padding: vec![],
973 template_field_lengths,
974 },
975 ))
976 }
977}
978
979impl<'a> FieldParser {
980 fn parse_inner<T: CommonTemplate, F>(
985 mut i: &'a [u8],
986 template: &T,
987 max_records: usize,
988 parse_field: F,
989 ) -> IResult<&'a [u8], Vec<Vec<IPFixFieldPair>>>
990 where
991 F: Fn(&TemplateField, &'a [u8]) -> IResult<&'a [u8], FieldValue>,
992 {
993 let template_fields = template.get_fields();
994 if template_fields.is_empty() {
995 return Ok((i, Vec::new()));
996 }
997
998 let template_size: usize = template_fields
1002 .iter()
1003 .map(|f| {
1004 if f.field_length == 65535 {
1005 1
1006 } else {
1007 usize::from(f.field_length)
1008 }
1009 })
1010 .sum();
1011 let estimated_records = (i.len() / template_size).min(max_records);
1014 let mut res = Vec::with_capacity(estimated_records);
1015
1016 while !i.is_empty() && res.len() < max_records {
1018 let before = i;
1019 let mut vec = Vec::with_capacity(template_fields.len());
1020 for field in template_fields.iter() {
1021 match parse_field(field, i) {
1022 Ok((remaining, field_value)) => {
1023 vec.push((field.field_type, field_value));
1024 i = remaining;
1025 }
1026 Err(_) => {
1027 i = before;
1028 return Ok((i, res));
1029 }
1030 }
1031 }
1032 if std::ptr::eq(i, before) {
1035 break;
1036 }
1037 res.push(vec);
1038 }
1039 Ok((i, res))
1040 }
1041
1042 pub(super) fn parse<T: CommonTemplate>(
1046 i: &'a [u8],
1047 template: &T,
1048 max_records: usize,
1049 ) -> IResult<&'a [u8], Vec<Vec<IPFixFieldPair>>> {
1050 Self::parse_inner(i, template, max_records, |field, input| {
1051 field.parse_as_field_value(input)
1052 })
1053 }
1054
1055 fn parse_with_registry<T: CommonTemplate>(
1057 i: &'a [u8],
1058 template: &T,
1059 registry: &EnterpriseFieldRegistry,
1060 max_records: usize,
1061 ) -> IResult<&'a [u8], Vec<Vec<IPFixFieldPair>>> {
1062 Self::parse_inner(i, template, max_records, |field, input| {
1063 field.parse_as_field_value_with_registry(input, registry)
1064 })
1065 }
1066}
1067
1068impl TemplateField {
1069 fn parse_field_length<'a>(&self, i: &'a [u8]) -> IResult<&'a [u8], u16> {
1074 match self.field_length {
1075 65535 => {
1076 let (i, length) = parse_u8(i)?;
1077 if length == 255 {
1078 let (i, full_length) = parse_u16_be(i)?;
1079 if full_length == 0 {
1081 return Err(nom::Err::Error(nom::error::Error::new(
1082 i,
1083 nom::error::ErrorKind::Verify,
1084 )));
1085 }
1086 if (full_length as usize) > i.len() {
1088 return Err(nom::Err::Error(nom::error::Error::new(
1089 i,
1090 nom::error::ErrorKind::Eof,
1091 )));
1092 }
1093 Ok((i, full_length))
1094 } else {
1095 if length == 0 {
1097 return Err(nom::Err::Error(nom::error::Error::new(
1098 i,
1099 nom::error::ErrorKind::Verify,
1100 )));
1101 }
1102 if (length as usize) > i.len() {
1104 return Err(nom::Err::Error(nom::error::Error::new(
1105 i,
1106 nom::error::ErrorKind::Eof,
1107 )));
1108 }
1109 Ok((i, u16::from(length)))
1110 }
1111 }
1112 length => Ok((i, length)),
1113 }
1114 }
1115
1116 fn parse_as_field_value<'a>(&self, i: &'a [u8]) -> IResult<&'a [u8], FieldValue> {
1117 let (i, length) = self.parse_field_length(i)?;
1118 FieldValue::from_field_type(i, self.field_type.into(), length)
1119 }
1120
1121 fn parse_as_field_value_with_registry<'a>(
1122 &self,
1123 i: &'a [u8],
1124 registry: &EnterpriseFieldRegistry,
1125 ) -> IResult<&'a [u8], FieldValue> {
1126 let (i, length) = self.parse_field_length(i)?;
1127 let field_type = self.field_type.to_field_data_type(registry);
1128 FieldValue::from_field_type(i, field_type, length)
1129 }
1130}