// pipeflow 0.0.4
//
// A lightweight, configuration-driven data pipeline framework.
// Documentation
use std::sync::Arc;

use serde::de::DeserializeOwned;

use crate::error::{Error, Result};
use crate::sink::Sink;
use crate::source::Source;
use crate::transform::Transform;

/// Parse a YAML config value into a typed config struct.
fn parse_node_config<T: DeserializeOwned>(
    config: &serde_yaml::Value,
    node_type: &str,
) -> Result<T> {
    serde_yaml::from_value(config.clone())
        .map_err(|e| Error::config(format!("Invalid {} config: {}", node_type, e)))
}

pub(crate) fn build_source(
    id: &str,
    source_type: &str,
    config: &serde_yaml::Value,
) -> Result<Arc<dyn Source>> {
    match source_type {
        "http_client" => {
            #[cfg(feature = "http-client")]
            {
                use crate::source::http_client::{HttpClientConfig, HttpClientSource};
                let cfg: HttpClientConfig = parse_node_config(config, "http_client")?;
                Ok(Arc::new(HttpClientSource::new(id, cfg)?))
            }
            #[cfg(not(feature = "http-client"))]
            {
                let _ = config;
                Err(Error::config(
                    "Source type 'http_client' requires feature 'http-client'",
                ))
            }
        }
        "http_server" => {
            #[cfg(feature = "http-server")]
            {
                use crate::source::http_server::{HttpServerConfig, HttpServerSource};
                let cfg: HttpServerConfig = parse_node_config(config, "http_server")?;
                Ok(Arc::new(HttpServerSource::new(id, cfg)?))
            }
            #[cfg(not(feature = "http-server"))]
            {
                let _ = config;
                Err(Error::config(
                    "Source type 'http_server' requires feature 'http-server'",
                ))
            }
        }
        "sql" | "database" => {
            #[cfg(feature = "database")]
            {
                use crate::source::sql::{SqlSource, SqlSourceConfig};
                let cfg: SqlSourceConfig = parse_node_config(config, "sql")?;
                Ok(Arc::new(SqlSource::new(id, cfg)?))
            }
            #[cfg(not(feature = "database"))]
            {
                let _ = config;
                Err(Error::config(
                    "Source type 'sql' requires feature 'database'",
                ))
            }
        }
        "redis" => {
            #[cfg(feature = "redis")]
            {
                use crate::source::redis::{RedisSource, RedisSourceConfig};
                let cfg: RedisSourceConfig = parse_node_config(config, "redis")?;
                Ok(Arc::new(RedisSource::new(id, cfg)?))
            }
            #[cfg(not(feature = "redis"))]
            {
                let _ = config;
                Err(Error::config(
                    "Source type 'redis' requires feature 'redis'",
                ))
            }
        }
        "file" => {
            #[cfg(feature = "file")]
            {
                use crate::source::file::{FileSource, FileSourceConfig};
                let cfg: FileSourceConfig = parse_node_config(config, "file")?;
                Ok(Arc::new(FileSource::new(id, cfg)?))
            }
            #[cfg(not(feature = "file"))]
            {
                let _ = config;
                Err(Error::config("Source type 'file' requires feature 'file'"))
            }
        }
        other => Err(Error::config(format!("Unknown source type: {}", other))),
    }
}

pub(crate) fn build_transform(
    config: &crate::config::TransformConfig,
) -> Result<Arc<dyn Transform>> {
    use crate::transform::pipeline::TransformPipeline;

    let mut steps: Vec<Box<dyn crate::transform::step::Step>> =
        Vec::with_capacity(config.steps.len());
    for step_config in &config.steps {
        let step = build_step(step_config)?;
        steps.push(step);
    }

    Ok(Arc::new(TransformPipeline::new(&config.id, steps)))
}

fn build_step(config: &crate::config::StepConfig) -> Result<Box<dyn crate::transform::step::Step>> {
    match config.step_type.as_str() {
        "filter" => {
            use crate::transform::filter::{FilterStep, FilterStepConfig};
            let cfg: FilterStepConfig = parse_node_config(&config.config, "filter")?;
            Ok(Box::new(FilterStep::new(cfg)?))
        }
        "hash" => {
            use crate::transform::hash::{HashStep, HashStepConfig};
            let cfg: HashStepConfig = parse_node_config(&config.config, "hash")?;
            Ok(Box::new(HashStep::new(cfg)?))
        }
        "remap" => {
            use crate::transform::remap::{RemapStep, RemapStepConfig};
            let cfg: RemapStepConfig = parse_node_config(&config.config, "remap")?;
            Ok(Box::new(RemapStep::new(cfg)?))
        }
        "window" => {
            use crate::transform::window::{WindowStep, WindowStepConfig};
            let cfg: WindowStepConfig = parse_node_config(&config.config, "window")?;
            Ok(Box::new(WindowStep::new(cfg)?))
        }
        "compute" => {
            use crate::transform::compute::{ComputeStep, ComputeStepConfig};
            let cfg: ComputeStepConfig = parse_node_config(&config.config, "compute")?;
            Ok(Box::new(ComputeStep::new(cfg)?))
        }
        other => Err(Error::config(format!("Unknown step type: {}", other))),
    }
}

pub(crate) async fn build_sink(
    config: &crate::config::SinkConfig,
    system: &crate::config::SystemConfig,
) -> Result<Arc<dyn Sink>> {
    use crate::sink::blackhole::BlackholeSink;

    match config.sink_type.as_str() {
        "console" => {
            use crate::sink::console::{ConsoleSink, ConsoleSinkConfig};
            let cfg: ConsoleSinkConfig = parse_node_config(&config.config, "console")?;
            Ok(Arc::new(ConsoleSink::new(&config.id, cfg)) as Arc<dyn Sink>)
        }
        "file" => {
            #[cfg(feature = "file")]
            {
                use crate::sink::file::{FileSink, FileSinkConfig};
                let cfg: FileSinkConfig = parse_node_config(&config.config, "file")?;
                Ok(Arc::new(FileSink::new(&config.id, cfg).await?) as Arc<dyn Sink>)
            }
            #[cfg(not(feature = "file"))]
            {
                Err(Error::config("Sink type 'file' requires feature 'file'"))
            }
        }
        "blackhole" => Ok(Arc::new(BlackholeSink::new(&config.id)) as Arc<dyn Sink>),
        "sql" => {
            #[cfg(feature = "database")]
            {
                use crate::sink::sql::{SqlSink, SqlSinkConfig};
                let cfg: SqlSinkConfig = parse_node_config(&config.config, "sql")?;
                Ok(Arc::new(SqlSink::new(&config.id, cfg).await?) as Arc<dyn Sink>)
            }
            #[cfg(not(feature = "database"))]
            {
                Err(Error::config("Sink type 'sql' requires feature 'database'"))
            }
        }
        "http_client" | "http" => {
            #[cfg(feature = "http-client")]
            {
                use crate::sink::http_client::{HttpClientSink, HttpClientSinkConfig};
                let cfg: HttpClientSinkConfig = parse_node_config(&config.config, "http_client")?;
                Ok(Arc::new(HttpClientSink::new(&config.id, cfg)?) as Arc<dyn Sink>)
            }
            #[cfg(not(feature = "http-client"))]
            {
                Err(Error::config(
                    "Sink type 'http_client' requires feature 'http-client'",
                ))
            }
        }
        "redis" => {
            #[cfg(feature = "redis")]
            {
                use crate::sink::redis::{RedisSink, RedisSinkConfig};
                let cfg: RedisSinkConfig = parse_node_config(&config.config, "redis")?;
                Ok(Arc::new(RedisSink::new(&config.id, cfg).await?) as Arc<dyn Sink>)
            }
            #[cfg(not(feature = "redis"))]
            {
                Err(Error::config("Sink type 'redis' requires feature 'redis'"))
            }
        }
        "notify" => {
            #[cfg(feature = "notify")]
            {
                use crate::sink::notify::{NotifySink, NotifySinkConfig};
                let cfg: NotifySinkConfig = parse_node_config(&config.config, "notify")?;
                Ok(Arc::new(NotifySink::new(&config.id, cfg, &system.notify)?) as Arc<dyn Sink>)
            }
            #[cfg(not(feature = "notify"))]
            {
                Err(Error::config(
                    "Sink type 'notify' requires feature 'notify'",
                ))
            }
        }
        other => Err(Error::config(format!("Unknown sink type: {}", other))),
    }
}