Skip to main content

feldera_adapterlib/
postprocess.rs

//! Data postprocessing layer for connectors.
//!
//! This module provides a postprocessing framework that allows data
//! transformation after encoding and before transmission.
//!
//! The postprocessing layer fits between encoding and transport in the data pipeline:
//!
//! ```text
//! Circuit → Encoder → Postprocessor → Transport
//! ```
11
12use crate::transport::{OutputBatchType, Step};
13use anyhow::Result as AnyResult;
14use feldera_types::postprocess::PostprocessorConfig;
15use std::collections::BTreeMap;
16use std::fmt::{Display, Formatter, Result as FmtResult};
17use std::sync::Arc;
18
/// Errors that can occur during creation of a postprocessor.
///
/// Each variant carries a human-readable message that is appended to a
/// variant-specific prefix by the [`Display`] implementation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PostprocessorCreateError {
    /// Postprocessing configuration is invalid.
    ConfigurationError(String),
    /// Implementation for factory generating Postprocessor not found.
    FactoryNotFound(String),
}

impl Display for PostprocessorCreateError {
    /// Writes the variant-specific prefix followed by the carried message.
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        match self {
            Self::ConfigurationError(msg) => write!(f, "Configuration error: {}", msg),
            Self::FactoryNotFound(msg) => write!(
                f,
                "Could not locate factory generating postprocessor: {}",
                msg
            ),
        }
    }
}

// No `source()` override: the variants only carry a message string, so there
// is no underlying error to expose.
impl std::error::Error for PostprocessorCreateError {}
46
47/// Trait for postprocessing encoded output data before transmission.
48///
49/// A postprocessor sits between the encoder and the transport.  The user has to
50/// implement only one of the two possible APIs: `push_buffer` and `push_key`,
51/// depending on the encoder used:
52/// **Keyed mode**: is used encoders designed to work with message-based transports such as
53/// Kafka, where every message can have (key, value, header) components, and where the
54/// output is a stream of (key, value, headers) tuples.  These encoder require the
55/// `push_key` method.
56/// **Buffer mode**: encoders where the output is simply a stream of buffers,
57/// where each buffer can contain an encoded representation of one or more output records.
58/// These encoders require the `push_buffer` method.
59pub trait Postprocessor: Send + Sync {
60    /// Called once for every output batch produced by the pipeline before any
61    /// records are pushed to the postprocessor.
62    ///
63    /// # Arguments
64    ///
65    /// * `step` — a monotonically-increasing sequence number assigned to this
66    ///   batch.  Fault-tolerant endpoints use this to detect and discard
67    ///   duplicate output.  Non-fault-tolerant postprocessors may ignore it.
68    ///
69    /// * `batch_type` — indicates whether this batch is:
70    ///   - `OutputBatchType::Delta`: an incremental update (inserts and
71    ///     deletes since the last step).
72    ///   - `OutputBatchType::Snapshot`: a full snapshot of the materialized
73    ///     view at this point in time.
74    ///
75    /// The default implementation is a no-op.
76    fn batch_start(&mut self, _step: Step, _batch_type: OutputBatchType) {}
77
78    /// Transform a serialized buffer (buffer mode).
79    ///
80    /// Called for each output chunk produced by the encoder. There can be many
81    /// calls to this method between `batch_start` and `batch_end` notifications.
82    ///
83    /// # Arguments
84    ///
85    /// * `buffer` — the raw bytes produced by the encoder.
86    ///
87    /// # Returns
88    ///
89    /// The transformed byte buffer on success.  On error the affected records
90    /// are dropped and the error is reported to the controller; processing of
91    /// subsequent records continues normally.
92    ///
93    /// The default implementation returns the data unchanged.
94    fn push_buffer(&mut self, buffer: &[u8]) -> AnyResult<Vec<u8>> {
95        Ok(buffer.to_vec())
96    }
97
98    /// Transform a key/value/headers record (keyed mode).
99    ///
100    /// Called for each key/value update generated by the encoder.
101    ///
102    /// # Arguments
103    ///
104    /// * `key` — serialized key component of the message, or `None` if the key is absent.
105    ///
106    /// * `val` — serialized value bytes, or `None` if the value is absent.
107    ///
108    /// * `headers` — a slice of `(name, value)` pairs.  Each header value is
109    ///   an optional byte slice; `None` means the header is present but has no
110    ///   value.
111    ///
112    /// # Returns
113    ///
114    /// A tuple `(key, val, headers)` with the same shape as the arguments but
115    /// owning the transformed bytes.  On error the affected records are dropped
116    /// and the error is reported to the controller; processing continues.
117    ///
118    /// The default implementation returns the data unchanged.
119    #[allow(clippy::type_complexity)]
120    fn push_key(
121        &mut self,
122        key: Option<&[u8]>,
123        val: Option<&[u8]>,
124        headers: &[(&str, Option<&[u8]>)],
125    ) -> AnyResult<(
126        Option<Vec<u8>>,
127        Option<Vec<u8>>,
128        Vec<(String, Option<Vec<u8>>)>,
129    )> {
130        Ok((
131            key.map(<[u8]>::to_vec),
132            val.map(<[u8]>::to_vec),
133            headers
134                .iter()
135                .map(|(k, v)| (k.to_string(), v.map(<[u8]>::to_vec)))
136                .collect(),
137        ))
138    }
139
140    /// Called once at the end of each output batch.  The default is a no-op.
141    fn batch_end(&mut self) {}
142
143    /// Returns the approximate amount of memory owned by this postprocessor.
144    ///
145    /// The default returns 0; override when the implementation holds a
146    /// significant internal buffer.
147    fn memory(&self) -> usize {
148        0
149    }
150
151    /// Create a new postprocessor with the same configuration as `self`.
152    ///
153    /// Used when multiple parallel output pipelines are needed.
154    fn fork(&self) -> Box<dyn Postprocessor>;
155}
156
157/// A factory that can create a new Postprocessor object.
158pub trait PostprocessorFactory: Send + Sync {
159    /// Create a new postprocessor based on the supplied configuration.
160    ///
161    /// # Arguments
162    ///
163    /// * `config` - Postprocessor-specific configuration.
164    fn create(
165        &self,
166        config: &PostprocessorConfig,
167    ) -> Result<Box<dyn Postprocessor>, PostprocessorCreateError>;
168}
169
170/// A registry where all factories that can create Postprocessors are registered.
171#[derive(Default)]
172pub struct PostprocessorRegistry {
173    registered: BTreeMap<&'static str, Arc<dyn PostprocessorFactory>>,
174}
175
176impl PostprocessorRegistry {
177    pub fn new() -> Self {
178        Self {
179            registered: BTreeMap::new(),
180        }
181    }
182
183    /// Register a new factory under the specified name.
184    pub fn register(&mut self, name: &'static str, factory: Box<dyn PostprocessorFactory>) {
185        self.registered.insert(name, Arc::from(factory));
186    }
187
188    pub fn get(&self, name: &str) -> Option<Arc<dyn PostprocessorFactory>> {
189        self.registered.get(name).cloned()
190    }
191}