// roboticus-api 0.11.4
//
// HTTP routes, WebSocket, auth, rate limiting, and dashboard for the Roboticus agent runtime.
// Documentation
use axum::{
    extract::{Path, Query, State},
    response::IntoResponse,
};
use serde::Deserialize;
use serde_json::Value;

use super::{AppState, JsonError, bad_request, internal_err};

/// Query-string parameters for the list endpoints that accept a row cap.
///
/// Handlers in this file that extract it substitute their own default when
/// `limit` is absent and clamp the value to `1..=MAX_MEMORY_LIMIT`.
#[derive(Deserialize)]
pub struct LimitQuery {
    /// Optional `?limit=` value; `None` when the parameter is not supplied.
    pub limit: Option<i64>,
}

/// Query-string parameters for the memory search endpoint.
#[derive(Deserialize)]
pub struct SearchQuery {
    /// Optional `?q=` search text; `memory_search` rejects a missing or empty value.
    pub q: Option<String>,
}

/// Lists the working-memory entries recorded for a single session.
///
/// The session id is taken from the URL path. On success the body is
/// `{ "entries": [...] }`, one object per entry returned by
/// `roboticus_db::memory::retrieve_working`. A lookup error is converted with
/// `internal_err`.
pub async fn get_working_memory(
    State(state): State<AppState>,
    Path(session_id): Path<String>,
) -> impl IntoResponse {
    roboticus_db::memory::retrieve_working(&state.db, &session_id)
        .map(|rows| {
            let entries: Vec<Value> = rows
                .into_iter()
                .map(|row| {
                    serde_json::json!({
                        "id": row.id,
                        "session_id": row.session_id,
                        "entry_type": row.entry_type,
                        "content": row.content,
                        "importance": row.importance,
                        "created_at": row.created_at,
                    })
                })
                .collect();
            axum::Json(serde_json::json!({ "entries": entries }))
        })
        .map_err(|err| internal_err(&err))
}

/// Upper bound applied to the `?limit=` parameter by the list handlers in this file.
const MAX_MEMORY_LIMIT: i64 = 1000;

/// Lists working-memory entries without filtering by session.
///
/// `?limit=` defaults to 100 and is kept within `1..=MAX_MEMORY_LIMIT`.
/// On success the body is `{ "entries": [...] }`; a lookup error is converted
/// with `internal_err`.
pub async fn get_working_memory_all(
    State(state): State<AppState>,
    Query(params): Query<LimitQuery>,
) -> impl IntoResponse {
    let limit = params.limit.unwrap_or(100).max(1).min(MAX_MEMORY_LIMIT);

    let rows = match roboticus_db::memory::retrieve_working_all(&state.db, limit) {
        Ok(rows) => rows,
        Err(err) => return Err(internal_err(&err)),
    };

    let mut entries: Vec<Value> = Vec::new();
    for row in rows {
        entries.push(serde_json::json!({
            "id": row.id,
            "session_id": row.session_id,
            "entry_type": row.entry_type,
            "content": row.content,
            "importance": row.importance,
            "created_at": row.created_at,
        }));
    }

    Ok(axum::Json(serde_json::json!({ "entries": entries })))
}

/// Lists episodic-memory entries.
///
/// `?limit=` defaults to 50 and is kept within `1..=MAX_MEMORY_LIMIT`.
/// On success the body is `{ "entries": [...] }`; a lookup error is converted
/// with `internal_err`.
pub async fn get_episodic_memory(
    State(state): State<AppState>,
    Query(params): Query<LimitQuery>,
) -> impl IntoResponse {
    // The default (50) already lies inside the clamp range, so only a
    // caller-supplied value needs clamping.
    let limit = params
        .limit
        .map_or(50, |requested| requested.clamp(1, MAX_MEMORY_LIMIT));

    roboticus_db::memory::retrieve_episodic(&state.db, limit)
        .map(|rows| {
            let entries = rows
                .into_iter()
                .map(|row| {
                    serde_json::json!({
                        "id": row.id,
                        "classification": row.classification,
                        "content": row.content,
                        "importance": row.importance,
                        "created_at": row.created_at,
                    })
                })
                .collect::<Vec<Value>>();
            axum::Json(serde_json::json!({ "entries": entries }))
        })
        .map_err(|err| internal_err(&err))
}

/// Lists the semantic-memory entries stored under one category.
///
/// The category is taken from the URL path. On success the body is
/// `{ "entries": [...] }`; a lookup error is converted with `internal_err`.
pub async fn get_semantic_memory(
    State(state): State<AppState>,
    Path(category): Path<String>,
) -> impl IntoResponse {
    let rows = match roboticus_db::memory::retrieve_semantic(&state.db, &category) {
        Ok(rows) => rows,
        Err(err) => return Err(internal_err(&err)),
    };

    let entries = rows
        .into_iter()
        .map(|row| {
            serde_json::json!({
                "id": row.id,
                "category": row.category,
                "key": row.key,
                "value": row.value,
                "confidence": row.confidence,
                "created_at": row.created_at,
                "updated_at": row.updated_at,
            })
        })
        .collect::<Vec<Value>>();

    Ok(axum::Json(serde_json::json!({ "entries": entries })))
}

/// Lists semantic-memory categories together with their counts.
///
/// On success the body is `{ "categories": [{ "category", "count" }, ...] }`,
/// built from the pairs returned by
/// `roboticus_db::memory::list_semantic_categories`. A lookup error is
/// converted with `internal_err`.
pub async fn get_semantic_categories(State(state): State<AppState>) -> impl IntoResponse {
    let pairs = match roboticus_db::memory::list_semantic_categories(&state.db) {
        Ok(pairs) => pairs,
        Err(err) => return Err(internal_err(&err)),
    };

    let mut categories: Vec<Value> = Vec::new();
    for (name, total) in pairs {
        categories.push(serde_json::json!({ "category": name, "count": total }));
    }

    Ok(axum::Json(serde_json::json!({ "categories": categories })))
}

/// Lists semantic-memory entries across every category.
///
/// `?limit=` defaults to 100 and is kept within `1..=MAX_MEMORY_LIMIT`.
/// On success the body is `{ "entries": [...] }`; a lookup error is converted
/// with `internal_err`.
pub async fn get_semantic_memory_all(
    State(state): State<AppState>,
    Query(params): Query<LimitQuery>,
) -> impl IntoResponse {
    let requested = params.limit.unwrap_or(100);
    let limit = requested.clamp(1, MAX_MEMORY_LIMIT);

    roboticus_db::memory::retrieve_semantic_all(&state.db, limit)
        .map(|rows| {
            let entries: Vec<Value> = rows
                .into_iter()
                .map(|row| {
                    serde_json::json!({
                        "id": row.id,
                        "category": row.category,
                        "key": row.key,
                        "value": row.value,
                        "confidence": row.confidence,
                        "created_at": row.created_at,
                        "updated_at": row.updated_at,
                    })
                })
                .collect();
            axum::Json(serde_json::json!({ "entries": entries }))
        })
        .map_err(|err| internal_err(&err))
}

/// Full-text search over stored memories.
///
/// Requires a non-empty `?q=` parameter of at most 512 characters and asks
/// `roboticus_db::memory::fts_search` for up to 100 results.
///
/// # Errors
///
/// * `bad_request` when `q` is missing, empty, or longer than 512 characters.
/// * `internal_err` when the search itself fails.
pub async fn memory_search(
    State(state): State<AppState>,
    Query(params): Query<SearchQuery>,
) -> impl IntoResponse {
    /// Longest accepted query, measured in characters (Unicode scalar values).
    const MAX_QUERY_CHARS: usize = 512;
    /// Result cap handed to the search call.
    const MAX_RESULTS: i64 = 100;

    let query = params.q.unwrap_or_default();
    if query.is_empty() {
        return Err(bad_request("missing ?q= parameter"));
    }
    // `str::len` counts UTF-8 bytes, which would reject non-ASCII queries well
    // below the advertised limit; count characters so the check matches the
    // error message.
    if query.chars().count() > MAX_QUERY_CHARS {
        return Err(bad_request("search query too long (max 512 chars)"));
    }
    match roboticus_db::memory::fts_search(&state.db, &query, MAX_RESULTS) {
        Ok(results) => Ok(axum::Json(serde_json::json!({ "results": results }))),
        Err(e) => Err(internal_err(&e)),
    }
}

// ── Memory health analytics ────────────────────────────────────

/// GET /api/memory/health — retrieval analytics for the dashboard.
///
/// Reports averages computed from `context_snapshots` rows of the last 7 days
/// that have `memory_tokens > 0`, row counts for each memory table, and the
/// size of `memory_index`.
///
/// Every query is best-effort: if one fails (the original note cites
/// databases that have not yet run migration 025) its value is reported as
/// zero instead of failing the request.
pub async fn memory_health(State(state): State<AppState>) -> impl IntoResponse {
    let conn = state.db.conn();

    // Retrieval metrics from context_snapshots; COALESCE covers NULL aggregates.
    let retrieval_sql = "SELECT
            COALESCE(AVG(CAST(retrieval_hit AS REAL)), 0.0),
            COALESCE(AVG(avg_similarity), 0.0),
            COALESCE(AVG(budget_utilization), 0.0),
            COUNT(*)
         FROM context_snapshots
         WHERE created_at >= datetime('now', '-7 days')
           AND memory_tokens > 0";

    let (hit_rate, avg_sim, avg_util, total_turns) = conn
        .query_row(retrieval_sql, [], |row| {
            let hit: f64 = row.get(0)?;
            let similarity: f64 = row.get(1)?;
            let utilization: f64 = row.get(2)?;
            let turns: i64 = row.get(3)?;
            Ok((hit, similarity, utilization, turns))
        })
        .unwrap_or((0.0, 0.0, 0.0, 0));

    // Runs a single-value count query, reporting 0 when the query fails.
    let count_rows = |sql: &str| -> i64 {
        conn.query_row(sql, [], |r| r.get::<_, i64>(0))
            .unwrap_or(0)
    };

    // Actual tier population counts from the memory tables (not retrieval samples).
    let episodic = count_rows("SELECT count(*) FROM episodic_memory WHERE memory_state = 'active'");
    let semantic = count_rows("SELECT count(*) FROM semantic_memory WHERE memory_state = 'active'");
    let working = count_rows("SELECT count(*) FROM working_memory");
    let procedural = count_rows("SELECT count(*) FROM procedural_memory");
    let relationship = count_rows("SELECT count(*) FROM relationship_memory");

    let tier_breakdown = serde_json::json!({
        "episodic": episodic,
        "semantic": semantic,
        "working": working,
        "procedural": procedural,
        "relationship": relationship,
    });

    // Index coverage: how many memories have index entries.
    let index_count = count_rows("SELECT count(*) FROM memory_index");

    Ok::<_, JsonError>(axum::Json(serde_json::json!({
        "retrieval_hit_rate": hit_rate,
        "avg_similarity_trend": avg_sim,
        "budget_utilization_avg": avg_util,
        "total_turns_analyzed": total_turns,
        "tier_breakdown": tier_breakdown,
        "index_entries": index_count,
    })))
}

/// POST /api/memory/consolidate — trigger a full consolidation cycle now.
///
/// Runs `roboticus_agent::consolidation::run_consolidation` on the blocking
/// thread pool and returns the counters from its report.
///
/// # Errors
///
/// * A `JsonError` with status 500 carrying the join error text when the
///   blocking task cannot be joined.
/// * `internal_err` when consolidation itself reports an error.
pub async fn trigger_consolidation(State(state): State<AppState>) -> impl IntoResponse {
    let db = state.db.clone();
    let joined = tokio::task::spawn_blocking(move || {
        roboticus_agent::consolidation::run_consolidation(&db, None)
    })
    .await;

    let report = match joined {
        Ok(outcome) => outcome.map_err(|err| internal_err(&err))?,
        Err(join_err) => {
            return Err(JsonError(
                axum::http::StatusCode::INTERNAL_SERVER_ERROR,
                join_err.to_string(),
            ));
        }
    };

    let summary = serde_json::json!({
        "indexed": report.indexed,
        "deduped": report.deduped,
        "tier_synced": report.tier_synced,
        "decayed": report.decayed,
        "pruned": report.pruned,
        "orphans_cleaned": report.orphans_cleaned,
        "duration_ms": report.duration_ms,
        "total_actions": report.total_actions(),
    });
    Ok::<_, JsonError>(axum::Json(summary))
}

/// POST /api/memory/reindex — backfill missing index entries.
///
/// Runs `roboticus_db::memory_index::backfill_missing_index_entries` on the
/// blocking thread pool with `10_000` as its second argument (presumably a
/// per-call cap — confirm against that function) and returns its result as
/// `{ "indexed": ... }`.
///
/// # Errors
///
/// * A `JsonError` with status 500 carrying the join error text when the
///   blocking task cannot be joined.
/// * `internal_err` when the backfill reports an error.
pub async fn trigger_reindex(State(state): State<AppState>) -> impl IntoResponse {
    let db = state.db.clone();
    let task = tokio::task::spawn_blocking(move || {
        roboticus_db::memory_index::backfill_missing_index_entries(&db, 10_000)
    });

    let outcome = task.await.map_err(|join_err| {
        JsonError(
            axum::http::StatusCode::INTERNAL_SERVER_ERROR,
            join_err.to_string(),
        )
    })?;
    let indexed = outcome.map_err(|err| internal_err(&err))?;

    Ok::<_, JsonError>(axum::Json(serde_json::json!({ "indexed": indexed })))
}

// ── Knowledge ingestion ────────────────────────────────────────

/// JSON body accepted by `knowledge_ingest`.
#[derive(Deserialize)]
pub struct IngestRequest {
    /// File or directory to ingest; absolute, or relative to the configured
    /// agent workspace. `knowledge_ingest` rejects paths that resolve outside
    /// the workspace.
    pub path: String,
}

pub async fn knowledge_ingest(
    State(state): State<AppState>,
    axum::Json(body): axum::Json<IngestRequest>,
) -> impl IntoResponse {
    use roboticus_agent::ingest::{ingest_directory, ingest_file};
    use std::path::Path;

    let workspace = {
        let cfg = state.config.read().await;
        cfg.agent.workspace.clone()
    };
    if !workspace.exists() {
        return Err(bad_request(format!(
            "workspace root '{}' does not exist",
            workspace.display()
        )));
    }
    let workspace_root = match std::fs::canonicalize(&workspace) {
        Ok(p) => p,
        Err(e) => {
            return Err(bad_request(format!(
                "workspace root '{}' is not accessible: {e}",
                workspace.display()
            )));
        }
    };
    let raw = Path::new(&body.path);
    let candidate = if raw.is_absolute() {
        raw.to_path_buf()
    } else {
        workspace_root.join(raw)
    };
    let target = match std::fs::canonicalize(&candidate) {
        Ok(p) => p,
        Err(_) => return Err(bad_request("path does not exist or is not accessible")),
    };
    if !target.starts_with(&workspace_root) {
        return Err(bad_request(format!(
            "path '{}' escapes workspace root '{}'",
            target.display(),
            workspace_root.display()
        )));
    }

    let results = if target.is_dir() {
        match ingest_directory(&state.db, &target) {
            Ok(r) => r,
            Err(e) => return Err(internal_err(&e)),
        }
    } else if target.is_file() {
        match ingest_file(&state.db, &target) {
            Ok(r) => vec![r],
            Err(e) => return Err(bad_request(format!("{e}"))),
        }
    } else {
        return Err(bad_request("path does not exist or is not accessible"));
    };

    let total_chunks: usize = results.iter().map(|r| r.chunks_stored).sum();
    Ok(axum::Json(serde_json::json!({
        "files_ingested": results.len(),
        "total_chunks": total_chunks,
        "results": results,
    })))
}