use clap::{Parser, Subcommand};
use std::sync::Arc;
use tracing_subscriber::EnvFilter;
// Top-level command-line interface for the `z8run` binary, parsed by clap.
// NOTE: plain `//` comments are used on purpose here; `///` doc comments on
// clap-derive items are picked up as help text and would change `--help` output.
#[derive(Parser)]
#[command(name = "z8run", version, about, long_about = None)]
struct Cli {
// Log filter directive; settable via `--log-level` or `Z8_LOG_LEVEL`, defaults to "info".
// `main` only falls back to this value when no filter can be built from the environment.
#[arg(long, env = "Z8_LOG_LEVEL", default_value = "info")]
log_level: String,
// Base directory for application data; settable via `--data-dir` or `Z8_DATA_DIR`,
// defaults to "./data". The command handlers derive the plugin directory and the
// default SQLite database path from it.
#[arg(long, env = "Z8_DATA_DIR", default_value = "./data")]
data_dir: String,
// The subcommand to execute (see `Commands`).
#[command(subcommand)]
command: Commands,
}
// Subcommands of the CLI. `main` dispatches each variant to the matching `cmd_*` function.
// NOTE: plain `//` comments are used on purpose; `///` would become clap help text.
#[derive(Subcommand)]
enum Commands {
// Handled by `cmd_serve`.
Serve {
// TCP port; `-p` / `--port` or `Z8_PORT`, defaults to 7700.
#[arg(short, long, env = "Z8_PORT", default_value = "7700")]
port: u16,
// Address to bind the listener to; `--bind` or `Z8_BIND`, defaults to "0.0.0.0".
#[arg(long, env = "Z8_BIND", default_value = "0.0.0.0")]
bind: String,
// Optional database URL; `--db-url` or `Z8_DB_URL`. When absent, `cmd_serve`
// falls back to a SQLite file under the data directory.
#[arg(long, env = "Z8_DB_URL")]
db_url: Option<String>,
},
// Handled by `cmd_migrate`.
Migrate {
// Optional database URL; same source and fallback as for `Serve`.
#[arg(long, env = "Z8_DB_URL")]
db_url: Option<String>,
},
// Handled by `cmd_plugin`; the nested subcommand selects the action.
Plugin {
#[command(subcommand)]
action: PluginAction,
},
// Handled by `cmd_info`; takes no arguments.
Info,
// Handled by `cmd_validate`.
Validate {
// Positional argument: path of the flow file to read and parse as JSON.
file: String,
},
}
// Actions of the `plugin` subcommand, executed by `cmd_plugin` against the
// plugin registry rooted at `<data_dir>/plugins`.
// NOTE: plain `//` comments are used on purpose; `///` would become clap help text.
#[derive(Subcommand)]
enum PluginAction {
// Prints name, version and description of each plugin the registry lists.
List,
// Installs a plugin via the registry's `install_local`.
Install {
// Positional argument: interpreted by `cmd_plugin` as a local filesystem path.
source: String,
},
// Removes a plugin via the registry's `remove`.
Remove {
// Positional argument: name of the plugin to remove.
name: String,
},
// Runs the registry's `scan` and prints the resulting count.
Scan,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let cli = Cli::parse();
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&cli.log_level)),
)
.with_target(true)
.with_thread_ids(false)
.init();
match cli.command {
Commands::Serve { port, bind, db_url } => {
cmd_serve(port, bind, db_url, &cli.data_dir).await?;
}
Commands::Migrate { db_url } => {
cmd_migrate(db_url, &cli.data_dir).await?;
}
Commands::Plugin { action } => {
cmd_plugin(action, &cli.data_dir).await?;
}
Commands::Info => {
cmd_info();
}
Commands::Validate { file } => {
cmd_validate(&file).await?;
}
}
Ok(())
}
async fn cmd_serve(
port: u16,
bind: String,
db_url: Option<String>,
data_dir: &str,
) -> anyhow::Result<()> {
println!(
r#"
╔═══════════════════════════════════════╗
║ z8run v{} ║
║ Next Generation Flow Engine ║
╚═══════════════════════════════════════╝
"#,
env!("CARGO_PKG_VERSION")
);
tracing::info!(port, bind = %bind, data_dir, "Starting z8run server");
std::fs::create_dir_all(data_dir)?;
std::fs::create_dir_all(format!("{}/plugins", data_dir))?;
let registry = z8run_runtime::registry::PluginRegistry::new(format!("{}/plugins", data_dir));
let plugin_count = registry.scan().await.unwrap_or(0);
tracing::info!(plugins = plugin_count, "Plugins scanned");
let url = db_url.unwrap_or_else(|| format!("sqlite://{}/z8run.db?mode=rwc", data_dir));
let jwt_secret = match std::env::var("Z8_JWT_SECRET") {
Ok(secret) if !secret.is_empty() => {
tracing::info!("JWT secret loaded from Z8_JWT_SECRET");
secret
}
_ => {
if url.starts_with("postgres") || url.starts_with("mysql") {
anyhow::bail!(
"Z8_JWT_SECRET is required when using PostgreSQL or MySQL. \
Generate one with: openssl rand -base64 32"
);
}
let dev_secret: String = (0..32)
.map(|_| format!("{:02x}", rand::random::<u8>()))
.collect();
tracing::warn!(
"No Z8_JWT_SECRET set — generated ephemeral secret (tokens won't survive restarts)"
);
dev_secret
}
};
let vault_secret = std::env::var("Z8_VAULT_SECRET").unwrap_or_else(|_| jwt_secret.clone());
let (storage, user_storage, vault): (
Arc<dyn z8run_storage::repository::FlowRepository>,
Arc<dyn z8run_storage::repository::UserRepository>,
Arc<dyn z8run_storage::credential_vault::CredentialVault>,
) = if url.starts_with("postgres") {
tracing::info!(url = %url, "Connecting to PostgreSQL");
let pg = z8run_storage::postgres::PgStorage::new(&url).await?;
pg.migrate().await?;
tracing::info!("PostgreSQL ready");
let pg_arc = Arc::new(pg);
let vault_pg = Arc::new(z8run_storage::credential_vault::PgCredentialVault::new(
pg_arc.pool().clone(),
&vault_secret,
));
(
pg_arc.clone() as Arc<dyn z8run_storage::repository::FlowRepository>,
pg_arc as Arc<dyn z8run_storage::repository::UserRepository>,
vault_pg as Arc<dyn z8run_storage::credential_vault::CredentialVault>,
)
} else {
tracing::info!(url = %url, "Connecting to SQLite");
let sqlite = z8run_storage::sqlite::SqliteStorage::new(&url).await?;
sqlite.migrate().await?;
tracing::info!("SQLite ready");
let sqlite_arc = Arc::new(sqlite);
let vault_sqlite = Arc::new(z8run_storage::credential_vault::SqliteCredentialVault::new(
sqlite_arc.pool().clone(),
&vault_secret,
));
(
sqlite_arc.clone() as Arc<dyn z8run_storage::repository::FlowRepository>,
sqlite_arc as Arc<dyn z8run_storage::repository::UserRepository>,
vault_sqlite as Arc<dyn z8run_storage::credential_vault::CredentialVault>,
)
};
tracing::info!("Credential vault initialized");
let state = Arc::new(z8run_api::state::AppState::new(
storage,
user_storage,
vault,
jwt_secret,
port,
));
z8run_core::nodes::register_builtin_nodes(&state.engine).await;
tracing::info!("Built-in nodes registered");
let app = z8run_api::build_router(state);
let addr = format!("{}:{}", bind, port);
let listener = tokio::net::TcpListener::bind(&addr).await?;
tracing::info!(address = %addr, "Server ready");
tracing::info!("Editor: http://{}:{}", bind, port);
axum::serve(listener, app).await?;
Ok(())
}
async fn cmd_migrate(db_url: Option<String>, data_dir: &str) -> anyhow::Result<()> {
std::fs::create_dir_all(data_dir)?;
let url = db_url.unwrap_or_else(|| format!("sqlite://{}/z8run.db?mode=rwc", data_dir));
tracing::info!(url = %url, "Running migrations...");
if url.starts_with("postgres") {
let pg = z8run_storage::postgres::PgStorage::new(&url).await?;
pg.migrate().await.map_err(|e| anyhow::anyhow!(e))?;
} else {
let sqlite = z8run_storage::sqlite::SqliteStorage::new(&url).await?;
sqlite.migrate().await.map_err(|e| anyhow::anyhow!(e))?;
}
tracing::info!("Migrations completed");
Ok(())
}
/// Executes one plugin-management action against the registry rooted at
/// `<data_dir>/plugins`.
///
/// `List` prints a table of the registered plugins, `Install` and `Remove`
/// report success on stdout or print the failure to stderr and terminate the
/// process with exit code 1, and `Scan` prints the number of plugins found.
///
/// # Errors
///
/// Only a failing `Scan` is returned as an error; install and remove failures
/// exit the process instead.
async fn cmd_plugin(action: PluginAction, data_dir: &str) -> anyhow::Result<()> {
    let plugin_registry =
        z8run_runtime::registry::PluginRegistry::new(format!("{}/plugins", data_dir));

    match action {
        PluginAction::List => {
            let installed = plugin_registry.list().await;
            if installed.is_empty() {
                println!("No plugins installed.");
                return Ok(());
            }
            println!("{:<20} {:<10} DESCRIPTION", "NAME", "VERSION");
            println!("{}", "-".repeat(60));
            for entry in installed {
                let manifest = &entry.manifest;
                println!(
                    "{:<20} {:<10} {}",
                    manifest.name, manifest.version, manifest.description
                );
            }
        }
        PluginAction::Install { source } => {
            println!("Installing plugin from: {}", source);
            let installed_name = match plugin_registry
                .install_local(std::path::Path::new(&source))
                .await
            {
                Ok(plugin_name) => plugin_name,
                Err(err) => {
                    eprintln!("✗ Failed to install plugin: {}", err);
                    std::process::exit(1);
                }
            };
            println!("✓ Plugin '{}' installed successfully", installed_name);
        }
        PluginAction::Remove { name } => {
            println!("Removing plugin: {}", name);
            if let Err(err) = plugin_registry.remove(&name).await {
                eprintln!("✗ Failed to remove plugin: {}", err);
                std::process::exit(1);
            }
            println!("✓ Plugin '{}' removed successfully", name);
        }
        PluginAction::Scan => {
            let found = plugin_registry.scan().await?;
            println!("{} plugins found and registered", found);
        }
    }

    Ok(())
}
/// Prints the version line followed by static project information to stdout.
fn cmd_info() {
    // Everything after the version line is fixed text; an empty entry prints a
    // blank line.
    const DETAILS: [&str; 9] = [
        "Next Generation Visual Flow Engine",
        "",
        "License: Apache-2.0 / MIT",
        "Repository: https://github.com/z8run/z8run",
        "Web: https://z8run.org",
        "",
        "Runtime: Rust + Tokio (async multi-thread)",
        "Plugins: WebAssembly (wasmtime)",
        "Protocol: Binary over WebSockets",
    ];

    println!("z8run v{}", env!("CARGO_PKG_VERSION"));
    for line in DETAILS.iter() {
        println!("{}", line);
    }
}
/// Reads a flow definition from `file`, parses it as JSON and validates it.
///
/// Prints the flow name, id, node count and edge count, then checks that the
/// graph is acyclic and prints the topological execution order.
///
/// # Errors
///
/// Returns an error (so the process exits non-zero) when the file cannot be
/// read, when it is not a valid flow document, when the graph contains a
/// cycle, or when no topological order can be computed. Validation failures
/// are additionally reported on stdout with a `✗ Error:` line.
async fn cmd_validate(file: &str) -> anyhow::Result<()> {
    // Include the path in I/O and parse errors; the bare OS/serde message does
    // not say which file was involved.
    let content = std::fs::read_to_string(file)
        .map_err(|e| anyhow::anyhow!("failed to read flow file '{}': {}", file, e))?;
    let flow: z8run_core::Flow = serde_json::from_str(&content)
        .map_err(|e| anyhow::anyhow!("failed to parse flow file '{}': {}", file, e))?;

    println!("Flow: {} ({})", flow.name, flow.id);
    println!("Nodes: {}", flow.nodes.len());
    println!("Edges: {}", flow.edges.len());

    // Stop at the first failed check: continuing would only repeat the error,
    // and an invalid flow must not yield a successful exit status.
    if let Err(e) = flow.validate_acyclic() {
        println!("✗ Error: {}", e);
        anyhow::bail!("flow '{}' failed validation", file);
    }
    println!("✓ Valid graph (DAG without cycles)");

    match flow.topological_order() {
        Ok(order) => {
            println!("✓ Execution order:");
            for (i, node_id) in order.iter().enumerate() {
                // Ids without a matching node are skipped, as before.
                if let Some(node) = flow.find_node(*node_id) {
                    println!(" {}. {} ({})", i + 1, node.name, node.node_type);
                }
            }
            Ok(())
        }
        Err(e) => {
            println!("✗ Error: {}", e);
            anyhow::bail!("flow '{}' failed validation", file);
        }
    }
}