use std::path::PathBuf;
use std::process::ExitCode;
use clap::{Parser, Subcommand};
use pipeflow::prelude::*;
#[derive(Parser)]
#[command(name = "pipeflow")]
#[command(version, about, long_about = None)]
struct Cli {
#[arg(short, long, global = true)]
verbose: bool,
#[command(subcommand)]
command: Commands,
}
#[derive(Subcommand)]
enum Commands {
Run {
#[arg(value_name = "CONFIG", required = true)]
config: PathBuf,
},
Config {
#[command(subcommand)]
command: ConfigCommands,
},
}
#[derive(Subcommand)]
enum ConfigCommands {
Validate {
#[arg(value_name = "CONFIG", required = true)]
config: PathBuf,
},
Graph {
#[arg(value_name = "CONFIG", required = true)]
config: PathBuf,
},
Show {
#[arg(value_name = "CONFIG", required = true)]
config: PathBuf,
#[arg(short, long, default_value = "yaml")]
format: OutputFormat,
},
}
#[derive(Clone, Copy, Default, clap::ValueEnum)]
enum OutputFormat {
#[default]
Yaml,
Json,
}
#[tokio::main]
async fn main() -> ExitCode {
let cli = Cli::parse();
use tracing_subscriber::EnvFilter;
let default_level = if cli.verbose { "debug" } else { "info" };
let filter =
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(default_level));
tracing_subscriber::fmt().with_env_filter(filter).init();
let result = match cli.command {
Commands::Run { config } => cmd_run(config).await,
Commands::Config { command } => match command {
ConfigCommands::Validate { config } => cmd_validate(config),
ConfigCommands::Graph { config } => cmd_graph(config),
ConfigCommands::Show { config, format } => cmd_show(config, format),
},
};
match result {
Ok(()) => ExitCode::SUCCESS,
Err(e) => {
eprintln!("Error: {e}");
ExitCode::FAILURE
}
}
}
async fn cmd_run(path: PathBuf) -> Result<()> {
tracing::info!(config = ?path, "Loading configuration");
let mut engine = Engine::from_file(&path)?;
engine.build().await?;
engine.run().await?;
Ok(())
}
fn cmd_validate(path: PathBuf) -> Result<()> {
tracing::info!(config = ?path, "Validating configuration");
let config = Config::from_file(&path)?;
println!("✓ Configuration is valid");
println!(" Sources: {} defined", config.pipeline.sources.len());
println!(" Transforms: {} defined", config.pipeline.transforms.len());
println!(" Sinks: {} defined", config.pipeline.sinks.len());
Ok(())
}
fn cmd_show(path: PathBuf, format: OutputFormat) -> Result<()> {
tracing::info!(config = ?path, "Loading configuration");
let config = Config::from_file(&path)?;
let output = match format {
OutputFormat::Yaml => serde_yaml::to_string(&config)?,
OutputFormat::Json => serde_json::to_string_pretty(&config)?,
};
println!("{}", output);
Ok(())
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum NodeKind {
Source,
Transform,
Sink,
}
impl NodeKind {
fn label(self) -> &'static str {
match self {
Self::Source => "source",
Self::Transform => "transform",
Self::Sink => "sink",
}
}
}
#[derive(Clone, Debug)]
struct NodeMeta {
kind: NodeKind,
ty: String,
steps: Vec<String>,
}
fn cmd_graph(path: PathBuf) -> Result<()> {
use std::collections::{BTreeMap, BTreeSet};
tracing::info!(config = ?path, "Loading configuration");
let config = Config::from_file(&path)?;
let mut meta: BTreeMap<String, NodeMeta> = BTreeMap::new();
for source in &config.pipeline.sources {
meta.insert(
source.id.clone(),
NodeMeta {
kind: NodeKind::Source,
ty: source.source_type.clone(),
steps: Vec::new(),
},
);
}
for transform in &config.pipeline.transforms {
let steps: Vec<String> = transform
.steps
.iter()
.map(|s| s.step_type.clone())
.collect();
meta.insert(
transform.id.clone(),
NodeMeta {
kind: NodeKind::Transform,
ty: "pipeline".to_string(),
steps,
},
);
}
for sink in &config.pipeline.sinks {
meta.insert(
sink.id.clone(),
NodeMeta {
kind: NodeKind::Sink,
ty: sink.sink_type.clone(),
steps: Vec::new(),
},
);
}
let mut incoming: BTreeMap<String, BTreeSet<String>> = BTreeMap::new();
let mut outgoing: BTreeMap<String, BTreeSet<String>> = BTreeMap::new();
let mut add_edge = |from: &str, to: &str| {
outgoing
.entry(from.to_string())
.or_default()
.insert(to.to_string());
incoming
.entry(to.to_string())
.or_default()
.insert(from.to_string());
};
for (from, to) in config.input_edges() {
add_edge(from, to);
}
let fmt_node = |id: &str, meta: &BTreeMap<String, NodeMeta>| -> String {
let Some(m) = meta.get(id) else {
return id.to_string();
};
let type_info = if m.kind == NodeKind::Transform && !m.steps.is_empty() {
m.steps.join(" -> ")
} else {
m.ty.clone()
};
format!("{id} [{}:{}]", m.kind.label(), type_info)
};
println!("Pipeline Graph (normalized)");
println!("Legend: '->' pipeline edge");
println!();
println!("Sources ({})", config.pipeline.sources.len());
for source in &config.pipeline.sources {
println!(" - {}", fmt_node(&source.id, &meta));
if let Some(targets) = outgoing.get(&source.id) {
for target in targets {
println!(" -> {}", target);
}
}
}
println!();
println!("Transforms ({})", config.pipeline.transforms.len());
for transform in &config.pipeline.transforms {
println!(" - {}", fmt_node(&transform.id, &meta));
if let Some(inputs) = incoming.get(&transform.id) {
for input in inputs {
println!(" <- {}", input);
}
}
if let Some(targets) = outgoing.get(&transform.id) {
for target in targets {
println!(" -> {}", target);
}
}
}
println!();
println!("Sinks ({})", config.pipeline.sinks.len());
for sink in &config.pipeline.sinks {
println!(" - {}", fmt_node(&sink.id, &meta));
if let Some(inputs) = incoming.get(&sink.id) {
for input in inputs {
println!(" <- {}", input);
}
}
}
Ok(())
}