1use std::sync::Arc;
9use tokio::sync::RwLock;
10use tracing::{info, warn, debug, error};
11use anyhow::anyhow;
12
13use crate::{
14 config::Config,
15 shared_state::{SharedState, UnifiedAppState},
16 thread_pool::{ThreadPool, ThreadPoolConfig},
17 worker_threads::{ContextWorker, CacheWorker, DatabaseWorker, LLMWorker},
18 memory_db::MemoryDatabase,
19 model_management::ModelManager,
20};
21
/// Bundle of shared handles for the thread-based server: the shared state, an optional
/// thread pool, and one `Arc` per worker type. Cloning is cheap because every field is an `Arc`.
///
/// NOTE(review): this type is not constructed or referenced in the visible part of this file —
/// `run_thread_server` hands `UnifiedAppState` to the router instead. Confirm it is still used
/// elsewhere before extending it.
#[derive(Clone)]
pub struct ThreadBasedAppState {
    /// Shared application state (the same type `run_thread_server` passes to workers and the router).
    pub shared_state: Arc<SharedState>,
    /// Thread pool behind an async `tokio::sync::RwLock`; presumably `None` until a pool has been
    /// started — TODO confirm.
    pub thread_pool: Arc<RwLock<Option<ThreadPool>>>,
    /// Handle to the context worker.
    pub context_worker: Arc<ContextWorker>,
    /// Handle to the cache worker.
    pub cache_worker: Arc<CacheWorker>,
    /// Handle to the database worker.
    pub database_worker: Arc<DatabaseWorker>,
    /// Handle to the LLM worker.
    pub llm_worker: Arc<LLMWorker>,
}
32
33pub async fn run_thread_server(cfg: Config, port_tx: Option<std::sync::mpsc::Sender<u16>>) -> anyhow::Result<()> {
40 crate::telemetry::init_tracing();
41 crate::metrics::init_metrics();
42 cfg.print_config();
43
44 info!("Starting thread-based server architecture");
45
46 let memory_db_path = dirs::data_dir()
52 .unwrap_or_else(|| std::env::current_dir().unwrap_or_default())
53 .join("Aud.io")
54 .join("data")
55 .join("memory.db");
56
57 if let Some(parent) = memory_db_path.parent() {
58 if let Err(e) = std::fs::create_dir_all(parent) {
59 warn!("Failed to create data directory {:?}: {}", parent, e);
60 } else {
61 info!("Created data directory: {:?}", parent);
62 }
63 }
64
65 let memory_database = match MemoryDatabase::new(&memory_db_path) {
66 Ok(db) => {
67 info!("Memory database initialized at: {}", memory_db_path.display());
68 Arc::new(db)
69 }
70 Err(e) => {
71 error!(
74 "❌ CRITICAL: Failed to open SQLite database at {}: {}\n\
75 Falling back to IN-MEMORY storage — all user data will be lost on exit.\n\
76 Check that the directory is writable and no other process holds a lock on the file.",
77 memory_db_path.display(), e
78 );
79 Arc::new(MemoryDatabase::new_in_memory()?)
80 }
81 };
82
83 let mut shared_state = SharedState::new(cfg.clone(), memory_database.clone())?;
85
86 info!("📦 Initializing Model Manager");
88 match ModelManager::new() {
89 Ok(model_manager) => {
90 let model_manager_arc = Arc::new(model_manager);
91 if let Err(e) = model_manager_arc.initialize(&cfg).await {
92 warn!("⚠️ Model manager initialization failed: {}", e);
93 shared_state.model_manager = Some(model_manager_arc);
94 } else {
95 info!("✅ Model manager initialized successfully");
96 shared_state.model_manager = Some(model_manager_arc);
97 }
98 }
99 Err(e) => warn!("⚠️ Failed to create model manager: {}", e),
100 }
101
102 info!("⚙️ Initializing Engine Manager (blocking until ready)...");
105 match crate::engine_management::EngineManager::new() {
106 Ok(engine_manager) => {
107 let engine_manager_arc = Arc::new(engine_manager);
108 match engine_manager_arc.initialize(&cfg).await {
109 Ok(true) => {
110 info!("✅ Engine manager initialized with engine ready");
111 shared_state.engine_manager = Some(engine_manager_arc.clone());
112 shared_state.engine_available.store(true, std::sync::atomic::Ordering::Relaxed);
113 }
114 Ok(false) => {
115 info!("⏳ No engine found — starting in online-only mode. Download from the Models page.");
119 shared_state.engine_manager = Some(engine_manager_arc);
120 shared_state.engine_available.store(false, std::sync::atomic::Ordering::Relaxed);
121 }
122 Err(e) => {
123 warn!("⚠️ Engine manager scan failed: {}", e);
124 shared_state.engine_manager = Some(engine_manager_arc);
125 shared_state.engine_available.store(false, std::sync::atomic::Ordering::Relaxed);
126 }
127 }
128 }
129 Err(e) => {
130 error!("❌ Failed to create engine manager: {}", e);
131 shared_state.engine_available.store(false, std::sync::atomic::Ordering::Relaxed);
132 }
133 }
134
135 let shared_state = Arc::new(shared_state);
136
137 let _context_worker: Arc<ContextWorker> = Arc::new(ContextWorker::new(shared_state.clone()));
139 let _cache_worker: Arc<CacheWorker> = Arc::new(CacheWorker::new(shared_state.clone()));
140 let _database_worker: Arc<DatabaseWorker> = Arc::new(DatabaseWorker::new(shared_state.clone()));
141 let _llm_worker = shared_state.llm_worker.clone();
142
143 let cache_manager = match crate::cache_management::create_default_cache_manager(
145 crate::cache_management::KVCacheConfig::from_ctx_size(cfg.ctx_size),
146 memory_database.clone(),
147 Some(shared_state.llm_worker.clone()),
148 ) {
149 Ok(manager) => { info!("Cache manager initialized"); Some(Arc::new(manager)) }
150 Err(e) => { warn!("Cache manager failed: {}, disabled", e); None }
151 };
152 {
153 let mut g = shared_state.cache_manager.write()
154 .map_err(|_| anyhow::anyhow!("Failed to acquire cache manager write lock"))?;
155 *g = cache_manager;
156 }
157
158 if let Err(e) = shared_state.database_pool.embeddings.initialize_index("llama-server") {
160 debug!("Embedding index init: {} (will build on first store)", e);
161 } else {
162 info!("Embedding HNSW index loaded from existing data");
163 }
164
165 let thread_pool_config = ThreadPoolConfig::new(&cfg);
167 let mut thread_pool = ThreadPool::new(thread_pool_config, shared_state.clone());
168 thread_pool.start().await?;
169
170 let unified_state = UnifiedAppState::new(shared_state.clone());
175 let app = build_compatible_router(unified_state);
176
177 let (listener, selected_port) = match try_bind_port(&cfg.api_host, cfg.api_port).await {
178 Ok(listener) => {
179 let port = listener.local_addr()?.port();
180 info!("✅ HTTP server bound to {}:{}", cfg.api_host, port);
181 (listener, port)
182 }
183 Err(e) => {
184 warn!("⚠️ Failed to bind to port {}: {}", cfg.api_port, e);
185 warn!("🔄 Scanning 8002-8999 for available port...");
186 let mut last_error = None;
187 let mut found_listener = None;
188 let mut found_port = 0u16;
189 for port in 8002u16..=8999 {
190 match try_bind_port(&cfg.api_host, port).await {
191 Ok(listener) => {
192 found_port = listener.local_addr()?.port();
193 info!("✅ HTTP server bound to alternative port {}", found_port);
194 if let Ok(mut g) = shared_state.http_port.write() { *g = found_port; }
195 found_listener = Some(listener);
196 break;
197 }
198 Err(e) => { last_error = Some(e); }
199 }
200 }
201 let listener = found_listener.ok_or_else(|| anyhow!(
202 "Failed to find available port after scanning 8002-8999.\n Last error: {:?}\n Hints: disable firewall, close other Aud.io instances, or run as Administrator.",
203 last_error
204 ))?;
205 (listener, found_port)
206 }
207 };
208
209 if let Some(ref tx) = port_tx {
211 if let Err(e) = tx.send(selected_port) {
212 warn!("Failed to send port to main thread: {}", e);
213 } else {
214 info!("✅ Port {} communicated to main thread", selected_port);
215 }
216 }
217 info!("🌐 Server will accept connections on port {}", selected_port);
218
219 {
224 let shared_state_bg = shared_state.clone();
225 let cfg_bg = cfg.clone();
226 let memory_database_bg = memory_database.clone();
227 tokio::spawn(async move {
228 shared_state_bg.mark_initialization_complete();
235 info!("✅ Backend marked as initialized — frontend may proceed");
236
237 let context_orchestrator = match crate::context_engine::create_default_orchestrator(
239 memory_database_bg,
240 cfg_bg.ctx_size,
241 ).await {
242 Ok(mut orchestrator) => {
243 orchestrator.set_llm_worker(shared_state_bg.llm_worker.clone());
244 info!("Context orchestrator initialized");
245 Some(orchestrator)
246 }
247 Err(e) => {
248 warn!("Context orchestrator failed: {}. Memory features disabled.", e);
249 None
250 }
251 };
252 {
253 let mut g = shared_state_bg.context_orchestrator.write().await;
254 *g = context_orchestrator;
255 }
256
257 info!("🚀 Initializing Runtime Manager");
259 let runtime_manager = Arc::new(crate::model_runtime::RuntimeManager::new());
260 let runtime_config = crate::model_runtime::RuntimeConfig {
261 model_path: std::path::PathBuf::from(&cfg_bg.model_path),
262 format: crate::model_runtime::ModelFormat::GGUF,
263 host: cfg_bg.llama_host.clone(),
264 port: cfg_bg.llama_port,
265 context_size: cfg_bg.ctx_size,
266 batch_size: cfg_bg.batch_size,
267 threads: cfg_bg.threads,
268 gpu_layers: cfg_bg.gpu_layers,
269 runtime_binary: if cfg_bg.llama_bin.is_empty() { None } else { Some(std::path::PathBuf::from(&cfg_bg.llama_bin)) },
270 extra_config: serde_json::json!({}),
271 };
272
273 let llama_bin_exists = !cfg_bg.llama_bin.is_empty()
275 && std::path::Path::new(&cfg_bg.llama_bin).exists();
276 let has_engine = if let Some(ref em) = shared_state_bg.engine_manager {
277 let reg = em.registry.read().await;
278 reg.get_default_engine_binary_path().is_some() || llama_bin_exists
279 } else {
280 llama_bin_exists
281 };
282
283 if let Err(e) = shared_state_bg.set_runtime_manager(runtime_manager.clone()) {
286 error!("❌ Failed to set runtime manager: {}", e);
287 }
288 shared_state_bg.llm_worker.set_runtime_manager(runtime_manager.clone());
289 info!("🔗 LLM worker linked to runtime manager");
290
291 if has_engine {
292 let last_model_loaded = 'load: {
294 let Some(data_dir) = dirs::data_dir() else { break 'load false; };
295 let last_model_path = data_dir.join("Aud.io").join("last_model.txt");
296 let Ok(last_model_id_raw) = std::fs::read_to_string(&last_model_path) else {
297 info!("ℹ️ No last used model found");
298 break 'load false;
299 };
300 let last_model_id = last_model_id_raw.trim().to_string();
301 info!("🔄 Found last used model: {}", last_model_id);
302
303 let Some(ref model_manager) = shared_state_bg.model_manager else {
304 info!("ℹ️ Model manager not available - skipping auto-load");
305 break 'load false;
306 };
307 let registry = model_manager.registry.read().await;
308 let Some(model_info) = registry.get_model(&last_model_id) else {
309 drop(registry);
310 warn!("⚠️ Last used model not found in registry: {}", last_model_id);
311 break 'load false;
312 };
313 if model_info.status != crate::model_management::registry::ModelStatus::Installed {
314 drop(registry);
315 info!("ℹ️ Last used model not installed");
316 break 'load false;
317 }
318 let Some(ref filename) = model_info.filename else {
319 drop(registry);
320 warn!("⚠️ Last used model has no filename");
321 break 'load false;
322 };
323 let model_path_for_runtime = model_manager.storage.model_path(&last_model_id, filename);
324 drop(registry);
325
326 if !model_path_for_runtime.exists() {
327 warn!("⚠️ Last used model file not found: {}", model_path_for_runtime.display());
328 break 'load false;
329 }
330 info!("✅ Auto-loading last used model from: {}", model_path_for_runtime.display());
331
332 let default_engine = if let Some(ref em) = shared_state_bg.engine_manager {
333 let reg = em.registry.read().await;
334 reg.get_default_engine_binary_path()
335 .or_else(|| if !cfg_bg.llama_bin.is_empty() { Some(std::path::PathBuf::from(&cfg_bg.llama_bin)) } else { None })
336 } else if !cfg_bg.llama_bin.is_empty() {
337 Some(std::path::PathBuf::from(&cfg_bg.llama_bin))
338 } else { None };
339
340 let mut updated_config = runtime_config.clone();
341 updated_config.model_path = model_path_for_runtime;
342 updated_config.runtime_binary = default_engine;
343
344 match runtime_manager.initialize_auto(updated_config).await {
345 Ok(base_url) => {
346 info!("✅ Last used model auto-loaded at {}", base_url);
347 match runtime_manager.health_check().await {
348 Ok(status) => { info!("✅ Runtime health check passed: {}", status); true }
349 Err(e) => { warn!("⚠️ Runtime health check failed: {}", e); false }
350 }
351 }
352 Err(e) => { warn!("⚠️ Failed to auto-load last used model: {}", e); false }
353 }
354 };
355
356 if !last_model_loaded && !cfg_bg.model_path.is_empty() {
357 let default_engine = if let Some(ref em) = shared_state_bg.engine_manager {
358 let reg = em.registry.read().await;
359 reg.get_default_engine_binary_path()
360 .or_else(|| if !cfg_bg.llama_bin.is_empty() { Some(std::path::PathBuf::from(&cfg_bg.llama_bin)) } else { None })
361 } else if !cfg_bg.llama_bin.is_empty() {
362 Some(std::path::PathBuf::from(&cfg_bg.llama_bin))
363 } else { None };
364 let mut updated_config = runtime_config;
365 updated_config.runtime_binary = default_engine;
366 info!("🚀 Initializing runtime with config model path...");
367 match runtime_manager.initialize_auto(updated_config).await {
368 Ok(base_url) => {
369 info!("✅ Runtime initialized at {}", base_url);
370 shared_state_bg.llm_worker.set_runtime_manager(runtime_manager);
371 }
372 Err(e) => warn!("⚠️ Runtime initialization failed: {}. Online-only mode.", e),
373 }
374 }
375 } else {
376 info!("⏳ No engine found - starting in online-only mode");
377 }
378
379 info!("✅ Background initialization complete");
380 });
381 }
382
383 {
387 let cache = shared_state.attachment_cache.clone();
388 tokio::spawn(async move {
389 let interval = std::time::Duration::from_secs(300); loop {
391 tokio::time::sleep(interval).await;
392 let before = cache.len();
393 cache.retain(|_, v: &mut crate::shared_state::PreExtracted| {
394 !v.is_stale(crate::api::attachment_api::CACHE_TTL_SECS)
395 });
396 let removed = before - cache.len();
397 if removed > 0 {
398 info!("Attachment cache eviction: removed {} stale entries", removed);
399 }
400 }
401 });
402 }
403
404 info!("🟢 Axum server starting on port {}...", selected_port);
406 if let Err(e) = axum::serve(listener, app).await {
407 error!("Axum server error: {}", e);
408 }
409
410 info!("Axum server stopped");
411 Ok(())
412}
413
414async fn try_bind_port(host: &str, port: u16) -> anyhow::Result<tokio::net::TcpListener> {
416 let addr = format!("{}:{}", host, port);
417 match tokio::net::TcpListener::bind(&addr).await {
418 Ok(listener) => Ok(listener),
419 Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
420 Err(anyhow::anyhow!("Port {} is already in use", port))
421 }
422 Err(e) => Err(anyhow::anyhow!("Failed to bind to {}: {}", addr, e)),
423 }
424}
425
426#[derive(serde::Serialize)]
428struct HealthResponse {
429 status: String, runtime_ready: bool,
431 message: Option<String>,
432}
433
434async fn health_check(axum::extract::State(state): axum::extract::State<UnifiedAppState>) -> axum::response::Response {
436 use axum::Json;
437 use axum::response::IntoResponse;
438
439 if !state.shared_state.is_initialization_complete() {
441 return Json(HealthResponse {
442 status: "initializing".to_string(),
443 runtime_ready: false,
444 message: Some("Backend initializing...".to_string()),
445 })
446 .into_response();
447 }
448
449 let runtime_ready = state.shared_state.llm_worker.is_runtime_ready().await;
451
452 let (status, message) = if runtime_ready {
453 ("ready", None)
454 } else {
455 (
456 "degraded",
457 Some("No model loaded. Please activate a model from the Models page.".to_string())
458 )
459 };
460
461 Json(HealthResponse {
462 status: status.to_string(),
463 runtime_ready,
464 message,
465 })
466 .into_response()
467}
468
469fn build_compatible_router(mut state: UnifiedAppState) -> axum::Router {
471 use axum::{
472 Router,
473 routing::{get, post, put, delete},
474 extract::DefaultBodyLimit,
475 };
476 use tower_http::{
477 cors::{Any, CorsLayer},
478 trace::TraceLayer,
479 timeout::TimeoutLayer,
480 };
481 use std::time::Duration;
482
483 let cors = CorsLayer::new()
493 .allow_origin(Any)
494 .allow_methods([axum::http::Method::GET, axum::http::Method::POST, axum::http::Method::PUT, axum::http::Method::DELETE])
495 .allow_headers(Any);
496
497 let jwt_secret = std::env::var("JWT_SECRET")
499 .unwrap_or_else(|_| "aud-io-default-secret-change-in-production".to_string());
500
501 let users_store = state.shared_state.database_pool.users.clone();
503
504 let google_oauth = {
512 let client_id = option_env!("GOOGLE_CLIENT_ID")
513 .map(|s| s.to_string())
514 .filter(|s| !s.is_empty())
515 .or_else(|| std::env::var("GOOGLE_CLIENT_ID").ok().filter(|s| !s.is_empty()));
516
517 let client_secret = option_env!("GOOGLE_CLIENT_SECRET")
518 .map(|s| s.to_string())
519 .filter(|s| !s.is_empty())
520 .or_else(|| std::env::var("GOOGLE_CLIENT_SECRET").ok().filter(|s| !s.is_empty()));
521
522 match (client_id, client_secret) {
523 (Some(id), Some(secret)) => {
524 tracing::info!(
525 "Google OAuth configured (client_id: {}...)",
526 &id[..id.len().min(12)]
527 );
528 Some(crate::api::auth_api::GoogleOAuthPending {
529 states: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
530 client_id: id,
531 client_secret: secret,
532 })
533 }
534 _ => {
535 tracing::info!(
536 "Google OAuth not configured — set GOOGLE_CLIENT_ID + GOOGLE_CLIENT_SECRET before building"
537 );
538 None
539 }
540 }
541 };
542
543 state.auth_state = Some(Arc::new(crate::api::auth_api::AuthState {
545 users: users_store,
546 jwt_secret,
547 google: google_oauth,
548 }));
549
550 Router::new()
551 .route("/auth/signup", post(crate::api::auth_api::signup))
553 .route("/auth/login", post(crate::api::auth_api::login))
554 .route("/auth/verify-email", post(crate::api::auth_api::verify_email))
555 .route("/auth/me", post(crate::api::auth_api::get_current_user))
556 .route("/auth/google/init", post(crate::api::auth_api::google_init))
558 .route("/auth/google/callback", get(crate::api::auth_api::google_callback))
559 .route("/auth/google/status", get(crate::api::auth_api::google_status))
560 .route("/generate/stream", post(crate::api::stream_api::generate_stream))
562 .route("/online/stream", post(crate::api::online_api::online_stream))
564 .route("/generate/title", post(crate::api::title_api::generate_title))
566 .route("/conversations", get(crate::api::conversation_api::get_conversations))
568 .route("/conversations/db-stats", get(crate::api::conversation_api::get_conversations_db_stats))
569 .route("/conversations/:id", get(crate::api::conversation_api::get_conversation))
570 .route("/conversations/:id/title", put(crate::api::conversation_api::update_conversation_title))
571 .route("/conversations/:id/pinned", post(crate::api::conversation_api::update_conversation_pinned))
572 .route("/conversations/:id", delete(crate::api::conversation_api::delete_conversation))
573 .route("/models", get(crate::api::model_api::list_models))
575 .route("/models/by-mode", get(crate::api::model_api::list_models_by_mode))
576 .route("/models/active", get(crate::api::model_api::get_active_model))
577 .route("/models/search", get(crate::api::model_api::search_models))
578 .route("/models/install", post(crate::api::model_api::install_model))
579 .route("/models/remove", delete(crate::api::model_api::remove_model))
580 .route("/models/progress", get(crate::api::model_api::get_download_progress))
581 .route("/models/downloads", get(crate::api::model_api::get_active_downloads))
582 .route("/models/downloads/cancel", post(crate::api::model_api::cancel_download))
583 .route("/models/downloads/pause", post(crate::api::model_api::pause_download))
584 .route("/models/downloads/resume", post(crate::api::model_api::resume_download))
585 .route("/models/recommendations", get(crate::api::model_api::get_recommended_models))
586 .route("/models/preferences", post(crate::api::model_api::update_preferences))
587 .route("/models/refresh", post(crate::api::model_api::refresh_models))
588 .route("/models/switch", post(crate::api::model_api::switch_model))
589 .route("/models/hf/access", get(crate::api::model_api::check_hf_access))
591 .route("/models/openrouter/catalog", get(crate::api::model_api::openrouter_catalog))
593 .route("/models/openrouter/quota", get(crate::api::model_api::openrouter_quota))
594 .route("/hardware/recommendations", get(crate::api::model_api::get_hardware_recommendations))
595 .route("/hardware/info", get(crate::api::model_api::get_hardware_info))
596 .route("/metrics/system", get(crate::api::model_api::get_system_metrics))
597 .route("/storage/metadata", get(crate::api::model_api::get_storage_metadata))
598 .route("/api-keys", post(crate::api::api_keys_api::save_api_key))
600 .route("/api-keys", get(crate::api::api_keys_api::get_api_key))
601 .route("/api-keys/all", get(crate::api::api_keys_api::get_all_api_keys))
602 .route("/api-keys", delete(crate::api::api_keys_api::delete_api_key))
603 .route("/api-keys/mark-used", post(crate::api::api_keys_api::mark_key_used))
604 .route("/api-keys/verify", post(crate::api::api_keys_api::verify_api_key))
605 .route("/mode/switch", post(crate::api::mode_api::switch_mode))
607 .route("/mode/status", get(crate::api::mode_api::get_mode_status))
608 .route("/files", get(crate::api::files_api::get_files))
610 .route("/files/all", get(crate::api::files_api::get_all_files))
611 .route("/files/search", get(crate::api::files_api::search_files))
612 .route("/files/folder", post(crate::api::files_api::create_folder))
613 .route("/files/upload", post(crate::api::files_api::upload_file))
614 .route("/files/sync", post(crate::api::files_api::sync_files))
615 .route("/files/resync", post(crate::api::files_api::resync_files))
616 .route("/files/:id", get(crate::api::files_api::get_file_by_id))
617 .route("/files/:id/content", get(crate::api::files_api::get_file_content))
618 .route("/files/:id", delete(crate::api::files_api::delete_file_by_id))
619 .route("/files", delete(crate::api::files_api::delete_file))
620 .route("/all-files", get(crate::api::all_files_api::get_all_files))
622 .route("/all-files/all", get(crate::api::all_files_api::get_all_files_flat))
623 .route("/all-files/search", get(crate::api::all_files_api::search_all_files))
624 .route("/all-files/folder", post(crate::api::all_files_api::create_all_files_folder))
625 .route("/all-files/upload", post(crate::api::all_files_api::upload_all_file))
626 .route("/all-files/upload-structure", post(crate::api::all_files_api::upload_all_files_structure))
627 .route("/all-files/:id", get(crate::api::all_files_api::get_all_file_by_id))
628 .route("/all-files/:id/content", get(crate::api::all_files_api::get_all_file_content))
629 .route("/all-files/:id", delete(crate::api::all_files_api::delete_all_file_by_id))
630 .route("/all-files", delete(crate::api::all_files_api::delete_all_file))
631 .route("/feedback", post(crate::api::feedback_api::submit_feedback))
633 .route("/notify-login", post(crate::api::login_notification_api::notify_user_login))
635 .route("/attachments/preprocess", post(crate::api::attachment_api::preprocess_attachments))
637 .route("/metrics", get(crate::metrics::get_metrics))
639 .route("/healthz", get(health_check))
640 .route("/admin/shutdown", post(crate::admin::stop_backend))
641.layer(cors)
642 .layer(TraceLayer::new_for_http())
643 .layer(TimeoutLayer::new(Duration::from_secs(600)))
644 .layer(DefaultBodyLimit::max(50 * 1024 * 1024))
645 .with_state(state)
646}