use std::collections::{BTreeSet, HashMap};
use k2db::{AggregationMode, DatabaseConfig, HostConfig, OwnershipMode, QueryHooks};
use mongodb::bson::{Bson, doc};
use mongodb::options::{ClientOptions, ServerAddress};
use mongodb::{Client, Database};
use serde::Deserialize;
use crate::bootstrap::BootstrapConfig;
/// Fully assembled application configuration, as produced by [`AppConfig::load`].
#[derive(Debug, Clone)]
pub struct AppConfig {
/// Listen address taken from the active `server_config` document.
pub server: ServerConfig,
/// MongoDB connection settings (from the bootstrap URI) plus runtime tuning
/// (from the active `server_config` document).
pub k2db: K2DbSection,
/// API keys read from the control-plane `keys` collection.
pub apikey: ApiKeySection,
}
/// Listen settings copied from the `listen` section of the active
/// `server_config` document.
#[derive(Debug, Clone)]
pub struct ServerConfig {
/// Listen host; validation rejects a blank value.
pub host: String,
/// Listen port; validation rejects `0`.
pub port: u16,
}
/// MongoDB connection settings and runtime tuning shared by every tenant database.
///
/// `hosts`, `user`, `password`, `auth_source`, and `replica_set` are extracted from
/// the parsed bootstrap Mongo URI; `ownership_mode` and `slow_query_ms` come from
/// the `k2db` section of the active `server_config` document.
///
/// `Debug` is implemented by hand (instead of derived) so that formatting this
/// struct, or any struct that embeds it, never prints the password.
#[derive(Clone)]
pub struct K2DbSection {
    /// Server addresses to connect to.
    pub hosts: Vec<HostEntry>,
    /// Username from the URI credential, if any.
    pub user: Option<String>,
    /// Password from the URI credential, if any. Redacted in `Debug` output.
    pub password: Option<String>,
    /// Authentication source from the URI credential, if any.
    pub auth_source: Option<String>,
    /// Ownership mode override; `None` means "use the `OwnershipMode` default".
    pub ownership_mode: Option<OwnershipMode>,
    /// Replica-set name from the URI, if any.
    pub replica_set: Option<String>,
    /// Slow-query threshold from the `server_config` document, if configured.
    pub slow_query_ms: Option<u64>,
}

impl std::fmt::Debug for K2DbSection {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the Some/None distinction for diagnostics, but never the secret itself.
        let password = self.password.as_ref().map(|_| "<redacted>");
        f.debug_struct("K2DbSection")
            .field("hosts", &self.hosts)
            .field("user", &self.user)
            .field("password", &password)
            .field("auth_source", &self.auth_source)
            .field("ownership_mode", &self.ownership_mode)
            .field("replica_set", &self.replica_set)
            .field("slow_query_ms", &self.slow_query_ms)
            .finish()
    }
}
/// One MongoDB server address taken from the parsed bootstrap URI.
#[derive(Debug, Clone)]
pub struct HostEntry {
/// Host name or address; validation rejects a blank value.
pub host: String,
/// Port copied as-is from the parsed address; may be absent.
pub port: Option<u16>,
}
/// API keys loaded from the control-plane database.
#[derive(Debug, Clone)]
pub struct ApiKeySection {
/// Keys indexed by their trimmed `key_id`.
pub keys: HashMap<String, ApiKeyConfig>,
}
/// A single API key as stored in the control-plane `keys` collection.
#[derive(Debug, Clone)]
pub struct ApiKeyConfig {
/// Key identifier, trimmed at load time; also used as the map key.
pub key_id: String,
/// Stored secret hash, copied verbatim from the document; must not be blank.
/// NOTE(review): the hashing scheme is not visible in this file.
pub secret_hash: String,
/// Database this key is bound to, copied verbatim from the document
/// (`tenant_databases` trims it when collecting names); must not be blank.
pub database: String,
/// Permission strings from the document; empty when the field is absent.
pub permissions: Vec<String>,
/// `active` flag copied from the document; not interpreted in this file.
pub active: bool,
/// Normalized expiry, `None` when absent or null. BSON dates and RFC 3339
/// strings are converted to epoch milliseconds; numeric values are stored
/// unchanged (presumably already epoch milliseconds — confirm with writers).
pub expires_at: Option<i64>,
}
/// Errors returned while loading or validating the application configuration.
#[derive(Debug, thiserror::Error)]
pub enum AppConfigError {
/// Any error surfaced by the MongoDB driver (URI parsing, client creation,
/// queries, document deserialization); converted automatically by `?`.
#[error("mongo control-plane error: {0}")]
Mongo(#[from] mongodb::error::Error),
/// No document matched `kind: "server_config", active: true`.
#[error("missing active server_config document in control-plane database")]
MissingActiveServerConfig,
/// A loaded value failed a sanity check; the payload is the human-readable reason.
#[error("invalid configuration: {0}")]
Validation(String),
}
/// Deserialized shape of the active document in the `server_config` collection.
/// Only the fields read by this module are declared.
#[derive(Debug, Deserialize)]
struct ServerConfigDocument {
/// Required listen settings.
listen: ListenConfigDocument,
/// Optional runtime tuning; falls back to `RuntimeK2DbDocument::default()`
/// when the field is missing.
#[serde(default)]
k2db: RuntimeK2DbDocument,
}
/// `listen` sub-document of the `server_config` document; both fields are required.
#[derive(Debug, Deserialize)]
struct ListenConfigDocument {
/// Listen host.
host: String,
/// Listen port.
port: u16,
}
/// `k2db` sub-document of the `server_config` document; every field is optional.
#[derive(Debug, Default, Deserialize)]
struct RuntimeK2DbDocument {
/// Ownership mode override; `None` when not configured.
#[serde(default)]
ownership_mode: Option<OwnershipMode>,
/// Slow-query threshold; `None` when not configured.
#[serde(default)]
slow_query_ms: Option<u64>,
}
/// Deserialized shape of one `kind: "api_key"` document in the `keys` collection.
#[derive(Debug, Deserialize)]
struct ApiKeyDocument {
/// Key identifier; trimmed and checked for duplicates by `load_api_keys`.
key_id: String,
/// Stored secret hash.
secret_hash: String,
/// Database the key is bound to.
database: String,
/// Permission strings; defaults to an empty list when the field is missing.
#[serde(default)]
permissions: Vec<String>,
/// Required `active` flag.
active: bool,
/// Raw expiry value. Kept as `Bson` because the field may be stored in
/// several encodings; `normalize_expiry` converts it to an `i64`.
#[serde(default)]
expires_at: Option<Bson>,
}
impl AppConfig {
pub async fn load(bootstrap: &BootstrapConfig) -> Result<Self, AppConfigError> {
let options = ClientOptions::parse(&bootstrap.mongo_uri).await?;
let server_hosts = hosts_from_options(&options)?;
let user = options
.credential
.as_ref()
.and_then(|credential| credential.username.clone());
let password = options
.credential
.as_ref()
.and_then(|credential| credential.password.clone());
let auth_source = options
.credential
.as_ref()
.and_then(|credential| credential.source.clone());
let replica_set = options.repl_set_name.clone();
let client = Client::with_options(options)?;
let control_plane = client.database(&bootstrap.system_db_name);
let server_doc = load_server_config(&control_plane).await?;
let keys = load_api_keys(&control_plane).await?;
let config = Self {
server: ServerConfig {
host: server_doc.listen.host,
port: server_doc.listen.port,
},
k2db: K2DbSection {
hosts: server_hosts,
user,
password,
auth_source,
ownership_mode: server_doc.k2db.ownership_mode,
replica_set,
slow_query_ms: server_doc.k2db.slow_query_ms,
},
apikey: ApiKeySection { keys },
};
config.validate()?;
Ok(config)
}
pub fn tenant_databases(&self) -> Vec<String> {
self.apikey
.keys
.values()
.map(|entry| entry.database.trim().to_owned())
.filter(|name| !name.is_empty())
.collect::<BTreeSet<_>>()
.into_iter()
.collect()
}
pub fn database_config(&self, database: &str) -> DatabaseConfig {
DatabaseConfig {
name: database.to_owned(),
hosts: self
.k2db
.hosts
.iter()
.map(|host| HostConfig {
host: host.host.clone(),
port: host.port,
})
.collect(),
user: self.k2db.user.clone(),
password: self.k2db.password.clone(),
auth_source: self.k2db.auth_source.clone(),
replica_set: self.k2db.replica_set.clone(),
slow_query_ms: self.k2db.slow_query_ms,
ownership_mode: self.k2db.ownership_mode.unwrap_or_default(),
aggregation_mode: AggregationMode::default(),
secure_field_prefixes: Vec::new(),
secure_field_encryption: None,
hooks: QueryHooks::default(),
}
}
fn validate(&self) -> Result<(), AppConfigError> {
if self.server.host.trim().is_empty() {
return Err(AppConfigError::Validation(
"server.listen.host must not be empty".to_owned(),
));
}
if self.server.port == 0 {
return Err(AppConfigError::Validation(
"server.listen.port must be positive".to_owned(),
));
}
if self.k2db.hosts.is_empty() {
return Err(AppConfigError::Validation(
"bootstrap mongo_uri must resolve at least one host".to_owned(),
));
}
if self.k2db.hosts.iter().any(|host| host.host.trim().is_empty()) {
return Err(AppConfigError::Validation(
"bootstrap mongo_uri resolved an empty host entry".to_owned(),
));
}
for key in self.apikey.keys.values() {
if key.key_id.trim().is_empty() {
return Err(AppConfigError::Validation(
"control-plane key_id is required".to_owned(),
));
}
if key.secret_hash.trim().is_empty() {
return Err(AppConfigError::Validation(format!(
"control-plane secret_hash is required for key {}",
key.key_id
)));
}
if key.database.trim().is_empty() {
return Err(AppConfigError::Validation(format!(
"control-plane database binding is required for key {}",
key.key_id
)));
}
}
Ok(())
}
}
/// Fetches the single active `server_config` document from the control-plane database.
///
/// Matching documents are counted first so that "none" and "more than one" can be
/// reported as distinct errors; the document itself is fetched only when exactly one
/// matches.
///
/// # Errors
///
/// * [`AppConfigError::MissingActiveServerConfig`] when nothing matches (or the document
///   is gone by the time it is fetched).
/// * [`AppConfigError::Validation`] when more than one document matches.
/// * [`AppConfigError::Mongo`] for driver failures.
async fn load_server_config(
    control_plane: &Database,
) -> Result<ServerConfigDocument, AppConfigError> {
    let active_filter = doc! {
        "kind": "server_config",
        "active": true,
    };
    let configs = control_plane.collection::<ServerConfigDocument>("server_config");

    match configs.count_documents(active_filter.clone()).await? {
        0 => Err(AppConfigError::MissingActiveServerConfig),
        1 => configs
            .find_one(active_filter)
            .await?
            .ok_or(AppConfigError::MissingActiveServerConfig),
        _ => Err(AppConfigError::Validation(
            "multiple active server_config documents found".to_owned(),
        )),
    }
}
async fn load_api_keys(
control_plane: &Database,
) -> Result<HashMap<String, ApiKeyConfig>, AppConfigError> {
let collection = control_plane.collection::<ApiKeyDocument>("keys");
let mut cursor = collection.find(doc! { "kind": "api_key" }).await?;
let mut keys = HashMap::new();
while cursor.advance().await? {
let document = cursor.deserialize_current()?;
let key_id = document.key_id.trim().to_owned();
if key_id.is_empty() {
return Err(AppConfigError::Validation(
"control-plane key document contains an empty key_id".to_owned(),
));
}
if keys.contains_key(&key_id) {
return Err(AppConfigError::Validation(format!(
"duplicate control-plane key_id detected: {key_id}"
)));
}
keys.insert(
key_id.clone(),
ApiKeyConfig {
key_id,
secret_hash: document.secret_hash,
database: document.database,
permissions: document.permissions,
active: document.active,
expires_at: normalize_expiry(document.expires_at)?,
},
);
}
Ok(keys)
}
/// Converts the server addresses of the parsed client options into [`HostEntry`] values,
/// preserving their order.
///
/// # Errors
///
/// Returns [`AppConfigError::Validation`] at the first address that is not a TCP
/// address. Unix socket addresses get a dedicated message that includes the path.
fn hosts_from_options(options: &ClientOptions) -> Result<Vec<HostEntry>, AppConfigError> {
    options
        .hosts
        .iter()
        .map(|address| match address {
            ServerAddress::Tcp { host, port } => Ok(HostEntry {
                host: host.clone(),
                port: *port,
            }),
            #[cfg(unix)]
            ServerAddress::Unix { path } => Err(AppConfigError::Validation(format!(
                "unix socket bootstrap addresses are not supported: {}",
                path.display()
            ))),
            _ => Err(AppConfigError::Validation(
                "unsupported bootstrap mongo server address".to_owned(),
            )),
        })
        // Collecting into `Result` stops at the first rejected address.
        .collect()
}
/// Normalizes the raw `expires_at` value of a key document into an `i64`.
///
/// Accepted encodings:
///
/// * absent or BSON null: `Ok(None)`;
/// * `Int64` / `Int32`: returned unchanged (widened for `Int32`);
/// * `Double`: accepted only when finite, integral, and within the `i64` range.
///   Shells and JavaScript drivers commonly store integers that do not fit in
///   32 bits as doubles, so rejecting them would make such documents unloadable;
/// * `DateTime`: converted with `timestamp_millis()`;
/// * `String`: trimmed, parsed as RFC 3339, and converted to epoch milliseconds.
///
/// # Errors
///
/// Returns [`AppConfigError::Validation`] for an unparsable string, a double that is
/// not an exact `i64`, or any other BSON type.
fn normalize_expiry(value: Option<Bson>) -> Result<Option<i64>, AppConfigError> {
    // 2^63, the first f64 above every i64. `i64::MAX as f64` rounds up to this same
    // value, so the upper bound has to be exclusive.
    const I64_RANGE_END: f64 = 9_223_372_036_854_775_808.0;

    match value {
        None | Some(Bson::Null) => Ok(None),
        Some(Bson::Int64(value)) => Ok(Some(value)),
        Some(Bson::Int32(value)) => Ok(Some(i64::from(value))),
        Some(Bson::Double(value)) => {
            let integral = value.is_finite() && value.fract() == 0.0;
            let in_range = value >= -I64_RANGE_END && value < I64_RANGE_END;
            if integral && in_range {
                // Exact: the value is a whole number inside the i64 range.
                Ok(Some(value as i64))
            } else {
                Err(AppConfigError::Validation(format!(
                    "invalid control-plane expires_at value: {value}"
                )))
            }
        }
        Some(Bson::DateTime(value)) => Ok(Some(value.timestamp_millis())),
        Some(Bson::String(value)) => chrono::DateTime::parse_from_rfc3339(value.trim())
            .map(|parsed| Some(parsed.timestamp_millis()))
            .map_err(|_| {
                AppConfigError::Validation(format!(
                    "invalid control-plane expires_at value: {value}"
                ))
            }),
        Some(other) => Err(AppConfigError::Validation(format!(
            "unsupported control-plane expires_at type: {other}"
        ))),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a map entry for an active, non-expiring key without permissions.
    fn api_key(key_id: &str, secret_hash: &str, database: &str) -> (String, ApiKeyConfig) {
        let config = ApiKeyConfig {
            key_id: key_id.to_owned(),
            secret_hash: secret_hash.to_owned(),
            database: database.to_owned(),
            permissions: Vec::new(),
            active: true,
            expires_at: None,
        };
        (key_id.to_owned(), config)
    }

    /// A valid configuration with three keys spread over two databases.
    fn base_config() -> AppConfig {
        let server = ServerConfig {
            host: "0.0.0.0".to_owned(),
            port: 3000,
        };
        let k2db = K2DbSection {
            hosts: vec![HostEntry {
                host: "localhost".to_owned(),
                port: Some(27017),
            }],
            user: None,
            password: None,
            auth_source: None,
            ownership_mode: Some(OwnershipMode::Strict),
            replica_set: None,
            slow_query_ms: Some(250),
        };
        // Deliberately unsorted, with "zed" referenced twice.
        let keys = HashMap::from([
            api_key("beta", "hash-2", "zed"),
            api_key("alpha", "hash-1", "alpha"),
            api_key("gamma", "hash-3", "zed"),
        ]);

        AppConfig {
            server,
            k2db,
            apikey: ApiKeySection { keys },
        }
    }

    #[test]
    fn tenant_databases_are_unique_and_sorted() {
        let databases = base_config().tenant_databases();
        assert_eq!(databases, ["alpha", "zed"]);
    }

    #[test]
    fn normalize_expiry_accepts_epoch_millis() {
        let millis = 1_770_000_000_000;
        let normalized = normalize_expiry(Some(Bson::Int64(millis))).expect("expiry");
        assert_eq!(normalized, Some(millis));
    }

    #[test]
    fn normalize_expiry_accepts_rfc3339_strings() {
        let raw = Bson::String("2026-02-03T04:05:06Z".to_owned());
        let normalized = normalize_expiry(Some(raw)).expect("expiry");
        assert_eq!(normalized.expect("timestamp"), 1_770_091_506_000);
    }
}