Skip to main content

journal_engine/
indexing.rs

//! Journal file indexing infrastructure.
//!
//! This module provides infrastructure for indexing journal files:
//! - Batch parallel indexing with cache reuse and cooperative cancellation
//! - Cache builder for file indexes
6
7use crate::{
8    cache::{FileIndexCache, FileIndexKey},
9    error::{EngineError, Result},
10    query_time_range::QueryTimeRange,
11};
12use journal_index::{FileIndex, FileIndexer, IndexingLimits, Seconds};
13use journal_registry::Registry;
14use std::sync::Arc;
15use std::sync::atomic::AtomicUsize;
16use tokio_util::sync::CancellationToken;
17use tracing::{error, trace};
18
/// Upper bound on the number of Rayon worker threads used by a single
/// indexing batch, regardless of how many cores are available.
const MAX_BATCH_INDEX_THREADS: usize = 4;
20
21// ============================================================================
22// File Index Cache Builder
23// ============================================================================
24
/// Builder for constructing a [`FileIndexCache`] with custom configuration.
///
/// Every `Option` field left as `None` falls back to the default documented
/// on [`FileIndexCacheBuilder::new`].
pub struct FileIndexCacheBuilder {
    /// Directory backing the disk tier; `None` selects the default location.
    cache_path: Option<std::path::PathBuf>,
    /// Capacity of the in-memory tier; `None` selects the default.
    memory_capacity: Option<usize>,
    /// Capacity of the disk tier in bytes; `None` selects the default.
    disk_capacity: Option<usize>,
    /// Disk block size in bytes; `None` selects the default.
    block_size: Option<usize>,
    /// Whether `build` attaches a disk tier at all.
    enable_disk_cache: bool,
}
33
34impl FileIndexCacheBuilder {
35    /// Creates a new builder with no configuration.
36    ///
37    /// All options use defaults if not explicitly set:
38    /// - Cache path: temp directory + "journal-engine-cache"
39    /// - Memory capacity: 128 entries
40    /// - Disk capacity: 16 MB
41    /// - Block size: 4 MB
42    pub fn new() -> Self {
43        Self {
44            cache_path: None,
45            memory_capacity: None,
46            disk_capacity: None,
47            block_size: None,
48            enable_disk_cache: true,
49        }
50    }
51
52    /// Sets the cache directory path.
53    pub fn with_cache_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
54        self.cache_path = Some(path.into());
55        self
56    }
57
58    /// Sets the memory capacity (number of items to keep in memory).
59    pub fn with_memory_capacity(mut self, capacity: usize) -> Self {
60        self.memory_capacity = Some(capacity);
61        self
62    }
63
64    /// Sets the disk capacity in bytes.
65    pub fn with_disk_capacity(mut self, capacity: usize) -> Self {
66        self.disk_capacity = Some(capacity);
67        self
68    }
69
70    /// Sets the block size in bytes.
71    pub fn with_block_size(mut self, size: usize) -> Self {
72        self.block_size = Some(size);
73        self
74    }
75
76    /// Disables the disk-backed cache and keeps indexes in memory only.
77    pub fn without_disk_cache(mut self) -> Self {
78        self.enable_disk_cache = false;
79        self
80    }
81
82    /// Builds the FileIndexCache with the configured settings.
83    pub async fn build(self) -> Result<FileIndexCache> {
84        use foyer::HybridCacheBuilder;
85
86        let memory_capacity = self.memory_capacity.unwrap_or(128);
87        let memory = HybridCacheBuilder::new()
88            .with_name("file-index-cache")
89            .with_policy(foyer::HybridCachePolicy::WriteOnInsertion)
90            .memory(memory_capacity)
91            .with_shards(4);
92
93        if !self.enable_disk_cache {
94            return memory.storage().build().await.map_err(Into::into);
95        }
96
97        use foyer::{
98            BlockEngineBuilder, DeviceBuilder, FsDeviceBuilder, IoEngineBuilder,
99            PsyncIoEngineBuilder,
100        };
101
102        let cache_path = self
103            .cache_path
104            // nosemgrep: rust.lang.security.temp-dir.temp-dir -- caller-configurable non-sensitive disk cache default.
105            .unwrap_or_else(|| std::env::temp_dir().join("journal-engine-cache"));
106        let disk_capacity = self.disk_capacity.unwrap_or(16 * 1024 * 1024);
107        let block_size = self.block_size.unwrap_or(4 * 1024 * 1024);
108
109        std::fs::create_dir_all(&cache_path).map_err(|e| {
110            EngineError::Io(std::io::Error::other(format!(
111                "Failed to create cache directory: {}",
112                e
113            )))
114        })?;
115
116        let cache = memory
117            .storage()
118            .with_io_engine(PsyncIoEngineBuilder::new().build().await?)
119            .with_engine_config(
120                BlockEngineBuilder::new(
121                    FsDeviceBuilder::new(&cache_path)
122                        .with_capacity(disk_capacity)
123                        .build()?,
124                )
125                .with_block_size(block_size),
126            )
127            .build()
128            .await?;
129
130        Ok(cache)
131    }
132}
133
134impl Default for FileIndexCacheBuilder {
135    fn default() -> Self {
136        Self::new()
137    }
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[tokio::test(flavor = "current_thread")]
    async fn build_without_disk_cache_does_not_create_disk_cache_files() {
        let tmp = tempdir().expect("tempdir");
        let cache_path = tmp.path().join("foyer-cache");
        let cache = FileIndexCacheBuilder::new()
            .with_cache_path(&cache_path)
            .with_memory_capacity(4)
            .without_disk_cache()
            .build()
            .await
            .expect("build in-memory file index cache");

        cache
            .close()
            .await
            .expect("close in-memory file index cache");
        assert!(
            !cache_path.exists(),
            "expected memory-only file index cache to avoid creating {}",
            cache_path.display()
        );
    }

    #[test]
    fn compute_thread_count_is_clamped_to_pool_limits() {
        // An empty or single-file batch still reports exactly one worker.
        assert_eq!(compute_thread_count(0), 1);
        assert_eq!(compute_thread_count(1), 1);

        // Large batches are capped by the hard limit (and by available cores).
        let threads = compute_thread_count(10_000);
        assert!((1..=MAX_BATCH_INDEX_THREADS).contains(&threads));
    }
}
168
169// ============================================================================
170// Batch Processing
171// ============================================================================
172
173/// Batch computes file indexes in parallel using rayon, with cache checking and time budget enforcement.
174///
175/// This function:
176/// 1. Checks cache for all keys upfront
177/// 2. Identifies cache misses
178/// 3. Uses tokio::task to compute missing indexes in parallel
179/// 4. Inserts newly computed indexes into cache
180/// 5. Returns all results (cached + newly computed)
181///
182/// # Arguments
183/// * `cache` - The file index cache
184/// * `registry` - Registry to update with file metadata
185/// * `keys` - Vector of (file, facets, source_timestamp_field) to fetch/compute indexes for
186/// * `time_range` - Query time range for bucket duration calculation
187/// * `cancellation` - Token to signal cancellation from the caller
188/// * `indexing_limits` - Configuration limits for indexing (cardinality, payload size)
189/// * `progress_counter` - Optional atomic counter incremented after each file is indexed
190///
191/// # Returns
192/// Vector of responses for each key. Successful responses contain the file index.
193/// If cancelled, returns Cancelled error.
194pub async fn batch_compute_file_indexes(
195    cache: &FileIndexCache,
196    registry: &Registry,
197    keys: Vec<FileIndexKey>,
198    time_range: &QueryTimeRange,
199    cancellation: CancellationToken,
200    indexing_limits: IndexingLimits,
201    progress_counter: Option<Arc<AtomicUsize>>,
202) -> Result<Vec<(FileIndexKey, FileIndex)>> {
203    let bucket_duration = time_range.bucket_duration_seconds();
204    let cache_lookup_results = lookup_cached_indexes(cache, &keys, &cancellation).await?;
205    let CachePartition {
206        mut responses,
207        keys_to_compute,
208        stats,
209    } = partition_cache_results(cache_lookup_results, keys.len(), bucket_duration);
210
211    if cancellation.is_cancelled() {
212        return Err(EngineError::Cancelled);
213    }
214
215    trace!(
216        "phase 2 summary: hits={}, misses={}, stale={}, incompatible_bucket={}",
217        stats.cache_hits, stats.cache_misses, stats.stale_entries, stats.incompatible_bucket
218    );
219
220    let computed_results = compute_missing_indexes(
221        keys_to_compute,
222        bucket_duration,
223        cancellation.clone(),
224        indexing_limits,
225        progress_counter,
226    )
227    .await?;
228
229    store_computed_indexes(registry, cache, &mut responses, computed_results);
230    Ok(responses)
231}
232
233async fn lookup_cached_indexes(
234    cache: &FileIndexCache,
235    keys: &[FileIndexKey],
236    cancellation: &CancellationToken,
237) -> Result<Vec<(FileIndexKey, Result<Option<FileIndex>>)>> {
238    let cache_lookup_futures = keys.iter().map(|key| {
239        let key_clone = key.clone();
240        async move {
241            let cached = cache
242                .get(&key_clone)
243                .await
244                .map(|entry| entry.map(|e| e.value().clone()))
245                .map_err(|e| e.into());
246            (key_clone, cached)
247        }
248    });
249
250    tokio::select! {
251        results = futures::future::join_all(cache_lookup_futures) => Ok(results),
252        _ = cancellation.cancelled() => Err(EngineError::Cancelled),
253    }
254}
255
/// Per-batch counters describing how cache lookups were classified; reported
/// in the batch trace summary.
#[derive(Default)]
struct CacheStats {
    /// Fresh, bucket-compatible entries served straight from the cache.
    cache_hits: usize,
    /// Keys for which the cache had no entry.
    cache_misses: usize,
    /// Cached entries rejected because they are no longer fresh.
    stale_entries: usize,
    /// Cached entries rejected because their bucket duration does not fit the query.
    incompatible_bucket: usize,
}
263
264struct CachePartition {
265    responses: Vec<(FileIndexKey, FileIndex)>,
266    keys_to_compute: Vec<FileIndexKey>,
267    stats: CacheStats,
268}
269
270fn partition_cache_results(
271    cache_lookup_results: Vec<(FileIndexKey, Result<Option<FileIndex>>)>,
272    key_count: usize,
273    bucket_duration: Seconds,
274) -> CachePartition {
275    let mut partition = CachePartition {
276        responses: Vec::with_capacity(key_count),
277        keys_to_compute: Vec::new(),
278        stats: CacheStats::default(),
279    };
280
281    for (key, cache_lookup_result) in cache_lookup_results {
282        partition_cache_result(key, cache_lookup_result, bucket_duration, &mut partition);
283    }
284
285    partition
286}
287
288fn partition_cache_result(
289    key: FileIndexKey,
290    cache_lookup_result: Result<Option<FileIndex>>,
291    bucket_duration: Seconds,
292    partition: &mut CachePartition,
293) {
294    match cache_lookup_result {
295        Ok(Some(file_index)) => partition_cached_index(key, file_index, bucket_duration, partition),
296        Ok(None) => {
297            partition.stats.cache_misses += 1;
298            partition.keys_to_compute.push(key);
299        }
300        Err(e) => {
301            error!("cached file index lookup error {}", e);
302        }
303    }
304}
305
306fn partition_cached_index(
307    key: FileIndexKey,
308    file_index: FileIndex,
309    bucket_duration: Seconds,
310    partition: &mut CachePartition,
311) {
312    let fresh = file_index.is_fresh();
313    let bucket_ok = compatible_bucket_duration(&file_index, bucket_duration);
314
315    if fresh && bucket_ok {
316        partition.stats.cache_hits += 1;
317        partition.responses.push((key, file_index));
318        return;
319    }
320
321    if !fresh {
322        partition.stats.stale_entries += 1;
323    }
324    if !bucket_ok {
325        partition.stats.incompatible_bucket += 1;
326    }
327    partition.keys_to_compute.push(key);
328}
329
330fn compatible_bucket_duration(file_index: &FileIndex, bucket_duration: Seconds) -> bool {
331    file_index.bucket_duration() <= bucket_duration
332        && bucket_duration.is_multiple_of(file_index.bucket_duration())
333}
334
335async fn compute_missing_indexes(
336    keys_to_compute: Vec<FileIndexKey>,
337    bucket_duration: Seconds,
338    cancellation: CancellationToken,
339    indexing_limits: IndexingLimits,
340    progress_counter: Option<Arc<AtomicUsize>>,
341) -> Result<Vec<(FileIndexKey, Result<FileIndex>)>> {
342    let compute_threads = compute_thread_count(keys_to_compute.len());
343    let cancellation_for_select = cancellation.clone();
344    let compute_task = tokio::task::spawn_blocking(move || {
345        compute_missing_indexes_blocking(
346            keys_to_compute,
347            bucket_duration,
348            cancellation,
349            indexing_limits,
350            progress_counter,
351            compute_threads,
352        )
353    });
354
355    tokio::select! {
356        result = compute_task => match result {
357            Ok(result) => result,
358            Err(e) => Err(EngineError::Io(std::io::Error::other(format!(
359                "Blocking task panicked: {}",
360                e
361            )))),
362        },
363        _ = cancellation_for_select.cancelled() => Err(EngineError::Cancelled),
364    }
365}
366
367fn compute_thread_count(key_count: usize) -> usize {
368    key_count.max(1).min(
369        std::thread::available_parallelism()
370            .map(|value| value.get())
371            .unwrap_or(1)
372            .min(MAX_BATCH_INDEX_THREADS),
373    )
374}
375
376fn compute_missing_indexes_blocking(
377    keys_to_compute: Vec<FileIndexKey>,
378    bucket_duration: Seconds,
379    cancellation: CancellationToken,
380    indexing_limits: IndexingLimits,
381    progress_counter: Option<Arc<AtomicUsize>>,
382    compute_threads: usize,
383) -> Result<Vec<(FileIndexKey, Result<FileIndex>)>> {
384    use rayon::prelude::*;
385    use std::sync::Arc;
386    use std::sync::atomic::AtomicBool;
387
388    let cancelled = Arc::new(AtomicBool::new(false));
389    let thread_pool = build_index_thread_pool(compute_threads)?;
390
391    Ok(thread_pool.install(|| {
392        keys_to_compute
393            .into_par_iter()
394            .map(|key| {
395                compute_one_index(
396                    key,
397                    bucket_duration,
398                    &cancellation,
399                    indexing_limits,
400                    progress_counter.as_ref(),
401                    &cancelled,
402                )
403            })
404            .collect::<Vec<(FileIndexKey, Result<FileIndex>)>>()
405    }))
406}
407
408fn build_index_thread_pool(compute_threads: usize) -> Result<rayon::ThreadPool> {
409    // Build a bounded local pool per call instead of using Rayon’s global pool.
410    // The global pool previously stayed alive after rebuild/indexing and kept a
411    // full worker set plus allocator arenas resident in the plugin process.
412    rayon::ThreadPoolBuilder::new()
413        .num_threads(compute_threads)
414        .build()
415        .map_err(|err| {
416            EngineError::Io(std::io::Error::other(format!(
417                "failed to build rayon index pool: {}",
418                err
419            )))
420        })
421}
422
423fn compute_one_index(
424    key: FileIndexKey,
425    bucket_duration: Seconds,
426    cancellation: &CancellationToken,
427    indexing_limits: IndexingLimits,
428    progress_counter: Option<&Arc<AtomicUsize>>,
429    cancelled: &std::sync::atomic::AtomicBool,
430) -> (FileIndexKey, Result<FileIndex>) {
431    if cancellation.is_cancelled() || cancelled.load(std::sync::atomic::Ordering::Relaxed) {
432        cancelled.store(true, std::sync::atomic::Ordering::Relaxed);
433        return (key, Err(EngineError::Cancelled));
434    }
435
436    let result = index_one_file(&key, bucket_duration, indexing_limits);
437    if result.is_ok()
438        && let Some(counter) = progress_counter
439    {
440        counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
441    }
442
443    (key, result)
444}
445
446fn index_one_file(
447    key: &FileIndexKey,
448    bucket_duration: Seconds,
449    indexing_limits: IndexingLimits,
450) -> Result<FileIndex> {
451    FileIndexer::new(indexing_limits)
452        .index(
453            &key.file,
454            key.source_timestamp_field.as_ref(),
455            key.facets.as_slice(),
456            bucket_duration,
457        )
458        .map_err(|e| e.into())
459}
460
461fn store_computed_indexes(
462    registry: &Registry,
463    cache: &FileIndexCache,
464    responses: &mut Vec<(FileIndexKey, FileIndex)>,
465    computed_results: Vec<(FileIndexKey, Result<FileIndex>)>,
466) {
467    for (key, response) in computed_results {
468        match response {
469            Ok(index) => {
470                update_registry_time_range(registry, &key, &index);
471                cache.insert(key.clone(), index.clone());
472                responses.push((key, index));
473            }
474            Err(e) => {
475                error!(
476                    "file index computation failed for file={}: {}",
477                    key.file.path(),
478                    e
479                );
480            }
481        }
482    }
483}
484
485fn update_registry_time_range(registry: &Registry, key: &FileIndexKey, index: &FileIndex) {
486    registry.update_time_range(
487        &key.file,
488        index.start_time(),
489        index.end_time(),
490        index.indexed_at(),
491        index.online(),
492    );
493}