1use std::sync::{Arc, RwLock, atomic::{AtomicUsize, AtomicBool, Ordering}};
7use dashmap::DashMap;
8use tracing::info;
9
10use crate::{
11 config::Config,
12 context_engine::ContextOrchestrator,
13 memory_db::MemoryDatabase,
14 cache_management::KVCacheManager,
15 worker_threads::LLMWorker,
16 model_management::ModelManager,
17 model_runtime::RuntimeManager,
18};
19use crate::engine_management::EngineManager;
20
/// Attachment text extracted ahead of time, stored together with the
/// moment it was produced so callers can expire stale entries.
#[derive(Clone)]
pub struct PreExtracted {
    pub text: String,
    pub extracted_at: std::time::Instant,
}

impl PreExtracted {
    /// Returns `true` once at least `ttl_secs` whole seconds have elapsed
    /// since the text was extracted.
    pub fn is_stale(&self, ttl_secs: u64) -> bool {
        let age_secs = self.extracted_at.elapsed().as_secs();
        age_secs >= ttl_secs
    }
}
35
/// Central state shared across the backend via `Arc`.
///
/// Components that require a running backend are held behind
/// `RwLock<Option<...>>` and start out as `None`; they are installed later
/// by the corresponding `set_*` / `initialize_*` methods.
pub struct SharedSystemState {
    /// Per-session conversation data and pending-message queues.
    pub conversations: Arc<ConversationHierarchy>,

    /// LLM runtime parameters; `None` until `initialize_llm_runtime` runs.
    pub llm_runtime: Arc<RwLock<Option<LLMRuntime>>>,

    /// KV-cache manager; `None` until installed during initialization.
    pub cache_manager: Arc<RwLock<Option<Arc<KVCacheManager>>>>,

    /// Handle to the memory database.
    pub database_pool: Arc<MemoryDatabase>,

    /// Immutable application configuration.
    pub config: Arc<Config>,

    /// Lock-free metric counters.
    /// NOTE(review): `ConversationHierarchy` holds a separate counter set;
    /// confirm which one is authoritative for `active_sessions`.
    pub counters: Arc<AtomicCounters>,

    /// Context orchestrator; behind an async `RwLock` since it is accessed
    /// from async handlers. `None` until initialized.
    pub context_orchestrator: Arc<tokio::sync::RwLock<Option<ContextOrchestrator>>>,

    /// Worker that communicates with the LLM backend.
    pub llm_worker: Arc<LLMWorker>,

    /// Optional model manager (not populated by `new`).
    pub model_manager: Option<Arc<ModelManager>>,

    /// Runtime manager; installed later via `set_runtime_manager`.
    pub runtime_manager: Arc<std::sync::RwLock<Option<Arc<RuntimeManager>>>>,

    /// Optional engine manager (not populated by `new`).
    pub engine_manager: Option<Arc<EngineManager>>,

    /// Whether an inference engine is currently available.
    pub engine_available: Arc<AtomicBool>,

    /// Set once backend initialization has finished
    /// (see `mark_initialization_complete`).
    pub initialization_complete: Arc<AtomicBool>,

    /// Port the HTTP server listens on (initialized from `config.api_port`).
    pub http_port: Arc<RwLock<u16>>,

    /// Cache of pre-extracted attachment text — presumably keyed by an
    /// attachment identifier; TODO confirm key semantics against callers.
    pub attachment_cache: Arc<DashMap<String, PreExtracted>>,

    /// Bounds how many attachment extractions may run concurrently.
    pub extraction_semaphore: Arc<tokio::sync::Semaphore>,
}
90
/// Concurrent store of conversation sessions and their pending messages.
pub struct ConversationHierarchy {
    /// Session id -> session state.
    pub sessions: DashMap<String, Arc<RwLock<SessionData>>>,

    /// Session id -> bounded lock-free queue of messages awaiting processing.
    pub message_queues: DashMap<String, Arc<crossbeam_queue::ArrayQueue<PendingMessage>>>,

    /// Counters owned by this hierarchy.
    /// NOTE(review): `SharedSystemState` also holds its own `counters` and
    /// increments `active_sessions` there — confirm which set is used.
    pub counters: Arc<AtomicCounters>,
}
102
/// In-memory state of a single conversation session.
#[derive(Debug, Clone)]
pub struct SessionData {
    /// Identifier the session is keyed under in `ConversationHierarchy`.
    pub session_id: String,
    /// Messages accumulated in this session.
    pub messages: Vec<crate::memory::Message>,
    /// Last time the session was touched (set at creation; presumably
    /// refreshed on access — TODO confirm against callers).
    pub last_accessed: std::time::Instant,
    /// Pinned sessions — presumably exempt from eviction; TODO confirm.
    pub pinned: bool,
}
111
/// A message waiting in a session's queue, with the time it was enqueued.
#[derive(Debug, Clone)]
pub struct PendingMessage {
    /// The queued message payload.
    pub message: crate::memory::Message,
    /// When the message was queued (see `queue_message`).
    pub timestamp: std::time::Instant,
}
118
/// Lock-free metric counters shared across the backend.
///
/// All operations use `Ordering::Relaxed`: each counter is an independent
/// statistic, not a synchronization point, so no cross-counter ordering is
/// required.
#[derive(Debug, Default)]
pub struct AtomicCounters {
    pub total_requests: AtomicUsize,
    pub active_sessions: AtomicUsize,
    pub processed_messages: AtomicUsize,
    pub cache_hits: AtomicUsize,
    pub cache_misses: AtomicUsize,
}

impl AtomicCounters {
    /// Creates a set of counters, all starting at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Increments the total-request counter; returns the post-increment value.
    pub fn inc_total_requests(&self) -> usize {
        Self::bump(&self.total_requests)
    }

    /// Increments the processed-message counter; returns the post-increment value.
    pub fn inc_processed_messages(&self) -> usize {
        Self::bump(&self.processed_messages)
    }

    /// Records a cache hit; returns the post-increment value.
    pub fn inc_cache_hit(&self) -> usize {
        Self::bump(&self.cache_hits)
    }

    /// Records a cache miss; returns the post-increment value.
    pub fn inc_cache_miss(&self) -> usize {
        Self::bump(&self.cache_misses)
    }

    /// Atomically adds one and returns the value *after* the increment
    /// (`fetch_add` returns the previous value).
    fn bump(counter: &AtomicUsize) -> usize {
        counter.fetch_add(1, Ordering::Relaxed) + 1
    }
}
155
/// Parameters the LLM runtime was configured with
/// (copied from `Config` in `initialize_llm_runtime`).
pub struct LLMRuntime {
    /// Path to the model file (from `Config::model_path`).
    pub model_path: String,
    /// Context size (from `Config::ctx_size`).
    pub context_size: u32,
    /// Batch size (from `Config::batch_size`).
    pub batch_size: u32,
    /// Number of threads (from `Config::threads`).
    pub threads: u32,
    /// Number of GPU layers (from `Config::gpu_layers`).
    pub gpu_layers: u32,
}
167
168impl SharedSystemState {
169 pub fn new(config: Config, database: Arc<MemoryDatabase>) -> anyhow::Result<Self> {
170 info!("Initializing shared system state");
171
172 let conversations = Arc::new(ConversationHierarchy {
173 sessions: DashMap::new(),
174 message_queues: DashMap::new(),
175 counters: Arc::new(AtomicCounters::new()),
176 });
177
178 let api_port = config.api_port;
180 let backend_url = config.backend_url.clone();
181
182 let config = Arc::new(config);
183 let counters = Arc::new(AtomicCounters::new());
184
185 let llm_worker = Arc::new(LLMWorker::new_with_backend(backend_url));
187
188 let max_concurrent = (num_cpus::get() / 2).max(1).min(8);
191 info!("Attachment extraction semaphore: {} concurrent slots (num_cpus={})", max_concurrent, num_cpus::get());
192
193 Ok(Self {
194 conversations,
195 llm_runtime: Arc::new(RwLock::new(None)),
196 cache_manager: Arc::new(RwLock::new(None)),
197 database_pool: database,
198 config,
199 counters,
200 context_orchestrator: Arc::new(tokio::sync::RwLock::new(None)),
201 llm_worker,
202 model_manager: None,
203 runtime_manager: Arc::new(std::sync::RwLock::new(None)),
204 engine_manager: None,
205 engine_available: Arc::new(AtomicBool::new(false)),
206 initialization_complete: Arc::new(AtomicBool::new(false)),
207 http_port: Arc::new(RwLock::new(api_port)),
208 attachment_cache: Arc::new(DashMap::new()),
209 extraction_semaphore: Arc::new(tokio::sync::Semaphore::new(max_concurrent)),
210 })
211 }
212
213 pub fn mark_initialization_complete(&self) {
215 self.initialization_complete.store(true, Ordering::SeqCst);
216 info!("✅ Backend initialization marked as complete");
217 }
218
219 pub fn is_initialization_complete(&self) -> bool {
221 self.initialization_complete.load(Ordering::SeqCst)
222 }
223
224 pub fn set_llm_worker(&self, _worker: Arc<LLMWorker>) {
226 info!("LLM worker already initialized with backend URL");
230 }
231
232 pub fn set_runtime_manager(&self, runtime_manager: Arc<RuntimeManager>) -> anyhow::Result<()> {
234 let mut guard = self.runtime_manager
236 .write()
237 .map_err(|e| anyhow::anyhow!("Failed to acquire runtime manager write lock: {}", e))?;
238 *guard = Some(runtime_manager);
239 Ok(())
240 }
241
242 pub fn initialize_llm_runtime(&self) -> anyhow::Result<()> {
244 let mut runtime_guard = self.llm_runtime.try_write()
245 .map_err(|_| anyhow::anyhow!("Failed to acquire LLM runtime write lock"))?;
246
247 let runtime = LLMRuntime {
248 model_path: self.config.model_path.clone(),
249 context_size: self.config.ctx_size,
250 batch_size: self.config.batch_size,
251 threads: self.config.threads,
252 gpu_layers: self.config.gpu_layers,
253 };
254
255 *runtime_guard = Some(runtime);
256 info!("LLM runtime initialized");
257 Ok(())
258 }
259
260 pub async fn get_or_create_session(&self, session_id: &str) -> Arc<RwLock<SessionData>> {
262 if let Some(session) = self.conversations.sessions.get(session_id) {
264 return session.clone();
265 }
266
267 let new_session = Arc::new(RwLock::new(SessionData {
269 session_id: session_id.to_string(),
270 messages: Vec::new(),
271 last_accessed: std::time::Instant::now(),
272 pinned: false,
273 }));
274
275 self.conversations.sessions.insert(session_id.to_string(), new_session.clone());
276 self.counters.active_sessions.fetch_add(1, Ordering::Relaxed);
277
278 new_session
279 }
280
281 pub fn queue_message(&self, session_id: &str, message: crate::memory::Message) -> bool {
283 let queue = self.conversations.message_queues
284 .entry(session_id.to_string())
285 .or_insert_with(|| Arc::new(crossbeam_queue::ArrayQueue::new(1000)));
286
287 queue.push(PendingMessage {
288 message,
289 timestamp: std::time::Instant::now(),
290 }).is_ok()
291 }
292
293 pub async fn process_queued_messages(&self, session_id: &str) -> Vec<PendingMessage> {
295 let mut messages = Vec::new();
296
297 if let Some(queue) = self.conversations.message_queues.get(session_id) {
298 while let Some(msg) = queue.pop() {
299 messages.push(msg);
300 }
301 }
302
303 messages
304 }
305}
306
307impl ConversationHierarchy {
308 pub fn new() -> Self {
309 Self {
310 sessions: DashMap::new(),
311 message_queues: DashMap::new(),
312 counters: Arc::new(AtomicCounters::new()),
313 }
314 }
315}
316
/// State handed to HTTP handlers; cheap to clone (all fields are `Arc`s or
/// clonable handles).
#[derive(Clone)]
pub struct UnifiedAppState {
    /// The backend-wide shared state.
    pub shared_state: Arc<SharedSystemState>,
    /// Clone of `shared_state.context_orchestrator` for direct access.
    pub context_orchestrator: Arc<tokio::sync::RwLock<Option<ContextOrchestrator>>>,
    /// Clone of `shared_state.llm_worker` for direct access.
    pub llm_worker: Arc<LLMWorker>,
    /// Authentication state; `None` until installed by the caller.
    pub auth_state: Option<Arc<crate::api::auth_api::AuthState>>,
    /// Shared HTTP client built in `new` with a 300 s request timeout.
    pub http_client: reqwest::Client,
}
330
331impl UnifiedAppState {
332 pub fn new(shared_state: Arc<SharedSystemState>) -> Self {
333 let context_orchestrator = shared_state.context_orchestrator.clone();
334 let llm_worker = shared_state.llm_worker.clone();
335 let http_client = reqwest::Client::builder()
338 .timeout(std::time::Duration::from_secs(300))
339 .build()
340 .unwrap_or_else(|_| reqwest::Client::new());
341 Self {
342 shared_state,
343 context_orchestrator,
344 llm_worker,
345 auth_state: None,
346 http_client,
347 }
348 }
349
350 pub async fn get_openrouter_api_key(&self) -> Option<String> {
358 if let Ok(Some(key)) = self.shared_state.database_pool.api_keys.get_key_plaintext(&crate::memory_db::ApiKeyType::OpenRouter) {
360 if !key.is_empty() {
361 info!("Using OpenRouter API key from database");
362 return Some(key);
363 }
364 }
365
366 if let Ok(key) = std::env::var("OPENROUTER_API_KEY") {
368 if !key.is_empty() {
369 info!("Using OpenRouter API key from environment variable");
370 return Some(key);
371 }
372 }
373
374 let config = &self.shared_state.config;
376 if !config.openrouter_api_key.is_empty() {
377 info!("Using OpenRouter API key from config");
378 return Some(config.openrouter_api_key.clone());
379 }
380
381 None
382 }
383
384 pub async fn get_huggingface_token(&self) -> Option<String> {
389 if let Ok(Some(token)) = self.shared_state.database_pool.api_keys.get_key_plaintext(&crate::memory_db::ApiKeyType::HuggingFace) {
391 if !token.is_empty() {
392 info!("Using HuggingFace token from database");
393 return Some(token);
394 }
395 }
396
397 if let Ok(token) = std::env::var("HUGGINGFACE_TOKEN").or_else(|_| std::env::var("HF_TOKEN")) {
400 if !token.is_empty() {
401 info!("Using HuggingFace token from environment variable");
402 return Some(token);
403 }
404 }
405
406 None
407 }
408}
409
// Re-export under the shorter alias `SharedState` — presumably kept for
// modules that import the type by its old name; TODO confirm before removing.
pub use self::SharedSystemState as SharedState;