netgauze_collector/inputs/mod.rs
1// Copyright (C) 2025-present The NetGauze Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12// implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! External input processing infrastructure for the NetGauze collector.
17//!
18//! This module contains components for ingesting enrichment data from multiple
19//! sources and applying enrichment operations. The primary abstractions
20//! include:
21//!
22//! # Input Sources
23//!
24//! - [`files`] - File-based input processing
25//! - [`kafka`] - Kafka-based input processing
26//! - [`flow_options`] - Flow Options Data Records input processing
27//!
28//! # Enrichment
29//!
30//! The [`EnrichmentHandle`] trait enables asynchronous enrichment of incoming
31//! data. Implementors can update enrichment state without blocking data
32//! ingestion.
33
34pub mod files;
35pub mod flow_options;
36pub mod kafka;
37
38/// Asynchronous handle for applying enrichment to collected data.
39///
40/// Implementations should allow concurrent enrichment updates without
41/// blocking the main data ingestion path. The generic parameter `T`
42/// represents the enrichment operation type.
43///
44/// # Example usage
45///
46/// ```ignore
47/// async fn enrich_data<H: EnrichmentHandle<MyOp>>(
48/// handle: &H,
49/// operation: MyOp
50/// ) -> Result<(), anyhow::Error> {
51/// handle.update_enrichment(operation).await
52/// }
53/// ```
54pub trait EnrichmentHandle<T>: Send + Sync
55where
56 T: std::fmt::Display,
57{
58 /// Send an enrichment operation to the actor implementing this trait.
59 ///
60 /// Returns a boxed future to enable trait methods without `async_trait`.
61 fn update_enrichment(&self, op: T)
62 -> futures::future::BoxFuture<'_, Result<(), anyhow::Error>>;
63}
64
65/// Categorized errors from input processing operations.
66///
67/// Provides structured error information suitable for logging, metrics,
68/// and operational decisions. Each variant contains:
69/// - `context`: Where the error occurred
70/// - `reason`: Why it occurred
71#[derive(Debug, Clone, strum_macros::Display)]
72pub enum InputProcessingError {
73 #[strum(to_string = "Invalid format in {context}: {reason}")]
74 InvalidFormat { context: String, reason: String },
75
76 #[strum(to_string = "Conversion error in {context}: {reason}")]
77 ConversionError { context: String, reason: String },
78
79 #[strum(to_string = "UTF-8 decode error in {context}: {reason}")]
80 Utf8Error { context: String, reason: String },
81
82 #[strum(to_string = "JSON error in {context}: {reason}")]
83 JsonError { context: String, reason: String },
84
85 #[strum(to_string = "IO error in {context}: {reason}")]
86 IoError { context: String, reason: String },
87
88 #[strum(to_string = "Unsupported operation in {handler}: {reason}")]
89 UnsupportedOperation { handler: String, reason: String },
90}
91
// The enum derives `Debug` and a `Display` implementation, so the empty impl
// simply opts in to `std::error::Error` with all of its default methods.
impl std::error::Error for InputProcessingError {}
93
94impl InputProcessingError {
95 /// Returns a static category label for metrics classification.
96 pub fn category(&self) -> &'static str {
97 match self {
98 Self::InvalidFormat { .. } => "invalid_format",
99 Self::ConversionError { .. } => "converstion_error",
100 Self::Utf8Error { .. } => "utf8_error",
101 Self::JsonError { .. } => "json_error",
102 Self::IoError { .. } => "io_error",
103 Self::UnsupportedOperation { .. } => "unsupported_operation",
104 }
105 }
106}