mod api;
mod audit;
mod auth;
mod projector;
mod scheduler;
mod web;
#[cfg(target_os = "windows")]
mod service;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use anyhow::{Context, Result};
use clap::Parser;
use kanade_shared::config::{LogSection, load_backend_config};
use kanade_shared::default_paths;
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
use tokio::net::TcpListener;
use tower_http::trace::TraceLayer;
use tracing::{error, info};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
// Command-line arguments of the backend binary, parsed through clap's derive API.
// NOTE: plain `//` comments are used here on purpose: clap's derive reads `///`
// doc comments as help text, so adding them would change the `--help` output.
#[derive(Parser, Debug)]
#[command(
name = "kanade-backend",
about = "kanade backend (axum + SQLite projector)",
version
)]
struct Cli {
// Optional `--config <PATH>` flag. `run_backend` hands it to
// `default_paths::find_config` together with the `KANADE_BACKEND_CONFIG`
// env-var name and the `backend.toml` file name.
#[arg(long)]
config: Option<PathBuf>,
}
/// Process entry point.
///
/// On Windows the binary first tries `service::try_run_as_service`. If that
/// succeeds, the process returns immediately. If it fails with an error that
/// `service::is_not_under_scm` recognises (presumably: launched from a console
/// rather than by the service manager — confirm in `service`), startup falls
/// through to the normal path. Any other dispatcher error aborts startup.
///
/// On every platform the normal path builds a multi-threaded tokio runtime and
/// blocks on [`run_backend`].
fn main() -> Result<()> {
    #[cfg(target_os = "windows")]
    {
        if let Err(err) = service::try_run_as_service() {
            if !service::is_not_under_scm(&err) {
                return Err(anyhow::anyhow!("service dispatcher failed: {err}"));
            }
            // Recognised "not under SCM" error: continue with the normal startup below.
        } else {
            return Ok(());
        }
    }

    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.enable_all();
    let rt = builder.build().context("build tokio runtime")?;
    rt.block_on(run_backend())
}
/// Boots the backend and serves HTTP until the axum server stops.
///
/// Startup sequence:
/// 1. Parse the CLI and resolve/load the backend config file.
/// 2. Initialise tracing from the `[log]` section.
/// 3. Create the SQLite parent directory if needed, open the pool and run the
///    embedded migrations from `./migrations`.
/// 4. Connect to NATS and ensure the JetStream resources exist.
/// 5. Spawn the background tasks (results / audit / heartbeat projectors and
///    the scheduler). They are detached: if one returns an error it is logged,
///    but the server keeps running.
/// 6. Bind the configured address and serve the API router, wrapped in the
///    auth middleware and an HTTP trace layer.
///
/// # Errors
///
/// Returns an error, annotated with the step that failed, if any of the
/// startup steps above fail or if `axum::serve` itself returns an error.
pub(crate) async fn run_backend() -> Result<()> {
    let cli = Cli::parse();
    let cfg_path = default_paths::find_config(
        cli.config.as_deref(),
        "KANADE_BACKEND_CONFIG",
        "backend.toml",
    )?;
    let cfg =
        load_backend_config(&cfg_path).with_context(|| format!("load config from {cfg_path:?}"))?;

    // Bound to a named variable so the guard lives until this function returns;
    // see tracing-appender's `WorkerGuard` for what dropping it does.
    let _log_guard = init_tracing(&cfg.log)
        .with_context(|| format!("init tracing from [log] in {cfg_path:?}"))?;
    info!(
        bind = %cfg.server.bind,
        nats = %cfg.nats.url,
        db = %cfg.db.sqlite_path,
        log_path = %cfg.log.path,
        log_keep_days = cfg.log.keep_days,
        "starting kanade-backend",
    );

    // A bare file name has an empty parent; there is nothing to create then.
    let sqlite_path = PathBuf::from(&cfg.db.sqlite_path);
    if let Some(parent) = sqlite_path.parent()
        && !parent.as_os_str().is_empty()
    {
        tokio::fs::create_dir_all(parent)
            .await
            .with_context(|| format!("create sqlite parent {parent:?}"))?;
    }
    let sqlite_opts = SqliteConnectOptions::from_str(&format!("sqlite://{}", cfg.db.sqlite_path))
        .with_context(|| format!("parse sqlite path {}", cfg.db.sqlite_path))?
        .create_if_missing(true);
    let pool = SqlitePoolOptions::new()
        .max_connections(8)
        .connect_with(sqlite_opts)
        .await
        .context("open sqlite pool")?;
    sqlx::migrate!("./migrations")
        .run(&pool)
        .await
        .context("run migrations")?;
    info!("sqlite migrations applied");

    // Annotated like every other startup step so a connection failure names
    // the step and the URL that was tried.
    let nats = kanade_shared::nats_client::connect(&cfg.nats.url)
        .await
        .with_context(|| format!("connect to NATS at {}", cfg.nats.url))?;
    info!("connected to NATS");
    let jetstream = async_nats::jetstream::new(nats.clone());
    kanade_shared::bootstrap::ensure_jetstream_resources(&jetstream)
        .await
        .context("ensure_jetstream_resources")?;
    info!("jetstream resources ready");

    // Background projectors. Each task gets its own clones of the handles it
    // needs; the join handles are dropped, so the tasks run detached.
    {
        let pool = pool.clone();
        let js = jetstream.clone();
        tokio::spawn(async move {
            if let Err(e) = projector::results::run(js, pool).await {
                error!(error = %e, "results projector exited");
            }
        });
    }
    {
        let pool = pool.clone();
        let js = jetstream.clone();
        tokio::spawn(async move {
            if let Err(e) = projector::audit::run(js, pool).await {
                error!(error = %e, "audit projector exited");
            }
        });
    }
    {
        let pool = pool.clone();
        let nats_client = nats.clone();
        tokio::spawn(async move {
            if let Err(e) = projector::heartbeat::run(nats_client, pool).await {
                error!(error = %e, "heartbeat projector exited");
            }
        });
    }

    let app_state = api::AppState {
        pool: pool.clone(),
        nats,
        jetstream,
    };
    {
        let s = app_state.clone();
        tokio::spawn(async move {
            if let Err(e) = scheduler::run(s).await {
                error!(error = %e, "scheduler exited");
            }
        });
    }

    // The trace layer is added last, so it wraps the auth middleware as well.
    let app = api::router(app_state)
        .layer(axum::middleware::from_fn(auth::verify))
        .layer(TraceLayer::new_for_http());
    let listener = TcpListener::bind(&cfg.server.bind)
        .await
        .with_context(|| format!("bind {}", cfg.server.bind))?;
    info!(bind = %cfg.server.bind, "axum serving");
    axum::serve(listener, app).await.context("axum serve")?;
    Ok(())
}
/// Installs the global tracing subscriber described by the `[log]` config section.
///
/// The filter comes from the default environment variable when it is set and
/// parses; otherwise `log.level` is used.
///
/// * `log.keep_days == 0` — log to stdout only and return `Ok(None)`.
/// * otherwise — log to stdout and to a daily-rotated file. The file name is
///   derived from the stem and extension of `log.path` (defaulting to
///   `backend` / `log`), it lives in that path's parent directory, and
///   `log.keep_days` is passed to the appender as the maximum number of log
///   files to keep.
///
/// Installing the subscriber is best-effort: the `try_init` result is ignored,
/// so an already-installed global subscriber is left in place.
///
/// # Returns
///
/// The non-blocking writer's `WorkerGuard` when file logging is enabled. The
/// caller must keep it alive for as long as file logging should work.
///
/// # Errors
///
/// Fails if `log.path` has no parent component, if the log directory cannot be
/// created, or if the rolling appender cannot be built.
fn init_tracing(log: &LogSection) -> Result<Option<tracing_appender::non_blocking::WorkerGuard>> {
    let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| log.level.clone().into());

    if log.keep_days == 0 {
        let _ = tracing_subscriber::registry()
            .with(env_filter)
            .with(tracing_subscriber::fmt::layer().with_writer(std::io::stdout))
            .try_init();
        return Ok(None);
    }

    let path = Path::new(&log.path);
    // A bare file name such as `backend.log` has an empty parent. Map it to `.`
    // so the appender gets a directory it can list when pruning old files;
    // listing an empty path fails on Unix and retention would silently stop.
    let dir = path
        .parent()
        .map(|p| {
            if p.as_os_str().is_empty() {
                Path::new(".")
            } else {
                p
            }
        })
        .with_context(|| format!("[log] path '{}' has no parent dir", log.path))?;
    let stem = path
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("backend");
    let ext = path.extension().and_then(|s| s.to_str()).unwrap_or("log");

    std::fs::create_dir_all(dir).with_context(|| format!("create log dir {dir:?}"))?;
    let appender = tracing_appender::rolling::Builder::new()
        .filename_prefix(stem)
        .filename_suffix(ext)
        .rotation(tracing_appender::rolling::Rotation::DAILY)
        .max_log_files(log.keep_days)
        .build(dir)
        .with_context(|| format!("build rolling appender at {dir:?}"))?;
    let (non_blocking, guard) = tracing_appender::non_blocking(appender);

    let _ = tracing_subscriber::registry()
        .with(env_filter)
        .with(tracing_subscriber::fmt::layer().with_writer(std::io::stdout))
        // Files are not terminals: disable ANSI colour escapes for this layer
        // only, so the log files stay plain text.
        .with(
            tracing_subscriber::fmt::layer()
                .with_ansi(false)
                .with_writer(non_blocking),
        )
        .try_init();
    Ok(Some(guard))
}