Skip to main content

nexus_memory_web/api/
agent.rs

1//! Agent API endpoints for the always-on memory agent
2
3use axum::{extract::State, Json};
4use std::sync::Arc;
5use tokio::sync::RwLock;
6use tracing::error;
7
8use crate::error::{Result, WebError};
9use crate::models::{
10    AgentBoostRequest, AgentBoostResponse, AgentConsolidateResponse, AgentIngestRequest,
11    AgentIngestResponse, AgentQueryRequest, AgentQueryResponse, AgentStatusResponse,
12};
13use crate::state::AppState;
14
15/// POST /api/agent/ingest — Ingest text with LLM enrichment
16pub async fn agent_ingest(
17    State(state): State<Arc<RwLock<AppState>>>,
18    Json(request): Json<AgentIngestRequest>,
19) -> Result<Json<AgentIngestResponse>> {
20    let state = state.read().await;
21
22    let supervisor = state
23        .agent_supervisor
24        .as_ref()
25        .ok_or_else(|| WebError::InvalidRequest("Agent is not enabled".to_string()))?;
26
27    if request.text.trim().is_empty() {
28        return Err(WebError::InvalidRequest("Text cannot be empty".to_string()));
29    }
30
31    let ingest_svc = supervisor.ingest_service();
32    let namespace_id = supervisor.namespace_id();
33
34    let memory_repo = nexus_storage::MemoryRepository::new(state.pool().clone());
35
36    match ingest_svc
37        .ingest(&request.text, &request.source, namespace_id, &memory_repo)
38        .await
39    {
40        Ok(memory_id) => Ok(Json(AgentIngestResponse {
41            success: true,
42            memory_id: Some(memory_id),
43            summary: None,
44            error: None,
45        })),
46        Err(e) => {
47            error!(error = %e, "Agent ingest failed");
48            Ok(Json(AgentIngestResponse {
49                success: false,
50                memory_id: None,
51                summary: None,
52                error: Some(e.to_string()),
53            }))
54        }
55    }
56}
57
58/// POST /api/agent/query — Query memory with LLM synthesis
59pub async fn agent_query(
60    State(state): State<Arc<RwLock<AppState>>>,
61    Json(request): Json<AgentQueryRequest>,
62) -> Result<Json<AgentQueryResponse>> {
63    let state = state.read().await;
64
65    let supervisor = state
66        .agent_supervisor
67        .as_ref()
68        .ok_or_else(|| WebError::InvalidRequest("Agent is not enabled".to_string()))?;
69
70    if request.question.trim().is_empty() {
71        return Err(WebError::InvalidRequest(
72            "Question cannot be empty".to_string(),
73        ));
74    }
75
76    let query_svc = supervisor.query_service();
77    let namespace_id = supervisor.namespace_id();
78
79    let memory_repo = nexus_storage::MemoryRepository::new(state.pool().clone());
80    let relation_repo = nexus_storage::MemoryRelationRepository::new(state.pool());
81
82    match query_svc
83        .query(
84            &request.question,
85            namespace_id,
86            &memory_repo,
87            &relation_repo,
88        )
89        .await
90    {
91        Ok(answer) => {
92            supervisor.increment_queries_answered().await;
93            Ok(Json(AgentQueryResponse {
94                success: true,
95                question: request.question,
96                answer: Some(answer.answer),
97                error: None,
98            }))
99        }
100        Err(e) => {
101            error!(error = %e, "Agent query failed");
102            Ok(Json(AgentQueryResponse {
103                success: false,
104                question: request.question,
105                answer: None,
106                error: Some(e.to_string()),
107            }))
108        }
109    }
110}
111
112/// POST /api/agent/consolidate — Trigger manual consolidation
113pub async fn agent_consolidate(
114    State(state): State<Arc<RwLock<AppState>>>,
115) -> Result<Json<AgentConsolidateResponse>> {
116    let state = state.read().await;
117
118    let supervisor = state
119        .agent_supervisor
120        .as_ref()
121        .ok_or_else(|| WebError::InvalidRequest("Agent is not enabled".to_string()))?;
122
123    let namespace_id = supervisor.namespace_id();
124    let config = nexus_core::Config::from_env().map_err(|e| WebError::Config(e.to_string()))?;
125
126    let lease_owner = format!("web-agent-consolidate-{}", namespace_id);
127    let embeddings = nexus_agent::create_embedding_service(&config).await;
128    match nexus_agent::run_dream_cycle(
129        state.pool().clone(),
130        &config.cognition,
131        &nexus_core::config::AgentConfig {
132            namespace: supervisor.get_status().await.namespace,
133            ..Default::default()
134        },
135        nexus_llm::create_client_auto_with_fallback()
136            .map_err(|e| WebError::Config(format!("Failed to create LLM client: {}", e)))?,
137        embeddings,
138        nexus_agent::DreamCycleRequest {
139            namespace_id,
140            lease_owner: &lease_owner,
141            perspective: None,
142            session_key: None,
143            reflect_reason: "web_manual_dream",
144            digest_reason: "web_manual_digest",
145        },
146    )
147    .await
148    {
149        Ok(processed) => Ok(Json(AgentConsolidateResponse {
150            success: true,
151            memories_processed: processed,
152            error: None,
153        })),
154        Err(e) => {
155            error!(error = %e, "Agent consolidation failed");
156            Ok(Json(AgentConsolidateResponse {
157                success: false,
158                memories_processed: 0,
159                error: Some(format!("{}", e)),
160            }))
161        }
162    }
163}
164
165/// POST /api/agent/boost — Pin or boost a memory in cognitive cache
166pub async fn agent_boost(
167    State(state): State<Arc<RwLock<AppState>>>,
168    Json(request): Json<AgentBoostRequest>,
169) -> Result<Json<AgentBoostResponse>> {
170    let state = state.read().await;
171
172    let _supervisor = state
173        .agent_supervisor
174        .as_ref()
175        .ok_or_else(|| WebError::InvalidRequest("Agent is not enabled".to_string()))?;
176
177    let memory_repo = nexus_storage::MemoryRepository::new(state.pool().clone());
178    let memory = memory_repo
179        .get_by_id(request.memory_id)
180        .await
181        .map_err(|e| WebError::Storage(e.to_string()))?
182        .ok_or_else(|| WebError::NotFound(format!("Memory {} not found", request.memory_id)))?;
183
184    // Verify the memory belongs to the active namespace
185    if memory.namespace_id != _supervisor.namespace_id() {
186        return Err(WebError::InvalidRequest(
187            "Memory does not belong to the active namespace".to_string(),
188        ));
189    }
190
191    // Resolve project root for cache path — explicit root_dir required for web API
192    let cwd = request
193        .root_dir
194        .as_ref()
195        .map(std::path::PathBuf::from)
196        .ok_or_else(|| WebError::InvalidRequest("root_dir is required".to_string()))?;
197
198    // Check for path traversal attempts
199    if cwd
200        .components()
201        .any(|comp| comp == std::path::Component::ParentDir)
202    {
203        return Err(WebError::InvalidRequest(
204            "root_dir must not contain path traversal segments".to_string(),
205        ));
206    }
207
208    // Canonicalize to resolve symlinks and reject nonexistent paths
209    let cwd = cwd
210        .canonicalize()
211        .map_err(|e| WebError::Config(format!("Invalid root_dir: {}", e)))?;
212
213    // Ensure the path is a directory
214    if !cwd.is_dir() {
215        return Err(WebError::InvalidRequest(
216            "root_dir must be a directory".to_string(),
217        ));
218    }
219
220    // Reject obviously unsafe paths (system pseudo-filesystems)
221    let path_str = cwd.to_string_lossy();
222    let is_pseudo_fs = ["/proc", "/sys", "/dev"]
223        .iter()
224        .any(|prefix| path_str == *prefix || path_str.starts_with(&format!("{}/", prefix)));
225    if is_pseudo_fs {
226        return Err(WebError::InvalidRequest(
227            "root_dir must not point to a system pseudo-filesystem".to_string(),
228        ));
229    }
230    let project_identity = nexus_core::ProjectIdentity::resolve(&cwd);
231    let nexus_dir = project_identity.root_dir.join(".nexus");
232
233    let mut cache = nexus_agent::CognitiveCache::load_or_init(&nexus_dir);
234
235    let config = nexus_core::Config::from_env().unwrap_or_default();
236
237    let relevance_score = request
238        .boost_score
239        .unwrap_or(memory.relevance_score.unwrap_or(0.85));
240    let tier = nexus_agent::ConfidenceTier::from_score(relevance_score);
241
242    let inserted = cache.hot_cache.promote(
243        nexus_agent::HotCacheEntry {
244            memory_id: memory.id,
245            content: memory.content,
246            relevance_score,
247            tier,
248            promoted_at: chrono::Utc::now(),
249            last_surfaced: chrono::Utc::now(),
250            hot_streak: 1,
251            pinned: request.pin,
252            source_agent: Some("web-ui".to_string()),
253        },
254        config.cognitive_system.hot_cache_max_entries,
255    );
256
257    if !inserted {
258        return Ok(Json(AgentBoostResponse {
259            success: false,
260            error: Some("Cache at capacity with all entries pinned".to_string()),
261        }));
262    }
263
264    cache
265        .save(&nexus_dir)
266        .map_err(|e| WebError::Storage(e.to_string()))?;
267
268    Ok(Json(AgentBoostResponse {
269        success: true,
270        error: None,
271    }))
272}
273
274/// GET /api/agent/status — Get agent status
275pub async fn agent_status(
276    State(state): State<Arc<RwLock<AppState>>>,
277) -> Result<Json<AgentStatusResponse>> {
278    let state = state.read().await;
279
280    match &state.agent_supervisor {
281        Some(supervisor) => {
282            let status = supervisor.get_status().await;
283            Ok(Json(AgentStatusResponse {
284                enabled: status.enabled,
285                namespace: status.namespace,
286                inbox_dir: status.inbox_dir,
287                files_processed: status.files_processed,
288                memories_consolidated: status.memories_consolidated,
289                queries_answered: status.queries_answered,
290                last_scan: status.last_scan.map(|d| d.to_rfc3339()),
291                last_consolidation: status.last_consolidation.map(|d| d.to_rfc3339()),
292                errors: status.errors,
293                uptime_secs: state.uptime_seconds(),
294            }))
295        }
296        None => Ok(Json(AgentStatusResponse {
297            enabled: false,
298            namespace: String::new(),
299            inbox_dir: String::new(),
300            files_processed: 0,
301            memories_consolidated: 0,
302            queries_answered: 0,
303            last_scan: None,
304            last_consolidation: None,
305            errors: Vec::new(),
306            uptime_secs: state.uptime_seconds(),
307        })),
308    }
309}