pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
//! pipeflow CLI - Configuration-driven data pipeline runner

use std::path::PathBuf;
use std::process::ExitCode;

use clap::{Parser, Subcommand};

use pipeflow::prelude::*;

// Top-level CLI definition. The `///` doc comments on this and the other clap-derived
// types are compiled into the generated `--help` text, so editing them changes
// user-visible output.
/// A lightweight, configuration-driven data pipeline framework
#[derive(Parser)]
#[command(name = "pipeflow")]
#[command(version, about, long_about = None)]
struct Cli {
    /// Enable verbose output
    // `global = true` lets `-v/--verbose` appear after a subcommand as well. main() turns it
    // into a `debug` default log level, which RUST_LOG (when set and valid) overrides.
    #[arg(short, long, global = true)]
    verbose: bool,

    // The selected subcommand; dispatched with an exhaustive match in main().
    #[command(subcommand)]
    command: Commands,
}

// Top-level subcommands (`pipeflow run`, `pipeflow config ...`), dispatched in main().
// NOTE(review): `required = true` is redundant on a non-`Option` positional argument,
// which clap already treats as required; kept as-is for explicitness.
#[derive(Subcommand)]
enum Commands {
    /// Run from configuration file
    Run {
        /// Path to configuration file (YAML)
        #[arg(value_name = "CONFIG", required = true)]
        config: PathBuf,
    },
    /// Configuration management commands
    Config {
        // Nested `pipeflow config <validate|graph|show>` subcommands.
        #[command(subcommand)]
        command: ConfigCommands,
    },
}

// Subcommands under `pipeflow config`. Each one takes the configuration path as its
// positional CONFIG argument and is handled by the matching `cmd_*` function.
#[derive(Subcommand)]
enum ConfigCommands {
    /// Validate configuration file
    Validate {
        /// Path to configuration file (YAML)
        #[arg(value_name = "CONFIG", required = true)]
        config: PathBuf,
    },
    /// Show pipeline graph (ASCII)
    Graph {
        /// Path to configuration file (YAML)
        #[arg(value_name = "CONFIG", required = true)]
        config: PathBuf,
    },
    /// Show merged and normalized configuration
    Show {
        /// Path to configuration file (YAML)
        #[arg(value_name = "CONFIG", required = true)]
        config: PathBuf,
        /// Output format
        // The string default is parsed through `OutputFormat`'s ValueEnum impl.
        #[arg(short, long, default_value = "yaml")]
        format: OutputFormat,
    },
}

/// Output format for config command
// Selected with `config show --format`. The CLI default comes from the
// `default_value = "yaml"` on that argument; the `Default` derive below is not what
// clap uses for it.
#[derive(Clone, Copy, Default, clap::ValueEnum)]
enum OutputFormat {
    #[default]
    Yaml,
    Json,
}

#[tokio::main]
async fn main() -> ExitCode {
    let cli = Cli::parse();

    // Initialize tracing with RUST_LOG env var support.
    // Priority: RUST_LOG > --verbose > default (INFO). If RUST_LOG is unset or contains an
    // invalid directive, the --verbose-derived default level is used instead.
    use tracing_subscriber::EnvFilter;

    let default_level = if cli.verbose { "debug" } else { "info" };
    let filter =
        EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(default_level));

    // Send diagnostics to stderr. The fmt subscriber writes to stdout by default, which
    // would mix log lines into command output (e.g. `config show --format json`) and
    // break piping it into other tools.
    tracing_subscriber::fmt()
        .with_env_filter(filter)
        .with_writer(std::io::stderr)
        .init();

    let result = match cli.command {
        Commands::Run { config } => cmd_run(config).await,
        Commands::Config { command } => match command {
            ConfigCommands::Validate { config } => cmd_validate(config),
            ConfigCommands::Graph { config } => cmd_graph(config),
            ConfigCommands::Show { config, format } => cmd_show(config, format),
        },
    };

    match result {
        Ok(()) => ExitCode::SUCCESS,
        Err(e) => {
            eprintln!("Error: {e}");
            ExitCode::FAILURE
        }
    }
}

/// Run the pipeline described by the configuration file at `path`.
///
/// The engine is created from the file, built, and then run (awaited until it resolves).
/// The first error from any of those stages is returned to the caller.
async fn cmd_run(path: PathBuf) -> Result<()> {
    tracing::info!(config = ?path, "Loading configuration");

    let mut pipeline_engine = Engine::from_file(&path)?;

    // Building is awaited to completion before the engine is started.
    pipeline_engine.build().await?;
    pipeline_engine.run().await?;

    Ok(())
}

/// Validate the configuration file at `path` and print a short summary.
///
/// The file is loaded with `Config::from_file`; if that succeeds, a success line is printed
/// followed by the number of sources, transforms, and sinks it defines.
///
/// # Errors
/// Returns whatever error `Config::from_file` reports for the file.
fn cmd_validate(path: PathBuf) -> Result<()> {
    tracing::info!(config = ?path, "Validating configuration");

    let loaded = Config::from_file(&path)?;
    let pipeline = &loaded.pipeline;
    let (sources, transforms, sinks) = (
        pipeline.sources.len(),
        pipeline.transforms.len(),
        pipeline.sinks.len(),
    );

    println!("✓ Configuration is valid");
    println!("  Sources:    {sources} defined");
    println!("  Transforms: {transforms} defined");
    println!("  Sinks:      {sinks} defined");

    Ok(())
}

/// Show merged and normalized configuration.
///
/// Loads the configuration at `path` and prints it to stdout in the requested `format`.
/// The printed document ends with a newline but is not followed by an extra blank line.
///
/// # Errors
/// Returns any error from loading the file or serializing the configuration.
fn cmd_show(path: PathBuf, format: OutputFormat) -> Result<()> {
    tracing::info!(config = ?path, "Loading configuration");

    let config = Config::from_file(&path)?;

    let mut output = match format {
        OutputFormat::Yaml => serde_yaml::to_string(&config)?,
        OutputFormat::Json => serde_json::to_string_pretty(&config)?,
    };

    // serde_yaml already terminates its document with '\n', but serde_json's pretty
    // printer does not. Normalize here instead of using `println!`, which emitted a
    // spurious blank line after YAML output.
    if !output.ends_with('\n') {
        output.push('\n');
    }
    print!("{output}");

    Ok(())
}

/// Category of a node in the pipeline graph, mirroring the three node lists under
/// `pipeline` in the configuration.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum NodeKind {
    /// Node declared in `pipeline.sources`.
    Source,
    /// Node declared in `pipeline.transforms`.
    Transform,
    /// Node declared in `pipeline.sinks`.
    Sink,
}

impl NodeKind {
    /// Lower-case name shown in the `[kind:type]` tag of the graph view.
    fn label(self) -> &'static str {
        match self {
            NodeKind::Source => "source",
            NodeKind::Transform => "transform",
            NodeKind::Sink => "sink",
        }
    }
}

/// Display metadata for one node of the pipeline graph, stored by node id in `cmd_graph`.
#[derive(Clone, Debug)]
struct NodeMeta {
    /// Which configuration list (sources, transforms, sinks) the node came from.
    kind: NodeKind,
    /// Configured type name: the source/sink type, or the literal `"pipeline"` for transforms.
    ty: String,
    /// A transform's step types in configuration order; empty for sources and sinks.
    steps: Vec<String>,
}

/// Show pipeline graph in ASCII.
fn cmd_graph(path: PathBuf) -> Result<()> {
    use std::collections::{BTreeMap, BTreeSet};

    tracing::info!(config = ?path, "Loading configuration");

    let config = Config::from_file(&path)?;

    let mut meta: BTreeMap<String, NodeMeta> = BTreeMap::new();

    for source in &config.pipeline.sources {
        meta.insert(
            source.id.clone(),
            NodeMeta {
                kind: NodeKind::Source,
                ty: source.source_type.clone(),
                steps: Vec::new(),
            },
        );
    }

    for transform in &config.pipeline.transforms {
        let steps: Vec<String> = transform
            .steps
            .iter()
            .map(|s| s.step_type.clone())
            .collect();

        meta.insert(
            transform.id.clone(),
            NodeMeta {
                kind: NodeKind::Transform,
                ty: "pipeline".to_string(),
                steps,
            },
        );
    }

    for sink in &config.pipeline.sinks {
        meta.insert(
            sink.id.clone(),
            NodeMeta {
                kind: NodeKind::Sink,
                ty: sink.sink_type.clone(),
                steps: Vec::new(),
            },
        );
    }

    // input edges: input (emitter) -> consumer (transform or sink)
    let mut incoming: BTreeMap<String, BTreeSet<String>> = BTreeMap::new();
    let mut outgoing: BTreeMap<String, BTreeSet<String>> = BTreeMap::new();

    let mut add_edge = |from: &str, to: &str| {
        outgoing
            .entry(from.to_string())
            .or_default()
            .insert(to.to_string());
        incoming
            .entry(to.to_string())
            .or_default()
            .insert(from.to_string());
    };

    for (from, to) in config.input_edges() {
        add_edge(from, to);
    }

    let fmt_node = |id: &str, meta: &BTreeMap<String, NodeMeta>| -> String {
        let Some(m) = meta.get(id) else {
            return id.to_string();
        };

        let type_info = if m.kind == NodeKind::Transform && !m.steps.is_empty() {
            m.steps.join(" -> ")
        } else {
            m.ty.clone()
        };

        format!("{id} [{}:{}]", m.kind.label(), type_info)
    };

    println!("Pipeline Graph (normalized)");
    println!("Legend: '->' pipeline edge");

    println!();
    println!("Sources ({})", config.pipeline.sources.len());
    for source in &config.pipeline.sources {
        println!("  - {}", fmt_node(&source.id, &meta));

        if let Some(targets) = outgoing.get(&source.id) {
            for target in targets {
                println!("      -> {}", target);
            }
        }
    }

    println!();
    println!("Transforms ({})", config.pipeline.transforms.len());
    for transform in &config.pipeline.transforms {
        println!("  - {}", fmt_node(&transform.id, &meta));

        if let Some(inputs) = incoming.get(&transform.id) {
            for input in inputs {
                println!("      <- {}", input);
            }
        }

        if let Some(targets) = outgoing.get(&transform.id) {
            for target in targets {
                println!("      -> {}", target);
            }
        }
    }

    println!();
    println!("Sinks ({})", config.pipeline.sinks.len());
    for sink in &config.pipeline.sinks {
        println!("  - {}", fmt_node(&sink.id, &meta));

        if let Some(inputs) = incoming.get(&sink.id) {
            for input in inputs {
                println!("      <- {}", input);
            }
        }
    }

    Ok(())
}