k2db-api-server 0.1.1

Single-binary Rust server for the k2db API
// SPDX-FileCopyrightText: 2026 Alexander R. Croft
// SPDX-License-Identifier: MIT

use std::net::TcpListener;
use std::path::PathBuf;
use std::process::Stdio;
use std::time::Duration;

use mongodb::bson::{Document, doc};
use reqwest::StatusCode;
use serde_json::{Value, json};
use tempfile::TempDir;
use tokio::process::{Child, Command};
use tokio::time::sleep;

/// Handle to a throwaway `mongod` process started for a single test.
struct TestMongo {
    /// Directory passed to `mongod` as `--dbpath` (and holding its log file).
    /// Stored so the directory handle lives as long as this struct does.
    _tmp: TempDir,
    /// The spawned `mongod` process.
    child: Child,
    /// Connection string pointing at the spawned instance.
    uri: String,
}

impl TestMongo {
    /// Spawns `mongod` on a freshly probed loopback port with a temporary data
    /// directory and waits until it answers a ping.
    ///
    /// # Panics
    ///
    /// Panics if the temporary directory cannot be created, if `mongod` cannot
    /// be spawned, or if the instance does not become healthy in time.
    async fn start() -> Self {
        let tmp = tempfile::tempdir().expect("tempdir");
        let port = next_port();
        let log_path = tmp.path().join("mongod.log");
        let mut command = Command::new(mongod_bin());
        command
            .arg("--dbpath")
            .arg(tmp.path())
            .arg("--port")
            .arg(port.to_string())
            .arg("--bind_ip")
            .arg("127.0.0.1")
            .arg("--quiet")
            .arg("--logpath")
            .arg(&log_path)
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            // If the readiness wait below panics, `child` is dropped before any
            // `TestMongo` value exists, so `Drop for TestMongo` never runs.
            // Without this flag the `mongod` process would be left running.
            .kill_on_drop(true);

        let child = command.spawn().expect("spawn mongod");
        let uri = format!("mongodb://127.0.0.1:{port}/?directConnection=true");
        wait_for_mongo(&uri).await;

        Self {
            _tmp: tmp,
            child,
            uri,
        }
    }
}

impl Drop for TestMongo {
    /// Requests termination of the `mongod` process without waiting for it to
    /// exit. Failures are ignored on purpose: the process may already be gone.
    fn drop(&mut self) {
        let _ = self.child.start_kill();
    }
}

/// Handle to a running API server child process; the process is asked to stop
/// when the handle is dropped.
struct TestServer {
    /// The spawned server process.
    child: Child,
}

impl TestServer {
    /// Launches `bin` with the `serve` subcommand against `mongo_uri`, using
    /// `k2_system` as the system database name. Output streams are discarded.
    ///
    /// This only spawns the process; it does not wait for the server to accept
    /// requests.
    ///
    /// # Panics
    ///
    /// Panics if the process cannot be spawned.
    async fn start(bin: &PathBuf, mongo_uri: &str) -> Self {
        let mut serve = Command::new(bin);
        serve
            .args([
                "serve",
                "--mongo-uri",
                mongo_uri,
                "--system-db-name",
                "k2_system",
            ])
            .stdout(Stdio::null())
            .stderr(Stdio::null());

        let child = serve.spawn().expect("spawn api server");
        Self { child }
    }
}

impl Drop for TestServer {
    /// Best-effort shutdown: a kill is requested but not awaited, and any
    /// error (for example, the process already exited) is discarded.
    fn drop(&mut self) {
        let _ = self.child.start_kill();
    }
}

/// End-to-end round trip against a real `mongod` and the real server binary.
///
/// Steps, in order:
/// 1. run the `init` subcommand and read the runtime key from its stdout;
/// 2. start `serve` and create, fetch and count a `widgets` document over HTTP;
/// 3. stop the server and mark the stored `server_config` inactive directly in
///    MongoDB;
/// 4. run the `recover` subcommand with a new listen port and require it to
///    report `server_config action: replaced`;
/// 5. start `serve` again and check the document created in step 2 can still be
///    fetched with the same key on the new port.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn init_serve_recover_round_trip_against_real_mongod() {
    let mongo = TestMongo::start().await;
    let api_port = next_port();
    let base_url = format!("http://127.0.0.1:{api_port}");
    let bootstrap_token = "bootstrap-secret";
    let bin = server_bin();

    // Step 1: `init`. The listen address is supplied here; the later `serve`
    // invocation is only given the Mongo URI and system database name.
    let init = run_server_command(
        &bin,
        &[
            "init",
            "--mongo-uri",
            &mongo.uri,
            "--bootstrap-token",
            bootstrap_token,
            "--system-db-name",
            "k2_system",
            "--listen-host",
            "127.0.0.1",
            "--listen-port",
            &api_port.to_string(),
            "--ownership-mode",
            "strict",
            "--slow-query-ms",
            "250",
            "--seed-key-name",
            "worker",
            "--seed-key-database",
            "project_a",
            "--seed-key-permission",
            "collections.read",
            "--seed-key-permission",
            "collections.write",
            "--seed-key-permission",
            "collections.search",
            "--seed-key-permission",
            "collections.aggregate",
            "--seed-key-permission",
            "collections.count",
            "--seed-key-permission",
            "collections.restore",
        ],
    )
    .await;
    assert!(init.status.success(), "init failed: {}", String::from_utf8_lossy(&init.stderr));

    // The key used for all HTTP calls below is taken from the stdout line that
    // starts with this prefix; the test panics if no such line was printed.
    let runtime_key = extract_stdout_value(&init.stdout, "runtime key printable: ");
    let server = TestServer::start(&bin, &mongo.uri).await;
    // Readiness is polled on the port that was passed to `init`.
    wait_for_http_ok(&format!("{base_url}/ready")).await;

    // Step 2a: create a document and remember its id.
    let client = reqwest::Client::new();
    let created = client
        .post(format!("{base_url}/v1/widgets"))
        .header("authorization", format!("ApiKey {runtime_key}"))
        .header("x-scope", "owner1")
        .json(&json!({ "name": "alpha", "seq": 1 }))
        .send()
        .await
        .expect("create request");
    assert_eq!(created.status(), StatusCode::CREATED);
    let created_body: Value = created.json().await.expect("create json");
    let id = created_body["id"].as_str().expect("created id").to_owned();

    // Step 2b: read it back by id.
    let fetched = client
        .get(format!("{base_url}/v1/widgets/{id}"))
        .header("authorization", format!("ApiKey {runtime_key}"))
        .header("x-scope", "owner1")
        .send()
        .await
        .expect("get request");
    assert_eq!(fetched.status(), StatusCode::OK);
    let fetched_body: Value = fetched.json().await.expect("get json");
    assert_eq!(fetched_body["name"], json!("alpha"));

    // Step 2c: count with an empty request body; exactly one document is expected.
    let counted = client
        .post(format!("{base_url}/v1/widgets/count"))
        .header("authorization", format!("ApiKey {runtime_key}"))
        .header("x-scope", "owner1")
        .json(&json!({}))
        .send()
        .await
        .expect("count request");
    assert_eq!(counted.status(), StatusCode::OK);
    let count_body: Value = counted.json().await.expect("count json");
    assert_eq!(count_body["count"], json!(1));

    // Step 3: stop the first server. Dropping the handle only requests a kill;
    // it does not wait for the process to exit.
    drop(server);

    // Talk to MongoDB directly and flip every active `server_config` document
    // in `k2_system` to inactive, bypassing the server.
    let direct = mongodb::Client::with_uri_str(&mongo.uri)
        .await
        .expect("direct mongo client");
    direct
        .database("k2_system")
        .collection::<Document>("server_config")
        .update_many(
            doc! { "kind": "server_config", "active": true },
            doc! { "$set": { "active": false } },
        )
        .await
        .expect("deactivate active config");

    // Step 4: `recover` with a different listen port and no seed-key flags.
    let recover_port = next_port();
    let recover = run_server_command(
        &bin,
        &[
            "recover",
            "--mongo-uri",
            &mongo.uri,
            "--bootstrap-token",
            bootstrap_token,
            "--system-db-name",
            "k2_system",
            "--listen-host",
            "127.0.0.1",
            "--listen-port",
            &recover_port.to_string(),
            "--ownership-mode",
            "strict",
            "--slow-query-ms",
            "250",
        ],
    )
    .await;
    assert!(recover.status.success(), "recover failed: {}", String::from_utf8_lossy(&recover.stderr));
    let recover_stdout = String::from_utf8_lossy(&recover.stdout);
    assert!(recover_stdout.contains("server_config action: replaced"));

    // Step 5: a freshly started server must come up on the port given to
    // `recover` and still serve the earlier document with the original key.
    let recovered_base_url = format!("http://127.0.0.1:{recover_port}");
    let recovered_server = TestServer::start(&bin, &mongo.uri).await;
    wait_for_http_ok(&format!("{recovered_base_url}/ready")).await;

    let fetched_after_recover = client
        .get(format!("{recovered_base_url}/v1/widgets/{id}"))
        .header("authorization", format!("ApiKey {runtime_key}"))
        .header("x-scope", "owner1")
        .send()
        .await
        .expect("get after recover");
    assert_eq!(fetched_after_recover.status(), StatusCode::OK);
    let fetched_after_recover_body: Value = fetched_after_recover.json().await.expect("get after recover json");
    assert_eq!(fetched_after_recover_body["name"], json!("alpha"));

    drop(recovered_server);
}

/// Runs `bin` with `args` to completion and returns its exit status together
/// with the captured stdout and stderr.
///
/// # Panics
///
/// Panics if the process cannot be started or its output cannot be collected.
async fn run_server_command(bin: &PathBuf, args: &[&str]) -> std::process::Output {
    let mut invocation = Command::new(bin);
    invocation.args(args);

    let finished = invocation.output().await;
    finished.expect("run server command")
}

/// Returns the remainder of the first line in `stdout` that begins with
/// `prefix`.
///
/// `stdout` is decoded lossily as UTF-8, so invalid bytes do not cause a
/// failure.
///
/// # Panics
///
/// Panics, echoing the decoded output, when no line starts with `prefix`.
fn extract_stdout_value(stdout: &[u8], prefix: &str) -> String {
    let output = String::from_utf8_lossy(stdout);

    for candidate in output.lines() {
        if let Some(value) = candidate.strip_prefix(prefix) {
            return value.to_owned();
        }
    }

    panic!("missing stdout line with prefix {prefix}: {output}")
}

/// Locates the `k2db-api-server` executable under test.
///
/// Lookup order:
/// 1. the `CARGO_BIN_EXE_k2db-api-server` environment variable at runtime;
/// 2. the same variable as captured at compile time, which is how Cargo
///    exposes binary paths to integration tests;
/// 3. a file named `k2db-api-server` (plus the platform's executable suffix)
///    two directory levels above the running test executable.
///
/// # Panics
///
/// Panics if the current executable path cannot be determined or has fewer
/// than two parent directories.
fn server_bin() -> PathBuf {
    if let Some(path) = std::env::var_os("CARGO_BIN_EXE_k2db-api-server") {
        return PathBuf::from(path);
    }

    if let Some(path) = option_env!("CARGO_BIN_EXE_k2db-api-server") {
        return PathBuf::from(path);
    }

    let exe = std::env::current_exe().expect("current exe path");
    let debug_dir = exe
        .parent()
        .and_then(|path| path.parent())
        .expect("target debug dir");
    // EXE_SUFFIX is empty on Unix-like targets and ".exe" on Windows, so the
    // fallback names a file that can actually exist on every platform.
    debug_dir.join(format!(
        "k2db-api-server{}",
        std::env::consts::EXE_SUFFIX
    ))
}

/// Path of the `mongod` executable to launch: the value of the `MONGOD_BIN`
/// environment variable when it is set, otherwise the bare name `mongod`.
fn mongod_bin() -> PathBuf {
    match std::env::var_os("MONGOD_BIN") {
        Some(custom) => PathBuf::from(custom),
        None => PathBuf::from("mongod"),
    }
}

/// Asks the OS for an unused loopback TCP port by binding to port 0 and
/// reading back the assigned port number.
///
/// The probing listener is closed before this function returns, so the port
/// is not reserved: another process could claim it before the caller does.
///
/// # Panics
///
/// Panics if binding or querying the local address fails.
fn next_port() -> u16 {
    let probe = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
    let assigned = probe.local_addr().expect("local addr");
    assigned.port()
}

/// Waits for the MongoDB instance at `uri` to answer a `ping` command on the
/// `admin` database.
///
/// Makes up to 50 attempts, sleeping 100 ms after each failed one. A failure
/// to build the client counts as a failed attempt.
///
/// # Panics
///
/// Panics if none of the attempts succeeds.
async fn wait_for_mongo(uri: &str) {
    for _attempt in 0..50 {
        let healthy = match mongodb::Client::with_uri_str(uri).await {
            Ok(client) => client
                .database("admin")
                .run_command(doc! { "ping": 1 })
                .await
                .is_ok(),
            Err(_) => false,
        };
        if healthy {
            return;
        }

        sleep(Duration::from_millis(100)).await;
    }

    panic!("mongod did not become healthy in time");
}

/// Polls `url` with GET requests until one returns a success status.
///
/// Makes up to 80 attempts, sleeping 100 ms after each request that fails or
/// returns a non-success status.
///
/// # Panics
///
/// Panics, naming `url`, if no attempt succeeds.
async fn wait_for_http_ok(url: &str) {
    let client = reqwest::Client::new();

    for _attempt in 0..80 {
        let ready = client
            .get(url)
            .send()
            .await
            .map(|response| response.status().is_success())
            .unwrap_or(false);
        if ready {
            return;
        }

        sleep(Duration::from_millis(100)).await;
    }

    panic!("server did not become ready: {url}");
}