Skip to main content

zeph_index/
indexer.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Project indexing orchestrator: walk → chunk → embed → store.
5//!
6//! The top-level type is [`CodeIndexer`]. It drives a full project index via
7//! [`CodeIndexer::index_project`] and supports incremental updates via
8//! [`CodeIndexer::reindex_file`] (called by the file watcher).
9//!
10//! ## Concurrency model
11//!
12//! Files are processed in two nested loops:
13//!
14//! 1. **Memory batches** — files are split into groups of
15//!    [`IndexerConfig::memory_batch_size`] to bound peak in-flight state.
16//! 2. **Per-batch concurrency** — within each memory batch, files are processed
17//!    concurrently up to [`IndexerConfig::embed_concurrency`] using
18//!    `futures::stream::buffer_unordered`.
19//!
20//! Chunks that already exist in the store (matched by content hash) are skipped
21//! without any embedding call, making re-runs over an unchanged project O(1) in
22//! LLM API cost.
23
24use std::collections::HashSet;
25use std::path::Path;
26use std::sync::Arc;
27use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
28use std::time::Duration;
29
30use futures::StreamExt as _;
31use tokio::sync::watch;
32
33use crate::chunker::{ChunkerConfig, CodeChunk, chunk_file};
34use crate::context::contextualize_for_embedding;
35use crate::error::{IndexError, Result};
36use crate::languages::{detect_language, is_indexable};
37use crate::store::{ChunkInsert, CodeStore};
38use zeph_common::BlockingSpawner;
39use zeph_llm::any::AnyProvider;
40use zeph_llm::provider::LlmProvider;
41
42/// Monotonically increasing counter for generating unique `chunk_file` task names.
43///
44/// Multiple concurrent `index_file` calls use the same logical name `"chunk_file"`.
45/// The supervisor aborts any existing task with the same name on re-registration, so
46/// each spawn must get a unique name to avoid silently aborting in-flight tasks.
47static CHUNK_TASK_COUNTER: AtomicU64 = AtomicU64::new(0);
48
49/// Configuration for [`CodeIndexer`].
50///
51/// All fields have reasonable defaults via [`Default`]. Override individual fields
52/// when you need to tune throughput, memory use, or API rate limits.
53///
54/// # Examples
55///
56/// ```no_run
57/// use zeph_index::indexer::IndexerConfig;
58///
59/// let config = IndexerConfig::default();
60/// assert_eq!(config.concurrency, 2);
61/// assert_eq!(config.embed_concurrency, 1);
62///
63/// // High-throughput mode for a fast local embedding server.
64/// let fast = IndexerConfig {
65///     embed_concurrency: 8,
66///     memory_batch_size: 64,
67///     ..IndexerConfig::default()
68/// };
69/// ```
70#[derive(Debug, Clone)]
71pub struct IndexerConfig {
72    /// Chunker configuration controlling chunk size thresholds.
73    pub chunker: ChunkerConfig,
74    /// Number of files to process concurrently within each memory batch. Default: 2.
75    pub concurrency: usize,
76    /// Maximum number of new chunks to upsert per Qdrant call. Default: 16.
77    ///
78    /// Larger values reduce round-trips but increase per-call memory.
79    pub batch_size: usize,
80    /// Number of files per outer memory batch during initial indexing. Default: 16.
81    ///
82    /// Lowering this reduces peak heap usage at the cost of more `yield_now` calls.
83    pub memory_batch_size: usize,
84    /// Maximum file size in bytes. Files larger than this are silently skipped. Default: 512 KiB.
85    ///
86    /// Large files (e.g. generated code, vendored libraries) rarely provide useful
87    /// retrieval signal and are expensive to embed.
88    pub max_file_bytes: usize,
89    /// Maximum parallel `embed_batch` calls per memory batch. Default: 1.
90    ///
91    /// Keep this low when using hosted embedding APIs with strict TPM rate limits.
92    pub embed_concurrency: usize,
93}
94
95impl Default for IndexerConfig {
96    fn default() -> Self {
97        Self {
98            chunker: ChunkerConfig::default(),
99            concurrency: 2,
100            batch_size: 16,
101            memory_batch_size: 16,
102            max_file_bytes: 512 * 1024,
103            embed_concurrency: 1,
104        }
105    }
106}
107
108/// Snapshot of indexing progress, sent through a [`tokio::sync::watch`] channel.
109///
110/// The caller passes an `Option<&watch::Sender<IndexProgress>>` to
111/// [`CodeIndexer::index_project`]. Each time a file completes the sender receives an
112/// updated snapshot so the TUI or CLI can display a live progress bar.
113///
114/// # Examples
115///
116/// ```no_run
117/// use tokio::sync::watch;
118/// use zeph_index::indexer::IndexProgress;
119///
120/// let (tx, mut rx) = watch::channel(IndexProgress::default());
121/// tx.send(IndexProgress { files_done: 1, files_total: 10, chunks_created: 5 }).unwrap();
122/// assert_eq!(rx.borrow().files_done, 1);
123/// ```
124#[derive(Debug, Clone, Default)]
125pub struct IndexProgress {
126    /// Number of files fully processed so far.
127    pub files_done: usize,
128    /// Total number of indexable files discovered in the project root.
129    pub files_total: usize,
130    /// Cumulative number of new chunks created across all processed files.
131    pub chunks_created: usize,
132}
133
134/// Summary statistics produced at the end of a full [`CodeIndexer::index_project`] run.
135///
136/// Errors are collected rather than short-circuiting so the majority of the project
137/// is indexed even when individual files fail (e.g. due to transient IO errors or
138/// unsupported encodings).
139#[derive(Debug, Default)]
140pub struct IndexReport {
141    /// Total number of files visited by the directory walker.
142    pub files_scanned: usize,
143    /// Number of files that produced at least one new chunk.
144    pub files_indexed: usize,
145    /// New chunks embedded and upserted into Qdrant.
146    pub chunks_created: usize,
147    /// Chunks skipped because an identical content hash already exists in the store.
148    pub chunks_skipped: usize,
149    /// Chunks deleted from the store because their file was removed from the project.
150    pub chunks_removed: usize,
151    /// Per-file error messages collected during the run.
152    pub errors: Vec<String>,
153    /// Wall-clock duration of the entire run in milliseconds.
154    pub duration_ms: u64,
155}
156
157/// Orchestrates code indexing over a project tree.
158///
159/// `CodeIndexer` is the primary driver of the indexing pipeline. It walks the file
160/// tree, delegates per-file work to `FileIndexWorker`, and coordinates the Qdrant +
161/// `SQLite` writes via [`CodeStore`].
162///
163/// # Cloning and concurrency
164///
165/// `CodeIndexer` is **not** `Clone` — it is typically wrapped in an [`Arc`] and shared
166/// between the initial indexing task and the file watcher.
167///
168/// # Examples
169///
170/// ```no_run
171/// use std::sync::Arc;
172/// use zeph_index::indexer::{CodeIndexer, IndexerConfig};
173/// use zeph_index::store::CodeStore;
174/// # async fn example() -> zeph_index::Result<()> {
175/// # let store: CodeStore = panic!("placeholder");
176/// # let provider: Arc<zeph_llm::any::AnyProvider> = panic!("placeholder");
177///
178/// let indexer = CodeIndexer::new(store, provider, IndexerConfig::default());
179/// let report = indexer.index_project(std::path::Path::new("."), None).await?;
180/// println!("indexed {} files in {}ms", report.files_indexed, report.duration_ms);
181/// # Ok(())
182/// # }
183/// ```
184pub struct CodeIndexer {
185    store: CodeStore,
186    provider: Arc<AnyProvider>,
187    config: IndexerConfig,
188    /// Optional supervised spawner for `chunk_file` blocking tasks.
189    ///
190    /// When `Some`, each `chunk_file` call is routed through the spawner so it
191    /// appears in the supervisor registry (snapshot, graceful shutdown, metrics).
192    /// When `None`, falls back to `tokio::task::spawn_blocking`.
193    spawner: Option<Arc<dyn BlockingSpawner>>,
194    /// Re-entrancy guard: prevents concurrent `index_project` runs on the same indexer.
195    indexing: Arc<AtomicBool>,
196}
197
198impl CodeIndexer {
199    /// Create a new `CodeIndexer`.
200    ///
201    /// The `store` and `provider` are cloned cheaply (reference-counted) across
202    /// the concurrent file-processing tasks.
203    #[must_use]
204    pub fn new(store: CodeStore, provider: Arc<AnyProvider>, config: IndexerConfig) -> Self {
205        Self {
206            store,
207            provider,
208            config,
209            spawner: None,
210            indexing: Arc::new(AtomicBool::new(false)),
211        }
212    }
213
214    /// Attach a supervised blocking spawner for `chunk_file` tasks.
215    ///
216    /// When set, each `chunk_file` call is routed through the spawner so it is
217    /// visible in supervisor snapshots and subject to graceful shutdown.
218    ///
219    /// # Examples
220    ///
221    /// ```no_run
222    /// use std::sync::Arc;
223    /// use zeph_index::indexer::{CodeIndexer, IndexerConfig};
224    /// use zeph_index::store::CodeStore;
225    /// use zeph_common::BlockingSpawner;
226    ///
227    /// # fn example(
228    /// #     store: CodeStore,
229    /// #     provider: Arc<zeph_llm::any::AnyProvider>,
230    /// #     spawner: Arc<dyn BlockingSpawner>,
231    /// # ) {
232    /// let indexer = CodeIndexer::new(store, provider, IndexerConfig::default())
233    ///     .with_spawner(spawner);
234    /// # }
235    /// ```
236    #[must_use]
237    pub fn with_spawner(mut self, spawner: Arc<dyn BlockingSpawner>) -> Self {
238        self.spawner = Some(spawner);
239        self
240    }
241
242    /// Full project indexing with incremental change detection.
243    ///
244    /// # Errors
245    ///
246    /// Returns an error if the embedding probe or collection setup fails.
247    #[tracing::instrument(name = "index.indexer.index_project", skip_all)]
248    pub async fn index_project(
249        &self,
250        root: &Path,
251        progress_tx: Option<&watch::Sender<IndexProgress>>,
252    ) -> Result<IndexReport> {
253        tracing::Span::current().record("root", tracing::field::display(root.display()));
254        if self
255            .indexing
256            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
257            .is_err()
258        {
259            tracing::info!("index_project already running, skipping concurrent request");
260            return Ok(IndexReport::default());
261        }
262        let _guard = IndexingGuard(Arc::clone(&self.indexing));
263
264        let start = std::time::Instant::now();
265        let mut report = IndexReport::default();
266
267        self.ensure_collection_for_provider().await?;
268        let (entries, current_files) = self.walk_project_files(root).await?;
269        let total = entries.len();
270        tracing::info!(total, "indexing started");
271
272        let memory_batch_size = self.config.memory_batch_size.max(1);
273        let mut files_done = 0usize;
274        for batch in entries.chunks(memory_batch_size) {
275            self.index_batch(
276                batch,
277                root,
278                total,
279                &mut files_done,
280                &mut report,
281                progress_tx,
282            )
283            .await;
284        }
285
286        self.cleanup_removed_files(&current_files, &mut report)
287            .await?;
288
289        report.duration_ms = start.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
290        Ok(report)
291    }
292
293    #[tracing::instrument(name = "index.indexer.ensure_collection", skip_all)]
294    async fn ensure_collection_for_provider(&self) -> Result<()> {
295        const STARTUP_EMBED_TIMEOUT_SECS: u64 = 15;
296        let probe = tokio::time::timeout(
297            Duration::from_secs(STARTUP_EMBED_TIMEOUT_SECS),
298            self.provider.embed("probe"),
299        )
300        .await
301        .map_err(|_| {
302            tracing::warn!(
303                timeout_secs = STARTUP_EMBED_TIMEOUT_SECS,
304                "embedding provider timed out during startup"
305            );
306            crate::error::IndexError::EmbedTimeout(STARTUP_EMBED_TIMEOUT_SECS)
307        })??;
308        let vector_size = u64::try_from(probe.len())?;
309        self.store.ensure_collection(vector_size).await
310    }
311
312    #[tracing::instrument(name = "index.indexer.walk_project_files", skip_all)]
313    async fn walk_project_files(
314        &self,
315        root: &Path,
316    ) -> Result<(Vec<ignore::DirEntry>, HashSet<String>)> {
317        let root_buf = root.to_path_buf();
318        let walk = move || {
319            let entries: Vec<_> = ignore::WalkBuilder::new(&root_buf)
320                .hidden(true)
321                .git_ignore(true)
322                .build()
323                .flatten()
324                .filter(|e| e.file_type().is_some_and(|ft| ft.is_file()) && is_indexable(e.path()))
325                .collect();
326
327            let mut current_files: HashSet<String> = HashSet::new();
328            for entry in &entries {
329                let rel_path = entry
330                    .path()
331                    .strip_prefix(&root_buf)
332                    .unwrap_or(entry.path())
333                    .to_string_lossy()
334                    .to_string();
335                current_files.insert(rel_path);
336            }
337            (entries, current_files)
338        };
339
340        if let Some(ref spawner) = self.spawner {
341            let (result_tx, result_rx) = tokio::sync::oneshot::channel();
342            let _join = spawner.spawn_blocking_named(
343                std::sync::Arc::from("walk_project_files"),
344                Box::new(move || {
345                    let _ = result_tx.send(walk());
346                }),
347            );
348            result_rx
349                .await
350                .map_err(|_| IndexError::Other("walk_project_files task dropped result".to_owned()))
351        } else {
352            tokio::task::spawn_blocking(walk)
353                .await
354                .map_err(|e| IndexError::Other(format!("directory walk panicked: {e:#}")))
355        }
356    }
357
358    #[tracing::instrument(name = "index.indexer.index_batch", skip_all)]
359    #[allow(clippy::too_many_arguments)]
360    async fn index_batch(
361        &self,
362        batch: &[ignore::DirEntry],
363        root: &Path,
364        total: usize,
365        files_done: &mut usize,
366        report: &mut IndexReport,
367        progress_tx: Option<&watch::Sender<IndexProgress>>,
368    ) {
369        let store = self.store.clone();
370        let provider = Arc::clone(&self.provider);
371        let config = self.config.clone();
372        let spawner = self.spawner.clone();
373        let concurrency = self.config.embed_concurrency.max(1);
374
375        let file_pairs = make_file_pairs(batch, root);
376
377        let mut stream =
378            futures::stream::iter(file_pairs.into_iter().map(|(rel_path, abs_path)| {
379                let store = store.clone();
380                let provider = Arc::clone(&provider);
381                let config = config.clone();
382                let spawner = spawner.clone();
383                async move {
384                    let worker = FileIndexWorker {
385                        store,
386                        provider,
387                        config,
388                        spawner,
389                    };
390                    let result = worker.index_file(&abs_path, &rel_path).await;
391                    (rel_path, result)
392                }
393            }))
394            .buffer_unordered(concurrency);
395
396        while let Some((rel_path, outcome)) = stream.next().await {
397            report.files_scanned += 1;
398            *files_done += 1;
399            match outcome {
400                Ok((created, skipped)) => {
401                    if created > 0 {
402                        report.files_indexed += 1;
403                    }
404                    report.chunks_created += created;
405                    report.chunks_skipped += skipped;
406                    tracing::info!(
407                        file = %rel_path,
408                        progress = format_args!("{files_done}/{total}"),
409                        created,
410                        skipped,
411                    );
412                }
413                Err(e) => {
414                    report.errors.push(format!("{rel_path}: {e:#}"));
415                }
416            }
417            if let Some(tx) = progress_tx {
418                let _ = tx.send(IndexProgress {
419                    files_done: *files_done,
420                    files_total: total,
421                    chunks_created: report.chunks_created,
422                });
423            }
424        }
425
426        // Drop stream to release all in-flight future state before the next batch.
427        drop(stream);
428        tokio::task::yield_now().await;
429    }
430
431    #[tracing::instrument(name = "index.indexer.cleanup_removed_files", skip_all)]
432    async fn cleanup_removed_files(
433        &self,
434        current_files: &HashSet<String>,
435        report: &mut IndexReport,
436    ) -> Result<()> {
437        let indexed = self.store.indexed_files().await?;
438        for old_file in &indexed {
439            if !current_files.contains(old_file) {
440                match self.store.remove_file_chunks(old_file).await {
441                    Ok(n) => report.chunks_removed += n,
442                    Err(e) => report.errors.push(format!("cleanup {old_file}: {e:#}")),
443                }
444            }
445        }
446        Ok(())
447    }
448
449    /// Re-index a specific file (for file watcher).
450    ///
451    /// # Errors
452    ///
453    /// Returns an error if reading, chunking, or embedding fails.
454    #[tracing::instrument(name = "index.indexer.reindex_file", skip_all)]
455    pub async fn reindex_file(&self, root: &Path, abs_path: &Path) -> Result<usize> {
456        tracing::Span::current().record("file_path", tracing::field::display(abs_path.display()));
457        let rel_path = abs_path
458            .strip_prefix(root)
459            .unwrap_or(abs_path)
460            .to_string_lossy()
461            .to_string();
462
463        self.store.remove_file_chunks(&rel_path).await?;
464        let worker = FileIndexWorker {
465            store: self.store.clone(),
466            provider: Arc::clone(&self.provider),
467            config: self.config.clone(),
468            spawner: self.spawner.clone(),
469        };
470        let (created, _) = worker.index_file(abs_path, &rel_path).await?;
471        Ok(created)
472    }
473}
474
475/// Per-file indexing worker — cloneable and `Send` so it can run inside `buffer_unordered`.
476struct FileIndexWorker {
477    store: CodeStore,
478    provider: Arc<AnyProvider>,
479    config: IndexerConfig,
480    spawner: Option<Arc<dyn BlockingSpawner>>,
481}
482
483impl FileIndexWorker {
484    /// Embed and upsert all new chunks from a single file.
485    ///
486    /// New chunks (those not already in the store) are accumulated, embedded in order, and
487    /// upserted in a single batch call to minimise round-trips to `Qdrant` and `SQLite`.
488    #[tracing::instrument(name = "index.indexer.index_file", skip_all)]
489    async fn index_file(&self, abs_path: &Path, rel_path: &str) -> Result<(usize, usize)> {
490        tracing::Span::current().record("file_path", rel_path);
491        let metadata = tokio::fs::metadata(abs_path).await?;
492        if metadata.len() > self.config.max_file_bytes as u64 {
493            tracing::debug!(
494                file = %abs_path.display(),
495                size = metadata.len(),
496                "skipping oversized file"
497            );
498            return Ok((0, 0));
499        }
500        let source = tokio::fs::read_to_string(abs_path).await?;
501        let lang = detect_language(abs_path).ok_or(IndexError::UnsupportedLanguage)?;
502
503        let rel_path_owned = rel_path.to_owned();
504        let chunker_config = self.config.chunker.clone();
505        let chunks = if let Some(ref spawner) = self.spawner {
506            // Route through the supervised spawner so the task appears in registry.
507            // BlockingSpawner::spawn_blocking_named is object-safe (returns JoinHandle<()>),
508            // so we communicate the typed result via a oneshot channel.
509            //
510            // Each spawn gets a unique name to prevent the supervisor's "abort if same
511            // name already exists" logic from silently aborting concurrent in-flight tasks
512            // when embed_concurrency > 1.
513            let task_id = CHUNK_TASK_COUNTER.fetch_add(1, Ordering::Relaxed);
514            let task_name: std::sync::Arc<str> =
515                std::sync::Arc::from(format!("chunk_file_{task_id}").as_str());
516            let (result_tx, result_rx) = tokio::sync::oneshot::channel();
517            let _join = spawner.spawn_blocking_named(
518                task_name,
519                Box::new(move || {
520                    let result = chunk_file(&source, &rel_path_owned, lang, &chunker_config);
521                    let _ = result_tx.send(result);
522                }),
523            );
524            result_rx
525                .await
526                .map_err(|_| IndexError::Other("chunk_file task dropped result".to_owned()))??
527        } else {
528            tokio::task::spawn_blocking(move || {
529                chunk_file(&source, &rel_path_owned, lang, &chunker_config)
530            })
531            .await
532            .map_err(|e| IndexError::Other(format!("chunk_file panicked: {e}")))??
533        };
534
535        // Batch-check which hashes already exist to avoid N individual queries.
536        let all_hashes: Vec<&str> = chunks.iter().map(|c| c.content_hash.as_str()).collect();
537        let existing = self.store.existing_hashes(&all_hashes).await?;
538
539        let mut new_chunks: Vec<CodeChunk> = Vec::new();
540        let mut skipped = 0usize;
541
542        for chunk in chunks {
543            if existing.contains(&chunk.content_hash) {
544                skipped += 1;
545            } else {
546                new_chunks.push(chunk);
547            }
548        }
549
550        if new_chunks.is_empty() {
551            return Ok((0, skipped));
552        }
553
554        // Embed all new chunks in a single batch call, then zip with inserts.
555        let embedding_texts: Vec<String> =
556            new_chunks.iter().map(contextualize_for_embedding).collect();
557        let text_refs: Vec<&str> = embedding_texts.iter().map(String::as_str).collect();
558        let vectors = self.provider.embed_batch(&text_refs).await?;
559
560        let batch: Vec<(ChunkInsert<'_>, Vec<f32>)> = new_chunks
561            .iter()
562            .zip(vectors)
563            .map(|(chunk, vector)| (chunk_to_insert(chunk), vector))
564            .collect();
565
566        let created = match tokio::time::timeout(
567            Duration::from_secs(30),
568            self.store.upsert_chunks_batch(batch),
569        )
570        .await
571        {
572            Ok(Ok(inserted)) => inserted.len(),
573            Ok(Err(e)) => {
574                tracing::warn!("upsert_chunks_batch failed, skipping batch: {e}");
575                0
576            }
577            Err(_elapsed) => {
578                tracing::warn!(
579                    "upsert_chunks_batch timed out after 30s, skipping batch of {} chunks",
580                    new_chunks.len()
581                );
582                0
583            }
584        };
585
586        if created > 0 {
587            tracing::debug!("{rel_path}: {created} chunks indexed, {skipped} unchanged");
588        }
589
590        Ok((created, skipped))
591    }
592}
593
594fn make_file_pairs(batch: &[ignore::DirEntry], root: &Path) -> Vec<(String, std::path::PathBuf)> {
595    batch
596        .iter()
597        .map(|entry| {
598            let rel = entry
599                .path()
600                .strip_prefix(root)
601                .unwrap_or(entry.path())
602                .to_string_lossy()
603                .to_string();
604            let abs = entry.path().to_path_buf();
605            (rel, abs)
606        })
607        .collect()
608}
609
610fn chunk_to_insert(chunk: &CodeChunk) -> ChunkInsert<'_> {
611    ChunkInsert {
612        file_path: &chunk.file_path,
613        language: chunk.language.id(),
614        node_type: &chunk.node_type,
615        entity_name: chunk.entity_name.as_deref(),
616        line_start: chunk.line_range.0,
617        line_end: chunk.line_range.1,
618        code: &chunk.code,
619        scope_chain: &chunk.scope_chain,
620        content_hash: &chunk.content_hash,
621    }
622}
623
624/// RAII guard that resets the re-entrancy flag when dropped.
625struct IndexingGuard(Arc<AtomicBool>);
626
627impl Drop for IndexingGuard {
628    fn drop(&mut self) {
629        self.0.store(false, Ordering::Release);
630    }
631}
632
633#[cfg(test)]
634mod tests {
635    use super::*;
636
637    #[test]
638    fn index_progress_default() {
639        let p = IndexProgress::default();
640        assert_eq!(p.files_done, 0);
641        assert_eq!(p.files_total, 0);
642        assert_eq!(p.chunks_created, 0);
643    }
644
645    #[test]
646    fn progress_send_no_receivers_is_ignored() {
647        let (tx, rx) = tokio::sync::watch::channel(IndexProgress::default());
648        drop(rx);
649        // send with no receivers must not panic
650        let _ = tx.send(IndexProgress {
651            files_done: 1,
652            files_total: 5,
653            chunks_created: 3,
654        });
655    }
656
657    #[test]
658    fn progress_send_multiple_times_accumulates() {
659        let (tx, rx) = tokio::sync::watch::channel(IndexProgress::default());
660        for i in 1..=3usize {
661            let _ = tx.send(IndexProgress {
662                files_done: i,
663                files_total: 3,
664                chunks_created: i * 2,
665            });
666        }
667        let p = rx.borrow();
668        assert_eq!(p.files_done, 3);
669        assert_eq!(p.files_total, 3);
670        assert_eq!(p.chunks_created, 6);
671    }
672
673    #[test]
674    fn progress_none_tx_skips_send() {
675        // When progress_tx is None the loop body must not panic — verified by
676        // constructing the same conditional used in index_project.
677        let progress_tx: Option<&tokio::sync::watch::Sender<IndexProgress>> = None;
678        let entries = [1usize, 2, 3];
679        for (i, _) in entries.iter().enumerate() {
680            if let Some(tx) = progress_tx {
681                let _ = tx.send(IndexProgress {
682                    files_done: i + 1,
683                    files_total: entries.len(),
684                    chunks_created: 0,
685                });
686            }
687        }
688        // reaching here means no panic when tx is None
689    }
690
691    #[test]
692    fn chunk_to_insert_maps_fields() {
693        let chunk = CodeChunk {
694            code: "fn test() {}".to_string(),
695            file_path: "src/lib.rs".to_string(),
696            language: crate::languages::Lang::Rust,
697            node_type: "function_item".to_string(),
698            entity_name: Some("test".to_string()),
699            line_range: (1, 3),
700            scope_chain: "Foo".to_string(),
701            imports: String::new(),
702            content_hash: "abc".to_string(),
703        };
704
705        let insert = chunk_to_insert(&chunk);
706        assert_eq!(insert.file_path, "src/lib.rs");
707        assert_eq!(insert.language, "rust");
708        assert_eq!(insert.entity_name, Some("test"));
709        assert_eq!(insert.line_start, 1);
710        assert_eq!(insert.line_end, 3);
711    }
712
713    #[test]
714    fn default_config() {
715        let config = IndexerConfig::default();
716        assert_eq!(config.chunker.target_size, 600);
717        assert_eq!(config.concurrency, 2);
718        assert_eq!(config.batch_size, 16);
719        assert_eq!(config.embed_concurrency, 1);
720    }
721
722    #[test]
723    fn indexer_config_custom_concurrency_and_batch_size() {
724        let config = IndexerConfig {
725            concurrency: 8,
726            batch_size: 64,
727            ..IndexerConfig::default()
728        };
729        assert_eq!(config.concurrency, 8);
730        assert_eq!(config.batch_size, 64);
731    }
732
733    #[test]
734    fn index_report_defaults() {
735        let report = IndexReport::default();
736        assert_eq!(report.files_scanned, 0);
737        assert!(report.errors.is_empty());
738    }
739
740    /// Verify that `chunk_file` runs inside `spawn_blocking` and that the dedup path
741    /// (all hashes already in `SQLite`) reaches `Ok((0, N))` without touching Qdrant.
742    ///
743    /// Two assertions:
744    /// 1. First `index_file` call with pre-seeded hashes → `(0, N)` (all skipped).
745    /// 2. Second identical call → same `(0, N)` (dedup is idempotent).
746    ///
747    /// The test does not require a live Qdrant instance because `upsert_chunks_batch`
748    /// returns early when `new_chunks` is empty.
749    #[tokio::test]
750    async fn index_file_spawn_blocking_dedup_path() {
751        use std::sync::Arc;
752        use tempfile::TempDir;
753        use zeph_llm::any::AnyProvider;
754        use zeph_llm::mock::MockProvider;
755        use zeph_memory::QdrantOps;
756
757        let dir = TempDir::new().unwrap();
758        let rs_path = dir.path().join("sample.rs");
759        std::fs::write(
760            &rs_path,
761            "pub fn hello() -> &'static str { \"hello\" }\n\
762             pub fn world() -> &'static str { \"world\" }\n",
763        )
764        .unwrap();
765
766        let pool = zeph_db::DbConfig {
767            url: ":memory:".to_string(),
768            ..Default::default()
769        }
770        .connect()
771        .await
772        .unwrap();
773
774        // Pre-seed the chunk hashes into SQLite so `existing_hashes` returns them all
775        // and `new_chunks` is empty — Qdrant upsert is never called.
776        let source = std::fs::read_to_string(&rs_path).unwrap();
777        let lang = crate::languages::detect_language(&rs_path).unwrap();
778        let chunks =
779            crate::chunker::chunk_file(&source, "sample.rs", lang, &ChunkerConfig::default())
780                .unwrap();
781        let chunk_count = chunks.len();
782        assert!(chunk_count > 0, "test file must produce at least one chunk");
783
784        for (i, chunk) in chunks.iter().enumerate() {
785            zeph_db::query(zeph_db::sql!(
786                "INSERT INTO chunk_metadata \
787                 (qdrant_id, file_path, content_hash, line_start, line_end, language, node_type) \
788                 VALUES (?, ?, ?, ?, ?, ?, ?)"
789            ))
790            .bind(format!("q{i}"))
791            .bind("sample.rs")
792            .bind(&chunk.content_hash)
793            .bind(i64::try_from(chunk.line_range.0).unwrap_or(i64::MAX))
794            .bind(i64::try_from(chunk.line_range.1).unwrap_or(i64::MAX))
795            .bind("rust")
796            .bind("function_item")
797            .execute(&pool)
798            .await
799            .unwrap();
800        }
801
802        let ops = QdrantOps::new("http://127.0.0.1:1", None).unwrap();
803        let store = crate::store::CodeStore::with_ops(ops, pool);
804        let provider = Arc::new(AnyProvider::Mock(
805            MockProvider::default().with_embedding(vec![0.0_f32; 384]),
806        ));
807        let worker = FileIndexWorker {
808            store,
809            provider,
810            config: IndexerConfig::default(),
811            spawner: None,
812        };
813
814        // First call: all hashes exist → (0, chunk_count).
815        let (created, skipped) = worker.index_file(&rs_path, "sample.rs").await.unwrap();
816        assert_eq!(created, 0);
817        assert_eq!(skipped, chunk_count);
818
819        // Second call: same result — dedup is idempotent.
820        let (created2, skipped2) = worker.index_file(&rs_path, "sample.rs").await.unwrap();
821        assert_eq!(created2, 0);
822        assert_eq!(skipped2, chunk_count);
823    }
824
825    /// Verify that `index_file` works correctly when a `BlockingSpawner` is provided.
826    ///
827    /// Uses a minimal `MockBlockingSpawner` that delegates to `tokio::task::spawn_blocking`,
828    /// exercising the `spawner: Some(...)` branch in `FileIndexWorker::index_file`.
829    #[tokio::test]
830    async fn index_file_with_blocking_spawner() {
831        use std::sync::Arc;
832        use tempfile::TempDir;
833        use zeph_llm::any::AnyProvider;
834        use zeph_llm::mock::MockProvider;
835        use zeph_memory::QdrantOps;
836
837        struct MockBlockingSpawner;
838
839        impl BlockingSpawner for MockBlockingSpawner {
840            fn spawn_blocking_named(
841                &self,
842                _name: std::sync::Arc<str>,
843                f: Box<dyn FnOnce() + Send + 'static>,
844            ) -> tokio::task::JoinHandle<()> {
845                tokio::task::spawn_blocking(f)
846            }
847        }
848
849        let dir = TempDir::new().unwrap();
850        let rs_path = dir.path().join("sample.rs");
851        tokio::fs::write(&rs_path, b"fn hello() {}\n")
852            .await
853            .unwrap();
854
855        let pool = zeph_db::DbConfig {
856            url: ":memory:".to_string(),
857            ..Default::default()
858        }
859        .connect()
860        .await
861        .unwrap();
862
863        let ops = QdrantOps::new("http://127.0.0.1:1", None).unwrap();
864        let store = crate::store::CodeStore::with_ops(ops, pool);
865        let provider = Arc::new(AnyProvider::Mock(
866            MockProvider::default().with_embedding(vec![0.0_f32; 384]),
867        ));
868        let worker = FileIndexWorker {
869            store,
870            provider,
871            config: IndexerConfig::default(),
872            spawner: Some(Arc::new(MockBlockingSpawner)),
873        };
874
875        // With all hashes absent from SQLite the Qdrant upsert would be attempted, but
876        // our mock QdrantOps uses port 1 so it would fail. The test verifies that the
877        // spawner path is taken by confirming `chunk_file` runs (if it panicked or the
878        // oneshot was dropped, we'd get IndexError::Other, not IndexError::VectorStore).
879        let result = worker.index_file(&rs_path, "sample.rs").await;
880        // Qdrant is unavailable → we expect a VectorStore/Other error, NOT a panic.
881        // The important invariant is that we do NOT get "chunk_file task dropped result".
882        if let Err(ref e) = result {
883            let msg = e.to_string();
884            assert!(
885                !msg.contains("chunk_file task dropped result"),
886                "spawner path must not drop the result channel; got: {msg}"
887            );
888        }
889    }
890
891    /// Verify that the re-entrancy guard resets correctly after a normal run.
892    #[test]
893    fn indexing_guard_resets_flag_on_drop() {
894        let flag = Arc::new(AtomicBool::new(false));
895        {
896            // Simulate acquiring the guard.
897            flag.store(true, Ordering::Relaxed);
898            let _guard = IndexingGuard(Arc::clone(&flag));
899            assert!(flag.load(Ordering::Relaxed));
900        }
901        // Guard dropped — flag must be false.
902        assert!(!flag.load(Ordering::Relaxed));
903    }
904
905    /// Verify that `ensure_collection_for_provider` returns `IndexError::EmbedTimeout`
906    /// when the embedding provider exceeds the 15-second startup timeout.
907    ///
908    /// Uses `tokio::time::pause` + `advance` to avoid a real 15-second wall-clock wait.
909    /// DB is initialised before pausing time to avoid `SQLite` pool timeout under paused clock.
910    /// Must run serially — `tokio::time::pause` is process-global and breaks parallel tests.
911    #[serial_test::serial]
912    #[tokio::test]
913    async fn ensure_collection_timeout_returns_embed_timeout_error() {
914        use std::sync::Arc;
915        use zeph_llm::any::AnyProvider;
916        use zeph_llm::mock::MockProvider;
917        use zeph_memory::QdrantOps;
918
919        let pool = zeph_db::DbConfig {
920            url: ":memory:".to_string(),
921            ..Default::default()
922        }
923        .connect()
924        .await
925        .unwrap();
926
927        // Pause time only after DB initialisation to avoid SQLite PoolTimedOut.
928        tokio::time::pause();
929
930        // embed_delay_ms must exceed the 15 s timeout; we pair it with time::advance
931        // so the test completes instantly in wall-clock time.
932        let slow_provider = Arc::new(AnyProvider::Mock(
933            MockProvider::default()
934                .with_embed_delay(20_000) // 20 s > 15 s timeout
935                .with_embedding(vec![0.0_f32; 384]),
936        ));
937
938        let ops = QdrantOps::new("http://127.0.0.1:1", None).unwrap();
939        let store = crate::store::CodeStore::with_ops(ops, pool);
940        let indexer = CodeIndexer::new(store, slow_provider, IndexerConfig::default());
941
942        // Spawn the operation so we can advance mock time from the test body.
943        let handle = tokio::spawn(async move { indexer.ensure_collection_for_provider().await });
944        tokio::time::advance(std::time::Duration::from_secs(16)).await;
945        let result = handle.await.unwrap();
946
947        match result {
948            Err(crate::error::IndexError::EmbedTimeout(secs)) => {
949                assert_eq!(secs, 15, "timeout value must be the configured 15 s");
950            }
951            other => panic!("expected IndexError::EmbedTimeout, got: {other:?}"),
952        }
953    }
954
955    /// Verify that `compare_exchange` rejects a second caller while the flag is set.
956    #[test]
957    fn indexing_guard_compare_exchange_skips_concurrent() {
958        let flag = Arc::new(AtomicBool::new(false));
959
960        // First caller acquires.
961        assert!(
962            flag.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
963                .is_ok(),
964            "first caller should succeed"
965        );
966        // Second caller must be rejected.
967        assert!(
968            flag.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
969                .is_err(),
970            "second caller should be rejected while flag is true"
971        );
972
973        // Reset.
974        flag.store(false, Ordering::Release);
975
976        // Third caller can acquire again.
977        assert!(
978            flag.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
979                .is_ok(),
980            "third caller should succeed after reset"
981        );
982    }
983}