feldera_adapterlib/postprocess.rs
1//! Data postprocessing layer for connectors.
2//!
3//! This module provides a postprocessing framework that allows data
4//! transformation after encoding and before transmission.
5//!
6//! The postprocessing layer fits between encoding and transport in the data pipeline:
7//!
8//! ```text
9//! Circuit → Encoder → Postprocessor → Transport
10//! ```
11
12use crate::transport::{OutputBatchType, Step};
13use anyhow::Result as AnyResult;
14use feldera_types::postprocess::PostprocessorConfig;
15use std::collections::BTreeMap;
16use std::fmt::{Display, Formatter, Result as FmtResult};
17use std::sync::Arc;
18
/// Failure modes encountered while constructing a [`Postprocessor`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PostprocessorCreateError {
    /// The supplied postprocessor configuration was rejected as invalid.
    ConfigurationError(String),
    /// No factory implementation was found for the requested postprocessor.
    FactoryNotFound(String),
}

impl Display for PostprocessorCreateError {
    /// Renders the error as `"<category>: <detail>"`.
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        let (prefix, detail) = match self {
            Self::ConfigurationError(detail) => ("Configuration error", detail),
            Self::FactoryNotFound(detail) => (
                "Could not locate factory generating postprocessor",
                detail,
            ),
        };
        write!(f, "{prefix}: {detail}")
    }
}

// No underlying cause is carried, so the default `source()` (which returns
// `None`) is sufficient.
impl std::error::Error for PostprocessorCreateError {}
46
47/// Trait for postprocessing encoded output data before transmission.
48///
49/// A postprocessor sits between the encoder and the transport. The user has to
50/// implement only one of the two possible APIs: `push_buffer` and `push_key`,
51/// depending on the encoder used:
52/// **Keyed mode**: is used encoders designed to work with message-based transports such as
53/// Kafka, where every message can have (key, value, header) components, and where the
54/// output is a stream of (key, value, headers) tuples. These encoder require the
55/// `push_key` method.
56/// **Buffer mode**: encoders where the output is simply a stream of buffers,
57/// where each buffer can contain an encoded representation of one or more output records.
58/// These encoders require the `push_buffer` method.
59pub trait Postprocessor: Send + Sync {
60 /// Called once for every output batch produced by the pipeline before any
61 /// records are pushed to the postprocessor.
62 ///
63 /// # Arguments
64 ///
65 /// * `step` — a monotonically-increasing sequence number assigned to this
66 /// batch. Fault-tolerant endpoints use this to detect and discard
67 /// duplicate output. Non-fault-tolerant postprocessors may ignore it.
68 ///
69 /// * `batch_type` — indicates whether this batch is:
70 /// - `OutputBatchType::Delta`: an incremental update (inserts and
71 /// deletes since the last step).
72 /// - `OutputBatchType::Snapshot`: a full snapshot of the materialized
73 /// view at this point in time.
74 ///
75 /// The default implementation is a no-op.
76 fn batch_start(&mut self, _step: Step, _batch_type: OutputBatchType) {}
77
78 /// Transform a serialized buffer (buffer mode).
79 ///
80 /// Called for each output chunk produced by the encoder. There can be many
81 /// calls to this method between `batch_start` and `batch_end` notifications.
82 ///
83 /// # Arguments
84 ///
85 /// * `buffer` — the raw bytes produced by the encoder.
86 ///
87 /// # Returns
88 ///
89 /// The transformed byte buffer on success. On error the affected records
90 /// are dropped and the error is reported to the controller; processing of
91 /// subsequent records continues normally.
92 ///
93 /// The default implementation returns the data unchanged.
94 fn push_buffer(&mut self, buffer: &[u8]) -> AnyResult<Vec<u8>> {
95 Ok(buffer.to_vec())
96 }
97
98 /// Transform a key/value/headers record (keyed mode).
99 ///
100 /// Called for each key/value update generated by the encoder.
101 ///
102 /// # Arguments
103 ///
104 /// * `key` — serialized key component of the message, or `None` if the key is absent.
105 ///
106 /// * `val` — serialized value bytes, or `None` if the value is absent.
107 ///
108 /// * `headers` — a slice of `(name, value)` pairs. Each header value is
109 /// an optional byte slice; `None` means the header is present but has no
110 /// value.
111 ///
112 /// # Returns
113 ///
114 /// A tuple `(key, val, headers)` with the same shape as the arguments but
115 /// owning the transformed bytes. On error the affected records are dropped
116 /// and the error is reported to the controller; processing continues.
117 ///
118 /// The default implementation returns the data unchanged.
119 #[allow(clippy::type_complexity)]
120 fn push_key(
121 &mut self,
122 key: Option<&[u8]>,
123 val: Option<&[u8]>,
124 headers: &[(&str, Option<&[u8]>)],
125 ) -> AnyResult<(
126 Option<Vec<u8>>,
127 Option<Vec<u8>>,
128 Vec<(String, Option<Vec<u8>>)>,
129 )> {
130 Ok((
131 key.map(<[u8]>::to_vec),
132 val.map(<[u8]>::to_vec),
133 headers
134 .iter()
135 .map(|(k, v)| (k.to_string(), v.map(<[u8]>::to_vec)))
136 .collect(),
137 ))
138 }
139
140 /// Called once at the end of each output batch. The default is a no-op.
141 fn batch_end(&mut self) {}
142
143 /// Returns the approximate amount of memory owned by this postprocessor.
144 ///
145 /// The default returns 0; override when the implementation holds a
146 /// significant internal buffer.
147 fn memory(&self) -> usize {
148 0
149 }
150
151 /// Create a new postprocessor with the same configuration as `self`.
152 ///
153 /// Used when multiple parallel output pipelines are needed.
154 fn fork(&self) -> Box<dyn Postprocessor>;
155}
156
157/// A factory that can create a new Postprocessor object.
158pub trait PostprocessorFactory: Send + Sync {
159 /// Create a new postprocessor based on the supplied configuration.
160 ///
161 /// # Arguments
162 ///
163 /// * `config` - Postprocessor-specific configuration.
164 fn create(
165 &self,
166 config: &PostprocessorConfig,
167 ) -> Result<Box<dyn Postprocessor>, PostprocessorCreateError>;
168}
169
170/// A registry where all factories that can create Postprocessors are registered.
171#[derive(Default)]
172pub struct PostprocessorRegistry {
173 registered: BTreeMap<&'static str, Arc<dyn PostprocessorFactory>>,
174}
175
176impl PostprocessorRegistry {
177 pub fn new() -> Self {
178 Self {
179 registered: BTreeMap::new(),
180 }
181 }
182
183 /// Register a new factory under the specified name.
184 pub fn register(&mut self, name: &'static str, factory: Box<dyn PostprocessorFactory>) {
185 self.registered.insert(name, Arc::from(factory));
186 }
187
188 pub fn get(&self, name: &str) -> Option<Arc<dyn PostprocessorFactory>> {
189 self.registered.get(name).cloned()
190 }
191}