use std::net::TcpListener;
use std::path::PathBuf;
use std::process::Stdio;
use std::time::Duration;
use mongodb::bson::{Document, doc};
use reqwest::StatusCode;
use serde_json::{Value, json};
use tempfile::TempDir;
use tokio::process::{Child, Command};
use tokio::time::sleep;
struct TestMongo {
_tmp: TempDir,
child: Child,
uri: String,
}
impl TestMongo {
async fn start() -> Self {
let tmp = tempfile::tempdir().expect("tempdir");
let port = next_port();
let log_path = tmp.path().join("mongod.log");
let mut command = Command::new(mongod_bin());
command
.arg("--dbpath")
.arg(tmp.path())
.arg("--port")
.arg(port.to_string())
.arg("--bind_ip")
.arg("127.0.0.1")
.arg("--quiet")
.arg("--logpath")
.arg(&log_path)
.stdout(Stdio::null())
.stderr(Stdio::null());
let child = command.spawn().expect("spawn mongod");
let uri = format!("mongodb://127.0.0.1:{port}/?directConnection=true");
wait_for_mongo(&uri).await;
Self {
_tmp: tmp,
child,
uri,
}
}
}
impl Drop for TestMongo {
fn drop(&mut self) {
let _ = self.child.start_kill();
}
}
struct TestServer {
child: Child,
}
impl TestServer {
async fn start(bin: &PathBuf, mongo_uri: &str) -> Self {
let child = Command::new(bin)
.arg("serve")
.arg("--mongo-uri")
.arg(mongo_uri)
.arg("--system-db-name")
.arg("k2_system")
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
.expect("spawn api server");
Self { child }
}
}
impl Drop for TestServer {
fn drop(&mut self) {
let _ = self.child.start_kill();
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn init_serve_recover_round_trip_against_real_mongod() {
let mongo = TestMongo::start().await;
let api_port = next_port();
let base_url = format!("http://127.0.0.1:{api_port}");
let bootstrap_token = "bootstrap-secret";
let bin = server_bin();
let init = run_server_command(
&bin,
&[
"init",
"--mongo-uri",
&mongo.uri,
"--bootstrap-token",
bootstrap_token,
"--system-db-name",
"k2_system",
"--listen-host",
"127.0.0.1",
"--listen-port",
&api_port.to_string(),
"--ownership-mode",
"strict",
"--slow-query-ms",
"250",
"--seed-key-name",
"worker",
"--seed-key-database",
"project_a",
"--seed-key-permission",
"collections.read",
"--seed-key-permission",
"collections.write",
"--seed-key-permission",
"collections.search",
"--seed-key-permission",
"collections.aggregate",
"--seed-key-permission",
"collections.count",
"--seed-key-permission",
"collections.restore",
],
)
.await;
assert!(init.status.success(), "init failed: {}", String::from_utf8_lossy(&init.stderr));
let runtime_key = extract_stdout_value(&init.stdout, "runtime key printable: ");
let server = TestServer::start(&bin, &mongo.uri).await;
wait_for_http_ok(&format!("{base_url}/ready")).await;
let client = reqwest::Client::new();
let created = client
.post(format!("{base_url}/v1/widgets"))
.header("authorization", format!("ApiKey {runtime_key}"))
.header("x-scope", "owner1")
.json(&json!({ "name": "alpha", "seq": 1 }))
.send()
.await
.expect("create request");
assert_eq!(created.status(), StatusCode::CREATED);
let created_body: Value = created.json().await.expect("create json");
let id = created_body["id"].as_str().expect("created id").to_owned();
let fetched = client
.get(format!("{base_url}/v1/widgets/{id}"))
.header("authorization", format!("ApiKey {runtime_key}"))
.header("x-scope", "owner1")
.send()
.await
.expect("get request");
assert_eq!(fetched.status(), StatusCode::OK);
let fetched_body: Value = fetched.json().await.expect("get json");
assert_eq!(fetched_body["name"], json!("alpha"));
let counted = client
.post(format!("{base_url}/v1/widgets/count"))
.header("authorization", format!("ApiKey {runtime_key}"))
.header("x-scope", "owner1")
.json(&json!({}))
.send()
.await
.expect("count request");
assert_eq!(counted.status(), StatusCode::OK);
let count_body: Value = counted.json().await.expect("count json");
assert_eq!(count_body["count"], json!(1));
drop(server);
let direct = mongodb::Client::with_uri_str(&mongo.uri)
.await
.expect("direct mongo client");
direct
.database("k2_system")
.collection::<Document>("server_config")
.update_many(
doc! { "kind": "server_config", "active": true },
doc! { "$set": { "active": false } },
)
.await
.expect("deactivate active config");
let recover_port = next_port();
let recover = run_server_command(
&bin,
&[
"recover",
"--mongo-uri",
&mongo.uri,
"--bootstrap-token",
bootstrap_token,
"--system-db-name",
"k2_system",
"--listen-host",
"127.0.0.1",
"--listen-port",
&recover_port.to_string(),
"--ownership-mode",
"strict",
"--slow-query-ms",
"250",
],
)
.await;
assert!(recover.status.success(), "recover failed: {}", String::from_utf8_lossy(&recover.stderr));
let recover_stdout = String::from_utf8_lossy(&recover.stdout);
assert!(recover_stdout.contains("server_config action: replaced"));
let recovered_base_url = format!("http://127.0.0.1:{recover_port}");
let recovered_server = TestServer::start(&bin, &mongo.uri).await;
wait_for_http_ok(&format!("{recovered_base_url}/ready")).await;
let fetched_after_recover = client
.get(format!("{recovered_base_url}/v1/widgets/{id}"))
.header("authorization", format!("ApiKey {runtime_key}"))
.header("x-scope", "owner1")
.send()
.await
.expect("get after recover");
assert_eq!(fetched_after_recover.status(), StatusCode::OK);
let fetched_after_recover_body: Value = fetched_after_recover.json().await.expect("get after recover json");
assert_eq!(fetched_after_recover_body["name"], json!("alpha"));
drop(recovered_server);
}
async fn run_server_command(bin: &PathBuf, args: &[&str]) -> std::process::Output {
Command::new(bin)
.args(args)
.output()
.await
.expect("run server command")
}
fn extract_stdout_value(stdout: &[u8], prefix: &str) -> String {
let output = String::from_utf8_lossy(stdout);
output
.lines()
.find_map(|line| line.strip_prefix(prefix).map(ToOwned::to_owned))
.unwrap_or_else(|| panic!("missing stdout line with prefix {prefix}: {output}"))
}
fn server_bin() -> PathBuf {
if let Some(path) = std::env::var_os("CARGO_BIN_EXE_k2db-api-server") {
return PathBuf::from(path);
}
let exe = std::env::current_exe().expect("current exe path");
let debug_dir = exe
.parent()
.and_then(|path| path.parent())
.expect("target debug dir");
debug_dir.join("k2db-api-server")
}
fn mongod_bin() -> PathBuf {
std::env::var_os("MONGOD_BIN")
.map(PathBuf::from)
.unwrap_or_else(|| PathBuf::from("mongod"))
}
fn next_port() -> u16 {
TcpListener::bind("127.0.0.1:0")
.expect("bind ephemeral port")
.local_addr()
.expect("local addr")
.port()
}
async fn wait_for_mongo(uri: &str) {
for _ in 0..50 {
if let Ok(client) = mongodb::Client::with_uri_str(uri).await {
if client.database("admin").run_command(doc! { "ping": 1 }).await.is_ok() {
return;
}
}
sleep(Duration::from_millis(100)).await;
}
panic!("mongod did not become healthy in time");
}
async fn wait_for_http_ok(url: &str) {
let client = reqwest::Client::new();
for _ in 0..80 {
if let Ok(response) = client.get(url).send().await {
if response.status().is_success() {
return;
}
}
sleep(Duration::from_millis(100)).await;
}
panic!("server did not become ready: {url}");
}