// nexus_memory_web/api/agent.rs

1//! Agent API endpoints for the always-on memory agent
2
3use axum::{extract::State, Json};
4use std::sync::Arc;
5use tokio::sync::RwLock;
6use tracing::error;
7
8use crate::error::{Result, WebError};
9use crate::models::{
10    AgentConsolidateResponse, AgentIngestRequest, AgentIngestResponse, AgentQueryRequest,
11    AgentQueryResponse, AgentStatusResponse,
12};
13use crate::state::AppState;
14
15/// POST /api/agent/ingest — Ingest text with LLM enrichment
16pub async fn agent_ingest(
17    State(state): State<Arc<RwLock<AppState>>>,
18    Json(request): Json<AgentIngestRequest>,
19) -> Result<Json<AgentIngestResponse>> {
20    let state = state.read().await;
21
22    let supervisor = state
23        .agent_supervisor
24        .as_ref()
25        .ok_or_else(|| WebError::InvalidRequest("Agent is not enabled".to_string()))?;
26
27    if request.text.trim().is_empty() {
28        return Err(WebError::InvalidRequest("Text cannot be empty".to_string()));
29    }
30
31    let ingest_svc = supervisor.ingest_service();
32    let namespace_id = supervisor.namespace_id();
33
34    let memory_repo = nexus_storage::MemoryRepository::new(state.pool().clone());
35
36    match ingest_svc
37        .ingest(&request.text, &request.source, namespace_id, &memory_repo)
38        .await
39    {
40        Ok(memory_id) => Ok(Json(AgentIngestResponse {
41            success: true,
42            memory_id: Some(memory_id),
43            summary: None,
44            error: None,
45        })),
46        Err(e) => {
47            error!(error = %e, "Agent ingest failed");
48            Ok(Json(AgentIngestResponse {
49                success: false,
50                memory_id: None,
51                summary: None,
52                error: Some(e.to_string()),
53            }))
54        }
55    }
56}
57
58/// POST /api/agent/query — Query memory with LLM synthesis
59pub async fn agent_query(
60    State(state): State<Arc<RwLock<AppState>>>,
61    Json(request): Json<AgentQueryRequest>,
62) -> Result<Json<AgentQueryResponse>> {
63    let state = state.read().await;
64
65    let supervisor = state
66        .agent_supervisor
67        .as_ref()
68        .ok_or_else(|| WebError::InvalidRequest("Agent is not enabled".to_string()))?;
69
70    if request.question.trim().is_empty() {
71        return Err(WebError::InvalidRequest(
72            "Question cannot be empty".to_string(),
73        ));
74    }
75
76    let query_svc = supervisor.query_service();
77    let namespace_id = supervisor.namespace_id();
78
79    let memory_repo = nexus_storage::MemoryRepository::new(state.pool().clone());
80    let relation_repo = nexus_storage::MemoryRelationRepository::new(state.pool());
81
82    match query_svc
83        .query(
84            &request.question,
85            namespace_id,
86            &memory_repo,
87            &relation_repo,
88        )
89        .await
90    {
91        Ok(answer) => {
92            supervisor.increment_queries_answered().await;
93            Ok(Json(AgentQueryResponse {
94                success: true,
95                question: request.question,
96                answer: Some(answer.answer),
97                error: None,
98            }))
99        }
100        Err(e) => {
101            error!(error = %e, "Agent query failed");
102            Ok(Json(AgentQueryResponse {
103                success: false,
104                question: request.question,
105                answer: None,
106                error: Some(e.to_string()),
107            }))
108        }
109    }
110}
111
112/// POST /api/agent/consolidate — Trigger manual consolidation
113pub async fn agent_consolidate(
114    State(state): State<Arc<RwLock<AppState>>>,
115) -> Result<Json<AgentConsolidateResponse>> {
116    let state = state.read().await;
117
118    let supervisor = state
119        .agent_supervisor
120        .as_ref()
121        .ok_or_else(|| WebError::InvalidRequest("Agent is not enabled".to_string()))?;
122
123    let namespace_id = supervisor.namespace_id();
124    let config = nexus_core::Config::from_env().map_err(|e| WebError::Config(e.to_string()))?;
125
126    let lease_owner = format!("web-agent-consolidate-{}", namespace_id);
127    let embeddings = nexus_agent::create_embedding_service(&config).await;
128    match nexus_agent::runtime::run_dream_cycle(
129        state.pool().clone(),
130        &config.cognition,
131        &nexus_core::config::AgentConfig {
132            namespace: supervisor.get_status().await.namespace,
133            ..Default::default()
134        },
135        nexus_llm::create_client_auto_with_fallback()
136            .map_err(|e| WebError::Config(format!("Failed to create LLM client: {}", e)))?,
137        embeddings,
138        nexus_agent::runtime::DreamCycleRequest {
139            namespace_id,
140            lease_owner: &lease_owner,
141            perspective: None,
142            session_key: None,
143            reflect_reason: "web_manual_dream",
144            digest_reason: "web_manual_digest",
145        },
146    )
147    .await
148    {
149        Ok(processed) => Ok(Json(AgentConsolidateResponse {
150            success: true,
151            memories_processed: processed,
152            error: None,
153        })),
154        Err(e) => {
155            error!(error = %e, "Agent consolidation failed");
156            Ok(Json(AgentConsolidateResponse {
157                success: false,
158                memories_processed: 0,
159                error: Some(format!("{}", e)),
160            }))
161        }
162    }
163}
164
165/// GET /api/agent/status — Get agent status
166pub async fn agent_status(
167    State(state): State<Arc<RwLock<AppState>>>,
168) -> Result<Json<AgentStatusResponse>> {
169    let state = state.read().await;
170
171    match &state.agent_supervisor {
172        Some(supervisor) => {
173            let status = supervisor.get_status().await;
174            Ok(Json(AgentStatusResponse {
175                enabled: status.enabled,
176                namespace: status.namespace,
177                inbox_dir: status.inbox_dir,
178                files_processed: status.files_processed,
179                memories_consolidated: status.memories_consolidated,
180                queries_answered: status.queries_answered,
181                last_scan: status.last_scan.map(|d| d.to_rfc3339()),
182                last_consolidation: status.last_consolidation.map(|d| d.to_rfc3339()),
183                errors: status.errors,
184                uptime_secs: state.uptime_seconds(),
185            }))
186        }
187        None => Ok(Json(AgentStatusResponse {
188            enabled: false,
189            namespace: String::new(),
190            inbox_dir: String::new(),
191            files_processed: 0,
192            memories_consolidated: 0,
193            queries_answered: 0,
194            last_scan: None,
195            last_consolidation: None,
196            errors: Vec::new(),
197            uptime_secs: state.uptime_seconds(),
198        })),
199    }
200}