data-courier 0.1.0-beta.4

Async Rust framework for composable data pipelines
Documentation
use std::fmt;

use serde_json::Value;

use crate::pipeline::ErrorPolicy;
use crate::retry::RetryPolicy;

use super::redact::{RedactedJsonValue, RedactedOptionRetryPolicy, RedactedStr};

#[derive(Clone, Default, PartialEq)]
pub struct Config {
    pub pipelines: Vec<PipelineSpec>,
    pub observability: Option<super::observability::ObservabilityConfig>,
}

impl fmt::Debug for Config {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Config")
            .field("pipelines", &self.pipelines)
            .field("observability", &self.observability)
            .finish()
    }
}

#[derive(Clone, PartialEq)]
pub struct PipelineSpec {
    pub name: String,
    pub source: SourceSpec,
    pub transforms: Vec<TransformSpec>,
    pub sinks: Vec<SinkSpec>,
    pub channel_capacity: Option<usize>,
}

#[derive(Clone, PartialEq)]
pub struct SourceSpec {
    pub kind: String,
    pub config: Value,
    pub retry: Option<RetryPolicy>,
}

#[derive(Clone, PartialEq)]
pub struct TransformSpec {
    pub kind: String,
    pub config: Value,
    pub on_error: Option<ErrorPolicyConfig>,
}

#[derive(Clone, PartialEq)]
pub struct SinkSpec {
    pub kind: String,
    pub config: Value,
    pub on_error: Option<ErrorPolicyConfig>,
    pub retry: Option<RetryPolicy>,
}

impl fmt::Debug for PipelineSpec {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PipelineSpec")
            .field("name", &RedactedStr(&self.name))
            .field("source", &self.source)
            .field("transforms", &self.transforms)
            .field("sinks", &self.sinks)
            .field("channel_capacity", &self.channel_capacity)
            .finish()
    }
}

impl fmt::Debug for SourceSpec {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SourceSpec")
            .field("kind", &RedactedStr(&self.kind))
            .field("config", &RedactedJsonValue(&self.config))
            .field("retry", &RedactedOptionRetryPolicy(self.retry.as_ref()))
            .finish()
    }
}

impl fmt::Debug for TransformSpec {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TransformSpec")
            .field("kind", &RedactedStr(&self.kind))
            .field("config", &RedactedJsonValue(&self.config))
            .field("on_error", &self.on_error)
            .finish()
    }
}

impl fmt::Debug for SinkSpec {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SinkSpec")
            .field("kind", &RedactedStr(&self.kind))
            .field("config", &RedactedJsonValue(&self.config))
            .field("on_error", &self.on_error)
            .field("retry", &RedactedOptionRetryPolicy(self.retry.as_ref()))
            .finish()
    }
}

#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub enum ErrorPolicyConfig {
    #[default]
    Drop,
    FailPipeline,
}

impl From<ErrorPolicyConfig> for ErrorPolicy {
    fn from(value: ErrorPolicyConfig) -> Self {
        match value {
            ErrorPolicyConfig::Drop => ErrorPolicy::Drop,
            ErrorPolicyConfig::FailPipeline => ErrorPolicy::FailPipeline,
        }
    }
}