Skip to main content

offline_intelligence/
thread_server.rs

//! Thread-based server implementation
//!
//! This module provides the server startup path for the thread-based,
//! shared-memory architecture. All API handlers access state through
//! Arc-wrapped shared memory (`UnifiedAppState`), with zero network hops
//! between components. The only network call is to the localhost llama-server.
7
8use std::sync::Arc;
9use tokio::sync::RwLock;
10use tracing::{info, warn, debug};
11
12use crate::{
13    config::Config,
14    shared_state::{SharedState, UnifiedAppState},
15    thread_pool::{ThreadPool, ThreadPoolConfig},
16    worker_threads::{ContextWorker, CacheWorker, DatabaseWorker, LLMWorker},
17    memory_db::MemoryDatabase,
18};
19
20/// Thread-based unified application state (internal, used during initialization)
21#[derive(Clone)]
22pub struct ThreadBasedAppState {
23    pub shared_state: Arc<SharedState>,
24    pub thread_pool: Arc<RwLock<Option<ThreadPool>>>,
25    pub context_worker: Arc<ContextWorker>,
26    pub cache_worker: Arc<CacheWorker>,
27    pub database_worker: Arc<DatabaseWorker>,
28    pub llm_worker: Arc<LLMWorker>,
29}
30
31/// Run server with thread-based architecture
32pub async fn run_thread_server(cfg: Config) -> anyhow::Result<()> {
33    crate::telemetry::init_tracing();
34    crate::metrics::init_metrics();
35    cfg.print_config();
36
37    info!("Starting thread-based server architecture");
38
39    // Initialize database
40    let memory_db_path = std::path::Path::new("./data/conversations.db");
41    let memory_database = match MemoryDatabase::new(memory_db_path) {
42        Ok(db) => {
43            info!("Memory database initialized at: {}", memory_db_path.display());
44            Arc::new(db)
45        }
46        Err(e) => {
47            warn!("Failed to initialize memory database: {}. Falling back to in-memory.", e);
48            Arc::new(MemoryDatabase::new_in_memory()?)
49        }
50    };
51
52    // Initialize shared state (creates LLM worker internally with backend_url)
53    let shared_state = Arc::new(SharedState::new(cfg.clone(), memory_database.clone())?);
54
55    // Initialize Runtime Manager for multi-format model support
56    info!("🚀 Initializing Runtime Manager for multi-format model support");
57    let runtime_manager = Arc::new(crate::model_runtime::RuntimeManager::new());
58    
59    // Configure and initialize the runtime based on detected model format
60    let runtime_config = crate::model_runtime::RuntimeConfig {
61        model_path: std::path::PathBuf::from(&cfg.model_path),
62        format: crate::model_runtime::ModelFormat::GGUF, // Will be auto-detected
63        host: cfg.llama_host.clone(),
64        port: cfg.llama_port,
65        context_size: cfg.ctx_size,
66        batch_size: cfg.batch_size,
67        threads: cfg.threads,
68        gpu_layers: cfg.gpu_layers,
69        runtime_binary: Some(std::path::PathBuf::from(&cfg.llama_bin)),
70        extra_config: serde_json::json!({}),
71    };
72    
73    // Initialize with automatic format detection
74    match runtime_manager.initialize_auto(runtime_config).await {
75        Ok(base_url) => {
76            info!("✅ Model runtime initialized successfully");
77            info!("   Runtime endpoint: {}", base_url);
78            // Update backend_url in shared state config if needed
79        }
80        Err(e) => {
81            warn!("⚠️  Runtime initialization failed: {}", e);
82            warn!("   The system will attempt to use the configured backend_url directly");
83        }
84    }
85
86    // Initialize workers
87    let context_worker: Arc<ContextWorker> = Arc::new(ContextWorker::new(shared_state.clone()));
88    let cache_worker: Arc<CacheWorker> = Arc::new(CacheWorker::new(shared_state.clone()));
89    let database_worker: Arc<DatabaseWorker> = Arc::new(DatabaseWorker::new(shared_state.clone()));
90    let llm_worker = shared_state.llm_worker.clone();
91
92    // Initialize cache manager
93    let cache_manager = match crate::cache_management::create_default_cache_manager(
94        crate::cache_management::KVCacheConfig::default(),
95        memory_database.clone(),
96    ) {
97        Ok(manager) => {
98            info!("Cache manager initialized successfully");
99            Some(Arc::new(manager))
100        }
101        Err(e) => {
102            warn!("Failed to initialize cache manager: {}, cache features disabled", e);
103            None
104        }
105    };
106
107    // Initialize context orchestrator
108    let context_orchestrator = match crate::context_engine::create_default_orchestrator(
109        memory_database.clone(),
110    ).await {
111        Ok(mut orchestrator) => {
112            // Inject LLM worker so the orchestrator can generate query embeddings
113            // for semantic search when the hot KV cache doesn't have the answer.
114            orchestrator.set_llm_worker(shared_state.llm_worker.clone());
115            info!("Context orchestrator initialized with semantic search support");
116            Some(orchestrator)
117        }
118        Err(e) => {
119            warn!("Failed to initialize context orchestrator: {}. Memory features disabled.", e);
120            None
121        }
122    };
123
124    // Initialize thread pool
125    let thread_pool_config = ThreadPoolConfig::new(&cfg);
126    let mut thread_pool = ThreadPool::new(thread_pool_config, shared_state.clone());
127    thread_pool.start().await?;
128
129    // Update shared state with initialized components
130    {
131        let mut cache_guard = shared_state.cache_manager.write()
132            .map_err(|_| anyhow::anyhow!("Failed to acquire cache manager write lock"))?;
133        *cache_guard = cache_manager;
134
135        // LLM runtime is now managed by RuntimeManager, no need to initialize here
136        // shared_state.initialize_llm_runtime()?;  // Removed - handled by RuntimeManager
137    }
138
139    // Initialize embedding HNSW index from any previously stored embeddings
140    // This makes semantic search available immediately on startup.
141    if let Err(e) = shared_state.database_pool.embeddings.initialize_index("llama-server") {
142        debug!("Embedding index init: {} (will build on first embedding store)", e);
143    } else {
144        info!("Embedding HNSW index loaded from existing data");
145    }
146
147    // Set context orchestrator (tokio RwLock for async access from handlers)
148    {
149        let mut orch_guard = shared_state.context_orchestrator.write().await;
150        *orch_guard = context_orchestrator;
151    }
152
153    // Build the unified app state for the router
154    let unified_state = UnifiedAppState::new(shared_state.clone());
155
156    // Start HTTP server
157    info!("Starting HTTP server on {}:{}", cfg.api_host, cfg.api_port);
158    let listener = tokio::net::TcpListener::bind(format!("{}:{}", cfg.api_host, cfg.api_port)).await?;
159
160    let app = build_compatible_router(unified_state);
161
162    axum::serve(listener, app).await?;
163
164    Ok(())
165}
166
167/// Build router for 1-hop architecture
168fn build_compatible_router(state: UnifiedAppState) -> axum::Router {
169    use axum::{
170        Router,
171        routing::{get, post, put, delete},
172    };
173    use tower_http::{
174        cors::{Any, CorsLayer},
175        trace::TraceLayer,
176        timeout::TimeoutLayer,
177    };
178    use std::time::Duration;
179
180    let cors = CorsLayer::new()
181        .allow_origin(Any)
182        .allow_methods([axum::http::Method::GET, axum::http::Method::POST, axum::http::Method::PUT, axum::http::Method::DELETE])
183        .allow_headers(Any);
184
185    Router::new()
186        // Core 1-hop streaming endpoint
187        .route("/generate/stream", post(crate::api::stream_api::generate_stream))
188        // Title generation via shared memory -> LLM worker
189        .route("/generate/title", post(crate::api::title_api::generate_title))
190        // Conversation CRUD via shared memory -> database
191        .route("/conversations", get(crate::api::conversation_api::get_conversations))
192        .route("/conversations/:id", get(crate::api::conversation_api::get_conversation))
193        .route("/conversations/:id/title", put(crate::api::conversation_api::update_conversation_title))
194        .route("/conversations/:id/pinned", post(crate::api::conversation_api::update_conversation_pinned))
195        .route("/conversations/:id", delete(crate::api::conversation_api::delete_conversation))
196        .route("/healthz", get(|| async { "OK" }))
197        .layer(cors)
198        .layer(TraceLayer::new_for_http())
199        .layer(TimeoutLayer::new(Duration::from_secs(600)))
200        .with_state(state)
201}