Skip to main content

nexus_memory_web/api/
agent.rs

1//! Agent API endpoints for the always-on memory agent
2
3use axum::{extract::State, Json};
4use std::sync::Arc;
5use tokio::sync::RwLock;
6use tracing::error;
7
8use crate::error::{Result, WebError};
9use crate::models::{
10    AgentBoostRequest, AgentBoostResponse, AgentConsolidateResponse, AgentIngestRequest,
11    AgentIngestResponse, AgentQueryRequest, AgentQueryResponse, AgentStatusResponse,
12};
13use crate::state::AppState;
14
15/// POST /api/agent/ingest — Ingest text with LLM enrichment
16pub async fn agent_ingest(
17    State(state): State<Arc<RwLock<AppState>>>,
18    Json(request): Json<AgentIngestRequest>,
19) -> Result<Json<AgentIngestResponse>> {
20    let state = state.read().await;
21
22    let supervisor = state
23        .agent_supervisor
24        .as_ref()
25        .ok_or_else(|| WebError::InvalidRequest("Agent is not enabled".to_string()))?;
26
27    if request.text.trim().is_empty() {
28        return Err(WebError::InvalidRequest("Text cannot be empty".to_string()));
29    }
30
31    let ingest_svc = supervisor.ingest_service();
32    let namespace_id = supervisor.namespace_id();
33
34    let memory_repo = nexus_storage::MemoryRepository::new(state.pool().clone());
35
36    match ingest_svc
37        .ingest(&request.text, &request.source, namespace_id, &memory_repo)
38        .await
39    {
40        Ok(memory_id) => Ok(Json(AgentIngestResponse {
41            success: true,
42            memory_id: Some(memory_id),
43            summary: None,
44            error: None,
45        })),
46        Err(e) => {
47            error!(error = %e, "Agent ingest failed");
48            Ok(Json(AgentIngestResponse {
49                success: false,
50                memory_id: None,
51                summary: None,
52                error: Some(e.to_string()),
53            }))
54        }
55    }
56}
57
58/// POST /api/agent/query — Query memory with LLM synthesis
59pub async fn agent_query(
60    State(state): State<Arc<RwLock<AppState>>>,
61    Json(request): Json<AgentQueryRequest>,
62) -> Result<Json<AgentQueryResponse>> {
63    let state = state.read().await;
64
65    let supervisor = state
66        .agent_supervisor
67        .as_ref()
68        .ok_or_else(|| WebError::InvalidRequest("Agent is not enabled".to_string()))?;
69
70    if request.question.trim().is_empty() {
71        return Err(WebError::InvalidRequest(
72            "Question cannot be empty".to_string(),
73        ));
74    }
75
76    let query_svc = supervisor.query_service();
77    let namespace_id = supervisor.namespace_id();
78
79    let memory_repo = nexus_storage::MemoryRepository::new(state.pool().clone());
80    let relation_repo = nexus_storage::MemoryRelationRepository::new(state.pool());
81
82    match query_svc
83        .query(
84            &request.question,
85            namespace_id,
86            &memory_repo,
87            &relation_repo,
88        )
89        .await
90    {
91        Ok(answer) => {
92            supervisor.increment_queries_answered().await;
93            Ok(Json(AgentQueryResponse {
94                success: true,
95                question: request.question,
96                answer: Some(answer.answer),
97                error: None,
98            }))
99        }
100        Err(e) => {
101            error!(error = %e, "Agent query failed");
102            Ok(Json(AgentQueryResponse {
103                success: false,
104                question: request.question,
105                answer: None,
106                error: Some(e.to_string()),
107            }))
108        }
109    }
110}
111
112/// POST /api/agent/consolidate — Trigger manual consolidation
113pub async fn agent_consolidate(
114    State(state): State<Arc<RwLock<AppState>>>,
115) -> Result<Json<AgentConsolidateResponse>> {
116    let state = state.read().await;
117
118    let supervisor = state
119        .agent_supervisor
120        .as_ref()
121        .ok_or_else(|| WebError::InvalidRequest("Agent is not enabled".to_string()))?;
122
123    let namespace_id = supervisor.namespace_id();
124    let config = nexus_core::Config::from_env().map_err(|e| WebError::Config(e.to_string()))?;
125
126    let lease_owner = format!("web-agent-consolidate-{}", namespace_id);
127    let embeddings = nexus_agent::create_embedding_service(&config).await;
128    match nexus_agent::run_dream_cycle(
129        state.pool().clone(),
130        &config.cognition,
131        &nexus_core::config::AgentConfig {
132            namespace: supervisor.get_status().await.namespace,
133            ..Default::default()
134        },
135        nexus_llm::create_client_auto_with_fallback()
136            .map_err(|e| WebError::Config(format!("Failed to create LLM client: {}", e)))?,
137        embeddings,
138        nexus_agent::DreamCycleRequest {
139            namespace_id,
140            lease_owner: &lease_owner,
141            perspective: None,
142            session_key: None,
143            reflect_reason: "web_manual_dream",
144            digest_reason: "web_manual_digest",
145        },
146    )
147    .await
148    {
149        Ok(processed) => Ok(Json(AgentConsolidateResponse {
150            success: true,
151            memories_processed: processed,
152            error: None,
153        })),
154        Err(e) => {
155            error!(error = %e, "Agent consolidation failed");
156            Ok(Json(AgentConsolidateResponse {
157                success: false,
158                memories_processed: 0,
159                error: Some(format!("{}", e)),
160            }))
161        }
162    }
163}
164
165/// POST /api/agent/boost — Pin or boost a memory in cognitive cache
166pub async fn agent_boost(
167    State(state): State<Arc<RwLock<AppState>>>,
168    Json(request): Json<AgentBoostRequest>,
169) -> Result<Json<AgentBoostResponse>> {
170    let state = state.read().await;
171
172    let _supervisor = state
173        .agent_supervisor
174        .as_ref()
175        .ok_or_else(|| WebError::InvalidRequest("Agent is not enabled".to_string()))?;
176
177    let memory_repo = nexus_storage::MemoryRepository::new(state.pool().clone());
178    let memory = memory_repo
179        .get_by_id(request.memory_id)
180        .await
181        .map_err(|e| WebError::Storage(e.to_string()))?
182        .ok_or_else(|| WebError::NotFound(format!("Memory {} not found", request.memory_id)))?;
183
184    // Verify the memory belongs to the active namespace
185    if memory.namespace_id != _supervisor.namespace_id() {
186        return Err(WebError::InvalidRequest(
187            "Memory does not belong to the active namespace".to_string(),
188        ));
189    }
190
191    // Resolve project root for cache path — explicit root_dir required for web API
192    let cwd = request
193        .root_dir
194        .as_ref()
195        .map(std::path::PathBuf::from)
196        .ok_or_else(|| WebError::InvalidRequest("root_dir is required".to_string()))?;
197    // Canonicalize to resolve symlinks and reject nonexistent paths
198    let cwd = cwd
199        .canonicalize()
200        .map_err(|e| WebError::Config(format!("Invalid root_dir: {}", e)))?;
201
202    // Reject obviously unsafe paths (system pseudo-filesystems)
203    let path_str = cwd.to_string_lossy();
204    let is_pseudo_fs = ["/proc", "/sys", "/dev"]
205        .iter()
206        .any(|prefix| path_str == *prefix || path_str.starts_with(&format!("{}/", prefix)));
207    if is_pseudo_fs {
208        return Err(WebError::InvalidRequest(
209            "root_dir must not point to a system pseudo-filesystem".to_string(),
210        ));
211    }
212    let project_identity = nexus_core::ProjectIdentity::resolve(&cwd);
213    let nexus_dir = project_identity.root_dir.join(".nexus");
214
215    let mut cache = nexus_agent::CognitiveCache::load_or_init(&nexus_dir);
216
217    let config = nexus_core::Config::from_env().unwrap_or_default();
218
219    let relevance_score = request
220        .boost_score
221        .unwrap_or(memory.relevance_score.unwrap_or(0.85));
222    let tier = nexus_agent::ConfidenceTier::from_score(relevance_score);
223
224    let inserted = cache.hot_cache.promote(
225        nexus_agent::HotCacheEntry {
226            memory_id: memory.id,
227            content: memory.content,
228            relevance_score,
229            tier,
230            promoted_at: chrono::Utc::now(),
231            last_surfaced: chrono::Utc::now(),
232            hot_streak: 1,
233            pinned: request.pin,
234            source_agent: Some("web-ui".to_string()),
235        },
236        config.cognitive_system.hot_cache_max_entries,
237    );
238
239    if !inserted {
240        return Ok(Json(AgentBoostResponse {
241            success: false,
242            error: Some("Cache at capacity with all entries pinned".to_string()),
243        }));
244    }
245
246    cache
247        .save(&nexus_dir)
248        .map_err(|e| WebError::Storage(e.to_string()))?;
249
250    Ok(Json(AgentBoostResponse {
251        success: true,
252        error: None,
253    }))
254}
255
256/// GET /api/agent/status — Get agent status
257pub async fn agent_status(
258    State(state): State<Arc<RwLock<AppState>>>,
259) -> Result<Json<AgentStatusResponse>> {
260    let state = state.read().await;
261
262    match &state.agent_supervisor {
263        Some(supervisor) => {
264            let status = supervisor.get_status().await;
265            Ok(Json(AgentStatusResponse {
266                enabled: status.enabled,
267                namespace: status.namespace,
268                inbox_dir: status.inbox_dir,
269                files_processed: status.files_processed,
270                memories_consolidated: status.memories_consolidated,
271                queries_answered: status.queries_answered,
272                last_scan: status.last_scan.map(|d| d.to_rfc3339()),
273                last_consolidation: status.last_consolidation.map(|d| d.to_rfc3339()),
274                errors: status.errors,
275                uptime_secs: state.uptime_seconds(),
276            }))
277        }
278        None => Ok(Json(AgentStatusResponse {
279            enabled: false,
280            namespace: String::new(),
281            inbox_dir: String::new(),
282            files_processed: 0,
283            memories_consolidated: 0,
284            queries_answered: 0,
285            last_scan: None,
286            last_consolidation: None,
287            errors: Vec::new(),
288            uptime_secs: state.uptime_seconds(),
289        })),
290    }
291}