use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use aa_core::config::LocalModeConfig;
use axum::{routing::get, Extension, Router};
use sqlx::sqlite::SqlitePool;
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use crate::dashboard_server::{dashboard_router, find_dashboard_dist};
use crate::routes::admin_status::{admin_status, AdminStatusState};
use crate::routes::healthz::{healthz, HealthzState};
use crate::storage::{SqliteBackend, SqliteConfig, StorageBackend, StorageError};
pub struct LocalGatewayHandle {
pub local_addr: SocketAddr,
pub(crate) shutdown_tx: oneshot::Sender<()>,
pub(crate) server_task: Option<tokio::task::JoinHandle<()>>,
pub(crate) pool: Option<SqlitePool>,
pub(crate) storage: Option<Arc<dyn StorageBackend>>,
pub(crate) pid_path: Option<PathBuf>,
}
impl LocalGatewayHandle {
pub async fn shutdown(self) -> Result<(), LocalModeError> {
let _ = self.shutdown_tx.send(());
if let Some(task) = self.server_task {
let _ = task.await;
}
if let Some(pid_path) = self.pid_path {
let _ = std::fs::remove_file(pid_path);
}
drop(self.storage);
if let Some(pool) = self.pool {
pool.close().await;
}
Ok(())
}
}
#[cfg(unix)]
pub(crate) async fn wait_for_shutdown_signal() -> Result<(), LocalModeError> {
let mut sigterm =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).map_err(LocalModeError::Signal)?;
tokio::select! {
_ = tokio::signal::ctrl_c() => Ok(()),
_ = sigterm.recv() => Ok(()),
}
}
#[cfg(not(unix))]
pub(crate) async fn wait_for_shutdown_signal() -> Result<(), LocalModeError> {
tokio::signal::ctrl_c().await.map_err(LocalModeError::Signal)
}
pub async fn run_until_shutdown(handle: LocalGatewayHandle) -> Result<(), LocalModeError> {
wait_for_shutdown_signal().await?;
handle.shutdown().await
}
#[derive(Debug, thiserror::Error)]
pub enum LocalModeError {
#[error("failed to bind local gateway to {addr}: {source}")]
Bind {
addr: String,
#[source]
source: std::io::Error,
},
#[error("failed to open SQLite at {path}: {source}", path = path.display())]
Storage {
path: PathBuf,
#[source]
source: sqlx::Error,
},
#[error("failed to write PID file at {path}: {source}", path = path.display())]
PidFile {
path: PathBuf,
#[source]
source: std::io::Error,
},
#[error("shutdown signal handler installation failed: {0}")]
Signal(#[source] std::io::Error),
#[error("storage backend error at {path}: {source}", path = path.display())]
StorageBackend {
path: PathBuf,
#[source]
source: StorageError,
},
}
pub(crate) fn router(config: &LocalModeConfig, storage: Option<Arc<dyn StorageBackend>>) -> Router {
let dist = if config.dashboard { find_dashboard_dist() } else { None };
router_with_resolved_dist(config, dist.as_deref(), storage)
}
pub(crate) fn router_with_resolved_dist(
config: &LocalModeConfig,
dist: Option<&Path>,
storage: Option<Arc<dyn StorageBackend>>,
) -> Router {
let state = HealthzState::new("local", "sqlite");
let mut app = Router::new().route("/healthz", get(healthz)).layer(Extension(state));
if let Some(backend) = storage {
let admin_state = AdminStatusState::new(
"local",
backend,
Some(config.storage_path.to_string_lossy().into_owned()),
None,
);
app = app
.route("/api/v1/admin/status", get(admin_status))
.layer(Extension(admin_state));
}
if config.dashboard {
match dist {
Some(dist) => app = app.merge(dashboard_router(dist)),
None => tracing::warn!(
target: "aa_gateway::local_mode",
"dashboard enabled but no dashboard/dist/ resolved \
(checked AAASM_DASHBOARD_DIST, installed layout, and workspace layout); \
serving /healthz only — run `pnpm --dir dashboard build` to enable the SPA",
),
}
}
app
}
pub(crate) fn ensure_storage_parent(path: &Path) -> Result<(), LocalModeError> {
let Some(parent) = path.parent() else { return Ok(()) };
if parent.as_os_str().is_empty() {
return Ok(());
}
std::fs::create_dir_all(parent).map_err(|source| LocalModeError::Storage {
path: path.to_path_buf(),
source: sqlx::Error::Io(source),
})
}
pub(crate) async fn open_storage(path: &Path) -> Result<(SqlitePool, Arc<dyn StorageBackend>), LocalModeError> {
ensure_storage_parent(path)?;
let backend = SqliteBackend::open(&SqliteConfig {
path: path.to_path_buf(),
})
.await
.map_err(|source| LocalModeError::StorageBackend {
path: path.to_path_buf(),
source,
})?;
backend
.migrate()
.await
.map_err(|source| LocalModeError::StorageBackend {
path: path.to_path_buf(),
source,
})?;
let pool = backend.pool().clone();
let storage: Arc<dyn StorageBackend> = Arc::new(backend);
Ok((pool, storage))
}
pub(crate) async fn probe_running(port: u16) -> bool {
let url = format!("http://127.0.0.1:{port}/healthz");
let Ok(client) = reqwest::Client::builder()
.timeout(std::time::Duration::from_millis(100))
.build()
else {
return false;
};
let Ok(resp) = client.get(&url).send().await else {
return false;
};
if !resp.status().is_success() {
return false;
}
let Ok(body) = resp.json::<serde_json::Value>().await else {
return false;
};
body.get("mode").and_then(|v| v.as_str()).is_some()
&& body.get("storage").and_then(|v| v.as_str()).is_some()
&& body.get("version").and_then(|v| v.as_str()).is_some()
}
pub(crate) fn pid_file_path() -> PathBuf {
dirs::home_dir().unwrap_or_default().join(".aasm/gateway.pid")
}
pub(crate) fn write_banner(addr: &SocketAddr, storage_path: &Path) {
eprintln!("Agent Assembly [local mode] v{}", env!("CARGO_PKG_VERSION"));
eprintln!(" Listening: http://{addr}");
eprintln!(" Dashboard: http://{addr}/");
eprintln!(" Storage: {} (SQLite)", storage_path.display());
eprintln!();
eprintln!(" Ctrl+C to stop.");
}
pub async fn start_local(config: &LocalModeConfig) -> Result<LocalGatewayHandle, LocalModeError> {
start_local_with_pid_path(config, &pid_file_path()).await
}
pub(crate) async fn start_local_with_pid_path(
config: &LocalModeConfig,
pid_path: &Path,
) -> Result<LocalGatewayHandle, LocalModeError> {
if probe_running(config.port).await {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), config.port);
let (shutdown_tx, _closed_rx) = oneshot::channel();
return Ok(LocalGatewayHandle {
local_addr: addr,
shutdown_tx,
server_task: None,
pool: None,
storage: None,
pid_path: None,
});
}
let (pool, storage) = open_storage(&config.storage_path).await?;
let requested_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), config.port);
let listener = TcpListener::bind(requested_addr)
.await
.map_err(|source| LocalModeError::Bind {
addr: requested_addr.to_string(),
source,
})?;
let local_addr = listener.local_addr().unwrap_or(requested_addr);
write_banner(&local_addr, &config.storage_path);
std::fs::write(pid_path, std::process::id().to_string()).map_err(|source| LocalModeError::PidFile {
path: pid_path.to_path_buf(),
source,
})?;
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let app = router(config, Some(Arc::clone(&storage)));
let server_task = tokio::spawn(async move {
let _ = axum::serve(listener, app)
.with_graceful_shutdown(async move {
let _ = shutdown_rx.await;
})
.await;
});
Ok(LocalGatewayHandle {
local_addr,
shutdown_tx,
server_task: Some(server_task),
pool: Some(pool),
storage: Some(storage),
pid_path: Some(pid_path.to_path_buf()),
})
}
#[cfg(test)]
mod tests {
use super::*;
use axum::body::{to_bytes, Body};
use axum::http::{Request, StatusCode};
use tower::ServiceExt;
fn healthz_only_config() -> LocalModeConfig {
LocalModeConfig {
port: 0,
dashboard: false,
storage_path: std::path::PathBuf::from("/dev/null"),
}
}
#[tokio::test]
async fn router_serves_healthz_with_local_mode_json() {
let cfg = healthz_only_config();
let app = router(&cfg, None);
let request = Request::builder()
.uri("/healthz")
.body(Body::empty())
.expect("build request");
let response = app.oneshot(request).await.expect("router.oneshot");
assert_eq!(response.status(), StatusCode::OK);
let ctype = response
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok())
.unwrap_or_default();
assert!(
ctype.starts_with("application/json"),
"expected application/json, got {ctype}"
);
let bytes = to_bytes(response.into_body(), 8 * 1024).await.expect("read body");
let body: serde_json::Value = serde_json::from_slice(&bytes).expect("parse json");
assert_eq!(body["mode"], "local", "mode label");
assert_eq!(body["storage"], "sqlite", "storage label");
assert_eq!(body["version"], env!("CARGO_PKG_VERSION"), "crate version");
assert!(
body["uptime_secs"].is_u64(),
"uptime_secs must be present and a u64; got {body}",
);
}
#[tokio::test]
async fn router_serves_admin_status_when_storage_is_wired() {
let tmp = tempfile::tempdir().expect("tempdir");
let db_path = tmp.path().join("local.db");
let backend = SqliteBackend::open(&SqliteConfig { path: db_path.clone() })
.await
.expect("open sqlite backend");
backend.migrate().await.expect("migrate");
let storage: Arc<dyn StorageBackend> = Arc::new(backend);
let cfg = LocalModeConfig {
port: 0,
dashboard: false,
storage_path: db_path,
};
let app = router(&cfg, Some(Arc::clone(&storage)));
let request = Request::builder()
.uri("/api/v1/admin/status")
.body(Body::empty())
.expect("build request");
let response = app.oneshot(request).await.expect("router.oneshot");
assert_eq!(response.status(), StatusCode::OK);
let bytes = to_bytes(response.into_body(), 16 * 1024).await.expect("read body");
let body: serde_json::Value = serde_json::from_slice(&bytes).expect("parse json");
assert_eq!(body["mode"], "local");
let storage_block = &body["storage"];
assert_eq!(storage_block["backend"], "sqlite");
assert_eq!(storage_block["health"], "ok");
assert!(storage_block["path"].is_string(), "sqlite path must be reported");
assert!(
storage_block.get("database_url").is_none(),
"sqlite branch must omit database_url",
);
assert!(
storage_block.get("timescaledb").is_none(),
"sqlite must omit timescaledb block",
);
assert!(storage_block["row_counts"]["audit_events_hot"].as_u64().is_some());
assert!(storage_block["row_counts"]["agents"].as_u64().is_some());
assert!(storage_block["row_counts"]["policy_versions"].as_u64().is_some());
}
#[tokio::test]
async fn router_omits_admin_status_when_storage_is_none() {
let cfg = healthz_only_config();
let app = router(&cfg, None);
let request = Request::builder()
.uri("/api/v1/admin/status")
.body(Body::empty())
.expect("build request");
let response = app.oneshot(request).await.expect("router.oneshot");
assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
#[test]
fn ensure_storage_parent_creates_nested_directories() {
let tmp = tempfile::tempdir().expect("tempdir");
let nested = tmp.path().join("a/b/c/local.db");
assert!(!nested.parent().expect("parent").exists());
ensure_storage_parent(&nested).expect("ensure_storage_parent");
assert!(
nested.parent().expect("parent").is_dir(),
"ensure_storage_parent should mkdir -p the parent tree"
);
}
#[tokio::test]
async fn open_storage_creates_sqlite_file_in_fresh_tempdir() {
let tmp = tempfile::tempdir().expect("tempdir");
let db_path = tmp.path().join("nested/local.db");
assert!(!db_path.parent().expect("parent").exists());
assert!(!db_path.exists());
let (pool, storage) = open_storage(&db_path).await.expect("open_storage");
assert!(
db_path.is_file(),
"open_storage should materialise the SQLite file on disk"
);
assert!(!pool.is_closed(), "open_storage should return an open pool",);
storage.healthcheck().await.expect("healthcheck should succeed");
pool.close().await;
}
#[tokio::test]
async fn probe_running_returns_false_on_connection_refused() {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind ephemeral port");
let port = listener.local_addr().expect("local_addr").port();
drop(listener);
let start = std::time::Instant::now();
let alive = probe_running(port).await;
let elapsed = start.elapsed();
assert!(!alive, "probe must report dead when port is closed");
assert!(
elapsed < std::time::Duration::from_millis(500),
"probe should fail fast on connection refused; took {elapsed:?}"
);
}
#[tokio::test]
async fn probe_running_returns_true_against_local_mode_router() {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind ephemeral port");
let port = listener.local_addr().expect("local_addr").port();
let cfg = healthz_only_config();
let server = tokio::spawn(async move {
let _ = axum::serve(listener, router(&cfg, None)).await;
});
let alive = probe_running(port).await;
server.abort();
assert!(alive, "probe must report alive against a real local-mode /healthz");
}
#[tokio::test]
async fn probe_running_returns_false_on_body_shape_mismatch() {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind ephemeral port");
let port = listener.local_addr().expect("local_addr").port();
async fn foreign_handler() -> axum::Json<serde_json::Value> {
axum::Json(serde_json::json!({"foo": "bar"}))
}
let foreign_app = Router::new().route("/healthz", get(foreign_handler));
let server = tokio::spawn(async move {
let _ = axum::serve(listener, foreign_app).await;
});
let alive = probe_running(port).await;
server.abort();
assert!(
!alive,
"probe must reject foreign /healthz responses missing HealthzBody fields"
);
}
async fn test_config_with_ephemeral_port() -> (LocalModeConfig, tempfile::TempDir, u16) {
let tmp = tempfile::tempdir().expect("tempdir");
let probe_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind ephemeral port");
let port = probe_listener.local_addr().expect("local_addr").port();
drop(probe_listener);
let config = LocalModeConfig {
port,
dashboard: false,
storage_path: tmp.path().join("local.db"),
};
(config, tmp, port)
}
#[tokio::test]
async fn start_local_binds_127_0_0_1_and_serves_healthz() {
let (config, _tmp, port) = test_config_with_ephemeral_port().await;
let pid_path = _tmp.path().join("gateway.pid");
let handle = start_local_with_pid_path(&config, &pid_path)
.await
.expect("start_local");
assert_eq!(
handle.local_addr.ip(),
IpAddr::V4(Ipv4Addr::LOCALHOST),
"start_local must bind 127.0.0.1, never 0.0.0.0"
);
assert_eq!(handle.local_addr.port(), port);
let body = reqwest::get(format!("http://127.0.0.1:{port}/healthz"))
.await
.expect("GET /healthz")
.json::<serde_json::Value>()
.await
.expect("parse json");
assert_eq!(body["mode"], "local");
assert_eq!(body["storage"], "sqlite");
let _ = handle.shutdown_tx.send(());
}
#[tokio::test]
async fn start_local_writes_pid_file_with_running_pid() {
let (config, _tmp, _port) = test_config_with_ephemeral_port().await;
let pid_path = _tmp.path().join("gateway.pid");
assert!(!pid_path.exists(), "pid file must not exist before start");
let handle = start_local_with_pid_path(&config, &pid_path)
.await
.expect("start_local");
assert!(pid_path.is_file(), "pid file must be written by start_local");
let written = std::fs::read_to_string(&pid_path).expect("read pid file");
let written_pid: u32 = written.trim().parse().expect("pid file contents must be a u32");
assert_eq!(
written_pid,
std::process::id(),
"pid file must contain the running process id"
);
let _ = handle.shutdown_tx.send(());
}
#[tokio::test]
async fn start_local_skips_when_probe_returns_true() {
let (config, _tmp, _port) = test_config_with_ephemeral_port().await;
let first_pid_path = _tmp.path().join("first.pid");
let second_pid_path = _tmp.path().join("second.pid");
let first = start_local_with_pid_path(&config, &first_pid_path)
.await
.expect("first start_local");
assert!(first_pid_path.is_file(), "first start must write its PID file");
let second = start_local_with_pid_path(&config, &second_pid_path)
.await
.expect("second start_local must succeed via probe short-circuit");
assert!(
!second_pid_path.exists(),
"short-circuited start must NOT write a new PID file"
);
assert_eq!(
second.local_addr.port(),
config.port,
"short-circuited handle must still report the configured port"
);
let _ = first.shutdown_tx.send(());
let _ = second.shutdown_tx.send(());
}
#[tokio::test]
async fn start_local_healthz_round_trip_completes_within_500ms() {
let (config, _tmp, port) = test_config_with_ephemeral_port().await;
let pid_path = _tmp.path().join("gateway.pid");
let started = std::time::Instant::now();
let handle = start_local_with_pid_path(&config, &pid_path)
.await
.expect("start_local");
let _ = reqwest::get(format!("http://127.0.0.1:{port}/healthz"))
.await
.expect("GET /healthz")
.json::<serde_json::Value>()
.await
.expect("parse json");
let elapsed = started.elapsed();
assert!(
elapsed < std::time::Duration::from_millis(500),
"AAASM-1576 AC #5: start_local → /healthz round-trip must be < 500 ms, took {elapsed:?}"
);
let _ = handle.shutdown_tx.send(());
}
#[tokio::test]
async fn handle_shutdown_stops_the_server_within_100ms() {
let (config, _tmp, port) = test_config_with_ephemeral_port().await;
let pid_path = _tmp.path().join("gateway.pid");
let handle = start_local_with_pid_path(&config, &pid_path)
.await
.expect("start_local");
let pre = reqwest::get(format!("http://127.0.0.1:{port}/healthz")).await;
assert!(
pre.is_ok_and(|r| r.status().is_success()),
"server must respond before shutdown"
);
let started = std::time::Instant::now();
handle.shutdown().await.expect("shutdown");
let elapsed = started.elapsed();
assert!(
elapsed < std::time::Duration::from_millis(500),
"shutdown must complete promptly; took {elapsed:?}"
);
let post = tokio::time::timeout(
std::time::Duration::from_millis(200),
reqwest::get(format!("http://127.0.0.1:{port}/healthz")),
)
.await;
let still_alive = matches!(post, Ok(Ok(resp)) if resp.status().is_success());
assert!(!still_alive, "server must not respond after shutdown");
}
#[tokio::test]
async fn handle_shutdown_removes_the_pid_file() {
let (config, _tmp, _port) = test_config_with_ephemeral_port().await;
let pid_path = _tmp.path().join("gateway.pid");
let handle = start_local_with_pid_path(&config, &pid_path)
.await
.expect("start_local");
assert!(pid_path.is_file(), "pid file must exist after start_local");
handle.shutdown().await.expect("shutdown");
assert!(!pid_path.exists(), "pid file must be removed by handle.shutdown()");
}
#[tokio::test]
async fn handle_shutdown_closes_the_sqlite_pool() {
let (config, _tmp, _port) = test_config_with_ephemeral_port().await;
let pid_path = _tmp.path().join("gateway.pid");
let handle = start_local_with_pid_path(&config, &pid_path)
.await
.expect("start_local");
let pool_clone = handle
.pool
.as_ref()
.expect("normal start_local must populate pool")
.clone();
assert!(!pool_clone.is_closed(), "pool must be open after start");
handle.shutdown().await.expect("shutdown");
assert!(
pool_clone.is_closed(),
"pool must report closed after handle.shutdown()"
);
}
fn make_dashboard_stub_dist() -> tempfile::TempDir {
let dir = tempfile::tempdir().expect("tempdir");
std::fs::write(
dir.path().join("index.html"),
r#"<!doctype html><html><body><div id="root"></div></body></html>"#,
)
.expect("write index.html");
dir
}
fn dashboard_on_config() -> LocalModeConfig {
LocalModeConfig {
port: 0,
dashboard: true,
storage_path: std::path::PathBuf::from("/dev/null"),
}
}
#[tokio::test]
async fn router_serves_dashboard_index_when_enabled_with_dist() {
let dist = make_dashboard_stub_dist();
let app = router_with_resolved_dist(&dashboard_on_config(), Some(dist.path()), None);
let response = app
.oneshot(Request::builder().uri("/").body(Body::empty()).expect("build request"))
.await
.expect("router.oneshot");
assert_eq!(response.status(), StatusCode::OK);
let bytes = to_bytes(response.into_body(), 64 * 1024).await.expect("body");
let body = std::str::from_utf8(&bytes).expect("utf8");
assert!(
body.contains(r#"<div id="root">"#),
"GET / must serve the dashboard index when dashboard is enabled; got: {body}"
);
}
#[tokio::test]
async fn router_falls_back_to_index_on_unknown_spa_route() {
let dist = make_dashboard_stub_dist();
let app = router_with_resolved_dist(&dashboard_on_config(), Some(dist.path()), None);
let response = app
.oneshot(
Request::builder()
.uri("/agents/abc")
.body(Body::empty())
.expect("build request"),
)
.await
.expect("router.oneshot");
assert_eq!(
response.status(),
StatusCode::OK,
"SPA fallback must return 200, not 404"
);
let bytes = to_bytes(response.into_body(), 64 * 1024).await.expect("body");
let body = std::str::from_utf8(&bytes).expect("utf8");
assert!(
body.contains(r#"<div id="root">"#),
"SPA fallback body must be index.html; got: {body}"
);
}
#[tokio::test]
async fn router_preserves_healthz_when_dashboard_enabled() {
let dist = make_dashboard_stub_dist();
let app = router_with_resolved_dist(&dashboard_on_config(), Some(dist.path()), None);
let response = app
.oneshot(
Request::builder()
.uri("/healthz")
.body(Body::empty())
.expect("build request"),
)
.await
.expect("router.oneshot");
assert_eq!(response.status(), StatusCode::OK);
let ctype = response
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok())
.unwrap_or_default()
.to_owned();
assert!(
ctype.starts_with("application/json"),
"API route must keep its JSON content-type with the SPA mounted; got {ctype:?}"
);
let bytes = to_bytes(response.into_body(), 8 * 1024).await.expect("body");
let body: serde_json::Value = serde_json::from_slice(&bytes).expect("parse json");
assert_eq!(body["mode"], "local");
}
#[tokio::test]
async fn router_does_not_mount_dashboard_when_config_disables_it() {
let dist = make_dashboard_stub_dist();
let cfg = LocalModeConfig {
port: 0,
dashboard: false,
storage_path: std::path::PathBuf::from("/dev/null"),
};
let app = router_with_resolved_dist(&cfg, Some(dist.path()), None);
let response = app
.oneshot(Request::builder().uri("/").body(Body::empty()).expect("build request"))
.await
.expect("router.oneshot");
assert_eq!(
response.status(),
StatusCode::NOT_FOUND,
"with dashboard disabled, GET / must return 404 — no SPA mounted"
);
}
#[tokio::test]
async fn router_serves_healthz_when_dashboard_enabled_but_dist_missing() {
let app = router_with_resolved_dist(&dashboard_on_config(), None, None);
let healthz = app
.clone()
.oneshot(
Request::builder()
.uri("/healthz")
.body(Body::empty())
.expect("build request"),
)
.await
.expect("router.oneshot");
assert_eq!(healthz.status(), StatusCode::OK);
let root = app
.oneshot(Request::builder().uri("/").body(Body::empty()).expect("build request"))
.await
.expect("router.oneshot");
assert_eq!(
root.status(),
StatusCode::NOT_FOUND,
"no dist resolved → no SPA mounted, but the gateway must still answer /healthz"
);
}
}