feldera-adapterlib 0.312.0

Connector support for the Feldera streaming engine
Documentation
//! Data postprocessing layer for connectors.
//!
//! This module provides a postprocessing framework that allows data
//! transformation after encoding and before transmission.
//!
//! The postprocessing layer fits between encoding and transport in the data pipeline:
//!
//! ```text
//! Circuit → Encoder → Postprocessor → Transport
//! ```

use crate::transport::{OutputBatchType, Step};
use anyhow::Result as AnyResult;
use feldera_types::postprocess::PostprocessorConfig;
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::sync::Arc;

/// Reasons why a postprocessor could not be constructed.
///
/// Each variant carries a free-form message that is appended to the
/// variant-specific prefix when the error is displayed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PostprocessorCreateError {
    /// The supplied postprocessing configuration is invalid.
    ConfigurationError(String),
    /// No implementation of a factory generating the postprocessor was found.
    FactoryNotFound(String),
}

/// Human-readable rendering: a fixed prefix per variant followed by the
/// message stored in the variant.
impl Display for PostprocessorCreateError {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        match self {
            Self::ConfigurationError(reason) => write!(f, "Configuration error: {}", reason),
            Self::FactoryNotFound(what) => write!(
                f,
                "Could not locate factory generating postprocessor: {}",
                what
            ),
        }
    }
}

// The default `std::error::Error` methods are sufficient: the error wraps no
// underlying source, so nothing needs to be overridden.
impl std::error::Error for PostprocessorCreateError {}

/// Hook for transforming encoded output data before it is transmitted.
///
/// A postprocessor is placed between the encoder and the transport.  Only one
/// of the two transformation APIs, `push_buffer` or `push_key`, has to be
/// implemented; which one depends on the encoder in use:
///
/// * **Keyed mode** is used by encoders designed to work with message-based
///   transports such as Kafka, where every message can have (key, value,
///   header) components and the output is a stream of (key, value, headers)
///   tuples.  These encoders require the `push_key` method.
/// * **Buffer mode** is used by encoders whose output is simply a stream of
///   buffers, where each buffer can contain an encoded representation of one
///   or more output records.  These encoders require the `push_buffer` method.
pub trait Postprocessor: Send + Sync {
    /// Notification sent once per output batch produced by the pipeline,
    /// before any of the batch's records are pushed to the postprocessor.
    ///
    /// # Arguments
    ///
    /// * `step` — monotonically-increasing sequence number assigned to the
    ///   batch.  Fault-tolerant endpoints use it to detect and discard
    ///   duplicate output; postprocessors that are not fault-tolerant may
    ///   ignore it.
    ///
    /// * `batch_type` — the kind of batch being started:
    ///   - `OutputBatchType::Delta`: an incremental update (inserts and
    ///     deletes since the last step).
    ///   - `OutputBatchType::Snapshot`: a full snapshot of the materialized
    ///     view at this point in time.
    ///
    /// Does nothing unless overridden.
    fn batch_start(&mut self, _step: Step, _batch_type: OutputBatchType) {}

    /// Buffer-mode transformation of one serialized buffer.
    ///
    /// Invoked for every output chunk the encoder produces; many calls can
    /// occur between the `batch_start` and `batch_end` notifications.
    ///
    /// # Arguments
    ///
    /// * `buffer` — the raw bytes produced by the encoder.
    ///
    /// # Returns
    ///
    /// On success, the transformed bytes.  On error the affected records are
    /// dropped and the error is reported to the controller; subsequent
    /// records are processed normally.
    ///
    /// Unless overridden, the input bytes are copied through unchanged.
    fn push_buffer(&mut self, buffer: &[u8]) -> AnyResult<Vec<u8>> {
        let passthrough = Vec::from(buffer);
        Ok(passthrough)
    }

    /// Keyed-mode transformation of one key/value/headers record.
    ///
    /// Invoked for every key/value update generated by the encoder.
    ///
    /// # Arguments
    ///
    /// * `key` — serialized key component of the message, or `None` when the
    ///   key is absent.
    ///
    /// * `val` — serialized value bytes, or `None` when the value is absent.
    ///
    /// * `headers` — slice of `(name, value)` pairs.  A header value is an
    ///   optional byte slice; `None` means the header is present but carries
    ///   no value.
    ///
    /// # Returns
    ///
    /// A `(key, val, headers)` tuple shaped like the arguments but owning the
    /// transformed bytes.  On error the affected records are dropped and the
    /// error is reported to the controller; processing continues.
    ///
    /// Unless overridden, every component is copied through unchanged.
    #[allow(clippy::type_complexity)]
    fn push_key(
        &mut self,
        key: Option<&[u8]>,
        val: Option<&[u8]>,
        headers: &[(&str, Option<&[u8]>)],
    ) -> AnyResult<(
        Option<Vec<u8>>,
        Option<Vec<u8>>,
        Vec<(String, Option<Vec<u8>>)>,
    )> {
        // Turn every borrowed component into its owned counterpart, leaving
        // absent (`None`) components absent.
        let owned_key = key.map(|bytes| bytes.to_vec());
        let owned_val = val.map(|bytes| bytes.to_vec());
        let owned_headers: Vec<(String, Option<Vec<u8>>)> = headers
            .iter()
            .map(|&(name, value)| (name.to_owned(), value.map(|bytes| bytes.to_vec())))
            .collect();

        Ok((owned_key, owned_val, owned_headers))
    }

    /// Notification sent once when an output batch ends.  Does nothing unless
    /// overridden.
    fn batch_end(&mut self) {}

    /// Approximate number of bytes of memory owned by this postprocessor.
    ///
    /// Reports 0 unless overridden; implementations that hold a significant
    /// internal buffer should override it.
    fn memory(&self) -> usize {
        0
    }

    /// Produce a new postprocessor configured the same way as `self`.
    ///
    /// Used when multiple parallel output pipelines are needed.
    fn fork(&self) -> Box<dyn Postprocessor>;
}

/// A factory that can create a new Postprocessor object.
///
/// Implementations must be `Send + Sync`, as required by the trait bound.
pub trait PostprocessorFactory: Send + Sync {
    /// Create a new postprocessor based on the supplied configuration.
    ///
    /// # Arguments
    ///
    /// * `config` - Postprocessor-specific configuration.
    ///
    /// # Returns
    ///
    /// A boxed [`Postprocessor`] on success.
    ///
    /// # Errors
    ///
    /// Returns a [`PostprocessorCreateError`] when the postprocessor cannot
    /// be created.
    fn create(
        &self,
        config: &PostprocessorConfig,
    ) -> Result<Box<dyn Postprocessor>, PostprocessorCreateError>;
}

/// Name-indexed collection of the factories able to create postprocessors.
#[derive(Default)]
pub struct PostprocessorRegistry {
    /// Factories keyed by the name they were registered under; each factory
    /// is held in an `Arc` so lookups can hand out shared handles.
    registered: BTreeMap<&'static str, Arc<dyn PostprocessorFactory>>,
}

impl PostprocessorRegistry {
    /// Create a registry with no factories registered.
    pub fn new() -> Self {
        Self::default()
    }

    /// Register `factory` under `name`.
    ///
    /// If a factory is already registered under the same name, it is
    /// replaced by the new one.
    pub fn register(&mut self, name: &'static str, factory: Box<dyn PostprocessorFactory>) {
        let shared: Arc<dyn PostprocessorFactory> = factory.into();
        self.registered.insert(name, shared);
    }

    /// Look up the factory registered under `name`.
    ///
    /// Returns a shared handle to the factory, or `None` when nothing is
    /// registered under that name.
    pub fn get(&self, name: &str) -> Option<Arc<dyn PostprocessorFactory>> {
        self.registered.get(name).map(Arc::clone)
    }
}