Skip to main content

netflow_parser/variable_versions/ipfix/
parser.rs

1//! IPFixParser — template-cached IPFIX parser with pending flow support.
2//!
3//! Type definitions live in the parent `ipfix` module (`mod.rs`).
4//! Parsing impl blocks for IPFIX types (FlowSetBody, FieldParser, TemplateField, etc.)
5//! are also defined here.
6
7use super::{
8    CommonTemplate, DATA_TEMPLATE_IPFIX_ID, DEFAULT_MAX_TEMPLATE_CACHE_SIZE, Data, FieldParser,
9    FlowSet, FlowSetBody, FlowSetHeader, IPFix, IPFixFieldPair, IPFixParser, MAX_FIELD_COUNT,
10    NoTemplateInfo, OPTIONS_TEMPLATE_IPFIX_ID, OptionsData, OptionsTemplate, Template,
11    TemplateField,
12};
13use crate::variable_versions::config::DEFAULT_MAX_RECORDS_PER_FLOWSET;
14use crate::variable_versions::enterprise_registry::EnterpriseFieldRegistry;
15use crate::variable_versions::field_value::FieldValue;
16use crate::variable_versions::metrics::CacheMetricsInner;
17use crate::variable_versions::ttl::{TemplateWithTtl, TtlConfig};
18use crate::variable_versions::v9::{
19    DATA_TEMPLATE_V9_ID, Data as V9Data, OPTIONS_TEMPLATE_V9_ID, OptionsData as V9OptionsData,
20    OptionsTemplate as V9OptionsTemplate, Template as V9Template,
21};
22use crate::variable_versions::{
23    Config, ConfigError, ParserConfig, ParserFields, PendingFlowCache, PendingFlowEntry,
24    PendingFlowsConfig,
25};
26use crate::{NetflowError, NetflowPacket, ParsedNetflow};
27
28use crate::variable_versions::fast_parse::{parse_u8, parse_u16_be};
29use lru::LruCache;
30use nom::IResult;
31use nom::combinator::complete;
32use nom::multi::many0;
33use nom_derive::Parse;
34use std::num::NonZeroUsize;
35use std::sync::Arc;
36
37impl Default for IPFixParser {
38    fn default() -> Self {
39        // Safe to unwrap because DEFAULT_MAX_TEMPLATE_CACHE_SIZE is non-zero
40        let config = Config {
41            max_template_cache_size: DEFAULT_MAX_TEMPLATE_CACHE_SIZE,
42            max_field_count: MAX_FIELD_COUNT,
43            max_template_total_size: usize::from(u16::MAX),
44            max_error_sample_size: 256,
45            max_records_per_flowset: DEFAULT_MAX_RECORDS_PER_FLOWSET,
46            ttl_config: None,
47            enterprise_registry: Arc::new(EnterpriseFieldRegistry::new()),
48            pending_flows_config: None,
49        };
50
51        match Self::try_new(config) {
52            Ok(parser) => parser,
53            Err(e) => unreachable!("hardcoded default config must be valid: {e}"),
54        }
55    }
56}
57
58impl IPFixParser {
59    /// Validates a configuration without allocating parser internals.
60    pub fn validate_config(config: &Config) -> Result<(), ConfigError> {
61        config.validate()
62    }
63
64    /// Create a new IPFixParser with a custom template cache size and optional TTL configuration.
65    ///
66    /// # Arguments
67    /// * `config` - Configuration struct containing max_template_cache_size and optional ttl_config
68    ///
69    /// # Errors
70    /// Returns `ConfigError` if `max_template_cache_size` is 0
71    pub fn try_new(config: Config) -> Result<Self, ConfigError> {
72        let cache_size = NonZeroUsize::new(config.max_template_cache_size).ok_or(
73            ConfigError::InvalidCacheSize(config.max_template_cache_size),
74        )?;
75
76        let pending_flows = config
77            .pending_flows_config
78            .map(PendingFlowCache::new)
79            .transpose()?;
80
81        Ok(Self {
82            templates: LruCache::new(cache_size),
83            v9_templates: LruCache::new(cache_size),
84            ipfix_options_templates: LruCache::new(cache_size),
85            v9_options_templates: LruCache::new(cache_size),
86            ttl_config: config.ttl_config,
87            max_template_cache_size: config.max_template_cache_size,
88            max_field_count: config.max_field_count,
89            max_template_total_size: config.max_template_total_size,
90            max_error_sample_size: config.max_error_sample_size,
91            max_records_per_flowset: config.max_records_per_flowset,
92            enterprise_registry: config.enterprise_registry,
93            metrics: CacheMetricsInner::new(),
94            pending_flows,
95        })
96    }
97}
98
99impl ParserFields for IPFixParser {
100    fn set_max_template_cache_size_field(&mut self, size: usize) {
101        self.max_template_cache_size = size;
102    }
103    fn set_max_field_count_field(&mut self, count: usize) {
104        self.max_field_count = count;
105    }
106    fn set_max_template_total_size_field(&mut self, size: usize) {
107        self.max_template_total_size = size;
108    }
109    fn set_max_error_sample_size_field(&mut self, size: usize) {
110        self.max_error_sample_size = size;
111    }
112    fn set_max_records_per_flowset_field(&mut self, count: usize) {
113        self.max_records_per_flowset = count;
114    }
115    fn set_ttl_config_field(&mut self, config: Option<TtlConfig>) {
116        self.ttl_config = config;
117    }
118    fn set_enterprise_registry(&mut self, registry: Arc<EnterpriseFieldRegistry>) {
119        self.enterprise_registry = registry;
120    }
121    fn pending_flows(&self) -> &Option<PendingFlowCache> {
122        &self.pending_flows
123    }
124    fn pending_flows_mut(&mut self) -> &mut Option<PendingFlowCache> {
125        &mut self.pending_flows
126    }
127    fn metrics_mut(&mut self) -> &mut CacheMetricsInner {
128        &mut self.metrics
129    }
130}
131
132impl ParserConfig for IPFixParser {
133    fn set_pending_flows_config(
134        &mut self,
135        config: Option<PendingFlowsConfig>,
136    ) -> Result<(), ConfigError> {
137        match config {
138            Some(pf_config) => {
139                if let Some(ref mut cache) = self.pending_flows {
140                    cache.resize(pf_config, &mut self.metrics)?;
141                } else {
142                    self.pending_flows = Some(PendingFlowCache::new(pf_config)?);
143                }
144            }
145            None => {
146                // Record all cached entries as dropped before discarding.
147                if let Some(ref cache) = self.pending_flows {
148                    let count = cache.count();
149                    if count > 0 {
150                        self.metrics.record_pending_dropped_n(count as u64);
151                    }
152                }
153                self.pending_flows = None;
154            }
155        }
156        Ok(())
157    }
158
159    fn resize_template_caches(&mut self, cache_size: NonZeroUsize) {
160        self.templates.resize(cache_size);
161        self.v9_templates.resize(cache_size);
162        self.ipfix_options_templates.resize(cache_size);
163        self.v9_options_templates.resize(cache_size);
164    }
165}
166
167impl IPFixParser {
168    /// Parse an IPFIX message from raw bytes, using cached templates to decode data records.
169    pub(crate) fn parse<'a>(&mut self, packet: &'a [u8]) -> ParsedNetflow<'a> {
170        match IPFix::parse(packet, self) {
171            Ok((remaining, mut ipfix)) => {
172                self.process_pending_flows(&mut ipfix);
173                ParsedNetflow::Success {
174                    packet: NetflowPacket::IPFix(ipfix),
175                    remaining,
176                }
177            }
178            Err(e) => ParsedNetflow::Error {
179                error: NetflowError::Partial {
180                    message: format!("IPFIX parse error: {}", e),
181                },
182            },
183        }
184    }
185
186    fn process_pending_flows(&mut self, ipfix: &mut IPFix) {
187        let Some(mut pending_cache) = self.pending_flows.take() else {
188            return;
189        };
190        let learned = Self::cache_notemplate_ipfix_flowsets(
191            ipfix,
192            &mut pending_cache,
193            &mut self.metrics,
194            self.max_error_sample_size,
195        );
196        self.replay_ipfix_pending_flows(ipfix, &mut pending_cache, &learned);
197        self.pending_flows = Some(pending_cache);
198    }
199
200    /// Single pass: cache NoTemplate raw data, collect learned template IDs,
201    /// remove successfully-cached flowsets, and adjust header.length.
202    fn cache_notemplate_ipfix_flowsets(
203        ipfix: &mut IPFix,
204        cache: &mut PendingFlowCache,
205        metrics: &mut CacheMetricsInner,
206        max_error_sample_size: usize,
207    ) -> Vec<u16> {
208        let mut learned_template_ids: Vec<u16> = Vec::new();
209        let mut remove_mask: Vec<bool> = vec![false; ipfix.flowsets.len()];
210        for (i, flowset) in ipfix.flowsets.iter_mut().enumerate() {
211            match &mut flowset.body {
212                FlowSetBody::NoTemplate(info) => {
213                    // Reject flowsets with impossibly small headers (RFC minimum is 4).
214                    // Also reject truncated raw_data (oversized entry at parse time).
215                    // The flowset is kept in output as diagnostic data.
216                    if flowset.header.length < 4 {
217                        metrics.record_pending_dropped();
218                        continue;
219                    }
220                    let body_len = (flowset.header.length as usize) - 4;
221                    if info.raw_data.len() < body_len {
222                        metrics.record_pending_dropped();
223                        continue;
224                    }
225                    let raw_data = std::mem::take(&mut info.raw_data);
226                    if let Some(mut returned) = cache.cache(info.template_id, raw_data, metrics)
227                    {
228                        // Truncate rejected data to diagnostic size so
229                        // callers don't hold the full (potentially large)
230                        // buffer that was not cached.
231                        let full_len = returned.len();
232                        returned.truncate(max_error_sample_size);
233                        if returned.len() < full_len {
234                            info.truncated = true;
235                        }
236                        info.raw_data = returned;
237                    } else {
238                        remove_mask[i] = true;
239                    }
240                }
241                FlowSetBody::Template(t) => {
242                    learned_template_ids.push(t.template_id);
243                }
244                FlowSetBody::Templates(ts) => {
245                    for t in ts.iter() {
246                        learned_template_ids.push(t.template_id);
247                    }
248                }
249                FlowSetBody::V9Template(t) => {
250                    learned_template_ids.push(t.template_id);
251                }
252                FlowSetBody::V9Templates(ts) => {
253                    for t in ts.iter() {
254                        learned_template_ids.push(t.template_id);
255                    }
256                }
257                FlowSetBody::OptionsTemplate(t) => {
258                    learned_template_ids.push(t.template_id);
259                }
260                FlowSetBody::OptionsTemplates(ts) => {
261                    for t in ts.iter() {
262                        learned_template_ids.push(t.template_id);
263                    }
264                }
265                FlowSetBody::V9OptionsTemplate(t) => {
266                    learned_template_ids.push(t.template_id);
267                }
268                FlowSetBody::V9OptionsTemplates(ts) => {
269                    for t in ts.iter() {
270                        learned_template_ids.push(t.template_id);
271                    }
272                }
273                _ => {}
274            }
275        }
276        // Remove successfully-cached flowsets and reconcile header length.
277        let mut mask_iter = remove_mask.into_iter();
278        ipfix
279            .flowsets
280            .retain(|_| !mask_iter.next().unwrap_or(false));
281        // Reconcile header length from remaining flowsets (avoids drift from
282        // saturating arithmetic on corrupt input).
283        let body_len: u16 = ipfix
284            .flowsets
285            .iter()
286            .fold(0u16, |acc, fs| acc.saturating_add(fs.header.length));
287        // IPFIX header is 16 bytes; total message length = header + body.
288        ipfix.header.length = 16u16.saturating_add(body_len);
289        learned_template_ids
290    }
291
292    /// Replay pending flows for each newly learned template.
293    fn replay_ipfix_pending_flows(
294        &mut self,
295        ipfix: &mut IPFix,
296        cache: &mut PendingFlowCache,
297        learned: &[u16],
298    ) {
299        for &template_id in learned {
300            let entries = cache.drain(template_id, &mut self.metrics);
301            let total_entries = entries.len();
302            for (processed, entry) in entries.iter().enumerate() {
303                // Bound flowset count, consistent with V9 replay.
304                if ipfix.flowsets.len() >= u16::MAX as usize {
305                    let remaining = (total_entries - processed) as u64;
306                    self.metrics.record_pending_replay_failed_n(remaining);
307                    break;
308                }
309                let flowset_length =
310                    u16::try_from(entry.raw_data.len().saturating_add(4)).unwrap_or(u16::MAX);
311                let Some(new_header_length) = ipfix.header.length.checked_add(flowset_length)
312                else {
313                    // Count this entry plus all remaining as failed.
314                    let remaining = (total_entries - processed) as u64;
315                    self.metrics.record_pending_replay_failed_n(remaining);
316                    break;
317                };
318                if self.try_replay_ipfix_flow(&mut ipfix.flowsets, template_id, entry) {
319                    self.metrics.record_pending_replayed();
320                    ipfix.header.length = new_header_length;
321                } else {
322                    self.metrics.record_pending_replay_failed();
323                }
324            }
325        }
326    }
327
328    /// Try to replay a pending flow entry using available templates.
329    fn try_replay_ipfix_flow(
330        &mut self,
331        flowsets: &mut Vec<FlowSet>,
332        template_id: u16,
333        entry: &PendingFlowEntry,
334    ) -> bool {
335        // Use peek_valid_template to avoid false LRU promotion on failed parse.
336        // Promote only after successful replay.
337
338        // Try IPFIX templates
339        if let Some(template) = crate::variable_versions::peek_valid_template(
340            &mut self.templates,
341            &template_id,
342            &self.ttl_config,
343            &mut self.metrics,
344        ) && let Ok((_, data)) = Data::parse_with_registry(
345            &entry.raw_data,
346            &template,
347            &self.enterprise_registry,
348            self.max_records_per_flowset,
349        ) {
350            // Don't record_hit() — the original flowset already recorded
351            // a miss. Replay success is tracked via record_pending_replayed().
352            self.templates.promote(&template_id);
353            flowsets.push(FlowSet {
354                header: FlowSetHeader {
355                    header_id: template_id,
356                    length: u16::try_from(entry.raw_data.len().saturating_add(4))
357                        .unwrap_or(u16::MAX),
358                },
359                body: FlowSetBody::Data(data),
360            });
361            return true;
362        }
363
364        // Try IPFIX options templates
365        if let Some(template) = crate::variable_versions::peek_valid_template(
366            &mut self.ipfix_options_templates,
367            &template_id,
368            &self.ttl_config,
369            &mut self.metrics,
370        ) && let Ok((_, data)) = OptionsData::parse_with_registry(
371            &entry.raw_data,
372            &template,
373            &self.enterprise_registry,
374            self.max_records_per_flowset,
375        ) {
376            self.ipfix_options_templates.promote(&template_id);
377            flowsets.push(FlowSet {
378                header: FlowSetHeader {
379                    header_id: template_id,
380                    length: u16::try_from(entry.raw_data.len().saturating_add(4))
381                        .unwrap_or(u16::MAX),
382                },
383                body: FlowSetBody::OptionsData(data),
384            });
385            return true;
386        }
387
388        // Try V9 templates
389        if let Some(template) = crate::variable_versions::peek_valid_template(
390            &mut self.v9_templates,
391            &template_id,
392            &self.ttl_config,
393            &mut self.metrics,
394        ) && let Ok((_, data)) =
395            V9Data::parse_with_limit(&entry.raw_data, &template, self.max_records_per_flowset)
396        {
397            self.v9_templates.promote(&template_id);
398            flowsets.push(FlowSet {
399                header: FlowSetHeader {
400                    header_id: template_id,
401                    length: u16::try_from(entry.raw_data.len().saturating_add(4))
402                        .unwrap_or(u16::MAX),
403                },
404                body: FlowSetBody::V9Data(data),
405            });
406            return true;
407        }
408
409        // Try V9 options templates
410        if let Some(template) = crate::variable_versions::peek_valid_template(
411            &mut self.v9_options_templates,
412            &template_id,
413            &self.ttl_config,
414            &mut self.metrics,
415        ) && let Ok((_, data)) = V9OptionsData::parse_with_limit(
416            &entry.raw_data,
417            &template,
418            self.max_records_per_flowset,
419        ) {
420            self.v9_options_templates.promote(&template_id);
421            flowsets.push(FlowSet {
422                header: FlowSetHeader {
423                    header_id: template_id,
424                    length: u16::try_from(entry.raw_data.len().saturating_add(4))
425                        .unwrap_or(u16::MAX),
426                },
427                body: FlowSetBody::V9OptionsData(data),
428            });
429            return true;
430        }
431
432        false
433    }
434
435    /// Returns a sorted, deduplicated list of all available template IDs.
436    pub fn available_template_ids(&self) -> Vec<u16> {
437        let mut ids: Vec<u16> = self
438            .templates
439            .iter()
440            .map(|(&id, _)| id)
441            .chain(self.v9_templates.iter().map(|(&id, _)| id))
442            .chain(self.ipfix_options_templates.iter().map(|(&id, _)| id))
443            .chain(self.v9_options_templates.iter().map(|(&id, _)| id))
444            .collect();
445        ids.sort_unstable();
446        ids.dedup();
447        ids
448    }
449}
450
451// ---------------------------------------------------------------------------
452// Parsing impl blocks (moved from mod.rs)
453// ---------------------------------------------------------------------------
454
/// Common view over the template types handled by this parser: exposes the
/// template ID and the declared field count, which the generic cache
/// insertion helper and the withdrawal detection both rely on.
trait HasTemplateId: Clone + PartialEq {
    /// The template's ID, used as its cache key.
    fn template_id(&self) -> u16;

    /// The declared field count. Zero signals a template withdrawal
    /// (RFC 7011 Section 3.4.3).
    fn field_count(&self) -> u16;
}
463
464impl HasTemplateId for Template {
465    fn template_id(&self) -> u16 {
466        self.template_id
467    }
468    fn field_count(&self) -> u16 {
469        self.field_count
470    }
471}
472
473impl HasTemplateId for OptionsTemplate {
474    fn template_id(&self) -> u16 {
475        self.template_id
476    }
477    fn field_count(&self) -> u16 {
478        self.field_count
479    }
480}
481
482impl HasTemplateId for V9Template {
483    fn template_id(&self) -> u16 {
484        self.template_id
485    }
486    fn field_count(&self) -> u16 {
487        self.field_count
488    }
489}
490
491impl HasTemplateId for V9OptionsTemplate {
492    fn template_id(&self) -> u16 {
493        self.template_id
494    }
495    fn field_count(&self) -> u16 {
496        // V9 options templates don't support withdrawal
497        // Return combined field count (non-zero for valid templates)
498        let scope = self.options_scope_length / 4;
499        let option = self.options_length / 4;
500        scope.saturating_add(option)
501    }
502}
503
504/// Insert templates into an LRU cache, recording collision/eviction/insertion metrics.
505fn insert_templates<T: HasTemplateId>(
506    cache: &mut LruCache<u16, TemplateWithTtl<Arc<T>>>,
507    templates: &[T],
508    ttl_enabled: bool,
509    metrics: &mut CacheMetricsInner,
510) {
511    for t in templates {
512        let arc_template = Arc::new(t.clone());
513        let wrapped = TemplateWithTtl::new(arc_template, ttl_enabled);
514        if let Some(existing) = cache.peek(&t.template_id())
515            && existing.template.as_ref() != t
516        {
517            metrics.record_collision();
518        }
519        // push() returns Some in two cases: (1) a different key was LRU-evicted
520        // to make room, or (2) the same key existed and its value was replaced.
521        // Only count case (1) as an eviction.
522        if let Some((evicted_key, _evicted)) = cache.push(t.template_id(), wrapped)
523            && evicted_key != t.template_id()
524        {
525            metrics.record_eviction();
526        }
527        metrics.record_insertion();
528    }
529}
530
531impl IPFixParser {
532    /// Add templates to the parser by cloning from slice.
533    fn add_ipfix_templates(&mut self, templates: &[Template]) {
534        let ttl_enabled = self.ttl_config.is_some();
535        insert_templates(
536            &mut self.templates,
537            templates,
538            ttl_enabled,
539            &mut self.metrics,
540        );
541    }
542
543    fn add_ipfix_options_templates(&mut self, templates: &[OptionsTemplate]) {
544        let ttl_enabled = self.ttl_config.is_some();
545        insert_templates(
546            &mut self.ipfix_options_templates,
547            templates,
548            ttl_enabled,
549            &mut self.metrics,
550        );
551    }
552
553    fn add_v9_templates(&mut self, templates: &[V9Template]) {
554        let ttl_enabled = self.ttl_config.is_some();
555        insert_templates(
556            &mut self.v9_templates,
557            templates,
558            ttl_enabled,
559            &mut self.metrics,
560        );
561    }
562
563    fn add_v9_options_templates(&mut self, templates: &[V9OptionsTemplate]) {
564        let ttl_enabled = self.ttl_config.is_some();
565        insert_templates(
566            &mut self.v9_options_templates,
567            templates,
568            ttl_enabled,
569            &mut self.metrics,
570        );
571    }
572
573    /// Remove an IPFIX template by ID (RFC 7011 Section 8.1 template withdrawal).
574    /// Also purges any pending flows cached under this template ID to prevent
575    /// stale data from being replayed against a replacement template.
576    ///
577    /// Per RFC 7011 §8.1, template_id == DATA_TEMPLATE_IPFIX_ID (2) with
578    /// field_count == 0 signals "withdraw ALL data templates". This method
579    /// handles both individual and bulk withdrawal.
580    fn withdraw_ipfix_template(&mut self, template_id: u16) {
581        if template_id == DATA_TEMPLATE_IPFIX_ID {
582            // "Withdraw all data templates" — clear entire data template
583            // cache and drain pending flows only for those template IDs.
584            // Pending flows for options template IDs are left untouched
585            // since those templates remain valid.
586            let ids: Vec<u16> = self.templates.iter().map(|(&id, _)| id).collect();
587            for id in &ids {
588                self.templates.pop(id);
589            }
590            if let Some(ref mut cache) = self.pending_flows {
591                for &id in &ids {
592                    let drained = cache.drain(id, &mut self.metrics);
593                    let n = drained.len() as u64;
594                    if n > 0 {
595                        self.metrics.record_pending_dropped_n(n);
596                    }
597                }
598            }
599        } else {
600            self.templates.pop(&template_id);
601            if let Some(ref mut cache) = self.pending_flows {
602                let drained = cache.drain(template_id, &mut self.metrics);
603                let n = drained.len() as u64;
604                if n > 0 {
605                    self.metrics.record_pending_dropped_n(n);
606                }
607            }
608        }
609    }
610
611    /// Remove an IPFIX options template by ID (template withdrawal).
612    /// Also purges any pending flows cached under this template ID.
613    ///
614    /// Per RFC 7011 §8.1, template_id == OPTIONS_TEMPLATE_IPFIX_ID (3) with
615    /// field_count == 0 signals "withdraw ALL options templates".
616    fn withdraw_ipfix_options_template(&mut self, template_id: u16) {
617        if template_id == OPTIONS_TEMPLATE_IPFIX_ID {
618            // "Withdraw all options templates" — clear entire options template
619            // cache and drain pending flows only for those template IDs.
620            // Pending flows for data template IDs are left untouched.
621            let ids: Vec<u16> = self
622                .ipfix_options_templates
623                .iter()
624                .map(|(&id, _)| id)
625                .collect();
626            for id in &ids {
627                self.ipfix_options_templates.pop(id);
628            }
629            if let Some(ref mut cache) = self.pending_flows {
630                for &id in &ids {
631                    let drained = cache.drain(id, &mut self.metrics);
632                    let n = drained.len() as u64;
633                    if n > 0 {
634                        self.metrics.record_pending_dropped_n(n);
635                    }
636                }
637            }
638        } else {
639            self.ipfix_options_templates.pop(&template_id);
640            if let Some(ref mut cache) = self.pending_flows {
641                let drained = cache.drain(template_id, &mut self.metrics);
642                let n = drained.len() as u64;
643                if n > 0 {
644                    self.metrics.record_pending_dropped_n(n);
645                }
646            }
647        }
648    }
649}
650
651impl FlowSetBody {
652    #[allow(clippy::too_many_arguments)]
653    fn parse_templates<'a, T, F>(
654        i: &'a [u8],
655        parser: &mut IPFixParser,
656        parse_fn: F,
657        single_variant: fn(T) -> FlowSetBody,
658        multi_variant: fn(Vec<T>) -> FlowSetBody,
659        validate: fn(&T, &IPFixParser) -> bool,
660        add_templates: fn(&mut IPFixParser, &[T]),
661        withdraw_template: Option<fn(&mut IPFixParser, u16)>,
662    ) -> IResult<&'a [u8], FlowSetBody>
663    where
664        T: Clone + HasTemplateId,
665        F: Fn(&'a [u8]) -> IResult<&'a [u8], T>,
666    {
667        let (i, templates) = many0(complete(parse_fn))(i)?;
668
669        // Handle template withdrawals (RFC 7011 Section 8.1):
670        // Templates with field_count=0 signal withdrawal from the cache.
671        // Skip withdrawal for IDs that also have a new definition in the
672        // same flowset — the new definition will simply replace the old one
673        // without needlessly draining pending flows.
674        let mut had_withdrawals = false;
675        if let Some(withdraw_fn) = withdraw_template {
676            for t in &templates {
677                if t.field_count() == 0 {
678                    let id = t.template_id();
679                    // "Withdraw all" IDs (2 for data, 3 for options) always
680                    // take effect regardless of other templates in the batch.
681                    let has_redefinition = id != DATA_TEMPLATE_IPFIX_ID
682                        && id != OPTIONS_TEMPLATE_IPFIX_ID
683                        && templates
684                            .iter()
685                            .any(|other| other.template_id() == id && other.field_count() > 0);
686                    if !has_redefinition {
687                        withdraw_fn(parser, id);
688                    }
689                    had_withdrawals = true;
690                }
691            }
692        }
693
694        // Filter to only valid templates (withdrawals will be filtered out
695        // since they have empty fields, failing the is_valid check)
696        let valid_templates: Vec<_> = templates
697            .into_iter()
698            .filter(|t| validate(t, parser))
699            .collect();
700        if valid_templates.is_empty() {
701            // If we processed withdrawals, return Empty instead of error
702            if had_withdrawals {
703                return Ok((i, FlowSetBody::Empty));
704            }
705            return Err(nom::Err::Error(nom::error::Error::new(
706                i,
707                nom::error::ErrorKind::Verify,
708            )));
709        }
710        // Pass slice to add_templates to clone only what's needed
711        add_templates(parser, &valid_templates);
712        match valid_templates.len() {
713            1 => {
714                if let Some(template) = valid_templates.into_iter().next() {
715                    Ok((i, single_variant(template)))
716                } else {
717                    Err(nom::Err::Error(nom::error::Error::new(
718                        i,
719                        nom::error::ErrorKind::Verify,
720                    )))
721                }
722            }
723            _ => Ok((i, multi_variant(valid_templates))),
724        }
725    }
726
727    pub(super) fn parse<'a>(
728        i: &'a [u8],
729        parser: &mut IPFixParser,
730        id: u16,
731    ) -> IResult<&'a [u8], FlowSetBody> {
732        match id {
733            DATA_TEMPLATE_IPFIX_ID => Self::parse_templates(
734                i,
735                parser,
736                Template::parse,
737                FlowSetBody::Template,
738                FlowSetBody::Templates,
739                |t: &Template, p: &IPFixParser| t.is_valid(p),
740                |parser, templates| parser.add_ipfix_templates(templates),
741                Some(|parser: &mut IPFixParser, id| parser.withdraw_ipfix_template(id)),
742            ),
743            DATA_TEMPLATE_V9_ID => Self::parse_templates(
744                i,
745                parser,
746                V9Template::parse,
747                FlowSetBody::V9Template,
748                FlowSetBody::V9Templates,
749                |t: &V9Template, p: &IPFixParser| {
750                    // Validate V9 templates using IPFIX parser limits.
751                    // V9 does not support variable-length fields, so reject
752                    // zero-length and the variable-length sentinel 65535.
753                    usize::from(t.field_count) <= p.max_field_count
754                        && !t.fields.is_empty()
755                        && t.fields
756                            .iter()
757                            .all(|f| f.field_length > 0 && f.field_length != 65535)
758                        && usize::from(t.get_total_size()) <= p.max_template_total_size
759                        && !t.has_duplicate_fields()
760                },
761                |parser, templates| parser.add_v9_templates(templates),
762                None, // V9 doesn't support template withdrawal
763            ),
764            OPTIONS_TEMPLATE_V9_ID => Self::parse_templates(
765                i,
766                parser,
767                V9OptionsTemplate::parse,
768                FlowSetBody::V9OptionsTemplate,
769                FlowSetBody::V9OptionsTemplates,
770                |t: &V9OptionsTemplate, p: &IPFixParser| {
771                    let scope_count = usize::from(t.options_scope_length / 4);
772                    let option_count = usize::from(t.options_length / 4);
773                    t.options_scope_length.is_multiple_of(4)
774                        && t.options_length.is_multiple_of(4)
775                        && scope_count > 0
776                        && scope_count <= p.max_field_count
777                        && option_count <= p.max_field_count
778                        && scope_count.saturating_add(option_count) <= p.max_field_count
779                        && usize::from(t.get_total_size()) <= p.max_template_total_size
780                        && !t.has_duplicate_scope_fields()
781                        && !t.has_duplicate_option_fields()
782                        // V9 does not support variable-length fields; reject any
783                        // zero-length or variable-length sentinel (65535) fields,
784                        // consistent with the V9 parser's own validation.
785                        && t.scope_fields.iter().all(|f| f.field_length > 0 && f.field_length != 65535)
786                        && t.option_fields.iter().all(|f| f.field_length > 0 && f.field_length != 65535)
787                },
788                |parser, templates| parser.add_v9_options_templates(templates),
789                None, // V9 doesn't support template withdrawal
790            ),
791            OPTIONS_TEMPLATE_IPFIX_ID => Self::parse_templates(
792                i,
793                parser,
794                OptionsTemplate::parse,
795                FlowSetBody::OptionsTemplate,
796                FlowSetBody::OptionsTemplates,
797                |t: &OptionsTemplate, p: &IPFixParser| t.is_valid(p),
798                |parser, templates| parser.add_ipfix_options_templates(templates),
799                Some(|parser: &mut IPFixParser, id| parser.withdraw_ipfix_options_template(id)),
800            ),
801            // Parse Data
802            _ => {
803                // NOTE: Template ID collision across cache types is possible and
804                // expected when both IPFIX and V9-style templates coexist in
805                // the same parser (the IPFIX parser accepts both flavors).
806                // The lookup order below defines priority: IPFIX templates >
807                // IPFIX options > V9 templates > V9 options. If the same
808                // template ID appears in multiple caches, only the first
809                // match is used and others are silently shadowed.
810
811                // Try IPFix templates
812                if let Some(template) = crate::variable_versions::get_valid_template(
813                    &mut parser.templates,
814                    &id,
815                    &parser.ttl_config,
816                    &mut parser.metrics,
817                ) {
818                    parser.metrics.record_hit();
819                    if template.get_fields().is_empty() {
820                        return Ok((i, FlowSetBody::Empty));
821                    }
822                    let (i, data) = Data::parse_with_registry(
823                        i,
824                        &template,
825                        &parser.enterprise_registry,
826                        parser.max_records_per_flowset,
827                    )?;
828                    return Ok((i, FlowSetBody::Data(data)));
829                }
830
831                // Try IPFix options templates
832                if let Some(template) = crate::variable_versions::get_valid_template(
833                    &mut parser.ipfix_options_templates,
834                    &id,
835                    &parser.ttl_config,
836                    &mut parser.metrics,
837                ) {
838                    parser.metrics.record_hit();
839                    if template.get_fields().is_empty() {
840                        return Ok((i, FlowSetBody::Empty));
841                    }
842                    let (i, data) = OptionsData::parse_with_registry(
843                        i,
844                        &template,
845                        &parser.enterprise_registry,
846                        parser.max_records_per_flowset,
847                    )?;
848                    return Ok((i, FlowSetBody::OptionsData(data)));
849                }
850
851                // Try V9 templates
852                if let Some(template) = crate::variable_versions::get_valid_template(
853                    &mut parser.v9_templates,
854                    &id,
855                    &parser.ttl_config,
856                    &mut parser.metrics,
857                ) {
858                    parser.metrics.record_hit();
859                    let (i, data) =
860                        V9Data::parse_with_limit(i, &template, parser.max_records_per_flowset)?;
861                    return Ok((i, FlowSetBody::V9Data(data)));
862                }
863
864                // Try V9 options templates
865                if let Some(template) = crate::variable_versions::get_valid_template(
866                    &mut parser.v9_options_templates,
867                    &id,
868                    &parser.ttl_config,
869                    &mut parser.metrics,
870                ) {
871                    parser.metrics.record_hit();
872                    let (i, data) = V9OptionsData::parse_with_limit(
873                        i,
874                        &template,
875                        parser.max_records_per_flowset,
876                    )?;
877                    return Ok((i, FlowSetBody::V9OptionsData(data)));
878                }
879
880                // Template not found or expired — one miss per flowset,
881                // symmetric with one hit per flowset above.
882                parser.metrics.record_miss();
883                if id > 255 {
884                    // Store full raw data only when the pending cache is
885                    // enabled, the entry fits the size limit, AND the
886                    // per-template cap has room.  Otherwise truncate to
887                    // max_error_sample_size to avoid large allocations
888                    // that would be immediately rejected.
889                    let (raw_data, truncated) = if parser
890                        .pending_flows
891                        .as_ref()
892                        .is_some_and(|c| c.would_accept(id, i.len()))
893                    {
894                        (i.to_vec(), false)
895                    } else {
896                        let limit = i.len().min(parser.max_error_sample_size);
897                        (i[..limit].to_vec(), limit < i.len())
898                    };
899                    let info = NoTemplateInfo {
900                        template_id: id,
901                        raw_data,
902                        truncated,
903                    };
904                    Ok((&[] as &[u8], FlowSetBody::NoTemplate(info)))
905                } else {
906                    // Set IDs 4-255 are reserved per RFC 7011; skip gracefully
907                    Ok((&[] as &[u8], FlowSetBody::Empty))
908                }
909            }
910        }
911    }
912}
913
914impl Template {
915    /// Validate the template against parser configuration
916    pub fn is_valid(&self, parser: &IPFixParser) -> bool {
917        <Self as CommonTemplate>::is_valid(self, parser)
918    }
919}
920
921impl OptionsTemplate {
922    /// Validate the options template against parser configuration
923    pub fn is_valid(&self, parser: &IPFixParser) -> bool {
924        <Self as CommonTemplate>::is_valid(self, parser)
925    }
926}
927
928/// Collect template field lengths only when at least one field is variable-length.
929fn collect_varlen_field_lengths(fields: &[TemplateField]) -> Vec<u16> {
930    if fields.iter().any(|f| f.field_length == 65535) {
931        fields.iter().map(|f| f.field_length).collect()
932    } else {
933        Vec::new()
934    }
935}
936
937impl Data {
938    /// Parse Data using the enterprise registry to resolve custom enterprise fields
939    pub(super) fn parse_with_registry<'a>(
940        i: &'a [u8],
941        template: &Template,
942        registry: &EnterpriseFieldRegistry,
943        max_records: usize,
944    ) -> IResult<&'a [u8], Self> {
945        let template_field_lengths = collect_varlen_field_lengths(template.get_fields());
946        let (i, fields) = FieldParser::parse_with_registry(i, template, registry, max_records)?;
947        Ok((
948            i,
949            Self {
950                fields,
951                padding: vec![],
952                template_field_lengths,
953            },
954        ))
955    }
956}
957
958impl OptionsData {
959    /// Parse OptionsData using the enterprise registry to resolve custom enterprise fields
960    pub(super) fn parse_with_registry<'a>(
961        i: &'a [u8],
962        template: &OptionsTemplate,
963        registry: &EnterpriseFieldRegistry,
964        max_records: usize,
965    ) -> IResult<&'a [u8], Self> {
966        let template_field_lengths = collect_varlen_field_lengths(template.get_fields());
967        let (i, fields) = FieldParser::parse_with_registry(i, template, registry, max_records)?;
968        Ok((
969            i,
970            Self {
971                fields,
972                padding: vec![],
973                template_field_lengths,
974            },
975        ))
976    }
977}
978
979impl<'a> FieldParser {
980    /// Core parsing loop shared by `parse` and `parse_with_registry`.
981    ///
982    /// The `parse_field` closure controls how each template field is decoded —
983    /// either using built-in field types or the enterprise registry.
984    fn parse_inner<T: CommonTemplate, F>(
985        mut i: &'a [u8],
986        template: &T,
987        max_records: usize,
988        parse_field: F,
989    ) -> IResult<&'a [u8], Vec<Vec<IPFixFieldPair>>>
990    where
991        F: Fn(&TemplateField, &'a [u8]) -> IResult<&'a [u8], FieldValue>,
992    {
993        let template_fields = template.get_fields();
994        if template_fields.is_empty() {
995            return Ok((i, Vec::new()));
996        }
997
998        // Estimate capacity based on input size and template field count.
999        // Variable-length fields (field_length == 65535) are RFC 7011 markers,
1000        // not actual sizes — count them as 1 byte minimum for estimation.
1001        let template_size: usize = template_fields
1002            .iter()
1003            .map(|f| {
1004                if f.field_length == 65535 {
1005                    1
1006                } else {
1007                    usize::from(f.field_length)
1008                }
1009            })
1010            .sum();
1011        // template_fields is non-empty (checked above) and each contributes >= 1 byte,
1012        // so template_size is always > 0 here.
1013        let estimated_records = (i.len() / template_size).min(max_records);
1014        let mut res = Vec::with_capacity(estimated_records);
1015
1016        // Try to parse as much as we can, but if it fails, just return what we have so far.
1017        while !i.is_empty() && res.len() < max_records {
1018            let before = i;
1019            let mut vec = Vec::with_capacity(template_fields.len());
1020            for field in template_fields.iter() {
1021                match parse_field(field, i) {
1022                    Ok((remaining, field_value)) => {
1023                        vec.push((field.field_type, field_value));
1024                        i = remaining;
1025                    }
1026                    Err(_) => {
1027                        i = before;
1028                        return Ok((i, res));
1029                    }
1030                }
1031            }
1032            // Guard against infinite loops: if no bytes were consumed after
1033            // parsing a full record, stop to prevent CPU-bound DoS.
1034            if std::ptr::eq(i, before) {
1035                break;
1036            }
1037            res.push(vec);
1038        }
1039        Ok((i, res))
1040    }
1041
1042    /// Takes a byte stream and a cached template.
1043    /// Fields get matched to static types.
1044    /// Returns BTree of IPFix Types & Fields or IResult Error.
1045    pub(super) fn parse<T: CommonTemplate>(
1046        i: &'a [u8],
1047        template: &T,
1048        max_records: usize,
1049    ) -> IResult<&'a [u8], Vec<Vec<IPFixFieldPair>>> {
1050        Self::parse_inner(i, template, max_records, |field, input| {
1051            field.parse_as_field_value(input)
1052        })
1053    }
1054
1055    /// Same as parse but uses the enterprise registry to resolve custom enterprise fields
1056    fn parse_with_registry<T: CommonTemplate>(
1057        i: &'a [u8],
1058        template: &T,
1059        registry: &EnterpriseFieldRegistry,
1060        max_records: usize,
1061    ) -> IResult<&'a [u8], Vec<Vec<IPFixFieldPair>>> {
1062        Self::parse_inner(i, template, max_records, |field, input| {
1063            field.parse_as_field_value_with_registry(input, registry)
1064        })
1065    }
1066}
1067
1068impl TemplateField {
1069    // If 65535, read 1 byte.
1070    // If that byte is < 255 that is the length.
1071    // If that byte is == 255 then read 2 bytes.  That is the length.
1072    // Otherwise, return the field length.
1073    fn parse_field_length<'a>(&self, i: &'a [u8]) -> IResult<&'a [u8], u16> {
1074        match self.field_length {
1075            65535 => {
1076                let (i, length) = parse_u8(i)?;
1077                if length == 255 {
1078                    let (i, full_length) = parse_u16_be(i)?;
1079                    // RFC 7011 Section 7: length values of 0 are not permitted
1080                    if full_length == 0 {
1081                        return Err(nom::Err::Error(nom::error::Error::new(
1082                            i,
1083                            nom::error::ErrorKind::Verify,
1084                        )));
1085                    }
1086                    // Validate length doesn't exceed remaining buffer
1087                    if (full_length as usize) > i.len() {
1088                        return Err(nom::Err::Error(nom::error::Error::new(
1089                            i,
1090                            nom::error::ErrorKind::Eof,
1091                        )));
1092                    }
1093                    Ok((i, full_length))
1094                } else {
1095                    // RFC 7011 Section 7: length values of 0 are not permitted
1096                    if length == 0 {
1097                        return Err(nom::Err::Error(nom::error::Error::new(
1098                            i,
1099                            nom::error::ErrorKind::Verify,
1100                        )));
1101                    }
1102                    // Validate length doesn't exceed remaining buffer
1103                    if (length as usize) > i.len() {
1104                        return Err(nom::Err::Error(nom::error::Error::new(
1105                            i,
1106                            nom::error::ErrorKind::Eof,
1107                        )));
1108                    }
1109                    Ok((i, u16::from(length)))
1110                }
1111            }
1112            length => Ok((i, length)),
1113        }
1114    }
1115
1116    fn parse_as_field_value<'a>(&self, i: &'a [u8]) -> IResult<&'a [u8], FieldValue> {
1117        let (i, length) = self.parse_field_length(i)?;
1118        FieldValue::from_field_type(i, self.field_type.into(), length)
1119    }
1120
1121    fn parse_as_field_value_with_registry<'a>(
1122        &self,
1123        i: &'a [u8],
1124        registry: &EnterpriseFieldRegistry,
1125    ) -> IResult<&'a [u8], FieldValue> {
1126        let (i, length) = self.parse_field_length(i)?;
1127        let field_type = self.field_type.to_field_data_type(registry);
1128        FieldValue::from_field_type(i, field_type, length)
1129    }
1130}