Skip to main content

codesearch/daemon/
mod.rs

//! Multi-repo daemon with periodic indexing.
//!
//! Replaces N `codesearch serve` processes (one per repo) with a single daemon
//! that manages all repos from one process. Each repo keeps its own `.codesearch.db`
//! (LMDB + tantivy + metadata); the daemon opens all of them and searches across
//! all stores, merging results by score.
//!
//! # Configuration
//!
//! The daemon reads a YAML config file (following the hanabi/kenshi/shinka pattern):
//!
//! ```yaml
//! port: 4444
//! index_interval: 300
//! lmdb_map_size_mb: 2048
//! repos:
//!   - /path/to/repo1
//!   - /path/to/repo2
//! ```
//!
//! Load order: defaults → YAML file → env vars. The environment variables
//! `CODESEARCH_DAEMON_PORT`, `CODESEARCH_INDEX_INTERVAL` and
//! `CODESEARCH_LMDB_MAP_SIZE_MB` override the corresponding YAML keys.
22
23pub mod github;
24pub mod server;
25
26use std::path::{Path, PathBuf};
27use std::sync::Arc;
28use std::time::Duration;
29
30use anyhow::Result;
31use serde::{Deserialize, Serialize};
32use tokio_util::sync::CancellationToken;
33use tracing::{error, info, warn};
34
35use crate::constants::DB_DIR_NAME;
36use crate::db_discovery::find_best_database;
37use crate::embed::{EmbeddingService, ModelType};
38use crate::index::{IndexManager, SharedStores};
39use crate::vectordb::VectorStore;
40
41/// Daemon configuration loaded from YAML.
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct DaemonConfig {
44    /// HTTP listen port
45    #[serde(default = "default_port")]
46    pub port: u16,
47
48    /// Repository paths to manage
49    #[serde(default)]
50    pub repos: Vec<PathBuf>,
51
52    /// Re-index interval in seconds
53    #[serde(default = "default_index_interval")]
54    pub index_interval: u64,
55
56    /// LMDB map size in MB (overrides CODESEARCH_LMDB_MAP_SIZE_MB)
57    #[serde(default)]
58    pub lmdb_map_size_mb: Option<usize>,
59
60    /// Embedding model name (null = default mxbai-embed-xsmall-v1)
61    #[serde(default)]
62    pub model: Option<String>,
63
64    /// GitHub auto-discovery configuration
65    #[serde(default)]
66    pub github: Option<GitHubConfig>,
67}
68
69/// GitHub auto-discovery: resolve repos from GitHub orgs/users.
70#[derive(Debug, Clone, Serialize, Deserialize, Default)]
71pub struct GitHubConfig {
72    /// Path to file containing GitHub token (supports ~ expansion)
73    pub token_file: Option<String>,
74    /// Sources to discover repos from
75    #[serde(default)]
76    pub sources: Vec<GitHubSource>,
77}
78
79/// A single GitHub owner (org or user) to discover repos from.
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct GitHubSource {
82    /// GitHub owner name (org or username)
83    pub owner: String,
84    /// Whether this is an org or user account
85    #[serde(default)]
86    pub kind: OwnerKind,
87    /// Local directory where repos are/should be cloned
88    pub clone_base: PathBuf,
89    /// Clone repos that don't exist locally
90    #[serde(default)]
91    pub auto_clone: bool,
92    /// Skip archived repositories
93    #[serde(default = "default_true")]
94    pub skip_archived: bool,
95    /// Skip forked repositories
96    #[serde(default)]
97    pub skip_forks: bool,
98    /// Glob patterns to exclude repo names (e.g. "*.wiki", "legacy-*")
99    #[serde(default)]
100    pub exclude: Vec<String>,
101}
102
103/// Whether a GitHub source is an organization or user.
104#[derive(Debug, Clone, Serialize, Deserialize, Default)]
105#[serde(rename_all = "lowercase")]
106pub enum OwnerKind {
107    #[default]
108    Org,
109    User,
110}
111
/// Serde default for `DaemonConfig::port`.
fn default_port() -> u16 {
    const PORT: u16 = 4444;
    PORT
}
115
/// Serde default for `DaemonConfig::index_interval`: five minutes, in seconds.
fn default_index_interval() -> u64 {
    const FIVE_MINUTES_SECS: u64 = 5 * 60;
    FIVE_MINUTES_SECS
}
119
/// Serde helper for boolean fields whose default is `true`.
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
123
124impl Default for DaemonConfig {
125    fn default() -> Self {
126        Self {
127            port: default_port(),
128            repos: Vec::new(),
129            index_interval: default_index_interval(),
130            lmdb_map_size_mb: None,
131            model: None,
132            github: None,
133        }
134    }
135}
136
137impl DaemonConfig {
138    /// Load config from a YAML file path.
139    pub fn load(path: &Path) -> Result<Self> {
140        let content = std::fs::read_to_string(path)
141            .map_err(|e| anyhow::anyhow!("Failed to read config file {}: {}", path.display(), e))?;
142        let mut config: Self = serde_yaml::from_str(&content)
143            .map_err(|e| anyhow::anyhow!("Failed to parse config {}: {}", path.display(), e))?;
144
145        // Env var overrides
146        if let Ok(port) = std::env::var("CODESEARCH_DAEMON_PORT") {
147            if let Ok(p) = port.parse() {
148                config.port = p;
149            }
150        }
151        if let Ok(interval) = std::env::var("CODESEARCH_INDEX_INTERVAL") {
152            if let Ok(i) = interval.parse() {
153                config.index_interval = i;
154            }
155        }
156        if let Ok(size) = std::env::var("CODESEARCH_LMDB_MAP_SIZE_MB") {
157            if let Ok(s) = size.parse() {
158                config.lmdb_map_size_mb = Some(s);
159            }
160        }
161
162        Ok(config)
163    }
164}
165
166/// Per-repo handle holding its stores and metadata.
167pub struct RepoHandle {
168    pub name: String,
169    pub project_path: PathBuf,
170    pub db_path: PathBuf,
171    pub stores: Arc<SharedStores>,
172}
173
174/// Shared daemon state accessible from HTTP handlers and the reindex task.
175pub struct DaemonState {
176    pub repos: Vec<RepoHandle>,
177    pub embedding_service: tokio::sync::Mutex<EmbeddingService>,
178}
179
180/// Main daemon entry point.
181pub async fn run_daemon(config: DaemonConfig, cancel_token: CancellationToken) -> Result<()> {
182    info!("Starting codesearch daemon on port {}", config.port);
183
184    // Set LMDB map size env var if configured (used by VectorStore::new)
185    if let Some(size) = config.lmdb_map_size_mb {
186        std::env::set_var("CODESEARCH_LMDB_MAP_SIZE_MB", size.to_string());
187    }
188
189    // Resolve repos: merge explicit list with GitHub-discovered repos
190    let all_repos = github::resolve_all_repos(config.repos.clone(), config.github.as_ref()).await;
191
192    info!(
193        "Managing {} repos, re-index every {}s",
194        all_repos.len(),
195        config.index_interval
196    );
197
198    // Load embedding model once (respects config or uses default)
199    let cache_dir = crate::constants::get_global_models_cache_dir()?;
200    let model_type = config
201        .model
202        .as_ref()
203        .and_then(|m| ModelType::parse(m))
204        .unwrap_or_default();
205    info!("Loading embedding model: {:?}", model_type);
206    let embedding_service = EmbeddingService::with_cache_dir(model_type, Some(&cache_dir))?;
207    let dimensions = embedding_service.dimensions();
208
209    // Initialize repos
210    let mut repo_handles = Vec::new();
211
212    for repo_path in &all_repos {
213        match init_repo(repo_path, dimensions, &cancel_token).await {
214            Ok(handle) => {
215                info!("Initialized repo: {} ({})", handle.name, handle.db_path.display());
216                repo_handles.push(handle);
217            }
218            Err(e) => {
219                error!("Failed to initialize repo {}: {}", repo_path.display(), e);
220                // Continue with other repos — don't fail the whole daemon
221            }
222        }
223    }
224
225    if repo_handles.is_empty() {
226        return Err(anyhow::anyhow!(
227            "No repos initialized successfully. Check paths and ensure indexes exist \
228             (run `codesearch index --add -g` per repo first)."
229        ));
230    }
231
232    info!("{}/{} repos initialized", repo_handles.len(), all_repos.len());
233
234    let state = Arc::new(DaemonState {
235        repos: repo_handles,
236        embedding_service: tokio::sync::Mutex::new(embedding_service),
237    });
238
239    // Start periodic re-index task
240    let reindex_state = state.clone();
241    let reindex_cancel = cancel_token.clone();
242    let interval = Duration::from_secs(config.index_interval);
243    tokio::spawn(async move {
244        periodic_reindex(reindex_state, interval, reindex_cancel).await;
245    });
246
247    // Start HTTP server (blocks until shutdown)
248    server::run_server(state, config.port, cancel_token).await
249}
250
251/// Initialize a single repo: find/create DB, open stores, clear stale readers, refresh index.
252async fn init_repo(
253    repo_path: &Path,
254    dimensions: usize,
255    cancel_token: &CancellationToken,
256) -> Result<RepoHandle> {
257    let canonical = repo_path.canonicalize().map_err(|e| {
258        anyhow::anyhow!("Cannot canonicalize {}: {}", repo_path.display(), e)
259    })?;
260
261    let name = canonical
262        .file_name()
263        .map(|n| n.to_string_lossy().to_string())
264        .unwrap_or_else(|| canonical.display().to_string());
265
266    // Find existing database
267    let db_info = find_best_database(Some(&canonical))?;
268
269    let (project_path, db_path) = if let Some(info) = db_info {
270        (info.project_path, info.db_path)
271    } else {
272        // No DB found — create a global index
273        info!("No index found for {}, creating global index...", name);
274        crate::index::add_to_index(Some(canonical.clone()), true, cancel_token.clone()).await?;
275
276        // Symlink workaround for DB discovery
277        let global_db = dirs::home_dir()
278            .ok_or_else(|| anyhow::anyhow!("No home directory"))?
279            .join(".codesearch.dbs")
280            .join(&name)
281            .join(DB_DIR_NAME);
282
283        let local_link = canonical.join(DB_DIR_NAME);
284        if global_db.exists() && !local_link.exists() {
285            #[cfg(unix)]
286            std::os::unix::fs::symlink(&global_db, &local_link).ok();
287        }
288
289        let info = find_best_database(Some(&canonical))?
290            .ok_or_else(|| anyhow::anyhow!("Index creation succeeded but DB not found"))?;
291        (info.project_path, info.db_path)
292    };
293
294    // Open shared stores (read-write, acquires writer lock)
295    let stores = SharedStores::new(&db_path, dimensions)?;
296    let stores = Arc::new(stores);
297
298    // Clear stale LMDB readers from crashed processes
299    {
300        let vs: tokio::sync::RwLockReadGuard<'_, VectorStore> = stores.vector_store.read().await;
301        match vs.clear_stale_readers() {
302            Ok(cleared) if cleared > 0 => {
303                info!("Cleared {} stale LMDB readers for {}", cleared, name);
304            }
305            Err(e) => warn!("Failed to clear stale readers for {}: {}", name, e),
306            _ => {}
307        }
308    }
309
310    // Perform incremental refresh to bring index up to date
311    info!("Refreshing index for {}...", name);
312    IndexManager::perform_incremental_refresh_with_stores(&project_path, &db_path, &stores).await?;
313
314    Ok(RepoHandle {
315        name,
316        project_path,
317        db_path,
318        stores,
319    })
320}
321
322/// Periodically re-index all repos on a timer.
323async fn periodic_reindex(
324    state: Arc<DaemonState>,
325    interval: Duration,
326    cancel_token: CancellationToken,
327) {
328    let mut timer = tokio::time::interval(interval);
329    // First tick fires immediately — skip it since we just indexed
330    timer.tick().await;
331
332    loop {
333        tokio::select! {
334            _ = timer.tick() => {
335                info!("Periodic re-index starting...");
336                for repo in &state.repos {
337                    if cancel_token.is_cancelled() {
338                        return;
339                    }
340
341                    // Clear stale readers as safety measure
342                    {
343                        let vs: tokio::sync::RwLockReadGuard<'_, VectorStore> = repo.stores.vector_store.read().await;
344                        let _ = vs.clear_stale_readers();
345                    }
346
347                    match IndexManager::perform_incremental_refresh_with_stores(
348                        &repo.project_path,
349                        &repo.db_path,
350                        &repo.stores,
351                    ).await {
352                        Ok(()) => info!("Re-indexed {}", repo.name),
353                        Err(e) => error!("Re-index failed for {}: {}", repo.name, e),
354                    }
355                }
356                info!("Periodic re-index complete");
357            }
358            _ = cancel_token.cancelled() => {
359                info!("Periodic re-index task shutting down");
360                return;
361            }
362        }
363    }
364}