use anyhow::{Context, Result};
use axum::{extract::State, routing::get, Router};
use bollard::Docker;
use cloudevents::EventBuilder;
use futures::StreamExt;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use stormchaser_model::events::{
EventSource, EventType, RunnerEventType, RunnerHeartbeatEvent, RunnerRegisterEvent,
RunnerStepTypeSchema, SchemaVersion,
};
use stormchaser_model::nats::{publish_cloudevent, NatsSubject};
use stormchaser_model::runner::RunnerStatus;
use tokio::sync::watch;
use tokio::time;
use tracing::{error, info, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use uuid::Uuid;
use stormchaser_model::{dsl, APPLICATION_JSON};
pub mod container_machine;
pub mod handler;
pub mod parsing;
pub mod reaper;
pub mod traits;
use crate::handler::{handle_task, scan_for_orphans};
use crate::reaper::run_reaper;
/// Runtime configuration for the Docker runner.
#[derive(Debug, Clone)]
pub struct Config {
    /// NATS server URL (`NATS_URL`, default `nats://localhost:4222`).
    pub nats_url: String,
    /// Runner identifier (`RUNNER_ID`, default: a freshly generated v4 UUID).
    pub runner_id: String,
    /// Optional state encryption key (`STORMCHASER_STATE_ENCRYPTION_KEY`).
    pub encryption_key: Option<String>,
    /// Tracing filter directive (`RUST_LOG`, default `stormchaser_runner_docker=info`).
    pub rust_log: String,
}

impl Config {
    /// Builds a [`Config`] from `(key, value)` pairs, typically `std::env::vars()`.
    ///
    /// Recognised keys are `NATS_URL`, `RUNNER_ID`, `STORMCHASER_STATE_ENCRYPTION_KEY`
    /// and `RUST_LOG`; any other key is ignored. When a key appears more than once,
    /// the last non-blank value wins.
    ///
    /// Values that are empty or whitespace-only are treated as unset. This matters
    /// because an empty encryption key would otherwise enable "encryption" with no
    /// key. An empty runner id would likewise produce an invalid NATS subject.
    pub fn from_env<I, K, V>(env: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: AsRef<str>,
        V: AsRef<str>,
    {
        let mut nats_url: Option<String> = None;
        let mut runner_id: Option<String> = None;
        let mut encryption_key: Option<String> = None;
        let mut rust_log: Option<String> = None;

        for (key, value) in env {
            let value = value.as_ref();
            if value.trim().is_empty() {
                continue;
            }
            let slot = match key.as_ref() {
                "NATS_URL" => &mut nats_url,
                "RUNNER_ID" => &mut runner_id,
                "STORMCHASER_STATE_ENCRYPTION_KEY" => &mut encryption_key,
                "RUST_LOG" => &mut rust_log,
                _ => continue,
            };
            *slot = Some(value.to_string());
        }

        Self {
            nats_url: nats_url.unwrap_or_else(|| "nats://localhost:4222".to_string()),
            // Only generate a UUID when no explicit runner id was supplied.
            runner_id: runner_id.unwrap_or_else(|| Uuid::new_v4().to_string()),
            encryption_key,
            rust_log: rust_log.unwrap_or_else(|| "stormchaser_runner_docker=info".to_string()),
        }
    }
}
/// Shared state handed to the health HTTP server's handlers.
struct AppState {
    // Readiness flag read by the `/readyz` handler: `true` -> 200 OK,
    // `false` -> 503 Service Unavailable.
    is_ready: watch::Receiver<bool>,
}
/// Process entry point: reads configuration from the process environment and
/// hands control to [`run_runner`].
#[tokio::main]
async fn main() -> Result<()> {
    run_runner(Config::from_env(std::env::vars())).await
}
pub async fn run_runner(config: Config) -> Result<()> {
rustls::crypto::ring::default_provider()
.install_default()
.expect("Failed to install default crypto provider");
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(&config.rust_log))
.with(tracing_subscriber::fmt::layer())
.init();
let nats_url = config.nats_url;
let runner_id = config.runner_id;
let encryption_key = config.encryption_key;
if encryption_key.is_some() {
info!("State encryption is enabled");
} else {
warn!("State encryption is DISABLED. Sensitive step data in Docker labels will be stored in plaintext.");
}
info!(
"Starting Stormchaser Docker Runner {} (rev: {}, branch: {}, built: {}): {}",
env!("CARGO_PKG_VERSION"),
env!("VERGEN_GIT_SHA"),
env!("VERGEN_GIT_BRANCH"),
env!("VERGEN_BUILD_TIMESTAMP"),
runner_id
);
let docker =
Docker::connect_with_local_defaults().context("Failed to connect to Docker daemon")?;
let (ready_tx, ready_rx) = watch::channel(false);
let health_state = Arc::new(AppState { is_ready: ready_rx });
let reaper_docker = docker.clone();
tokio::spawn(async move {
if let Err(e) = run_reaper(reaper_docker).await {
error!("Reaper error: {:?}", e);
}
});
let app = Router::new()
.route("/healthz", get(|| async { "OK" }))
.route(
"/readyz",
get(|state: State<Arc<AppState>>| async move {
if *state.is_ready.borrow() {
axum::http::StatusCode::OK
} else {
axum::http::StatusCode::SERVICE_UNAVAILABLE
}
}),
)
.with_state(health_state);
let health_addr = SocketAddr::from(([0, 0, 0, 0], 8081)); let listener = tokio::net::TcpListener::bind(health_addr).await?;
info!("Health server listening on {}", health_addr);
let health_server = tokio::spawn(async move {
if let Err(e) = axum::serve(listener, app).await {
error!("Health server error: {:?}", e);
}
});
let nats_options = async_nats::ConnectOptions::new().retry_on_initial_connect();
let nats_client = async_nats::connect_with_options(nats_url, nats_options)
.await
.context("Failed to connect to NATS")?;
let nats_subject = format!("stormchaser.v1.runner.docker.{}", runner_id);
let common_schema = schemars::schema_for!(dsl::CommonContainerSpec);
let common_schema_json = serde_json::to_value(common_schema)?;
let registration_payload = RunnerRegisterEvent {
runner_id: runner_id.clone(),
runner_type: "docker".to_string(),
protocol_version: "v1".to_string(),
nats_subject: nats_subject.clone(),
capabilities: vec![
"docker".to_string(),
"linux".to_string(),
"container".to_string(),
],
step_types: vec![RunnerStepTypeSchema {
step_type: "RunContainer".to_string(),
schema: Some(common_schema_json),
documentation: Some(
"Runs a container using Docker with a minimal common set of parameters."
.to_string(),
),
}],
};
let ce = cloudevents::EventBuilderV10::new()
.id(uuid::Uuid::new_v4().to_string())
.ty("stormchaser.v1.runner.register")
.source(EventSource::System.as_str())
.time(chrono::Utc::now())
.data(
APPLICATION_JSON,
serde_json::to_value(registration_payload).unwrap(),
)
.build()
.context("Failed to build CloudEvent")?;
let payload_bytes = serde_json::to_vec(&ce).context("Failed to serialize CloudEvent")?;
nats_client
.publish("stormchaser.v1.runner.register", payload_bytes.into())
.await
.context("Failed to publish registration event")?;
info!("Runner registered successfully");
let _ = scan_for_orphans(
docker.clone(),
nats_client.clone(),
runner_id.clone(),
encryption_key.clone(),
)
.await;
let _ = ready_tx.send(true);
let heartbeat_client = nats_client.clone();
let heartbeat_id = runner_id.clone();
let mut heartbeat_interval = time::interval(Duration::from_secs(10));
let js = async_nats::jetstream::new(nats_client.clone());
let mut runner_subscriber = nats_client.subscribe(nats_subject.clone()).await?;
info!("Ensuring JetStream stream 'stormchaser' exists...");
let stream = js
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "stormchaser".to_string(),
subjects: vec!["stormchaser.>".to_string()],
..Default::default()
})
.await
.context("Failed to ensure JetStream stream")?;
info!("Creating durable consumer for docker-runner...");
let consumer = stream
.get_or_create_consumer(
"docker-runner",
async_nats::jetstream::consumer::pull::Config {
durable_name: Some("docker-runner".to_string()),
filter_subject: "stormchaser.v1.step.scheduled.runcontainer".to_string(),
..Default::default()
},
)
.await
.context("Failed to create JetStream consumer")?;
let mut task_messages = consumer
.messages()
.await
.context("Failed to get consumer messages")?;
info!(
"Listening for tasks on {} and JetStream subject stormchaser.step.scheduled.runcontainer",
nats_subject
);
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())?;
loop {
tokio::select! {
_ = sigterm.recv() => {
info!("SIGTERM received, shutting down...");
break;
}
_ = sigint.recv() => {
info!("SIGINT received, shutting down...");
break;
}
_ = heartbeat_interval.tick() => {
let heartbeat_payload = RunnerHeartbeatEvent {
runner_id: heartbeat_id.clone(),
version: env!("CARGO_PKG_VERSION").to_string(),
state: RunnerStatus::Online,
};
if let Err(e) = publish_cloudevent(&async_nats::jetstream::new(heartbeat_client.clone()), NatsSubject::RunnerHeartbeat, EventType::Runner(RunnerEventType::Heartbeat), EventSource::System, serde_json::to_value(heartbeat_payload).unwrap(), Some(SchemaVersion::new("1.0".to_string())), None)
.await
{
error!("Failed to publish heartbeat: {:?}", e);
}
}
message = runner_subscriber.next() => {
if let Some(msg) = message {
info!("Received runner-specific message: {:?}", msg.payload);
}
}
message = task_messages.next() => {
match message {
Some(Ok(msg)) => {
tokio::spawn(handle_task(
msg,
docker.clone(),
nats_client.clone(),
runner_id.clone(),
encryption_key.clone(),
));
}
Some(Err(e)) => {
error!("JetStream consumer error: {:?}", e);
time::sleep(Duration::from_secs(1)).await;
}
None => {
error!("JetStream consumer closed");
break;
}
}
}
}
}
health_server.abort();
info!("Runner stopped");
Ok(())
}
/// Unit tests for [`Config::from_env`].
#[cfg(test)]
mod tests {
    use super::*;

    /// With no variables set, every field falls back to its default and a runner id is generated.
    #[test]
    fn test_config_from_env_defaults() {
        let config = Config::from_env(std::iter::empty::<(String, String)>());
        assert_eq!(config.nats_url, "nats://localhost:4222");
        assert_eq!(config.encryption_key, None);
        assert_eq!(config.rust_log, "stormchaser_runner_docker=info");
        assert!(!config.runner_id.is_empty());
    }

    /// Each recognised variable overrides its corresponding field.
    #[test]
    fn test_config_from_env_custom() {
        let config = Config::from_env([
            ("NATS_URL", "nats://remote:4222"),
            ("RUNNER_ID", "my-runner"),
            ("STORMCHASER_STATE_ENCRYPTION_KEY", "my-key"),
            ("RUST_LOG", "debug"),
        ]);
        assert_eq!(config.nats_url, "nats://remote:4222");
        assert_eq!(config.runner_id, "my-runner");
        assert_eq!(config.encryption_key.as_deref(), Some("my-key"));
        assert_eq!(config.rust_log, "debug");
    }
}