casc_storage/index/
async_index.rs

//! Async-first index operations for CASC storage
//!
//! This module provides fully async index operations with features like:
//! - Parallel index loading and parsing
//! - Concurrent lookups with read-through caching
//! - Batch operations for efficient bulk processing
//! - Background index updates without blocking reads
//! - Streaming index parsing for large files
9
10use crate::error::{CascError, Result};
11use crate::types::{ArchiveLocation, EKey};
12use dashmap::DashMap;
13use futures::stream::{self, StreamExt};
14use std::collections::BTreeMap;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use tokio::fs::File;
18use tokio::io::{AsyncReadExt, AsyncSeekExt, BufReader};
19use tokio::sync::{RwLock, Semaphore};
20use tracing::{debug, info, trace, warn};
21
/// Tunable settings for [`AsyncIndexManager`].
#[derive(Debug, Clone)]
pub struct AsyncIndexConfig {
    /// Upper bound on the number of index files handled at the same time.
    pub max_concurrent_files: usize,
    /// Size of the buffer used when reading index files (the default is 64 KiB).
    pub buffer_size: usize,
    /// Whether lookups are served from, and recorded in, the read-through cache.
    pub enable_caching: bool,
    /// Upper bound on the number of entries kept in the in-memory lookup cache.
    pub max_cache_entries: usize,
    /// Whether periodic background index refreshes may be started.
    pub enable_background_updates: bool,
}
36
37impl Default for AsyncIndexConfig {
38    fn default() -> Self {
39        Self {
40            max_concurrent_files: 16,
41            buffer_size: 64 * 1024, // 64KB
42            enable_caching: true,
43            max_cache_entries: 100_000,
44            enable_background_updates: true,
45        }
46    }
47}
48
49/// Async index manager for parallel operations
50pub struct AsyncIndexManager {
51    /// Configuration
52    config: AsyncIndexConfig,
53    /// Per-bucket indices
54    bucket_indices: Arc<DashMap<u8, Arc<AsyncIndex>>>,
55    /// Global lookup cache
56    lookup_cache: Arc<DashMap<EKey, ArchiveLocation>>,
57    /// Semaphore for controlling concurrent operations
58    semaphore: Arc<Semaphore>,
59    /// Background update handle
60    update_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
61}
62
63impl AsyncIndexManager {
64    /// Create a new async index manager
65    pub fn new(config: AsyncIndexConfig) -> Self {
66        let semaphore = Arc::new(Semaphore::new(config.max_concurrent_files));
67
68        Self {
69            config,
70            bucket_indices: Arc::new(DashMap::new()),
71            lookup_cache: Arc::new(DashMap::new()),
72            semaphore,
73            update_handle: Arc::new(RwLock::new(None)),
74        }
75    }
76
77    /// Load all indices from a directory in parallel
78    pub async fn load_directory(&self, path: &Path) -> Result<usize> {
79        info!("Loading indices from {:?} with async operations", path);
80
81        // Collect all index files
82        let index_files = self.discover_index_files(path).await?;
83
84        if index_files.is_empty() {
85            info!("No index files found in {:?}", path);
86            return Ok(0);
87        }
88
89        info!(
90            "Found {} index files, loading in parallel",
91            index_files.len()
92        );
93
94        // Load all files in parallel with controlled concurrency
95        let results = stream::iter(index_files)
96            .map(|path| self.load_single_index(path))
97            .buffer_unordered(self.config.max_concurrent_files)
98            .collect::<Vec<_>>()
99            .await;
100
101        // Count successful loads and log errors
102        let mut loaded = 0;
103        for result in results {
104            match result {
105                Ok(bucket) => {
106                    debug!("Successfully loaded index for bucket {:02x}", bucket);
107                    loaded += 1;
108                }
109                Err(e) => {
110                    warn!("Failed to load index: {}", e);
111                }
112            }
113        }
114
115        info!("Successfully loaded {} indices", loaded);
116        Ok(loaded)
117    }
118
119    /// Discover all index files in a directory
120    async fn discover_index_files(&self, path: &Path) -> Result<Vec<PathBuf>> {
121        let mut index_files = Vec::new();
122
123        // Check both .idx and .index files
124        let mut entries = tokio::fs::read_dir(path).await?;
125
126        while let Some(entry) = entries.next_entry().await? {
127            let path = entry.path();
128            if let Some(ext) = path.extension() {
129                if ext == "idx" || ext == "index" {
130                    index_files.push(path);
131                }
132            }
133        }
134
135        // Also check subdirectories like data/ and indices/
136        for subdir in &["data", "indices"] {
137            let subpath = path.join(subdir);
138            if subpath.exists() {
139                if let Ok(mut entries) = tokio::fs::read_dir(&subpath).await {
140                    while let Some(entry) = entries.next_entry().await? {
141                        let path = entry.path();
142                        if let Some(ext) = path.extension() {
143                            if ext == "idx" || ext == "index" {
144                                index_files.push(path);
145                            }
146                        }
147                    }
148                }
149            }
150        }
151
152        Ok(index_files)
153    }
154
155    /// Load a single index file
156    async fn load_single_index(&self, path: PathBuf) -> Result<u8> {
157        let _permit = self.semaphore.acquire().await.unwrap();
158
159        debug!("Loading index from {:?}", path);
160
161        let index = if path.extension().and_then(|s| s.to_str()) == Some("idx") {
162            AsyncIndex::load_idx(&path).await?
163        } else {
164            AsyncIndex::load_index(&path).await?
165        };
166
167        let bucket = index.bucket();
168        self.bucket_indices.insert(bucket, Arc::new(index));
169
170        Ok(bucket)
171    }
172
173    /// Perform an async lookup
174    pub async fn lookup(&self, ekey: &EKey) -> Option<ArchiveLocation> {
175        // Check cache first
176        if self.config.enable_caching {
177            if let Some(location) = self.lookup_cache.get(ekey) {
178                trace!("Cache hit for {}", ekey);
179                return Some(*location);
180            }
181        }
182
183        // Check the appropriate bucket
184        let bucket = ekey.bucket_index();
185
186        if let Some(index) = self.bucket_indices.get(&bucket) {
187            if let Some(location) = index.lookup(ekey).await {
188                // Update cache
189                if self.config.enable_caching {
190                    self.update_cache(*ekey, location);
191                }
192                return Some(location);
193            }
194        }
195
196        // Fallback: search all buckets (rare)
197        for entry in self.bucket_indices.iter() {
198            if let Some(location) = entry.value().lookup(ekey).await {
199                // Update cache with correct bucket
200                if self.config.enable_caching {
201                    self.update_cache(*ekey, location);
202                }
203                return Some(location);
204            }
205        }
206
207        None
208    }
209
210    /// Batch lookup for multiple keys
211    pub async fn lookup_batch(&self, ekeys: &[EKey]) -> Vec<Option<ArchiveLocation>> {
212        // Process in parallel for better performance
213        let futures = ekeys.iter().map(|ekey| self.lookup(ekey));
214
215        futures::future::join_all(futures).await
216    }
217
218    /// Update the lookup cache
219    fn update_cache(&self, ekey: EKey, location: ArchiveLocation) {
220        // Simple LRU-like behavior: remove oldest if at capacity
221        if self.lookup_cache.len() >= self.config.max_cache_entries {
222            // Remove a random entry (simple eviction)
223            if let Some(entry) = self.lookup_cache.iter().next() {
224                self.lookup_cache.remove(entry.key());
225            }
226        }
227
228        self.lookup_cache.insert(ekey, location);
229    }
230
231    /// Start background index updates
232    pub async fn start_background_updates(&self, path: PathBuf, interval: std::time::Duration) {
233        if !self.config.enable_background_updates {
234            return;
235        }
236
237        let manager = Arc::new(self.clone_config());
238
239        let handle = tokio::spawn(async move {
240            let mut interval = tokio::time::interval(interval);
241
242            loop {
243                interval.tick().await;
244
245                debug!("Running background index update");
246
247                if let Err(e) = manager.refresh_indices(&path).await {
248                    warn!("Background index update failed: {}", e);
249                }
250            }
251        });
252
253        *self.update_handle.write().await = Some(handle);
254    }
255
256    /// Refresh indices without blocking reads
257    async fn refresh_indices(&self, path: &Path) -> Result<()> {
258        // Load new indices in the background
259        let index_files = self.discover_index_files(path).await?;
260
261        for file_path in index_files {
262            // Load without blocking reads
263            if let Ok(index) = self.load_single_index(file_path).await {
264                debug!("Refreshed index for bucket {:02x}", index);
265            }
266        }
267
268        Ok(())
269    }
270
271    /// Stop background updates
272    pub async fn stop_background_updates(&self) {
273        if let Some(handle) = self.update_handle.write().await.take() {
274            handle.abort();
275        }
276    }
277
278    /// Get statistics about loaded indices
279    pub async fn get_stats(&self) -> IndexStats {
280        let mut total_entries = 0;
281        let mut total_buckets = 0;
282
283        for entry in self.bucket_indices.iter() {
284            total_buckets += 1;
285            total_entries += entry.value().entry_count().await;
286        }
287
288        IndexStats {
289            total_entries,
290            total_buckets,
291            cache_size: self.lookup_cache.len(),
292            cache_hit_rate: 0.0, // Would need to track hits/misses
293        }
294    }
295
296    /// Clear all caches
297    pub async fn clear_cache(&self) {
298        self.lookup_cache.clear();
299    }
300
301    /// Clone configuration for background tasks
302    fn clone_config(&self) -> Self {
303        Self {
304            config: self.config.clone(),
305            bucket_indices: self.bucket_indices.clone(),
306            lookup_cache: self.lookup_cache.clone(),
307            semaphore: self.semaphore.clone(),
308            update_handle: Arc::new(RwLock::new(None)),
309        }
310    }
311}
312
313/// Individual async index
314pub struct AsyncIndex {
315    bucket: u8,
316    entries: Arc<RwLock<BTreeMap<EKey, ArchiveLocation>>>,
317}
318
319impl AsyncIndex {
320    /// Create a new async index
321    pub fn new(bucket: u8) -> Self {
322        Self {
323            bucket,
324            entries: Arc::new(RwLock::new(BTreeMap::new())),
325        }
326    }
327
328    /// Load an .idx file asynchronously
329    pub async fn load_idx(path: &Path) -> Result<Self> {
330        let file = File::open(path).await?;
331        let mut reader = BufReader::new(file);
332
333        // Parse header
334        let mut header_buf = vec![0u8; 8];
335        reader.read_exact(&mut header_buf).await?;
336
337        // Parse bucket from filename or header
338        let bucket = Self::extract_bucket_from_path(path)?;
339
340        let index = Self::new(bucket);
341
342        // Stream parse entries
343        index.parse_idx_entries(&mut reader).await?;
344
345        Ok(index)
346    }
347
348    /// Load an .index file asynchronously
349    pub async fn load_index(path: &Path) -> Result<Self> {
350        let file = File::open(path).await?;
351        let mut reader = BufReader::new(file);
352
353        let bucket = Self::extract_bucket_from_path(path)?;
354        let index = Self::new(bucket);
355
356        // Stream parse entries
357        index.parse_index_entries(&mut reader).await?;
358
359        Ok(index)
360    }
361
362    /// Parse .idx entries in streaming fashion
363    async fn parse_idx_entries(&self, reader: &mut BufReader<File>) -> Result<()> {
364        let mut entries = BTreeMap::new();
365        let mut buffer = vec![0u8; 4096]; // Read in chunks
366
367        // Skip to data section
368        reader.seek(tokio::io::SeekFrom::Start(0x108)).await?;
369
370        while let Ok(n) = reader.read(&mut buffer).await {
371            if n == 0 {
372                break;
373            }
374
375            // Parse entries from buffer
376            let mut offset = 0;
377            while offset + 25 <= n {
378                // 9 bytes key + 16 bytes location
379                let key_bytes = &buffer[offset..offset + 9];
380                // Create a 16-byte key from 9-byte truncated version
381                let mut full_key = [0u8; 16];
382                full_key[..9].copy_from_slice(key_bytes);
383                let ekey = EKey::new(full_key);
384
385                let archive_id = u16::from_le_bytes([buffer[offset + 9], buffer[offset + 10]]);
386                let archive_offset = u32::from_le_bytes([
387                    buffer[offset + 11],
388                    buffer[offset + 12],
389                    buffer[offset + 13],
390                    buffer[offset + 14],
391                ]);
392                let size = u32::from_le_bytes([
393                    buffer[offset + 15],
394                    buffer[offset + 16],
395                    buffer[offset + 17],
396                    buffer[offset + 18],
397                ]);
398
399                let location = ArchiveLocation {
400                    archive_id,
401                    offset: archive_offset as u64,
402                    size,
403                };
404
405                entries.insert(ekey, location);
406                offset += 25;
407            }
408        }
409
410        *self.entries.write().await = entries;
411        Ok(())
412    }
413
414    /// Parse .index entries in streaming fashion
415    async fn parse_index_entries(&self, _reader: &mut BufReader<File>) -> Result<()> {
416        // Similar streaming implementation for .index format
417        let entries = BTreeMap::new();
418
419        // Implementation would parse the group index format
420        // This is a placeholder for the actual parsing logic
421
422        *self.entries.write().await = entries;
423        Ok(())
424    }
425
426    /// Extract bucket from file path
427    fn extract_bucket_from_path(path: &Path) -> Result<u8> {
428        let filename = path
429            .file_stem()
430            .and_then(|s| s.to_str())
431            .ok_or_else(|| CascError::InvalidIndexFormat("Invalid filename".into()))?;
432
433        // Try to parse bucket from filename (e.g., "00.idx" -> 0x00)
434        if filename.len() >= 2 {
435            if let Ok(bucket) = u8::from_str_radix(&filename[..2], 16) {
436                return Ok(bucket);
437            }
438        }
439
440        // Default to bucket 0 if can't determine
441        Ok(0)
442    }
443
444    /// Async lookup
445    pub async fn lookup(&self, ekey: &EKey) -> Option<ArchiveLocation> {
446        self.entries.read().await.get(ekey).copied()
447    }
448
449    /// Get bucket index
450    pub fn bucket(&self) -> u8 {
451        self.bucket
452    }
453
454    /// Get entry count
455    pub async fn entry_count(&self) -> usize {
456        self.entries.read().await.len()
457    }
458
459    /// Add an entry (for updates)
460    pub async fn add_entry(&self, ekey: EKey, location: ArchiveLocation) {
461        self.entries.write().await.insert(ekey, location);
462    }
463
464    /// Remove an entry
465    pub async fn remove_entry(&self, ekey: &EKey) -> Option<ArchiveLocation> {
466        self.entries.write().await.remove(ekey)
467    }
468
469    /// Batch add entries
470    pub async fn add_entries_batch(&self, entries: Vec<(EKey, ArchiveLocation)>) {
471        let mut map = self.entries.write().await;
472        for (ekey, location) in entries {
473            map.insert(ekey, location);
474        }
475    }
476}
477
/// Snapshot of index and cache sizes reported by the index manager.
#[derive(Debug, Clone)]
pub struct IndexStats {
    /// Number of entries summed over all loaded bucket indices.
    pub total_entries: usize,
    /// Number of loaded bucket indices.
    pub total_buckets: usize,
    /// Number of entries currently held in the lookup cache.
    pub cache_size: usize,
    /// Fraction of lookups served from the cache.
    pub cache_hit_rate: f64,
}
486
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a key whose first nine bytes are `prefix` and whose remaining
    /// bytes are zero.
    fn ekey_with_prefix(prefix: [u8; 9]) -> EKey {
        let mut raw = [0u8; 16];
        raw[..9].copy_from_slice(&prefix);
        EKey::new(raw)
    }

    /// Shorthand for constructing an `ArchiveLocation`.
    fn location_at(archive_id: u16, offset: u64, size: u32) -> ArchiveLocation {
        ArchiveLocation {
            archive_id,
            offset,
            size,
        }
    }

    /// A fresh index reports its bucket and starts out empty.
    #[tokio::test]
    async fn test_async_index_creation() {
        let index = AsyncIndex::new(0x00);

        assert_eq!(index.bucket(), 0x00);
        assert_eq!(index.entry_count().await, 0);
    }

    /// Add, look up and remove a single entry.
    #[tokio::test]
    async fn test_async_index_operations() {
        let index = AsyncIndex::new(0x01);
        let ekey = ekey_with_prefix([1, 2, 3, 4, 5, 6, 7, 8, 9]);
        let location = location_at(1, 100, 500);

        index.add_entry(ekey, location).await;
        assert_eq!(index.entry_count().await, 1);

        assert_eq!(index.lookup(&ekey).await, Some(location));

        assert_eq!(index.remove_entry(&ekey).await, Some(location));
        assert_eq!(index.entry_count().await, 0);
    }

    /// A manager without loaded indices reports empty statistics.
    #[tokio::test]
    async fn test_manager_creation() {
        let manager = AsyncIndexManager::new(AsyncIndexConfig::default());

        let stats = manager.get_stats().await;
        assert_eq!(stats.total_entries, 0);
        assert_eq!(stats.total_buckets, 0);
    }

    /// Batch lookups return one result per key, in request order.
    #[tokio::test]
    async fn test_batch_lookup() {
        let manager = AsyncIndexManager::new(AsyncIndexConfig::default());

        let ekey1 = ekey_with_prefix([0, 1, 2, 3, 4, 5, 6, 7, 8]);
        let ekey2 = ekey_with_prefix([0, 9, 8, 7, 6, 5, 4, 3, 2]);
        let location1 = location_at(1, 100, 200);
        let location2 = location_at(2, 300, 400);

        let index = AsyncIndex::new(0x00);
        index.add_entry(ekey1, location1).await;
        index.add_entry(ekey2, location2).await;
        manager.bucket_indices.insert(0x00, Arc::new(index));

        let results = manager.lookup_batch(&[ekey1, ekey2]).await;

        assert_eq!(results.len(), 2);
        assert_eq!(results[0], Some(location1));
        assert_eq!(results[1], Some(location2));
    }
}