Skip to main content

feldera_adapterlib/
preprocess.rs

//! Data preprocessing layer for connectors.
//!
//! This module provides a framework for transforming raw data before it
//! reaches the parser.
//!
//! The preprocessing layer sits between transport and parsing in the data
//! pipeline:
//!
//! ```text
//! Transport → Preprocessor → Parser → Circuit
//! ```
11
12use crate::format::{ParseError, Splitter};
13use feldera_types::preprocess::PreprocessorConfig;
14use std::collections::BTreeMap;
15use std::fmt::{Display, Formatter, Result as FmtResult};
16use std::sync::Arc;
17
/// Errors that can occur while creating a [`Preprocessor`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PreprocessorCreateError {
    /// The preprocessing configuration is invalid.
    ///
    /// The payload is a free-form message that is included in the
    /// `Display` output.
    ConfigurationError(String),
    /// No factory able to generate the requested preprocessor was found.
    ///
    /// The payload is a free-form message that is included in the
    /// `Display` output.
    FactoryNotFound(String),
}

impl Display for PreprocessorCreateError {
    /// Formats the error as `<category>: <message>`.
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        match self {
            Self::ConfigurationError(msg) => write!(f, "Configuration error: {msg}"),
            Self::FactoryNotFound(msg) => {
                write!(f, "Could not locate factory generating preprocessor: {msg}")
            }
        }
    }
}

// No `source()` override: neither variant wraps an underlying error.
impl std::error::Error for PreprocessorCreateError {}
45
46/// Trait for preprocessing raw data before parsing.
47pub trait Preprocessor: Send + Sync {
48    /// Process raw input data and return transformed data.
49    ///
50    /// # Arguments
51    /// * `data` - Raw input data bytes
52    ///
53    /// # Returns
54    /// A `PreprocessResult` containing the transformed data or errors.
55    fn process(&mut self, data: &[u8]) -> (Vec<u8>, Vec<ParseError>);
56
57    /// Create a new preprocessor with the same configuration as `self`.
58    ///
59    /// Used by multithreaded transport endpoints to create multiple parallel
60    /// input pipelines.
61    fn fork(&self) -> Box<dyn Preprocessor>;
62
63    /// Returns an object that can be used to break a stream of incoming data
64    /// into complete records to pass to [Preprocessor::process].  If the object
65    /// is None, the parser's splitter object will actually be used.
66    fn splitter(&self) -> Option<Box<dyn Splitter>>;
67}
68
69/// A factory that can create a new Preprocessor object.
70pub trait PreprocessorFactory: Send + Sync {
71    /// Create a new preprocessor based on the supplied configuration.
72    ///
73    /// # Arguments
74    ///
75    /// * `config` - Preprocessor-specific configuration.
76    fn create(
77        &self,
78        config: &PreprocessorConfig,
79    ) -> Result<Box<dyn Preprocessor>, PreprocessorCreateError>;
80}
81
82/// A registry where all factories that can create Preprocessors are registered
83#[derive(Default)]
84pub struct PreprocessorRegistry {
85    registered: BTreeMap<&'static str, Arc<dyn PreprocessorFactory>>,
86}
87
88impl PreprocessorRegistry {
89    pub fn new() -> Self {
90        Self {
91            registered: BTreeMap::new(),
92        }
93    }
94
95    /// Register a new factory under the specified name
96    pub fn register(&mut self, name: &'static str, factory: Box<dyn PreprocessorFactory>) {
97        self.registered.insert(name, Arc::from(factory));
98    }
99
100    pub fn get(&self, name: &str) -> Option<Arc<dyn PreprocessorFactory>> {
101        self.registered.get(name).cloned()
102    }
103}