use crate::error::Result;
use crate::WebError;
use nexus_agent::AgentSupervisor;
use nexus_core::traits::EmbeddingService;
use nexus_orchestrator::{Event, EventType, Orchestrator};
use nexus_storage::{MemoryRepository, NamespaceRepository, StorageManager};
use sqlx::SqlitePool;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tracing::{error, info};
pub struct AppState {
pub storage: StorageManager,
pub orchestrator: Orchestrator,
pub memory_repo: MemoryRepository,
pub namespace_repo: NamespaceRepository,
pub ws_sender: broadcast::Sender<crate::models::WebSocketMessage>,
pub start_time: std::time::Instant,
pub agent_supervisor: Option<AgentSupervisor>,
}
impl AppState {
pub async fn new(storage: StorageManager, orchestrator: Orchestrator) -> Result<Self> {
let pool = storage.pool().clone();
let memory_repo = MemoryRepository::new(pool.clone());
let namespace_repo = NamespaceRepository::new(pool.clone());
let (ws_sender, _) = broadcast::channel(1000);
let agent_supervisor = match Self::create_agent_supervisor(&pool, &namespace_repo).await {
Ok(Some(supervisor)) => {
info!("Agent supervisor initialized");
Some(supervisor)
}
Ok(None) => None,
Err(e) => {
error!("Failed to initialize agent supervisor: {}", e);
None
}
};
let state = Self {
storage,
orchestrator,
memory_repo,
namespace_repo,
ws_sender,
start_time: std::time::Instant::now(),
agent_supervisor,
};
state.start_event_forwarding().await?;
Ok(state)
}
async fn start_event_forwarding(&self) -> Result<()> {
let mut rx = self.orchestrator.subscribe_events();
let ws_sender = self.ws_sender.clone();
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(event) => {
if let Some(msg) = Self::convert_event_to_ws_message(&event) {
let _ = ws_sender.send(msg);
}
}
Err(e) => {
error!("Event receive error: {}", e);
break;
}
}
}
});
Ok(())
}
fn convert_event_to_ws_message(event: &Event) -> Option<crate::models::WebSocketMessage> {
use crate::models::{WebSocketMessage, WebSocketMessageType};
match event.event_type {
EventType::MemoryStored => {
let memory_id = event.get::<i64>("memory_id").unwrap_or(0);
let agent_type = event.get::<String>("agent_type").unwrap_or_default();
let data = serde_json::json!({
"memory_id": memory_id,
"agent_type": agent_type,
});
Some(WebSocketMessage::new(
WebSocketMessageType::MemoryStored,
data,
))
}
EventType::MemoryUpdated => {
let memory_id = event.get::<i64>("memory_id").unwrap_or(0);
Some(WebSocketMessage::memory_updated(memory_id))
}
EventType::MemoryDeleted => {
let memory_id = event.get::<i64>("memory_id").unwrap_or(0);
Some(WebSocketMessage::memory_deleted(memory_id))
}
EventType::SessionStarted => {
let session_id = event.get::<String>("session_id").unwrap_or_default();
let data = serde_json::json!({
"session_id": session_id,
});
Some(WebSocketMessage::new(
WebSocketMessageType::SessionStarted,
data,
))
}
EventType::SessionEnded => {
let session_id = event.get::<String>("session_id").unwrap_or_default();
let data = serde_json::json!({
"session_id": session_id,
});
Some(WebSocketMessage::new(
WebSocketMessageType::SessionEnded,
data,
))
}
EventType::CognitiveDrift => {
let similarity = event.get::<f32>("similarity").unwrap_or(0.0);
let agent_type = event.get::<String>("agent_type").unwrap_or_default();
Some(WebSocketMessage::cognitive_drift(similarity, &agent_type))
}
EventType::DreamCompleted => {
let agent_type = event.get::<String>("agent_type").unwrap_or_default();
let processed = event.get::<usize>("processed").unwrap_or(0);
Some(WebSocketMessage::dream_completed(&agent_type, processed))
}
EventType::MorningRecall => {
let namespace = event.get::<String>("namespace").unwrap_or_default();
let count = event.get::<usize>("count").unwrap_or(0);
Some(WebSocketMessage::morning_recall(&namespace, count))
}
_ => None,
}
}
pub fn pool(&self) -> &SqlitePool {
self.storage.pool()
}
async fn create_agent_supervisor(
pool: &SqlitePool,
namespace_repo: &NamespaceRepository,
) -> Result<Option<AgentSupervisor>> {
let config = nexus_core::Config::from_env().map_err(|e| WebError::Config(e.to_string()))?;
if !config.agent.enabled {
return Ok(None);
}
let llm = nexus_llm::create_client_auto_with_fallback()
.map_err(|e| WebError::Config(format!("Failed to create LLM client: {}", e)))?;
let query_embedder: Option<Arc<dyn EmbeddingService>> =
match nexus_embeddings::create_service(&config).await {
Ok(service) => service,
Err(error) => {
error!("Failed to initialize query embedding service: {}", error);
None
}
};
let namespace = namespace_repo
.get_or_create(&config.agent.namespace, "nexus-agent")
.await
.map_err(|e| WebError::Storage(e.to_string()))?;
let project_root =
nexus_core::ProjectIdentity::resolve(&std::env::current_dir().unwrap_or_default())
.root_dir;
let mut supervisor =
AgentSupervisor::new(config.agent, llm, pool.clone(), namespace.id, project_root);
if let Some(embedder) = query_embedder {
supervisor = supervisor.with_query_embedder(embedder);
}
supervisor
.start()
.await
.map_err(|e| WebError::Config(format!("Failed to start agent supervisor: {}", e)))?;
Ok(Some(supervisor))
}
pub fn uptime_seconds(&self) -> u64 {
self.start_time.elapsed().as_secs()
}
pub fn subscribe_ws(&self) -> broadcast::Receiver<crate::models::WebSocketMessage> {
self.ws_sender.subscribe()
}
pub fn broadcast_ws(&self, msg: crate::models::WebSocketMessage) -> Result<()> {
let _ = self.ws_sender.send(msg);
Ok(())
}
}
pub type SharedState = Arc<RwLock<AppState>>;