use std::collections::HashSet;
use base64::Engine;
use chrono::Utc;
use k2db::OwnershipMode;
use mongodb::bson::{Bson, Document, doc};
use mongodb::options::{ClientOptions, IndexOptions};
use mongodb::{Client, Database, IndexModel};
use rand::RngCore;
use rand::rngs::OsRng;
use scrypt::{Params, scrypt};
use subtle::ConstantTimeEq;
use crate::bootstrap::BootstrapConfig;
use crate::cli::{BootstrapArgs, CreateKeyArgs, RevokeKeyArgs, SetConfigArgs};
/// Collection holding documents of kind `server_config`.
const SERVER_CONFIG_COLLECTION: &str = "server_config";
/// Collection holding documents of kind `api_key`.
const KEYS_COLLECTION: &str = "keys";
/// Collection receiving the entries written by `append_audit`.
const AUDIT_LOG_COLLECTION: &str = "audit_log";
/// Collection holding the `bootstrap_state` record and `migration` records.
const MIGRATIONS_COLLECTION: &str = "migrations";
/// Result of a successful [`init`] run.
#[derive(Debug, Clone)]
pub struct InitOutcome {
/// Bootstrap configuration resolved from the supplied arguments.
pub bootstrap: BootstrapConfig,
/// `_uuid` of the server config document inserted during init.
pub server_config_id: String,
/// Runtime key created during init; `None` when no seed key was requested.
pub issued_key: Option<IssuedKey>,
}
/// Result of a successful [`recover`] run.
#[derive(Debug, Clone)]
pub struct RecoverOutcome {
/// Bootstrap configuration resolved from the supplied arguments.
pub bootstrap: BootstrapConfig,
/// Whether the existing active server config was kept or replaced.
pub server_config_action: RecoverConfigAction,
/// `_uuid` of the server config that is active after recovery.
pub server_config_id: String,
/// Runtime key created during recovery; `None` when no seed key was requested.
pub issued_key: Option<IssuedKey>,
}
/// What [`recover`] did with the server configuration.
#[derive(Debug, Clone, Copy)]
pub enum RecoverConfigAction {
/// The existing active server config was left untouched.
Preserved,
/// Active configs were deactivated and a new active config was inserted.
Replaced,
}
/// A freshly created runtime API key, including its one-time printable form.
#[derive(Debug, Clone)]
pub struct IssuedKey {
/// Name given to the key; stored in the key document's `label` field.
pub name: String,
/// Randomly generated public identifier of the key.
pub key_id: String,
/// Full credential in the form `rbk_sys_{key_id}.{secret}`; only a hash of
/// the secret part is written to the database.
pub printable: String,
/// Database the key is scoped to.
pub database: String,
/// Permissions granted to the key.
pub permissions: Vec<String>,
}
/// Read-only view of a stored key document, as returned by [`list_keys`].
#[derive(Debug, Clone)]
pub struct KeySummary {
/// Value of the document's `label` field, when present as a string.
pub name: Option<String>,
/// Value of the document's `key_id` field.
pub key_id: String,
/// Value of the document's `database` field.
pub database: String,
/// String entries of the document's `permissions` array.
pub permissions: Vec<String>,
/// Value of the `active` flag; `false` when the flag is missing or not a boolean.
pub active: bool,
/// `expires_at` as an integer (datetime values are converted to milliseconds);
/// `None` when absent or null.
pub expires_at: Option<i64>,
/// `revoked_at` as an integer (datetime values are converted to milliseconds);
/// `None` when absent or null.
pub revoked_at: Option<i64>,
}
/// Result of [`revoke_key`], describing the key's state before the revocation.
#[derive(Debug, Clone)]
pub struct RevokeKeyOutcome {
/// Identifier of the key that was revoked (trimmed input).
pub key_id: String,
/// Value of the key's `active` flag before the update.
pub was_active: bool,
/// Whether `revoked_at` was already set to a non-null value before the update.
pub was_revoked: bool,
}
/// The active server configuration as read from (or written to) the database.
#[derive(Debug, Clone)]
pub struct ActiveServerConfig {
/// `_uuid` of the server config document.
pub id: String,
/// Value stored under `listen.host`.
pub listen_host: String,
/// Value stored under `listen.port`.
pub listen_port: u16,
/// Value stored under `k2db.ownership_mode`.
pub ownership_mode: OwnershipMode,
/// Value stored under `k2db.slow_query_ms`; `None` when absent or null.
pub slow_query_ms: Option<u64>,
}
/// Result of a successful [`set_config`] run.
#[derive(Debug, Clone)]
pub struct SetConfigOutcome {
/// `_uuid` of the config that was active before the change; `None` unless
/// exactly one active config existed.
pub previous_config_id: Option<String>,
/// The newly inserted active configuration.
pub active_config: ActiveServerConfig,
}
/// Errors produced by the control-plane operations in this module.
#[derive(Debug, thiserror::Error)]
pub enum ControlPlaneError {
/// Resolving the bootstrap configuration from the arguments failed.
#[error(transparent)]
Bootstrap(#[from] crate::bootstrap::BootstrapResolveError),
/// An error reported by the MongoDB driver.
#[error("mongo control-plane error: {0}")]
Mongo(#[from] mongodb::error::Error),
/// A supplied argument was invalid. This variant is also used in this
/// module when a stored document does not have the expected shape.
#[error("invalid bootstrap input: {0}")]
InvalidInput(String),
/// The supplied bootstrap token did not match the stored hash, or the
/// stored hash could not be interpreted.
#[error("bootstrap verification failed")]
BootstrapVerificationFailed,
/// `init` found an existing bootstrap state or server config.
#[error("control plane is already initialized")]
AlreadyInitialized,
/// No bootstrap state record exists yet.
#[error("control plane is not initialized")]
NotInitialized,
/// No `api_key` document matches the given key id.
#[error("control-plane key not found: {0}")]
KeyNotFound(String),
}
/// Server settings that are about to be written as a new active config, or
/// that were parsed from a stored config document.
#[derive(Debug, Clone)]
struct DesiredServerConfig {
/// Host written to `listen.host`.
listen_host: String,
/// Port written to `listen.port`.
listen_port: u16,
/// Mode written to `k2db.ownership_mode`.
ownership_mode: OwnershipMode,
/// Threshold written to `k2db.slow_query_ms`; `None` is stored as null.
slow_query_ms: Option<u64>,
}
/// Validated description of a runtime key to create.
#[derive(Debug, Clone)]
struct SeedKeySpec {
/// Trimmed, non-empty key name (stored as the key's `label`).
name: String,
/// Trimmed, non-empty database name the key is scoped to.
database: String,
/// Trimmed, non-empty, de-duplicated permissions in first-seen order.
permissions: Vec<String>,
}
/// Initializes the control plane in the system database.
///
/// Creates the reserved collections and indexes, stores the hashed bootstrap
/// token, inserts the first active server config and the init migration
/// record, optionally seeds one runtime key, and appends an audit entry.
///
/// # Errors
///
/// * [`ControlPlaneError::InvalidInput`] when a config override or the
///   seed-key arguments are invalid. These are checked before anything is
///   written.
/// * [`ControlPlaneError::AlreadyInitialized`] when a bootstrap state record
///   or any server config already exists.
/// * [`ControlPlaneError::Bootstrap`] / [`ControlPlaneError::Mongo`] for
///   resolution and driver failures.
pub async fn init(args: &BootstrapArgs) -> Result<InitOutcome, ControlPlaneError> {
    let bootstrap = BootstrapConfig::resolve(args)?;
    // Validate every user-supplied input up front. Rejecting bad seed-key
    // flags only after the bootstrap state has been persisted would leave a
    // half-initialized control plane that reports `AlreadyInitialized` on retry.
    let desired = desired_server_config(args, None)?;
    let seed_spec = seed_key_spec(args)?;

    let client = connect(&bootstrap).await?;
    let db = client.database(&bootstrap.system_db_name);
    ensure_reserved_collections(&db).await?;
    ensure_indexes(&db).await?;
    if load_bootstrap_state(&db).await?.is_some() || has_any_server_config(&db).await? {
        return Err(ControlPlaneError::AlreadyInitialized);
    }

    let actor = "bootstrap";
    persist_bootstrap_state(&db, &bootstrap.bootstrap_token, actor).await?;
    let server_config_id = insert_active_server_config(&db, &desired, actor).await?;
    insert_init_migration(&db, actor).await?;
    let issued_key = match seed_spec {
        Some(spec) => Some(insert_runtime_key(&db, &spec, actor).await?),
        None => None,
    };
    append_audit(
        &db,
        actor,
        "control-plane.init",
        &server_config_id,
        "ok",
        doc! {
            "seeded_runtime_key": issued_key.as_ref().map(|key| key.name.clone()),
        },
    )
    .await?;
    Ok(InitOutcome {
        bootstrap,
        server_config_id,
        issued_key,
    })
}
/// Repairs the control plane after verifying the bootstrap token.
///
/// The existing active server config is preserved only when there is exactly
/// one, it parses cleanly, and no config overrides were supplied. In every
/// other case (none, several, a malformed one, or overrides present) all
/// active configs are deactivated and a new one is inserted; a parseable
/// single config serves as the base for the overrides. A runtime key is
/// seeded when seed-key arguments are given, and an audit entry is appended.
///
/// # Errors
///
/// * [`ControlPlaneError::InvalidInput`] for invalid overrides or seed-key
///   arguments. Seed-key arguments are checked before anything is written.
/// * [`ControlPlaneError::NotInitialized`] /
///   [`ControlPlaneError::BootstrapVerificationFailed`] from token verification.
/// * [`ControlPlaneError::Bootstrap`] / [`ControlPlaneError::Mongo`] for
///   resolution and driver failures.
pub async fn recover(args: &BootstrapArgs) -> Result<RecoverOutcome, ControlPlaneError> {
    let bootstrap = BootstrapConfig::resolve(args)?;
    // Reject bad seed-key flags before the server config is touched.
    let seed_spec = seed_key_spec(args)?;

    let client = connect(&bootstrap).await?;
    let db = client.database(&bootstrap.system_db_name);
    ensure_reserved_collections(&db).await?;
    ensure_indexes(&db).await?;
    verify_bootstrap_token(&db, &bootstrap.bootstrap_token).await?;

    let actor = "bootstrap";
    let active_configs = load_active_server_configs(&db).await?;
    let sole_active = match active_configs.as_slice() {
        [only] => Some(only),
        _ => None,
    };
    // A malformed active config must not abort recovery: it is exactly the
    // situation recovery exists for. Treat it as "no usable base" so that it
    // gets replaced below.
    let parsed_base = sole_active.and_then(|document| parse_server_config_document(document).ok());

    let preserved_id = if parsed_base.is_some() && config_overrides_absent(args) {
        sole_active.map(|document| document.get_str("_uuid").unwrap_or("unknown").to_owned())
    } else {
        None
    };
    let (server_config_action, server_config_id) = match preserved_id {
        Some(id) => (RecoverConfigAction::Preserved, id),
        None => {
            // Build (and validate) the replacement before deactivating anything.
            let desired = desired_server_config(args, parsed_base)?;
            deactivate_active_server_configs(&db, actor).await?;
            (
                RecoverConfigAction::Replaced,
                insert_active_server_config(&db, &desired, actor).await?,
            )
        }
    };

    let issued_key = match seed_spec {
        Some(spec) => Some(insert_runtime_key(&db, &spec, actor).await?),
        None => None,
    };
    append_audit(
        &db,
        actor,
        "control-plane.recover",
        &server_config_id,
        "ok",
        doc! {
            "server_config_action": match server_config_action {
                RecoverConfigAction::Preserved => "preserved",
                RecoverConfigAction::Replaced => "replaced",
            },
            "seeded_runtime_key": issued_key.as_ref().map(|key| key.name.clone()),
        },
    )
    .await?;
    Ok(RecoverOutcome {
        bootstrap,
        server_config_action,
        server_config_id,
        issued_key,
    })
}
/// Creates a new runtime API key after verifying the bootstrap token.
///
/// The key arguments are validated only after the token has been verified.
/// Returns the issued key, including its printable credential.
pub async fn create_key(
    bootstrap_args: &BootstrapArgs,
    create_args: &CreateKeyArgs,
) -> Result<IssuedKey, ControlPlaneError> {
    let resolved = BootstrapConfig::resolve(bootstrap_args)?;
    let client = connect(&resolved).await?;
    let system_db = client.database(&resolved.system_db_name);

    ensure_reserved_collections(&system_db).await?;
    ensure_indexes(&system_db).await?;
    verify_bootstrap_token(&system_db, &resolved.bootstrap_token).await?;

    let key_spec = create_key_spec(create_args)?;
    let issued = insert_runtime_key(&system_db, &key_spec, "bootstrap").await?;
    Ok(issued)
}
/// Lists every stored `api_key` document as a [`KeySummary`], ordered by key id.
///
/// Requires a valid bootstrap token. Fails with
/// [`ControlPlaneError::InvalidInput`] when a key document is malformed.
pub async fn list_keys(bootstrap_args: &BootstrapArgs) -> Result<Vec<KeySummary>, ControlPlaneError> {
    let resolved = BootstrapConfig::resolve(bootstrap_args)?;
    let client = connect(&resolved).await?;
    let system_db = client.database(&resolved.system_db_name);
    ensure_reserved_collections(&system_db).await?;
    verify_bootstrap_token(&system_db, &resolved.bootstrap_token).await?;

    let key_documents = system_db.collection::<Document>(KEYS_COLLECTION);
    let mut cursor = key_documents.find(doc! { "kind": "api_key" }).await?;
    let mut summaries = Vec::new();
    while cursor.advance().await? {
        let document = cursor.deserialize_current()?;
        let summary = key_summary_from_document(&document)?;
        summaries.push(summary);
    }
    summaries.sort_by(|a, b| a.key_id.cmp(&b.key_id));
    Ok(summaries)
}
/// Revokes the key identified by `revoke_args.key_id` (trimmed).
///
/// Marks the key inactive, stamps `revoked_at`/`updated_at` with the current
/// time, and appends a `keys.revoke` audit entry. The outcome reports the
/// key's state as it was read before the update.
///
/// # Errors
///
/// * [`ControlPlaneError::InvalidInput`] when the key id is empty.
/// * [`ControlPlaneError::KeyNotFound`] when no matching key exists.
pub async fn revoke_key(
    bootstrap_args: &BootstrapArgs,
    revoke_args: &RevokeKeyArgs,
) -> Result<RevokeKeyOutcome, ControlPlaneError> {
    let resolved = BootstrapConfig::resolve(bootstrap_args)?;
    let client = connect(&resolved).await?;
    let system_db = client.database(&resolved.system_db_name);
    ensure_reserved_collections(&system_db).await?;
    ensure_indexes(&system_db).await?;
    verify_bootstrap_token(&system_db, &resolved.bootstrap_token).await?;

    let key_id = revoke_args.key_id.trim();
    if key_id.is_empty() {
        return Err(ControlPlaneError::InvalidInput(
            "key_id must not be empty".to_owned(),
        ));
    }

    let keys = system_db.collection::<Document>(KEYS_COLLECTION);
    let selector = doc! {
        "kind": "api_key",
        "key_id": key_id,
    };
    let existing = match keys.find_one(selector.clone()).await? {
        Some(document) => document,
        None => return Err(ControlPlaneError::KeyNotFound(key_id.to_owned())),
    };

    // Capture the prior state before it is overwritten.
    let was_active = existing.get_bool("active").unwrap_or(false);
    let was_revoked = match existing.get("revoked_at") {
        None | Some(Bson::Null) => false,
        Some(_) => true,
    };

    let now = Utc::now().timestamp_millis();
    let revocation = doc! {
        "$set": {
            "active": false,
            "revoked_at": now,
            "updated_at": now,
            "updated_by": "bootstrap",
        }
    };
    keys.update_one(selector, revocation).await?;

    let audit_details = doc! {
        "was_active": was_active,
        "was_revoked": was_revoked,
    };
    append_audit(
        &system_db,
        "bootstrap",
        "keys.revoke",
        key_id,
        "ok",
        audit_details,
    )
    .await?;

    Ok(RevokeKeyOutcome {
        key_id: key_id.to_owned(),
        was_active,
        was_revoked,
    })
}
/// Returns the single active server config after verifying the bootstrap token.
///
/// Fails when the number of active configs is not exactly one.
pub async fn get_config(
    bootstrap_args: &BootstrapArgs,
) -> Result<ActiveServerConfig, ControlPlaneError> {
    let resolved = BootstrapConfig::resolve(bootstrap_args)?;
    let client = connect(&resolved).await?;
    let system_db = client.database(&resolved.system_db_name);
    ensure_reserved_collections(&system_db).await?;
    verify_bootstrap_token(&system_db, &resolved.bootstrap_token).await?;
    let active = load_single_active_server_config(&system_db).await?;
    Ok(active)
}
pub async fn set_config(
bootstrap_args: &BootstrapArgs,
set_args: &SetConfigArgs,
) -> Result<SetConfigOutcome, ControlPlaneError> {
if !has_set_config_overrides(set_args) {
return Err(ControlPlaneError::InvalidInput(
"config set requires at least one override".to_owned(),
));
}
let bootstrap = BootstrapConfig::resolve(bootstrap_args)?;
let client = connect(&bootstrap).await?;
let db = client.database(&bootstrap.system_db_name);
ensure_reserved_collections(&db).await?;
ensure_indexes(&db).await?;
verify_bootstrap_token(&db, &bootstrap.bootstrap_token).await?;
let active_configs = load_active_server_configs(&db).await?;
let previous_config = if active_configs.len() == 1 {
Some(active_server_config_from_document(&active_configs[0])?)
} else {
None
};
let desired = desired_server_config_from_set_args(set_args, previous_config.as_ref())?;
deactivate_active_server_configs(&db, "bootstrap").await?;
let config_id = insert_active_server_config(&db, &desired, "bootstrap").await?;
append_audit(
&db,
"bootstrap",
"config.set",
&config_id,
"ok",
doc! {
"previous_config_id": previous_config.as_ref().map(|config| config.id.clone()),
},
)
.await?;
Ok(SetConfigOutcome {
previous_config_id: previous_config.as_ref().map(|config| config.id.clone()),
active_config: ActiveServerConfig {
id: config_id,
listen_host: desired.listen_host,
listen_port: desired.listen_port,
ownership_mode: desired.ownership_mode,
slow_query_ms: desired.slow_query_ms,
},
})
}
/// Builds a MongoDB client from the bootstrap configuration's `mongo_uri`.
async fn connect(bootstrap: &BootstrapConfig) -> Result<Client, ControlPlaneError> {
    let parsed_options = ClientOptions::parse(&bootstrap.mongo_uri).await?;
    let client = Client::with_options(parsed_options)?;
    Ok(client)
}
/// Merges the bootstrap overrides in `args` onto `base`.
///
/// When `base` is `None` the defaults are host `0.0.0.0`, port `3000`,
/// strict ownership and a 250 ms slow-query threshold. A blank
/// `listen_host` override is ignored. Fails with
/// [`ControlPlaneError::InvalidInput`] for an unknown ownership mode.
fn desired_server_config(
    args: &BootstrapArgs,
    base: Option<DesiredServerConfig>,
) -> Result<DesiredServerConfig, ControlPlaneError> {
    let fallback = match base {
        Some(existing) => existing,
        None => DesiredServerConfig {
            listen_host: "0.0.0.0".to_owned(),
            listen_port: 3000,
            ownership_mode: OwnershipMode::Strict,
            slow_query_ms: Some(250),
        },
    };

    let ownership_mode = if let Some(raw) = args.ownership_mode.as_deref() {
        parse_ownership_mode(raw)?
    } else {
        fallback.ownership_mode
    };

    let listen_host = match args.listen_host.as_deref().map(str::trim) {
        Some(host) if !host.is_empty() => host.to_owned(),
        _ => fallback.listen_host,
    };

    let listen_port = match args.listen_port {
        Some(port) => port,
        None => fallback.listen_port,
    };
    let slow_query_ms = match args.slow_query_ms {
        Some(threshold) => Some(threshold),
        None => fallback.slow_query_ms,
    };

    Ok(DesiredServerConfig {
        listen_host,
        listen_port,
        ownership_mode,
        slow_query_ms,
    })
}
/// Returns `true` when `args` carries no server-config override at all.
fn config_overrides_absent(args: &BootstrapArgs) -> bool {
    let any_override = args.listen_host.is_some()
        || args.listen_port.is_some()
        || args.ownership_mode.is_some()
        || args.slow_query_ms.is_some();
    !any_override
}
/// Returns `true` when `args` changes at least one setting, counting the
/// `clear_slow_query_ms` flag as a change.
fn has_set_config_overrides(args: &SetConfigArgs) -> bool {
    let requested = [
        args.listen_host.is_some(),
        args.listen_port.is_some(),
        args.ownership_mode.is_some(),
        args.slow_query_ms.is_some(),
        args.clear_slow_query_ms,
    ];
    requested.contains(&true)
}
/// Builds the optional seed-key specification from the bootstrap arguments.
///
/// Returns `Ok(None)` when neither a seed key name nor a seed key database
/// is given. Name and database are trimmed; permissions are trimmed, blank
/// entries dropped, and duplicates removed keeping the first occurrence.
///
/// # Errors
///
/// [`ControlPlaneError::InvalidInput`] when only one of name/database is
/// given, when either is blank, or when no permission remains.
fn seed_key_spec(args: &BootstrapArgs) -> Result<Option<SeedKeySpec>, ControlPlaneError> {
    let (raw_name, raw_database) = match (
        args.seed_key_name.as_deref(),
        args.seed_key_database.as_deref(),
    ) {
        (None, None) => return Ok(None),
        (Some(name), Some(database)) => (name, database),
        _ => {
            return Err(ControlPlaneError::InvalidInput(
                "seed_key_name and seed_key_database must be provided together".to_owned(),
            ));
        }
    };

    let name = raw_name.trim();
    let database = raw_database.trim();
    if name.is_empty() || database.is_empty() {
        return Err(ControlPlaneError::InvalidInput(
            "seed key name and database must not be empty".to_owned(),
        ));
    }

    let mut seen: HashSet<&str> = HashSet::new();
    let mut permissions = Vec::new();
    for raw in &args.seed_key_permissions {
        let permission = raw.trim();
        if !permission.is_empty() && seen.insert(permission) {
            permissions.push(permission.to_owned());
        }
    }
    if permissions.is_empty() {
        return Err(ControlPlaneError::InvalidInput(
            "at least one --seed-key-permission is required when seeding a runtime key"
                .to_owned(),
        ));
    }

    Ok(Some(SeedKeySpec {
        name: name.to_owned(),
        database: database.to_owned(),
        permissions,
    }))
}
/// Validates `create-key` arguments into a [`SeedKeySpec`].
///
/// Name and database are trimmed; permissions are trimmed, blank entries
/// dropped, and duplicates removed keeping the first occurrence.
///
/// # Errors
///
/// [`ControlPlaneError::InvalidInput`] when name or database is blank, or
/// when no permission remains.
fn create_key_spec(args: &CreateKeyArgs) -> Result<SeedKeySpec, ControlPlaneError> {
    let (name, database) = (args.name.trim(), args.database.trim());
    if name.is_empty() || database.is_empty() {
        return Err(ControlPlaneError::InvalidInput(
            "key name and database must not be empty".to_owned(),
        ));
    }

    let mut already_listed: HashSet<&str> = HashSet::new();
    let mut permissions = Vec::new();
    for raw in &args.permissions {
        let permission = raw.trim();
        if permission.is_empty() {
            continue;
        }
        if already_listed.insert(permission) {
            permissions.push(permission.to_owned());
        }
    }
    if permissions.is_empty() {
        return Err(ControlPlaneError::InvalidInput(
            "at least one --permission is required".to_owned(),
        ));
    }

    Ok(SeedKeySpec {
        name: name.to_owned(),
        database: database.to_owned(),
        permissions,
    })
}
/// Merges the `config set` overrides in `args` onto `base`.
///
/// When `base` is `None` the defaults are host `0.0.0.0`, port `3000`,
/// strict ownership and a 250 ms slow-query threshold. A blank
/// `listen_host` override is ignored, and `clear_slow_query_ms` takes
/// precedence over a `slow_query_ms` value. Fails with
/// [`ControlPlaneError::InvalidInput`] for an unknown ownership mode.
fn desired_server_config_from_set_args(
    args: &SetConfigArgs,
    base: Option<&ActiveServerConfig>,
) -> Result<DesiredServerConfig, ControlPlaneError> {
    let fallback = match base {
        Some(current) => current.clone(),
        None => ActiveServerConfig {
            id: String::new(),
            listen_host: "0.0.0.0".to_owned(),
            listen_port: 3000,
            ownership_mode: OwnershipMode::Strict,
            slow_query_ms: Some(250),
        },
    };

    let ownership_mode = if let Some(raw) = args.ownership_mode.as_deref() {
        parse_ownership_mode(raw)?
    } else {
        fallback.ownership_mode
    };

    let listen_host = match args.listen_host.as_deref().map(str::trim) {
        Some(host) if !host.is_empty() => host.to_owned(),
        _ => fallback.listen_host,
    };

    let slow_query_ms = match (args.clear_slow_query_ms, args.slow_query_ms) {
        (true, _) => None,
        (false, Some(threshold)) => Some(threshold),
        (false, None) => fallback.slow_query_ms,
    };

    let listen_port = match args.listen_port {
        Some(port) => port,
        None => fallback.listen_port,
    };

    Ok(DesiredServerConfig {
        listen_host,
        listen_port,
        ownership_mode,
        slow_query_ms,
    })
}
/// Parses `"lax"` or `"strict"`, ignoring surrounding whitespace and ASCII case.
///
/// Any other value yields [`ControlPlaneError::InvalidInput`].
fn parse_ownership_mode(value: &str) -> Result<OwnershipMode, ControlPlaneError> {
    let candidate = value.trim();
    if candidate.eq_ignore_ascii_case("lax") {
        return Ok(OwnershipMode::Lax);
    }
    if candidate.eq_ignore_ascii_case("strict") {
        return Ok(OwnershipMode::Strict);
    }
    Err(ControlPlaneError::InvalidInput(
        "ownership_mode must be \"lax\" or \"strict\"".to_owned(),
    ))
}
/// Creates each reserved collection that is not already listed in `db`.
async fn ensure_reserved_collections(db: &Database) -> Result<(), ControlPlaneError> {
    let present = db.list_collection_names().await?;
    let reserved = [
        KEYS_COLLECTION,
        SERVER_CONFIG_COLLECTION,
        AUDIT_LOG_COLLECTION,
        MIGRATIONS_COLLECTION,
    ];
    for name in reserved {
        let already_there = present.iter().any(|existing| existing == name);
        if already_there {
            continue;
        }
        db.create_collection(name).await?;
    }
    Ok(())
}
async fn ensure_indexes(db: &Database) -> Result<(), ControlPlaneError> {
db.collection::<Document>(KEYS_COLLECTION)
.create_index(
IndexModel::builder()
.keys(doc! { "key_id": 1 })
.options(
IndexOptions::builder()
.name(Some("uniq_key_id".to_owned()))
.unique(Some(true))
.build(),
)
.build(),
)
.await?;
db.collection::<Document>(KEYS_COLLECTION)
.create_index(IndexModel::builder().keys(doc! { "active": 1 }).build())
.await?;
db.collection::<Document>(KEYS_COLLECTION)
.create_index(IndexModel::builder().keys(doc! { "database": 1 }).build())
.await?;
db.collection::<Document>(KEYS_COLLECTION)
.create_index(IndexModel::builder().keys(doc! { "expires_at": 1 }).build())
.await?;
db.collection::<Document>(SERVER_CONFIG_COLLECTION)
.create_index(
IndexModel::builder()
.keys(doc! { "active": 1 })
.options(
IndexOptions::builder()
.name(Some("uniq_active_server_config".to_owned()))
.unique(Some(true))
.partial_filter_expression(Some(doc! {
"kind": "server_config",
"active": true,
}))
.build(),
)
.build(),
)
.await?;
db.collection::<Document>(SERVER_CONFIG_COLLECTION)
.create_index(IndexModel::builder().keys(doc! { "updated_at": 1 }).build())
.await?;
db.collection::<Document>(AUDIT_LOG_COLLECTION)
.create_index(IndexModel::builder().keys(doc! { "at": 1 }).build())
.await?;
db.collection::<Document>(AUDIT_LOG_COLLECTION)
.create_index(IndexModel::builder().keys(doc! { "actor": 1 }).build())
.await?;
db.collection::<Document>(AUDIT_LOG_COLLECTION)
.create_index(IndexModel::builder().keys(doc! { "kind": 1 }).build())
.await?;
Ok(())
}
/// Returns `true` when at least one `server_config` document exists, active or not.
async fn has_any_server_config(db: &Database) -> Result<bool, ControlPlaneError> {
    let server_configs = db.collection::<Document>(SERVER_CONFIG_COLLECTION);
    let total = server_configs
        .count_documents(doc! { "kind": "server_config" })
        .await?;
    Ok(total > 0)
}
/// Fetches the `bootstrap_state` record from the migrations collection, if any.
async fn load_bootstrap_state(db: &Database) -> Result<Option<Document>, ControlPlaneError> {
    let migrations = db.collection::<Document>(MIGRATIONS_COLLECTION);
    let state = migrations
        .find_one(doc! { "kind": "bootstrap_state" })
        .await?;
    Ok(state)
}
async fn persist_bootstrap_state(
db: &Database,
bootstrap_token: &str,
actor: &str,
) -> Result<(), ControlPlaneError> {
let now = Utc::now().timestamp_millis();
db.collection::<Document>(MIGRATIONS_COLLECTION)
.insert_one(doc! {
"_uuid": generate_id(12),
"kind": "bootstrap_state",
"secret_hash": hash_secret(bootstrap_token)?,
"created_at": now,
"updated_at": now,
"created_by": actor,
"updated_by": actor,
})
.await?;
Ok(())
}
/// Checks `bootstrap_token` against the stored bootstrap state.
///
/// # Errors
///
/// * [`ControlPlaneError::NotInitialized`] when no bootstrap state exists.
/// * [`ControlPlaneError::BootstrapVerificationFailed`] when the stored
///   record has no string `secret_hash` or the token does not match it.
async fn verify_bootstrap_token(
    db: &Database,
    bootstrap_token: &str,
) -> Result<(), ControlPlaneError> {
    let state = match load_bootstrap_state(db).await? {
        Some(document) => document,
        None => return Err(ControlPlaneError::NotInitialized),
    };
    let Ok(stored_hash) = state.get_str("secret_hash") else {
        return Err(ControlPlaneError::BootstrapVerificationFailed);
    };
    verify_secret(bootstrap_token, stored_hash)
}
async fn insert_active_server_config(
db: &Database,
desired: &DesiredServerConfig,
actor: &str,
) -> Result<String, ControlPlaneError> {
let now = Utc::now().timestamp_millis();
let config_id = generate_id(12);
db.collection::<Document>(SERVER_CONFIG_COLLECTION)
.insert_one(doc! {
"_uuid": &config_id,
"kind": "server_config",
"active": true,
"listen": {
"host": &desired.listen_host,
"port": i64::from(desired.listen_port),
},
"auth": {
"cache_ttl_ms": 30000_i64,
"bootstrap_enabled": true,
},
"k2db": {
"ownership_mode": ownership_mode_string(desired.ownership_mode),
"slow_query_ms": desired.slow_query_ms.map(|value| value as i64),
},
"observability": {
"ratatouille_enabled": true,
"topic_filter": "*",
},
"created_at": now,
"updated_at": now,
"created_by": actor,
"updated_by": actor,
})
.await?;
Ok(config_id)
}
/// Records the completed `control_plane_init_v1` migration.
async fn insert_init_migration(db: &Database, actor: &str) -> Result<(), ControlPlaneError> {
    let timestamp = Utc::now().timestamp_millis();
    let record = doc! {
        "_uuid": generate_id(12),
        "kind": "migration",
        "name": "control_plane_init_v1",
        "status": "complete",
        "created_at": timestamp,
        "updated_at": timestamp,
        "created_by": actor,
        "updated_by": actor,
    };
    let migrations = db.collection::<Document>(MIGRATIONS_COLLECTION);
    migrations.insert_one(record).await?;
    Ok(())
}
/// Loads every `server_config` document whose `active` flag is `true`.
async fn load_active_server_configs(db: &Database) -> Result<Vec<Document>, ControlPlaneError> {
    let only_active = doc! {
        "kind": "server_config",
        "active": true,
    };
    let server_configs = db.collection::<Document>(SERVER_CONFIG_COLLECTION);
    let mut cursor = server_configs.find(only_active).await?;
    let mut active = Vec::new();
    while cursor.advance().await? {
        let document = cursor.deserialize_current()?;
        active.push(document);
    }
    Ok(active)
}
/// Loads the active server config, requiring that exactly one exists.
///
/// Fails with [`ControlPlaneError::InvalidInput`] when zero or several
/// active configs are found, or when the single one is malformed.
async fn load_single_active_server_config(
    db: &Database,
) -> Result<ActiveServerConfig, ControlPlaneError> {
    let active_configs = load_active_server_configs(db).await?;
    match active_configs.as_slice() {
        [only] => active_server_config_from_document(only),
        unexpected => Err(ControlPlaneError::InvalidInput(format!(
            "expected exactly one active server_config, found {}",
            unexpected.len()
        ))),
    }
}
/// Extracts the tunable settings from a stored `server_config` document.
///
/// Reads `listen.host`, `listen.port`, `k2db.ownership_mode` and
/// `k2db.slow_query_ms` (absent or null means `None`).
///
/// # Errors
///
/// [`ControlPlaneError::InvalidInput`] when a section or field is missing,
/// has the wrong type, or holds an out-of-range value. That includes a
/// negative `slow_query_ms`.
fn parse_server_config_document(document: &Document) -> Result<DesiredServerConfig, ControlPlaneError> {
    let invalid = |field: &str| {
        ControlPlaneError::InvalidInput(format!("invalid active server_config.{field}"))
    };

    let listen = document
        .get_document("listen")
        .map_err(|_| invalid("listen"))?;
    let k2db = document.get_document("k2db").map_err(|_| invalid("k2db"))?;

    let listen_host = listen
        .get_str("host")
        .map_err(|_| invalid("listen.host"))?
        .to_owned();
    let listen_port = read_u16(listen.get("port"))?;
    let ownership_mode = parse_ownership_mode(
        k2db.get_str("ownership_mode")
            .map_err(|_| invalid("k2db.ownership_mode"))?,
    )?;

    // A plain `as u64` cast would turn a negative stored value into an
    // enormous threshold; treat negative numbers as malformed instead.
    let slow_query_ms = match k2db.get("slow_query_ms") {
        None | Some(Bson::Null) => None,
        Some(Bson::Int64(value)) => {
            Some(u64::try_from(*value).map_err(|_| invalid("k2db.slow_query_ms"))?)
        }
        Some(Bson::Int32(value)) => {
            Some(u64::try_from(*value).map_err(|_| invalid("k2db.slow_query_ms"))?)
        }
        _ => return Err(invalid("k2db.slow_query_ms")),
    };

    Ok(DesiredServerConfig {
        listen_host,
        listen_port,
        ownership_mode,
        slow_query_ms,
    })
}
fn active_server_config_from_document(
document: &Document,
) -> Result<ActiveServerConfig, ControlPlaneError> {
let desired = parse_server_config_document(document)?;
let id = document
.get_str("_uuid")
.map_err(|_| ControlPlaneError::InvalidInput("active server_config missing _uuid".to_owned()))?
.to_owned();
Ok(ActiveServerConfig {
id,
listen_host: desired.listen_host,
listen_port: desired.listen_port,
ownership_mode: desired.ownership_mode,
slow_query_ms: desired.slow_query_ms,
})
}
/// Builds a [`KeySummary`] from a stored key document.
///
/// `key_id`, `database` and a string-only `permissions` array are required.
/// `label` is optional, a missing `active` flag counts as `false`, and the
/// two timestamps may be absent or null.
fn key_summary_from_document(document: &Document) -> Result<KeySummary, ControlPlaneError> {
    let malformed = |message: &str| ControlPlaneError::InvalidInput(message.to_owned());

    let key_id = match document.get_str("key_id") {
        Ok(value) => value.to_owned(),
        Err(_) => return Err(malformed("key document missing key_id")),
    };
    let database = match document.get_str("database") {
        Ok(value) => value.to_owned(),
        Err(_) => return Err(malformed("key document missing database")),
    };

    let raw_permissions = match document.get_array("permissions") {
        Ok(entries) => entries,
        Err(_) => return Err(malformed("key document missing permissions")),
    };
    let mut permissions = Vec::with_capacity(raw_permissions.len());
    for entry in raw_permissions {
        match entry {
            Bson::String(text) => permissions.push(text.clone()),
            _ => return Err(malformed("key document permissions must be strings")),
        }
    }

    let name = document.get_str("label").ok().map(ToOwned::to_owned);
    let active = document.get_bool("active").unwrap_or(false);
    let expires_at = read_i64(document.get("expires_at"))?;
    let revoked_at = read_i64(document.get("revoked_at"))?;

    Ok(KeySummary {
        name,
        key_id,
        database,
        permissions,
        active,
        expires_at,
        revoked_at,
    })
}
/// Reads a listen port from a BSON value.
///
/// Accepts `Int32` and `Int64` values in the range `1..=65535`. Anything
/// else (missing, another type, zero, negative, or too large) yields
/// [`ControlPlaneError::InvalidInput`].
fn read_u16(value: Option<&Bson>) -> Result<u16, ControlPlaneError> {
    // `try_from` rejects both negative and too-large values. An `as u16` cast
    // would silently truncate, e.g. an Int32 of 70000 would become 4464.
    let port = match value {
        Some(Bson::Int32(raw)) => u16::try_from(*raw).ok(),
        Some(Bson::Int64(raw)) => u16::try_from(*raw).ok(),
        _ => None,
    };
    port.filter(|port| *port > 0).ok_or_else(|| {
        ControlPlaneError::InvalidInput(
            "server_config.listen.port must be a positive u16".to_owned(),
        )
    })
}
/// Reads an optional timestamp from a BSON value.
///
/// Missing and null map to `None`; `Int32`/`Int64` are returned as-is and a
/// BSON datetime is converted to milliseconds. Any other type yields
/// [`ControlPlaneError::InvalidInput`].
fn read_i64(value: Option<&Bson>) -> Result<Option<i64>, ControlPlaneError> {
    let Some(bson) = value else {
        return Ok(None);
    };
    let number = match bson {
        Bson::Null => return Ok(None),
        Bson::Int32(small) => i64::from(*small),
        Bson::Int64(wide) => *wide,
        Bson::DateTime(moment) => moment.timestamp_millis(),
        _ => {
            return Err(ControlPlaneError::InvalidInput(
                "expected integer or datetime timestamp".to_owned(),
            ));
        }
    };
    Ok(Some(number))
}
/// Marks every active `server_config` document as inactive and stamps the
/// update time and actor.
async fn deactivate_active_server_configs(
    db: &Database,
    actor: &str,
) -> Result<(), ControlPlaneError> {
    let timestamp = Utc::now().timestamp_millis();
    let currently_active = doc! {
        "kind": "server_config",
        "active": true,
    };
    let deactivation = doc! {
        "$set": {
            "active": false,
            "updated_at": timestamp,
            "updated_by": actor,
        }
    };
    let server_configs = db.collection::<Document>(SERVER_CONFIG_COLLECTION);
    server_configs
        .update_many(currently_active, deactivation)
        .await?;
    Ok(())
}
async fn insert_runtime_key(
db: &Database,
spec: &SeedKeySpec,
actor: &str,
) -> Result<IssuedKey, ControlPlaneError> {
let now = Utc::now().timestamp_millis();
let key_id = generate_id(10);
let secret = generate_id(24);
let printable = format!("rbk_sys_{key_id}.{secret}");
let secret_hash = hash_secret(&secret)?;
db.collection::<Document>(KEYS_COLLECTION)
.insert_one(doc! {
"_uuid": generate_id(12),
"kind": "api_key",
"key_id": &key_id,
"secret_hash": secret_hash,
"database": &spec.database,
"permissions": spec.permissions.iter().cloned().map(Bson::String).collect::<Vec<_>>(),
"active": true,
"expires_at": Bson::Null,
"created_at": now,
"updated_at": now,
"created_by": actor,
"updated_by": actor,
"revoked_at": Bson::Null,
"last_used_at": Bson::Null,
"label": &spec.name,
})
.await?;
append_audit(
db,
actor,
"keys.create",
&key_id,
"ok",
doc! {
"database": &spec.database,
"permissions": spec.permissions.iter().cloned().map(Bson::String).collect::<Vec<_>>(),
"label": &spec.name,
},
)
.await?;
Ok(IssuedKey {
name: spec.name.clone(),
key_id,
printable,
database: spec.database.clone(),
permissions: spec.permissions.clone(),
})
}
/// Appends one entry to the audit log, timestamped with the current time.
async fn append_audit(
    db: &Database,
    actor: &str,
    action: &str,
    target: &str,
    status: &str,
    details: Document,
) -> Result<(), ControlPlaneError> {
    let entry = doc! {
        "_uuid": generate_id(12),
        "kind": "audit",
        "at": Utc::now().timestamp_millis(),
        "actor": actor,
        "action": action,
        "target": target,
        "status": status,
        "details": details,
    };
    let audit_log = db.collection::<Document>(AUDIT_LOG_COLLECTION);
    audit_log.insert_one(entry).await?;
    Ok(())
}
fn ownership_mode_string(mode: OwnershipMode) -> &'static str {
match mode {
OwnershipMode::Lax => "lax",
OwnershipMode::Strict => "strict",
}
}
/// Returns `bytes` random bytes from the OS RNG, encoded as unpadded
/// URL-safe base64.
fn generate_id(bytes: usize) -> String {
    let mut buffer = vec![0_u8; bytes];
    OsRng.fill_bytes(&mut buffer);
    let encoder = &base64::engine::general_purpose::URL_SAFE_NO_PAD;
    encoder.encode(buffer)
}
/// Hashes `secret` with scrypt under a fresh random 16-byte salt.
///
/// The result has the form `scrypt$<N>$<r>$<p>$<salt>$<hash>`, where salt
/// and hash are unpadded URL-safe base64 and `N` is written as the full
/// cost value (16384). This is the layout `verify_secret` parses.
///
/// # Errors
///
/// [`ControlPlaneError::InvalidInput`] when the scrypt parameters or the
/// output length are rejected by the scrypt implementation.
fn hash_secret(secret: &str) -> Result<String, ControlPlaneError> {
    // Keep the numbers passed to scrypt and the numbers written into the
    // encoded string in one place so the two cannot drift apart.
    const LOG_N: u8 = 14;
    const R: u32 = 8;
    const P: u32 = 1;
    const KEY_LEN: usize = 64;

    let mut salt = [0_u8; 16];
    OsRng.fill_bytes(&mut salt);
    let params = Params::new(LOG_N, R, P, KEY_LEN)
        .map_err(|_| ControlPlaneError::InvalidInput("invalid scrypt parameters".to_owned()))?;
    let mut out = vec![0_u8; KEY_LEN];
    scrypt(secret.as_bytes(), &salt, &params, &mut out)
        .map_err(|_| ControlPlaneError::InvalidInput("failed to hash bootstrap secret".to_owned()))?;
    Ok(format!(
        "scrypt${}${}${}${}${}",
        1_u32 << LOG_N,
        R,
        P,
        base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(salt),
        base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(out)
    ))
}
/// Verifies `secret` against a hash string produced by `hash_secret`.
///
/// The hash must consist of six `$`-separated fields:
/// `scrypt`, `N` (a power of two), `r`, `p`, salt and expected hash. The
/// secret is re-derived with those parameters and compared to the expected
/// bytes using `ConstantTimeEq::ct_eq`.
///
/// # Errors
///
/// Every failure, whether a malformed hash, rejected parameters or a
/// mismatch, is reported as [`ControlPlaneError::BootstrapVerificationFailed`].
fn verify_secret(secret: &str, secret_hash: &str) -> Result<(), ControlPlaneError> {
    let parse_cost = |text: &str| {
        text.parse::<u32>()
            .map_err(|_| ControlPlaneError::BootstrapVerificationFailed)
    };

    let fields = secret_hash.split('$').collect::<Vec<_>>();
    let &[scheme, n, r, p, salt, expected] = fields.as_slice() else {
        return Err(ControlPlaneError::BootstrapVerificationFailed);
    };
    if scheme != "scrypt" {
        return Err(ControlPlaneError::BootstrapVerificationFailed);
    }

    let n = parse_cost(n)?;
    let r = parse_cost(r)?;
    let p = parse_cost(p)?;
    // The hash stores the full cost N, while scrypt expects log2(N).
    if !n.is_power_of_two() {
        return Err(ControlPlaneError::BootstrapVerificationFailed);
    }
    let salt = decode_base64url(salt)?;
    let expected = decode_base64url(expected)?;

    let params = Params::new(n.ilog2() as u8, r, p, expected.len())
        .map_err(|_| ControlPlaneError::BootstrapVerificationFailed)?;
    let mut derived = vec![0_u8; expected.len()];
    scrypt(secret.as_bytes(), &salt, &params, &mut derived)
        .map_err(|_| ControlPlaneError::BootstrapVerificationFailed)?;

    if derived.as_slice().ct_eq(expected.as_slice()).into() {
        Ok(())
    } else {
        Err(ControlPlaneError::BootstrapVerificationFailed)
    }
}
/// Decodes URL-safe base64 that may or may not carry padding.
///
/// The input is mapped to the standard alphabet (`-` to `+`, `_` to `/`),
/// padded to a multiple of four, and decoded with the standard engine.
/// Empty input, an impossible length, or undecodable data yields
/// [`ControlPlaneError::BootstrapVerificationFailed`].
fn decode_base64url(value: &str) -> Result<Vec<u8>, ControlPlaneError> {
    if value.is_empty() {
        return Err(ControlPlaneError::BootstrapVerificationFailed);
    }

    let mut standard: String = value
        .chars()
        .map(|symbol| match symbol {
            '-' => '+',
            '_' => '/',
            other => other,
        })
        .collect();

    let missing_padding = match standard.len() % 4 {
        0 => 0,
        2 => 2,
        3 => 1,
        // A remainder of 1 can never be valid base64.
        _ => return Err(ControlPlaneError::BootstrapVerificationFailed),
    };
    standard.extend(std::iter::repeat('=').take(missing_padding));

    match base64::engine::general_purpose::STANDARD.decode(standard) {
        Ok(bytes) => Ok(bytes),
        Err(_) => Err(ControlPlaneError::BootstrapVerificationFailed),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Bootstrap arguments with no overrides and no seed key requested.
    fn bare_bootstrap_args() -> BootstrapArgs {
        BootstrapArgs {
            mongo_uri: None,
            mongo_uri_env: None,
            bootstrap_token: None,
            bootstrap_token_env: None,
            system_db_name: "k2_system".to_owned(),
            listen_host: None,
            listen_port: None,
            ownership_mode: None,
            slow_query_ms: None,
            seed_key_name: None,
            seed_key_database: None,
            seed_key_permissions: Vec::new(),
        }
    }

    /// Without overrides or a base, the built-in defaults apply.
    #[test]
    fn desired_server_config_uses_contract_defaults() {
        let args = bare_bootstrap_args();
        let desired = desired_server_config(&args, None).expect("desired config");
        assert_eq!(desired.listen_host, "0.0.0.0");
        assert_eq!(desired.listen_port, 3000);
        assert_eq!(desired.ownership_mode, OwnershipMode::Strict);
        assert_eq!(desired.slow_query_ms, Some(250));
    }

    /// A seed key name without a database is rejected.
    #[test]
    fn seed_key_requires_complete_inputs() {
        let args = BootstrapArgs {
            seed_key_name: Some("worker".to_owned()),
            seed_key_permissions: vec!["collections.read".to_owned()],
            ..bare_bootstrap_args()
        };
        assert!(seed_key_spec(&args).is_err());
    }

    /// A hash verifies against its own secret and rejects a different one.
    #[test]
    fn hash_and_verify_secret_round_trip() {
        let hash = hash_secret("bootstrap-secret").expect("hash");
        verify_secret("bootstrap-secret", &hash).expect("verify");
        assert!(verify_secret("wrong", &hash).is_err());
    }

    /// Creating a key without any permission is rejected.
    #[test]
    fn create_key_spec_requires_permissions() {
        let args = CreateKeyArgs {
            name: "worker".to_owned(),
            database: "project_a".to_owned(),
            permissions: Vec::new(),
        };
        assert!(create_key_spec(&args).is_err());
    }

    /// Arguments that change nothing do not count as overrides.
    #[test]
    fn set_config_requires_overrides() {
        let args = SetConfigArgs {
            listen_host: None,
            listen_port: None,
            ownership_mode: None,
            slow_query_ms: None,
            clear_slow_query_ms: false,
        };
        assert!(!has_set_config_overrides(&args));
    }
}