1use crate::{
8 cache::{FileIndexCache, FileIndexKey},
9 error::{EngineError, Result},
10 query_time_range::QueryTimeRange,
11};
12use journal_index::{FileIndex, FileIndexer, IndexingLimits, Seconds};
13use journal_registry::Registry;
14use std::sync::Arc;
15use std::sync::atomic::AtomicUsize;
16use tokio_util::sync::CancellationToken;
17use tracing::{error, trace};
18
19const MAX_BATCH_INDEX_THREADS: usize = 4;
20
/// Builder that collects the optional settings used by [`FileIndexCacheBuilder::build`]
/// to construct a `FileIndexCache`.
///
/// Every `Option` field left as `None` makes `build` fall back to its own default.
#[derive(Debug, Clone)]
pub struct FileIndexCacheBuilder {
    /// Directory handed to the filesystem device of the disk cache.
    cache_path: Option<std::path::PathBuf>,
    /// Capacity passed to the in-memory tier of the cache.
    memory_capacity: Option<usize>,
    /// Capacity passed to the filesystem device backing the disk tier.
    disk_capacity: Option<usize>,
    /// Block size passed to the block engine of the disk tier.
    block_size: Option<usize>,
    /// When `false`, `build` returns before any disk-cache setup is performed.
    enable_disk_cache: bool,
}
33
34impl FileIndexCacheBuilder {
35 pub fn new() -> Self {
43 Self {
44 cache_path: None,
45 memory_capacity: None,
46 disk_capacity: None,
47 block_size: None,
48 enable_disk_cache: true,
49 }
50 }
51
52 pub fn with_cache_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
54 self.cache_path = Some(path.into());
55 self
56 }
57
58 pub fn with_memory_capacity(mut self, capacity: usize) -> Self {
60 self.memory_capacity = Some(capacity);
61 self
62 }
63
64 pub fn with_disk_capacity(mut self, capacity: usize) -> Self {
66 self.disk_capacity = Some(capacity);
67 self
68 }
69
70 pub fn with_block_size(mut self, size: usize) -> Self {
72 self.block_size = Some(size);
73 self
74 }
75
76 pub fn without_disk_cache(mut self) -> Self {
78 self.enable_disk_cache = false;
79 self
80 }
81
82 pub async fn build(self) -> Result<FileIndexCache> {
84 use foyer::HybridCacheBuilder;
85
86 let memory_capacity = self.memory_capacity.unwrap_or(128);
87 let memory = HybridCacheBuilder::new()
88 .with_name("file-index-cache")
89 .with_policy(foyer::HybridCachePolicy::WriteOnInsertion)
90 .memory(memory_capacity)
91 .with_shards(4);
92
93 if !self.enable_disk_cache {
94 return memory.storage().build().await.map_err(Into::into);
95 }
96
97 use foyer::{
98 BlockEngineBuilder, DeviceBuilder, FsDeviceBuilder, IoEngineBuilder,
99 PsyncIoEngineBuilder,
100 };
101
102 let cache_path = self
103 .cache_path
104 .unwrap_or_else(|| std::env::temp_dir().join("journal-engine-cache"));
106 let disk_capacity = self.disk_capacity.unwrap_or(16 * 1024 * 1024);
107 let block_size = self.block_size.unwrap_or(4 * 1024 * 1024);
108
109 std::fs::create_dir_all(&cache_path).map_err(|e| {
110 EngineError::Io(std::io::Error::other(format!(
111 "Failed to create cache directory: {}",
112 e
113 )))
114 })?;
115
116 let cache = memory
117 .storage()
118 .with_io_engine(PsyncIoEngineBuilder::new().build().await?)
119 .with_engine_config(
120 BlockEngineBuilder::new(
121 FsDeviceBuilder::new(&cache_path)
122 .with_capacity(disk_capacity)
123 .build()?,
124 )
125 .with_block_size(block_size),
126 )
127 .build()
128 .await?;
129
130 Ok(cache)
131 }
132}
133
134impl Default for FileIndexCacheBuilder {
135 fn default() -> Self {
136 Self::new()
137 }
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A cache built with the disk tier disabled must leave the configured path absent.
    #[tokio::test(flavor = "current_thread")]
    async fn build_without_disk_cache_does_not_create_disk_cache_files() {
        let scratch = tempdir().expect("tempdir");
        let cache_path = scratch.path().join("foyer-cache");

        let builder = FileIndexCacheBuilder::new()
            .with_cache_path(&cache_path)
            .with_memory_capacity(4)
            .without_disk_cache();
        let cache = builder
            .build()
            .await
            .expect("build in-memory file index cache");

        cache
            .close()
            .await
            .expect("close in-memory file index cache");

        let created_on_disk = cache_path.exists();
        assert!(
            !created_on_disk,
            "expected memory-only file index cache to avoid creating {}",
            cache_path.display()
        );
    }
}
168
169pub async fn batch_compute_file_indexes(
195 cache: &FileIndexCache,
196 registry: &Registry,
197 keys: Vec<FileIndexKey>,
198 time_range: &QueryTimeRange,
199 cancellation: CancellationToken,
200 indexing_limits: IndexingLimits,
201 progress_counter: Option<Arc<AtomicUsize>>,
202) -> Result<Vec<(FileIndexKey, FileIndex)>> {
203 let bucket_duration = time_range.bucket_duration_seconds();
204 let cache_lookup_results = lookup_cached_indexes(cache, &keys, &cancellation).await?;
205 let CachePartition {
206 mut responses,
207 keys_to_compute,
208 stats,
209 } = partition_cache_results(cache_lookup_results, keys.len(), bucket_duration);
210
211 if cancellation.is_cancelled() {
212 return Err(EngineError::Cancelled);
213 }
214
215 trace!(
216 "phase 2 summary: hits={}, misses={}, stale={}, incompatible_bucket={}",
217 stats.cache_hits, stats.cache_misses, stats.stale_entries, stats.incompatible_bucket
218 );
219
220 let computed_results = compute_missing_indexes(
221 keys_to_compute,
222 bucket_duration,
223 cancellation.clone(),
224 indexing_limits,
225 progress_counter,
226 )
227 .await?;
228
229 store_computed_indexes(registry, cache, &mut responses, computed_results);
230 Ok(responses)
231}
232
233async fn lookup_cached_indexes(
234 cache: &FileIndexCache,
235 keys: &[FileIndexKey],
236 cancellation: &CancellationToken,
237) -> Result<Vec<(FileIndexKey, Result<Option<FileIndex>>)>> {
238 let cache_lookup_futures = keys.iter().map(|key| {
239 let key_clone = key.clone();
240 async move {
241 let cached = cache
242 .get(&key_clone)
243 .await
244 .map(|entry| entry.map(|e| e.value().clone()))
245 .map_err(|e| e.into());
246 (key_clone, cached)
247 }
248 });
249
250 tokio::select! {
251 results = futures::future::join_all(cache_lookup_futures) => Ok(results),
252 _ = cancellation.cancelled() => Err(EngineError::Cancelled),
253 }
254}
255
/// Counters describing how a batch of cache lookups was classified.
///
/// All counters start at zero via `Default`.
#[derive(Default)]
struct CacheStats {
    /// Cached indexes that were reused as-is.
    cache_hits: usize,
    /// Keys for which the cache held no entry.
    cache_misses: usize,
    /// Cached indexes rejected because they were no longer fresh.
    stale_entries: usize,
    /// Cached indexes rejected because their bucket duration did not fit the request.
    incompatible_bucket: usize,
}
263
264struct CachePartition {
265 responses: Vec<(FileIndexKey, FileIndex)>,
266 keys_to_compute: Vec<FileIndexKey>,
267 stats: CacheStats,
268}
269
270fn partition_cache_results(
271 cache_lookup_results: Vec<(FileIndexKey, Result<Option<FileIndex>>)>,
272 key_count: usize,
273 bucket_duration: Seconds,
274) -> CachePartition {
275 let mut partition = CachePartition {
276 responses: Vec::with_capacity(key_count),
277 keys_to_compute: Vec::new(),
278 stats: CacheStats::default(),
279 };
280
281 for (key, cache_lookup_result) in cache_lookup_results {
282 partition_cache_result(key, cache_lookup_result, bucket_duration, &mut partition);
283 }
284
285 partition
286}
287
288fn partition_cache_result(
289 key: FileIndexKey,
290 cache_lookup_result: Result<Option<FileIndex>>,
291 bucket_duration: Seconds,
292 partition: &mut CachePartition,
293) {
294 match cache_lookup_result {
295 Ok(Some(file_index)) => partition_cached_index(key, file_index, bucket_duration, partition),
296 Ok(None) => {
297 partition.stats.cache_misses += 1;
298 partition.keys_to_compute.push(key);
299 }
300 Err(e) => {
301 error!("cached file index lookup error {}", e);
302 }
303 }
304}
305
306fn partition_cached_index(
307 key: FileIndexKey,
308 file_index: FileIndex,
309 bucket_duration: Seconds,
310 partition: &mut CachePartition,
311) {
312 let fresh = file_index.is_fresh();
313 let bucket_ok = compatible_bucket_duration(&file_index, bucket_duration);
314
315 if fresh && bucket_ok {
316 partition.stats.cache_hits += 1;
317 partition.responses.push((key, file_index));
318 return;
319 }
320
321 if !fresh {
322 partition.stats.stale_entries += 1;
323 }
324 if !bucket_ok {
325 partition.stats.incompatible_bucket += 1;
326 }
327 partition.keys_to_compute.push(key);
328}
329
330fn compatible_bucket_duration(file_index: &FileIndex, bucket_duration: Seconds) -> bool {
331 file_index.bucket_duration() <= bucket_duration
332 && bucket_duration.is_multiple_of(file_index.bucket_duration())
333}
334
335async fn compute_missing_indexes(
336 keys_to_compute: Vec<FileIndexKey>,
337 bucket_duration: Seconds,
338 cancellation: CancellationToken,
339 indexing_limits: IndexingLimits,
340 progress_counter: Option<Arc<AtomicUsize>>,
341) -> Result<Vec<(FileIndexKey, Result<FileIndex>)>> {
342 let compute_threads = compute_thread_count(keys_to_compute.len());
343 let cancellation_for_select = cancellation.clone();
344 let compute_task = tokio::task::spawn_blocking(move || {
345 compute_missing_indexes_blocking(
346 keys_to_compute,
347 bucket_duration,
348 cancellation,
349 indexing_limits,
350 progress_counter,
351 compute_threads,
352 )
353 });
354
355 tokio::select! {
356 result = compute_task => match result {
357 Ok(result) => result,
358 Err(e) => Err(EngineError::Io(std::io::Error::other(format!(
359 "Blocking task panicked: {}",
360 e
361 )))),
362 },
363 _ = cancellation_for_select.cancelled() => Err(EngineError::Cancelled),
364 }
365}
366
367fn compute_thread_count(key_count: usize) -> usize {
368 key_count.max(1).min(
369 std::thread::available_parallelism()
370 .map(|value| value.get())
371 .unwrap_or(1)
372 .min(MAX_BATCH_INDEX_THREADS),
373 )
374}
375
376fn compute_missing_indexes_blocking(
377 keys_to_compute: Vec<FileIndexKey>,
378 bucket_duration: Seconds,
379 cancellation: CancellationToken,
380 indexing_limits: IndexingLimits,
381 progress_counter: Option<Arc<AtomicUsize>>,
382 compute_threads: usize,
383) -> Result<Vec<(FileIndexKey, Result<FileIndex>)>> {
384 use rayon::prelude::*;
385 use std::sync::Arc;
386 use std::sync::atomic::AtomicBool;
387
388 let cancelled = Arc::new(AtomicBool::new(false));
389 let thread_pool = build_index_thread_pool(compute_threads)?;
390
391 Ok(thread_pool.install(|| {
392 keys_to_compute
393 .into_par_iter()
394 .map(|key| {
395 compute_one_index(
396 key,
397 bucket_duration,
398 &cancellation,
399 indexing_limits,
400 progress_counter.as_ref(),
401 &cancelled,
402 )
403 })
404 .collect::<Vec<(FileIndexKey, Result<FileIndex>)>>()
405 }))
406}
407
408fn build_index_thread_pool(compute_threads: usize) -> Result<rayon::ThreadPool> {
409 rayon::ThreadPoolBuilder::new()
413 .num_threads(compute_threads)
414 .build()
415 .map_err(|err| {
416 EngineError::Io(std::io::Error::other(format!(
417 "failed to build rayon index pool: {}",
418 err
419 )))
420 })
421}
422
423fn compute_one_index(
424 key: FileIndexKey,
425 bucket_duration: Seconds,
426 cancellation: &CancellationToken,
427 indexing_limits: IndexingLimits,
428 progress_counter: Option<&Arc<AtomicUsize>>,
429 cancelled: &std::sync::atomic::AtomicBool,
430) -> (FileIndexKey, Result<FileIndex>) {
431 if cancellation.is_cancelled() || cancelled.load(std::sync::atomic::Ordering::Relaxed) {
432 cancelled.store(true, std::sync::atomic::Ordering::Relaxed);
433 return (key, Err(EngineError::Cancelled));
434 }
435
436 let result = index_one_file(&key, bucket_duration, indexing_limits);
437 if result.is_ok()
438 && let Some(counter) = progress_counter
439 {
440 counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
441 }
442
443 (key, result)
444}
445
446fn index_one_file(
447 key: &FileIndexKey,
448 bucket_duration: Seconds,
449 indexing_limits: IndexingLimits,
450) -> Result<FileIndex> {
451 FileIndexer::new(indexing_limits)
452 .index(
453 &key.file,
454 key.source_timestamp_field.as_ref(),
455 key.facets.as_slice(),
456 bucket_duration,
457 )
458 .map_err(|e| e.into())
459}
460
461fn store_computed_indexes(
462 registry: &Registry,
463 cache: &FileIndexCache,
464 responses: &mut Vec<(FileIndexKey, FileIndex)>,
465 computed_results: Vec<(FileIndexKey, Result<FileIndex>)>,
466) {
467 for (key, response) in computed_results {
468 match response {
469 Ok(index) => {
470 update_registry_time_range(registry, &key, &index);
471 cache.insert(key.clone(), index.clone());
472 responses.push((key, index));
473 }
474 Err(e) => {
475 error!(
476 "file index computation failed for file={}: {}",
477 key.file.path(),
478 e
479 );
480 }
481 }
482 }
483}
484
485fn update_registry_time_range(registry: &Registry, key: &FileIndexKey, index: &FileIndex) {
486 registry.update_time_range(
487 &key.file,
488 index.start_time(),
489 index.end_time(),
490 index.indexed_at(),
491 index.online(),
492 );
493}