ff-server 0.1.0

FlowFabric server library and HTTP binary
use std::sync::Arc;

use ff_server::admin::{load_probe_inputs, PartitionCollisionsReport};
use ff_server::api;
use ff_server::config::ServerConfig;
use ff_server::server::Server;

#[tokio::main]
async fn main() {
    // Admin subcommands are parsed before tracing init because
    // `partition-collisions` prints a plain-text table to stdout and
    // should not be interleaved with structured tracing output. Other
    // subcommands can opt into tracing individually.
    let args: Vec<String> = std::env::args().collect();
    if args.len() >= 2 && args[1] == "admin" {
        run_admin_subcommand(&args[2..]);
        return;
    }

    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| {
                    // `audit=info` is non-negotiable: rotation, security-kid
                    // changes, and other compliance events use
                    // `tracing::*!(target: "audit", ...)`. Without this
                    // directive the default subscriber silently drops every
                    // audit event (module-path directives like `ff_server=info`
                    // do not match custom targets).
                    "ff_server=info,ff_engine=info,ff_script=info,tower_http=debug,audit=info".into()
                }),
        )
        .init();

    let config = match ServerConfig::from_env() {
        Ok(c) => c,
        Err(e) => {
            tracing::error!(error = %e, "failed to load config");
            std::process::exit(1);
        }
    };

    let listen_addr = config.listen_addr.clone();
    let cors_origins = config.cors_origins.clone();
    let api_token = config.api_token.clone();

    let server = match Server::start(config).await {
        Ok(s) => s,
        Err(e) => {
            tracing::error!(error = %e, "failed to start server");
            std::process::exit(1);
        }
    };

    let server = Arc::new(server);
    let app = api::router(server.clone(), &cors_origins, api_token);

    let listener = tokio::net::TcpListener::bind(&listen_addr)
        .await
        .unwrap_or_else(|e| {
            tracing::error!(addr = %listen_addr, error = %e, "failed to bind listener");
            std::process::exit(1);
        });

    tracing::info!(addr = %listen_addr, "HTTP API listening");

    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await
        .unwrap_or_else(|e| {
            tracing::error!(error = %e, "HTTP server error");
        });

    // Graceful engine shutdown after HTTP server stops
    match Arc::try_unwrap(server) {
        Ok(s) => s.shutdown().await,
        Err(_) => tracing::warn!("could not take exclusive server ownership for shutdown"),
    }
}

/// Dispatch an `admin` subcommand. Reads only the minimal env subset each
/// probe needs (via `ff_server::admin::load_probe_inputs` and similar);
/// does NOT call `ServerConfig::from_env()` and does NOT connect to Valkey
/// or start any long-lived task — probes are pure computations over the
/// configured state.
///
/// Exit codes:
/// - `0` — probe succeeded
/// - `2` — unknown subcommand or invalid config
fn run_admin_subcommand(args: &[String]) {
    let subcommand = args.first().map(String::as_str).unwrap_or("");
    match subcommand {
        "partition-collisions" => {
            // Use the probe-specific loader — the collisions probe is a
            // pure computation over lanes + partition_config and does not
            // need the prod-boot requirements (HMAC secret, CORS, etc.).
            let (lanes, partition_config) = match load_probe_inputs() {
                Ok(pair) => pair,
                Err(e) => {
                    eprintln!("ff-server admin partition-collisions: {e}");
                    std::process::exit(2);
                }
            };
            let report =
                PartitionCollisionsReport::compute(&lanes, &partition_config);
            print!("{}", report.format_plain());
        }
        "" => {
            eprintln!(
                "ff-server admin: no subcommand given\n\
                 \n\
                 USAGE:\n    \
                 ff-server admin <subcommand>\n\
                 \n\
                 SUBCOMMANDS:\n    \
                 partition-collisions    Report RFC-011 §5.6 partition collisions across configured lanes\n"
            );
            std::process::exit(2);
        }
        other => {
            eprintln!(
                "ff-server admin: unknown subcommand '{other}'\n\
                 \n\
                 Available subcommands: partition-collisions\n"
            );
            std::process::exit(2);
        }
    }
}

async fn shutdown_signal() {
    let ctrl_c = tokio::signal::ctrl_c();

    #[cfg(unix)]
    {
        let mut sigterm =
            tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
                .expect("failed to install SIGTERM handler");

        tokio::select! {
            _ = ctrl_c => {
                tracing::info!("received SIGINT (Ctrl+C)");
            }
            _ = sigterm.recv() => {
                tracing::info!("received SIGTERM");
            }
        }
    }

    #[cfg(not(unix))]
    {
        ctrl_c.await.expect("failed to listen for Ctrl+C");
        tracing::info!("received Ctrl+C");
    }
}