use std::sync::Arc;
use axum::{
extract::{Path as AxumPath, Query, State},
http::{HeaderValue, StatusCode},
response::IntoResponse,
routing::{delete, get, post},
Json, Router,
};
use serde_json::{json, Value};
use crate::auth;
use crate::command::Command;
use crate::handler::{self, CommandResult};
use crate::server::AppState;
/// Result type returned by the JSON handlers in this file: a JSON body on
/// success, or an HTTP status paired with a JSON error body on failure.
type AppResult = Result<Json<Value>, (StatusCode, Json<Value>)>;
/// Snapshot of this node's cluster status as produced by `cluster_state_view`.
///
/// The view is filled either from openraft metrics (`raft_mode == "openraft"`)
/// or from the `state.cluster` handle (`raft_mode == "raft-lite"`); several
/// fields are only populated on the openraft path.
#[derive(Debug, Clone)]
struct ClusterStateView {
// Id of the local node.
node_id: u64,
// `Debug` rendering of the node's current role / server state.
role: String,
// Current term reported by the cluster layer.
term: u64,
// Id of the current leader, when one is known.
leader: Option<u64>,
// Leader address looked up in the openraft membership config; always
// `None` on the raft-lite path.
leader_addr: Option<String>,
// Whether this node accepts writes (on the openraft path: it is the leader).
accepts_writes: bool,
// openraft path: a leader is currently known; raft-lite path: the value of
// `cluster.is_healthy()`.
healthy: bool,
// Either "openraft" or "raft-lite".
raft_mode: &'static str,
// Last log index from the openraft metrics; `None` on the raft-lite path.
last_log_index: Option<u64>,
// Index of the last applied log entry; `None` on the raft-lite path.
last_applied_index: Option<u64>,
// `last_log_index` minus `last_applied_index` (saturating); `None` when no
// log index is available.
replication_lag_log_entries: Option<u64>,
// Lowercase role name ("leader", "follower", ...); openraft path only.
role_label: Option<&'static str>,
}
fn cluster_state_view(state: &AppState) -> Option<ClusterStateView> {
if let Some(ref assembly) = state.raft {
let m = assembly.raft.metrics().borrow().clone();
let is_leader = matches!(m.state, openraft::ServerState::Leader);
let leader_id = m.current_leader.map(u64::from);
let leader_addr = leader_id.and_then(|lid| {
m.membership_config
.nodes()
.find(|(id, _)| u64::from(**id) == lid)
.map(|(_, n)| n.addr.clone())
});
let last_log_index = m.last_log_index;
let last_applied_index = m.last_applied.as_ref().map(|l| l.index);
let replication_lag = match (last_log_index, last_applied_index) {
(Some(log), Some(applied)) => Some(log.saturating_sub(applied)),
(Some(log), None) => Some(log),
(None, _) => None,
};
let role_label: &'static str = match m.state {
openraft::ServerState::Leader => "leader",
openraft::ServerState::Follower => "follower",
openraft::ServerState::Candidate => "candidate",
openraft::ServerState::Learner => "learner",
openraft::ServerState::Shutdown => "shutdown",
};
return Some(ClusterStateView {
node_id: u64::from(m.id),
role: format!("{:?}", m.state),
term: m.current_term,
leader: leader_id,
leader_addr,
accepts_writes: is_leader,
healthy: m.current_leader.is_some(),
raft_mode: "openraft",
last_log_index,
last_applied_index,
replication_lag_log_entries: replication_lag,
role_label: Some(role_label),
});
}
if let Some(ref cluster) = state.cluster {
return Some(ClusterStateView {
node_id: cluster.node_id() as u64,
role: format!("{:?}", cluster.state.leader_role()),
term: cluster.state.current_term(),
leader: cluster.state.current_leader().map(|id| id as u64),
leader_addr: None,
accepts_writes: cluster.state.accepts_writes(),
healthy: cluster.is_healthy(),
raft_mode: "raft-lite",
last_log_index: None,
last_applied_index: None,
replication_lag_log_entries: None,
role_label: None,
});
}
None
}
/// Shared, reference-counted handle to a `YantrikDB` engine.
type EngineHandle = Arc<yantrikdb::YantrikDB>;
/// Error half of handler results: an HTTP status plus a JSON error body.
type AppError = (StatusCode, Json<Value>);
fn app_error(status: StatusCode, message: impl Into<String>) -> AppError {
(status, Json(json!({ "error": message.into() })))
}
/// Translates a commit-log error into the HTTP status and JSON body returned
/// to the client.
///
/// Each `CommitError` variant maps to one status code and a body whose
/// `"error"` key is a stable machine-readable label; the variant's fields are
/// copied into the body alongside it.
fn commit_error_to_app_error(err: crate::commit::CommitError) -> AppError {
    use crate::commit::CommitError;

    // Pick the status and the payload first; the `Json` wrapper is applied
    // once at the end.
    let (status, body) = match err {
        CommitError::NotLeader {
            leader_id,
            leader_addr,
        } => (
            StatusCode::TEMPORARY_REDIRECT,
            json!({
                "error": "not_leader",
                "leader_id": leader_id,
                "leader_addr": leader_addr,
            }),
        ),
        CommitError::OpIdCollision {
            op_id,
            tenant_id,
            existing_index,
        } => (
            StatusCode::CONFLICT,
            json!({
                "error": "op_id_collision",
                "op_id": op_id.to_string(),
                "tenant_id": tenant_id.0,
                "existing_index": existing_index,
            }),
        ),
        CommitError::UnexpectedLogIndex {
            tenant_id,
            expected,
            actual,
        } => (
            StatusCode::CONFLICT,
            json!({
                "error": "unexpected_log_index",
                "tenant_id": tenant_id.0,
                "expected": expected,
                "actual": actual,
            }),
        ),
        CommitError::Version(verr) => (
            StatusCode::UPGRADE_REQUIRED,
            json!({
                "error": "wire_version_mismatch",
                "detail": verr.to_string(),
            }),
        ),
        CommitError::NotYetImplemented {
            variant,
            planned_rfc,
        } => (
            StatusCode::NOT_IMPLEMENTED,
            json!({
                "error": "not_implemented",
                "variant": variant,
                "planned_rfc": planned_rfc,
            }),
        ),
        CommitError::StorageFailure { message } => (
            StatusCode::SERVICE_UNAVAILABLE,
            json!({
                "error": "storage_failure",
                "detail": message,
                "retry_after_ms": 1000,
            }),
        ),
        CommitError::Shutdown => (
            StatusCode::SERVICE_UNAVAILABLE,
            json!({
                "error": "shutting_down",
                "retry_after_ms": 5000,
            }),
        ),
        CommitError::CommitTimeout { op_id } => (
            StatusCode::SERVICE_UNAVAILABLE,
            json!({
                "error": "commit_timeout",
                "op_id": op_id.to_string(),
                "retry_after_ms": 1000,
            }),
        ),
    };
    (status, Json(body))
}
fn resolve_engine(
state: &AppState,
auth_header: Option<&str>,
) -> Result<(i64, EngineHandle), AppError> {
let token = auth_header
.and_then(|h| h.strip_prefix("Bearer "))
.ok_or_else(|| app_error(StatusCode::UNAUTHORIZED, "missing Bearer token"))?;
if let Some(ref cluster) = state.cluster {
if let Some(ref secret) = cluster.config.cluster_secret {
if token == secret.as_str() {
let control = state.control.lock();
let db_record = control
.get_database("default")
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or_else(|| {
app_error(StatusCode::NOT_FOUND, "default database not found")
})?;
drop(control);
let engine = state
.pool
.get_engine(&db_record)
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
state.workers.start_for_database(
db_record.id,
db_record.name.clone(),
std::sync::Arc::clone(&engine),
);
return Ok((db_record.id, engine));
}
}
}
let token_hash = auth::hash_token(token);
let control = state.control.lock();
let db_id = control
.validate_token(&token_hash)
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or_else(|| app_error(StatusCode::UNAUTHORIZED, "invalid or revoked token"))?;
let db_record = control
.get_database_by_id(db_id)
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or_else(|| app_error(StatusCode::NOT_FOUND, "database not found"))?;
drop(control);
let engine = state
.pool
.get_engine(&db_record)
.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
state.workers.start_for_database(
db_id,
db_record.name.clone(),
std::sync::Arc::clone(&engine),
);
Ok((db_id, engine))
}
/// Runs `cmd` against `engine` on the blocking thread pool and converts the
/// outcome into an [`AppResult`].
///
/// `inflight` is incremented for the duration of the call. When the value
/// observed before the increment is already at or above
/// `crate::server::MAX_INFLIGHT`, the increment is undone and the call is
/// rejected with `503`. A failed join and a command error are both reported
/// as `500`.
async fn execute_cmd(
    engine: Arc<yantrikdb::YantrikDB>,
    cmd: Command,
    control: Arc<parking_lot::Mutex<crate::control::ControlDb>>,
    inflight: &std::sync::atomic::AtomicU32,
) -> AppResult {
    use std::sync::atomic::{AtomicU32, Ordering};

    // Decrements the counter again when the admitted call finishes, on every
    // exit path.
    struct InflightGuard<'a>(&'a AtomicU32);
    impl Drop for InflightGuard<'_> {
        fn drop(&mut self) {
            self.0.fetch_sub(1, Ordering::Relaxed);
        }
    }

    let current = inflight.fetch_add(1, Ordering::Relaxed);
    if current >= crate::server::MAX_INFLIGHT {
        // Not admitted: undo our own increment before reporting overload.
        inflight.fetch_sub(1, Ordering::Relaxed);
        let message = format!(
            "server overloaded: {} inflight ops (max {}). Retry later.",
            current,
            crate::server::MAX_INFLIGHT,
        );
        return Err(app_error(StatusCode::SERVICE_UNAVAILABLE, message));
    }
    let _inflight_guard = InflightGuard(inflight);

    // Metric label for the hold-time histogram.
    let op_name: &'static str = match &cmd {
        Command::Remember { .. } => "remember",
        Command::RememberBatch { .. } => "remember_batch",
        Command::Recall { .. } => "recall",
        Command::Forget { .. } => "forget",
        Command::Stats => "stats",
        Command::Ping => "ping",
        _ => "other",
    };

    let joined = tokio::task::spawn_blocking(move || {
        let lock_start = std::time::Instant::now();
        let db = engine.as_ref();
        crate::metrics::record_engine_lock_wait(lock_start.elapsed());
        let hold_start = std::time::Instant::now();
        let outcome = handler::execute_with_guard(db, cmd, Some(control.as_ref()));
        crate::metrics::record_engine_lock_hold(op_name, hold_start.elapsed());
        outcome
    })
    .await;

    let outcome = match joined {
        Ok(outcome) => outcome,
        Err(e) => {
            return Err(app_error(
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("join error: {e}"),
            ))
        }
    };
    let reply = match outcome {
        Ok(reply) => reply,
        Err(e) => return Err(app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())),
    };

    let payload = match reply {
        CommandResult::Json(v) => v,
        CommandResult::RecallResults { results, total } => {
            json!({ "results": results, "total": total })
        }
        CommandResult::Pong => json!({ "status": "ok" }),
    };
    Ok(Json(payload))
}
/// Shallow health handler: always reports `"status": "ok"` together with the
/// number of loaded engines, plus a `"cluster"` object when
/// `cluster_state_view` yields a view.
async fn health(State(state): State<Arc<AppState>>) -> Json<Value> {
    let engines_loaded = state.pool.loaded_count();
    let mut payload = json!({
        "status": "ok",
        "engines_loaded": engines_loaded,
    });

    match cluster_state_view(&state) {
        Some(view) => {
            payload["cluster"] = json!({
                "node_id": view.node_id,
                "role": view.role,
                "term": view.term,
                "leader": view.leader,
                "accepts_writes": view.accepts_writes,
                "healthy": view.healthy,
                "raft_mode": view.raft_mode,
                "last_log_index": view.last_log_index,
                "last_applied_index": view.last_applied_index,
                "replication_lag_log_entries": view.replication_lag_log_entries,
                "role_label": view.role_label,
            });
        }
        None => {}
    }

    Json(payload)
}
async fn health_deep(State(state): State<Arc<AppState>>) -> (StatusCode, Json<Value>) {
let mut checks = Vec::new();
let mut all_pass = true;
{
let engine_check = tokio::task::spawn_blocking({
let control = state.control.clone();
let pool = state.pool.clone();
move || {
let start = std::time::Instant::now();
let db_record = {
let ctrl = control.lock();
ctrl.get_database("default").ok().flatten()
};
if let Some(rec) = db_record {
if let Ok(engine) = pool.get_engine(&rec) {
let timeout = std::time::Duration::from_millis(100);
if true
{
let elapsed = start.elapsed();
return json!({
"check": "engine_lock",
"pass": true,
"latency_ms": elapsed.as_secs_f64() * 1000.0,
});
}
}
}
let elapsed = start.elapsed();
json!({
"check": "engine_lock",
"pass": false,
"latency_ms": elapsed.as_secs_f64() * 1000.0,
"error": "could not acquire engine lock within 100ms",
})
}
})
.await
.unwrap_or_else(|e| json!({"check": "engine_lock", "pass": false, "error": e.to_string()}));
if !engine_check["pass"].as_bool().unwrap_or(false) {
all_pass = false;
}
checks.push(engine_check);
}
{
let control_check = tokio::task::spawn_blocking({
let control = state.control.clone();
move || {
let start = std::time::Instant::now();
let ctrl = control.lock();
match ctrl.list_databases() {
Ok(dbs) => {
let elapsed = start.elapsed();
json!({
"check": "control_db",
"pass": true,
"latency_ms": elapsed.as_secs_f64() * 1000.0,
"databases": dbs.len(),
})
}
Err(e) => {
let elapsed = start.elapsed();
json!({
"check": "control_db",
"pass": false,
"latency_ms": elapsed.as_secs_f64() * 1000.0,
"error": e.to_string(),
})
}
}
}
})
.await
.unwrap_or_else(|e| json!({"check": "control_db", "pass": false, "error": e.to_string()}));
if !control_check["pass"].as_bool().unwrap_or(false) {
all_pass = false;
}
checks.push(control_check);
}
if let Some(view) = cluster_state_view(&state) {
if !view.healthy {
all_pass = false;
}
checks.push(json!({
"check": "cluster_quorum",
"pass": view.healthy,
"node_id": view.node_id,
"role": view.role,
"term": view.term,
"leader": view.leader,
"raft_mode": view.raft_mode,
}));
}
let status = if all_pass {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};
let encryption_enabled = state.pool.is_encrypted();
let encryption_status = if encryption_enabled {
json!({"enabled": true, "algorithm": "AES-256-GCM"})
} else {
json!({"enabled": false, "algorithm": null})
};
let in_flight_used = state.admission.cfg.max_in_flight_recall
- state.admission.in_flight_recall.available_permits();
let expanded_used = state.admission.cfg.max_concurrent_expanded_recall
- state.admission.expanded_recall.available_permits();
let admission_state = json!({
"hard_top_k_cap": state.admission.cfg.hard_top_k_cap,
"max_request_body_bytes": state.admission.cfg.max_request_body_bytes,
"in_flight_recall": {
"max": state.admission.cfg.max_in_flight_recall,
"in_use": in_flight_used,
},
"expanded_recall": {
"max": state.admission.cfg.max_concurrent_expanded_recall,
"in_use": expanded_used,
},
});
let runtime_state = json!({
"control_runtime_isolated": state.control_runtime.is_some(),
});
let local_snap = crate::version::VersionSnapshot::local();
let version_block = json!({
"build_id": local_snap.build_id,
"wire": local_snap.wire,
"min_supported_wire": local_snap.min_supported_wire,
"table_schema_versions": local_snap.table_schema_versions,
});
(
status,
Json(json!({
"status": if all_pass { "healthy" } else { "degraded" },
"encryption": encryption_status,
"checks": checks,
"admission": admission_state,
"runtime": runtime_state,
"version": version_block,
})),
)
}
/// Rejects a write that would push the database past its memory quota.
///
/// Reads the quota for `db_id` from the control DB (falling back to the
/// default quota on error) and the engine's current `active_memories` count
/// (treated as 0 when stats cannot be read). Returns `429` when
/// `current + count` exceeds `quota.max_memories`.
fn check_memory_quota(
    state: &AppState,
    db_id: i64,
    engine: &EngineHandle,
    count: usize,
) -> Result<(), (StatusCode, Json<Value>)> {
    // The lock guard is a temporary of this statement and is released at its end.
    let quota = state.control.lock().get_quota(db_id).unwrap_or_default();

    let current = match engine.stats(None) {
        Ok(stats) => stats.active_memories,
        Err(_) => 0,
    };

    let projected = current + count as i64;
    if projected <= quota.max_memories {
        return Ok(());
    }

    let message = format!(
        "would exceed memory quota: current={}, adding={}, max={}",
        current, count, quota.max_memories,
    );
    Err(app_error(StatusCode::TOO_MANY_REQUESTS, message))
}
/// Guards write handlers: succeeds when no cluster view is available or the
/// view says this node accepts writes; otherwise returns `503` with a body
/// naming the current leader (if any) so the client can retry elsewhere.
fn check_writable(state: &AppState) -> Result<(), (StatusCode, Json<Value>)> {
    let view = match cluster_state_view(state) {
        Some(view) => view,
        // No cluster view at all: nothing restricts writes.
        None => return Ok(()),
    };
    if view.accepts_writes {
        return Ok(());
    }

    let msg: String = if let Some(id) = view.leader {
        format!("read-only: not the leader (current leader: node {})", id)
    } else {
        String::from("read-only: no leader elected")
    };
    let body = json!({
        "error": msg,
        "leader_node_id": view.leader,
        "leader_addr": view.leader_addr,
        "raft_mode": view.raft_mode,
    });
    Err((StatusCode::SERVICE_UNAVAILABLE, Json(body)))
}
async fn remember(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("remember");
check_writable(&state)?;
let (db_id, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
check_memory_quota(&state, db_id, &engine, 1)?;
let text: String = body["text"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'text'"))?
.into();
let client_supplied: Option<Vec<f32>> = body.get("embedding").and_then(|v| {
v.as_array().map(|a| {
a.iter()
.filter_map(|x| x.as_f64().map(|f| f as f32))
.collect()
})
});
let embedding = resolve_embedding(state.as_ref(), &text, client_supplied).await?;
let rid = uuid7::uuid7().to_string();
let now_micros = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_micros() as i64)
.unwrap_or(0);
let mutation = crate::commit::MemoryMutation::UpsertMemory {
rid: rid.clone(),
text,
memory_type: body
.get("memory_type")
.and_then(|v| v.as_str())
.unwrap_or("semantic")
.into(),
importance: body
.get("importance")
.and_then(|v| v.as_f64())
.unwrap_or(0.5),
valence: body.get("valence").and_then(|v| v.as_f64()).unwrap_or(0.0),
half_life: body
.get("half_life")
.and_then(|v| v.as_f64())
.unwrap_or(168.0),
metadata: body.get("metadata").cloned().unwrap_or(json!({})),
namespace: body
.get("namespace")
.and_then(|v| v.as_str())
.unwrap_or("")
.into(),
certainty: body
.get("certainty")
.and_then(|v| v.as_f64())
.unwrap_or(1.0),
domain: body
.get("domain")
.and_then(|v| v.as_str())
.unwrap_or("")
.into(),
source: body
.get("source")
.and_then(|v| v.as_str())
.unwrap_or("user")
.into(),
emotional_state: body
.get("emotional_state")
.and_then(|v| v.as_str())
.map(String::from),
embedding,
extracted_entities: vec![],
created_at_unix_micros: Some(now_micros),
embedding_model: Some("default".into()),
};
let receipt = state
.commit_log
.commit(
crate::commit::TenantId::new(db_id),
mutation,
crate::commit::CommitOptions::default(),
)
.await
.map_err(commit_error_to_app_error)?;
let _ = engine; Ok(Json(json!({
"rid": rid,
"log_index": receipt.log_index,
})))
}
/// Chooses the embedding to store for `text`.
///
/// A client-supplied vector is returned as-is. Otherwise the pool's embedder
/// (if one is configured) is run on the blocking pool; with no embedder the
/// result is `Ok(None)`. An embedder failure is counted, logged and turned
/// into a `500` so the caller does not write a row without an embedding.
async fn resolve_embedding(
    state: &AppState,
    text: &str,
    client_supplied: Option<Vec<f32>>,
) -> Result<Option<Vec<f32>>, AppError> {
    if let Some(vector) = client_supplied {
        return Ok(Some(vector));
    }
    let embedder = match state.pool.embedder().cloned() {
        Some(embedder) => embedder,
        None => return Ok(None),
    };

    // The blocking task needs owned input.
    let owned_text = text.to_string();
    let joined = tokio::task::spawn_blocking(move || {
        use yantrikdb::types::Embedder;
        embedder.embed(&owned_text)
    })
    .await;

    let outcome = match joined {
        Ok(outcome) => outcome,
        Err(join_err) => {
            return Err(app_error(
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("embed task panicked: {}", join_err),
            ))
        }
    };

    let e = match outcome {
        Ok(v) => return Ok(Some(v)),
        Err(e) => e,
    };

    crate::metrics::increment_embedder_failure("remember");
    tracing::error!(
        error = %e,
        text_len = text.len(),
        "embedder failed during /v1/remember; refusing to write a row with NULL embedding"
    );
    Err(app_error(
        StatusCode::INTERNAL_SERVER_ERROR,
        format!(
            "embedder failed: {} (issue #19 — write refused to prevent NULL-embedding row that would poison recall on this namespace; please retry)",
            e
        ),
    ))
}
/// Handler that stores a batch of memories through the commit log.
///
/// The JSON body must contain a `memories` array; an empty array yields an
/// empty success response. The batch is checked against the per-database
/// batch-size quota, the memory quota and a hard cap of 10000 entries. Every
/// entry needs a `text` string; the remaining fields are optional with the
/// same defaults as the single-memory handler.
///
/// When the pool has an embedder, entries without a client-supplied embedding
/// are embedded in one blocking call. The batch is refused with `500` if the
/// embedder fails *or* returns fewer vectors than were requested, so no entry
/// is committed without an embedding in that situation.
///
/// Entries are committed one at a time; a commit error aborts the handler and
/// entries committed before it are not rolled back here. On success the
/// response carries the generated `rids`, their `count` and the `log_index`
/// of the last commit.
async fn remember_batch(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
    Json(body): Json<Value>,
) -> AppResult {
    // String field, or `fallback` when the key is absent or not a string.
    fn text_or<'a>(item: &'a Value, key: &str, fallback: &'a str) -> &'a str {
        item.get(key).and_then(Value::as_str).unwrap_or(fallback)
    }
    // Numeric field, or `fallback` when the key is absent or not a number.
    fn number_or(item: &Value, key: &str, fallback: f64) -> f64 {
        item.get(key).and_then(Value::as_f64).unwrap_or(fallback)
    }

    let _timer = crate::metrics::HandlerTimer::new("remember_batch");
    check_writable(&state)?;
    let (db_id, engine) = resolve_engine(
        &state,
        headers.get("authorization").and_then(|v| v.to_str().ok()),
    )?;

    let memories_arr = body
        .get("memories")
        .and_then(|v| v.as_array())
        .ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'memories' array"))?;
    if memories_arr.is_empty() {
        return Ok(Json(json!({"rids": [], "count": 0})));
    }

    let quota = {
        let ctrl = state.control.lock();
        ctrl.get_quota(db_id).unwrap_or_default()
    };
    if memories_arr.len() > quota.max_batch_size as usize {
        return Err(app_error(
            StatusCode::TOO_MANY_REQUESTS,
            format!(
                "batch size {} exceeds quota {} for this database",
                memories_arr.len(),
                quota.max_batch_size
            ),
        ));
    }
    check_memory_quota(&state, db_id, &engine, memories_arr.len())?;
    if memories_arr.len() > 10_000 {
        return Err(app_error(
            StatusCode::BAD_REQUEST,
            "batch size exceeds 10000",
        ));
    }

    // Parse every entry up front so a malformed one rejects the whole batch
    // before anything is embedded or committed.
    let mut memories = Vec::with_capacity(memories_arr.len());
    for (i, m) in memories_arr.iter().enumerate() {
        let text = m
            .get("text")
            .and_then(|v| v.as_str())
            .ok_or_else(|| {
                app_error(
                    StatusCode::BAD_REQUEST,
                    format!("memories[{}]: missing 'text'", i),
                )
            })?
            .to_string();
        memories.push(crate::command::RememberInput {
            text,
            memory_type: text_or(m, "memory_type", "semantic").into(),
            importance: number_or(m, "importance", 0.5),
            valence: number_or(m, "valence", 0.0),
            half_life: number_or(m, "half_life", 168.0),
            metadata: m.get("metadata").cloned().unwrap_or_else(|| json!({})),
            namespace: text_or(m, "namespace", "").into(),
            certainty: number_or(m, "certainty", 1.0),
            domain: text_or(m, "domain", "").into(),
            source: text_or(m, "source", "user").into(),
            emotional_state: m
                .get("emotional_state")
                .and_then(|v| v.as_str())
                .map(String::from),
            // Non-numeric entries of a client-supplied embedding are skipped.
            embedding: m.get("embedding").and_then(|v| {
                v.as_array().map(|a| {
                    a.iter()
                        .filter_map(|x| x.as_f64().map(|f| f as f32))
                        .collect()
                })
            }),
        });
    }

    if let Some(embedder) = state.pool.embedder().cloned() {
        // Positions of the entries that still need an embedding.
        let needs_embed: Vec<usize> = memories
            .iter()
            .enumerate()
            .filter(|(_, m)| m.embedding.is_none())
            .map(|(i, _)| i)
            .collect();
        if !needs_embed.is_empty() {
            let texts: Vec<String> = needs_embed
                .iter()
                .map(|&i| memories[i].text.clone())
                .collect();
            let result = tokio::task::spawn_blocking(move || {
                use yantrikdb::types::Embedder;
                let refs: Vec<&str> = texts.iter().map(|s| s.as_str()).collect();
                embedder.embed_batch(&refs)
            })
            .await
            .map_err(|e| {
                app_error(
                    StatusCode::INTERNAL_SERVER_ERROR,
                    format!("embed batch task panicked: {}", e),
                )
            })?;
            match result {
                Ok(embeddings) => {
                    // `zip` stops at the shorter side, so count how many
                    // slots were actually filled and refuse a short result
                    // instead of silently leaving entries without a vector.
                    let mut filled = 0usize;
                    for (idx, vec) in needs_embed.iter().zip(embeddings.into_iter()) {
                        memories[*idx].embedding = Some(vec);
                        filled += 1;
                    }
                    if filled != needs_embed.len() {
                        crate::metrics::increment_embedder_failure("remember_batch");
                        tracing::error!(
                            expected = needs_embed.len(),
                            received = filled,
                            batch_size = memories.len(),
                            "embedder returned too few embeddings during /v1/remember/batch; refusing partial-NULL write"
                        );
                        return Err(app_error(
                            StatusCode::INTERNAL_SERVER_ERROR,
                            format!(
                                "embedder returned {} embeddings for {} texts (issue #19 — batch refused to prevent NULL-embedding rows; please retry)",
                                filled,
                                needs_embed.len()
                            ),
                        ));
                    }
                }
                Err(e) => {
                    crate::metrics::increment_embedder_failure("remember_batch");
                    tracing::error!(
                        error = %e,
                        miss_count = needs_embed.len(),
                        batch_size = memories.len(),
                        "embedder failed during /v1/remember/batch; refusing partial-NULL write"
                    );
                    return Err(app_error(
                        StatusCode::INTERNAL_SERVER_ERROR,
                        format!(
                            "embedder failed: {} (issue #19 — batch refused to prevent NULL-embedding rows; please retry)",
                            e
                        ),
                    ));
                }
            }
        }
    }

    // One timestamp for the whole batch; a clock before the Unix epoch is
    // recorded as 0.
    let now_micros = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_micros() as i64)
        .unwrap_or(0);

    let mut rids = Vec::with_capacity(memories.len());
    let mut last_log_index: u64 = 0;
    for m in memories {
        let rid = uuid7::uuid7().to_string();
        let mutation = crate::commit::MemoryMutation::UpsertMemory {
            rid: rid.clone(),
            text: m.text,
            memory_type: m.memory_type,
            importance: m.importance,
            valence: m.valence,
            half_life: m.half_life,
            metadata: m.metadata,
            namespace: m.namespace,
            certainty: m.certainty,
            domain: m.domain,
            source: m.source,
            emotional_state: m.emotional_state,
            embedding: m.embedding,
            extracted_entities: vec![],
            created_at_unix_micros: Some(now_micros),
            embedding_model: Some("default".into()),
        };
        let receipt = state
            .commit_log
            .commit(
                crate::commit::TenantId::new(db_id),
                mutation,
                crate::commit::CommitOptions::default(),
            )
            .await
            .map_err(commit_error_to_app_error)?;
        rids.push(rid);
        last_log_index = receipt.log_index;
    }

    let count = rids.len();
    Ok(Json(json!({
        "rids": rids,
        "count": count,
        "log_index": last_log_index,
    })))
}
async fn recall(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("recall");
let top_k = body.get("top_k").and_then(|v| v.as_u64()).unwrap_or(10) as usize;
let expand_entities = body
.get("expand_entities")
.and_then(|v| v.as_bool())
.unwrap_or(true);
crate::metrics::record_recall_request("v1", expand_entities);
crate::metrics::record_recall_top_k("v1", top_k);
if let Err(reason) = crate::admission::check_top_k(top_k, state.admission.cfg.hard_top_k_cap) {
let status = StatusCode::from_u16(reason.http_status()).unwrap_or(StatusCode::BAD_REQUEST);
return Err((
status,
Json(json!({
"error": reason.message(),
"reason": reason.metric_label(),
"hard_top_k_cap": state.admission.cfg.hard_top_k_cap,
})),
));
}
let _permits = match state
.admission
.acquire_recall_permits(expand_entities)
.await
{
Ok(p) => p,
Err(reason) => {
let status = StatusCode::from_u16(reason.http_status())
.unwrap_or(StatusCode::SERVICE_UNAVAILABLE);
return Err((
status,
Json(json!({
"error": reason.message(),
"reason": reason.metric_label(),
"retry_after_ms": 200,
})),
));
}
};
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let cmd = Command::Recall {
query: body["query"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'query'"))?
.into(),
top_k,
memory_type: body
.get("memory_type")
.and_then(|v| v.as_str())
.map(String::from),
include_consolidated: body
.get("include_consolidated")
.and_then(|v| v.as_bool())
.unwrap_or(false),
expand_entities,
namespace: body
.get("namespace")
.and_then(|v| v.as_str())
.map(String::from),
domain: body
.get("domain")
.and_then(|v| v.as_str())
.map(String::from),
source: body
.get("source")
.and_then(|v| v.as_str())
.map(String::from),
query_embedding: body.get("query_embedding").and_then(|v| {
v.as_array().map(|a| {
a.iter()
.filter_map(|x| x.as_f64().map(|f| f as f32))
.collect()
})
}),
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn forget(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("forget");
check_writable(&state)?;
let (db_id, _engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let rid: String = body["rid"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'rid'"))?
.into();
let namespace: String = body
.get("namespace")
.and_then(|v| v.as_str())
.unwrap_or("")
.into();
let reason = body
.get("reason")
.and_then(|v| v.as_str())
.map(String::from);
let now_micros = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_micros() as i64)
.unwrap_or(0);
let mutation = crate::commit::MemoryMutation::TombstoneMemory {
rid: rid.clone(),
reason,
requested_at_unix_micros: now_micros,
namespace,
};
let receipt = state
.commit_log
.commit(
crate::commit::TenantId::new(db_id),
mutation,
crate::commit::CommitOptions::default(),
)
.await
.map_err(commit_error_to_app_error)?;
Ok(Json(json!({
"rid": rid,
"found": true,
"log_index": receipt.log_index,
})))
}
async fn relate(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> Result<impl IntoResponse, AppError> {
let _timer = crate::metrics::HandlerTimer::new("relate");
check_writable(&state)?;
let (db_id, _engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let entity: String = body["entity"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'entity'"))?
.into();
let target: String = body["target"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'target'"))?
.into();
let rel_type: String = body["relationship"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'relationship'"))?
.into();
let weight = body.get("weight").and_then(|v| v.as_f64()).unwrap_or(1.0);
let namespace: String = body
.get("namespace")
.and_then(|v| v.as_str())
.unwrap_or("")
.into();
let edge_id = uuid7::uuid7().to_string();
let mutation = crate::commit::MemoryMutation::UpsertEntityEdge {
edge_id: edge_id.clone(),
src: entity,
dst: target,
rel_type,
weight,
namespace,
};
let receipt = state
.commit_log
.commit(
crate::commit::TenantId::new(db_id),
mutation,
crate::commit::CommitOptions::default(),
)
.await
.map_err(commit_error_to_app_error)?;
let mut response = Json(json!({
"edge_id": edge_id,
"log_index": receipt.log_index,
}))
.into_response();
response
.headers_mut()
.insert("deprecation", HeaderValue::from_static("true"));
response.headers_mut().insert(
"link",
HeaderValue::from_static(r#"</v1/claim>; rel="successor-version""#),
);
Ok(response)
}
async fn ingest_claim(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("ingest_claim");
check_writable(&state)?;
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let cmd = Command::IngestClaim {
src: body["src"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'src'"))?
.into(),
rel_type: body["rel_type"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'rel_type'"))?
.into(),
dst: body["dst"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'dst'"))?
.into(),
namespace: body
.get("namespace")
.and_then(|v| v.as_str())
.unwrap_or("default")
.into(),
polarity: body.get("polarity").and_then(|v| v.as_i64()).unwrap_or(1) as i32,
modality: body
.get("modality")
.and_then(|v| v.as_str())
.unwrap_or("asserted")
.into(),
valid_from: body.get("valid_from").and_then(|v| v.as_f64()),
valid_to: body.get("valid_to").and_then(|v| v.as_f64()),
extractor: body
.get("extractor")
.and_then(|v| v.as_str())
.unwrap_or("manual")
.into(),
extractor_version: body
.get("extractor_version")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
confidence_band: body
.get("confidence_band")
.and_then(|v| v.as_str())
.unwrap_or("medium")
.into(),
source_memory_rid: body
.get("source_memory_rid")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
span_start: body
.get("span_start")
.and_then(|v| v.as_i64())
.map(|v| v as i32),
span_end: body
.get("span_end")
.and_then(|v| v.as_i64())
.map(|v| v as i32),
weight: body.get("weight").and_then(|v| v.as_f64()).unwrap_or(1.0),
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn add_alias(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("add_alias");
check_writable(&state)?;
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let cmd = Command::AddAlias {
alias: body["alias"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'alias'"))?
.into(),
canonical_name: body["canonical_name"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'canonical_name'"))?
.into(),
namespace: body
.get("namespace")
.and_then(|v| v.as_str())
.unwrap_or("default")
.into(),
source: body
.get("source")
.and_then(|v| v.as_str())
.unwrap_or("explicit")
.into(),
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
/// Handler that lists claims for an entity via `Command::GetClaims`.
///
/// The `entity` query parameter is required; `namespace` is optional.
async fn get_claims(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> AppResult {
    let _timer = crate::metrics::HandlerTimer::new("get_claims");
    let bearer = headers.get("authorization").and_then(|v| v.to_str().ok());
    let (_, engine) = resolve_engine(&state, bearer)?;

    let entity = match params.get("entity") {
        Some(entity) => entity.clone(),
        None => {
            return Err(app_error(
                StatusCode::BAD_REQUEST,
                "missing 'entity' query parameter",
            ))
        }
    };
    let namespace = params.get("namespace").cloned();

    let cmd = Command::GetClaims { entity, namespace };
    execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn think(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("think");
check_writable(&state)?;
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let cmd = Command::Think {
run_consolidation: body
.get("run_consolidation")
.and_then(|v| v.as_bool())
.unwrap_or(true),
run_conflict_scan: body
.get("run_conflict_scan")
.and_then(|v| v.as_bool())
.unwrap_or(true),
run_pattern_mining: body
.get("run_pattern_mining")
.and_then(|v| v.as_bool())
.unwrap_or(false),
run_personality: body
.get("run_personality")
.and_then(|v| v.as_bool())
.unwrap_or(true),
consolidation_limit: body
.get("consolidation_limit")
.and_then(|v| v.as_u64())
.unwrap_or(50) as usize,
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
/// Handler that lists conflicts via `Command::Conflicts`.
///
/// `status`, `conflict_type`, `entity` and `namespace` are optional query
/// filters; `limit` defaults to 50 when absent or unparsable.
async fn conflicts(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> AppResult {
    let _timer = crate::metrics::HandlerTimer::new("conflicts");
    let bearer = headers.get("authorization").and_then(|v| v.to_str().ok());
    let (_, engine) = resolve_engine(&state, bearer)?;

    let filter = |key: &str| params.get(key).cloned();
    let limit = match params.get("limit") {
        Some(raw) => raw.parse().unwrap_or(50),
        None => 50,
    };

    let cmd = Command::Conflicts {
        status: filter("status"),
        conflict_type: filter("conflict_type"),
        entity: filter("entity"),
        namespace: filter("namespace"),
        limit,
    };
    execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn resolve_conflict(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
AxumPath(conflict_id): AxumPath<String>,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("resolve_conflict");
check_writable(&state)?;
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let cmd = Command::Resolve {
conflict_id,
strategy: body["strategy"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'strategy'"))?
.into(),
winner_rid: body
.get("winner_rid")
.and_then(|v| v.as_str())
.map(String::from),
new_text: body
.get("new_text")
.and_then(|v| v.as_str())
.map(String::from),
resolution_note: body
.get("resolution_note")
.and_then(|v| v.as_str())
.map(String::from),
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn session_start(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
check_writable(&state)?;
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let cmd = Command::SessionStart {
namespace: body
.get("namespace")
.and_then(|v| v.as_str())
.unwrap_or("default")
.into(),
client_id: body
.get("client_id")
.and_then(|v| v.as_str())
.unwrap_or("")
.into(),
metadata: body.get("metadata").cloned().unwrap_or(json!({})),
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn session_end(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
AxumPath(session_id): AxumPath<String>,
body: Option<Json<Value>>,
) -> AppResult {
check_writable(&state)?;
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let summary =
body.and_then(|Json(b)| b.get("summary").and_then(|v| v.as_str()).map(String::from));
let cmd = Command::SessionEnd {
session_id,
summary,
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
/// Runs `Command::Personality` against the engine selected by the caller's
/// `authorization` header and returns the command's JSON result.
async fn personality(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
) -> AppResult {
    let _timer = crate::metrics::HandlerTimer::new("personality");
    let bearer = headers.get("authorization").and_then(|v| v.to_str().ok());
    let (_, engine) = resolve_engine(&state, bearer)?;
    let control = state.control.clone();
    execute_cmd(engine, Command::Personality, control, &state.inflight).await
}
/// Runs `Command::Stats` against the engine selected by the caller's
/// `authorization` header and returns the command's JSON result.
async fn stats(State(state): State<Arc<AppState>>, headers: axum::http::HeaderMap) -> AppResult {
    let _timer = crate::metrics::HandlerTimer::new("stats");
    let bearer = headers.get("authorization").and_then(|v| v.to_str().ok());
    let (_, engine) = resolve_engine(&state, bearer)?;
    let control = state.control.clone();
    execute_cmd(engine, Command::Stats, control, &state.inflight).await
}
/// Creates a new database record and its on-disk directory.
///
/// Requires `check_writable` to pass and a token accepted by
/// `resolve_engine`. The JSON body must contain a string `name`.
///
/// # Errors
/// * 400 — `name` is missing, or is not a single plain path component.
/// * 409 — a database with that name already exists.
/// * 500 — the control store or the filesystem reported an error.
async fn create_database(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
    Json(body): Json<Value>,
) -> AppResult {
    check_writable(&state)?;
    let _ = resolve_engine(
        &state,
        headers.get("authorization").and_then(|v| v.to_str().ok()),
    )?;
    let name: String = body["name"]
        .as_str()
        .ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'name'"))?
        .to_string();

    // The name is joined onto the pool's data directory below (and passed as
    // the second argument to `create_database`), so it must be exactly one
    // ordinary path component. This rejects "", ".", "..", absolute paths and
    // anything containing a separator, which would otherwise let a caller
    // create directories outside the data directory.
    let is_single_component = {
        let mut parts = std::path::Path::new(&name).components();
        matches!(
            (parts.next(), parts.next()),
            (Some(std::path::Component::Normal(_)), None)
        )
    };
    let has_separator = name.contains(|c: char| matches!(c, '/' | '\\' | '\0'));
    if !is_single_component || has_separator {
        return Err(app_error(
            StatusCode::BAD_REQUEST,
            "invalid 'name': must be a single path component without separators",
        ));
    }

    let control = state.control.lock();
    if control
        .database_exists(&name)
        .map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
    {
        return Err(app_error(
            StatusCode::CONFLICT,
            format!("database '{}' already exists", name),
        ));
    }
    let id = control
        .create_database(&name, &name)
        .map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
    // Release the control lock before touching the filesystem.
    drop(control);

    // NOTE(review): if directory creation fails the control record created
    // above is left in place; confirm whether a rollback is wanted here.
    let db_dir = state.pool.data_dir().join(&name);
    std::fs::create_dir_all(&db_dir)
        .map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
    Ok(Json(json!({
        "name": name,
        "id": id,
        "message": format!("database '{}' created", name),
    })))
}
async fn cluster_promote(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
) -> AppResult {
let _ = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let Some(ref ctx) = state.cluster else {
return Err(app_error(
StatusCode::BAD_REQUEST,
"single-node mode — nothing to promote",
));
};
if !matches!(ctx.state.configured_role, crate::config::NodeRole::Voter) {
return Err(app_error(
StatusCode::BAD_REQUEST,
"this node is not a voter — cannot become leader",
));
}
if ctx.state.is_leader() {
return Ok(Json(json!({
"status": "already_leader",
"node_id": ctx.node_id(),
"term": ctx.state.current_term(),
})));
}
let ctx_clone = std::sync::Arc::clone(ctx);
tokio::spawn(async move {
if let Err(e) = crate::cluster::election::start_election(ctx_clone).await {
tracing::error!(error = %e, "manual promotion failed");
}
});
Ok(Json(json!({
"status": "election_started",
"node_id": ctx.node_id(),
"current_term": ctx.state.current_term(),
"message": "check /v1/cluster in a few seconds to see the new leader"
})))
}
/// JSON request body consumed by the `cluster_add_learner` handler.
#[derive(serde::Deserialize)]
struct AddLearnerRequest {
// Numeric id of the node to add; converted to a `YantrikNodeId` by the handler.
node_id: u64,
// Address used by the handler to construct the learner's `YantrikNode`.
addr: String,
}
/// JSON request body consumed by the `cluster_promote_voter` handler.
#[derive(serde::Deserialize)]
struct PromoteRequest {
// Node ids that the handler collects into the voter set passed to `change_membership`.
voters: Vec<u64>,
}
/// JSON request body consumed by the `cluster_remove` handler.
#[derive(serde::Deserialize)]
struct RemoveRequest {
// Id of the node the handler removes from the current voter set.
node_id: u64,
}
/// Returns the node's raft assembly, or a 400 error when `state.raft` is
/// unset (i.e. openraft mode is not active on this node).
fn require_openraft(state: &AppState) -> Result<&Arc<crate::raft::RaftAssembly>, AppError> {
    match state.raft.as_ref() {
        Some(assembly) => Ok(assembly),
        None => Err(app_error(
            StatusCode::BAD_REQUEST,
            "openraft mode is not active on this node — set cluster.raft_mode = \"openraft\"",
        )),
    }
}
async fn cluster_add_learner(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<AddLearnerRequest>,
) -> AppResult {
require_master_token(&state, &headers)?;
let assembly = require_openraft(&state)?;
let node = crate::raft::types::YantrikNode::new(body.addr.clone());
let node_id = crate::raft::types::YantrikNodeId::from(body.node_id);
match assembly.raft.add_learner(node_id, node, false).await {
Ok(_resp) => Ok(Json(json!({
"status": "learner_added",
"node_id": body.node_id,
"addr": body.addr,
"note": "use /v1/cluster/raft to watch catch-up; promote when last_log_index lag is acceptable",
}))),
Err(e) => Err(app_error(
StatusCode::INTERNAL_SERVER_ERROR,
format!("add_learner failed: {e}"),
)),
}
}
/// Replaces the raft voter set with the node ids listed in the request.
///
/// Requires the cluster master token and an active openraft assembly. An
/// empty `voters` list is rejected with 400; a `change_membership` failure is
/// reported as 500. The response echoes the de-duplicated, ordered voter ids.
async fn cluster_promote_voter(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
    Json(body): Json<PromoteRequest>,
) -> AppResult {
    require_master_token(&state, &headers)?;
    let assembly = require_openraft(&state)?;

    let mut voters = std::collections::BTreeSet::<crate::raft::types::YantrikNodeId>::new();
    for raw_id in &body.voters {
        voters.insert(crate::raft::types::YantrikNodeId::from(*raw_id));
    }
    if voters.is_empty() {
        return Err(app_error(
            StatusCode::BAD_REQUEST,
            "voters list cannot be empty",
        ));
    }

    // Capture the ids for the response before the set is moved into raft.
    let voter_ids: Vec<u64> = voters.iter().copied().map(u64::from).collect();
    if let Err(e) = assembly.raft.change_membership(voters, false).await {
        return Err(app_error(
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("change_membership failed: {e}"),
        ));
    }

    Ok(Json(json!({
        "status": "membership_changed",
        "voters": voter_ids,
    })))
}
/// Removes one node from the raft voter set.
///
/// Requires the cluster master token and an active openraft assembly. The
/// current voters are read from the raft metrics; the request is rejected
/// with 400 when the target is not a voter or when it is the only voter.
async fn cluster_remove(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
    Json(body): Json<RemoveRequest>,
) -> AppResult {
    require_master_token(&state, &headers)?;
    let assembly = require_openraft(&state)?;

    let snapshot = assembly.raft.metrics().borrow().clone();
    let mut voters: std::collections::BTreeSet<crate::raft::types::YantrikNodeId> =
        snapshot.membership_config.voter_ids().collect();

    let target = crate::raft::types::YantrikNodeId::from(body.node_id);
    let was_voter = voters.remove(&target);
    if !was_voter {
        return Err(app_error(
            StatusCode::BAD_REQUEST,
            format!("node {} is not a current voter", body.node_id),
        ));
    }
    if voters.is_empty() {
        return Err(app_error(
            StatusCode::BAD_REQUEST,
            "refusing to remove the last voter — would lose quorum permanently",
        ));
    }

    // Capture the surviving ids for the response before the set is moved.
    let remaining: Vec<u64> = voters.iter().copied().map(u64::from).collect();
    if let Err(e) = assembly.raft.change_membership(voters, false).await {
        return Err(app_error(
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("change_membership failed: {e}"),
        ));
    }

    Ok(Json(json!({
        "status": "removed",
        "removed_node_id": body.node_id,
        "remaining_voters": remaining,
    })))
}
/// Bootstraps this node as a single-node raft cluster.
///
/// Requires the cluster master token and an active openraft assembly. When
/// the membership config already lists any node the call is a no-op that
/// reports `already_initialized`. The advertised address comes from the
/// cluster config, falling back to `127.0.0.1:7440`.
async fn cluster_initialize(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
) -> AppResult {
    require_master_token(&state, &headers)?;
    let assembly = require_openraft(&state)?;

    let snapshot = assembly.raft.metrics().borrow().clone();
    let has_members = snapshot.membership_config.nodes().next().is_some();
    if has_members {
        let voters: Vec<u64> = snapshot
            .membership_config
            .voter_ids()
            .map(u64::from)
            .collect();
        return Ok(Json(json!({
            "status": "already_initialized",
            "voters": voters,
        })));
    }

    let configured_addr = state
        .cluster
        .as_ref()
        .and_then(|c| c.config.advertise_addr.clone());
    let node_addr = match configured_addr {
        Some(addr) => addr,
        None => "127.0.0.1:7440".to_string(),
    };

    if let Err(e) = crate::raft::initialize_single_node(assembly, node_addr.clone()).await {
        return Err(app_error(
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("initialize failed: {e:?}"),
        ));
    }

    Ok(Json(json!({
        "status": "initialized",
        "node_id": u64::from(snapshot.id),
        "addr": node_addr,
    })))
}
/// Renders the Prometheus text exposition for this node.
///
/// Sections, in order: loaded-engine gauge; cluster term / leader / health
/// gauges when `cluster_state_view` yields a view; per-peer reachability when
/// raft is not active but a cluster context exists; statistics of the
/// `default` database when it can be opened; and finally whatever
/// `crate::metrics::global().render_prometheus()` produces.
async fn metrics(State(state): State<Arc<AppState>>) -> String {
    // Emits the `# HELP` / `# TYPE` preamble for one metric family.
    fn preamble(buf: &mut String, name: &str, kind: &str, help: &str) {
        buf.push_str(&format!("# HELP {name} {help}\n# TYPE {name} {kind}\n"));
    }
    // Emits one labelled sample line: `name {labels} value`.
    fn sample(buf: &mut String, name: &str, labels: &str, value: impl std::fmt::Display) {
        buf.push_str(&format!("{name} {{{labels}}} {value}\n"));
    }
    // Prometheus booleans are rendered as 1 / 0.
    fn bit(on: bool) -> u8 {
        u8::from(on)
    }

    let mut out = String::new();

    preamble(
        &mut out,
        "yantrikdb_engines_loaded",
        "gauge",
        "Number of engine instances currently loaded",
    );
    out.push_str(&format!(
        "yantrikdb_engines_loaded {}\n",
        state.pool.loaded_count()
    ));

    if let Some(view) = cluster_state_view(&state) {
        let labels = format!(
            "node_id=\"{}\",raft_mode=\"{}\"",
            view.node_id, view.raft_mode
        );
        let families = [
            (
                "yantrikdb_cluster_term",
                "Current Raft term",
                view.term.to_string(),
            ),
            (
                "yantrikdb_cluster_is_leader",
                "Whether this node is currently the leader (1) or not (0)",
                bit(view.accepts_writes).to_string(),
            ),
            (
                "yantrikdb_cluster_healthy",
                "Whether this node has quorum (1) or not (0)",
                bit(view.healthy).to_string(),
            ),
        ];
        for (name, help, value) in &families {
            preamble(&mut out, name, "gauge", help);
            sample(&mut out, name, &labels, value);
        }
    }

    if state.raft.is_none() {
        if let Some(ref cluster) = state.cluster {
            preamble(
                &mut out,
                "yantrikdb_cluster_peer_reachable",
                "gauge",
                "Whether each peer is reachable",
            );
            for peer in cluster.peers.snapshot() {
                let labels = format!(
                    "addr=\"{}\",role=\"{:?}\"",
                    peer.addr, peer.configured_role
                );
                sample(
                    &mut out,
                    "yantrikdb_cluster_peer_reachable",
                    &labels,
                    bit(peer.reachable),
                );
            }
        }
    }

    // The control lock is only held for the duration of this statement.
    let default_db = state.control.lock().get_database("default").ok().flatten();
    if let Some(rec) = default_db {
        if let Ok(engine) = state.pool.get_engine(&rec) {
            if let Ok(stats) = engine.stats(None) {
                let families = [
                    (
                        "yantrikdb_active_memories",
                        "gauge",
                        "Number of active memories",
                        stats.active_memories.to_string(),
                    ),
                    (
                        "yantrikdb_consolidated_memories",
                        "gauge",
                        "Number of consolidated memories",
                        stats.consolidated_memories.to_string(),
                    ),
                    (
                        "yantrikdb_edges",
                        "gauge",
                        "Number of knowledge graph edges",
                        stats.edges.to_string(),
                    ),
                    (
                        "yantrikdb_open_conflicts",
                        "gauge",
                        "Number of unresolved conflicts",
                        stats.open_conflicts.to_string(),
                    ),
                    (
                        "yantrikdb_operations_total",
                        "counter",
                        "Total operations",
                        stats.operations.to_string(),
                    ),
                ];
                for (name, kind, help, value) in &families {
                    preamble(&mut out, name, kind, help);
                    sample(&mut out, name, "db=\"default\"", value);
                }
            }
        }
    }

    out.push_str(&crate::metrics::global().render_prometheus());
    out
}
/// Reports this node's view of the cluster as JSON.
///
/// Without a cluster context the response only states that the node runs in
/// single-node mode. Otherwise it lists the node's own role/term/leader
/// information plus one entry per peer from `ctx.peers.snapshot()`.
async fn cluster_status(State(state): State<Arc<AppState>>) -> Json<Value> {
    let ctx = match state.cluster {
        Some(ref ctx) => ctx,
        None => {
            return Json(json!({
                "clustered": false,
                "message": "single-node mode (no replication)"
            }))
        }
    };

    let mut peers: Vec<Value> = Vec::new();
    for p in ctx.peers.snapshot() {
        let role = format!("{:?}", p.configured_role).to_lowercase();
        let idle_secs = p.last_seen.map(|seen| seen.elapsed().as_secs_f64());
        peers.push(json!({
            "node_id": p.node_id,
            "addr": p.addr,
            "role": role,
            "reachable": p.reachable,
            "current_term": p.current_term,
            "last_seen_secs_ago": idle_secs,
        }));
    }

    let role = format!("{:?}", ctx.state.leader_role());
    let configured_role = format!("{:?}", ctx.state.configured_role).to_lowercase();
    Json(json!({
        "clustered": true,
        "node_id": ctx.node_id(),
        "role": role,
        "configured_role": configured_role,
        "current_term": ctx.state.current_term(),
        "leader_id": ctx.state.current_leader(),
        "voted_for": ctx.state.voted_for(),
        "accepts_writes": ctx.state.accepts_writes(),
        "healthy": ctx.is_healthy(),
        "quorum_size": ctx.quorum_size(),
        "peers": peers,
    }))
}
/// Lists every database known to the control store as
/// `{"databases": [{id, name, created_at}, ..]}`.
///
/// Authenticates through `resolve_engine`; a control-store failure is
/// reported as 500.
async fn list_databases(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
) -> AppResult {
    let bearer = headers.get("authorization").and_then(|v| v.to_str().ok());
    let _ = resolve_engine(&state, bearer)?;

    // The control lock is only held while the listing is fetched.
    let listing = state.control.lock().list_databases();
    let databases =
        listing.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

    let mut list: Vec<Value> = Vec::new();
    for d in databases.iter() {
        list.push(json!({ "id": d.id, "name": d.name, "created_at": d.created_at }));
    }
    Ok(Json(json!({ "databases": list })))
}
/// Exports a snapshot of the control store as JSON.
///
/// Only the cluster master token (the configured `cluster_secret`) is
/// accepted. The export itself runs on the blocking thread pool.
///
/// # Errors
/// * 401 — no `Bearer` token in the `authorization` header.
/// * 403 — the token is not the cluster master token (or none is configured).
/// * 500 — the blocking task failed, the export failed, or the snapshot could
///   not be serialized.
async fn control_snapshot(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
) -> AppResult {
    let token = headers
        .get("authorization")
        .and_then(|v| v.to_str().ok())
        .and_then(|h| h.strip_prefix("Bearer "))
        .ok_or_else(|| app_error(StatusCode::UNAUTHORIZED, "missing Bearer token"))?;
    let is_master = state
        .cluster
        .as_ref()
        .and_then(|c| c.config.cluster_secret.as_ref())
        .map(|s| token == s.as_str())
        .unwrap_or(false);
    if !is_master {
        return Err(app_error(
            StatusCode::FORBIDDEN,
            "control-snapshot requires cluster master token",
        ));
    }
    let snapshot = tokio::task::spawn_blocking({
        let control = state.control.clone();
        move || control.lock().export_snapshot()
    })
    .await
    // Outer error: the blocking task panicked or was cancelled.
    .map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
    // Inner error: the export itself failed.
    .map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
    // Surface serialization failures instead of silently answering `null`
    // with a 200, which a caller could mistake for an empty snapshot.
    let payload = serde_json::to_value(snapshot).map_err(|e| {
        app_error(
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("control snapshot serialize failed: {e}"),
        )
    })?;
    Ok(Json(payload))
}
/// Copies a database file into a snapshot directory and reports its checksum.
///
/// Body fields: `database` (defaults to `"default"`) and `output_dir`
/// (defaults to `<data_dir>/snapshots`). The caller must present either the
/// cluster master token or a token that validates to the target database.
///
/// # Errors
/// * 401 — missing `Bearer` token, or an invalid/revoked database token.
/// * 403 — the token belongs to a different database.
/// * 404 — (database-token path) the target database does not exist.
/// * 500 — any failure while checkpointing, copying or hashing.
async fn admin_snapshot(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
    Json(body): Json<Value>,
) -> AppResult {
    let token = headers
        .get("authorization")
        .and_then(|v| v.to_str().ok())
        .and_then(|h| h.strip_prefix("Bearer "))
        .ok_or_else(|| app_error(StatusCode::UNAUTHORIZED, "missing Bearer token"))?;
    let db_name = body
        .get("database")
        .and_then(|v| v.as_str())
        .unwrap_or("default")
        .to_string();
    let is_master = state
        .cluster
        .as_ref()
        .and_then(|c| c.config.cluster_secret.as_ref())
        .map(|s| token == s.as_str())
        .unwrap_or(false);
    if !is_master {
        // Not the master token: it must be a database token for this very database.
        let token_hash = auth::hash_token(token);
        let control = state.control.lock();
        let token_db_id = control
            .validate_token(&token_hash)
            .map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
            .ok_or_else(|| app_error(StatusCode::UNAUTHORIZED, "invalid or revoked token"))?;
        let target_db = control
            .get_database(&db_name)
            .map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
            .ok_or_else(|| {
                app_error(
                    StatusCode::NOT_FOUND,
                    format!("database '{}' not found", db_name),
                )
            })?;
        drop(control);
        if token_db_id != target_db.id {
            return Err(app_error(
                StatusCode::FORBIDDEN,
                format!(
                    "token does not authenticate database '{}' — provide cluster master token \
                     or a token for that specific database",
                    db_name
                ),
            ));
        }
    }
    let output_dir = body
        .get("output_dir")
        .and_then(|v| v.as_str())
        .map(std::path::PathBuf::from)
        .unwrap_or_else(|| state.pool.data_dir().join("snapshots"));
    let control = state.control.clone();
    let pool = state.pool.clone();
    // All filesystem and SQLite work happens off the async executor.
    let result = tokio::task::spawn_blocking(move || -> anyhow::Result<Value> {
        let db_record = {
            let ctrl = control.lock();
            ctrl.get_database(&db_name)?
                .ok_or_else(|| anyhow::anyhow!("database '{}' not found", db_name))?
        };
        let engine = pool.get_engine(&db_record)?;
        let db = engine.as_ref();
        // Fold the WAL into the main database file before copying it.
        let conn = db.conn();
        conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE)")?;
        // NOTE(review): the connection is released before the copy; confirm
        // that concurrent writers cannot modify the file while it is copied.
        drop(conn);
        let src_dir = pool.data_dir().join(&db_record.path);
        let src_db = src_dir.join("yantrik.db");
        if !src_db.exists() {
            anyhow::bail!("database file not found: {:?}", src_db);
        }
        std::fs::create_dir_all(&output_dir)?;
        let ts = chrono_ts();
        let dest_name = format!("{}-{}.db", db_name, ts);
        let dest_path = output_dir.join(&dest_name);
        std::fs::copy(&src_db, &dest_path)?;
        // Stream the copy through the hasher instead of loading the whole
        // database file into memory; `io::copy` also yields the byte count.
        let mut hasher = blake3::Hasher::new();
        let mut copied = std::fs::File::open(&dest_path)?;
        let size = std::io::copy(&mut copied, &mut hasher)?;
        let hash = hasher.finalize();
        Ok(serde_json::json!({
            "database": db_name,
            "path": dest_path.to_str().unwrap_or(""),
            "size_bytes": size,
            "checksum_blake3": hash.to_hex().to_string(),
            "timestamp": ts,
        }))
    })
    .await
    // Outer error: the blocking task panicked or was cancelled.
    .map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
    // Inner error: the snapshot procedure itself failed.
    .map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
    Ok(Json(result))
}
/// Ingests a `(src, rel_type, dst)` claim together with its source lineage.
///
/// Requires `check_writable` to pass and a token accepted by
/// `resolve_engine`. `src`, `rel_type` and `dst` are mandatory strings; every
/// other body field is optional and falls back to the default shown in the
/// code below. Non-string entries in `source_lineage` are ignored.
///
/// # Errors
/// * 400 — a mandatory field is missing, or `polarity` does not fit in `i32`.
async fn ingest_claim_with_lineage(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
    Json(body): Json<Value>,
) -> AppResult {
    let _timer = crate::metrics::HandlerTimer::new("ingest_claim_with_lineage");
    check_writable(&state)?;
    let (_, engine) = resolve_engine(
        &state,
        headers.get("authorization").and_then(|v| v.to_str().ok()),
    )?;
    let source_lineage: Vec<String> = body
        .get("source_lineage")
        .and_then(|v| v.as_array())
        .map(|arr| {
            arr.iter()
                .filter_map(|x| x.as_str().map(|s| s.to_string()))
                .collect()
        })
        .unwrap_or_default();
    let cmd = Command::IngestClaimWithLineage {
        src: body["src"]
            .as_str()
            .ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'src'"))?
            .into(),
        rel_type: body["rel_type"]
            .as_str()
            .ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'rel_type'"))?
            .into(),
        dst: body["dst"]
            .as_str()
            .ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'dst'"))?
            .into(),
        namespace: body
            .get("namespace")
            .and_then(|v| v.as_str())
            .unwrap_or("default")
            .into(),
        // Checked narrowing: a plain `as i32` would silently wrap values
        // outside the 32-bit range into an unrelated polarity.
        polarity: match body.get("polarity").and_then(|v| v.as_i64()) {
            Some(raw) => i32::try_from(raw).map_err(|_| {
                app_error(
                    StatusCode::BAD_REQUEST,
                    "'polarity' is out of range for a 32-bit integer",
                )
            })?,
            None => 1,
        },
        modality: body
            .get("modality")
            .and_then(|v| v.as_str())
            .unwrap_or("asserted")
            .into(),
        valid_from: body.get("valid_from").and_then(|v| v.as_f64()),
        valid_to: body.get("valid_to").and_then(|v| v.as_f64()),
        extractor: body
            .get("extractor")
            .and_then(|v| v.as_str())
            .unwrap_or("manual")
            .into(),
        extractor_version: body
            .get("extractor_version")
            .and_then(|v| v.as_str())
            .map(|s| s.to_string()),
        confidence_band: body
            .get("confidence_band")
            .and_then(|v| v.as_str())
            .unwrap_or("medium")
            .into(),
        source_memory_rid: body
            .get("source_memory_rid")
            .and_then(|v| v.as_str())
            .map(|s| s.to_string()),
        weight: body.get("weight").and_then(|v| v.as_f64()).unwrap_or(1.0),
        source_lineage,
    };
    execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
/// Runs `Command::GetMobility` for the `(src, rel_type, dst)` triple given as
/// query parameters.
///
/// `src`, `rel_type` and `dst` are mandatory (400 when absent); `namespace`
/// and `regime` both fall back to `"default"`.
async fn get_mobility(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> AppResult {
    let _timer = crate::metrics::HandlerTimer::new("get_mobility");
    let bearer = headers.get("authorization").and_then(|v| v.to_str().ok());
    let (_, engine) = resolve_engine(&state, bearer)?;

    // Mandatory parameter: 400 with "missing '<key>'" when absent.
    let required = |key: &str| {
        params
            .get(key)
            .cloned()
            .ok_or_else(|| app_error(StatusCode::BAD_REQUEST, format!("missing '{key}'")))
    };
    // Optional parameter that falls back to the literal "default".
    let or_default = |key: &str| match params.get(key) {
        Some(value) => value.clone(),
        None => "default".to_string(),
    };

    let src = required("src")?;
    let rel_type = required("rel_type")?;
    let dst = required("dst")?;
    let cmd = Command::GetMobility {
        src,
        rel_type,
        dst,
        namespace: or_default("namespace"),
        regime: or_default("regime"),
    };
    execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
/// Runs `Command::GetContest` for the `(src, rel_type, dst)` triple given as
/// query parameters.
///
/// `src`, `rel_type` and `dst` are mandatory (400 when absent); `namespace`
/// and `regime` both fall back to `"default"`.
async fn get_contest(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> AppResult {
    let _timer = crate::metrics::HandlerTimer::new("get_contest");
    let bearer = headers.get("authorization").and_then(|v| v.to_str().ok());
    let (_, engine) = resolve_engine(&state, bearer)?;

    let Some(src) = params.get("src").cloned() else {
        return Err(app_error(StatusCode::BAD_REQUEST, "missing 'src'"));
    };
    let Some(rel_type) = params.get("rel_type").cloned() else {
        return Err(app_error(StatusCode::BAD_REQUEST, "missing 'rel_type'"));
    };
    let Some(dst) = params.get("dst").cloned() else {
        return Err(app_error(StatusCode::BAD_REQUEST, "missing 'dst'"));
    };
    let namespace = match params.get("namespace") {
        Some(ns) => ns.clone(),
        None => "default".to_string(),
    };
    let regime = match params.get("regime") {
        Some(r) => r.clone(),
        None => "default".to_string(),
    };

    let cmd = Command::GetContest {
        src,
        rel_type,
        dst,
        namespace,
        regime,
    };
    execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
async fn record_move_event(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("record_move_event");
check_writable(&state)?;
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let string_array = |key: &str| -> Vec<String> {
body.get(key)
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|x| x.as_str().map(|s| s.to_string()))
.collect()
})
.unwrap_or_default()
};
let cmd = Command::RecordMoveEvent {
move_type: body["move_type"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'move_type'"))?
.into(),
operator_version: body
.get("operator_version")
.and_then(|v| v.as_str())
.unwrap_or("v1")
.into(),
context_regime: body
.get("context_regime")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
observability: body
.get("observability")
.and_then(|v| v.as_str())
.unwrap_or("observed")
.into(),
inference_confidence: body.get("inference_confidence").and_then(|v| v.as_f64()),
inference_basis: body
.get("inference_basis")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|x| x.as_str().map(|s| s.to_string()))
.collect()
}),
input_claim_ids: string_array("input_claim_ids"),
output_claim_ids: string_array("output_claim_ids"),
side_effect_claim_ids: string_array("side_effect_claim_ids"),
dependencies: string_array("dependencies"),
};
execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
/// Runs `Command::ListFlaggedPropositions`.
///
/// Query parameters: `flag_mask` (u64, falls back to 0) and `limit` (usize,
/// falls back to 50). Values that are absent or fail to parse use the
/// fallback rather than producing an error.
async fn list_flagged(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> AppResult {
    let _timer = crate::metrics::HandlerTimer::new("list_flagged");
    let bearer = headers.get("authorization").and_then(|v| v.to_str().ok());
    let (_, engine) = resolve_engine(&state, bearer)?;

    let flag_mask: u64 = match params.get("flag_mask").map(|raw| raw.parse()) {
        Some(Ok(mask)) => mask,
        _ => 0,
    };
    let limit: usize = match params.get("limit").map(|raw| raw.parse()) {
        Some(Ok(count)) => count,
        _ => 50,
    };

    let cmd = Command::ListFlaggedPropositions { flag_mask, limit };
    execute_cmd(engine, cmd, state.control.clone(), &state.inflight).await
}
fn require_master_token(state: &AppState, headers: &axum::http::HeaderMap) -> Result<(), AppError> {
let token = headers
.get("authorization")
.and_then(|h| h.to_str().ok())
.and_then(|h| h.strip_prefix("Bearer "))
.ok_or_else(|| app_error(StatusCode::UNAUTHORIZED, "missing Bearer token"))?;
if let Some(ref cluster) = state.cluster {
if let Some(ref secret) = cluster.config.cluster_secret {
if token == secret.as_str() {
return Ok(());
}
}
}
Err(app_error(
StatusCode::FORBIDDEN,
"debug endpoints require the cluster master token",
))
}
/// Returns the commit-log history of one tenant for debugging.
///
/// Requires the cluster master token. `tenant_id` comes from the URL path and
/// the remaining options from the query string (`HistoryParams`).
///
/// # Errors
/// * 401 / 403 — see `require_master_token`.
/// * 500 — reading the history or serializing the response failed.
async fn debug_history(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
    AxumPath(tenant_id): AxumPath<i64>,
    axum::extract::Query(params): axum::extract::Query<crate::debug::history::HistoryParams>,
) -> AppResult {
    let _timer = crate::metrics::HandlerTimer::new("debug_history");
    require_master_token(&state, &headers)?;
    let resp = crate::debug::history::read_history(
        &state.commit_log,
        crate::commit::TenantId::new(tenant_id),
        // Restored from the garbled token `¶ms` (a mangled `&params`),
        // which did not compile.
        &params,
    )
    .await
    .map_err(|e| {
        app_error(
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("read_history failed: {e}"),
        )
    })?;
    Ok(Json(serde_json::to_value(resp).map_err(|e| {
        app_error(
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("history serialize failed: {e}"),
        )
    })?))
}
/// JSON request body consumed by the `debug_fault_inject` handler.
#[derive(serde::Deserialize)]
struct DebugFaultInjectBody {
// Fault to register; passed as the first argument to `fault_registry.inject`.
fault: crate::debug::FaultKind,
// Passed as the second argument to `fault_registry.inject`.
// NOTE(review): presumably the fault's lifetime in seconds — confirm against the registry.
ttl_secs: Option<u64>,
}
async fn debug_fault_inject(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<DebugFaultInjectBody>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("debug_fault_inject");
require_master_token(&state, &headers)?;
let id = state.fault_registry.inject(body.fault, body.ttl_secs);
Ok(Json(json!({
"fault_id": id.to_string(),
})))
}
/// Returns the contents of `fault_registry.list()` serialized as JSON.
/// Requires the cluster master token; a serialization failure yields 500.
async fn debug_fault_list(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
) -> AppResult {
    let _timer = crate::metrics::HandlerTimer::new("debug_fault_list");
    require_master_token(&state, &headers)?;

    let faults = state.fault_registry.list();
    match serde_json::to_value(faults) {
        Ok(payload) => Ok(Json(payload)),
        Err(e) => Err(app_error(
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("fault list serialize failed: {e}"),
        )),
    }
}
/// Clears the fault registry and reports the value returned by
/// `fault_registry.clear()` as `{"cleared": n}`. Requires the cluster master token.
async fn debug_fault_clear(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
) -> AppResult {
    let _timer = crate::metrics::HandlerTimer::new("debug_fault_clear");
    require_master_token(&state, &headers)?;

    let cleared = state.fault_registry.clear();
    let payload = json!({ "cleared": cleared });
    Ok(Json(payload))
}
/// Removes a single fault by numeric id.
///
/// Requires the cluster master token. Responds with `{"removed": true}` on
/// success and 404 when the registry reports nothing was removed.
async fn debug_fault_remove(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
    AxumPath(fault_id): AxumPath<u64>,
) -> AppResult {
    let _timer = crate::metrics::HandlerTimer::new("debug_fault_remove");
    require_master_token(&state, &headers)?;

    let target = crate::debug::FaultId::new(fault_id);
    if !state.fault_registry.remove(target) {
        return Err(app_error(
            StatusCode::NOT_FOUND,
            format!("no fault with id fault_{fault_id}"),
        ));
    }
    Ok(Json(json!({ "removed": true })))
}
/// Query-string parameters accepted by the `jobs_list` handler.
#[derive(serde::Deserialize)]
struct JobsListParams {
// Optional tenant filter; the handler wraps it in `TenantId::new`.
tenant: Option<i64>,
// Optional job-state filter; the handler parses it with `JobState::from_str`
// and applies no state filter when parsing yields nothing.
state: Option<String>,
// Optional page size; the handler defaults it to 100 and caps it at `MAX_JOBS_LIST_LIMIT`.
limit: Option<usize>,
}
/// Upper bound that `jobs_list` applies to the caller-supplied `limit`.
const MAX_JOBS_LIST_LIMIT: usize = 500;
/// Lists job records, optionally filtered by tenant and state.
///
/// Requires the cluster master token. `limit` defaults to 100 and is capped
/// at `MAX_JOBS_LIST_LIMIT`. A `state` value that `JobState::from_str` does
/// not recognise results in no state filter. Listing or serialization
/// failures yield 500.
async fn jobs_list(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
    axum::extract::Query(params): axum::extract::Query<JobsListParams>,
) -> AppResult {
    let _timer = crate::metrics::HandlerTimer::new("jobs_list");
    require_master_token(&state, &headers)?;

    let tenant_filter = params.tenant.map(crate::commit::TenantId::new);
    let state_filter = match params.state.as_deref() {
        Some(raw) => crate::jobs::JobState::from_str(raw),
        None => None,
    };
    let limit = std::cmp::min(params.limit.unwrap_or(100), MAX_JOBS_LIST_LIMIT);

    let listing = state.jobs.list(tenant_filter, state_filter, limit).await;
    let records = match listing {
        Ok(records) => records,
        Err(e) => {
            return Err(app_error(
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("jobs list failed: {e}"),
            ))
        }
    };

    match serde_json::to_value(records) {
        Ok(payload) => Ok(Json(payload)),
        Err(e) => Err(app_error(
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("jobs list serialize failed: {e}"),
        )),
    }
}
/// Fetches one job record by id.
///
/// Requires the cluster master token. The path segment must parse as a
/// `uuid7::Uuid` (400 otherwise). `JobError::NotFound` maps to 404 and every
/// other job error, as well as a serialization failure, maps to 500.
async fn jobs_get(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
    AxumPath(job_id_str): AxumPath<String>,
) -> AppResult {
    let _timer = crate::metrics::HandlerTimer::new("jobs_get");
    require_master_token(&state, &headers)?;

    let uuid = match job_id_str.parse::<uuid7::Uuid>() {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(app_error(
                StatusCode::BAD_REQUEST,
                format!("invalid job id `{job_id_str}`: {e}"),
            ))
        }
    };

    let lookup = state.jobs.get(crate::jobs::JobId(uuid)).await;
    let record = lookup.map_err(|err| {
        let status = if matches!(err, crate::jobs::JobError::NotFound { .. }) {
            StatusCode::NOT_FOUND
        } else {
            StatusCode::INTERNAL_SERVER_ERROR
        };
        app_error(status, err.to_string())
    })?;

    match serde_json::to_value(record) {
        Ok(payload) => Ok(Json(payload)),
        Err(e) => Err(app_error(
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("job serialize failed: {e}"),
        )),
    }
}
/// Cancels one job by id and echoes the id as `{"cancelled": "<id>"}`.
///
/// Requires the cluster master token. The path segment must parse as a
/// `uuid7::Uuid` (400 otherwise). `JobError::NotFound` maps to 404,
/// `JobError::TerminalState` to 409 and any other job error to 500.
async fn jobs_cancel(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
    AxumPath(job_id_str): AxumPath<String>,
) -> AppResult {
    let _timer = crate::metrics::HandlerTimer::new("jobs_cancel");
    require_master_token(&state, &headers)?;

    let uuid = match job_id_str.parse::<uuid7::Uuid>() {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(app_error(
                StatusCode::BAD_REQUEST,
                format!("invalid job id `{job_id_str}`: {e}"),
            ))
        }
    };

    let outcome = state.jobs.cancel(crate::jobs::JobId(uuid)).await;
    if let Err(err) = outcome {
        let status = match &err {
            crate::jobs::JobError::NotFound { .. } => StatusCode::NOT_FOUND,
            crate::jobs::JobError::TerminalState { .. } => StatusCode::CONFLICT,
            _ => StatusCode::INTERNAL_SERVER_ERROR,
        };
        return Err(app_error(status, err.to_string()));
    }

    Ok(Json(json!({ "cancelled": job_id_str })))
}
async fn admin_migrations_list(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("admin_migrations");
require_master_token(&state, &headers)?;
let mut all = serde_json::json!({});
let commit_log_path = state.data_dir.join("commit_log.sqlite");
let jobs_path = state.data_dir.join("jobs.sqlite");
for (label, path) in [("commit_log", &commit_log_path), ("jobs", &jobs_path)] {
let summary = match rusqlite::Connection::open_with_flags(
path,
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY,
) {
Ok(conn) => match crate::migrations::MigrationRunner::applied_summary(&conn) {
Ok(rows) => serde_json::json!(rows
.iter()
.map(|(id, name)| serde_json::json!({"id": id, "name": name}))
.collect::<Vec<_>>()),
Err(e) => serde_json::json!({"error": e.to_string()}),
},
Err(e) => serde_json::json!({"error": format!("open failed: {e}")}),
};
all[label] = summary;
}
Ok(Json(all))
}
/// Returns the current Unix time in whole seconds as a decimal string.
///
/// If the system clock reads earlier than the Unix epoch the duration falls
/// back to zero, so the result is `"0"` rather than a panic.
fn chrono_ts() -> String {
    let now = std::time::SystemTime::now();
    let since_epoch = now
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_secs().to_string()
}
// Namespace under which `skill_define` stores skill records.
const SKILL_NAMESPACE: &str = "skill_substrate";
// NOTE(review): presumably the namespace for outcome records — its use is not visible in this section; confirm.
const OUTCOME_NAMESPACE: &str = "outcome_substrate";
// Values accepted by `skill_define` for the request's `skill_type` field.
const VALID_SKILL_TYPES: &[&str] = &["procedure", "reference", "lesson", "pattern", "rule"];
/// Checks that `s` is a well-formed dotted skill id such as `skill.foo.v1`.
///
/// Rules, checked in this order:
/// 1. byte length between 4 and 200 inclusive;
/// 2. first byte is a lowercase ASCII letter;
/// 3. every byte is `a-z`, `0-9`, `_` or `.`, with no two dots in a row;
/// 4. at least one dot is present;
/// 5. the id does not end with a dot.
///
/// Returns a static message describing the first violated rule.
fn validate_skill_id(s: &str) -> Result<(), &'static str> {
    if !(4..=200).contains(&s.len()) {
        return Err("skill_id length must be 4..200 characters");
    }
    let raw = s.as_bytes();
    // Indexing is safe: the length check above guarantees at least 4 bytes.
    if !raw[0].is_ascii_lowercase() {
        return Err("skill_id must start with a lowercase letter");
    }

    let mut seen_dot = false;
    let mut previous: Option<u8> = None;
    for &byte in raw {
        match byte {
            b'.' => {
                if previous == Some(b'.') {
                    return Err("skill_id contains consecutive dots");
                }
                seen_dot = true;
            }
            b'a'..=b'z' | b'0'..=b'9' | b'_' => {}
            _ => {
                return Err(
                    "skill_id contains invalid character (allowed: lowercase a-z, 0-9, _, .)",
                );
            }
        }
        previous = Some(byte);
    }

    if !seen_dot {
        return Err("skill_id must contain at least one '.' (dotted form, e.g. skill.foo.v1)");
    }
    if previous == Some(b'.') {
        return Err("skill_id must not end with '.'");
    }
    Ok(())
}
/// Checks that one `applies_to` entry is a non-empty identifier that starts
/// with a lowercase ASCII letter and contains only `a-z`, `0-9` and `_`.
///
/// Returns a static message describing the first violated rule.
fn validate_applies_to_entry(s: &str) -> Result<(), &'static str> {
    let Some(&first) = s.as_bytes().first() else {
        return Err("applies_to entry must be non-empty");
    };
    if !first.is_ascii_lowercase() {
        return Err("applies_to entry must start with a lowercase letter");
    }

    let all_allowed = s
        .bytes()
        .all(|byte| matches!(byte, b'a'..=b'z' | b'0'..=b'9' | b'_'));
    if all_allowed {
        Ok(())
    } else {
        Err("applies_to entry contains invalid character (allowed: lowercase a-z, 0-9, _)")
    }
}
/// Defines (or redefines) a skill and stores it as a procedural memory in
/// `SKILL_NAMESPACE`.
///
/// Body fields:
/// * `skill_id` — mandatory, validated by `validate_skill_id`.
/// * `body` — mandatory text, 50..5000 bytes long.
/// * `applies_to` — mandatory array of 1..10 strings, each validated by
///   `validate_applies_to_entry`.
/// * `skill_type` — mandatory, one of `VALID_SKILL_TYPES`.
/// * `on_conflict` — optional: `reject` (default), `update` or `ignore`.
/// * `metadata`, `importance`, `source` — optional.
///
/// # Errors
/// * 400 — any validation failure, including an unknown `on_conflict` value
///   (rejected whether or not the skill already exists).
/// * 409 — the skill exists and `on_conflict` is `reject`.
/// * otherwise whatever `check_writable`, `resolve_engine` or `execute_cmd` report.
async fn skill_define(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
    Json(body): Json<Value>,
) -> AppResult {
    let _timer = crate::metrics::HandlerTimer::new("skill_define");
    check_writable(&state)?;
    let skill_id = body["skill_id"]
        .as_str()
        .ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'skill_id'"))?;
    validate_skill_id(skill_id)
        .map_err(|e| app_error(StatusCode::BAD_REQUEST, format!("INVALID_SKILL_ID: {}", e)))?;
    let body_text = body["body"]
        .as_str()
        .ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'body'"))?;
    if body_text.len() < 50 || body_text.len() > 5000 {
        return Err(app_error(
            StatusCode::BAD_REQUEST,
            "INVALID_BODY_LENGTH: body must be 50..5000 characters",
        ));
    }
    let applies_to = body["applies_to"]
        .as_array()
        .ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing or non-array 'applies_to'"))?;
    if applies_to.is_empty() {
        return Err(app_error(
            StatusCode::BAD_REQUEST,
            "EMPTY_APPLIES_TO: applies_to must be a non-empty array",
        ));
    }
    if applies_to.len() > 10 {
        return Err(app_error(
            StatusCode::BAD_REQUEST,
            "TOO_MANY_APPLIES_TO: applies_to may have at most 10 entries",
        ));
    }
    let mut applies_to_strs = Vec::with_capacity(applies_to.len());
    for v in applies_to {
        let s = v.as_str().ok_or_else(|| {
            app_error(
                StatusCode::BAD_REQUEST,
                "INVALID_APPLIES_TO_ENTRY: each entry must be a string",
            )
        })?;
        validate_applies_to_entry(s).map_err(|e| {
            app_error(
                StatusCode::BAD_REQUEST,
                format!("INVALID_APPLIES_TO_ENTRY: {}", e),
            )
        })?;
        applies_to_strs.push(s.to_string());
    }
    let skill_type = body["skill_type"]
        .as_str()
        .ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'skill_type'"))?;
    if !VALID_SKILL_TYPES.contains(&skill_type) {
        return Err(app_error(
            StatusCode::BAD_REQUEST,
            format!(
                "INVALID_SKILL_TYPE: must be one of {:?}, got '{}'",
                VALID_SKILL_TYPES, skill_type
            ),
        ));
    }
    let (_, engine) = resolve_engine(
        &state,
        headers.get("authorization").and_then(|v| v.to_str().ok()),
    )?;
    let on_conflict = body
        .get("on_conflict")
        .and_then(|v| v.as_str())
        .unwrap_or("reject");
    // Validate the policy up front so an unknown value is rejected even when
    // no skill with this id exists yet (previously it was silently accepted
    // in that case and only failed on a later, conflicting call).
    if !matches!(on_conflict, "reject" | "update" | "ignore") {
        return Err(app_error(
            StatusCode::BAD_REQUEST,
            format!(
                "INVALID_ON_CONFLICT: '{}' (allowed: reject, update, ignore)",
                on_conflict
            ),
        ));
    }
    if let Some(existing_rid) = find_skill_rid(&engine, skill_id) {
        match on_conflict {
            "reject" => {
                return Err(app_error(
                    StatusCode::CONFLICT,
                    format!(
                        "SKILL_ID_CONFLICT: '{}' already exists (rid={}); pass on_conflict=update to overwrite or on_conflict=ignore to no-op",
                        skill_id, existing_rid
                    ),
                ));
            }
            "ignore" => {
                return Ok(Json(serde_json::json!({
                    "rid": existing_rid,
                    "skill_id": skill_id,
                    "namespace": SKILL_NAMESPACE,
                    "memory_type": "procedural",
                    "on_conflict": "ignore"
                })));
            }
            // Only "update" can reach this arm thanks to the check above:
            // drop the old record, then fall through to store the new one.
            _ => {
                // NOTE(review): the result of `forget` is discarded; if it can
                // fail, the old and new records would coexist — confirm intent.
                let _ = engine.forget(&existing_rid);
            }
        }
    }
    let user_metadata = body.get("metadata").cloned().unwrap_or(Value::Null);
    let mut metadata = serde_json::json!({
        "record_type": "skill",
        "skill_id": skill_id,
        "applies_to": applies_to_strs,
        "skill_type": skill_type,
    });
    // Merge caller metadata without letting it override the reserved keys above.
    if let Value::Object(user_map) = user_metadata {
        if let Value::Object(meta_map) = &mut metadata {
            for (k, v) in user_map {
                meta_map.entry(k).or_insert(v);
            }
        }
    }
    execute_cmd(
        engine,
        Command::Remember {
            text: body_text.to_string(),
            memory_type: "procedural".to_string(),
            importance: body
                .get("importance")
                .and_then(|v| v.as_f64())
                .unwrap_or(0.7),
            valence: 0.0,
            // 30 days expressed in seconds.
            half_life: 30.0 * 24.0 * 3600.0,
            metadata,
            namespace: SKILL_NAMESPACE.to_string(),
            certainty: 0.9,
            domain: "skill".to_string(),
            source: body
                .get("source")
                .and_then(|v| v.as_str())
                .unwrap_or("agent")
                .to_string(),
            emotional_state: None,
            embedding: None,
        },
        state.control.clone(),
        &state.inflight,
    )
    .await
}
/// `GET /v1/skills/{skill_id}` — return the stored record for one skill.
///
/// Responds with `400` when the id fails `validate_skill_id` and with `404`
/// when no record is found; an engine lookup that fails or yields nothing is
/// reported as `404` as well.
async fn skill_get(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
    axum::extract::Path(skill_id): axum::extract::Path<String>,
) -> AppResult {
    let _timer = crate::metrics::HandlerTimer::new("skill_get");

    if let Err(reason) = validate_skill_id(&skill_id) {
        return Err(app_error(
            StatusCode::BAD_REQUEST,
            format!("INVALID_SKILL_ID: {}", reason),
        ));
    }

    let auth_header = headers.get("authorization").and_then(|v| v.to_str().ok());
    let (_, engine) = resolve_engine(&state, auth_header)?;

    // Both lookup stages share the same "not found" response.
    let not_found = || {
        app_error(
            StatusCode::NOT_FOUND,
            format!("skill_id '{}' not found", skill_id),
        )
    };

    let Some(rid) = find_skill_rid(&engine, &skill_id) else {
        return Err(not_found());
    };
    let Ok(Some(mem)) = engine.get(&rid) else {
        return Err(not_found());
    };

    Ok(Json(serde_json::json!({
        "rid": mem.rid,
        "skill_id": skill_id,
        "body": mem.text,
        "namespace": mem.namespace,
        "memory_type": mem.memory_type,
        "metadata": mem.metadata,
        "created_at": mem.created_at,
    })))
}
async fn skill_search(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("skill_search");
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
let query = body["query"]
.as_str()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing 'query'"))?
.to_string();
let top_k = body
.get("top_k")
.and_then(|v| v.as_u64())
.unwrap_or(5)
.min(50) as usize;
let applies_to_filter: Option<String> = body
.get("applies_to")
.and_then(|v| v.as_array())
.and_then(|arr| arr.first())
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let skill_type_filter: Option<String> = body
.get("skill_type")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let fetch_k = (top_k * 4).max(20);
execute_cmd_with_post_filter(
engine,
Command::Recall {
query,
top_k: fetch_k,
memory_type: Some("procedural".to_string()),
include_consolidated: false,
expand_entities: false,
namespace: Some(SKILL_NAMESPACE.to_string()),
domain: Some("skill".to_string()),
source: None,
query_embedding: None,
},
state.control.clone(),
&state.inflight,
applies_to_filter,
skill_type_filter,
top_k,
)
.await
}
async fn skill_record_outcome(
State(state): State<Arc<AppState>>,
headers: axum::http::HeaderMap,
axum::extract::Path(skill_id): axum::extract::Path<String>,
Json(body): Json<Value>,
) -> AppResult {
let _timer = crate::metrics::HandlerTimer::new("skill_record_outcome");
check_writable(&state)?;
validate_skill_id(&skill_id)
.map_err(|e| app_error(StatusCode::BAD_REQUEST, format!("INVALID_SKILL_ID: {}", e)))?;
let success = body["success"]
.as_bool()
.ok_or_else(|| app_error(StatusCode::BAD_REQUEST, "missing or non-bool 'success'"))?;
let context = body
.get("context")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let (_, engine) = resolve_engine(
&state,
headers.get("authorization").and_then(|v| v.to_str().ok()),
)?;
if find_skill_rid(&engine, &skill_id).is_none() {
return Err(app_error(
StatusCode::NOT_FOUND,
format!("skill_id '{}' not found", skill_id),
));
}
let metadata = serde_json::json!({
"record_type": "skill_outcome",
"skill_ref": skill_id,
"success": success,
"context": context,
});
execute_cmd(
engine,
Command::Remember {
text: format!(
"Outcome for {}: success={} — {}",
skill_id, success, context
),
memory_type: "episodic".to_string(),
importance: 0.5,
valence: if success { 0.3 } else { -0.3 },
half_life: 90.0 * 24.0 * 3600.0,
metadata,
namespace: OUTCOME_NAMESPACE.to_string(),
certainty: 1.0,
domain: "skill_outcome".to_string(),
source: "skill_api".to_string(),
emotional_state: None,
embedding: None,
},
state.control.clone(),
&state.inflight,
)
.await
}
/// `POST /v1/skills/{skill_id}/forget` — remove a stored skill.
///
/// The request must carry a JSON body (the extractor requires one) but its
/// content is not read. Responds with `400` for an invalid id and `404` when
/// the skill is unknown; otherwise issues `Command::Forget` for the record.
async fn skill_forget(
    State(state): State<Arc<AppState>>,
    headers: axum::http::HeaderMap,
    axum::extract::Path(skill_id): axum::extract::Path<String>,
    Json(_body): Json<Value>,
) -> AppResult {
    let _timer = crate::metrics::HandlerTimer::new("skill_forget");
    check_writable(&state)?;

    if let Err(reason) = validate_skill_id(&skill_id) {
        return Err(app_error(
            StatusCode::BAD_REQUEST,
            format!("INVALID_SKILL_ID: {}", reason),
        ));
    }

    let auth_header = headers.get("authorization").and_then(|v| v.to_str().ok());
    let (_, engine) = resolve_engine(&state, auth_header)?;

    let Some(rid) = find_skill_rid(&engine, &skill_id) else {
        return Err(app_error(
            StatusCode::NOT_FOUND,
            format!("skill_id '{}' not found", skill_id),
        ));
    };

    let forget = Command::Forget { rid };
    execute_cmd(engine, forget, state.control.clone(), &state.inflight).await
}
/// Looks up the record id of the skill whose metadata `skill_id` equals
/// `skill_id`.
///
/// Returns `None` when the listing call fails or no listed record matches.
///
/// NOTE(review): only a single `list_memories` call with `10000, 0` is made —
/// presumably limit and offset — so skills beyond that window would not be
/// found. Confirm the expected number of skills per engine.
fn find_skill_rid(engine: &Arc<yantrikdb::YantrikDB>, skill_id: &str) -> Option<String> {
    let listing = engine.list_memories(
        10000,
        0,
        Some("skill"),
        Some("procedural"),
        Some(SKILL_NAMESPACE),
        "created_at",
    );
    let (memories, _total) = listing.ok()?;

    memories
        .into_iter()
        .find(|mem| mem.metadata.get("skill_id").and_then(|v| v.as_str()) == Some(skill_id))
        .map(|mem| mem.rid)
}
async fn execute_cmd_with_post_filter(
engine: Arc<yantrikdb::YantrikDB>,
cmd: Command,
control: Arc<parking_lot::Mutex<crate::control::ControlDb>>,
inflight: &std::sync::atomic::AtomicU32,
applies_to_filter: Option<String>,
skill_type_filter: Option<String>,
final_top_k: usize,
) -> AppResult {
use std::sync::atomic::Ordering;
struct InflightGuard<'a>(&'a std::sync::atomic::AtomicU32);
impl Drop for InflightGuard<'_> {
fn drop(&mut self) {
self.0.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
}
}
let current = inflight.fetch_add(1, Ordering::Relaxed);
if current >= crate::server::MAX_INFLIGHT {
inflight.fetch_sub(1, Ordering::Relaxed);
return Err(app_error(
StatusCode::SERVICE_UNAVAILABLE,
format!(
"server overloaded: {} inflight ops (max {}). Retry later.",
current,
crate::server::MAX_INFLIGHT,
),
));
}
let _g = InflightGuard(inflight);
let inner = tokio::task::spawn_blocking(move || {
let db = engine.as_ref();
handler::execute_with_guard(db, cmd, Some(control.as_ref()))
})
.await
.map_err(|e| {
app_error(
StatusCode::INTERNAL_SERVER_ERROR,
format!("join error: {e}"),
)
})?;
let result = inner.map_err(|e| app_error(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
match result {
crate::handler::CommandResult::RecallResults { results, total: _ } => {
let filtered: Vec<Value> = results
.into_iter()
.filter(|r| {
let metadata = r.get("metadata");
if let Some(ref needed) = applies_to_filter {
let ok = metadata
.and_then(|m| m.get("applies_to"))
.and_then(|v| v.as_array())
.map(|arr| arr.iter().any(|x| x.as_str() == Some(needed.as_str())))
.unwrap_or(false);
if !ok {
return false;
}
}
if let Some(ref needed) = skill_type_filter {
let ok = metadata
.and_then(|m| m.get("skill_type"))
.and_then(|v| v.as_str())
== Some(needed.as_str());
if !ok {
return false;
}
}
true
})
.take(final_top_k)
.collect();
let total = filtered.len();
Ok(Json(serde_json::json!({
"results": filtered,
"total": total,
})))
}
other => Err(app_error(
StatusCode::INTERNAL_SERVER_ERROR,
format!("unexpected command result: {:?}", other),
)),
}
}
/// Builds the HTTP router for the whole `/v1` API plus `/metrics`.
///
/// The request-body size limit from `state.admission.cfg` is layered over the
/// routes registered here. When `state.raft` is present, the raft status and
/// receive routers are merged in afterwards, i.e. outside that layer.
pub fn router(state: Arc<AppState>) -> Router {
    let body_limit = state.admission.cfg.max_request_body_bytes;

    // Must be derived before `state` is moved into `with_state` below.
    let raft_routes = state.raft.as_ref().map(|assembly| {
        let status_routes = crate::raft::raft_status_router(assembly.raft.clone());
        let receive_routes = crate::raft::raft_receive_router(assembly.raft.clone());
        status_routes.merge(receive_routes)
    });

    let api = Router::new()
        // Health
        .route("/v1/health", get(health))
        .route("/v1/health/deep", get(health_deep))
        // Core memory operations
        .route("/v1/remember", post(remember))
        .route("/v1/remember/batch", post(remember_batch))
        .route("/v1/recall", post(recall))
        .route("/v1/forget", post(forget))
        // Skills
        .route("/v1/skills/define", post(skill_define))
        .route("/v1/skills/search", post(skill_search))
        .route("/v1/skills/{skill_id}", get(skill_get))
        .route("/v1/skills/{skill_id}/outcome", post(skill_record_outcome))
        .route("/v1/skills/{skill_id}/forget", post(skill_forget))
        // Relations, claims, conflicts
        .route("/v1/relate", post(relate))
        .route("/v1/claim", post(ingest_claim))
        .route("/v1/claims", get(get_claims))
        .route("/v1/alias", post(add_alias))
        .route("/v1/think", post(think))
        .route("/v1/conflicts", get(conflicts))
        .route("/v1/conflicts/{id}/resolve", post(resolve_conflict))
        // Sessions, personality, stats
        .route("/v1/sessions", post(session_start))
        .route("/v1/sessions/{id}", delete(session_end))
        .route("/v1/personality", get(personality))
        .route("/v1/stats", get(stats))
        // Databases
        .route("/v1/databases", post(create_database))
        .route("/v1/databases", get(list_databases))
        // Cluster management
        .route("/v1/cluster", get(cluster_status))
        .route("/v1/cluster/promote", post(cluster_promote))
        .route("/v1/cluster/initialize", post(cluster_initialize))
        .route("/v1/cluster/add-learner", post(cluster_add_learner))
        .route("/v1/cluster/promote-voter", post(cluster_promote_voter))
        .route("/v1/cluster/remove", post(cluster_remove))
        // Admin snapshots
        .route("/v1/admin/control-snapshot", get(control_snapshot))
        .route("/v1/admin/snapshot", post(admin_snapshot))
        // Lineage, mobility, contest, flagged propositions
        .route("/v1/claim_with_lineage", post(ingest_claim_with_lineage))
        .route("/v1/mobility", get(get_mobility))
        .route("/v1/contest", get(get_contest))
        .route("/v1/move_events", post(record_move_event))
        .route("/v1/flagged_propositions", get(list_flagged))
        // Debug and fault injection
        .route("/v1/debug/history/{tenant_id}", get(debug_history))
        .route("/v1/debug/fault/inject", post(debug_fault_inject))
        .route("/v1/debug/fault", get(debug_fault_list))
        .route("/v1/debug/fault/clear", post(debug_fault_clear))
        .route("/v1/debug/fault/{fault_id}", delete(debug_fault_remove))
        // Jobs and migrations
        .route("/v1/jobs", get(jobs_list))
        .route("/v1/jobs/{job_id}", get(jobs_get))
        .route("/v1/jobs/{job_id}", delete(jobs_cancel))
        .route("/v1/admin/migrations", get(admin_migrations_list))
        // Metrics
        .route("/metrics", get(metrics))
        .layer(tower_http::limit::RequestBodyLimitLayer::new(body_limit))
        .with_state(state);

    match raft_routes {
        Some(raft_router) => api.merge(raft_router),
        None => api,
    }
}
/// Unit tests for the skill id and `applies_to` entry validators.
#[cfg(test)]
mod skill_validation_tests {
    use super::{validate_applies_to_entry, validate_skill_id};

    /// Asserts that `validate_skill_id` accepts every id in `ids`.
    fn expect_ids_accepted(ids: &[&str]) {
        for id in ids.iter().copied() {
            assert!(
                validate_skill_id(id).is_ok(),
                "skill id {id:?} should be accepted"
            );
        }
    }

    /// Asserts that `validate_skill_id` rejects every id in `ids`.
    fn expect_ids_rejected(ids: &[&str]) {
        for id in ids.iter().copied() {
            assert!(
                validate_skill_id(id).is_err(),
                "skill id {id:?} should be rejected"
            );
        }
    }

    /// Asserts that `validate_applies_to_entry` accepts every entry.
    fn expect_entries_accepted(entries: &[&str]) {
        for entry in entries.iter().copied() {
            assert!(
                validate_applies_to_entry(entry).is_ok(),
                "applies_to entry {entry:?} should be accepted"
            );
        }
    }

    /// Asserts that `validate_applies_to_entry` rejects every entry.
    fn expect_entries_rejected(entries: &[&str]) {
        for entry in entries.iter().copied() {
            assert!(
                validate_applies_to_entry(entry).is_err(),
                "applies_to entry {entry:?} should be rejected"
            );
        }
    }

    #[test]
    fn skill_id_valid_minimal_form() {
        expect_ids_rejected(&["a.b"]);
        expect_ids_accepted(&["ab.c", "skill.foo.v1", "skill.invoice.validation.v3"]);
    }

    #[test]
    fn skill_id_rejects_uppercase() {
        expect_ids_rejected(&["Skill.foo", "skill.Foo", "SKILL.FOO"]);
    }

    #[test]
    fn skill_id_rejects_no_dot() {
        match validate_skill_id("skillfoo") {
            Ok(_) => panic!("an id without a dot must be rejected"),
            Err(message) => assert!(message.contains("at least one '.'")),
        }
    }

    #[test]
    fn skill_id_rejects_consecutive_dots() {
        expect_ids_rejected(&["skill..foo"]);
    }

    #[test]
    fn skill_id_rejects_trailing_dot() {
        expect_ids_rejected(&["skill.foo."]);
    }

    #[test]
    fn skill_id_rejects_starts_with_digit_or_underscore_or_dot() {
        expect_ids_rejected(&["1skill.foo", "_skill.foo", ".skill.foo"]);
    }

    #[test]
    fn skill_id_rejects_invalid_chars() {
        expect_ids_rejected(&[
            "skill-foo.v1",
            "skill foo.v1",
            "skill/foo.v1",
            "skill@foo.v1",
        ]);
    }

    #[test]
    fn skill_id_length_bounds() {
        // "skill." contributes 6 characters to each generated id.
        let long_accepted = format!("skill.{}", "a".repeat(193));
        let long_rejected = format!("skill.{}", "a".repeat(195));

        expect_ids_accepted(&["a.bc", long_accepted.as_str()]);
        expect_ids_rejected(&["a.b", long_rejected.as_str()]);
    }

    #[test]
    fn skill_id_allows_underscores_and_digits_in_segments() {
        expect_ids_accepted(&["skill_42.foo_bar.v1_2", "a1.b2"]);
    }

    #[test]
    fn applies_to_entry_valid() {
        expect_entries_accepted(&[
            "invoice",
            "meta_agent",
            "a",
            "a1",
            "invoice_validation_2026",
        ]);
    }

    #[test]
    fn applies_to_entry_rejects_hyphen() {
        match validate_applies_to_entry("meta-agent") {
            Ok(_) => panic!("an entry containing a hyphen must be rejected"),
            Err(message) => assert!(message.contains("invalid character")),
        }
    }

    #[test]
    fn applies_to_entry_rejects_uppercase() {
        expect_entries_rejected(&["Invoice", "INVOICE"]);
    }

    #[test]
    fn applies_to_entry_rejects_dot_or_slash() {
        expect_entries_rejected(&["invoice.validation", "invoice/validation"]);
    }

    #[test]
    fn applies_to_entry_rejects_empty() {
        expect_entries_rejected(&[""]);
    }

    #[test]
    fn applies_to_entry_rejects_starts_with_digit_or_underscore() {
        expect_entries_rejected(&["1invoice", "_invoice"]);
    }
}
/// Tests for `commit_error_to_app_error` (status code and JSON body per
/// `CommitError` variant) plus the replication-lag arithmetic.
#[cfg(test)]
mod commit_error_mapping_tests {
    use super::commit_error_to_app_error;
    use crate::commit::{CommitError, OpId, TenantId};
    use axum::http::StatusCode;

    #[test]
    fn not_leader_maps_to_307_with_leader_info_in_body() {
        let (status, body) = commit_error_to_app_error(CommitError::NotLeader {
            leader_id: Some(4),
            leader_addr: Some("https://192.168.4.140:7438".into()),
        });
        assert_eq!(status, StatusCode::TEMPORARY_REDIRECT);
        let v = body.0;
        assert_eq!(v["error"], "not_leader");
        assert_eq!(v["leader_id"], 4);
        assert_eq!(v["leader_addr"], "https://192.168.4.140:7438");
    }

    #[test]
    fn not_leader_with_unknown_leader_emits_nulls() {
        let (status, body) = commit_error_to_app_error(CommitError::NotLeader {
            leader_id: None,
            leader_addr: None,
        });
        assert_eq!(status, StatusCode::TEMPORARY_REDIRECT);
        assert!(body.0["leader_id"].is_null());
        assert!(body.0["leader_addr"].is_null());
    }

    #[test]
    fn op_id_collision_maps_to_409_with_existing_index() {
        let op = OpId::new_random();
        let (status, body) = commit_error_to_app_error(CommitError::OpIdCollision {
            op_id: op,
            tenant_id: TenantId::new(7),
            existing_index: 42,
        });
        assert_eq!(status, StatusCode::CONFLICT);
        let v = body.0;
        assert_eq!(v["error"], "op_id_collision");
        assert_eq!(v["op_id"], op.to_string());
        assert_eq!(v["tenant_id"], 7);
        assert_eq!(v["existing_index"], 42);
    }

    #[test]
    fn unexpected_log_index_maps_to_409_with_expected_actual() {
        let (status, body) = commit_error_to_app_error(CommitError::UnexpectedLogIndex {
            tenant_id: TenantId::new(1),
            expected: 5,
            actual: 7,
        });
        assert_eq!(status, StatusCode::CONFLICT);
        let v = body.0;
        assert_eq!(v["error"], "unexpected_log_index");
        assert_eq!(v["expected"], 5);
        assert_eq!(v["actual"], 7);
    }

    #[test]
    fn not_yet_implemented_maps_to_501_with_planned_rfc() {
        let (status, body) = commit_error_to_app_error(CommitError::NotYetImplemented {
            variant: "PurgeMemory",
            planned_rfc: "011",
        });
        assert_eq!(status, StatusCode::NOT_IMPLEMENTED);
        let v = body.0;
        assert_eq!(v["error"], "not_implemented");
        assert_eq!(v["variant"], "PurgeMemory");
        assert_eq!(v["planned_rfc"], "011");
    }

    #[test]
    fn storage_failure_maps_to_503_with_retry_after() {
        let (status, body) = commit_error_to_app_error(CommitError::StorageFailure {
            message: "disk full".into(),
        });
        assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
        let v = body.0;
        assert_eq!(v["error"], "storage_failure");
        assert_eq!(v["detail"], "disk full");
        assert_eq!(v["retry_after_ms"], 1000);
    }

    #[test]
    fn shutdown_maps_to_503_with_longer_retry_after() {
        let (status, body) = commit_error_to_app_error(CommitError::Shutdown);
        assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
        assert_eq!(body.0["error"], "shutting_down");
        assert_eq!(body.0["retry_after_ms"], 5000);
    }

    #[test]
    fn commit_timeout_maps_to_503_with_op_id_for_idempotent_retry() {
        let op = OpId::new_random();
        let (status, body) = commit_error_to_app_error(CommitError::CommitTimeout { op_id: op });
        assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
        let v = body.0;
        assert_eq!(v["error"], "commit_timeout");
        assert_eq!(v["op_id"], op.to_string());
        assert_eq!(v["retry_after_ms"], 1000);
    }

    /// Test-local copy of the lag computation that `cluster_state_view`
    /// performs inline: log index minus applied index, saturating at zero;
    /// the log index itself when nothing has been applied; `None` when no log
    /// index is known.
    fn replication_lag(
        last_log_index: Option<u64>,
        last_applied_index: Option<u64>,
    ) -> Option<u64> {
        match (last_log_index, last_applied_index) {
            (Some(log), Some(applied)) => Some(log.saturating_sub(applied)),
            (Some(log), None) => Some(log),
            (None, _) => None,
        }
    }

    #[test]
    fn pr_6_9_lag_is_zero_when_log_and_applied_are_equal() {
        assert_eq!(replication_lag(Some(18), Some(18)), Some(0));
    }

    #[test]
    fn pr_6_9_lag_reflects_unapplied_entries() {
        assert_eq!(replication_lag(Some(100), Some(95)), Some(5));
    }

    #[test]
    fn pr_6_9_lag_clamps_at_zero_under_inversion() {
        assert_eq!(replication_lag(Some(10), Some(15)), Some(0));
    }

    #[test]
    fn pr_6_9_lag_is_log_index_when_nothing_applied_yet() {
        assert_eq!(replication_lag(Some(7), None), Some(7));
    }

    #[test]
    fn pr_6_9_lag_is_none_when_no_log_index_known() {
        assert_eq!(replication_lag(None, None), None);
        assert_eq!(replication_lag(None, Some(5)), None);
    }

    #[test]
    fn version_mismatch_maps_to_426_upgrade_required() {
        let verr = crate::version::VersionError::WireMajorMismatch {
            node: crate::version::WireVersion::new(1, 0),
            event: crate::version::WireVersion::new(2, 0),
        };
        let (status, body) = commit_error_to_app_error(CommitError::Version(verr));
        assert_eq!(status, StatusCode::UPGRADE_REQUIRED);
        assert_eq!(body.0["error"], "wire_version_mismatch");
    }

    /// Smoke test over the `CommitError` variants constructed in this module:
    /// each must map to 307 or an error status and carry an `error` key.
    #[test]
    fn every_variant_produces_a_response() {
        let cases = vec![
            CommitError::NotLeader {
                leader_id: None,
                leader_addr: None,
            },
            CommitError::OpIdCollision {
                op_id: OpId::new_random(),
                tenant_id: TenantId::new(1),
                existing_index: 0,
            },
            CommitError::UnexpectedLogIndex {
                tenant_id: TenantId::new(1),
                expected: 1,
                actual: 2,
            },
            CommitError::NotYetImplemented {
                variant: "X",
                planned_rfc: "Y",
            },
            CommitError::StorageFailure {
                message: "x".into(),
            },
            CommitError::Shutdown,
            CommitError::CommitTimeout {
                op_id: OpId::new_random(),
            },
            // Previously missing from this list even though the variant has
            // its own mapping test in this module.
            CommitError::Version(crate::version::VersionError::WireMajorMismatch {
                node: crate::version::WireVersion::new(1, 0),
                event: crate::version::WireVersion::new(2, 0),
            }),
        ];
        for err in cases {
            let label = err.metric_label();
            let (status, body) = commit_error_to_app_error(err);
            let s = status.as_u16();
            assert!(
                s == 307 || s >= 400,
                "{label} unexpected status {s} (want 307 or 4xx/5xx)"
            );
            assert!(
                body.0.get("error").is_some(),
                "{label} body must include `error` key"
            );
        }
    }
}