feldera_adapterlib/preprocess.rs
1//! Data preprocessing layer for connectors.
2//!
3//! This module provides a preprocessing framework that allows data transformation
4//! before it reaches the parser.
5//!
6//! The preprocessing layer fits between transport and parsing in the data pipeline:
7//!
8//! ```text
9//! Transport → Preprocessor → Parser → Circuit
10//! ```
11
12use crate::format::{ParseError, Splitter};
13use feldera_types::preprocess::PreprocessorConfig;
14use std::collections::BTreeMap;
15use std::fmt::{Display, Formatter, Result as FmtResult};
16use std::sync::Arc;
17
/// Failure modes reported when a preprocessor cannot be constructed.
///
/// Each variant carries a human-readable message that is appended to a
/// variant-specific prefix by the [`Display`] implementation.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PreprocessorCreateError {
    /// The supplied preprocessing configuration is invalid.
    ConfigurationError(String),
    /// No factory implementation capable of generating the preprocessor was
    /// found.
    FactoryNotFound(String),
}

impl Display for PreprocessorCreateError {
    /// Writes the variant-specific prefix followed by the carried message.
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        match self {
            Self::ConfigurationError(detail) => write!(f, "Configuration error: {}", detail),
            Self::FactoryNotFound(detail) => write!(
                f,
                "Could not locate factory generating preprocessor: {}",
                detail
            ),
        }
    }
}

// Marker impl: the default `std::error::Error` methods are sufficient because
// the variants wrap plain strings rather than another error value.
impl std::error::Error for PreprocessorCreateError {}
45
46/// Trait for preprocessing raw data before parsing.
47pub trait Preprocessor: Send + Sync {
48 /// Process raw input data and return transformed data.
49 ///
50 /// # Arguments
51 /// * `data` - Raw input data bytes
52 ///
53 /// # Returns
54 /// A `PreprocessResult` containing the transformed data or errors.
55 fn process(&mut self, data: &[u8]) -> (Vec<u8>, Vec<ParseError>);
56
57 /// Create a new preprocessor with the same configuration as `self`.
58 ///
59 /// Used by multithreaded transport endpoints to create multiple parallel
60 /// input pipelines.
61 fn fork(&self) -> Box<dyn Preprocessor>;
62
63 /// Returns an object that can be used to break a stream of incoming data
64 /// into complete records to pass to [Preprocessor::process]. If the object
65 /// is None, the parser's splitter object will actually be used.
66 fn splitter(&self) -> Option<Box<dyn Splitter>>;
67}
68
69/// A factory that can create a new Preprocessor object.
70pub trait PreprocessorFactory: Send + Sync {
71 /// Create a new preprocessor based on the supplied configuration.
72 ///
73 /// # Arguments
74 ///
75 /// * `config` - Preprocessor-specific configuration.
76 fn create(
77 &self,
78 config: &PreprocessorConfig,
79 ) -> Result<Box<dyn Preprocessor>, PreprocessorCreateError>;
80}
81
82/// A registry where all factories that can create Preprocessors are registered
83#[derive(Default)]
84pub struct PreprocessorRegistry {
85 registered: BTreeMap<&'static str, Arc<dyn PreprocessorFactory>>,
86}
87
88impl PreprocessorRegistry {
89 pub fn new() -> Self {
90 Self {
91 registered: BTreeMap::new(),
92 }
93 }
94
95 /// Register a new factory under the specified name
96 pub fn register(&mut self, name: &'static str, factory: Box<dyn PreprocessorFactory>) {
97 self.registered.insert(name, Arc::from(factory));
98 }
99
100 pub fn get(&self, name: &str) -> Option<Arc<dyn PreprocessorFactory>> {
101 self.registered.get(name).cloned()
102 }
103}