netflow_parser/lib.rs
1#![doc = include_str!("../README.md")]
2
3#[cfg(feature = "netflow_common")]
4pub mod netflow_common;
5pub mod protocol;
6pub mod scoped_parser;
7pub mod static_versions;
8pub mod template_events;
9mod tests;
10pub mod variable_versions;
11
12#[cfg(feature = "netflow_common")]
13use crate::netflow_common::{NetflowCommon, NetflowCommonError, NetflowCommonFlowSet};
14
15use static_versions::{
16 v5::{V5, V5Parser},
17 v7::{V7, V7Parser},
18};
19use variable_versions::Config;
20use variable_versions::enterprise_registry::EnterpriseFieldDef;
21use variable_versions::ipfix::{IPFix, IPFixParser};
22use variable_versions::v9::{V9, V9Parser};
23
24use nom_derive::{Nom, Parse};
25use serde::Serialize;
26
27use std::collections::HashSet;
28
29// Re-export scoped parser types for convenience
30pub use scoped_parser::{
31 AutoScopedParser, IpfixSourceKey, RouterScopedParser, ScopingInfo, V9SourceKey,
32 extract_scoping_info,
33};
34
35// Re-export template event types for convenience
36pub use template_events::{TemplateEvent, TemplateHook, TemplateHooks, TemplateProtocol};
37
38/// Enum of supported Netflow Versions
39#[derive(Debug, Clone, Serialize)]
40pub enum NetflowPacket {
41 /// Version 5
42 V5(V5),
43 /// Version 7
44 V7(V7),
45 /// Version 9
46 V9(V9),
47 /// IPFix
48 IPFix(IPFix),
49}
50
51impl NetflowPacket {
52 pub fn is_v5(&self) -> bool {
53 matches!(self, Self::V5(_v))
54 }
55 pub fn is_v7(&self) -> bool {
56 matches!(self, Self::V7(_v))
57 }
58 pub fn is_v9(&self) -> bool {
59 matches!(self, Self::V9(_v))
60 }
61 pub fn is_ipfix(&self) -> bool {
62 matches!(self, Self::IPFix(_v))
63 }
64 #[cfg(feature = "netflow_common")]
65 pub fn as_netflow_common(&self) -> Result<NetflowCommon, NetflowCommonError> {
66 self.try_into()
67 }
68}
69
70/// Result of parsing NetFlow packets from a byte buffer.
71///
72/// This struct contains both successfully parsed packets and an optional error
73/// that stopped parsing. This ensures no data loss when parsing fails partway
74/// through a buffer.
75///
76/// # Examples
77///
78/// ```rust
79/// use netflow_parser::NetflowParser;
80///
81/// let mut parser = NetflowParser::default();
82/// let buffer = vec![/* netflow data */];
83///
84/// let result = parser.parse_bytes(&buffer);
85///
86/// // Process all successfully parsed packets
87/// for packet in result.packets {
88/// println!("Parsed packet");
89/// }
90///
91/// // Check if parsing stopped due to error
92/// if let Some(error) = result.error {
93/// eprintln!("Parsing stopped: {}", error);
94/// }
95/// ```
96#[derive(Debug, Clone, Serialize)]
97pub struct ParseResult {
98 /// Successfully parsed NetFlow packets.
99 /// This vec contains all packets that were successfully parsed before
100 /// any error occurred.
101 pub packets: Vec<NetflowPacket>,
102
103 /// Optional error that stopped parsing.
104 /// - `None` means all data was successfully parsed
105 /// - `Some(error)` means parsing stopped due to an error, but `packets`
106 /// contains all successfully parsed packets up to that point
107 pub error: Option<NetflowError>,
108}
109
110impl ParseResult {
111 /// Returns true if parsing completed without errors.
112 ///
113 /// # Examples
114 ///
115 /// ```rust
116 /// use netflow_parser::NetflowParser;
117 ///
118 /// let mut parser = NetflowParser::default();
119 /// let result = parser.parse_bytes(&[]);
120 /// assert!(result.is_ok());
121 /// ```
122 pub fn is_ok(&self) -> bool {
123 self.error.is_none()
124 }
125
126 /// Returns true if parsing stopped due to an error.
127 ///
128 /// Note: Even when this returns `true`, `packets` may contain
129 /// successfully parsed packets.
130 pub fn is_err(&self) -> bool {
131 self.error.is_some()
132 }
133}
134
135#[derive(Nom)]
136/// Generic Netflow Header for shared versions
137struct GenericNetflowHeader {
138 version: u16,
139}
140
141/// Main parser for Netflow packets supporting V5, V7, V9, and IPFIX.
142///
143/// Use [`NetflowParser::builder()`] for ergonomic configuration with the builder pattern,
144/// or [`NetflowParser::default()`] for quick setup with defaults.
145///
146/// # ⚠️ Multi-Source Deployments
147///
148/// **IMPORTANT**: If you're parsing NetFlow from multiple routers or sources,
149/// use [`AutoScopedParser`] instead of `NetflowParser`
150/// to prevent template cache collisions.
151///
152/// Template IDs are **NOT unique across sources**. Different routers can (and often do)
153/// use the same template ID with completely different schemas. When multiple sources
154/// share a single `NetflowParser`, their templates collide in the cache, causing:
155///
156/// - Template thrashing (constant eviction and re-learning)
157/// - Parsing failures (data parsed with wrong template)
158/// - Performance degradation (high cache miss rate)
159///
160/// ## Single Source (✅ Use NetflowParser)
161///
162/// ```rust
163/// use netflow_parser::NetflowParser;
164///
165/// let mut parser = NetflowParser::default();
166/// let data = [0u8; 72]; // Example NetFlow data
167/// // Single router/source - no collisions possible
168/// let packets = parser.parse_bytes(&data);
169/// ```
170///
171/// ## Multiple Sources (✅ Use AutoScopedParser)
172///
173/// ```rust
174/// use netflow_parser::AutoScopedParser;
175/// use std::net::SocketAddr;
176///
177/// let mut parser = AutoScopedParser::new();
178/// let source: SocketAddr = "192.168.1.1:2055".parse().unwrap();
179/// let data = [0u8; 72]; // Example NetFlow data
180/// // Each source gets isolated template cache (RFC-compliant)
181/// let packets = parser.parse_from_source(source, &data);
182/// ```
183///
184/// # Examples
185///
186/// ```rust
187/// use netflow_parser::NetflowParser;
188/// use netflow_parser::variable_versions::ttl::TtlConfig;
189/// use std::time::Duration;
190///
191/// // Using builder pattern (recommended)
192/// let parser = NetflowParser::builder()
193/// .with_cache_size(2000)
194/// .with_ttl(TtlConfig::new(Duration::from_secs(7200)))
195/// .build()
196/// .expect("Failed to build parser");
197///
198/// // Using default
199/// let parser = NetflowParser::default();
200/// ```
201#[derive(Debug)]
202pub struct NetflowParser {
203 pub v9_parser: V9Parser,
204 pub ipfix_parser: IPFixParser,
205 pub allowed_versions: HashSet<u16>,
206 /// Maximum number of bytes to include in error samples to prevent memory exhaustion.
207 /// Defaults to 256 bytes.
208 pub max_error_sample_size: usize,
209 /// Template event hooks for monitoring template lifecycle events.
210 template_hooks: TemplateHooks,
211}
212
213/// Statistics about template cache utilization.
214#[derive(Debug, Clone)]
215pub struct CacheStats {
216 /// Current number of cached templates
217 pub current_size: usize,
218 /// Maximum cache size before LRU eviction
219 pub max_size: usize,
220 /// TTL configuration (if enabled)
221 pub ttl_config: Option<variable_versions::ttl::TtlConfig>,
222 /// Performance metrics snapshot
223 pub metrics: variable_versions::metrics::CacheMetricsSnapshot,
224}
225
226/// Combined cache statistics for both V9 and IPFIX template caches.
227///
228/// This struct provides named fields instead of positional tuples,
229/// making it clear which stats belong to V9 vs IPFIX.
230#[derive(Debug, Clone)]
231pub struct ParserCacheStats {
232 /// V9 template cache statistics
233 pub v9: CacheStats,
234 /// IPFIX template cache statistics
235 pub ipfix: CacheStats,
236}
237
238/// Builder for configuring and constructing a [`NetflowParser`].
239///
240/// # Examples
241///
242/// ```rust
243/// use netflow_parser::NetflowParser;
244/// use netflow_parser::variable_versions::ttl::TtlConfig;
245/// use std::collections::HashSet;
246/// use std::time::Duration;
247///
248/// let parser = NetflowParser::builder()
249/// .with_cache_size(2000)
250/// .with_ttl(TtlConfig::new(Duration::from_secs(7200)))
251/// .with_allowed_versions([5, 9, 10].into())
252/// .with_max_error_sample_size(512)
253/// .build()
254/// .expect("Failed to build parser");
255/// ```
256#[derive(Clone)]
257pub struct NetflowParserBuilder {
258 v9_config: Config,
259 ipfix_config: Config,
260 allowed_versions: HashSet<u16>,
261 max_error_sample_size: usize,
262 template_hooks: TemplateHooks,
263}
264
265// Custom Debug implementation to avoid printing closures
266impl std::fmt::Debug for NetflowParserBuilder {
267 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
268 f.debug_struct("NetflowParserBuilder")
269 .field("v9_config", &self.v9_config)
270 .field("ipfix_config", &self.ipfix_config)
271 .field("allowed_versions", &self.allowed_versions)
272 .field("max_error_sample_size", &self.max_error_sample_size)
273 .field(
274 "template_hooks",
275 &format!("{} hooks", self.template_hooks.len()),
276 )
277 .finish()
278 }
279}
280
281impl Default for NetflowParserBuilder {
282 fn default() -> Self {
283 Self {
284 v9_config: Config::new(1000, None),
285 ipfix_config: Config::new(1000, None),
286 allowed_versions: [5, 7, 9, 10].iter().cloned().collect(),
287 max_error_sample_size: 256,
288 template_hooks: TemplateHooks::new(),
289 }
290 }
291}
292
293impl NetflowParserBuilder {
294 /// Sets the template cache size for both V9 and IPFIX parsers.
295 ///
296 /// # Arguments
297 ///
298 /// * `size` - Maximum number of templates to cache (must be > 0)
299 ///
300 /// # Examples
301 ///
302 /// ```rust
303 /// use netflow_parser::NetflowParser;
304 ///
305 /// let parser = NetflowParser::builder()
306 /// .with_cache_size(2000)
307 /// .build()
308 /// .expect("Failed to build parser");
309 /// ```
310 #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
311 pub fn with_cache_size(mut self, size: usize) -> Self {
312 self.v9_config.max_template_cache_size = size;
313 self.ipfix_config.max_template_cache_size = size;
314 self
315 }
316
317 /// Sets the V9 parser template cache size independently.
318 ///
319 /// # Arguments
320 ///
321 /// * `size` - Maximum number of templates to cache (must be > 0)
322 #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
323 pub fn with_v9_cache_size(mut self, size: usize) -> Self {
324 self.v9_config.max_template_cache_size = size;
325 self
326 }
327
328 /// Sets the IPFIX parser template cache size independently.
329 ///
330 /// * `size` - Maximum number of templates to cache (must be > 0)
331 #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
332 pub fn with_ipfix_cache_size(mut self, size: usize) -> Self {
333 self.ipfix_config.max_template_cache_size = size;
334 self
335 }
336
337 /// Sets the maximum field count for both V9 and IPFIX parsers.
338 ///
339 /// This limits the number of fields allowed in a single template to prevent DoS attacks.
340 ///
341 /// # Arguments
342 ///
343 /// * `count` - Maximum number of fields per template (default: 10,000)
344 ///
345 /// # Examples
346 ///
347 /// ```rust
348 /// use netflow_parser::NetflowParser;
349 ///
350 /// let parser = NetflowParser::builder()
351 /// .with_max_field_count(5000) // More restrictive limit
352 /// .build()
353 /// .expect("Failed to build parser");
354 /// ```
355 #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
356 pub fn with_max_field_count(mut self, count: usize) -> Self {
357 self.v9_config.max_field_count = count;
358 self.ipfix_config.max_field_count = count;
359 self
360 }
361
362 /// Sets the V9 parser maximum field count independently.
363 ///
364 /// # Arguments
365 ///
366 /// * `count` - Maximum number of fields per template
367 #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
368 pub fn with_v9_max_field_count(mut self, count: usize) -> Self {
369 self.v9_config.max_field_count = count;
370 self
371 }
372
373 /// Sets the IPFIX parser maximum field count independently.
374 ///
375 /// # Arguments
376 ///
377 /// * `count` - Maximum number of fields per template
378 #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
379 pub fn with_ipfix_max_field_count(mut self, count: usize) -> Self {
380 self.ipfix_config.max_field_count = count;
381 self
382 }
383
384 /// Sets the TTL configuration for both V9 and IPFIX parsers.
385 ///
386 /// # Arguments
387 ///
388 /// * `ttl` - TTL configuration (time-based)
389 ///
390 /// # Examples
391 ///
392 /// ```rust
393 /// use netflow_parser::NetflowParser;
394 /// use netflow_parser::variable_versions::ttl::TtlConfig;
395 /// use std::time::Duration;
396 ///
397 /// let parser = NetflowParser::builder()
398 /// .with_ttl(TtlConfig::new(Duration::from_secs(7200)))
399 /// .build()
400 /// .expect("Failed to build parser");
401 /// ```
402 #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
403 pub fn with_ttl(mut self, ttl: variable_versions::ttl::TtlConfig) -> Self {
404 self.v9_config.ttl_config = Some(ttl.clone());
405 self.ipfix_config.ttl_config = Some(ttl);
406 self
407 }
408
409 /// Sets the TTL configuration for V9 parser independently.
410 #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
411 pub fn with_v9_ttl(mut self, ttl: variable_versions::ttl::TtlConfig) -> Self {
412 self.v9_config.ttl_config = Some(ttl);
413 self
414 }
415
416 /// Sets the TTL configuration for IPFIX parser independently.
417 #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
418 pub fn with_ipfix_ttl(mut self, ttl: variable_versions::ttl::TtlConfig) -> Self {
419 self.ipfix_config.ttl_config = Some(ttl);
420 self
421 }
422
423 /// Sets which Netflow versions are allowed to be parsed.
424 ///
425 /// # Arguments
426 ///
427 /// * `versions` - Set of allowed version numbers (5, 7, 9, 10)
428 ///
429 /// # Examples
430 ///
431 /// ```rust
432 /// use netflow_parser::NetflowParser;
433 ///
434 /// // Only parse V9 and IPFIX
435 /// let parser = NetflowParser::builder()
436 /// .with_allowed_versions([9, 10].into())
437 /// .build()
438 /// .expect("Failed to build parser");
439 /// ```
440 #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
441 pub fn with_allowed_versions(mut self, versions: HashSet<u16>) -> Self {
442 self.allowed_versions = versions;
443 self
444 }
445
446 /// Sets the maximum error sample size for error reporting.
447 ///
448 /// # Arguments
449 ///
450 /// * `size` - Maximum bytes to include in error messages
451 ///
452 /// # Examples
453 ///
454 /// ```rust
455 /// use netflow_parser::NetflowParser;
456 ///
457 /// let parser = NetflowParser::builder()
458 /// .with_max_error_sample_size(512)
459 /// .build()
460 /// .expect("Failed to build parser");
461 /// ```
462 #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
463 pub fn with_max_error_sample_size(mut self, size: usize) -> Self {
464 self.max_error_sample_size = size;
465 self.v9_config.max_error_sample_size = size;
466 self.ipfix_config.max_error_sample_size = size;
467 self
468 }
469
470 /// Registers a custom enterprise field definition for both V9 and IPFIX parsers.
471 ///
472 /// This allows library users to define their own enterprise-specific fields without
473 /// modifying the library source code. Registered fields will be parsed according to
474 /// their specified data type instead of falling back to raw bytes.
475 ///
476 /// # Arguments
477 ///
478 /// * `def` - Enterprise field definition containing enterprise number, field number, name, and data type
479 ///
480 /// # Examples
481 ///
482 /// ```rust
483 /// use netflow_parser::NetflowParser;
484 /// use netflow_parser::variable_versions::enterprise_registry::EnterpriseFieldDef;
485 /// use netflow_parser::variable_versions::data_number::FieldDataType;
486 ///
487 /// let parser = NetflowParser::builder()
488 /// .register_enterprise_field(EnterpriseFieldDef::new(
489 /// 12345, // Enterprise number
490 /// 1, // Field number
491 /// "customMetric",
492 /// FieldDataType::UnsignedDataNumber,
493 /// ))
494 /// .register_enterprise_field(EnterpriseFieldDef::new(
495 /// 12345,
496 /// 2,
497 /// "customApplicationName",
498 /// FieldDataType::String,
499 /// ))
500 /// .build()
501 /// .expect("Failed to build parser");
502 /// ```
503 #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
504 pub fn register_enterprise_field(mut self, def: EnterpriseFieldDef) -> Self {
505 self.v9_config.enterprise_registry.register(def.clone());
506 self.ipfix_config.enterprise_registry.register(def);
507 self
508 }
509
510 /// Registers multiple custom enterprise field definitions at once.
511 ///
512 /// # Arguments
513 ///
514 /// * `defs` - Iterator of enterprise field definitions
515 ///
516 /// # Examples
517 ///
518 /// ```rust
519 /// use netflow_parser::NetflowParser;
520 /// use netflow_parser::variable_versions::enterprise_registry::EnterpriseFieldDef;
521 /// use netflow_parser::variable_versions::data_number::FieldDataType;
522 ///
523 /// let fields = vec![
524 /// EnterpriseFieldDef::new(12345, 1, "field1", FieldDataType::UnsignedDataNumber),
525 /// EnterpriseFieldDef::new(12345, 2, "field2", FieldDataType::String),
526 /// EnterpriseFieldDef::new(12345, 3, "field3", FieldDataType::Ip4Addr),
527 /// ];
528 ///
529 /// let parser = NetflowParser::builder()
530 /// .register_enterprise_fields(fields)
531 /// .build()
532 /// .expect("Failed to build parser");
533 /// ```
534 #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
535 pub fn register_enterprise_fields(
536 mut self,
537 defs: impl IntoIterator<Item = EnterpriseFieldDef>,
538 ) -> Self {
539 // Collect once to avoid unnecessary cloning
540 let defs: Vec<_> = defs.into_iter().collect();
541
542 // Register clones in V9 registry
543 for def in &defs {
544 self.v9_config.enterprise_registry.register(def.clone());
545 }
546
547 // Move originals into IPFIX registry (no clone needed)
548 for def in defs {
549 self.ipfix_config.enterprise_registry.register(def);
550 }
551
552 self
553 }
554
555 /// Hint for single-source deployments (documentation only).
556 ///
557 /// This method exists for clarity and returns `self` unchanged.
558 /// Use when parsing from a single router or source.
559 ///
560 /// # Examples
561 ///
562 /// ```rust
563 /// use netflow_parser::NetflowParser;
564 ///
565 /// let parser = NetflowParser::builder()
566 /// .single_source() // Documents intent
567 /// .with_cache_size(1000)
568 /// .build()
569 /// .expect("Failed to build parser");
570 /// ```
571 #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
572 pub fn single_source(self) -> Self {
573 self
574 }
575
576 /// Creates an AutoScopedParser for multi-source deployments.
577 ///
578 /// **Recommended** for parsing NetFlow from multiple routers. Each source
579 /// gets an isolated template cache, preventing template ID collisions.
580 ///
581 /// # Examples
582 ///
583 /// ```rust
584 /// use netflow_parser::NetflowParser;
585 /// use std::net::SocketAddr;
586 ///
587 /// // Multi-source deployment
588 /// let mut parser = NetflowParser::builder()
589 /// .with_cache_size(2000)
590 /// .multi_source();
591 ///
592 /// let source: SocketAddr = "192.168.1.1:2055".parse().unwrap();
593 /// let data = [0u8; 72]; // Example data
594 /// let packets = parser.parse_from_source(source, &data);
595 /// ```
596 ///
597 /// Equivalent to:
598 /// ```rust
599 /// use netflow_parser::{NetflowParser, AutoScopedParser};
600 ///
601 /// let builder = NetflowParser::builder().with_cache_size(2000);
602 /// let _parser = AutoScopedParser::with_builder(builder);
603 /// ```
604 pub fn multi_source(self) -> AutoScopedParser {
605 AutoScopedParser::with_builder(self)
606 }
607
608 /// Builds the configured [`NetflowParser`].
609 ///
610 /// # Errors
611 ///
612 /// Returns an error if:
613 /// - Template cache size is 0
614 /// - Parser initialization fails
615 ///
616 /// # Examples
617 ///
618 /// ```rust
619 /// use netflow_parser::NetflowParser;
620 ///
621 /// let parser = NetflowParser::builder()
622 /// .with_cache_size(2000)
623 /// .build()
624 /// .expect("Failed to build parser");
625 /// ```
626 /// Registers a callback for template lifecycle events.
627 ///
628 /// This allows you to monitor template operations in real-time, including:
629 /// - Template learning (new templates added to cache)
630 /// - Template collisions (template ID reused)
631 /// - Template evictions (LRU policy removed template)
632 /// - Template expirations (TTL-based removal)
633 /// - Missing templates (data packet for unknown template)
634 ///
635 /// # Arguments
636 ///
637 /// * `hook` - A closure that will be called for each template event
638 ///
639 /// # Examples
640 ///
641 /// ```rust
642 /// use netflow_parser::{NetflowParser, TemplateEvent, TemplateProtocol};
643 ///
644 /// let parser = NetflowParser::builder()
645 /// .on_template_event(|event| {
646 /// match event {
647 /// TemplateEvent::Learned { template_id, protocol } => {
648 /// println!("Learned template {}", template_id);
649 /// }
650 /// TemplateEvent::Collision { template_id, protocol } => {
651 /// eprintln!("⚠️ Template collision: {}", template_id);
652 /// }
653 /// _ => {}
654 /// }
655 /// })
656 /// .build()
657 /// .unwrap();
658 /// ```
659 #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
660 pub fn on_template_event<F>(mut self, hook: F) -> Self
661 where
662 F: Fn(&TemplateEvent) + Send + Sync + 'static,
663 {
664 self.template_hooks.register(hook);
665 self
666 }
667
668 /// Builds the `NetflowParser` with the configured settings.
669 ///
670 /// Returns an error if the parser configuration is invalid.
671 pub fn build(self) -> Result<NetflowParser, String> {
672 let v9_parser =
673 V9Parser::try_new(self.v9_config).map_err(|e| format!("V9 parser error: {}", e))?;
674 let ipfix_parser = IPFixParser::try_new(self.ipfix_config)
675 .map_err(|e| format!("IPFIX parser error: {}", e))?;
676
677 Ok(NetflowParser {
678 v9_parser,
679 ipfix_parser,
680 allowed_versions: self.allowed_versions,
681 max_error_sample_size: self.max_error_sample_size,
682 template_hooks: self.template_hooks,
683 })
684 }
685}
686
687#[derive(Debug, Clone)]
688pub enum ParsedNetflow<'a> {
689 Success {
690 packet: NetflowPacket,
691 remaining: &'a [u8],
692 },
693 Error {
694 error: NetflowError,
695 },
696 UnallowedVersion,
697}
698
699/// Comprehensive error type for NetFlow parsing operations.
700///
701/// Provides rich context about parsing failures including offset, error kind,
702/// and relevant data for debugging.
703#[derive(Debug, Clone, Serialize)]
704pub enum NetflowError {
705 /// Incomplete data - more bytes needed to parse a complete packet.
706 ///
707 /// Contains the number of bytes available and a description of what was expected.
708 Incomplete {
709 /// Number of bytes that were available
710 available: usize,
711 /// Description of what was being parsed
712 context: String,
713 },
714
715 /// Unknown or unsupported NetFlow version encountered.
716 ///
717 /// The version number found in the packet header doesn't match any known
718 /// NetFlow version (V5, V7, V9, IPFIX).
719 UnsupportedVersion {
720 /// The version number found in the packet
721 version: u16,
722 /// Offset in bytes where the version was found
723 offset: usize,
724 /// Sample of the packet data for debugging
725 sample: Vec<u8>,
726 },
727
728 /// Version is valid but filtered out by `allowed_versions` configuration.
729 ///
730 /// The parser was configured to only accept certain versions and this
731 /// version was explicitly excluded.
732 FilteredVersion {
733 /// The version number that was filtered
734 version: u16,
735 },
736
737 /// Template definition is required but not found in cache.
738 ///
739 /// For V9 and IPFIX, data packets reference template IDs that must be
740 /// learned from template packets. This error occurs when data arrives
741 /// before (or without) its corresponding template.
742 MissingTemplate {
743 /// The template ID that was not found
744 template_id: u16,
745 /// The protocol (V9 or IPFIX)
746 protocol: TemplateProtocol,
747 /// List of currently cached template IDs for this protocol
748 available_templates: Vec<u16>,
749 /// Raw packet data that couldn't be parsed
750 raw_data: Vec<u8>,
751 },
752
753 /// Parsing error with detailed context.
754 ///
755 /// Generic parsing failure with information about what failed and where.
756 ParseError {
757 /// Offset in bytes where the error occurred
758 offset: usize,
759 /// Description of what was being parsed
760 context: String,
761 /// The specific error kind
762 kind: String,
763 /// Sample of remaining data for debugging
764 remaining: Vec<u8>,
765 },
766
767 /// Partial parse - some data was parsed but errors occurred.
768 ///
769 /// Used when processing continues despite errors (e.g., some flowsets
770 /// parsed successfully but others failed).
771 Partial {
772 /// Description of the partial parse result
773 message: String,
774 },
775}
776
777impl std::fmt::Display for NetflowError {
778 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
779 match self {
780 NetflowError::Incomplete { available, context } => {
781 write!(
782 f,
783 "Incomplete data: {} (only {} bytes available)",
784 context, available
785 )
786 }
787 NetflowError::UnsupportedVersion {
788 version, offset, ..
789 } => {
790 write!(
791 f,
792 "Unsupported NetFlow version {} at offset {}",
793 version, offset
794 )
795 }
796 NetflowError::FilteredVersion { version } => {
797 write!(
798 f,
799 "NetFlow version {} filtered out by allowed_versions configuration",
800 version
801 )
802 }
803 NetflowError::MissingTemplate {
804 template_id,
805 protocol,
806 available_templates,
807 ..
808 } => {
809 write!(
810 f,
811 "Missing template {} for {:?} (available: {:?})",
812 template_id, protocol, available_templates
813 )
814 }
815 NetflowError::ParseError {
816 offset,
817 context,
818 kind,
819 ..
820 } => {
821 write!(
822 f,
823 "Parse error at offset {}: {} ({})",
824 offset, context, kind
825 )
826 }
827 NetflowError::Partial { message } => {
828 write!(f, "Partial parse error: {}", message)
829 }
830 }
831 }
832}
833
834impl std::error::Error for NetflowError {}
835
836// Legacy type alias for backwards compatibility during migration
837#[deprecated(since = "0.8.0", note = "Use NetflowError instead")]
838pub type NetflowPacketError = NetflowError;
839
840#[deprecated(since = "0.8.0", note = "Use NetflowError instead")]
841pub type NetflowParseError = NetflowError;
842
843#[derive(Debug, Clone, Serialize)]
844pub struct PartialParse {
845 pub version: u16,
846 pub remaining: Vec<u8>,
847 pub error: String,
848}
849
850/// Iterator that yields NetflowPacket items from a byte buffer without allocating a Vec.
851/// Maintains parser state for template caching (V9/IPFIX).
852pub struct NetflowPacketIterator<'a> {
853 parser: &'a mut NetflowParser,
854 remaining: &'a [u8],
855 errored: bool,
856}
857
858impl<'a> NetflowPacketIterator<'a> {
859 /// Returns the unconsumed bytes remaining in the buffer.
860 ///
861 /// This is useful for:
862 /// - Debugging: See how much data was consumed
863 /// - Mixed protocols: Process non-netflow data after netflow packets
864 /// - Resumption: Know where parsing stopped
865 ///
866 /// # Examples
867 ///
868 /// ```rust
869 /// use netflow_parser::NetflowParser;
870 ///
871 /// let v5_packet = [0, 5, 0, 1, 3, 0, 4, 0, 5, 0, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7,];
872 /// let mut parser = NetflowParser::default();
873 /// let mut iter = parser.iter_packets(&v5_packet);
874 ///
875 /// while let Some(_packet) = iter.next() {
876 /// // Process packet
877 /// }
878 ///
879 /// // Check how many bytes remain unconsumed
880 /// assert_eq!(iter.remaining().len(), 0);
881 /// ```
882 pub fn remaining(&self) -> &'a [u8] {
883 self.remaining
884 }
885
886 /// Returns true if all bytes have been consumed or an error occurred.
887 ///
888 /// This is useful for validation and ensuring complete buffer processing.
889 ///
890 /// # Examples
891 ///
892 /// ```rust
893 /// use netflow_parser::NetflowParser;
894 ///
895 /// let v5_packet = [0, 5, 0, 1, 3, 0, 4, 0, 5, 0, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7,];
896 /// let mut parser = NetflowParser::default();
897 /// let mut iter = parser.iter_packets(&v5_packet);
898 ///
899 /// // Consume all packets
900 /// for _packet in &mut iter {
901 /// // Process packet
902 /// }
903 ///
904 /// assert!(iter.is_complete());
905 /// ```
906 pub fn is_complete(&self) -> bool {
907 self.remaining.is_empty() || self.errored
908 }
909}
910
911impl<'a> Iterator for NetflowPacketIterator<'a> {
912 type Item = Result<NetflowPacket, NetflowError>;
913
914 fn next(&mut self) -> Option<Self::Item> {
915 // Stop if we've errored or no bytes remain
916 if self.errored || self.remaining.is_empty() {
917 return None;
918 }
919
920 match self.parser.parse_packet_by_version(self.remaining) {
921 ParsedNetflow::Success {
922 packet,
923 remaining: new_remaining,
924 } => {
925 self.remaining = new_remaining;
926 Some(Ok(packet))
927 }
928 ParsedNetflow::UnallowedVersion => {
929 self.errored = true;
930 None
931 }
932 ParsedNetflow::Error { error } => {
933 self.errored = true;
934 Some(Err(error))
935 }
936 }
937 }
938}
939
940impl Default for NetflowParser {
941 fn default() -> Self {
942 Self {
943 v9_parser: V9Parser::default(),
944 ipfix_parser: IPFixParser::default(),
945 allowed_versions: [5, 7, 9, 10].iter().cloned().collect(),
946 max_error_sample_size: 256,
947 template_hooks: TemplateHooks::new(),
948 }
949 }
950}
951
952impl NetflowParser {
953 /// Creates a new builder for configuring a [`NetflowParser`].
954 ///
955 /// # Examples
956 ///
957 /// ```rust
958 /// use netflow_parser::NetflowParser;
959 /// use netflow_parser::variable_versions::ttl::TtlConfig;
960 /// use std::time::Duration;
961 ///
962 /// let parser = NetflowParser::builder()
963 /// .with_cache_size(2000)
964 /// .with_ttl(TtlConfig::new(Duration::from_secs(7200)))
965 /// .build()
966 /// .expect("Failed to build parser");
967 /// ```
968 pub fn builder() -> NetflowParserBuilder {
969 NetflowParserBuilder::default()
970 }
971
972 /// Gets statistics about the V9 template cache.
973 ///
974 /// # Examples
975 ///
976 /// ```rust
977 /// use netflow_parser::NetflowParser;
978 ///
979 /// let parser = NetflowParser::default();
980 /// let stats = parser.v9_cache_stats();
981 /// println!("V9 cache: {}/{} templates", stats.current_size, stats.max_size);
982 /// ```
983 pub fn v9_cache_stats(&self) -> CacheStats {
984 CacheStats {
985 current_size: self.v9_parser.templates.len()
986 + self.v9_parser.options_templates.len(),
987 max_size: self.v9_parser.max_template_cache_size,
988 ttl_config: self.v9_parser.ttl_config.clone(),
989 metrics: self.v9_parser.metrics.snapshot(),
990 }
991 }
992
993 /// Gets statistics about the IPFIX template cache.
994 ///
995 /// # Examples
996 ///
997 /// ```rust
998 /// use netflow_parser::NetflowParser;
999 ///
1000 /// let parser = NetflowParser::default();
1001 /// let stats = parser.ipfix_cache_stats();
1002 /// println!("IPFIX cache: {}/{} templates", stats.current_size, stats.max_size);
1003 /// ```
1004 pub fn ipfix_cache_stats(&self) -> CacheStats {
1005 CacheStats {
1006 current_size: self.ipfix_parser.templates.len()
1007 + self.ipfix_parser.v9_templates.len()
1008 + self.ipfix_parser.ipfix_options_templates.len()
1009 + self.ipfix_parser.v9_options_templates.len(),
1010 max_size: self.ipfix_parser.max_template_cache_size,
1011 ttl_config: self.ipfix_parser.ttl_config.clone(),
1012 metrics: self.ipfix_parser.metrics.snapshot(),
1013 }
1014 }
1015
1016 /// Lists all cached V9 template IDs.
1017 ///
1018 /// Note: This returns template IDs from both regular and options templates.
1019 ///
1020 /// # Examples
1021 ///
1022 /// ```rust
1023 /// use netflow_parser::NetflowParser;
1024 ///
1025 /// let parser = NetflowParser::default();
1026 /// let template_ids = parser.v9_template_ids();
1027 /// println!("Cached V9 templates: {:?}", template_ids);
1028 /// ```
1029 pub fn v9_template_ids(&self) -> Vec<u16> {
1030 let mut ids: Vec<u16> = self.v9_parser.templates.iter().map(|(id, _)| *id).collect();
1031 ids.extend(self.v9_parser.options_templates.iter().map(|(id, _)| *id));
1032 ids.sort_unstable();
1033 ids
1034 }
1035
1036 /// Lists all cached IPFIX template IDs.
1037 ///
1038 /// Note: This returns template IDs from both IPFIX and V9-format templates (IPFIX can contain both).
1039 ///
1040 /// # Examples
1041 ///
1042 /// ```rust
1043 /// use netflow_parser::NetflowParser;
1044 ///
1045 /// let parser = NetflowParser::default();
1046 /// let template_ids = parser.ipfix_template_ids();
1047 /// println!("Cached IPFIX templates: {:?}", template_ids);
1048 /// ```
1049 pub fn ipfix_template_ids(&self) -> Vec<u16> {
1050 let mut ids: Vec<u16> = self
1051 .ipfix_parser
1052 .templates
1053 .iter()
1054 .map(|(id, _)| *id)
1055 .collect();
1056 ids.extend(self.ipfix_parser.v9_templates.iter().map(|(id, _)| *id));
1057 ids.extend(
1058 self.ipfix_parser
1059 .ipfix_options_templates
1060 .iter()
1061 .map(|(id, _)| *id),
1062 );
1063 ids.extend(
1064 self.ipfix_parser
1065 .v9_options_templates
1066 .iter()
1067 .map(|(id, _)| *id),
1068 );
1069 ids.sort_unstable();
1070 ids
1071 }
1072
1073 /// Checks if a V9 template with the given ID is cached.
1074 ///
1075 /// Note: This uses `peek()` which does not affect LRU ordering.
1076 ///
1077 /// # Arguments
1078 ///
1079 /// * `template_id` - The template ID to check
1080 ///
1081 /// # Examples
1082 ///
1083 /// ```rust
1084 /// use netflow_parser::NetflowParser;
1085 ///
1086 /// let parser = NetflowParser::default();
1087 /// if parser.has_v9_template(256) {
1088 /// println!("Template 256 is cached");
1089 /// }
1090 /// ```
1091 pub fn has_v9_template(&self, template_id: u16) -> bool {
1092 self.v9_parser.templates.peek(&template_id).is_some()
1093 || self
1094 .v9_parser
1095 .options_templates
1096 .peek(&template_id)
1097 .is_some()
1098 }
1099
1100 /// Checks if an IPFIX template with the given ID is cached.
1101 ///
1102 /// Note: This uses `peek()` which does not affect LRU ordering.
1103 ///
1104 /// # Arguments
1105 ///
1106 /// * `template_id` - The template ID to check
1107 ///
1108 /// # Examples
1109 ///
1110 /// ```rust
1111 /// use netflow_parser::NetflowParser;
1112 ///
1113 /// let parser = NetflowParser::default();
1114 /// if parser.has_ipfix_template(256) {
1115 /// println!("Template 256 is cached");
1116 /// }
1117 /// ```
1118 pub fn has_ipfix_template(&self, template_id: u16) -> bool {
1119 self.ipfix_parser.templates.peek(&template_id).is_some()
1120 || self.ipfix_parser.v9_templates.peek(&template_id).is_some()
1121 || self
1122 .ipfix_parser
1123 .ipfix_options_templates
1124 .peek(&template_id)
1125 .is_some()
1126 || self
1127 .ipfix_parser
1128 .v9_options_templates
1129 .peek(&template_id)
1130 .is_some()
1131 }
1132
1133 /// Clears all cached V9 templates.
1134 ///
1135 /// This is useful for testing or when you need to force template re-learning.
1136 ///
1137 /// # Examples
1138 ///
1139 /// ```rust
1140 /// use netflow_parser::NetflowParser;
1141 ///
1142 /// let mut parser = NetflowParser::default();
1143 /// parser.clear_v9_templates();
1144 /// ```
1145 pub fn clear_v9_templates(&mut self) {
1146 self.v9_parser.templates.clear();
1147 self.v9_parser.options_templates.clear();
1148 }
1149
1150 /// Clears all cached IPFIX templates.
1151 ///
1152 /// This is useful for testing or when you need to force template re-learning.
1153 ///
1154 /// # Examples
1155 ///
1156 /// ```rust
1157 /// use netflow_parser::NetflowParser;
1158 ///
1159 /// let mut parser = NetflowParser::default();
1160 /// parser.clear_ipfix_templates();
1161 /// ```
1162 pub fn clear_ipfix_templates(&mut self) {
1163 self.ipfix_parser.templates.clear();
1164 self.ipfix_parser.v9_templates.clear();
1165 self.ipfix_parser.ipfix_options_templates.clear();
1166 self.ipfix_parser.v9_options_templates.clear();
1167 }
1168
1169 /// Triggers template event hooks.
1170 ///
1171 /// This method is called internally by template operations to notify
1172 /// registered hooks about template lifecycle events. It can also be called
1173 /// manually for testing or custom integration scenarios.
1174 ///
1175 /// # Arguments
1176 ///
1177 /// * `event` - The template event to trigger
1178 ///
1179 /// # Examples
1180 ///
1181 /// ```rust
1182 /// use netflow_parser::{NetflowParser, TemplateEvent, TemplateProtocol};
1183 ///
1184 /// let parser = NetflowParser::default();
1185 /// parser.trigger_template_event(TemplateEvent::Learned {
1186 /// template_id: 256,
1187 /// protocol: TemplateProtocol::V9,
1188 /// });
1189 /// ```
1190 #[inline]
1191 pub fn trigger_template_event(&self, event: TemplateEvent) {
1192 self.template_hooks.trigger(&event);
1193 }
1194
1195 /// Parses NetFlow packets from a byte slice, preserving all successfully parsed packets.
1196 ///
1197 /// This function parses packets in sequence and returns a [`ParseResult`] containing both
1198 /// successfully parsed packets and an optional error. **No data is lost** - if parsing fails
1199 /// partway through, you still get all packets parsed before the error.
1200 ///
1201 /// # Arguments
1202 ///
1203 /// * `packet` - Byte slice containing NetFlow packet(s)
1204 ///
1205 /// # Returns
1206 ///
1207 /// [`ParseResult`] with:
1208 /// * `packets` - All successfully parsed packets (even if error occurred)
1209 /// * `error` - `None` if fully successful, `Some(error)` if parsing stopped
1210 ///
1211 /// # Examples
1212 ///
1213 /// ## Basic usage
1214 ///
1215 /// ```rust
1216 /// use netflow_parser::NetflowParser;
1217 ///
1218 /// let v5_packet = [0, 5, 2, 0, 3, 0, 4, 0, 5, 0, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7,];
1219 /// let result = NetflowParser::default().parse_bytes(&v5_packet);
1220 ///
1221 /// // Process all packets
1222 /// for packet in result.packets {
1223 /// println!("Parsed packet");
1224 /// }
1225 ///
1226 /// // Check for errors
1227 /// if let Some(e) = result.error {
1228 /// eprintln!("Error: {}", e);
1229 /// }
1230 /// ```
1231 ///
1232 /// ## With JSON serialization
1233 ///
1234 /// ```rust
1235 /// use serde_json::json;
1236 /// use netflow_parser::NetflowParser;
1237 ///
1238 /// let v5_packet = [0, 5, 2, 0, 3, 0, 4, 0, 5, 0, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7,];
1239 /// let result = NetflowParser::default().parse_bytes(&v5_packet);
1240 /// println!("{}", json!(result.packets).to_string());
1241 /// ```
1242 ///
1243 #[inline]
1244 pub fn parse_bytes(&mut self, packet: &[u8]) -> ParseResult {
1245 if packet.is_empty() {
1246 return ParseResult {
1247 packets: vec![],
1248 error: None,
1249 };
1250 }
1251
1252 let mut packets = Vec::new();
1253 let mut remaining = packet;
1254 let mut error = None;
1255
1256 while !remaining.is_empty() {
1257 match self.parse_packet_by_version(remaining) {
1258 ParsedNetflow::Success {
1259 packet,
1260 remaining: new_remaining,
1261 } => {
1262 packets.push(packet);
1263 remaining = new_remaining;
1264 }
1265 ParsedNetflow::UnallowedVersion => {
1266 break;
1267 }
1268 ParsedNetflow::Error { error: e } => {
1269 // Store error but keep successfully parsed packets
1270 error = Some(e);
1271 break;
1272 }
1273 }
1274 }
1275
1276 ParseResult { packets, error }
1277 }
1278
1279 /// Returns an iterator that yields NetflowPacket items without allocating a Vec.
1280 /// This is useful for processing large batches of packets without collecting all results in memory.
1281 ///
1282 /// # Examples
1283 ///
1284 /// ```rust
1285 /// use netflow_parser::{NetflowParser, NetflowPacket};
1286 ///
1287 /// let v5_packet = [0, 5, 0, 1, 3, 0, 4, 0, 5, 0, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7,];
1288 /// let mut parser = NetflowParser::default();
1289 ///
1290 /// for result in parser.iter_packets(&v5_packet) {
1291 /// match result {
1292 /// Ok(NetflowPacket::V5(v5)) => println!("V5 packet: {:?}", v5.header.version),
1293 /// Err(e) => println!("Error: {:?}", e),
1294 /// _ => (),
1295 /// }
1296 /// }
1297 /// ```
1298 #[inline]
1299 pub fn iter_packets<'a>(&'a mut self, packet: &'a [u8]) -> NetflowPacketIterator<'a> {
1300 NetflowPacketIterator {
1301 parser: self,
1302 remaining: packet,
1303 errored: false,
1304 }
1305 }
1306
1307 #[inline]
1308 fn parse_packet_by_version<'a>(&mut self, packet: &'a [u8]) -> ParsedNetflow<'a> {
1309 match GenericNetflowHeader::parse(packet) {
1310 Ok((remaining, header)) if self.allowed_versions.contains(&header.version) => {
1311 match header.version {
1312 5 => V5Parser::parse(remaining),
1313 7 => V7Parser::parse(remaining),
1314 9 => self.v9_parser.parse(remaining),
1315 10 => self.ipfix_parser.parse(remaining),
1316 _ => ParsedNetflow::Error {
1317 error: NetflowError::UnsupportedVersion {
1318 version: header.version,
1319 offset: 0,
1320 sample: packet[..packet.len().min(self.max_error_sample_size)]
1321 .to_vec(),
1322 },
1323 },
1324 }
1325 }
1326 Ok(_) => {
1327 // Version is valid but filtered by allowed_versions
1328 ParsedNetflow::UnallowedVersion
1329 }
1330 Err(e) => ParsedNetflow::Error {
1331 error: NetflowError::Incomplete {
1332 available: packet.len(),
1333 context: format!("NetFlow header: {}", e),
1334 },
1335 },
1336 }
1337 }
1338
1339 /// Takes a Netflow packet slice and returns a vector of Parsed NetflowCommonFlowSet
1340 #[cfg(feature = "netflow_common")]
1341 #[inline]
1342 pub fn parse_bytes_as_netflow_common_flowsets(
1343 &mut self,
1344 packet: &[u8],
1345 ) -> Vec<NetflowCommonFlowSet> {
1346 let netflow_packets = self.parse_bytes(packet);
1347 netflow_packets
1348 .packets
1349 .iter()
1350 .flat_map(|n| n.as_netflow_common().unwrap_or_default().flowsets)
1351 .collect()
1352 }
1353}