mod cli;
mod engines;
mod metrics;
mod server;
mod ws;
use clap::{Args, Parser, Subcommand};
use cli::service::ServiceCommand;
use engines::{EngineOverride, EngineType};
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
#[derive(Parser, Debug)]
#[command(name = "spark-dashboard", version, about)]
struct Cli {
#[command(flatten)]
run: RunArgs,
#[command(subcommand)]
command: Option<Command>,
}
#[derive(Debug, Subcommand)]
enum Command {
#[command(subcommand)]
Service(ServiceCommand),
}
#[derive(Args, Debug)]
struct RunArgs {
#[arg(
short = 'p',
long,
env = "SPARK_DASHBOARD_PORT",
default_value_t = 3000
)]
port: u16,
#[arg(
short = 'b',
long,
env = "SPARK_DASHBOARD_BIND",
default_value = "0.0.0.0"
)]
bind: String,
#[arg(long, env = "SPARK_DASHBOARD_POLL_INTERVAL", default_value_t = 1000)]
poll_interval: u64,
#[arg(long, env = "SPARK_DASHBOARD_GPU_INDEX", default_value_t = 0)]
gpu_index: u32,
#[arg(long, value_name = "TYPE")]
engine: Vec<String>,
#[arg(long, value_name = "URL")]
engine_url: Vec<String>,
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let cli = Cli::parse();
match cli.command {
Some(Command::Service(cmd)) => cli::service::dispatch(cmd),
None => run_server(cli.run),
}
}
fn run_server(args: RunArgs) -> Result<(), Box<dyn std::error::Error>> {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(async move { run_server_inner(args).await })
}
async fn run_server_inner(args: RunArgs) -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
let overrides: Vec<EngineOverride> = args
.engine
.iter()
.zip(args.engine_url.iter())
.filter_map(|(engine_str, url)| {
let engine_type = match engine_str.to_lowercase().as_str() {
"vllm" => EngineType::Vllm,
unknown => {
tracing::warn!("Unknown engine type '{}', ignoring override", unknown);
return None;
}
};
Some(EngineOverride {
engine_type,
endpoint: url.clone(),
})
})
.collect();
if !overrides.is_empty() {
tracing::info!("Manual engine overrides: {:?}", overrides);
}
let (tx, _rx) = broadcast::channel::<String>(16);
let engine_state: Arc<RwLock<Vec<engines::EngineSnapshot>>> = Arc::new(RwLock::new(Vec::new()));
tokio::spawn(engines::engine_collector_loop(
engine_state.clone(),
overrides,
));
tokio::spawn(metrics::metrics_collector(
tx.clone(),
args.poll_interval,
args.gpu_index,
engine_state.clone(),
));
let app = server::create_router(tx);
let addr = format!("{}:{}", args.bind, args.port);
let listener = tokio::net::TcpListener::bind(&addr).await?;
tracing::info!("Spark Dashboard running at http://{}", addr);
axum::serve(listener, app).await?;
Ok(())
}