use std::sync::Arc;
use serde::de::DeserializeOwned;
use crate::error::{Error, Result};
use crate::sink::Sink;
use crate::source::Source;
use crate::transform::Transform;
fn parse_node_config<T: DeserializeOwned>(
config: &serde_yaml::Value,
node_type: &str,
) -> Result<T> {
serde_yaml::from_value(config.clone())
.map_err(|e| Error::config(format!("Invalid {} config: {}", node_type, e)))
}
pub(crate) fn build_source(
id: &str,
source_type: &str,
config: &serde_yaml::Value,
) -> Result<Arc<dyn Source>> {
match source_type {
"http_client" => {
#[cfg(feature = "http-client")]
{
use crate::source::http_client::{HttpClientConfig, HttpClientSource};
let cfg: HttpClientConfig = parse_node_config(config, "http_client")?;
Ok(Arc::new(HttpClientSource::new(id, cfg)?))
}
#[cfg(not(feature = "http-client"))]
{
let _ = config;
Err(Error::config(
"Source type 'http_client' requires feature 'http-client'",
))
}
}
"http_server" => {
#[cfg(feature = "http-server")]
{
use crate::source::http_server::{HttpServerConfig, HttpServerSource};
let cfg: HttpServerConfig = parse_node_config(config, "http_server")?;
Ok(Arc::new(HttpServerSource::new(id, cfg)?))
}
#[cfg(not(feature = "http-server"))]
{
let _ = config;
Err(Error::config(
"Source type 'http_server' requires feature 'http-server'",
))
}
}
"sql" | "database" => {
#[cfg(feature = "database")]
{
use crate::source::sql::{SqlSource, SqlSourceConfig};
let cfg: SqlSourceConfig = parse_node_config(config, "sql")?;
Ok(Arc::new(SqlSource::new(id, cfg)?))
}
#[cfg(not(feature = "database"))]
{
let _ = config;
Err(Error::config(
"Source type 'sql' requires feature 'database'",
))
}
}
"redis" => {
#[cfg(feature = "redis")]
{
use crate::source::redis::{RedisSource, RedisSourceConfig};
let cfg: RedisSourceConfig = parse_node_config(config, "redis")?;
Ok(Arc::new(RedisSource::new(id, cfg)?))
}
#[cfg(not(feature = "redis"))]
{
let _ = config;
Err(Error::config(
"Source type 'redis' requires feature 'redis'",
))
}
}
"file" => {
#[cfg(feature = "file")]
{
use crate::source::file::{FileSource, FileSourceConfig};
let cfg: FileSourceConfig = parse_node_config(config, "file")?;
Ok(Arc::new(FileSource::new(id, cfg)?))
}
#[cfg(not(feature = "file"))]
{
let _ = config;
Err(Error::config("Source type 'file' requires feature 'file'"))
}
}
other => Err(Error::config(format!("Unknown source type: {}", other))),
}
}
pub(crate) fn build_transform(
config: &crate::config::TransformConfig,
) -> Result<Arc<dyn Transform>> {
use crate::transform::pipeline::TransformPipeline;
let mut steps: Vec<Box<dyn crate::transform::step::Step>> =
Vec::with_capacity(config.steps.len());
for step_config in &config.steps {
let step = build_step(step_config)?;
steps.push(step);
}
Ok(Arc::new(TransformPipeline::new(&config.id, steps)))
}
fn build_step(config: &crate::config::StepConfig) -> Result<Box<dyn crate::transform::step::Step>> {
match config.step_type.as_str() {
"filter" => {
use crate::transform::filter::{FilterStep, FilterStepConfig};
let cfg: FilterStepConfig = parse_node_config(&config.config, "filter")?;
Ok(Box::new(FilterStep::new(cfg)?))
}
"hash" => {
use crate::transform::hash::{HashStep, HashStepConfig};
let cfg: HashStepConfig = parse_node_config(&config.config, "hash")?;
Ok(Box::new(HashStep::new(cfg)?))
}
"remap" => {
use crate::transform::remap::{RemapStep, RemapStepConfig};
let cfg: RemapStepConfig = parse_node_config(&config.config, "remap")?;
Ok(Box::new(RemapStep::new(cfg)?))
}
"window" => {
use crate::transform::window::{WindowStep, WindowStepConfig};
let cfg: WindowStepConfig = parse_node_config(&config.config, "window")?;
Ok(Box::new(WindowStep::new(cfg)?))
}
"compute" => {
use crate::transform::compute::{ComputeStep, ComputeStepConfig};
let cfg: ComputeStepConfig = parse_node_config(&config.config, "compute")?;
Ok(Box::new(ComputeStep::new(cfg)?))
}
other => Err(Error::config(format!("Unknown step type: {}", other))),
}
}
pub(crate) async fn build_sink(
config: &crate::config::SinkConfig,
system: &crate::config::SystemConfig,
) -> Result<Arc<dyn Sink>> {
use crate::sink::blackhole::BlackholeSink;
match config.sink_type.as_str() {
"console" => {
use crate::sink::console::{ConsoleSink, ConsoleSinkConfig};
let cfg: ConsoleSinkConfig = parse_node_config(&config.config, "console")?;
Ok(Arc::new(ConsoleSink::new(&config.id, cfg)) as Arc<dyn Sink>)
}
"file" => {
#[cfg(feature = "file")]
{
use crate::sink::file::{FileSink, FileSinkConfig};
let cfg: FileSinkConfig = parse_node_config(&config.config, "file")?;
Ok(Arc::new(FileSink::new(&config.id, cfg).await?) as Arc<dyn Sink>)
}
#[cfg(not(feature = "file"))]
{
Err(Error::config("Sink type 'file' requires feature 'file'"))
}
}
"blackhole" => Ok(Arc::new(BlackholeSink::new(&config.id)) as Arc<dyn Sink>),
"sql" => {
#[cfg(feature = "database")]
{
use crate::sink::sql::{SqlSink, SqlSinkConfig};
let cfg: SqlSinkConfig = parse_node_config(&config.config, "sql")?;
Ok(Arc::new(SqlSink::new(&config.id, cfg).await?) as Arc<dyn Sink>)
}
#[cfg(not(feature = "database"))]
{
Err(Error::config("Sink type 'sql' requires feature 'database'"))
}
}
"http_client" | "http" => {
#[cfg(feature = "http-client")]
{
use crate::sink::http_client::{HttpClientSink, HttpClientSinkConfig};
let cfg: HttpClientSinkConfig = parse_node_config(&config.config, "http_client")?;
Ok(Arc::new(HttpClientSink::new(&config.id, cfg)?) as Arc<dyn Sink>)
}
#[cfg(not(feature = "http-client"))]
{
Err(Error::config(
"Sink type 'http_client' requires feature 'http-client'",
))
}
}
"redis" => {
#[cfg(feature = "redis")]
{
use crate::sink::redis::{RedisSink, RedisSinkConfig};
let cfg: RedisSinkConfig = parse_node_config(&config.config, "redis")?;
Ok(Arc::new(RedisSink::new(&config.id, cfg).await?) as Arc<dyn Sink>)
}
#[cfg(not(feature = "redis"))]
{
Err(Error::config("Sink type 'redis' requires feature 'redis'"))
}
}
"notify" => {
#[cfg(feature = "notify")]
{
use crate::sink::notify::{NotifySink, NotifySinkConfig};
let cfg: NotifySinkConfig = parse_node_config(&config.config, "notify")?;
Ok(Arc::new(NotifySink::new(&config.id, cfg, &system.notify)?) as Arc<dyn Sink>)
}
#[cfg(not(feature = "notify"))]
{
Err(Error::config(
"Sink type 'notify' requires feature 'notify'",
))
}
}
other => Err(Error::config(format!("Unknown sink type: {}", other))),
}
}