elastik-core 8.0.1

Elastik — Audi-ted L5 storage engine. SQLite for files.
Documentation
//! Binary/server runtime assembly.
//!
//! Startup constructs the protocol-neutral `Engine` first and keeps HTTP
//! adapter state beside it in `ServerState`.

#[cfg(all(not(test), feature = "coap"))]
pub(crate) mod coap;
#[cfg(all(not(test), feature = "coap"))]
pub(crate) mod coap_errors;
#[cfg(not(test))]
pub(crate) mod handler;
#[cfg(not(test))]
pub(crate) mod listen;
#[cfg(not(test))]
pub(crate) mod middleware;
#[cfg(not(test))]
pub(crate) mod pipeline;
#[cfg(not(test))]
pub(crate) mod proc;
#[cfg(not(test))]
pub(crate) mod response;
#[cfg(not(test))]
pub(crate) mod route;
mod state;
pub(crate) use state::ServerState;

#[cfg(not(test))]
use std::net::IpAddr;
#[cfg(not(test))]
use std::path::PathBuf;

#[cfg(all(not(test), feature = "coap"))]
use crate::config::{coap_bind_from_env, DEFAULT_COAP_MAX_IN_FLIGHT};
#[cfg(not(test))]
use crate::config::{
    env_nonzero_usize, env_optional_usize, env_usize, hmac_key_from_env_value, listen_addr,
    should_warn_public_read, DEFAULT_LISTEN_REPLAY_MAX, DEFAULT_MAX_LISTEN_CONNECTIONS,
    DEFAULT_MAX_MEMORY_BYTES, DEFAULT_MAX_WORLD_BYTES, DEFAULT_READ_CACHE_MAX_ENTRIES,
};
#[cfg(not(test))]
use crate::{
    engine::{Engine, EngineBuilder},
    engine_types::SecretBytes,
    VERSION,
};

#[cfg(not(test))]
pub(crate) async fn run_from_env() {
    crate::pipeline::init_trace_from_env();

    let host = std::env::var("ELASTIK_HOST").unwrap_or_else(|_| "127.0.0.1".into());
    let port: u16 = std::env::var("ELASTIK_PORT")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(3105);
    #[cfg(feature = "coap")]
    let coap_bind = coap_bind_from_env();
    let data = PathBuf::from(std::env::var("ELASTIK_DATA").unwrap_or_else(|_| "./data".into()));
    let max_world_bytes = env_usize("ELASTIK_MAX_WORLD_BYTES", DEFAULT_MAX_WORLD_BYTES);
    let max_memory_bytes = env_usize("ELASTIK_MAX_MEMORY_BYTES", DEFAULT_MAX_MEMORY_BYTES);
    let max_storage_bytes = env_optional_usize("ELASTIK_MAX_STORAGE_BYTES");
    let max_listen_connections = env_nonzero_usize(
        "ELASTIK_MAX_LISTEN_CONNECTIONS",
        DEFAULT_MAX_LISTEN_CONNECTIONS,
    );
    let listen_replay_max =
        env_nonzero_usize("ELASTIK_LISTEN_REPLAY_MAX", DEFAULT_LISTEN_REPLAY_MAX);
    #[cfg(feature = "coap")]
    let coap_max_in_flight =
        env_nonzero_usize("ELASTIK_COAP_MAX_IN_FLIGHT", DEFAULT_COAP_MAX_IN_FLIGHT);
    let read_cache_max_entries = env_nonzero_usize(
        "ELASTIK_READ_CACHE_MAX_ENTRIES",
        DEFAULT_READ_CACHE_MAX_ENTRIES,
    );
    let hmac_key = hmac_key_from_env_value(std::env::var("ELASTIK_KEY").ok()).expect(
        "ELASTIK_KEY must be a non-empty string; the audit chain has no meaning without it",
    );
    let tokens = ServerTokens::from_env();
    let persist_header_allowlist = crate::config::header_allowlist_from_env();
    let persist_header_user_deny = crate::config::header_user_deny_from_env();
    let engine = build_engine_from_env(
        data,
        hmac_key,
        &tokens,
        EngineLimits {
            max_world_bytes,
            max_memory_bytes,
            max_storage_bytes,
            max_listen_connections,
            listen_replay_max,
            read_cache_max_entries,
        },
    );
    let state = ServerState::new(
        engine.clone(),
        max_world_bytes,
        persist_header_allowlist,
        persist_header_user_deny,
    );

    let addr = listen_addr(&host, port);
    let listener = tokio::net::TcpListener::bind(&addr).await.expect("bind");
    let bind_ip = listener
        .local_addr()
        .map(|addr| addr.ip())
        .unwrap_or_else(|_| IpAddr::from([127, 0, 0, 1]));
    eprintln!("elastik-core v{VERSION} on http://{addr}/");
    print_auth_summary(&tokens, bind_ip);
    #[cfg(feature = "coap")]
    if let Some((coap_host, coap_port)) = coap_bind {
        let coap_addr = listen_addr(&coap_host, coap_port);
        let coap_engine = engine.clone();
        let coap_shutdown = coap_engine.shutdown_receiver();
        tokio::spawn(async move {
            crate::coap::serve(coap_engine, coap_addr, coap_shutdown, coap_max_in_flight).await;
        });
    }
    let app = route::build_app(state);

    let serve_result = axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal(engine.clone()))
        .await;
    serve_result.expect("axum server failed");
    drop(engine);
}

#[cfg(not(test))]
struct EngineLimits {
    max_world_bytes: usize,
    max_memory_bytes: usize,
    max_storage_bytes: Option<usize>,
    max_listen_connections: usize,
    listen_replay_max: usize,
    read_cache_max_entries: usize,
}

#[cfg(not(test))]
struct ServerTokens {
    read: Option<Vec<u8>>,
    write: Option<Vec<u8>>,
    approve: Option<Vec<u8>>,
}

#[cfg(not(test))]
impl ServerTokens {
    fn from_env() -> Self {
        Self {
            read: nonempty_env("ELASTIK_READ_TOKEN"),
            write: nonempty_env("ELASTIK_WRITE_TOKEN").or_else(|| nonempty_env("ELASTIK_TOKEN")),
            approve: nonempty_env("ELASTIK_APPROVE_TOKEN"),
        }
    }

    fn read_required(&self) -> bool {
        self.read.is_some()
    }
}

#[cfg(not(test))]
fn build_engine_from_env(
    data: PathBuf,
    hmac_key: Vec<u8>,
    tokens: &ServerTokens,
    limits: EngineLimits,
) -> Engine {
    let key = SecretBytes::new(hmac_key).expect("ELASTIK_KEY validated before Engine build");
    let builder = Engine::builder()
        .data_root(data)
        .key(key)
        .max_world_bytes(limits.max_world_bytes)
        .max_memory_bytes(limits.max_memory_bytes)
        .max_storage_bytes(limits.max_storage_bytes)
        .max_listen_connections(limits.max_listen_connections)
        .listen_replay_max(limits.listen_replay_max)
        .read_cache_max_entries(limits.read_cache_max_entries);
    configure_tokens(builder, tokens)
        .build()
        .unwrap_or_else(|err| panic!("build Engine failed: {err:?}"))
}

#[cfg(not(test))]
fn configure_tokens(mut builder: EngineBuilder, tokens: &ServerTokens) -> EngineBuilder {
    if let Some(token) = &tokens.read {
        builder = builder.read_token(token.clone());
    }
    if let Some(token) = &tokens.write {
        builder = builder.write_token(token.clone());
    }
    if let Some(token) = &tokens.approve {
        builder = builder.approve_token(token.clone());
    }
    builder
}

#[cfg(not(test))]
fn print_auth_summary(tokens: &ServerTokens, bind_ip: IpAddr) {
    eprintln!("auth:");
    eprintln!(
        "  read:    {}",
        if tokens.read_required() {
            "token required"
        } else {
            "public (ELASTIK_READ_TOKEN not set)"
        }
    );
    eprintln!(
        "  write:   {}",
        if tokens.write.is_some() {
            "token required"
        } else {
            "disabled (ELASTIK_WRITE_TOKEN not set)"
        }
    );
    eprintln!(
        "  approve: {}",
        if tokens.approve.is_some() {
            "token required"
        } else {
            "disabled (ELASTIK_APPROVE_TOKEN not set)"
        }
    );
    if env_set_but_empty("ELASTIK_READ_TOKEN") {
        eprintln!("  warning: empty ELASTIK_READ_TOKEN treated as unset (reads public)");
    }
    if should_warn_public_read(bind_ip, tokens.read_required()) {
        eprintln!(
            "  WARNING: reads are public on non-loopback interface {bind_ip}; set ELASTIK_READ_TOKEN to gate reads."
        );
    }
    if env_set_but_empty("ELASTIK_WRITE_TOKEN") {
        eprintln!("  warning: empty ELASTIK_WRITE_TOKEN treated as unset (PUT/POST disabled)");
    }
    if std::env::var("ELASTIK_TOKEN").is_ok() {
        eprintln!("  warning: ELASTIK_TOKEN is deprecated; rename it to ELASTIK_WRITE_TOKEN.");
    }
    if env_set_but_empty("ELASTIK_APPROVE_TOKEN") {
        eprintln!(
            "  warning: empty ELASTIK_APPROVE_TOKEN treated as unset (DELETE/system writes disabled)"
        );
    }
    if tokens.write.is_none() {
        eprintln!("  warning: ELASTIK_WRITE_TOKEN not set; ordinary PUT/POST are disabled.");
    }
    if tokens.approve.is_none() {
        eprintln!(
            "  warning: ELASTIK_APPROVE_TOKEN not set; DELETE and system writes are disabled."
        );
    }
}

#[cfg(not(test))]
fn nonempty_env(name: &str) -> Option<Vec<u8>> {
    std::env::var(name)
        .ok()
        .filter(|value| !value.trim().is_empty())
        .map(String::into_bytes)
}

#[cfg(not(test))]
fn env_set_but_empty(name: &str) -> bool {
    match std::env::var(name) {
        Ok(value) => value.trim().is_empty(),
        Err(_) => false,
    }
}

#[cfg(not(test))]
async fn shutdown_signal(engine: Engine) {
    wait_for_shutdown_signal().await;
    eprintln!("elastik-core: shutdown signal received");
    engine.shutdown();
}

#[cfg(all(not(test), unix))]
async fn wait_for_shutdown_signal() {
    use tokio::signal::unix::{signal, SignalKind};

    let mut sigterm = match signal(SignalKind::terminate()) {
        Ok(sigterm) => sigterm,
        Err(e) => {
            eprintln!("elastik-core: failed to install SIGTERM handler: {e}; waiting for Ctrl-C");
            let _ = tokio::signal::ctrl_c().await;
            return;
        }
    };

    tokio::select! {
        _ = tokio::signal::ctrl_c() => {},
        _ = sigterm.recv() => {},
    }
}

#[cfg(all(not(test), not(unix)))]
async fn wait_for_shutdown_signal() {
    let _ = tokio::signal::ctrl_c().await;
}