// offline_intelligence/thread_server.rs
use std::sync::Arc;

use tokio::sync::RwLock;
use tracing::{debug, info, warn};

use crate::{
    config::Config,
    memory_db::MemoryDatabase,
    shared_state::{SharedState, UnifiedAppState},
    thread_pool::{ThreadPool, ThreadPoolConfig},
    worker_threads::{CacheWorker, ContextWorker, DatabaseWorker, LLMWorker},
};
/// Cloneable application state for the thread-based server architecture.
///
/// Every field is `Arc`-backed, so cloning this struct (e.g. for per-request
/// handler state) is cheap and all clones observe the same underlying
/// instances.
///
/// NOTE(review): worker roles below are inferred from the type names only —
/// confirm against `worker_threads`.
#[derive(Clone)]
pub struct ThreadBasedAppState {
    // Core shared state (config, database handles, caches) used by all workers.
    pub shared_state: Arc<SharedState>,
    // Thread pool handle; `None` presumably until the pool has been started.
    pub thread_pool: Arc<RwLock<Option<ThreadPool>>>,
    // Worker for context/memory operations.
    pub context_worker: Arc<ContextWorker>,
    // Worker for cache operations.
    pub cache_worker: Arc<CacheWorker>,
    // Worker for database operations.
    pub database_worker: Arc<DatabaseWorker>,
    // Worker for LLM inference requests.
    pub llm_worker: Arc<LLMWorker>,
}
31pub async fn run_thread_server(cfg: Config) -> anyhow::Result<()> {
33 crate::telemetry::init_tracing();
34 crate::metrics::init_metrics();
35 cfg.print_config();
36
37 info!("Starting thread-based server architecture");
38
39 let memory_db_path = std::path::Path::new("./data/conversations.db");
41 let memory_database = match MemoryDatabase::new(memory_db_path) {
42 Ok(db) => {
43 info!("Memory database initialized at: {}", memory_db_path.display());
44 Arc::new(db)
45 }
46 Err(e) => {
47 warn!("Failed to initialize memory database: {}. Falling back to in-memory.", e);
48 Arc::new(MemoryDatabase::new_in_memory()?)
49 }
50 };
51
52 let shared_state = Arc::new(SharedState::new(cfg.clone(), memory_database.clone())?);
54
55 info!("🚀 Initializing Runtime Manager for multi-format model support");
57 let runtime_manager = Arc::new(crate::model_runtime::RuntimeManager::new());
58
59 let runtime_config = crate::model_runtime::RuntimeConfig {
61 model_path: std::path::PathBuf::from(&cfg.model_path),
62 format: crate::model_runtime::ModelFormat::GGUF, host: cfg.llama_host.clone(),
64 port: cfg.llama_port,
65 context_size: cfg.ctx_size,
66 batch_size: cfg.batch_size,
67 threads: cfg.threads,
68 gpu_layers: cfg.gpu_layers,
69 runtime_binary: Some(std::path::PathBuf::from(&cfg.llama_bin)),
70 extra_config: serde_json::json!({}),
71 };
72
73 match runtime_manager.initialize_auto(runtime_config).await {
75 Ok(base_url) => {
76 info!("✅ Model runtime initialized successfully");
77 info!(" Runtime endpoint: {}", base_url);
78 }
80 Err(e) => {
81 warn!("⚠️ Runtime initialization failed: {}", e);
82 warn!(" The system will attempt to use the configured backend_url directly");
83 }
84 }
85
86 let context_worker: Arc<ContextWorker> = Arc::new(ContextWorker::new(shared_state.clone()));
88 let cache_worker: Arc<CacheWorker> = Arc::new(CacheWorker::new(shared_state.clone()));
89 let database_worker: Arc<DatabaseWorker> = Arc::new(DatabaseWorker::new(shared_state.clone()));
90 let llm_worker = shared_state.llm_worker.clone();
91
92 let cache_manager = match crate::cache_management::create_default_cache_manager(
94 crate::cache_management::KVCacheConfig::default(),
95 memory_database.clone(),
96 ) {
97 Ok(manager) => {
98 info!("Cache manager initialized successfully");
99 Some(Arc::new(manager))
100 }
101 Err(e) => {
102 warn!("Failed to initialize cache manager: {}, cache features disabled", e);
103 None
104 }
105 };
106
107 let context_orchestrator = match crate::context_engine::create_default_orchestrator(
109 memory_database.clone(),
110 ).await {
111 Ok(mut orchestrator) => {
112 orchestrator.set_llm_worker(shared_state.llm_worker.clone());
115 info!("Context orchestrator initialized with semantic search support");
116 Some(orchestrator)
117 }
118 Err(e) => {
119 warn!("Failed to initialize context orchestrator: {}. Memory features disabled.", e);
120 None
121 }
122 };
123
124 let thread_pool_config = ThreadPoolConfig::new(&cfg);
126 let mut thread_pool = ThreadPool::new(thread_pool_config, shared_state.clone());
127 thread_pool.start().await?;
128
129 {
131 let mut cache_guard = shared_state.cache_manager.write()
132 .map_err(|_| anyhow::anyhow!("Failed to acquire cache manager write lock"))?;
133 *cache_guard = cache_manager;
134
135 }
138
139 if let Err(e) = shared_state.database_pool.embeddings.initialize_index("llama-server") {
142 debug!("Embedding index init: {} (will build on first embedding store)", e);
143 } else {
144 info!("Embedding HNSW index loaded from existing data");
145 }
146
147 {
149 let mut orch_guard = shared_state.context_orchestrator.write().await;
150 *orch_guard = context_orchestrator;
151 }
152
153 let unified_state = UnifiedAppState::new(shared_state.clone());
155
156 info!("Starting HTTP server on {}:{}", cfg.api_host, cfg.api_port);
158 let listener = tokio::net::TcpListener::bind(format!("{}:{}", cfg.api_host, cfg.api_port)).await?;
159
160 let app = build_compatible_router(unified_state);
161
162 axum::serve(listener, app).await?;
163
164 Ok(())
165}
167fn build_compatible_router(state: UnifiedAppState) -> axum::Router {
169 use axum::{
170 Router,
171 routing::{get, post, put, delete},
172 };
173 use tower_http::{
174 cors::{Any, CorsLayer},
175 trace::TraceLayer,
176 timeout::TimeoutLayer,
177 };
178 use std::time::Duration;
179
180 let cors = CorsLayer::new()
181 .allow_origin(Any)
182 .allow_methods([axum::http::Method::GET, axum::http::Method::POST, axum::http::Method::PUT, axum::http::Method::DELETE])
183 .allow_headers(Any);
184
185 Router::new()
186 .route("/generate/stream", post(crate::api::stream_api::generate_stream))
188 .route("/generate/title", post(crate::api::title_api::generate_title))
190 .route("/conversations", get(crate::api::conversation_api::get_conversations))
192 .route("/conversations/:id", get(crate::api::conversation_api::get_conversation))
193 .route("/conversations/:id/title", put(crate::api::conversation_api::update_conversation_title))
194 .route("/conversations/:id/pinned", post(crate::api::conversation_api::update_conversation_pinned))
195 .route("/conversations/:id", delete(crate::api::conversation_api::delete_conversation))
196 .route("/healthz", get(|| async { "OK" }))
197 .layer(cors)
198 .layer(TraceLayer::new_for_http())
199 .layer(TimeoutLayer::new(Duration::from_secs(600)))
200 .with_state(state)
201}