nexus_memory_web/
state.rs1use crate::error::Result;
4use crate::WebError;
5use nexus_agent::AgentSupervisor;
6use nexus_core::traits::EmbeddingService;
7use nexus_orchestrator::{Event, EventType, Orchestrator};
8use nexus_storage::{MemoryRepository, NamespaceRepository, StorageManager};
9use sqlx::SqlitePool;
10use std::sync::Arc;
11use tokio::sync::{broadcast, RwLock};
12use tracing::{error, info};
13
14pub struct AppState {
16 pub storage: StorageManager,
18 pub orchestrator: Orchestrator,
20 pub memory_repo: MemoryRepository,
22 pub namespace_repo: NamespaceRepository,
24 pub ws_sender: broadcast::Sender<crate::models::WebSocketMessage>,
26 pub start_time: std::time::Instant,
28 pub agent_supervisor: Option<AgentSupervisor>,
30}
31
32impl AppState {
33 pub async fn new(storage: StorageManager, orchestrator: Orchestrator) -> Result<Self> {
35 let pool = storage.pool().clone();
36 let memory_repo = MemoryRepository::new(pool.clone());
37 let namespace_repo = NamespaceRepository::new(pool.clone());
38
39 let (ws_sender, _) = broadcast::channel(1000);
41
42 let agent_supervisor = match Self::create_agent_supervisor(&pool, &namespace_repo).await {
44 Ok(Some(supervisor)) => {
45 info!("Agent supervisor initialized");
46 Some(supervisor)
47 }
48 Ok(None) => None,
49 Err(e) => {
50 error!("Failed to initialize agent supervisor: {}", e);
51 None
52 }
53 };
54
55 let state = Self {
56 storage,
57 orchestrator,
58 memory_repo,
59 namespace_repo,
60 ws_sender,
61 start_time: std::time::Instant::now(),
62 agent_supervisor,
63 };
64
65 state.start_event_forwarding().await?;
67
68 Ok(state)
69 }
70
71 async fn start_event_forwarding(&self) -> Result<()> {
73 let mut rx = self.orchestrator.subscribe_events();
74 let ws_sender = self.ws_sender.clone();
75
76 tokio::spawn(async move {
77 loop {
78 match rx.recv().await {
79 Ok(event) => {
80 if let Some(msg) = Self::convert_event_to_ws_message(&event) {
81 let _ = ws_sender.send(msg);
82 }
83 }
84 Err(e) => {
85 error!("Event receive error: {}", e);
86 break;
87 }
88 }
89 }
90 });
91
92 Ok(())
93 }
94
95 fn convert_event_to_ws_message(event: &Event) -> Option<crate::models::WebSocketMessage> {
97 use crate::models::{WebSocketMessage, WebSocketMessageType};
98
99 match event.event_type {
100 EventType::MemoryStored => {
101 let memory_id = event.get::<i64>("memory_id").unwrap_or(0);
102 let agent_type = event.get::<String>("agent_type").unwrap_or_default();
103 let data = serde_json::json!({
106 "memory_id": memory_id,
107 "agent_type": agent_type,
108 });
109 Some(WebSocketMessage::new(
110 WebSocketMessageType::MemoryStored,
111 data,
112 ))
113 }
114 EventType::MemoryUpdated => {
115 let memory_id = event.get::<i64>("memory_id").unwrap_or(0);
116 Some(WebSocketMessage::memory_updated(memory_id))
117 }
118 EventType::MemoryDeleted => {
119 let memory_id = event.get::<i64>("memory_id").unwrap_or(0);
120 Some(WebSocketMessage::memory_deleted(memory_id))
121 }
122 EventType::SessionStarted => {
123 let session_id = event.get::<String>("session_id").unwrap_or_default();
124 let data = serde_json::json!({
125 "session_id": session_id,
126 });
127 Some(WebSocketMessage::new(
128 WebSocketMessageType::SessionStarted,
129 data,
130 ))
131 }
132 EventType::SessionEnded => {
133 let session_id = event.get::<String>("session_id").unwrap_or_default();
134 let data = serde_json::json!({
135 "session_id": session_id,
136 });
137 Some(WebSocketMessage::new(
138 WebSocketMessageType::SessionEnded,
139 data,
140 ))
141 }
142 EventType::CognitiveDrift => {
143 let similarity = event.get::<f32>("similarity").unwrap_or(0.0);
144 let agent_type = event.get::<String>("agent_type").unwrap_or_default();
145 Some(WebSocketMessage::cognitive_drift(similarity, &agent_type))
146 }
147 EventType::DreamCompleted => {
148 let agent_type = event.get::<String>("agent_type").unwrap_or_default();
149 let processed = event.get::<usize>("processed").unwrap_or(0);
150 Some(WebSocketMessage::dream_completed(&agent_type, processed))
151 }
152 EventType::MorningRecall => {
153 let namespace = event.get::<String>("namespace").unwrap_or_default();
154 let count = event.get::<usize>("count").unwrap_or(0);
155 Some(WebSocketMessage::morning_recall(&namespace, count))
156 }
157 _ => None,
158 }
159 }
160
161 pub fn pool(&self) -> &SqlitePool {
163 self.storage.pool()
164 }
165
166 async fn create_agent_supervisor(
168 pool: &SqlitePool,
169 namespace_repo: &NamespaceRepository,
170 ) -> Result<Option<AgentSupervisor>> {
171 let config = nexus_core::Config::from_env().map_err(|e| WebError::Config(e.to_string()))?;
172
173 if !config.agent.enabled {
174 return Ok(None);
175 }
176
177 let llm = nexus_llm::create_client_auto_with_fallback()
178 .map_err(|e| WebError::Config(format!("Failed to create LLM client: {}", e)))?;
179
180 let query_embedder: Option<Arc<dyn EmbeddingService>> =
181 match nexus_embeddings::create_service(&config).await {
182 Ok(service) => service,
183 Err(error) => {
184 error!("Failed to initialize query embedding service: {}", error);
185 None
186 }
187 };
188
189 let namespace = namespace_repo
191 .get_or_create(&config.agent.namespace, "nexus-agent")
192 .await
193 .map_err(|e| WebError::Storage(e.to_string()))?;
194
195 let project_root =
196 nexus_core::ProjectIdentity::resolve(&std::env::current_dir().unwrap_or_default())
197 .root_dir;
198 let mut supervisor =
199 AgentSupervisor::new(config.agent, llm, pool.clone(), namespace.id, project_root);
200 if let Some(embedder) = query_embedder {
201 supervisor = supervisor.with_query_embedder(embedder);
202 }
203 supervisor
204 .start()
205 .await
206 .map_err(|e| WebError::Config(format!("Failed to start agent supervisor: {}", e)))?;
207
208 Ok(Some(supervisor))
209 }
210
211 pub fn uptime_seconds(&self) -> u64 {
213 self.start_time.elapsed().as_secs()
214 }
215
216 pub fn subscribe_ws(&self) -> broadcast::Receiver<crate::models::WebSocketMessage> {
218 self.ws_sender.subscribe()
219 }
220
221 pub fn broadcast_ws(&self, msg: crate::models::WebSocketMessage) -> Result<()> {
223 let _ = self.ws_sender.send(msg);
224 Ok(())
225 }
226}
227
228pub type SharedState = Arc<RwLock<AppState>>;