use moltendb_auth as auth;
use moltendb_core::{engine, handlers};
use axum::{
extract::{Extension, Path, Query as AxumQuery, State},
http::StatusCode,
Json,
};
use sysinfo::{Disks, System};
use serde_json::{json, Value};
use std::collections::HashMap as QueryMap;
use std::time::{Duration, Instant};
pub async fn handle_delegate(
State((_, _, _, _, root_username)): State<(engine::Db, auth::UserStore, usize, usize, String)>,
Extension(claims): axum::extract::Extension<auth::Claims>,
Json(payload): Json<auth::DelegateRequest>,
) -> Result<Json<auth::DelegateResponse>, (StatusCode, Json<Value>)> {
if !claims.is_admin() {
return Err((
StatusCode::FORBIDDEN,
Json(json!({"error": "Admin access required to delegate tokens"})),
));
}
if payload.scopes.iter().any(|s| s == "*:*:*") && claims.sub != root_username {
return Err((
StatusCode::FORBIDDEN,
Json(json!({"error": "Only the root user can mint '*:*:*' (admin) tokens"})),
));
}
for scope in &payload.scopes {
if scope != "*:*:*" {
let parts: Vec<&str> = scope.splitn(3, ':').collect();
if parts.len() != 3 {
return Err((
StatusCode::BAD_REQUEST,
Json(json!({
"error": format!("Invalid scope '{}'. Expected format: 'action:collection:key'", scope)
})),
));
}
}
}
let ttl = payload.ttl_secs.unwrap_or(3600);
let (token, jti) = auth::create_scoped_token(&payload.client_id, payload.scopes.clone(), ttl)
.map_err(|e| (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": format!("Token creation failed: {}", e)})),
))?;
Ok(Json(auth::DelegateResponse {
token,
client_id: payload.client_id,
scopes: payload.scopes,
jti,
}))
}
pub async fn handle_login(
State((_, users, _, _, _)): State<(engine::Db, auth::UserStore, usize, usize, String)>,
Json(payload): Json<auth::LoginRequest>,
) -> Result<Json<auth::LoginResponse>, (StatusCode, Json<Value>)> {
if users.verify_user(&payload.username, &payload.password) {
match auth::create_token(&payload.username) {
Ok(token) => Ok(Json(auth::LoginResponse { token })),
Err(_) => Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "Failed to create token"})),
)),
}
} else {
Err((
StatusCode::UNAUTHORIZED,
Json(json!({"error": "Invalid credentials"})),
))
}
}
pub async fn handle_set(
State((db, _, max_body_size, max_keys_per_request, _)): State<(engine::Db, auth::UserStore, usize, usize, String)>,
Extension(claims): Extension<auth::Claims>,
Json(payload): Json<Value>,
) -> (StatusCode, Json<Value>) {
let collection = payload.get("collection").and_then(|v| v.as_str()).unwrap_or("");
if !claims.has_collection_access("write", collection) {
return (
StatusCode::FORBIDDEN,
Json(json!({ "error": format!("Forbidden: token requires 'write:{}:*' scope", collection) })),
);
}
let (code, body) = handlers::process_set(&db, &payload, max_body_size, max_keys_per_request);
(StatusCode::from_u16(code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), Json(body))
}
pub async fn handle_update(
State((db, _, max_body_size, max_keys_per_request, _)): State<(engine::Db, auth::UserStore, usize, usize, String)>,
Extension(claims): Extension<auth::Claims>,
Json(payload): Json<Value>,
) -> (StatusCode, Json<Value>) {
let collection = payload.get("collection").and_then(|v| v.as_str()).unwrap_or("");
if !claims.has_collection_access("write", collection) {
return (
StatusCode::FORBIDDEN,
Json(json!({ "error": format!("Forbidden: token requires 'write:{}:*' scope", collection) })),
);
}
let (code, body) = handlers::process_update(&db, &payload, max_body_size, max_keys_per_request);
(StatusCode::from_u16(code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), Json(body))
}
pub async fn handle_get(
State((db, _, max_body_size, max_keys_per_request, _)): State<(engine::Db, auth::UserStore, usize, usize, String)>,
Extension(claims): Extension<auth::Claims>,
Json(payload): Json<Value>,
) -> (StatusCode, Json<Value>) {
let collection = payload.get("collection").and_then(|v| v.as_str()).unwrap_or("");
if claims.has_collection_access("read", collection) {
let (code, body) = handlers::process_get(&db, &payload, max_body_size, max_keys_per_request);
return (StatusCode::from_u16(code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), Json(body));
}
let allowed_keys: Vec<String> = claims.allowed_keys("read", collection);
if allowed_keys.is_empty() {
return (
StatusCode::FORBIDDEN,
Json(json!({ "error": format!("Forbidden: token has no read access to collection '{}'", collection) })),
);
}
if let Some(keys_val) = payload.get("keys") {
let requested: Vec<String> = match keys_val {
Value::String(s) => vec![s.clone()],
Value::Array(arr) => arr.iter().filter_map(|v| v.as_str().map(String::from)).collect(),
_ => vec![],
};
for k in &requested {
if !claims.has_access("read", collection, k) {
return (
StatusCode::FORBIDDEN,
Json(json!({ "error": format!("Forbidden: token lacks 'read:{}:{}' scope", collection, k) })),
);
}
}
let (code, body) = handlers::process_get(&db, &payload, max_body_size, max_keys_per_request);
return (StatusCode::from_u16(code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), Json(body));
}
if claims.has_prefix_wildcard("read", collection) {
let prefixes = claims.extract_prefixes("read", collection);
let mut scoped_payload = payload.clone();
scoped_payload.as_object_mut().map(|o| o.remove("_allowed_prefixes"));
if !prefixes.is_empty() {
scoped_payload["_allowed_prefixes"] = json!(prefixes);
}
let (code, body) = handlers::process_get(&db, &scoped_payload, max_body_size, max_keys_per_request);
(StatusCode::from_u16(code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), Json(body))
} else {
let mut scoped_payload = payload.clone();
scoped_payload["keys"] = json!(allowed_keys);
let (code, body) = handlers::process_get(&db, &scoped_payload, max_body_size, max_keys_per_request);
(StatusCode::from_u16(code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), Json(body))
}
}
pub async fn handle_delete(
State((db, _, max_body_size, max_keys_per_request, _)): State<(engine::Db, auth::UserStore, usize, usize, String)>,
Extension(claims): Extension<auth::Claims>,
Json(payload): Json<Value>,
) -> (StatusCode, Json<Value>) {
let collection = payload.get("collection").and_then(|v| v.as_str()).unwrap_or("");
if !claims.has_collection_access("delete", collection) {
return (
StatusCode::FORBIDDEN,
Json(json!({ "error": format!("Forbidden: token requires 'delete:{}:*' scope", collection) })),
);
}
let (code, body) = handlers::process_delete(&db, &payload, max_body_size, max_keys_per_request);
(StatusCode::from_u16(code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), Json(body))
}
#[cfg(feature = "schema")]
pub async fn handle_schema(
State((db, _, max_body_size, max_keys_per_request, _)): State<(engine::Db, auth::UserStore, usize, usize, String)>,
Json(payload): Json<Value>,
) -> (StatusCode, Json<Value>) {
let (code, body) = handlers::process_schema(&db, &payload, max_body_size, max_keys_per_request);
(StatusCode::from_u16(code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), Json(body))
}
pub async fn handle_snapshot(
State((db, _, _, _, _)): State<(engine::Db, auth::UserStore, usize, usize, String)>,
Extension(claims): Extension<auth::Claims>,
) -> (StatusCode, Json<Value>) {
if !claims.is_admin() {
return (
StatusCode::FORBIDDEN,
Json(json!({ "error": "Forbidden: snapshot requires admin scope" })),
);
}
let (code, body) = handlers::process_snapshot(&db);
(StatusCode::from_u16(code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), Json(body))
}
pub async fn handle_rest_get(
State((db, _, max_body_size, max_keys_per_request, _)): State<(engine::Db, auth::UserStore, usize, usize, String)>,
Extension(claims): Extension<auth::Claims>,
Path((collection, key)): Path<(String, String)>,
) -> (StatusCode, Json<Value>) {
if !claims.has_access("read", &collection, &key) {
return (
StatusCode::FORBIDDEN,
Json(json!({ "error": format!("Forbidden: token lacks 'read:{}:{}' scope", collection, key) })),
);
}
let payload = json!({
"collection": collection,
"keys": key
});
let (code, body) = handlers::process_get(&db, &payload, max_body_size, max_keys_per_request);
(StatusCode::from_u16(code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), Json(body))
}
pub async fn handle_rest_get_collection(
State((db, _, max_body_size, max_keys_per_request, _)): State<(engine::Db, auth::UserStore, usize, usize, String)>,
Extension(claims): Extension<auth::Claims>,
Path(collection): Path<String>,
AxumQuery(params): AxumQuery<QueryMap<String, String>>,
) -> (StatusCode, Json<Value>) {
if !claims.has_collection_access("read", &collection) {
return (
StatusCode::FORBIDDEN,
Json(json!({ "error": format!("Forbidden: token requires 'read:{}:*' scope", collection) })),
);
}
let mut payload = json!({ "collection": collection });
if let Some(limit) = params.get("limit").and_then(|v| v.parse::<u64>().ok()) {
payload["count"] = json!(limit);
}
if let Some(offset) = params.get("offset").and_then(|v| v.parse::<u64>().ok()) {
payload["offset"] = json!(offset);
}
let (code, body) = handlers::process_get(&db, &payload, max_body_size, max_keys_per_request);
(StatusCode::from_u16(code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), Json(body))
}
pub async fn handle_revoke(
Extension(claims): Extension<auth::Claims>,
Extension(revocation_store): Extension<auth::RevocationStore>,
Extension(revocations_path): Extension<auth::RevocationsPath>,
Path(jti): Path<String>,
Json(payload): Json<Value>,
) -> (StatusCode, Json<Value>) {
if !claims.is_admin() {
return (
StatusCode::FORBIDDEN,
Json(json!({"error": "Admin access required to revoke tokens"})),
);
}
if jti.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "jti must not be empty"})),
);
}
let prune_after = if let Some(exp_secs) = payload.get("exp").and_then(|v| v.as_u64()) {
let now_secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let remaining = exp_secs.saturating_sub(now_secs);
Instant::now() + Duration::from_secs(remaining)
} else {
Instant::now() + Duration::from_secs(86400)
};
revocation_store.revoke(&jti, prune_after);
revocation_store.save_to_file(&revocations_path.0);
(
StatusCode::OK,
Json(json!({"revoked": jti, "message": "Token has been revoked successfully"})),
)
}
pub async fn handle_health() -> (StatusCode, Json<Value>) {
(StatusCode::OK, Json(json!({ "status": "ok", "message": "MoltenDB is running" })))
}
pub async fn handle_metrics(
State((db, _, _, _, _)): State<(moltendb_core::engine::Db, auth::UserStore, usize, usize, String)>,
Extension(claims): Extension<auth::Claims>,
) -> (StatusCode, Json<Value>) {
if !claims.is_admin() {
return (
StatusCode::FORBIDDEN,
Json(json!({ "error": "Admin access required" })),
);
}
let uptime_seconds = db.started_at.elapsed().as_secs();
let mut sys = System::new_all();
sys.refresh_all();
let total_ram = sys.total_memory();
let used_ram = sys.used_memory();
let free_ram = sys.free_memory();
let pid = sysinfo::get_current_pid().ok();
let process_memory_bytes = pid
.and_then(|p| sys.process(p))
.map(|p| p.memory())
.unwrap_or(0);
let disks = Disks::new_with_refreshed_list();
let disk_info: Vec<Value> = disks.iter().map(|d| {
let total = d.total_space();
let avail = d.available_space();
let used = total.saturating_sub(avail);
json!({
"mount": d.mount_point().to_string_lossy(),
"total_bytes": total,
"used_bytes": used,
"available_bytes": avail,
})
}).collect();
let hot_keys_count: usize = db.hot_keys_count();
let wal_size_bytes = db.storage.get_size().unwrap_or(0);
let storage_mode = if db.tiered_mode { "tiered" } else { "standard" };
(StatusCode::OK, Json(json!({
"uptime_seconds": uptime_seconds,
"process": {
"memory_used_bytes": process_memory_bytes,
},
"host": {
"memory": {
"total_bytes": total_ram,
"used_bytes": used_ram,
"free_bytes": free_ram,
},
"disks": disk_info,
},
"database": {
"hot_keys_count": hot_keys_count,
"hot_tier_threshold": db.hot_threshold,
"wal_size_bytes": wal_size_bytes,
"storage_mode": storage_mode,
},
})))
}