1use axum::{extract::State, Json};
4use std::sync::Arc;
5use tokio::sync::RwLock;
6use tracing::error;
7
8use crate::error::{Result, WebError};
9use crate::models::{
10 AgentConsolidateResponse, AgentIngestRequest, AgentIngestResponse, AgentQueryRequest,
11 AgentQueryResponse, AgentStatusResponse,
12};
13use crate::state::AppState;
14
15pub async fn agent_ingest(
17 State(state): State<Arc<RwLock<AppState>>>,
18 Json(request): Json<AgentIngestRequest>,
19) -> Result<Json<AgentIngestResponse>> {
20 let state = state.read().await;
21
22 let supervisor = state
23 .agent_supervisor
24 .as_ref()
25 .ok_or_else(|| WebError::InvalidRequest("Agent is not enabled".to_string()))?;
26
27 if request.text.trim().is_empty() {
28 return Err(WebError::InvalidRequest("Text cannot be empty".to_string()));
29 }
30
31 let ingest_svc = supervisor.ingest_service();
32 let namespace_id = supervisor.namespace_id();
33
34 let memory_repo = nexus_storage::MemoryRepository::new(state.pool().clone());
35
36 match ingest_svc
37 .ingest(&request.text, &request.source, namespace_id, &memory_repo)
38 .await
39 {
40 Ok(memory_id) => Ok(Json(AgentIngestResponse {
41 success: true,
42 memory_id: Some(memory_id),
43 summary: None,
44 error: None,
45 })),
46 Err(e) => {
47 error!(error = %e, "Agent ingest failed");
48 Ok(Json(AgentIngestResponse {
49 success: false,
50 memory_id: None,
51 summary: None,
52 error: Some(e.to_string()),
53 }))
54 }
55 }
56}
57
58pub async fn agent_query(
60 State(state): State<Arc<RwLock<AppState>>>,
61 Json(request): Json<AgentQueryRequest>,
62) -> Result<Json<AgentQueryResponse>> {
63 let state = state.read().await;
64
65 let supervisor = state
66 .agent_supervisor
67 .as_ref()
68 .ok_or_else(|| WebError::InvalidRequest("Agent is not enabled".to_string()))?;
69
70 if request.question.trim().is_empty() {
71 return Err(WebError::InvalidRequest(
72 "Question cannot be empty".to_string(),
73 ));
74 }
75
76 let query_svc = supervisor.query_service();
77 let namespace_id = supervisor.namespace_id();
78
79 let memory_repo = nexus_storage::MemoryRepository::new(state.pool().clone());
80 let relation_repo = nexus_storage::MemoryRelationRepository::new(state.pool());
81
82 match query_svc
83 .query(
84 &request.question,
85 namespace_id,
86 &memory_repo,
87 &relation_repo,
88 )
89 .await
90 {
91 Ok(answer) => {
92 supervisor.increment_queries_answered().await;
93 Ok(Json(AgentQueryResponse {
94 success: true,
95 question: request.question,
96 answer: Some(answer.answer),
97 error: None,
98 }))
99 }
100 Err(e) => {
101 error!(error = %e, "Agent query failed");
102 Ok(Json(AgentQueryResponse {
103 success: false,
104 question: request.question,
105 answer: None,
106 error: Some(e.to_string()),
107 }))
108 }
109 }
110}
111
112pub async fn agent_consolidate(
114 State(state): State<Arc<RwLock<AppState>>>,
115) -> Result<Json<AgentConsolidateResponse>> {
116 let state = state.read().await;
117
118 let supervisor = state
119 .agent_supervisor
120 .as_ref()
121 .ok_or_else(|| WebError::InvalidRequest("Agent is not enabled".to_string()))?;
122
123 let namespace_id = supervisor.namespace_id();
124 let config = nexus_core::Config::from_env().map_err(|e| WebError::Config(e.to_string()))?;
125
126 let lease_owner = format!("web-agent-consolidate-{}", namespace_id);
127 let embeddings = nexus_agent::create_embedding_service(&config).await;
128 match nexus_agent::runtime::run_dream_cycle(
129 state.pool().clone(),
130 &config.cognition,
131 &nexus_core::config::AgentConfig {
132 namespace: supervisor.get_status().await.namespace,
133 ..Default::default()
134 },
135 nexus_llm::create_client_auto_with_fallback()
136 .map_err(|e| WebError::Config(format!("Failed to create LLM client: {}", e)))?,
137 embeddings,
138 nexus_agent::runtime::DreamCycleRequest {
139 namespace_id,
140 lease_owner: &lease_owner,
141 perspective: None,
142 session_key: None,
143 reflect_reason: "web_manual_dream",
144 digest_reason: "web_manual_digest",
145 },
146 )
147 .await
148 {
149 Ok(processed) => Ok(Json(AgentConsolidateResponse {
150 success: true,
151 memories_processed: processed,
152 error: None,
153 })),
154 Err(e) => {
155 error!(error = %e, "Agent consolidation failed");
156 Ok(Json(AgentConsolidateResponse {
157 success: false,
158 memories_processed: 0,
159 error: Some(format!("{}", e)),
160 }))
161 }
162 }
163}
164
165pub async fn agent_status(
167 State(state): State<Arc<RwLock<AppState>>>,
168) -> Result<Json<AgentStatusResponse>> {
169 let state = state.read().await;
170
171 match &state.agent_supervisor {
172 Some(supervisor) => {
173 let status = supervisor.get_status().await;
174 Ok(Json(AgentStatusResponse {
175 enabled: status.enabled,
176 namespace: status.namespace,
177 inbox_dir: status.inbox_dir,
178 files_processed: status.files_processed,
179 memories_consolidated: status.memories_consolidated,
180 queries_answered: status.queries_answered,
181 last_scan: status.last_scan.map(|d| d.to_rfc3339()),
182 last_consolidation: status.last_consolidation.map(|d| d.to_rfc3339()),
183 errors: status.errors,
184 uptime_secs: state.uptime_seconds(),
185 }))
186 }
187 None => Ok(Json(AgentStatusResponse {
188 enabled: false,
189 namespace: String::new(),
190 inbox_dir: String::new(),
191 files_processed: 0,
192 memories_consolidated: 0,
193 queries_answered: 0,
194 last_scan: None,
195 last_consolidation: None,
196 errors: Vec::new(),
197 uptime_secs: state.uptime_seconds(),
198 })),
199 }
200}