mod api;
mod audit;
mod auth;
mod cleanup;
mod projector;
mod scheduler;
mod web;
#[cfg(target_os = "windows")]
mod service;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use anyhow::{Context, Result};
use clap::Parser;
use kanade_shared::config::{LogSection, load_backend_config};
use kanade_shared::default_paths;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous};
use tokio::net::TcpListener;
use tower_http::trace::TraceLayer;
use tracing::{error, info, warn};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
// Top-level command line of the `kanade-backend` binary.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive turns
// `///` doc comments into help text, so adding them would change `--help`.
#[derive(Parser, Debug)]
#[command(
name = "kanade-backend",
about = "kanade backend (axum + SQLite projector)",
version
)]
struct Cli {
// Optional explicit config file (`--config <PATH>`). `global = true` makes
// the flag accepted next to a subcommand as well. The value is handed to
// `default_paths::find_config` by the functions in this file.
#[arg(long, global = true)]
config: Option<PathBuf>,
// Optional maintenance subcommand. When it is `None`, `main` goes on to
// start the backend itself.
#[command(subcommand)]
command: Option<Command>,
}
// One-shot maintenance subcommands. `main` dispatches each variant to a
// helper in this file and returns that helper's result instead of starting
// the server.
//
// NOTE: plain `//` comments on purpose — `///` would become clap help text.
#[derive(clap::Subcommand, Debug)]
enum Command {
// Prints the `db.sqlite_path` value of the loaded backend config
// (see `print_resolved_db_path`).
ResolveDbPath,
// Reports whether `version` is quarantined by the boot sentinel
// (see `check_quarantine`, which exits with status 3 when it is).
CheckQuarantine {
version: String,
},
// Arms the boot sentinel for an upcoming binary swap to `new_version`,
// using `installed_exe` as the executable path (see `arm_for_swap`).
ArmForSwap {
new_version: String,
installed_exe: PathBuf,
},
// Deletes and re-creates the projector SQLite DB while carrying the rows
// of the `users` table over (see `wipe_projector`).
WipeProjector,
}
/// Process entry point.
///
/// Runs, in this order:
/// 1. CLI parsing; if a maintenance subcommand was given it is executed and
///    its result returned immediately.
/// 2. The boot-sentinel check. When the sentinel reports a rollback the
///    process prints a notice and exits with status 1.
/// 3. On Windows only, an attempt to run under the service dispatcher.
/// 4. A multi-threaded tokio runtime that drives [`run_backend`].
fn main() -> Result<()> {
    use kanade_shared::boot_sentinel::{BootDecision, BootSentinel, DEFAULT_MAX_ATTEMPTS};

    let cli = Cli::parse();
    let config_override = cli.config.as_deref();

    // Maintenance subcommands never fall through to the server start-up.
    match &cli.command {
        Some(Command::ResolveDbPath) => return print_resolved_db_path(config_override),
        Some(Command::CheckQuarantine { version }) => return check_quarantine(version),
        Some(Command::ArmForSwap {
            new_version,
            installed_exe,
        }) => return arm_for_swap(new_version, installed_exe),
        Some(Command::WipeProjector) => return wipe_projector(config_override),
        None => {}
    }

    // Boot sentinel: needs the path of the running executable. If that path
    // cannot be determined the check is skipped (and said so on stderr).
    match std::env::current_exe() {
        Err(e) => {
            eprintln!(
                "boot sentinel: current_exe() failed ({e}) — skipping crash-loop rollback \
                 check this boot; proceeding unguarded"
            );
        }
        Ok(exe) => {
            let sentinel =
                BootSentinel::new(&default_paths::data_dir(), exe, env!("CARGO_PKG_VERSION"));
            let decision = sentinel.check_on_boot(DEFAULT_MAX_ATTEMPTS);
            if let BootDecision::RolledBack { from } = decision {
                eprintln!(
                    "boot sentinel: {from} crash-looped on boot — rolled back to last-good; \
                     exiting (1) for restart"
                );
                std::process::exit(1);
            }
        }
    }

    #[cfg(target_os = "windows")]
    {
        match service::try_run_as_service() {
            // A dispatcher error other than "not started by the SCM" is fatal.
            Err(e) if !service::is_not_under_scm(&e) => {
                return Err(anyhow::anyhow!("service dispatcher failed: {e}"));
            }
            // Not running under the SCM: continue as a plain console process.
            Err(_) => {}
            // The service dispatcher ran to completion.
            Ok(()) => return Ok(()),
        }
    }

    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .context("build tokio runtime")?;
    rt.block_on(run_backend())
}
/// Implements the `ResolveDbPath` subcommand: locates and loads the backend
/// config, then prints its `db.sqlite_path` value on stdout.
///
/// # Errors
/// Fails when the config file cannot be located or loaded.
fn print_resolved_db_path(config: Option<&Path>) -> Result<()> {
    let cfg_path = default_paths::find_config(config, "KANADE_BACKEND_CONFIG", "backend.toml")?;
    let loaded = load_backend_config(&cfg_path);
    let backend_cfg = loaded.with_context(|| format!("load config from {cfg_path:?}"))?;
    let sqlite_path = &backend_cfg.db.sqlite_path;
    println!("{sqlite_path}");
    Ok(())
}
/// In-memory copy of one row of the `users` table.
///
/// `snapshot_users` fills it from the old DB and `restore_users` writes it
/// back into the freshly migrated one; the fields match the columns named in
/// those two queries.
struct UserRow {
/// `username` column.
username: String,
/// `password_hash` column, carried over as-is.
password_hash: String,
/// `role` column.
role: String,
/// `disabled` column, read as an integer (presumably a 0/1 flag — confirm
/// against the migration).
disabled: i64,
/// `must_change_pw` column, read as an integer (presumably a 0/1 flag —
/// confirm against the migration).
must_change_pw: i64,
/// `created_at` column, kept as the stored text.
created_at: String,
/// `updated_at` column, kept as the stored text.
updated_at: String,
}
/// Implements the `WipeProjector` subcommand.
///
/// Resolves the SQLite path from the backend config, runs
/// [`wipe_projector_at`] on a current-thread tokio runtime, and reports the
/// number of preserved accounts on stderr.
///
/// # Errors
/// Fails when the config cannot be found or loaded, the runtime cannot be
/// built, or the wipe itself fails.
fn wipe_projector(config: Option<&Path>) -> Result<()> {
    let cfg_path = default_paths::find_config(config, "KANADE_BACKEND_CONFIG", "backend.toml")?;
    let backend_cfg =
        load_backend_config(&cfg_path).with_context(|| format!("load config from {cfg_path:?}"))?;
    let db_path = backend_cfg.db.sqlite_path.clone();

    // A single-threaded runtime is enough for this one-shot command.
    let mut rt_builder = tokio::runtime::Builder::new_current_thread();
    rt_builder.enable_all();
    let rt = rt_builder.build().context("build current-thread runtime")?;

    let wipe = wipe_projector_at(&db_path);
    let restored = rt.block_on(wipe)?;
    eprintln!(
        "wipe-projector: wiped projector DB at {db_path}; preserved {restored} user account(s)"
    );
    Ok(())
}
/// Wipes the projector DB at `db_path` and re-creates it with the user
/// accounts carried over.
///
/// Steps: snapshot the `users` rows, delete the DB file and its `-wal` /
/// `-shm` companions, open a fresh WAL-mode DB, run the embedded migrations,
/// and re-insert the snapshot. Returns the number of restored accounts.
///
/// NOTE(review): between the delete and the successful restore the accounts
/// exist only in memory; if migrating or restoring fails, the old DB is
/// already gone. Confirm this window is acceptable for operators.
///
/// # Errors
/// Each step's failure is returned with a context naming that step.
async fn wipe_projector_at(db_path: &str) -> Result<usize> {
    let users = snapshot_users(db_path).await.context("snapshot users")?;
    remove_db_files(db_path).context("remove projector DB files")?;

    let url = format!("sqlite://{db_path}");
    let busy_wait = std::time::Duration::from_secs(30);
    let connect = SqliteConnectOptions::from_str(&url)
        .with_context(|| format!("parse sqlite path {db_path}"))?
        .create_if_missing(true)
        .journal_mode(SqliteJournalMode::Wal)
        .synchronous(SqliteSynchronous::Normal)
        .busy_timeout(busy_wait);

    // One connection is all this sequential maintenance job needs.
    let fresh = SqlitePoolOptions::new()
        .max_connections(1)
        .connect_with(connect)
        .await
        .context("open fresh sqlite pool")?;

    sqlx::migrate!("./migrations")
        .run(&fresh)
        .await
        .context("run migrations on fresh DB")?;

    let restore_outcome = restore_users(&fresh, &users).await;
    let restored = restore_outcome.context("restore users")?;

    fresh.close().await;
    Ok(restored)
}
async fn snapshot_users(db_path: &str) -> Result<Vec<UserRow>> {
if !Path::new(db_path).exists() {
return Ok(Vec::new());
}
let opts = SqliteConnectOptions::from_str(&format!("sqlite://{db_path}"))
.with_context(|| format!("parse sqlite path {db_path}"))?
.read_only(true)
.busy_timeout(std::time::Duration::from_secs(30));
let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect_with(opts)
.await
.with_context(|| {
format!(
"open existing projector DB at {db_path} to preserve accounts — refusing to wipe \
rather than risk silent account loss. Stop the service and close any SQLite \
client holding it open, then re-run (or, if the DB is corrupt and you accept \
losing accounts, delete it by hand first)."
)
})?;
let has_users: Option<String> =
sqlx::query_scalar("SELECT name FROM sqlite_master WHERE type='table' AND name='users'")
.fetch_optional(&pool)
.await
.context("probe for users table")?;
if has_users.is_none() {
pool.close().await;
return Ok(Vec::new());
}
let rows = sqlx::query_as::<_, (String, String, String, i64, i64, String, String)>(
"SELECT username, password_hash, role, disabled, must_change_pw, created_at, updated_at \
FROM users",
)
.fetch_all(&pool)
.await
.context("select users")?;
pool.close().await;
Ok(rows
.into_iter()
.map(
|(username, password_hash, role, disabled, must_change_pw, created_at, updated_at)| {
UserRow {
username,
password_hash,
role,
disabled,
must_change_pw,
created_at,
updated_at,
}
},
)
.collect())
}
/// Inserts the snapshotted `users` rows into the DB behind `pool`.
///
/// All inserts run in one transaction, so either every account is written or
/// none is. Returns the number of rows in `users`.
///
/// # Errors
/// Fails when the transaction cannot be started or committed, or when an
/// insert fails (the error context names the affected username).
async fn restore_users(pool: &sqlx::SqlitePool, users: &[UserRow]) -> Result<usize> {
    const INSERT_USER: &str = "INSERT INTO users \
        (username, password_hash, role, disabled, must_change_pw, created_at, updated_at) \
        VALUES (?, ?, ?, ?, ?, ?, ?)";

    let mut tx = pool.begin().await.context("begin restore transaction")?;

    for account in users {
        let insert = sqlx::query(INSERT_USER)
            .bind(&account.username)
            .bind(&account.password_hash)
            .bind(&account.role)
            .bind(account.disabled)
            .bind(account.must_change_pw)
            .bind(&account.created_at)
            .bind(&account.updated_at);
        insert
            .execute(&mut *tx)
            .await
            .with_context(|| format!("restore user {}", account.username))?;
    }

    tx.commit().await.context("commit restore transaction")?;
    Ok(users.len())
}
/// Deletes the SQLite file at `db_path` together with its `-wal` and `-shm`
/// companions. The companions are removed first and the main file last.
///
/// A file that is already absent is not an error.
///
/// # Errors
/// Stops at the first file that cannot be deleted for any reason other than
/// "not found" and returns an error naming that file.
fn remove_db_files(db_path: &str) -> Result<()> {
    let targets = [
        format!("{db_path}-wal"),
        format!("{db_path}-shm"),
        db_path.to_owned(),
    ];

    for path in targets {
        // Successful delete: nothing more to do for this file.
        let Err(e) = std::fs::remove_file(&path) else {
            continue;
        };
        // A missing file counts as already deleted.
        if e.kind() == std::io::ErrorKind::NotFound {
            continue;
        }
        anyhow::bail!(
            "failed to delete '{path}': {e}. Something is holding the projector DB open \
             (a running backend, a SQLite client, an antivirus scan) — stop the service \
             and close any client, then re-run."
        );
    }

    Ok(())
}
/// Implements the `CheckQuarantine` subcommand.
///
/// Asks the boot sentinel whether `version` is quarantined. If it is, a
/// refusal is printed on stderr and the process exits with status 3;
/// otherwise a "safe to deploy" line is printed and `Ok(())` is returned.
///
/// # Errors
/// Fails when the path of the running executable cannot be determined.
fn check_quarantine(version: &str) -> Result<()> {
    use kanade_shared::boot_sentinel::BootSentinel;

    let exe = std::env::current_exe().context("current_exe")?;
    let data_dir = default_paths::data_dir();
    let sentinel = BootSentinel::new(&data_dir, exe, env!("CARGO_PKG_VERSION"));

    if !sentinel.is_quarantined(version) {
        eprintln!("check-quarantine: {version} is not quarantined (safe to deploy)");
        return Ok(());
    }

    eprintln!(
        "check-quarantine: {version} is QUARANTINED (it crash-looped on a prior boot and was \
         rolled back). Refusing — republish a fixed binary under a new version, or clear the \
         quarantine."
    );
    // Distinct exit status so a calling script can tell "quarantined" apart
    // from an ordinary error return.
    std::process::exit(3)
}
/// Implements the `ArmForSwap` subcommand.
///
/// Builds a boot sentinel for `installed_exe` and calls its `arm_for_swap`
/// with that path and `new_version`, then confirms on stderr.
///
/// # Errors
/// Fails when the sentinel cannot be armed; the context names `new_version`.
fn arm_for_swap(new_version: &str, installed_exe: &Path) -> Result<()> {
    use kanade_shared::boot_sentinel::BootSentinel;

    let data_dir = default_paths::data_dir();
    let sentinel = BootSentinel::new(
        &data_dir,
        installed_exe.to_path_buf(),
        env!("CARGO_PKG_VERSION"),
    );

    let armed = sentinel.arm_for_swap(installed_exe, new_version);
    armed.with_context(|| format!("arm boot sentinel for {new_version}"))?;

    let shown_exe = installed_exe.display();
    eprintln!(
        "arm-for-swap: snapshotted {} -> last-good and armed boot sentinel for {new_version}",
        shown_exe
    );
    Ok(())
}
/// Starts the backend and serves HTTP until the server stops.
///
/// Start-up order: load config and tracing, install a panic hook, open and
/// migrate the SQLite pool, connect to NATS / JetStream, pre-create explode
/// tables from the jobs KV bucket, spawn the projector / cache / scheduler
/// tasks, build the axum router, bind the listener, and serve.
///
/// # Errors
/// Returns an error when a required start-up step fails (config, tracing,
/// SQLite open/migrate, NATS connect, JetStream resources, listener bind) or
/// when `axum::serve` itself returns an error. Steps whose failure is only
/// logged are marked as such below.
pub(crate) async fn run_backend() -> Result<()> {
// The command line is parsed again here; only `config` is read from it.
let cli = Cli::parse();
let cfg_path = default_paths::find_config(
cli.config.as_deref(),
"KANADE_BACKEND_CONFIG",
"backend.toml",
)?;
let cfg =
load_backend_config(&cfg_path).with_context(|| format!("load config from {cfg_path:?}"))?;
// Bound to a named variable so the value returned by `init_tracing` stays
// alive until this function returns.
let _log_guard = init_tracing(&cfg.log)
.with_context(|| format!("init tracing from [log] in {cfg_path:?}"))?;
// Panic hook: emit the panic plus a forced backtrace through `tracing`,
// then hand over to the hook that was installed before.
let default_panic_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
let backtrace = std::backtrace::Backtrace::force_capture();
error!(panic = %info, %backtrace, "panic");
default_panic_hook(info);
}));
info!(
bind = %cfg.server.bind,
nats = %cfg.nats.url,
db = %cfg.db.sqlite_path,
log_path = %cfg.log.path,
log_keep_days = cfg.log.keep_days,
"starting kanade-backend",
);
// Create the DB's parent directory, unless the path has no (non-empty)
// parent component.
let sqlite_path = PathBuf::from(&cfg.db.sqlite_path);
if let Some(parent) = sqlite_path.parent()
&& !parent.as_os_str().is_empty()
{
tokio::fs::create_dir_all(parent)
.await
.with_context(|| format!("create sqlite parent {parent:?}"))?;
}
// SQLite: created if missing, WAL journal, NORMAL synchronous, 30 s busy
// timeout, pool of up to 8 connections.
let sqlite_opts = SqliteConnectOptions::from_str(&format!("sqlite://{}", cfg.db.sqlite_path))
.with_context(|| format!("parse sqlite path {}", cfg.db.sqlite_path))?
.create_if_missing(true)
.journal_mode(SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.busy_timeout(std::time::Duration::from_secs(30));
let pool = SqlitePoolOptions::new()
.max_connections(8)
.connect_with(sqlite_opts)
.await
.context("open sqlite pool")?;
sqlx::migrate!("./migrations")
.run(&pool)
.await
.context("run migrations")?;
info!("sqlite migrations applied");
// Best effort: a failed bootstrap-admin seed is logged, not fatal.
if let Err(e) = api::accounts::seed_bootstrap_admin(&pool).await {
warn!(error = %e, "bootstrap admin seed failed");
}
// NATS connection and JetStream context; both are required.
let nats = kanade_shared::nats_client::connect(&cfg.nats.url).await?;
info!("connected to NATS");
let jetstream = async_nats::jetstream::new(nats.clone());
kanade_shared::bootstrap::ensure_jetstream_resources(&jetstream)
.await
.context("ensure_jetstream_resources")?;
info!("jetstream resources ready");
// Best effort: a failure of the consumer-reset check is only logged.
if let Err(e) = projector::consumer_reset::reset_if_wiped(&jetstream, &pool).await {
warn!(error = %e, "projector consumer reset check failed");
}
// Explode prewarm: read every manifest stored in the jobs KV bucket and
// pass the ones that deserialize to `ensure_tables_for_jobs`. Every failure
// on the way (bucket, key listing, single get, deserialize) is logged and
// skipped, so start-up continues with whatever could be read.
match jetstream
.get_key_value(kanade_shared::kv::BUCKET_JOBS)
.await
{
Ok(jobs_kv) => {
let mut manifests = Vec::new();
match jobs_kv.keys().await {
Ok(keys_stream) => {
match futures::TryStreamExt::try_collect::<Vec<String>>(keys_stream).await {
Ok(keys) => {
for k in keys {
match jobs_kv.get(&k).await {
Ok(Some(bytes)) => {
match serde_json::from_slice::<
kanade_shared::manifest::Manifest,
>(&bytes)
{
Ok(m) => manifests.push(m),
Err(e) => tracing::warn!(
error = %e,
job_key = %k,
"explode prewarm: manifest deserialize failed",
),
}
}
// The key was listed but has no value now; nothing to load.
Ok(None) => {}
Err(e) => tracing::warn!(
error = %e,
job_key = %k,
"explode prewarm: KV get failed",
),
}
}
}
Err(e) => tracing::warn!(
error = %e,
"explode prewarm: collect keys failed",
),
}
}
Err(e) => tracing::warn!(
error = %e,
"explode prewarm: keys() failed",
),
}
// Called even when `manifests` ended up empty or partial.
if let Err(e) = projector::explode::ensure_tables_for_jobs(&pool, manifests).await {
error!(error = %e, "explode: startup table-ensure pass failed (will retry per-result)");
}
}
Err(e) => tracing::warn!(
error = %e,
bucket = %kanade_shared::kv::BUCKET_JOBS,
"explode prewarm: BUCKET_JOBS KV unreachable (ok if fresh install)",
),
}
// Cache handle shared (via `clone`) by the results and events projectors,
// the cache watcher task and the API state.
let explode_spec_cache = projector::spec_cache::ExplodeSpecCache::new();
// Background tasks. Each block clones the handles it needs and spawns a
// task whose join handle is not kept; if the task's `run` returns an error
// it is logged and the task ends, while the rest of the backend keeps going.
{
let pool = pool.clone();
let js = jetstream.clone();
let cache = explode_spec_cache.clone();
tokio::spawn(async move {
if let Err(e) = projector::results::run(js, pool, cache).await {
error!(error = %e, "results projector exited");
}
});
}
{
let pool = pool.clone();
let js = jetstream.clone();
tokio::spawn(async move {
if let Err(e) = projector::audit::run(js, pool).await {
error!(error = %e, "audit projector exited");
}
});
}
// The next three projectors take the plain NATS client, not JetStream.
{
let pool = pool.clone();
let nats_client = nats.clone();
tokio::spawn(async move {
if let Err(e) = projector::heartbeat::run(nats_client, pool).await {
error!(error = %e, "heartbeat projector exited");
}
});
}
{
let pool = pool.clone();
let nats_client = nats.clone();
tokio::spawn(async move {
if let Err(e) = projector::host_perf::run(nats_client, pool).await {
error!(error = %e, "host_perf projector exited");
}
});
}
{
let pool = pool.clone();
let nats_client = nats.clone();
tokio::spawn(async move {
if let Err(e) = projector::process_perf::run(nats_client, pool).await {
error!(error = %e, "process_perf projector exited");
}
});
}
{
let pool = pool.clone();
let js = jetstream.clone();
let cache = explode_spec_cache.clone();
tokio::spawn(async move {
if let Err(e) = projector::events::run(js, pool, cache).await {
error!(error = %e, "events projector exited");
}
});
}
{
let pool = pool.clone();
let js = jetstream.clone();
tokio::spawn(async move {
if let Err(e) = projector::obs_events::run(js, pool).await {
error!(error = %e, "obs_events projector exited");
}
});
}
{
let pool = pool.clone();
let js = jetstream.clone();
tokio::spawn(async move {
if let Err(e) = projector::notifications::run(js, pool).await {
error!(error = %e, "notification-acks projector exited");
}
});
}
// The handle returned by `cleanup::spawn` is held until this function ends.
let _cleanup_handle = cleanup::spawn(pool.clone());
// Fill the spec cache once up front (failure is only a warning), then start
// the task that runs `projector::spec_cache::run` on it.
match projector::spec_cache::prewarm(&explode_spec_cache, &jetstream).await {
Ok(n) => info!(cached = n, "explode spec cache prewarm done"),
Err(e) => warn!(
error = %e,
"explode spec cache prewarm failed (watcher + miss-path fallback will recover)",
),
}
{
let cache = explode_spec_cache.clone();
let js = jetstream.clone();
tokio::spawn(async move {
if let Err(e) = projector::spec_cache::run(cache, js).await {
error!(error = %e, "explode spec cache watcher exited");
}
});
}
// From here on `nats`, `jetstream` and the cache are owned by the API state.
let app_state = api::AppState {
pool: pool.clone(),
nats,
jetstream,
explode_spec_cache,
};
// The scheduler gets its own clone of the API state.
{
let s = app_state.clone();
tokio::spawn(async move {
if let Err(e) = scheduler::run(s).await {
error!(error = %e, "scheduler exited");
}
});
}
// Router with the `auth::verify` middleware (given the pool as its state)
// and an HTTP trace layer added after it.
let app = api::router(app_state)
.layer(axum::middleware::from_fn_with_state(
pool.clone(),
auth::verify,
))
.layer(TraceLayer::new_for_http());
let listener = TcpListener::bind(&cfg.server.bind)
.await
.with_context(|| format!("bind {}", cfg.server.bind))?;
info!(bind = %cfg.server.bind, "axum serving");
// Boot sentinel confirmation: 30 s after the listener is bound, call
// `confirm_healthy`. Both failure paths are warnings only.
tokio::spawn(async {
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
match std::env::current_exe() {
Ok(exe) => {
let sentinel = kanade_shared::boot_sentinel::BootSentinel::new(
&default_paths::data_dir(),
exe,
env!("CARGO_PKG_VERSION"),
);
if let Err(e) = sentinel.confirm_healthy() {
tracing::warn!(error = %e, "boot sentinel: confirm_healthy failed");
}
}
Err(e) => {
tracing::warn!(
error = %e,
"boot sentinel: current_exe() failed — healthy version not promoted to last-good"
);
}
}
});
// Runs until the server stops; an error from it is returned with context.
axum::serve(listener, app).await.context("axum serve")?;
Ok(())
}
/// Installs the global `tracing` subscriber described by the `[log]` config.
///
/// The filter comes from the default environment variable when that parses,
/// otherwise from `log.level`. With `log.keep_days == 0` only a stdout layer
/// is installed and `Ok(None)` is returned. Otherwise a daily-rotating file
/// appender (keeping at most `keep_days` files) is added next to stdout and
/// its worker guard is returned; the caller has to keep that guard alive.
///
/// A failing `try_init` (for example because a global subscriber is already
/// set) is ignored in both modes.
///
/// # Errors
/// Fails when `log.path` has no parent directory, the log directory cannot
/// be created, or the rolling appender cannot be built.
fn init_tracing(log: &LogSection) -> Result<Option<tracing_appender::non_blocking::WorkerGuard>> {
    let filter: tracing_subscriber::EnvFilter =
        match tracing_subscriber::EnvFilter::try_from_default_env() {
            Ok(from_env) => from_env,
            Err(_) => log.level.clone().into(),
        };

    // Stdout-only mode: no file appender, hence no guard to hand back.
    if log.keep_days == 0 {
        let console = tracing_subscriber::fmt::layer().with_writer(std::io::stdout);
        let subscriber = tracing_subscriber::registry().with(filter).with(console);
        let _ = subscriber.try_init();
        return Ok(None);
    }

    // Split the configured path into directory, file-name prefix and suffix.
    let log_path = Path::new(&log.path);
    let dir = log_path
        .parent()
        .with_context(|| format!("[log] path '{}' has no parent dir", log.path))?;
    let prefix = log_path
        .file_stem()
        .and_then(std::ffi::OsStr::to_str)
        .unwrap_or("backend");
    let suffix = log_path
        .extension()
        .and_then(std::ffi::OsStr::to_str)
        .unwrap_or("log");

    std::fs::create_dir_all(dir).with_context(|| format!("create log dir {dir:?}"))?;

    let appender = tracing_appender::rolling::Builder::new()
        .filename_prefix(prefix)
        .filename_suffix(suffix)
        .rotation(tracing_appender::rolling::Rotation::DAILY)
        .max_log_files(log.keep_days)
        .build(dir)
        .with_context(|| format!("build rolling appender at {dir:?}"))?;
    let (file_writer, guard) = tracing_appender::non_blocking(appender);

    let console = tracing_subscriber::fmt::layer().with_writer(std::io::stdout);
    // ANSI colour codes are switched off for the file output.
    let file = tracing_subscriber::fmt::layer()
        .with_writer(file_writer)
        .with_ansi(false);
    let _ = tracing_subscriber::registry()
        .with(filter)
        .with(console)
        .with(file)
        .try_init();

    Ok(Some(guard))
}
#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::SqlitePool;

    /// Builds a per-process, per-test DB path in the temp directory and
    /// removes any leftover files from an earlier run.
    fn temp_db_path(tag: &str) -> String {
        let file_name = format!("kanade-wipe-test-{}-{}.db", std::process::id(), tag);
        let candidate = std::env::temp_dir().join(file_name);
        let _ = remove_db_files(candidate.to_str().unwrap());
        candidate.to_string_lossy().into_owned()
    }

    /// Opens (creating if needed) a WAL-mode single-connection pool on
    /// `db_path` and applies the migrations.
    async fn open(db_path: &str) -> SqlitePool {
        let url = format!("sqlite://{db_path}");
        let connect = SqliteConnectOptions::from_str(&url)
            .unwrap()
            .create_if_missing(true)
            .journal_mode(SqliteJournalMode::Wal);
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect_with(connect)
            .await
            .unwrap();
        sqlx::migrate!("./migrations").run(&pool).await.unwrap();
        pool
    }

    /// A wipe keeps the account row (all checked columns unchanged) and
    /// leaves projector-derived tables empty.
    #[tokio::test]
    async fn wipe_preserves_users_and_drops_projector_rows() {
        let db_path = temp_db_path("preserve");

        // Arrange: one account plus one projector-derived row.
        let seeded = open(&db_path).await;
        sqlx::query(
            "INSERT INTO users \
             (username, password_hash, role, disabled, must_change_pw, created_at, updated_at) \
             VALUES ('admin', 'argon2hash', 'admin', 0, 1, '2026-01-02 03:04:05', '2026-01-02 03:04:05')",
        )
        .execute(&seeded)
        .await
        .unwrap();
        sqlx::query(
            "INSERT INTO check_status (pc_id, check_name, status, detail, recorded_at, label) \
             VALUES ('pc1', 'c', 'ok', 'd', '2026-01-02 03:04:05', 'L')",
        )
        .execute(&seeded)
        .await
        .unwrap();
        seeded.close().await;

        // Act.
        let restored = wipe_projector_at(&db_path).await.unwrap();
        assert_eq!(restored, 1, "the one admin account is restored");

        // Assert: the account survived with its columns intact.
        let reopened = open(&db_path).await;
        let account: (String, String, String, i64, i64, String) = sqlx::query_as(
            "SELECT username, password_hash, role, disabled, must_change_pw, created_at \
             FROM users",
        )
        .fetch_one(&reopened)
        .await
        .unwrap();
        let (user, hash, role, disabled, must, created) = account;
        assert_eq!(user, "admin");
        assert_eq!(hash, "argon2hash");
        assert_eq!(role, "admin");
        assert_eq!(disabled, 0);
        assert_eq!(must, 1);
        assert_eq!(created, "2026-01-02 03:04:05", "timestamps round-trip");

        // Assert: the projector-derived row is gone.
        let checks: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM check_status")
            .fetch_one(&reopened)
            .await
            .unwrap();
        assert_eq!(
            checks, 0,
            "projector-derived rows are dropped (replay refills)"
        );

        reopened.close().await;
        let _ = remove_db_files(&db_path);
    }

    /// Wiping a path with no DB restores nothing but still creates the
    /// schema, leaving an empty `users` table.
    #[tokio::test]
    async fn wipe_on_missing_db_is_a_noop() {
        let db_path = temp_db_path("missing");
        assert!(!Path::new(&db_path).exists());

        let restored = wipe_projector_at(&db_path).await.unwrap();
        assert_eq!(restored, 0);
        assert!(Path::new(&db_path).exists(), "schema is (re)created");

        let reopened = open(&db_path).await;
        let users: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users")
            .fetch_one(&reopened)
            .await
            .unwrap();
        assert_eq!(users, 0);

        reopened.close().await;
        let _ = remove_db_files(&db_path);
    }
}