use crate::error::Result;
use nexus_orchestrator::{Event, EventType, Orchestrator};
use nexus_storage::{MemoryRepository, NamespaceRepository, StorageManager};
use sqlx::SqlitePool;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tracing::error;
pub struct AppState {
pub storage: StorageManager,
pub orchestrator: Orchestrator,
pub memory_repo: MemoryRepository,
pub namespace_repo: NamespaceRepository,
pub ws_sender: broadcast::Sender<crate::models::WebSocketMessage>,
pub start_time: std::time::Instant,
}
impl AppState {
pub async fn new(storage: StorageManager, orchestrator: Orchestrator) -> Result<Self> {
let pool = storage.pool().clone();
let memory_repo = MemoryRepository::new(pool.clone());
let namespace_repo = NamespaceRepository::new(pool.clone());
let (ws_sender, _) = broadcast::channel(1000);
let state = Self {
storage,
orchestrator,
memory_repo,
namespace_repo,
ws_sender,
start_time: std::time::Instant::now(),
};
state.start_event_forwarding().await?;
Ok(state)
}
async fn start_event_forwarding(&self) -> Result<()> {
let mut rx = self.orchestrator.subscribe_events();
let ws_sender = self.ws_sender.clone();
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(event) => {
if let Some(msg) = Self::convert_event_to_ws_message(&event) {
let _ = ws_sender.send(msg);
}
}
Err(e) => {
error!("Event receive error: {}", e);
break;
}
}
}
});
Ok(())
}
fn convert_event_to_ws_message(event: &Event) -> Option<crate::models::WebSocketMessage> {
use crate::models::{WebSocketMessage, WebSocketMessageType};
match event.event_type {
EventType::MemoryStored => {
let memory_id = event.get::<i64>("memory_id").unwrap_or(0);
let agent_type = event.get::<String>("agent_type").unwrap_or_default();
let data = serde_json::json!({
"memory_id": memory_id,
"agent_type": agent_type,
});
Some(WebSocketMessage::new(
WebSocketMessageType::MemoryStored,
data,
))
}
EventType::MemoryUpdated => {
let memory_id = event.get::<i64>("memory_id").unwrap_or(0);
Some(WebSocketMessage::memory_updated(memory_id))
}
EventType::MemoryDeleted => {
let memory_id = event.get::<i64>("memory_id").unwrap_or(0);
Some(WebSocketMessage::memory_deleted(memory_id))
}
EventType::SessionStarted => {
let session_id = event.get::<String>("session_id").unwrap_or_default();
let data = serde_json::json!({
"session_id": session_id,
});
Some(WebSocketMessage::new(
WebSocketMessageType::SessionStarted,
data,
))
}
EventType::SessionEnded => {
let session_id = event.get::<String>("session_id").unwrap_or_default();
let data = serde_json::json!({
"session_id": session_id,
});
Some(WebSocketMessage::new(
WebSocketMessageType::SessionEnded,
data,
))
}
_ => None,
}
}
pub fn pool(&self) -> &SqlitePool {
self.storage.pool()
}
pub fn uptime_seconds(&self) -> u64 {
self.start_time.elapsed().as_secs()
}
pub fn subscribe_ws(&self) -> broadcast::Receiver<crate::models::WebSocketMessage> {
self.ws_sender.subscribe()
}
pub fn broadcast_ws(&self, msg: crate::models::WebSocketMessage) -> Result<()> {
let _ = self.ws_sender.send(msg);
Ok(())
}
}
pub type SharedState = Arc<RwLock<AppState>>;