use std::sync::Arc;
use axum::{
extract::{Path as AxumPath, Query, State},
http::{HeaderValue, StatusCode},
response::IntoResponse,
routing::{delete, get, post},
Json, Router,
};
use serde_json::{json, Value};
use crate::auth;
use crate::command::Command;
use crate::handler::{self, CommandResult};
use crate::server::AppState;
type AppResult = Result<Json<Value>, (StatusCode, Json<Value>)>;
type EngineHandle = Arc<parking_lot::Mutex<yantrikdb::YantrikDB>>;
type AppError = (StatusCode, Json<Value>);
fn app_error(status: StatusCode, message: impl Into<String>) -> AppError {
(status, Json(json!({ "error": message.into() })))
}
fn resolve_engine(
state: &AppState,
auth_header: Option<&str>,
) -> Result<(i64, EngineHandle), AppError> {
let token = auth_header
.and_then(|h| h.strip_prefix("Bearer "))
.ok_or_else(|| app_error(StatusCode::UNAUTHORIZED, "missing Bearer token"))?;
if let Some(ref cluster) = state.cluster {
if let Some(ref secret) = cluster.config.cluster_secret {
if token == secret.as_str() {
let control = state.control.lock();
let db_record = control
.get_database("default")
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or_else(|| {
app_error(StatusCode::NOT_FOUND, "default database not found")
})?;
drop(control);
let engine = state
.pool
.get_engine(&db_record)
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
state.workers.start_for_database(
db_record.id,
db_record.name.clone(),
std::sync::Arc::clone(&engine),
);
return Ok((db_record.id, engine));
}
}
}
let token_hash = auth::hash_token(token);
let control = state.control.lock();
let db_id = control
.validate_token(&token_hash)
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or_else(|| app_error(StatusCode::UNAUTHORIZED, "invalid or revoked token"))?;
let db_record = control
.get_database_by_id(db_id)
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or_else(|| app_error(StatusCode::NOT_FOUND, "database not found"))?;
drop(control);
let engine = state
.pool
.get_engine(&db_record)
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
state.workers.start_for_database(
db_id,
db_record.name.clone(),
std::sync::Arc::clone(&engine),
);
Ok((db_id, engine))
}
async fn execute_cmd(
engine: Arc<parking_lot::Mutex<yantrikdb::YantrikDB>>,
cmd: Command,
control: Arc<parking_lot::Mutex<crate::control::ControlDb>>,
inflight: &std::sync::atomic::AtomicU32,
) -> AppResult {
use std::sync::atomic::Ordering;
let current = inflight.fetch_add(1, Ordering::Relaxed);
if current >= crate::server::MAX_INFLIGHT {
inflight.fetch_sub(1, Ordering::Relaxed);
return Err(app_error(
StatusCode::SERVICE_UNAVAILABLE,
format!(
"server overloaded: {} inflight ops (max {}). Retry later.",
current,
crate::server::MAX_INFLIGHT,
),
));
}
let result = tokio::task::spawn_blocking(move || {
let lock_start = std::time::Instant::now();
let db = engine.lock();
crate::metrics::record_engine_lock_wait(lock_start.elapsed());
handler::execute_with_guard(db, cmd, Some(control.as_ref()))
})
.await
.map_err(|e| {
app_error(
StatusCode::INTERNAL_SERVER_ERROR,
format!("join error: {e}"),
)
});
inflight.fetch_sub(1, Ordering::Relaxed);
let result = result?;
match result {
Ok(CommandResult::Json(v)) => Ok(Json(v)),
Ok(CommandResult::RecallResults { results, total }) => {
Ok(Json(json!({ "results": results, "total": total })))
}
Ok(CommandResult::Pong) => Ok(Json(json!({ "status": "ok" }))),
Err(e) => Err(app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())),
}
}
async fn health(State(state): State<Arc<AppState>>) -> Json<Value> {
let mut payload = json!({
"status": "ok",
"engines_loaded": state.pool.loaded_count(),
});
if let Some(ref cluster) = state.cluster {
payload["cluster"] = json!({
"node_id": cluster.node_id(),
"role": format!("{:?}", cluster.state.leader_role()),
"term": cluster.state.current_term(),
"leader": cluster.state.current_leader(),
"accepts_writes": cluster.state.accepts_writes(),
"healthy": cluster.is_healthy(),
});
}
Json(payload)
}
async fn health_deep(State(state): State<Arc<AppState>>) -> (StatusCode, Json<Value>) {
let mut checks = Vec::new();
let mut all_pass = true;
{
let engine_check = tokio::task::spawn_blocking({
let control = state.control.clone();
let pool = state.pool.clone();
move || {
let start = std::time::Instant::now();
let db_record = {
let ctrl = control.lock();
ctrl.get_database("default").ok().flatten()
};
if let Some(rec) = db_record {
if let Ok(engine) = pool.get_engine(&rec) {
let timeout = std::time::Duration::from_millis(100);
if engine.try_lock_for(timeout).is_some() {
let elapsed = start.elapsed();
return json!({
"check": "engine_lock",
"pass": true,
"latency_ms": elapsed.as_secs_f64() * 1000.0,
});
}
}
}
let elapsed = start.elapsed();
json!({
"check": "engine_lock",
"pass": false,
"latency_ms": elapsed.as_secs_f64() * 1000.0,
"error": "could not acquire engine lock within 100ms",
})
}
})
.await
.unwrap_or_else(|e| json!({"check": "engine_lock", "pass": false, "error": e.to_string()}));
if !engine_check["pass"].as_bool().unwrap_or(false) {
all_pass = false;
}
checks.push(engine_check);
}
{
let control_check = tokio::task::spawn_blocking({
let control = state.control.clone();
move || {
let start = std::time::Instant::now();
let ctrl = control.lock();
match ctrl.list_databases() {
Ok(dbs) => {
let elapsed = start.elapsed();
json!({
"check": "control_db",
"pass": true,
"latency_ms": elapsed.as_secs_f64() * 1000.0,
"databases": dbs.len(),
})
}
Err(e) => {
let elapsed = start.elapsed();
json!({
"check": "control_db",
"pass": false,
"latency_ms": elapsed.as_secs_f64() * 1000.0,
"error": e.to_string(),
})
}
}
}
})
.await
.unwrap_or_else(|e| json!({"check": "control_db", "pass": false, "error": e.to_string()}));
if !control_check["pass"].as_bool().unwrap_or(false) {
all_pass = false;
}
checks.push(control_check);
}
if let Some(ref cluster) = state.cluster {
let healthy = cluster.is_healthy();
if !healthy {
all_pass = false;
}
checks.push(json!({
"check": "cluster_quorum",
"pass": healthy,
"node_id": cluster.node_id(),
"role": format!("{:?}", cluster.state.leader_role()),
"term": cluster.state.current_term(),
"leader": cluster.state.current_leader(),
}));
}
let status = if all_pass {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};
let encryption_enabled = state.pool.is_encrypted();
let encryption_status = if encryption_enabled {
json!({"enabled": true, "algorithm": "AES-256-GCM"})
} else {
json!({"enabled": false, "algorithm": null})
};
let in_flight_used = state.admission.cfg.max_in_flight_recall
- state.admission.in_flight_recall.available_permits();
let expanded_used = state.admission.cfg.max_concurrent_expanded_recall
- state.admission.expanded_recall.available_permits();
let admission_state = json!({
"hard_top_k_cap": state.admission.cfg.hard_top_k_cap,
"max_request_body_bytes": state.admission.cfg.max_request_body_bytes,
"in_flight_recall": {
"max": state.admission.cfg.max_in_flight_recall,
"in_use": in_flight_used,
},
"expanded_recall": {
"max": state.admission.cfg.max_concurrent_expanded_recall,
"in_use": expanded_used,
},
});
let runtime_state = json!({
"control_runtime_isolated": state.control_runtime.is_some(),
});
let local_snap = crate::version::VersionSnapshot::local();
let version_block = json!({
"build_id": local_snap.build_id,
"wire": local_snap.wire,
"min_supported_wire": local_snap.min_supported_wire,
"table_schema_versions": local_snap.table_schema_versions,
});
(
status,
Json(json!({
"status": if all_pass { "healthy" } else { "degraded" },
"encryption": encryption_status,
"checks": checks,
"admission": admission_state,
"runtime": runtime_state,
"version": version_block,
})),
)
}
fn check_memory_quota(
state: &AppState,
db_id: i64,
engine: &EngineHandle,
count: usize,
) -> Result<(), (StatusCode, Json<Value>)> {
let quota = {
let ctrl = state.control.lock();
ctrl.get_quota(db_id).unwrap_or_default()
};
let current = engine
.try_lock()
.and_then(|db| db.stats(None).ok())
.map(|s| s.active_memories)
.unwrap_or(0);
if current + count as i64 > quota.max_memories {
return Err(app_error(
StatusCode::TOO_MANY_REQUESTS,
format!(
"would exceed memory quota: current={}, adding={}, max={}",
current, count, quota.max_memories,
),
));
}
Ok(())
}
fn check_writable(state: &AppState) -> Result<(), (StatusCode, Json<Value>)> {
if let Some(ref cluster) = state.cluster {
if !cluster.state.accepts_writes() {
let leader = cluster.state.current_leader();
let msg = match leader {
Some(id) => format!("read-only: not the leader (current leader: node {})", id),
None => "read-only: no leader elected".into(),
};
return Err((
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({"error": msg, "leader": leader})),
));
}
}
Ok(())
}
async fn remember(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("remember");
check_writable(&state)?;
let (db_id, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
check_memory_quota(&state, db_id, &engine, 1)?;
let cmd = Command::Remember {
text: body["text"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'text'"))?
.into(),
memory_type: body
.get("memory_type")
.and_then(|v| v.as_str())
.unwrap_or("semantic")
.into(),
importance: body
.get("importance")
.and_then(|v| v.as_f64())
.unwrap_or(0.5),
valence: body.get("valence").and_then(|v| v.as_f64()).unwrap_or(0.0),
half_life: body
.get("half_life")
.and_then(|v| v.as_f64())
.unwrap_or(168.0),
metadata: body.get("metadata").cloned().unwrap_or(json!({})),
namespace: body
.get("namespace")
.and_then(|v| v.as_str())
.unwrap_or("")
.into(),
certainty: body
.get("certainty")
.and_then(|v| v.as_f64())
.unwrap_or(1.0),
domain: body
.get("domain")
.and_then(|v| v.as_str())
.unwrap_or("")
.into(),
source: body
.get("source")
.and_then(|v| v.as_str())
.unwrap_or("user")
.into(),
emotional_state: body
.get("emotional_state")
.and_then(|v| v.as_str())
.map(String::from),
embedding: body.get("embedding").and_then(|v| {
v.as_array().map(|a| {
a.iter()
.filter_map(|x| x.as_f64().map(|f| f as f32))
.collect()
})
}),
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn remember_batch(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("remember_batch");
check_writable(&state)?;
let (db_id, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let memories_arr = body
.get("memories")
.and_then(|v| v.as_array())
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'memories' array"))?;
if memories_arr.is_empty() {
return Ok(Json(json!({"rids": [], "count": 0})));
}
let quota = {
let ctrl = state.control.lock();
ctrl.get_quota(db_id).unwrap_or_default()
};
if memories_arr.len() > quota.max_batch_size as usize {
return Err(app_error(
StatusCode::TOO_MANY_REQUESTS,
format!(
"batch size {} exceeds quota {} for this database",
memories_arr.len(),
quota.max_batch_size
),
));
}
check_memory_quota(&state, db_id, &engine, memories_arr.len())?;
if memories_arr.len() > 10_000 {
return Err(app_error(
StatusCode::BAD_REQUEST,
"batch size exceeds 10000",
));
}
let mut memories = Vec::with_capacity(memories_arr.len());
for (i, m) in memories_arr.iter().enumerate() {
let text = m
.get("text")
.and_then(|v| v.as_str())
.ok_or_else(|| {
app_error(
StatusCode::BAD_REQUEST,
format!("memories[{}]: missing 'text'", i),
)
})?
.to_string();
memories.push(crate::command::RememberInput {
text,
memory_type: m
.get("memory_type")
.and_then(|v| v.as_str())
.unwrap_or("semantic")
.into(),
importance: m.get("importance").and_then(|v| v.as_f64()).unwrap_or(0.5),
valence: m.get("valence").and_then(|v| v.as_f64()).unwrap_or(0.0),
half_life: m.get("half_life").and_then(|v| v.as_f64()).unwrap_or(168.0),
metadata: m.get("metadata").cloned().unwrap_or(json!({})),
namespace: m
.get("namespace")
.and_then(|v| v.as_str())
.unwrap_or("")
.into(),
certainty: m.get("certainty").and_then(|v| v.as_f64()).unwrap_or(1.0),
domain: m
.get("domain")
.and_then(|v| v.as_str())
.unwrap_or("")
.into(),
source: m
.get("source")
.and_then(|v| v.as_str())
.unwrap_or("user")
.into(),
emotional_state: m
.get("emotional_state")
.and_then(|v| v.as_str())
.map(String::from),
embedding: m.get("embedding").and_then(|v| {
v.as_array().map(|a| {
a.iter()
.filter_map(|x| x.as_f64().map(|f| f as f32))
.collect()
})
}),
});
}
let cmd = Command::RememberBatch { memories };
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn recall(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("recall");
let top_k = body.get("top_k").and_then(|v| v.as_u64()).unwrap_or(10) as usize;
let expand_entities = body
.get("expand_entities")
.and_then(|v| v.as_bool())
.unwrap_or(true);
crate::metrics::record_recall_request("v1", expand_entities);
crate::metrics::record_recall_top_k("v1", top_k);
if let Err(reason) = crate::admission::check_top_k(top_k, state.admission.cfg.hard_top_k_cap) {
let status = StatusCode::from_u16(reason.http_status()).unwrap_or(StatusCode::BAD_REQUEST);
return Err((
status,
Json(json!({
"error": reason.message(),
"reason": reason.metric_label(),
"hard_top_k_cap": state.admission.cfg.hard_top_k_cap,
})),
));
}
let _permits = match state
.admission
.acquire_recall_permits(expand_entities)
.await
{
Ok(p) => p,
Err(reason) => {
let status = StatusCode::from_u16(reason.http_status())
.unwrap_or(StatusCode::SERVICE_UNAVAILABLE);
return Err((
status,
Json(json!({
"error": reason.message(),
"reason": reason.metric_label(),
"retry_after_ms": 200,
})),
));
}
};
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let cmd = Command::Recall {
query: body["query"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'query'"))?
.into(),
top_k,
memory_type: body
.get("memory_type")
.and_then(|v| v.as_str())
.map(String::from),
include_consolidated: body
.get("include_consolidated")
.and_then(|v| v.as_bool())
.unwrap_or(false),
expand_entities,
namespace: body
.get("namespace")
.and_then(|v| v.as_str())
.map(String::from),
domain: body
.get("domain")
.and_then(|v| v.as_str())
.map(String::from),
source: body
.get("source")
.and_then(|v| v.as_str())
.map(String::from),
query_embedding: body.get("query_embedding").and_then(|v| {
v.as_array().map(|a| {
a.iter()
.filter_map(|x| x.as_f64().map(|f| f as f32))
.collect()
})
}),
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn forget(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("forget");
check_writable(&state)?;
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let rid = body["rid"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'rid'"))?
.into();
execute_cmd(
engine,
Command::Forget { rid },
state.control.clone(),
&state.inflight,
)
.await
}
async fn relate(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> Result<impl IntoResponse, AppError> {
let _timer = crate::metrics::HandlerTimer::new("relate");
check_writable(&state)?;
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let cmd = Command::Relate {
entity: body["entity"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'entity'"))?
.into(),
target: body["target"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'target'"))?
.into(),
relationship: body["relationship"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'relationship'"))?
.into(),
weight: body.get("weight").and_then(|v| v.as_f64()).unwrap_or(1.0),
};
let json = execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await?;
let mut response = json.into_response();
response
.headers_mut()
.insert("deprecation", HeaderValue::from_static("true"));
response.headers_mut().insert(
"link",
HeaderValue::from_static(r#"</v1/claim>; rel="successor-version""#),
);
Ok(response)
}
async fn ingest_claim(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("ingest_claim");
check_writable(&state)?;
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let cmd = Command::IngestClaim {
src: body["src"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'src'"))?
.into(),
rel_type: body["rel_type"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'rel_type'"))?
.into(),
dst: body["dst"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'dst'"))?
.into(),
namespace: body
.get("namespace")
.and_then(|v| v.as_str())
.unwrap_or("default")
.into(),
polarity: body.get("polarity").and_then(|v| v.as_i64()).unwrap_or(1) as i32,
modality: body
.get("modality")
.and_then(|v| v.as_str())
.unwrap_or("asserted")
.into(),
valid_from: body.get("valid_from").and_then(|v| v.as_f64()),
valid_to: body.get("valid_to").and_then(|v| v.as_f64()),
extractor: body
.get("extractor")
.and_then(|v| v.as_str())
.unwrap_or("manual")
.into(),
extractor_version: body
.get("extractor_version")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
confidence_band: body
.get("confidence_band")
.and_then(|v| v.as_str())
.unwrap_or("medium")
.into(),
source_memory_rid: body
.get("source_memory_rid")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
span_start: body
.get("span_start")
.and_then(|v| v.as_i64())
.map(|v| v as i32),
span_end: body
.get("span_end")
.and_then(|v| v.as_i64())
.map(|v| v as i32),
weight: body.get("weight").and_then(|v| v.as_f64()).unwrap_or(1.0),
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn add_alias(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("add_alias");
check_writable(&state)?;
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let cmd = Command::AddAlias {
alias: body["alias"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'alias'"))?
.into(),
canonical_name: body["canonical_name"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'canonical_name'"))?
.into(),
namespace: body
.get("namespace")
.and_then(|v| v.as_str())
.unwrap_or("default")
.into(),
source: body
.get("source")
.and_then(|v| v.as_str())
.unwrap_or("explicit")
.into(),
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn get_claims(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Query(params): Query<std::collections::HashMap<String, String>>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("get_claims");
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let entity = params
.get("entity")
.cloned()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'entity' query parameter"))?;
let namespace = params.get("namespace").cloned();
execute_cmd(
engine,
Command::GetClaims { entity, namespace },
state.control.clone(),
&state.inflight,
)
.await
}
async fn think(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("think");
check_writable(&state)?;
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let cmd = Command::Think {
run_consolidation: body
.get("run_consolidation")
.and_then(|v| v.as_bool())
.unwrap_or(true),
run_conflict_scan: body
.get("run_conflict_scan")
.and_then(|v| v.as_bool())
.unwrap_or(true),
run_pattern_mining: body
.get("run_pattern_mining")
.and_then(|v| v.as_bool())
.unwrap_or(false),
run_personality: body
.get("run_personality")
.and_then(|v| v.as_bool())
.unwrap_or(true),
consolidation_limit: body
.get("consolidation_limit")
.and_then(|v| v.as_u64())
.unwrap_or(50) as usize,
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn conflicts(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Query(params): Query<std::collections::HashMap<String, String>>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("conflicts");
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let cmd = Command::Conflicts {
status: params.get("status").cloned(),
conflict_type: params.get("conflict_type").cloned(),
entity: params.get("entity").cloned(),
namespace: params.get("namespace").cloned(),
limit: params
.get("limit")
.and_then(|v| v.parse().ok())
.unwrap_or(50),
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn resolve_conflict(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
AxumPath(conflict_id): AxumPath<String>,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("resolve_conflict");
check_writable(&state)?;
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let cmd = Command::Resolve {
conflict_id,
strategy: body["strategy"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'strategy'"))?
.into(),
winner_rid: body
.get("winner_rid")
.and_then(|v| v.as_str())
.map(String::from),
new_text: body
.get("new_text")
.and_then(|v| v.as_str())
.map(String::from),
resolution_note: body
.get("resolution_note")
.and_then(|v| v.as_str())
.map(String::from),
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn session_start(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
check_writable(&state)?;
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let cmd = Command::SessionStart {
namespace: body
.get("namespace")
.and_then(|v| v.as_str())
.unwrap_or("default")
.into(),
client_id: body
.get("client_id")
.and_then(|v| v.as_str())
.unwrap_or("")
.into(),
metadata: body.get("metadata").cloned().unwrap_or(json!({})),
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn session_end(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
AxumPath(session_id): AxumPath<String>,
body: Option<Json<Value>>,
) -> AppResult {
check_writable(&state)?;
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let summary =
body.and_then(|Json(b)| b.get("summary").and_then(|v| v.as_str()).map(String::from));
let cmd = Command::SessionEnd {
session_id,
summary,
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn personality(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("personality");
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
execute_cmd(
engine,
Command::Personality,
state.control.clone(),
&state.inflight,
)
.await
}
async fn stats(State(state): State<Arc<AppState>>, headers: axum::http::HeaderMap) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("stats");
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
execute_cmd(
engine,
Command::Stats,
state.control.clone(),
&state.inflight,
)
.await
}
async fn create_database(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
check_writable(&state)?;
let _ = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let name: String = body["name"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'name'"))?
.to_string();
let control = state.control.lock();
if control
.database_exists(&name)
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
{
return Err(app_error(
StatusCode::CONFLICT,
format!("database '{}' already exists", name),
));
}
let id = control
.create_database(&name, &name)
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
drop(control);
let db_dir = state.pool.data_dir().join(&name);
std::fs::create_dir_all(&db_dir)
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
Ok(Json(json!({
"name": name,
"id": id,
"message": format!("database '{}' created", name),
})))
}
async fn cluster_promote(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
) -> AppResult {
let _ = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let Some(ref ctx) = state.cluster else {
return Err(app_error(
StatusCode::BAD_REQUEST,
"single-node mode — nothing to promote",
));
};
if !matches!(ctx.state.configured_role, crate::config::NodeRole::Voter) {
return Err(app_error(
StatusCode::BAD_REQUEST,
"this node is not a voter — cannot become leader",
));
}
if ctx.state.is_leader() {
return Ok(Json(json!({
"status": "already_leader",
"node_id": ctx.node_id(),
"term": ctx.state.current_term(),
})));
}
let ctx_clone = std::sync::Arc::clone(ctx);
tokio::spawn(async move {
if let Err(e) = crate::cluster::election::start_election(ctx_clone).await {
tracing::error!(error = %e, "manual promotion failed");
}
});
Ok(Json(json!({
"status": "election_started",
"node_id": ctx.node_id(),
"current_term": ctx.state.current_term(),
"message": "check /v1/cluster in a few seconds to see the new leader"
})))
}
async fn metrics(State(state): State<Arc<AppState>>) -> String {
let mut out = String::new();
out.push_str("# HELP yantrikdb_engines_loaded Number of engine instances currently loaded\n");
out.push_str("# TYPE yantrikdb_engines_loaded gauge\n");
out.push_str(&format!(
"yantrikdb_engines_loaded {}\n",
state.pool.loaded_count()
));
if let Some(ref cluster) = state.cluster {
out.push_str("# HELP yantrikdb_cluster_term Current Raft term\n");
out.push_str("# TYPE yantrikdb_cluster_term gauge\n");
out.push_str(&format!(
"yantrikdb_cluster_term {{node_id=\"{}\"}} {}\n",
cluster.node_id(),
cluster.state.current_term()
));
out.push_str("# HELP yantrikdb_cluster_is_leader Whether this node is currently the leader (1) or not (0)\n");
out.push_str("# TYPE yantrikdb_cluster_is_leader gauge\n");
out.push_str(&format!(
"yantrikdb_cluster_is_leader {{node_id=\"{}\"}} {}\n",
cluster.node_id(),
if cluster.state.is_leader() { 1 } else { 0 }
));
out.push_str(
"# HELP yantrikdb_cluster_healthy Whether this node has quorum (1) or not (0)\n",
);
out.push_str("# TYPE yantrikdb_cluster_healthy gauge\n");
out.push_str(&format!(
"yantrikdb_cluster_healthy {{node_id=\"{}\"}} {}\n",
cluster.node_id(),
if cluster.is_healthy() { 1 } else { 0 }
));
out.push_str("# HELP yantrikdb_cluster_peer_reachable Whether each peer is reachable\n");
out.push_str("# TYPE yantrikdb_cluster_peer_reachable gauge\n");
for peer in cluster.peers.snapshot() {
out.push_str(&format!(
"yantrikdb_cluster_peer_reachable {{addr=\"{}\",role=\"{:?}\"}} {}\n",
peer.addr,
peer.configured_role,
if peer.reachable { 1 } else { 0 }
));
}
}
let default_db = {
let control = state.control.lock();
control.get_database("default").ok().flatten()
};
if let Some(rec) = default_db {
if let Ok(engine) = state.pool.get_engine(&rec) {
let stats_opt = {
engine.try_lock().and_then(|db| db.stats(None).ok())
};
if let Some(stats) = stats_opt {
{
out.push_str("# HELP yantrikdb_active_memories Number of active memories\n");
out.push_str("# TYPE yantrikdb_active_memories gauge\n");
out.push_str(&format!(
"yantrikdb_active_memories {{db=\"default\"}} {}\n",
stats.active_memories
));
out.push_str(
"# HELP yantrikdb_consolidated_memories Number of consolidated memories\n",
);
out.push_str("# TYPE yantrikdb_consolidated_memories gauge\n");
out.push_str(&format!(
"yantrikdb_consolidated_memories {{db=\"default\"}} {}\n",
stats.consolidated_memories
));
out.push_str("# HELP yantrikdb_edges Number of knowledge graph edges\n");
out.push_str("# TYPE yantrikdb_edges gauge\n");
out.push_str(&format!(
"yantrikdb_edges {{db=\"default\"}} {}\n",
stats.edges
));
out.push_str(
"# HELP yantrikdb_open_conflicts Number of unresolved conflicts\n",
);
out.push_str("# TYPE yantrikdb_open_conflicts gauge\n");
out.push_str(&format!(
"yantrikdb_open_conflicts {{db=\"default\"}} {}\n",
stats.open_conflicts
));
out.push_str("# HELP yantrikdb_operations_total Total operations\n");
out.push_str("# TYPE yantrikdb_operations_total counter\n");
out.push_str(&format!(
"yantrikdb_operations_total {{db=\"default\"}} {}\n",
stats.operations
));
}
}
}
}
out.push_str(&crate::metrics::global().render_prometheus());
out
}
async fn cluster_status(State(state): State<Arc<AppState>>) -> Json<Value> {
let Some(ref ctx) = state.cluster else {
return Json(json!({
"clustered": false,
"message": "single-node mode (no replication)"
}));
};
let peers: Vec<Value> = ctx
.peers
.snapshot()
.into_iter()
.map(|p| {
json!({
"node_id": p.node_id,
"addr": p.addr,
"role": format!("{:?}", p.configured_role).to_lowercase(),
"reachable": p.reachable,
"current_term": p.current_term,
"last_seen_secs_ago": p.last_seen.map(|t| t.elapsed().as_secs_f64()),
})
})
.collect();
Json(json!({
"clustered": true,
"node_id": ctx.node_id(),
"role": format!("{:?}", ctx.state.leader_role()),
"configured_role": format!("{:?}", ctx.state.configured_role).to_lowercase(),
"current_term": ctx.state.current_term(),
"leader_id": ctx.state.current_leader(),
"voted_for": ctx.state.voted_for(),
"accepts_writes": ctx.state.accepts_writes(),
"healthy": ctx.is_healthy(),
"quorum_size": ctx.quorum_size(),
"peers": peers,
}))
}
async fn list_databases(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
) -> AppResult {
let _ = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let databases = state
.control
.lock()
.list_databases()
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let list: Vec<Value> = databases
.iter()
.map(|d| json!({ "id": d.id, "name": d.name, "created_at": d.created_at }))
.collect();
Ok(Json(json!({ "databases": list })))
}
async fn control_snapshot(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
) -> AppResult {
let token = headers
.get("authorization")
.and_then(|v| v.to_str().ok())
.and_then(|h| h.strip_prefix("Bearer "))
.ok_or_else(|| app_error(StatusCode::UNAUTHORIZED, "missing Bearer token"))?;
let is_master = state
.cluster
.as_ref()
.and_then(|c| c.config.cluster_secret.as_ref())
.map(|s| token == s.as_str())
.unwrap_or(false);
if !is_master {
return Err(app_error(
StatusCode::FORBIDDEN,
"control-snapshot requires cluster master token",
));
}
let snapshot = tokio::task::spawn_blocking({
let control = state.control.clone();
move || control.lock().export_snapshot()
})
.await
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
Ok(Json(serde_json::to_value(snapshot).unwrap_or_default()))
}
async fn admin_snapshot(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
let token = headers
.get("authorization")
.and_then(|v| v.to_str().ok())
.and_then(|h| h.strip_prefix("Bearer "))
.ok_or_else(|| app_error(StatusCode::UNAUTHORIZED, "missing Bearer token"))?;
let db_name = body
.get("database")
.and_then(|v| v.as_str())
.unwrap_or("default")
.to_string();
let is_master = state
.cluster
.as_ref()
.and_then(|c| c.config.cluster_secret.as_ref())
.map(|s| token == s.as_str())
.unwrap_or(false);
if !is_master {
let token_hash = auth::hash_token(token);
let control = state.control.lock();
let token_db_id = control
.validate_token(&token_hash)
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or_else(|| app_error(StatusCode::UNAUTHORIZED, "invalid or revoked token"))?;
let target_db = control
.get_database(&db_name)
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or_else(|| {
app_error(
StatusCode::NOT_FOUND,
format!("database '{}' not found", db_name),
)
})?;
drop(control);
if token_db_id != target_db.id {
return Err(app_error(
StatusCode::FORBIDDEN,
format!(
"token does not authenticate database '{}' — provide cluster master token \
or a token for that specific database",
db_name
),
));
}
}
let output_dir = body
.get("output_dir")
.and_then(|v| v.as_str())
.map(std::path::PathBuf::from)
.unwrap_or_else(|| state.pool.data_dir().join("snapshots"));
let control = state.control.clone();
let pool = state.pool.clone();
let result = tokio::task::spawn_blocking(move || -> anyhow::Result<Value> {
let db_record = {
let ctrl = control.lock();
ctrl.get_database(&db_name)?
.ok_or_else(|| anyhow::anyhow!("database '{}' not found", db_name))?
};
let engine = pool.get_engine(&db_record)?;
let db = engine.lock();
let conn = db.conn();
conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE)")?;
drop(conn);
let src_dir = pool.data_dir().join(&db_record.path);
let src_db = src_dir.join("yantrik.db");
if !src_db.exists() {
anyhow::bail!("database file not found: {:?}", src_db);
}
std::fs::create_dir_all(&output_dir)?;
let ts = chrono_ts();
let dest_name = format!("{}-{}.db", db_name, ts);
let dest_path = output_dir.join(&dest_name);
std::fs::copy(&src_db, &dest_path)?;
let data = std::fs::read(&dest_path)?;
let hash = blake3::hash(&data);
let size = data.len();
Ok(serde_json::json!({
"database": db_name,
"path": dest_path.to_str().unwrap_or(""),
"size_bytes": size,
"checksum_blake3": hash.to_hex().to_string(),
"timestamp": ts,
}))
})
.await
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
Ok(Json(result))
}
async fn ingest_claim_with_lineage(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("ingest_claim_with_lineage");
check_writable(&state)?;
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let source_lineage: Vec<String> = body
.get("source_lineage")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|x| x.as_str().map(|s| s.to_string()))
.collect()
})
.unwrap_or_default();
let cmd = Command::IngestClaimWithLineage {
src: body["src"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'src'"))?
.into(),
rel_type: body["rel_type"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'rel_type'"))?
.into(),
dst: body["dst"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'dst'"))?
.into(),
namespace: body
.get("namespace")
.and_then(|v| v.as_str())
.unwrap_or("default")
.into(),
polarity: body.get("polarity").and_then(|v| v.as_i64()).unwrap_or(1) as i32,
modality: body
.get("modality")
.and_then(|v| v.as_str())
.unwrap_or("asserted")
.into(),
valid_from: body.get("valid_from").and_then(|v| v.as_f64()),
valid_to: body.get("valid_to").and_then(|v| v.as_f64()),
extractor: body
.get("extractor")
.and_then(|v| v.as_str())
.unwrap_or("manual")
.into(),
extractor_version: body
.get("extractor_version")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
confidence_band: body
.get("confidence_band")
.and_then(|v| v.as_str())
.unwrap_or("medium")
.into(),
source_memory_rid: body
.get("source_memory_rid")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
weight: body.get("weight").and_then(|v| v.as_f64()).unwrap_or(1.0),
source_lineage,
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn get_mobility(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("get_mobility");
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let cmd = Command::GetMobility {
src: params
.get("src")
.cloned()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'src'"))?,
rel_type: params
.get("rel_type")
.cloned()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'rel_type'"))?,
dst: params
.get("dst")
.cloned()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'dst'"))?,
namespace: params
.get("namespace")
.cloned()
.unwrap_or_else(|| "default".to_string()),
regime: params
.get("regime")
.cloned()
.unwrap_or_else(|| "default".to_string()),
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn get_contest(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("get_contest");
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let cmd = Command::GetContest {
src: params
.get("src")
.cloned()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'src'"))?,
rel_type: params
.get("rel_type")
.cloned()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'rel_type'"))?,
dst: params
.get("dst")
.cloned()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'dst'"))?,
namespace: params
.get("namespace")
.cloned()
.unwrap_or_else(|| "default".to_string()),
regime: params
.get("regime")
.cloned()
.unwrap_or_else(|| "default".to_string()),
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn record_move_event(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("record_move_event");
check_writable(&state)?;
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let string_array = |key: &str| -> Vec<String> {
body.get(key)
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|x| x.as_str().map(|s| s.to_string()))
.collect()
})
.unwrap_or_default()
};
let cmd = Command::RecordMoveEvent {
move_type: body["move_type"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'move_type'"))?
.into(),
operator_version: body
.get("operator_version")
.and_then(|v| v.as_str())
.unwrap_or("v1")
.into(),
context_regime: body
.get("context_regime")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
observability: body
.get("observability")
.and_then(|v| v.as_str())
.unwrap_or("observed")
.into(),
inference_confidence: body.get("inference_confidence").and_then(|v| v.as_f64()),
inference_basis: body
.get("inference_basis")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|x| x.as_str().map(|s| s.to_string()))
.collect()
}),
input_claim_ids: string_array("input_claim_ids"),
output_claim_ids: string_array("output_claim_ids"),
side_effect_claim_ids: string_array("side_effect_claim_ids"),
dependencies: string_array("dependencies"),
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn list_flagged(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("list_flagged");
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let flag_mask = params
.get("flag_mask")
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let limit = params
.get("limit")
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(50);
let cmd = Command::ListFlaggedPropositions { flag_mask, limit };
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
fn require_master_token(state: &AppState, headers: &axum::http::HeaderMap) -> Result<(), AppError> {
let token = headers
.get("authorization")
.and_then(|h| h.to_str().ok())
.and_then(|h| h.strip_prefix("Bearer "))
.ok_or_else(|| app_error(StatusCode::UNAUTHORIZED, "missing Bearer token"))?;
if let Some(ref cluster) = state.cluster {
if let Some(ref secret) = cluster.config.cluster_secret {
if token == secret.as_str() {
return Ok(());
}
}
}
Err(app_error(
StatusCode::FORBIDDEN,
"debug endpoints require the cluster master token",
))
}
async fn debug_history(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
AxumPath(tenant_id): AxumPath<i64>,
axum::extract::Query(params): axum::extract::Query<crate::debug::history::HistoryParams>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("debug_history");
require_master_token(&state, &headers)?;
let resp = crate::debug::history::read_history(
&state.commit_log,
crate::commit::TenantId::new(tenant_id),
¶ms,
)
.await
.map_err(|e| {
app_error(
StatusCode::INTERNAL_SERVER_ERROR,
format!("read_history failed: {e}"),
)
})?;
Ok(Json(serde_json::to_value(resp).map_err(|e| {
app_error(
StatusCode::INTERNAL_SERVER_ERROR,
format!("history serialize failed: {e}"),
)
})?))
}
#[derive(serde::Deserialize)]
struct DebugFaultInjectBody {
fault: crate::debug::FaultKind,
ttl_secs: Option<u64>,
}
async fn debug_fault_inject(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<DebugFaultInjectBody>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("debug_fault_inject");
require_master_token(&state, &headers)?;
let id = state.fault_registry.inject(body.fault, body.ttl_secs);
Ok(Json(json!({
"fault_id": id.to_string(),
})))
}
async fn debug_fault_list(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("debug_fault_list");
require_master_token(&state, &headers)?;
let faults = state.fault_registry.list();
Ok(Json(serde_json::to_value(faults).map_err(|e| {
app_error(
StatusCode::INTERNAL_SERVER_ERROR,
format!("fault list serialize failed: {e}"),
)
})?))
}
async fn debug_fault_clear(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("debug_fault_clear");
require_master_token(&state, &headers)?;
let n = state.fault_registry.clear();
Ok(Json(json!({ "cleared": n })))
}
async fn debug_fault_remove(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
AxumPath(fault_id): AxumPath<u64>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("debug_fault_remove");
require_master_token(&state, &headers)?;
let removed = state
.fault_registry
.remove(crate::debug::FaultId::new(fault_id));
if removed {
Ok(Json(json!({ "removed": true })))
} else {
Err(app_error(
StatusCode::NOT_FOUND,
format!("no fault with id fault_{fault_id}"),
))
}
}
#[derive(serde::Deserialize)]
struct JobsListParams {
tenant: Option<i64>,
state: Option<String>,
limit: Option<usize>,
}
const MAX_JOBS_LIST_LIMIT: usize = 500;
async fn jobs_list(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
axum::extract::Query(params): axum::extract::Query<JobsListParams>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("jobs_list");
require_master_token(&state, &headers)?;
let tenant_filter = params.tenant.map(crate::commit::TenantId::new);
let state_filter = params
.state
.as_deref()
.and_then(crate::jobs::JobState::from_str);
let limit = params.limit.unwrap_or(100).min(MAX_JOBS_LIST_LIMIT);
let records = state
.jobs
.list(tenant_filter, state_filter, limit)
.await
.map_err(|e| {
app_error(
StatusCode::INTERNAL_SERVER_ERROR,
format!("jobs list failed: {e}"),
)
})?;
Ok(Json(serde_json::to_value(records).map_err(|e| {
app_error(
StatusCode::INTERNAL_SERVER_ERROR,
format!("jobs list serialize failed: {e}"),
)
})?))
}
async fn jobs_get(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
AxumPath(job_id_str): AxumPath<String>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("jobs_get");
require_master_token(&state, &headers)?;
let uuid = job_id_str.parse::<uuid7::Uuid>().map_err(|e| {
app_error(
StatusCode::BAD_REQUEST,
format!("invalid job id `{job_id_str}`: {e}"),
)
})?;
let record = state
.jobs
.get(crate::jobs::JobId(uuid))
.await
.map_err(|e| match e {
crate::jobs::JobError::NotFound { .. } => {
app_error(StatusCode::NOT_FOUND, e.to_string())
}
other => app_error(StatusCode::INTERNAL_SERVER_ERROR, other.to_string()),
})?;
Ok(Json(serde_json::to_value(record).map_err(|e| {
app_error(
StatusCode::INTERNAL_SERVER_ERROR,
format!("job serialize failed: {e}"),
)
})?))
}
async fn jobs_cancel(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
AxumPath(job_id_str): AxumPath<String>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("jobs_cancel");
require_master_token(&state, &headers)?;
let uuid = job_id_str.parse::<uuid7::Uuid>().map_err(|e| {
app_error(
StatusCode::BAD_REQUEST,
format!("invalid job id `{job_id_str}`: {e}"),
)
})?;
state
.jobs
.cancel(crate::jobs::JobId(uuid))
.await
.map_err(|e| match e {
crate::jobs::JobError::NotFound { .. } => {
app_error(StatusCode::NOT_FOUND, e.to_string())
}
crate::jobs::JobError::TerminalState { .. } => {
app_error(StatusCode::CONFLICT, e.to_string())
}
other => app_error(StatusCode::INTERNAL_SERVER_ERROR, other.to_string()),
})?;
Ok(Json(json!({ "cancelled": job_id_str })))
}
async fn admin_migrations_list(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("admin_migrations");
require_master_token(&state, &headers)?;
let mut all = serde_json::json!({});
let commit_log_path = state.data_dir.join("commit_log.sqlite");
let jobs_path = state.data_dir.join("jobs.sqlite");
for (label, path) in [("commit_log", &commit_log_path), ("jobs", &jobs_path)] {
let summary = match rusqlite::Connection::open_with_flags(
path,
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY,
) {
Ok(conn) => match crate::migrations::MigrationRunner::applied_summary(&conn) {
Ok(rows) => serde_json::json!(rows
.iter()
.map(|(id, name)| serde_json::json!({"id": id, "name": name}))
.collect::<Vec<_>>()),
Err(e) => serde_json::json!({"error": e.to_string()}),
},
Err(e) => serde_json::json!({"error": format!("open failed: {e}")}),
};
all[label] = summary;
}
Ok(Json(all))
}
fn chrono_ts() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
format!("{}", secs)
}
pub fn router(state: Arc<AppState>) -> Router {
let body_limit = state.admission.cfg.max_request_body_bytes;
let raft_sub_router = state.raft.as_ref().map(|assembly| {
crate::raft::raft_status_router(assembly.raft.clone())
.merge(crate::raft::raft_receive_router(assembly.raft.clone()))
});
let mut app = Router::new()
.route("/v1/health", get(health))
.route("/v1/health/deep", get(health_deep))
.route("/v1/remember", post(remember))
.route("/v1/remember/batch", post(remember_batch))
.route("/v1/recall", post(recall))
.route("/v1/forget", post(forget))
.route("/v1/relate", post(relate))
.route("/v1/claim", post(ingest_claim))
.route("/v1/claims", get(get_claims))
.route("/v1/alias", post(add_alias))
.route("/v1/think", post(think))
.route("/v1/conflicts", get(conflicts))
.route("/v1/conflicts/{id}/resolve", post(resolve_conflict))
.route("/v1/sessions", post(session_start))
.route("/v1/sessions/{id}", delete(session_end))
.route("/v1/personality", get(personality))
.route("/v1/stats", get(stats))
.route("/v1/databases", post(create_database))
.route("/v1/databases", get(list_databases))
.route("/v1/cluster", get(cluster_status))
.route("/v1/cluster/promote", post(cluster_promote))
.route("/v1/admin/control-snapshot", get(control_snapshot))
.route("/v1/admin/snapshot", post(admin_snapshot))
.route("/v1/claim_with_lineage", post(ingest_claim_with_lineage))
.route("/v1/mobility", get(get_mobility))
.route("/v1/contest", get(get_contest))
.route("/v1/move_events", post(record_move_event))
.route("/v1/flagged_propositions", get(list_flagged))
.route("/v1/debug/history/{tenant_id}", get(debug_history))
.route("/v1/debug/fault/inject", post(debug_fault_inject))
.route("/v1/debug/fault", get(debug_fault_list))
.route("/v1/debug/fault/clear", post(debug_fault_clear))
.route("/v1/debug/fault/{fault_id}", delete(debug_fault_remove))
.route("/v1/jobs", get(jobs_list))
.route("/v1/jobs/{job_id}", get(jobs_get))
.route("/v1/jobs/{job_id}", delete(jobs_cancel))
.route("/v1/admin/migrations", get(admin_migrations_list))
.route("/metrics", get(metrics));
let mut app = app
.layer(tower_http::limit::RequestBodyLimitLayer::new(body_limit))
.with_state(state);
if let Some(raft_router) = raft_sub_router {
app = app.merge(raft_router);
}
app
}