use std::process::ExitCode;
use anyhow::Context;
use tokio::net::TcpListener;
use tracing::{info, warn};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Layer};
use cellos_server::jetstream::{ensure_stream, replay_projection};
use cellos_server::{router, AppState};
#[tokio::main]
async fn main() -> ExitCode {
let fmt_layer = tracing_subscriber::fmt::layer()
.json()
.with_filter(cellos_core::observability::redacted_filter());
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
.with(fmt_layer)
.init();
match run().await {
Ok(()) => ExitCode::SUCCESS,
Err(e) => {
tracing::error!(error = format!("{e:#}"), "cellos-server fatal");
ExitCode::FAILURE
}
}
}
async fn run() -> anyhow::Result<()> {
let bind = std::env::var("CELLOS_SERVER_BIND").unwrap_or_else(|_| "127.0.0.1:8080".to_owned());
let nats_url =
std::env::var("CELLOS_NATS_URL").unwrap_or_else(|_| "nats://127.0.0.1:4222".to_owned());
let token = std::env::var("CELLOS_SERVER_API_TOKEN")
.context("CELLOS_SERVER_API_TOKEN must be set (fail-closed)")?;
if token.trim().is_empty() {
anyhow::bail!("CELLOS_SERVER_API_TOKEN must not be empty");
}
let nats = match async_nats::connect(&nats_url).await {
Ok(c) => {
info!(%nats_url, "connected to NATS");
Some(c)
}
Err(e) => {
warn!(%nats_url, error = %e, "NATS unreachable at startup; serving cached state only");
None
}
};
let mut state = AppState::new(nats.clone(), token);
if let Some(client) = &nats {
let skip_replay = std::env::var("CELLOS_SERVER_SKIP_REPLAY")
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
.unwrap_or(false);
match ensure_stream(client).await {
Ok(ctx) => {
if skip_replay {
info!("CELLOS_SERVER_SKIP_REPLAY set; skipping projection replay");
} else if let Err(e) = replay_projection(&state, &ctx).await {
warn!(error = %format!("{e:#}"), "projection replay failed; starting with empty cache");
}
state.jetstream = Some(ctx);
}
Err(e) => {
warn!(error = %format!("{e:#}"), "ensure_stream failed; WebSocket will reject until JetStream becomes available");
}
}
}
let app = router(state);
let listener = TcpListener::bind(&bind)
.await
.with_context(|| format!("binding {bind}"))?;
info!(%bind, "cellos-server listening");
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await
.context("axum::serve")?;
Ok(())
}
async fn shutdown_signal() {
let ctrl_c = async {
tokio::signal::ctrl_c()
.await
.expect("install Ctrl-C handler");
};
#[cfg(unix)]
let terminate = async {
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("install SIGTERM handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}
info!("shutdown signal received; draining");
}