casc_storage/storage/
casc_storage.rs

1//! Main CASC storage implementation
2
3use crate::archive::{Archive, ArchiveWriter};
4use crate::cache::LockFreeCache;
5use crate::error::{CascError, Result};
6use crate::index::{
7    AsyncIndexConfig, AsyncIndexManager, CombinedIndex, GroupIndex, IdxParser, IndexFile,
8};
9use crate::manifest::{FileMapping, ManifestConfig, TactManifests};
10use crate::progressive::{ChunkLoader, ProgressiveConfig, ProgressiveFileManager, SizeHint};
11use crate::types::{ArchiveLocation, CascConfig, EKey, StorageStats};
12use dashmap::DashMap;
13use parking_lot::RwLock;
14use std::collections::HashMap;
15use std::sync::Arc;
16use tracing::{debug, error, info, warn};
17
18/// Main CASC storage implementation
19pub struct CascStorage {
20    /// Configuration
21    config: CascConfig,
22
23    /// Bucket-based indices (0x00-0x0F) - kept for compatibility
24    indices: Arc<DashMap<u8, IndexFile>>,
25
26    /// Combined index for optimized lookups
27    combined_index: Arc<CombinedIndex>,
28
29    /// Async index manager for parallel operations
30    async_index_manager: Option<Arc<AsyncIndexManager>>,
31
32    /// Archive files
33    archives: Arc<RwLock<HashMap<u16, Archive>>>,
34
35    /// Lock-free cache for decompressed content (using Arc to avoid cloning)
36    cache: Arc<LockFreeCache>,
37
38    /// Current archive for writing
39    current_archive: Arc<RwLock<Option<ArchiveWriter>>>,
40
41    /// TACT manifest integration for FileDataID lookups
42    tact_manifests: Option<TactManifests>,
43
44    /// Progressive file loading manager
45    progressive_manager: Option<ProgressiveFileManager>,
46
47    /// Storage statistics
48    #[allow(dead_code)]
49    stats: Arc<RwLock<StorageStats>>,
50}
51
52impl CascStorage {
53    /// Create a new CASC storage instance
54    pub fn new(config: CascConfig) -> Result<Self> {
55        // Create data directories if they don't exist
56        let data_path = &config.data_path;
57        let indices_path = data_path.join("indices");
58        let data_subpath = data_path.join("data");
59
60        std::fs::create_dir_all(&indices_path)?;
61        std::fs::create_dir_all(&data_subpath)?;
62
63        let cache_size_bytes = (config.cache_size_mb as usize) * 1024 * 1024;
64
65        Ok(Self {
66            config,
67            indices: Arc::new(DashMap::new()),
68            combined_index: Arc::new(CombinedIndex::new()),
69            async_index_manager: None,
70            archives: Arc::new(RwLock::new(HashMap::new())),
71            cache: Arc::new(LockFreeCache::new(cache_size_bytes)),
72            current_archive: Arc::new(RwLock::new(None)),
73            tact_manifests: None,
74            progressive_manager: None,
75            stats: Arc::new(RwLock::new(StorageStats::default())),
76        })
77    }
78
79    /// Load indices from disk
80    pub fn load_indices(&self) -> Result<()> {
81        // Check if we're already in an async context
82        match tokio::runtime::Handle::try_current() {
83            Ok(_handle) => {
84                // We're in an async context, but we can't use block_on
85                // Fall back to sequential loading to avoid runtime conflict
86                debug!("In async context, using sequential loading to avoid runtime conflict");
87                self.load_indices_sequential()
88            }
89            Err(_) => {
90                // No async runtime, use sequential loading
91                debug!("No async runtime, using sequential loading");
92                self.load_indices_sequential()
93            }
94        }
95    }
96
97    /// Create new CASC storage asynchronously (recommended for async contexts)
98    pub async fn new_async(config: CascConfig) -> Result<Self> {
99        // Create data directories if they don't exist
100        let data_path = &config.data_path;
101        let indices_path = data_path.join("indices");
102        let data_subpath = data_path.join("data");
103
104        std::fs::create_dir_all(&indices_path)?;
105        std::fs::create_dir_all(&data_subpath)?;
106
107        let cache_size_bytes = (config.cache_size_mb as usize) * 1024 * 1024;
108
109        let storage = Self {
110            config,
111            indices: Arc::new(DashMap::new()),
112            combined_index: Arc::new(CombinedIndex::new()),
113            async_index_manager: None,
114            archives: Arc::new(RwLock::new(HashMap::new())),
115            cache: Arc::new(LockFreeCache::new(cache_size_bytes)),
116            current_archive: Arc::new(RwLock::new(None)),
117            tact_manifests: None,
118            progressive_manager: None,
119            stats: Arc::new(RwLock::new(StorageStats::default())),
120        };
121
122        // Load indices asynchronously for better performance
123        storage.load_indices_parallel().await?;
124        storage.load_archives()?;
125
126        Ok(storage)
127    }
128
129    /// Load indices from disk with parallel processing (3-5x faster)
130    pub async fn load_indices_parallel(&self) -> Result<()> {
131        info!(
132            "Loading CASC indices from {:?} (parallel)",
133            self.config.data_path
134        );
135
136        use tokio::task::JoinSet;
137
138        // Try multiple locations for indices
139        let indices_path = self.config.data_path.join("indices");
140        let data_path = self.config.data_path.join("data");
141
142        // Collect all .idx files from both directories
143        let mut idx_paths = Vec::new();
144
145        // Collect from data directory
146        if data_path.exists() {
147            if let Ok(entries) = tokio::fs::read_dir(&data_path).await {
148                let mut entries = entries;
149                while let Ok(Some(entry)) = entries.next_entry().await {
150                    let path = entry.path();
151                    if path.extension().and_then(|s| s.to_str()) == Some("idx") {
152                        idx_paths.push(path);
153                    }
154                }
155            }
156        }
157
158        // Collect from indices directory
159        if indices_path.exists() {
160            if let Ok(entries) = tokio::fs::read_dir(&indices_path).await {
161                let mut entries = entries;
162                while let Ok(Some(entry)) = entries.next_entry().await {
163                    let path = entry.path();
164                    if path.extension().and_then(|s| s.to_str()) == Some("idx") {
165                        idx_paths.push(path);
166                    }
167                }
168            }
169        }
170
171        if idx_paths.is_empty() {
172            info!("No .idx files found");
173            return Ok(());
174        }
175
176        info!("Found {} .idx files, loading in parallel", idx_paths.len());
177
178        // Process all .idx files in parallel
179        let mut join_set = JoinSet::new();
180
181        for idx_path in idx_paths {
182            join_set.spawn_blocking(move || -> Result<(u8, IndexFile)> {
183                match IdxParser::parse_file(&idx_path) {
184                    Ok(parser) => {
185                        let bucket = parser.bucket();
186                        debug!(
187                            "Loaded .idx file for bucket {:02x}: {} entries",
188                            bucket,
189                            parser.len()
190                        );
191
192                        let entries_map = parser.into_entries();
193                        let mut index = IndexFile::new(crate::index::IndexVersion::V7);
194
195                        // Add all entries to the index
196                        for (ekey, location) in entries_map {
197                            index.add_entry(ekey, location);
198                        }
199
200                        Ok((bucket, index))
201                    }
202                    Err(e) => {
203                        warn!("Failed to load index {:?}: {}", idx_path, e);
204                        Err(e)
205                    }
206                }
207            });
208        }
209
210        // Collect all results
211        let mut loaded_count = 0;
212        while let Some(result) = join_set.join_next().await {
213            match result {
214                Ok(Ok((bucket, index))) => {
215                    // Also populate combined index
216                    for (ekey, location) in index.entries() {
217                        self.combined_index.insert(*ekey, *location);
218                    }
219                    self.indices.insert(bucket, index);
220                    loaded_count += 1;
221                }
222                Ok(Err(e)) => {
223                    debug!("Index loading task failed: {}", e);
224                    // Continue loading other indices even if one fails
225                }
226                Err(e) => {
227                    warn!("Task join failed: {}", e);
228                }
229            }
230        }
231
232        info!("Loaded {} bucket indices (parallel)", loaded_count);
233        Ok(())
234    }
235
236    /// Load indices from disk (sequential fallback)
237    pub fn load_indices_sequential(&self) -> Result<()> {
238        info!(
239            "Loading CASC indices from {:?} (sequential)",
240            self.config.data_path
241        );
242
243        // Try multiple locations for indices
244        let indices_path = self.config.data_path.join("indices");
245        let data_path = self.config.data_path.join("data");
246
247        // Load .idx files from data directory (WoW Era format)
248        if data_path.exists() {
249            if let Ok(entries) = std::fs::read_dir(&data_path) {
250                for entry in entries {
251                    let entry = entry?;
252                    let path = entry.path();
253
254                    if path.extension().and_then(|s| s.to_str()) == Some("idx") {
255                        match IdxParser::parse_file(&path) {
256                            Ok(parser) => {
257                                let bucket = parser.bucket();
258                                debug!(
259                                    "Loaded .idx file for bucket {:02x}: {} entries",
260                                    bucket,
261                                    parser.len()
262                                );
263
264                                // Consume parser and get all entries at once
265                                let entries_map = parser.into_entries();
266
267                                let mut index = IndexFile::new(crate::index::IndexVersion::V7);
268
269                                // Add all entries to the index and combined index
270                                for (ekey, location) in entries_map {
271                                    index.add_entry(ekey, location);
272                                    self.combined_index.insert(ekey, location);
273                                }
274
275                                self.indices.insert(bucket, index);
276                            }
277                            Err(e) => {
278                                warn!("Failed to load index {:?}: {}", path, e);
279                            }
280                        }
281                    }
282                }
283            }
284        }
285
286        // Load .idx files from indices directory (if exists)
287        if indices_path.exists() {
288            for entry in std::fs::read_dir(&indices_path)? {
289                let entry = entry?;
290                let path = entry.path();
291
292                if path.extension().and_then(|s| s.to_str()) == Some("idx") {
293                    match IdxParser::parse_file(&path) {
294                        Ok(parser) => {
295                            let bucket = parser.bucket();
296                            debug!(
297                                "Loaded .idx file for bucket {:02x}: {} entries",
298                                bucket,
299                                parser.len()
300                            );
301
302                            let mut index = IndexFile::new(crate::index::IndexVersion::V7);
303                            // Transfer ownership of entries to avoid lifetime issues
304                            for (ekey, location) in parser.into_entries() {
305                                index.add_entry(ekey, location);
306                                self.combined_index.insert(ekey, location);
307                            }
308
309                            self.indices.insert(bucket, index);
310                        }
311                        Err(e) => {
312                            warn!("Failed to load index {:?}: {}", path, e);
313                        }
314                    }
315                }
316            }
317        }
318
319        // Load .index files (group indices) - disabled until format is understood
320        #[allow(unreachable_code)]
321        if false {
322            for entry in std::fs::read_dir(&indices_path)? {
323                let entry = entry?;
324                let path = entry.path();
325
326                if path.extension().and_then(|s| s.to_str()) == Some("index") {
327                    match GroupIndex::parse_file(&path) {
328                        Ok(group) => {
329                            let bucket = group.bucket_index();
330                            debug!(
331                                "Loaded .index file for bucket {:02x}: {} entries",
332                                bucket,
333                                group.len()
334                            );
335
336                            // Merge with existing index or create new
337                            self.indices
338                                .entry(bucket)
339                                .and_modify(|index| {
340                                    for (ekey, location) in group.entries() {
341                                        index.add_entry(*ekey, *location);
342                                    }
343                                })
344                                .or_insert_with(|| {
345                                    let mut index = IndexFile::new(crate::index::IndexVersion::V7);
346                                    for (ekey, location) in group.entries() {
347                                        index.add_entry(*ekey, *location);
348                                    }
349                                    index
350                                });
351                        }
352                        Err(e) => {
353                            warn!("Failed to load group index {:?}: {}", path, e);
354                        }
355                    }
356                }
357            }
358        }
359
360        info!("Loaded {} bucket indices", self.indices.len());
361        Ok(())
362    }
363
364    /// Load archive files
365    pub fn load_archives(&self) -> Result<()> {
366        info!("Loading CASC archives from {:?}", self.config.data_path);
367
368        let data_path = self.config.data_path.join("data");
369        let mut archives = self.archives.write();
370
371        for entry in std::fs::read_dir(&data_path)? {
372            let entry = entry?;
373            let path = entry.path();
374            let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
375
376            if filename.starts_with("data.") {
377                // Extract archive ID from filename (data.XXX)
378                if let Some(id_str) = filename.strip_prefix("data.") {
379                    if let Ok(id) = id_str.parse::<u16>() {
380                        match Archive::new(id, path.clone()) {
381                            Ok(archive) => {
382                                debug!("Loaded archive {}: size={}", id, archive.size);
383                                archives.insert(id, archive);
384                            }
385                            Err(e) => {
386                                warn!("Failed to load archive {:?}: {}", path, e);
387                            }
388                        }
389                    }
390                }
391            }
392        }
393
394        info!("Loaded {} archives", archives.len());
395        Ok(())
396    }
397
398    /// Read a file by its encoding key (zero-copy when cached)
399    pub fn read_arc(&self, ekey: &EKey) -> Result<Arc<Vec<u8>>> {
400        // Check cache first - lock-free operation
401        if let Some(data) = self.cache.get(ekey) {
402            debug!("Cache hit for {} (zero-copy)", ekey);
403            return Ok(data); // Zero-copy return from cache
404        }
405
406        // Not in cache, need to read and decompress
407        let data = self.read_and_decompress(ekey)?;
408        let data_arc = Arc::new(data);
409
410        // Update cache - lock-free operation
411        self.cache.put(*ekey, Arc::clone(&data_arc));
412
413        Ok(data_arc)
414    }
415
416    /// Read a file by its encoding key (compatibility method, always clones)
417    pub fn read(&self, ekey: &EKey) -> Result<Vec<u8>> {
418        let arc_data = self.read_arc(ekey)?;
419        Ok((*arc_data).clone())
420    }
421
422    /// Internal method to read and decompress without caching logic
423    fn read_and_decompress(&self, ekey: &EKey) -> Result<Vec<u8>> {
424        // Use optimized combined index for O(log n) lookup
425        debug!("Looking up EKey {} using combined index", ekey);
426
427        let location = self.combined_index.lookup(ekey).ok_or_else(|| {
428            debug!("EKey {} not found in combined index", ekey);
429            CascError::EntryNotFound(ekey.to_string())
430        })?;
431
432        debug!(
433            "Found {} in archive {} at offset {:x}",
434            ekey, location.archive_id, location.offset
435        );
436
437        // Read from archive
438        let raw_data = {
439            let mut archives = self.archives.write();
440            let archive = archives
441                .get_mut(&location.archive_id)
442                .ok_or(CascError::ArchiveNotFound(location.archive_id))?;
443
444            archive.read_at(&location)?
445        };
446
447        // CASC archives have a 30-byte header before the BLTE data:
448        // 16 bytes: BlteHash (encoding key)
449        // 4 bytes: Size of header + data
450        // 2 bytes: Flags
451        // 4 bytes: ChecksumA
452        // 4 bytes: ChecksumB
453        const CASC_ENTRY_HEADER_SIZE: usize = 30;
454
455        if raw_data.len() < CASC_ENTRY_HEADER_SIZE {
456            return Err(CascError::InvalidArchiveFormat(format!(
457                "Archive data too small: {} bytes",
458                raw_data.len()
459            )));
460        }
461
462        // Skip the header and get the BLTE data
463        let compressed_data = raw_data[CASC_ENTRY_HEADER_SIZE..].to_vec();
464
465        // Decompress using streaming BLTE for better memory efficiency
466        use std::io::{Cursor, Read};
467        let cursor = Cursor::new(compressed_data);
468        let mut stream = blte::create_streaming_reader(cursor, None)
469            .map_err(|e| CascError::DecompressionError(e.to_string()))?;
470
471        let mut decompressed = Vec::new();
472        stream
473            .read_to_end(&mut decompressed)
474            .map_err(|e| CascError::DecompressionError(e.to_string()))?;
475
476        Ok(decompressed)
477    }
478
479    /// Write a file with the given encoding key
480    pub fn write(&self, ekey: &EKey, data: &[u8]) -> Result<()> {
481        if self.config.read_only {
482            return Err(CascError::ReadOnly);
483        }
484
485        // Check if already exists
486        let bucket = ekey.bucket_index();
487        if let Some(index) = self.indices.get(&bucket) {
488            if index.lookup(ekey).is_some() {
489                debug!("File {} already exists, skipping write", ekey);
490                return Ok(());
491            }
492        }
493
494        // Compress data using BLTE
495        let compressed =
496            blte::compress_data_single(data.to_vec(), blte::CompressionMode::ZLib, None)?;
497
498        // Get or create current archive
499        let location = self.write_to_archive(&compressed)?;
500
501        // Update index
502        self.indices
503            .entry(bucket)
504            .or_insert_with(|| IndexFile::new(crate::index::IndexVersion::V7))
505            .add_entry(*ekey, location);
506
507        // Update cache with Arc to avoid future clones - lock-free operation
508        self.cache.put(*ekey, Arc::new(data.to_vec()));
509
510        debug!(
511            "Wrote {} to archive {} at offset {:x}",
512            ekey, location.archive_id, location.offset
513        );
514        Ok(())
515    }
516
517    /// Write compressed data to the current archive
518    fn write_to_archive(&self, data: &[u8]) -> Result<ArchiveLocation> {
519        let mut current_archive = self.current_archive.write();
520
521        // Check if we need a new archive
522        if current_archive.is_none()
523            || current_archive.as_ref().unwrap().current_offset() + data.len() as u64
524                > self.config.max_archive_size
525        {
526            // Create new archive
527            let archive_id = self.get_next_archive_id();
528            let archive_path = self
529                .config
530                .data_path
531                .join("data")
532                .join(format!("data.{archive_id:03}"));
533
534            *current_archive = Some(ArchiveWriter::create(&archive_path, archive_id)?);
535
536            // Register the new archive
537            let mut archives = self.archives.write();
538            archives.insert(archive_id, Archive::new(archive_id, archive_path)?);
539        }
540
541        let writer = current_archive.as_mut().unwrap();
542        let offset = writer.write(data)?;
543
544        Ok(ArchiveLocation {
545            archive_id: writer.archive_id(),
546            offset,
547            size: data.len() as u32,
548        })
549    }
550
551    /// Get the next available archive ID
552    fn get_next_archive_id(&self) -> u16 {
553        let archives = self.archives.read();
554        archives.keys().max().map(|id| id + 1).unwrap_or(0)
555    }
556
557    /// Verify storage integrity
558    pub fn verify(&self) -> Result<Vec<EKey>> {
559        info!("Verifying CASC storage integrity");
560        let mut errors = Vec::new();
561
562        for index_ref in self.indices.iter() {
563            let index = index_ref.value();
564            for (ekey, _location) in index.entries() {
565                // Try to read the file
566                match self.read(ekey) {
567                    Ok(_) => {
568                        // Successfully read and decompressed
569                    }
570                    Err(e) => {
571                        warn!("Verification failed for {}: {}", ekey, e);
572                        errors.push(*ekey);
573                    }
574                }
575            }
576        }
577
578        if errors.is_empty() {
579            info!("Storage verification complete: all files OK");
580        } else {
581            warn!("Storage verification found {} errors", errors.len());
582        }
583
584        Ok(errors)
585    }
586
587    /// Build indices from scratch by scanning archives
588    pub fn rebuild_indices(&self) -> Result<()> {
589        if self.config.read_only {
590            return Err(CascError::ReadOnly);
591        }
592
593        info!("Rebuilding CASC indices");
594
595        // Clear existing indices
596        self.indices.clear();
597
598        // Scan all archives
599        let archives = self.archives.read();
600        for (_id, archive) in archives.iter() {
601            // This would require parsing the archive format
602            // For now, this is a placeholder
603            warn!(
604                "Archive scanning not yet implemented for {:?}",
605                archive.path()
606            );
607        }
608
609        Ok(())
610    }
611
612    /// Get storage statistics
613    pub fn stats(&self) -> StorageStats {
614        // Calculate stats from current state
615        let mut file_count = 0usize;
616        for index_ref in self.indices.iter() {
617            file_count += index_ref.value().entries().count();
618        }
619
620        let archives = self.archives.read();
621        let total_archives = archives.len();
622
623        let mut total_size = 0u64;
624        for archive in archives.values() {
625            total_size += archive.size;
626        }
627
628        StorageStats {
629            total_archives: total_archives as u32,
630            total_indices: self.indices.len() as u32,
631            total_size,
632            file_count: file_count as u64,
633            duplicate_count: 0,
634            compression_ratio: 0.0,
635        }
636    }
637
638    /// Enumerate all files in the storage
639    /// Returns a vector of (EKey, ArchiveLocation) pairs
640    pub fn enumerate_files_vec(&self) -> Vec<(EKey, ArchiveLocation)> {
641        let mut all_entries = Vec::new();
642
643        for index_ref in self.indices.iter() {
644            let _bucket = *index_ref.key();
645            let index = index_ref.value();
646
647            let bucket_entries: Vec<(EKey, ArchiveLocation)> = index
648                .entries()
649                .map(|(ekey, location)| (*ekey, *location))
650                .collect();
651            all_entries.extend(bucket_entries);
652        }
653
654        all_entries
655    }
656
657    /// Enumerate all files in the storage
658    /// Returns an iterator over (EKey, ArchiveLocation) pairs
659    pub fn enumerate_files(&self) -> impl Iterator<Item = (EKey, ArchiveLocation)> + '_ {
660        self.indices.iter().flat_map(|index_ref| {
661            index_ref
662                .value()
663                .entries()
664                .map(|(ekey, location)| (*ekey, *location))
665                .collect::<Vec<_>>()
666        })
667    }
668
669    /// Get all EKeys in the storage
670    pub fn get_all_ekeys(&self) -> Vec<EKey> {
671        self.enumerate_files().map(|(ekey, _)| ekey).collect()
672    }
673
674    /// Test function to verify EKey lookup is working
675    pub fn test_ekey_lookup(&self) -> Result<()> {
676        // Get the first EKey from enumeration (use vec to avoid iterator issues)
677        let all_files = self.enumerate_files_vec();
678        if let Some((test_ekey, expected_location)) = all_files.first().copied() {
679            info!("Testing lookup with first enumerated EKey: {}", test_ekey);
680            info!(
681                "Expected location: archive={}, offset={:x}, size={}",
682                expected_location.archive_id, expected_location.offset, expected_location.size
683            );
684
685            // Try to read it using the normal read path
686            match self.read(&test_ekey) {
687                Ok(data) => {
688                    info!("SUCCESS: Read {} bytes from EKey {}", data.len(), test_ekey);
689                    Ok(())
690                }
691                Err(e) => {
692                    error!("FAILED to read EKey {}: {}", test_ekey, e);
693
694                    // Debug why it failed
695                    let bucket = test_ekey.bucket_index();
696                    info!("EKey {} maps to bucket {:02x}", test_ekey, bucket);
697
698                    if let Some(index) = self.indices.get(&bucket) {
699                        info!("Bucket {:02x} exists with {} entries", bucket, index.len());
700
701                        // Check if the key exists in the bucket
702                        let found = index.entries().any(|(k, _)| *k == test_ekey);
703
704                        if found {
705                            info!("EKey IS in the bucket but lookup failed!");
706                        } else {
707                            info!("EKey is NOT in the bucket!");
708
709                            // Show first few entries
710                            let entries: Vec<String> = index
711                                .entries()
712                                .take(3)
713                                .map(|(k, _)| k.to_string())
714                                .collect();
715                            info!("First 3 entries in bucket: {:?}", entries);
716                        }
717                    } else {
718                        error!("Bucket {:02x} doesn't exist!", bucket);
719                    }
720
721                    Err(e)
722                }
723            }
724        } else {
725            error!("No files found in storage!");
726            Err(CascError::EntryNotFound("No files in storage".to_string()))
727        }
728    }
729
730    /// Count files per archive
731    pub fn files_per_archive(&self) -> std::collections::HashMap<u16, usize> {
732        let mut counts = std::collections::HashMap::new();
733        for (_ekey, location) in self.enumerate_files() {
734            *counts.entry(location.archive_id).or_insert(0) += 1;
735        }
736        counts
737    }
738
739    /// Clear the cache
740    pub fn clear_cache(&self) {
741        self.cache.clear();
742    }
743
744    /// Flush any pending writes
745    pub fn flush(&self) -> Result<()> {
746        if let Some(writer) = self.current_archive.write().as_mut() {
747            writer.flush()?;
748        }
749        Ok(())
750    }
751
752    // === TACT Manifest Integration ===
753
754    /// Initialize TACT manifest support with configuration
755    pub fn init_tact_manifests(&mut self, config: ManifestConfig) {
756        self.tact_manifests = Some(TactManifests::new(config));
757        info!("Initialized TACT manifest support");
758    }
759
760    /// Load root manifest from raw data
761    pub fn load_root_manifest(&self, data: Vec<u8>) -> Result<()> {
762        let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
763            CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
764        })?;
765        manifests.load_root_from_data(data)
766    }
767
768    /// Load encoding manifest from raw data
769    pub fn load_encoding_manifest(&self, data: Vec<u8>) -> Result<()> {
770        let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
771            CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
772        })?;
773        manifests.load_encoding_from_data(data)
774    }
775
776    /// Load root manifest from file
777    pub fn load_root_manifest_from_file(&self, path: &std::path::Path) -> Result<()> {
778        let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
779            CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
780        })?;
781        manifests.load_root_from_file(path)
782    }
783
784    /// Load encoding manifest from file
785    pub fn load_encoding_manifest_from_file(&self, path: &std::path::Path) -> Result<()> {
786        let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
787            CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
788        })?;
789        manifests.load_encoding_from_file(path)
790    }
791
792    /// Load a community listfile for filename resolution
793    pub fn load_listfile(&self, path: &std::path::Path) -> Result<usize> {
794        let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
795            CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
796        })?;
797        manifests.load_listfile(path)
798    }
799
800    /// Read a file by FileDataID
801    pub fn read_by_fdid(&self, fdid: u32) -> Result<Vec<u8>> {
802        let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
803            CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
804        })?;
805
806        let mapping = manifests.lookup_by_fdid(fdid)?;
807        let ekey = mapping
808            .encoding_key
809            .ok_or_else(|| CascError::EntryNotFound(format!("EKey for FDID {fdid}")))?;
810
811        self.read(&ekey)
812    }
813
814    /// Read a file by filename (requires loaded listfile or root manifest)
815    pub fn read_by_filename(&self, filename: &str) -> Result<Vec<u8>> {
816        let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
817            CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
818        })?;
819
820        let mapping = manifests.lookup_by_filename(filename)?;
821        let ekey = mapping
822            .encoding_key
823            .ok_or_else(|| CascError::EntryNotFound(format!("EKey for filename {filename}")))?;
824
825        self.read(&ekey)
826    }
827
828    /// Get FileDataID for a filename (if known)
829    pub fn get_fdid_for_filename(&self, filename: &str) -> Option<u32> {
830        self.tact_manifests
831            .as_ref()?
832            .get_fdid_for_filename(filename)
833    }
834
835    /// Get all known FileDataIDs
836    pub fn get_all_fdids(&self) -> Result<Vec<u32>> {
837        let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
838            CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
839        })?;
840        manifests.get_all_fdids()
841    }
842
843    /// Check if TACT manifests are loaded and ready
844    pub fn tact_manifests_loaded(&self) -> bool {
845        self.tact_manifests.as_ref().is_some_and(|m| m.is_loaded())
846    }
847
848    /// Get file mapping information for a FileDataID
849    pub fn get_file_mapping(&self, fdid: u32) -> Result<FileMapping> {
850        let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
851            CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
852        })?;
853        manifests.lookup_by_fdid(fdid)
854    }
855
856    /// Clear TACT manifest caches
857    pub fn clear_manifest_cache(&self) {
858        if let Some(manifests) = &self.tact_manifests {
859            manifests.clear_cache();
860        }
861    }
862
863    // === Async Index Operations ===
864
865    /// Initialize async index manager for parallel operations
866    pub async fn init_async_indices(&mut self) -> Result<()> {
867        let config = AsyncIndexConfig {
868            max_concurrent_files: 16,
869            buffer_size: 128 * 1024, // 128KB buffers
870            enable_caching: true,
871            max_cache_entries: 100_000,
872            enable_background_updates: false, // Can be enabled later
873        };
874
875        let manager = Arc::new(AsyncIndexManager::new(config));
876
877        // Load existing indices
878        let loaded = manager.load_directory(&self.config.data_path).await?;
879
880        info!("Async index manager initialized with {} indices", loaded);
881        self.async_index_manager = Some(manager);
882
883        Ok(())
884    }
885
886    /// Perform async lookup using the async index manager
887    pub async fn lookup_async(&self, ekey: &EKey) -> Option<ArchiveLocation> {
888        if let Some(ref manager) = self.async_index_manager {
889            manager.lookup(ekey).await
890        } else {
891            // Fallback to sync lookup
892            self.combined_index.lookup(ekey)
893        }
894    }
895
896    /// Batch lookup for multiple keys using async operations
897    pub async fn lookup_batch_async(&self, ekeys: &[EKey]) -> Vec<Option<ArchiveLocation>> {
898        if let Some(ref manager) = self.async_index_manager {
899            manager.lookup_batch(ekeys).await
900        } else {
901            // Fallback to sync batch lookup
902            self.combined_index.lookup_batch(ekeys)
903        }
904    }
905
906    /// Start background index updates with specified interval
907    pub async fn start_index_background_updates(&self, interval: std::time::Duration) {
908        if let Some(ref manager) = self.async_index_manager {
909            manager
910                .start_background_updates(self.config.data_path.clone(), interval)
911                .await;
912            info!(
913                "Started background index updates with interval {:?}",
914                interval
915            );
916        }
917    }
918
919    /// Stop background index updates
920    pub async fn stop_index_background_updates(&self) {
921        if let Some(ref manager) = self.async_index_manager {
922            manager.stop_background_updates().await;
923            info!("Stopped background index updates");
924        }
925    }
926
927    /// Get async index statistics
928    pub async fn get_async_index_stats(&self) -> Option<crate::index::AsyncIndexStats> {
929        if let Some(ref manager) = self.async_index_manager {
930            Some(manager.get_stats().await)
931        } else {
932            None
933        }
934    }
935
936    /// Clear async index cache
937    pub async fn clear_async_index_cache(&self) {
938        if let Some(ref manager) = self.async_index_manager {
939            manager.clear_cache().await;
940            debug!("Cleared async index cache");
941        }
942    }
943
944    // === Progressive Loading Support ===
945
946    /// Initialize progressive file loading with configuration
947    pub fn init_progressive_loading(&mut self, config: ProgressiveConfig) {
948        let chunk_loader = Arc::new(CascStorageChunkLoader {
949            storage: self as *const CascStorage,
950        });
951
952        self.progressive_manager = Some(ProgressiveFileManager::new(config, chunk_loader));
953        info!("Initialized progressive file loading");
954    }
955
956    /// Read a file progressively with size hints
957    pub async fn read_progressive(
958        &self,
959        ekey: &EKey,
960        size_hint: SizeHint,
961    ) -> Result<Arc<crate::progressive::ProgressiveFile>> {
962        let manager = self.progressive_manager.as_ref().ok_or_else(|| {
963            CascError::InvalidArchiveFormat("Progressive loading not initialized".to_string())
964        })?;
965
966        Ok(manager
967            .get_or_create_progressive_file(*ekey, size_hint)
968            .await)
969    }
970
971    /// Read a file by FileDataID progressively with size hints from manifest
972    pub async fn read_by_fdid_progressive(
973        &self,
974        fdid: u32,
975    ) -> Result<Arc<crate::progressive::ProgressiveFile>> {
976        let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
977            CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
978        })?;
979
980        let mapping = manifests.lookup_by_fdid(fdid)?;
981        let ekey = mapping
982            .encoding_key
983            .ok_or_else(|| CascError::EntryNotFound(format!("EKey for FDID {fdid}")))?;
984
985        // Create size hint from archive location data
986        let size_hint = if let Some(location) = self.combined_index.lookup(&ekey) {
987            // Archive location gives us compressed size, actual size is usually larger
988            SizeHint::Minimum(location.size as u64)
989        } else {
990            SizeHint::Unknown
991        };
992
993        self.read_progressive(&ekey, size_hint).await
994    }
995
996    /// Get size hint for an EKey from archive location
997    pub fn get_size_hint_for_ekey(&self, ekey: &EKey) -> SizeHint {
998        if let Some(location) = self.combined_index.lookup(ekey) {
999            // Archive location gives us a minimum size (compressed size)
1000            // Actual decompressed size is usually larger
1001            SizeHint::Minimum(location.size as u64)
1002        } else {
1003            SizeHint::Unknown
1004        }
1005    }
1006
1007    /// Check if progressive loading is available
1008    pub fn has_progressive_loading(&self) -> bool {
1009        self.progressive_manager.is_some()
1010    }
1011
1012    /// Cleanup inactive progressive files
1013    pub async fn cleanup_progressive_files(&self) {
1014        if let Some(manager) = &self.progressive_manager {
1015            use std::time::Duration;
1016            manager
1017                .cleanup_inactive_files(Duration::from_secs(300))
1018                .await; // 5 minutes
1019        }
1020    }
1021
1022    /// Get progressive loading statistics
1023    pub async fn get_progressive_stats(&self) -> Vec<(EKey, crate::progressive::LoadingStats)> {
1024        if let Some(manager) = &self.progressive_manager {
1025            manager.get_global_stats().await
1026        } else {
1027            Vec::new()
1028        }
1029    }
1030}
1031
/// ChunkLoader implementation for CascStorage
///
/// Bridges the progressive loading machinery back to the storage that owns the
/// indices and archives. The loader does not own or borrow the storage; it only
/// records its address, so the type system does not track how long that address
/// stays valid.
struct CascStorageChunkLoader {
    /// Non-owning raw pointer to the storage, taken from `self` in
    /// `CascStorage::init_progressive_loading` and dereferenced in `load_chunk`.
    /// It must point at a live `CascStorage` that has not been moved since the
    /// pointer was taken.
    storage: *const CascStorage,
}
1036
// Raw pointers are neither `Send` nor `Sync`, so both traits are asserted
// manually here to let the loader be shared across threads.
//
// Safety: CascStorageChunkLoader is only used with a valid CascStorage pointer
// and the storage lifetime is guaranteed by the ProgressiveFileManager
//
// NOTE(review): nothing visible in this file enforces that guarantee. The
// pointer is only valid while the `CascStorage` it was taken from is neither
// moved nor dropped, and sharing it across threads additionally assumes that
// `&CascStorage` is itself safe to use concurrently — confirm both hold for
// every user of the loader.
unsafe impl Send for CascStorageChunkLoader {}
unsafe impl Sync for CascStorageChunkLoader {}
1041
1042#[async_trait::async_trait]
1043impl ChunkLoader for CascStorageChunkLoader {
1044    async fn load_chunk(&self, ekey: EKey, offset: u64, size: usize) -> Result<Vec<u8>> {
1045        // SAFETY: The storage pointer is valid for the entire lifetime of CascStorageChunkLoader
1046        // as it's created from a reference to CascStorage and the lifetime 'a ensures that
1047        // the CascStorage outlives this ChunkLoader instance.
1048        let storage = unsafe { &*self.storage };
1049
1050        // Get the location of the file
1051        let location = storage.combined_index.lookup(&ekey).ok_or_else(|| {
1052            debug!("EKey {} not found in combined index", ekey);
1053            CascError::EntryNotFound(ekey.to_string())
1054        })?;
1055
1056        debug!(
1057            "Loading chunk for {} from archive {} at offset {:x} (chunk offset={}, size={})",
1058            ekey, location.archive_id, location.offset, offset, size
1059        );
1060
1061        // Read the compressed data from archive
1062        let raw_data = {
1063            let mut archives = storage.archives.write();
1064            let archive = archives
1065                .get_mut(&location.archive_id)
1066                .ok_or(CascError::ArchiveNotFound(location.archive_id))?;
1067
1068            archive.read_at(&location)?
1069        };
1070
1071        // CASC archives have a 30-byte header before the BLTE data
1072        const CASC_ENTRY_HEADER_SIZE: usize = 30;
1073
1074        if raw_data.len() < CASC_ENTRY_HEADER_SIZE {
1075            return Err(CascError::InvalidArchiveFormat(format!(
1076                "Archive data too small: {} bytes",
1077                raw_data.len()
1078            )));
1079        }
1080
1081        // Skip the header and get the BLTE data
1082        let compressed_data = raw_data[CASC_ENTRY_HEADER_SIZE..].to_vec();
1083
1084        // Decompress using streaming BLTE
1085        use std::io::{Cursor, Read};
1086        let cursor = Cursor::new(compressed_data);
1087        let mut stream = blte::create_streaming_reader(cursor, None)
1088            .map_err(|e| CascError::DecompressionError(e.to_string()))?;
1089
1090        // Seek to the requested offset in the decompressed stream
1091        if offset > 0 {
1092            let mut discard_buf = vec![0u8; 8192]; // 8KB discard buffer
1093            let mut remaining = offset;
1094
1095            while remaining > 0 {
1096                let to_read = (remaining as usize).min(discard_buf.len());
1097                let read = stream
1098                    .read(&mut discard_buf[..to_read])
1099                    .map_err(|e| CascError::DecompressionError(e.to_string()))?;
1100
1101                if read == 0 {
1102                    break; // End of stream
1103                }
1104
1105                remaining -= read as u64;
1106            }
1107        }
1108
1109        // Read the requested chunk
1110        let mut chunk_data = vec![0u8; size];
1111        let actual_read = stream
1112            .read(&mut chunk_data)
1113            .map_err(|e| CascError::DecompressionError(e.to_string()))?;
1114
1115        // Resize to actual read size
1116        chunk_data.truncate(actual_read);
1117
1118        debug!(
1119            "Loaded chunk for {} (offset={}, requested_size={}, actual_size={})",
1120            ekey, offset, size, actual_read
1121        );
1122
1123        Ok(chunk_data)
1124    }
1125}