1use crate::archive::{Archive, ArchiveWriter};
4use crate::cache::LockFreeCache;
5use crate::error::{CascError, Result};
6use crate::index::{
7 AsyncIndexConfig, AsyncIndexManager, CombinedIndex, GroupIndex, IdxParser, IndexFile,
8};
9use crate::manifest::{FileMapping, ManifestConfig, TactManifests};
10use crate::progressive::{ChunkLoader, ProgressiveConfig, ProgressiveFileManager, SizeHint};
11use crate::types::{ArchiveLocation, CascConfig, EKey, StorageStats};
12use dashmap::DashMap;
13use parking_lot::RwLock;
14use std::collections::HashMap;
15use std::sync::Arc;
16use tracing::{debug, error, info, warn};
17
/// Local CASC storage rooted at `config.data_path`.
///
/// Bundles the per-bucket and combined index tables, the archive handles,
/// a cache of decompressed payloads, the active archive writer, and the
/// optional TACT-manifest and progressive-loading helpers.
pub struct CascStorage {
    /// Settings supplied when the storage was constructed.
    config: CascConfig,

    /// Per-bucket index tables, keyed by bucket number.
    indices: Arc<DashMap<u8, IndexFile>>,

    /// Index across all buckets; the read paths resolve EKeys through it.
    combined_index: Arc<CombinedIndex>,

    /// Async index manager; `None` until `init_async_indices` has succeeded.
    async_index_manager: Option<Arc<AsyncIndexManager>>,

    /// Archive handles keyed by archive id.
    archives: Arc<RwLock<HashMap<u16, Archive>>>,

    /// Cache of decompressed file contents, keyed by EKey.
    cache: Arc<LockFreeCache>,

    /// Writer for the archive currently receiving writes; `None` until the first write.
    current_archive: Arc<RwLock<Option<ArchiveWriter>>>,

    /// TACT manifest support; `None` until `init_tact_manifests` is called.
    tact_manifests: Option<TactManifests>,

    /// Progressive loading support; `None` until `init_progressive_loading` is called.
    progressive_manager: Option<ProgressiveFileManager>,

    // Initialised by the constructors but not read by any method of this
    // type, hence the lint exemption.
    #[allow(dead_code)]
    stats: Arc<RwLock<StorageStats>>,
}
51
52impl CascStorage {
53 pub fn new(config: CascConfig) -> Result<Self> {
55 let data_path = &config.data_path;
57 let indices_path = data_path.join("indices");
58 let data_subpath = data_path.join("data");
59
60 std::fs::create_dir_all(&indices_path)?;
61 std::fs::create_dir_all(&data_subpath)?;
62
63 let cache_size_bytes = (config.cache_size_mb as usize) * 1024 * 1024;
64
65 Ok(Self {
66 config,
67 indices: Arc::new(DashMap::new()),
68 combined_index: Arc::new(CombinedIndex::new()),
69 async_index_manager: None,
70 archives: Arc::new(RwLock::new(HashMap::new())),
71 cache: Arc::new(LockFreeCache::new(cache_size_bytes)),
72 current_archive: Arc::new(RwLock::new(None)),
73 tact_manifests: None,
74 progressive_manager: None,
75 stats: Arc::new(RwLock::new(StorageStats::default())),
76 })
77 }
78
79 pub fn load_indices(&self) -> Result<()> {
81 match tokio::runtime::Handle::try_current() {
83 Ok(_handle) => {
84 debug!("In async context, using sequential loading to avoid runtime conflict");
87 self.load_indices_sequential()
88 }
89 Err(_) => {
90 debug!("No async runtime, using sequential loading");
92 self.load_indices_sequential()
93 }
94 }
95 }
96
97 pub async fn new_async(config: CascConfig) -> Result<Self> {
99 let data_path = &config.data_path;
101 let indices_path = data_path.join("indices");
102 let data_subpath = data_path.join("data");
103
104 std::fs::create_dir_all(&indices_path)?;
105 std::fs::create_dir_all(&data_subpath)?;
106
107 let cache_size_bytes = (config.cache_size_mb as usize) * 1024 * 1024;
108
109 let storage = Self {
110 config,
111 indices: Arc::new(DashMap::new()),
112 combined_index: Arc::new(CombinedIndex::new()),
113 async_index_manager: None,
114 archives: Arc::new(RwLock::new(HashMap::new())),
115 cache: Arc::new(LockFreeCache::new(cache_size_bytes)),
116 current_archive: Arc::new(RwLock::new(None)),
117 tact_manifests: None,
118 progressive_manager: None,
119 stats: Arc::new(RwLock::new(StorageStats::default())),
120 };
121
122 storage.load_indices_parallel().await?;
124 storage.load_archives()?;
125
126 Ok(storage)
127 }
128
129 pub async fn load_indices_parallel(&self) -> Result<()> {
131 info!(
132 "Loading CASC indices from {:?} (parallel)",
133 self.config.data_path
134 );
135
136 use tokio::task::JoinSet;
137
138 let indices_path = self.config.data_path.join("indices");
140 let data_path = self.config.data_path.join("data");
141
142 let mut idx_paths = Vec::new();
144
145 if data_path.exists() {
147 if let Ok(entries) = tokio::fs::read_dir(&data_path).await {
148 let mut entries = entries;
149 while let Ok(Some(entry)) = entries.next_entry().await {
150 let path = entry.path();
151 if path.extension().and_then(|s| s.to_str()) == Some("idx") {
152 idx_paths.push(path);
153 }
154 }
155 }
156 }
157
158 if indices_path.exists() {
160 if let Ok(entries) = tokio::fs::read_dir(&indices_path).await {
161 let mut entries = entries;
162 while let Ok(Some(entry)) = entries.next_entry().await {
163 let path = entry.path();
164 if path.extension().and_then(|s| s.to_str()) == Some("idx") {
165 idx_paths.push(path);
166 }
167 }
168 }
169 }
170
171 if idx_paths.is_empty() {
172 info!("No .idx files found");
173 return Ok(());
174 }
175
176 info!("Found {} .idx files, loading in parallel", idx_paths.len());
177
178 let mut join_set = JoinSet::new();
180
181 for idx_path in idx_paths {
182 join_set.spawn_blocking(move || -> Result<(u8, IndexFile)> {
183 match IdxParser::parse_file(&idx_path) {
184 Ok(parser) => {
185 let bucket = parser.bucket();
186 debug!(
187 "Loaded .idx file for bucket {:02x}: {} entries",
188 bucket,
189 parser.len()
190 );
191
192 let entries_map = parser.into_entries();
193 let mut index = IndexFile::new(crate::index::IndexVersion::V7);
194
195 for (ekey, location) in entries_map {
197 index.add_entry(ekey, location);
198 }
199
200 Ok((bucket, index))
201 }
202 Err(e) => {
203 warn!("Failed to load index {:?}: {}", idx_path, e);
204 Err(e)
205 }
206 }
207 });
208 }
209
210 let mut loaded_count = 0;
212 while let Some(result) = join_set.join_next().await {
213 match result {
214 Ok(Ok((bucket, index))) => {
215 for (ekey, location) in index.entries() {
217 self.combined_index.insert(*ekey, *location);
218 }
219 self.indices.insert(bucket, index);
220 loaded_count += 1;
221 }
222 Ok(Err(e)) => {
223 debug!("Index loading task failed: {}", e);
224 }
226 Err(e) => {
227 warn!("Task join failed: {}", e);
228 }
229 }
230 }
231
232 info!("Loaded {} bucket indices (parallel)", loaded_count);
233 Ok(())
234 }
235
236 pub fn load_indices_sequential(&self) -> Result<()> {
238 info!(
239 "Loading CASC indices from {:?} (sequential)",
240 self.config.data_path
241 );
242
243 let indices_path = self.config.data_path.join("indices");
245 let data_path = self.config.data_path.join("data");
246
247 if data_path.exists() {
249 if let Ok(entries) = std::fs::read_dir(&data_path) {
250 for entry in entries {
251 let entry = entry?;
252 let path = entry.path();
253
254 if path.extension().and_then(|s| s.to_str()) == Some("idx") {
255 match IdxParser::parse_file(&path) {
256 Ok(parser) => {
257 let bucket = parser.bucket();
258 debug!(
259 "Loaded .idx file for bucket {:02x}: {} entries",
260 bucket,
261 parser.len()
262 );
263
264 let entries_map = parser.into_entries();
266
267 let mut index = IndexFile::new(crate::index::IndexVersion::V7);
268
269 for (ekey, location) in entries_map {
271 index.add_entry(ekey, location);
272 self.combined_index.insert(ekey, location);
273 }
274
275 self.indices.insert(bucket, index);
276 }
277 Err(e) => {
278 warn!("Failed to load index {:?}: {}", path, e);
279 }
280 }
281 }
282 }
283 }
284 }
285
286 if indices_path.exists() {
288 for entry in std::fs::read_dir(&indices_path)? {
289 let entry = entry?;
290 let path = entry.path();
291
292 if path.extension().and_then(|s| s.to_str()) == Some("idx") {
293 match IdxParser::parse_file(&path) {
294 Ok(parser) => {
295 let bucket = parser.bucket();
296 debug!(
297 "Loaded .idx file for bucket {:02x}: {} entries",
298 bucket,
299 parser.len()
300 );
301
302 let mut index = IndexFile::new(crate::index::IndexVersion::V7);
303 for (ekey, location) in parser.into_entries() {
305 index.add_entry(ekey, location);
306 self.combined_index.insert(ekey, location);
307 }
308
309 self.indices.insert(bucket, index);
310 }
311 Err(e) => {
312 warn!("Failed to load index {:?}: {}", path, e);
313 }
314 }
315 }
316 }
317 }
318
319 #[allow(unreachable_code)]
321 if false {
322 for entry in std::fs::read_dir(&indices_path)? {
323 let entry = entry?;
324 let path = entry.path();
325
326 if path.extension().and_then(|s| s.to_str()) == Some("index") {
327 match GroupIndex::parse_file(&path) {
328 Ok(group) => {
329 let bucket = group.bucket_index();
330 debug!(
331 "Loaded .index file for bucket {:02x}: {} entries",
332 bucket,
333 group.len()
334 );
335
336 self.indices
338 .entry(bucket)
339 .and_modify(|index| {
340 for (ekey, location) in group.entries() {
341 index.add_entry(*ekey, *location);
342 }
343 })
344 .or_insert_with(|| {
345 let mut index = IndexFile::new(crate::index::IndexVersion::V7);
346 for (ekey, location) in group.entries() {
347 index.add_entry(*ekey, *location);
348 }
349 index
350 });
351 }
352 Err(e) => {
353 warn!("Failed to load group index {:?}: {}", path, e);
354 }
355 }
356 }
357 }
358 }
359
360 info!("Loaded {} bucket indices", self.indices.len());
361 Ok(())
362 }
363
364 pub fn load_archives(&self) -> Result<()> {
366 info!("Loading CASC archives from {:?}", self.config.data_path);
367
368 let data_path = self.config.data_path.join("data");
369 let mut archives = self.archives.write();
370
371 for entry in std::fs::read_dir(&data_path)? {
372 let entry = entry?;
373 let path = entry.path();
374 let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
375
376 if filename.starts_with("data.") {
377 if let Some(id_str) = filename.strip_prefix("data.") {
379 if let Ok(id) = id_str.parse::<u16>() {
380 match Archive::new(id, path.clone()) {
381 Ok(archive) => {
382 debug!("Loaded archive {}: size={}", id, archive.size);
383 archives.insert(id, archive);
384 }
385 Err(e) => {
386 warn!("Failed to load archive {:?}: {}", path, e);
387 }
388 }
389 }
390 }
391 }
392 }
393
394 info!("Loaded {} archives", archives.len());
395 Ok(())
396 }
397
398 pub fn read_arc(&self, ekey: &EKey) -> Result<Arc<Vec<u8>>> {
400 if let Some(data) = self.cache.get(ekey) {
402 debug!("Cache hit for {} (zero-copy)", ekey);
403 return Ok(data); }
405
406 let data = self.read_and_decompress(ekey)?;
408 let data_arc = Arc::new(data);
409
410 self.cache.put(*ekey, Arc::clone(&data_arc));
412
413 Ok(data_arc)
414 }
415
416 pub fn read(&self, ekey: &EKey) -> Result<Vec<u8>> {
418 let arc_data = self.read_arc(ekey)?;
419 Ok((*arc_data).clone())
420 }
421
422 fn read_and_decompress(&self, ekey: &EKey) -> Result<Vec<u8>> {
424 debug!("Looking up EKey {} using combined index", ekey);
426
427 let location = self.combined_index.lookup(ekey).ok_or_else(|| {
428 debug!("EKey {} not found in combined index", ekey);
429 CascError::EntryNotFound(ekey.to_string())
430 })?;
431
432 debug!(
433 "Found {} in archive {} at offset {:x}",
434 ekey, location.archive_id, location.offset
435 );
436
437 let raw_data = {
439 let mut archives = self.archives.write();
440 let archive = archives
441 .get_mut(&location.archive_id)
442 .ok_or(CascError::ArchiveNotFound(location.archive_id))?;
443
444 archive.read_at(&location)?
445 };
446
447 const CASC_ENTRY_HEADER_SIZE: usize = 30;
454
455 if raw_data.len() < CASC_ENTRY_HEADER_SIZE {
456 return Err(CascError::InvalidArchiveFormat(format!(
457 "Archive data too small: {} bytes",
458 raw_data.len()
459 )));
460 }
461
462 let compressed_data = raw_data[CASC_ENTRY_HEADER_SIZE..].to_vec();
464
465 use std::io::{Cursor, Read};
467 let cursor = Cursor::new(compressed_data);
468 let mut stream = blte::create_streaming_reader(cursor, None)
469 .map_err(|e| CascError::DecompressionError(e.to_string()))?;
470
471 let mut decompressed = Vec::new();
472 stream
473 .read_to_end(&mut decompressed)
474 .map_err(|e| CascError::DecompressionError(e.to_string()))?;
475
476 Ok(decompressed)
477 }
478
479 pub fn write(&self, ekey: &EKey, data: &[u8]) -> Result<()> {
481 if self.config.read_only {
482 return Err(CascError::ReadOnly);
483 }
484
485 let bucket = ekey.bucket_index();
487 if let Some(index) = self.indices.get(&bucket) {
488 if index.lookup(ekey).is_some() {
489 debug!("File {} already exists, skipping write", ekey);
490 return Ok(());
491 }
492 }
493
494 let compressed =
496 blte::compress_data_single(data.to_vec(), blte::CompressionMode::ZLib, None)?;
497
498 let location = self.write_to_archive(&compressed)?;
500
501 self.indices
503 .entry(bucket)
504 .or_insert_with(|| IndexFile::new(crate::index::IndexVersion::V7))
505 .add_entry(*ekey, location);
506
507 self.cache.put(*ekey, Arc::new(data.to_vec()));
509
510 debug!(
511 "Wrote {} to archive {} at offset {:x}",
512 ekey, location.archive_id, location.offset
513 );
514 Ok(())
515 }
516
517 fn write_to_archive(&self, data: &[u8]) -> Result<ArchiveLocation> {
519 let mut current_archive = self.current_archive.write();
520
521 if current_archive.is_none()
523 || current_archive.as_ref().unwrap().current_offset() + data.len() as u64
524 > self.config.max_archive_size
525 {
526 let archive_id = self.get_next_archive_id();
528 let archive_path = self
529 .config
530 .data_path
531 .join("data")
532 .join(format!("data.{archive_id:03}"));
533
534 *current_archive = Some(ArchiveWriter::create(&archive_path, archive_id)?);
535
536 let mut archives = self.archives.write();
538 archives.insert(archive_id, Archive::new(archive_id, archive_path)?);
539 }
540
541 let writer = current_archive.as_mut().unwrap();
542 let offset = writer.write(data)?;
543
544 Ok(ArchiveLocation {
545 archive_id: writer.archive_id(),
546 offset,
547 size: data.len() as u32,
548 })
549 }
550
551 fn get_next_archive_id(&self) -> u16 {
553 let archives = self.archives.read();
554 archives.keys().max().map(|id| id + 1).unwrap_or(0)
555 }
556
557 pub fn verify(&self) -> Result<Vec<EKey>> {
559 info!("Verifying CASC storage integrity");
560 let mut errors = Vec::new();
561
562 for index_ref in self.indices.iter() {
563 let index = index_ref.value();
564 for (ekey, _location) in index.entries() {
565 match self.read(ekey) {
567 Ok(_) => {
568 }
570 Err(e) => {
571 warn!("Verification failed for {}: {}", ekey, e);
572 errors.push(*ekey);
573 }
574 }
575 }
576 }
577
578 if errors.is_empty() {
579 info!("Storage verification complete: all files OK");
580 } else {
581 warn!("Storage verification found {} errors", errors.len());
582 }
583
584 Ok(errors)
585 }
586
587 pub fn rebuild_indices(&self) -> Result<()> {
589 if self.config.read_only {
590 return Err(CascError::ReadOnly);
591 }
592
593 info!("Rebuilding CASC indices");
594
595 self.indices.clear();
597
598 let archives = self.archives.read();
600 for (_id, archive) in archives.iter() {
601 warn!(
604 "Archive scanning not yet implemented for {:?}",
605 archive.path()
606 );
607 }
608
609 Ok(())
610 }
611
612 pub fn stats(&self) -> StorageStats {
614 let mut file_count = 0usize;
616 for index_ref in self.indices.iter() {
617 file_count += index_ref.value().entries().count();
618 }
619
620 let archives = self.archives.read();
621 let total_archives = archives.len();
622
623 let mut total_size = 0u64;
624 for archive in archives.values() {
625 total_size += archive.size;
626 }
627
628 StorageStats {
629 total_archives: total_archives as u32,
630 total_indices: self.indices.len() as u32,
631 total_size,
632 file_count: file_count as u64,
633 duplicate_count: 0,
634 compression_ratio: 0.0,
635 }
636 }
637
638 pub fn enumerate_files_vec(&self) -> Vec<(EKey, ArchiveLocation)> {
641 let mut all_entries = Vec::new();
642
643 for index_ref in self.indices.iter() {
644 let _bucket = *index_ref.key();
645 let index = index_ref.value();
646
647 let bucket_entries: Vec<(EKey, ArchiveLocation)> = index
648 .entries()
649 .map(|(ekey, location)| (*ekey, *location))
650 .collect();
651 all_entries.extend(bucket_entries);
652 }
653
654 all_entries
655 }
656
657 pub fn enumerate_files(&self) -> impl Iterator<Item = (EKey, ArchiveLocation)> + '_ {
660 self.indices.iter().flat_map(|index_ref| {
661 index_ref
662 .value()
663 .entries()
664 .map(|(ekey, location)| (*ekey, *location))
665 .collect::<Vec<_>>()
666 })
667 }
668
669 pub fn get_all_ekeys(&self) -> Vec<EKey> {
671 self.enumerate_files().map(|(ekey, _)| ekey).collect()
672 }
673
674 pub fn test_ekey_lookup(&self) -> Result<()> {
676 let all_files = self.enumerate_files_vec();
678 if let Some((test_ekey, expected_location)) = all_files.first().copied() {
679 info!("Testing lookup with first enumerated EKey: {}", test_ekey);
680 info!(
681 "Expected location: archive={}, offset={:x}, size={}",
682 expected_location.archive_id, expected_location.offset, expected_location.size
683 );
684
685 match self.read(&test_ekey) {
687 Ok(data) => {
688 info!("SUCCESS: Read {} bytes from EKey {}", data.len(), test_ekey);
689 Ok(())
690 }
691 Err(e) => {
692 error!("FAILED to read EKey {}: {}", test_ekey, e);
693
694 let bucket = test_ekey.bucket_index();
696 info!("EKey {} maps to bucket {:02x}", test_ekey, bucket);
697
698 if let Some(index) = self.indices.get(&bucket) {
699 info!("Bucket {:02x} exists with {} entries", bucket, index.len());
700
701 let found = index.entries().any(|(k, _)| *k == test_ekey);
703
704 if found {
705 info!("EKey IS in the bucket but lookup failed!");
706 } else {
707 info!("EKey is NOT in the bucket!");
708
709 let entries: Vec<String> = index
711 .entries()
712 .take(3)
713 .map(|(k, _)| k.to_string())
714 .collect();
715 info!("First 3 entries in bucket: {:?}", entries);
716 }
717 } else {
718 error!("Bucket {:02x} doesn't exist!", bucket);
719 }
720
721 Err(e)
722 }
723 }
724 } else {
725 error!("No files found in storage!");
726 Err(CascError::EntryNotFound("No files in storage".to_string()))
727 }
728 }
729
730 pub fn files_per_archive(&self) -> std::collections::HashMap<u16, usize> {
732 let mut counts = std::collections::HashMap::new();
733 for (_ekey, location) in self.enumerate_files() {
734 *counts.entry(location.archive_id).or_insert(0) += 1;
735 }
736 counts
737 }
738
739 pub fn clear_cache(&self) {
741 self.cache.clear();
742 }
743
744 pub fn flush(&self) -> Result<()> {
746 if let Some(writer) = self.current_archive.write().as_mut() {
747 writer.flush()?;
748 }
749 Ok(())
750 }
751
752 pub fn init_tact_manifests(&mut self, config: ManifestConfig) {
756 self.tact_manifests = Some(TactManifests::new(config));
757 info!("Initialized TACT manifest support");
758 }
759
760 pub fn load_root_manifest(&self, data: Vec<u8>) -> Result<()> {
762 let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
763 CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
764 })?;
765 manifests.load_root_from_data(data)
766 }
767
768 pub fn load_encoding_manifest(&self, data: Vec<u8>) -> Result<()> {
770 let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
771 CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
772 })?;
773 manifests.load_encoding_from_data(data)
774 }
775
776 pub fn load_root_manifest_from_file(&self, path: &std::path::Path) -> Result<()> {
778 let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
779 CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
780 })?;
781 manifests.load_root_from_file(path)
782 }
783
784 pub fn load_encoding_manifest_from_file(&self, path: &std::path::Path) -> Result<()> {
786 let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
787 CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
788 })?;
789 manifests.load_encoding_from_file(path)
790 }
791
792 pub fn load_listfile(&self, path: &std::path::Path) -> Result<usize> {
794 let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
795 CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
796 })?;
797 manifests.load_listfile(path)
798 }
799
800 pub fn read_by_fdid(&self, fdid: u32) -> Result<Vec<u8>> {
802 let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
803 CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
804 })?;
805
806 let mapping = manifests.lookup_by_fdid(fdid)?;
807 let ekey = mapping
808 .encoding_key
809 .ok_or_else(|| CascError::EntryNotFound(format!("EKey for FDID {fdid}")))?;
810
811 self.read(&ekey)
812 }
813
814 pub fn read_by_filename(&self, filename: &str) -> Result<Vec<u8>> {
816 let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
817 CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
818 })?;
819
820 let mapping = manifests.lookup_by_filename(filename)?;
821 let ekey = mapping
822 .encoding_key
823 .ok_or_else(|| CascError::EntryNotFound(format!("EKey for filename {filename}")))?;
824
825 self.read(&ekey)
826 }
827
828 pub fn get_fdid_for_filename(&self, filename: &str) -> Option<u32> {
830 self.tact_manifests
831 .as_ref()?
832 .get_fdid_for_filename(filename)
833 }
834
835 pub fn get_all_fdids(&self) -> Result<Vec<u32>> {
837 let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
838 CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
839 })?;
840 manifests.get_all_fdids()
841 }
842
843 pub fn tact_manifests_loaded(&self) -> bool {
845 self.tact_manifests.as_ref().is_some_and(|m| m.is_loaded())
846 }
847
848 pub fn get_file_mapping(&self, fdid: u32) -> Result<FileMapping> {
850 let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
851 CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
852 })?;
853 manifests.lookup_by_fdid(fdid)
854 }
855
856 pub fn clear_manifest_cache(&self) {
858 if let Some(manifests) = &self.tact_manifests {
859 manifests.clear_cache();
860 }
861 }
862
863 pub async fn init_async_indices(&mut self) -> Result<()> {
867 let config = AsyncIndexConfig {
868 max_concurrent_files: 16,
869 buffer_size: 128 * 1024, enable_caching: true,
871 max_cache_entries: 100_000,
872 enable_background_updates: false, };
874
875 let manager = Arc::new(AsyncIndexManager::new(config));
876
877 let loaded = manager.load_directory(&self.config.data_path).await?;
879
880 info!("Async index manager initialized with {} indices", loaded);
881 self.async_index_manager = Some(manager);
882
883 Ok(())
884 }
885
886 pub async fn lookup_async(&self, ekey: &EKey) -> Option<ArchiveLocation> {
888 if let Some(ref manager) = self.async_index_manager {
889 manager.lookup(ekey).await
890 } else {
891 self.combined_index.lookup(ekey)
893 }
894 }
895
896 pub async fn lookup_batch_async(&self, ekeys: &[EKey]) -> Vec<Option<ArchiveLocation>> {
898 if let Some(ref manager) = self.async_index_manager {
899 manager.lookup_batch(ekeys).await
900 } else {
901 self.combined_index.lookup_batch(ekeys)
903 }
904 }
905
906 pub async fn start_index_background_updates(&self, interval: std::time::Duration) {
908 if let Some(ref manager) = self.async_index_manager {
909 manager
910 .start_background_updates(self.config.data_path.clone(), interval)
911 .await;
912 info!(
913 "Started background index updates with interval {:?}",
914 interval
915 );
916 }
917 }
918
919 pub async fn stop_index_background_updates(&self) {
921 if let Some(ref manager) = self.async_index_manager {
922 manager.stop_background_updates().await;
923 info!("Stopped background index updates");
924 }
925 }
926
927 pub async fn get_async_index_stats(&self) -> Option<crate::index::AsyncIndexStats> {
929 if let Some(ref manager) = self.async_index_manager {
930 Some(manager.get_stats().await)
931 } else {
932 None
933 }
934 }
935
936 pub async fn clear_async_index_cache(&self) {
938 if let Some(ref manager) = self.async_index_manager {
939 manager.clear_cache().await;
940 debug!("Cleared async index cache");
941 }
942 }
943
944 pub fn init_progressive_loading(&mut self, config: ProgressiveConfig) {
948 let chunk_loader = Arc::new(CascStorageChunkLoader {
949 storage: self as *const CascStorage,
950 });
951
952 self.progressive_manager = Some(ProgressiveFileManager::new(config, chunk_loader));
953 info!("Initialized progressive file loading");
954 }
955
956 pub async fn read_progressive(
958 &self,
959 ekey: &EKey,
960 size_hint: SizeHint,
961 ) -> Result<Arc<crate::progressive::ProgressiveFile>> {
962 let manager = self.progressive_manager.as_ref().ok_or_else(|| {
963 CascError::InvalidArchiveFormat("Progressive loading not initialized".to_string())
964 })?;
965
966 Ok(manager
967 .get_or_create_progressive_file(*ekey, size_hint)
968 .await)
969 }
970
971 pub async fn read_by_fdid_progressive(
973 &self,
974 fdid: u32,
975 ) -> Result<Arc<crate::progressive::ProgressiveFile>> {
976 let manifests = self.tact_manifests.as_ref().ok_or_else(|| {
977 CascError::ManifestNotLoaded("TACT manifests not initialized".to_string())
978 })?;
979
980 let mapping = manifests.lookup_by_fdid(fdid)?;
981 let ekey = mapping
982 .encoding_key
983 .ok_or_else(|| CascError::EntryNotFound(format!("EKey for FDID {fdid}")))?;
984
985 let size_hint = if let Some(location) = self.combined_index.lookup(&ekey) {
987 SizeHint::Minimum(location.size as u64)
989 } else {
990 SizeHint::Unknown
991 };
992
993 self.read_progressive(&ekey, size_hint).await
994 }
995
996 pub fn get_size_hint_for_ekey(&self, ekey: &EKey) -> SizeHint {
998 if let Some(location) = self.combined_index.lookup(ekey) {
999 SizeHint::Minimum(location.size as u64)
1002 } else {
1003 SizeHint::Unknown
1004 }
1005 }
1006
1007 pub fn has_progressive_loading(&self) -> bool {
1009 self.progressive_manager.is_some()
1010 }
1011
1012 pub async fn cleanup_progressive_files(&self) {
1014 if let Some(manager) = &self.progressive_manager {
1015 use std::time::Duration;
1016 manager
1017 .cleanup_inactive_files(Duration::from_secs(300))
1018 .await; }
1020 }
1021
1022 pub async fn get_progressive_stats(&self) -> Vec<(EKey, crate::progressive::LoadingStats)> {
1024 if let Some(manager) = &self.progressive_manager {
1025 manager.get_global_stats().await
1026 } else {
1027 Vec::new()
1028 }
1029 }
1030}
1031
/// Chunk loader that reads through a raw pointer to a `CascStorage`.
///
/// The pointer is dereferenced on every `load_chunk` call, so the storage
/// it points to must still exist at the same address at that time.
struct CascStorageChunkLoader {
    /// Borrowed storage; not owned and not lifetime-checked.
    storage: *const CascStorage,
}

// NOTE(review): these impls assert that sharing the raw pointer across
// threads is sound. That holds only if the pointed-to `CascStorage` outlives
// the loader, is never moved, and is itself safe to access from several
// threads — none of which is enforced here. Confirm against the callers.
unsafe impl Send for CascStorageChunkLoader {}
unsafe impl Sync for CascStorageChunkLoader {}
1041
1042#[async_trait::async_trait]
1043impl ChunkLoader for CascStorageChunkLoader {
1044 async fn load_chunk(&self, ekey: EKey, offset: u64, size: usize) -> Result<Vec<u8>> {
1045 let storage = unsafe { &*self.storage };
1049
1050 let location = storage.combined_index.lookup(&ekey).ok_or_else(|| {
1052 debug!("EKey {} not found in combined index", ekey);
1053 CascError::EntryNotFound(ekey.to_string())
1054 })?;
1055
1056 debug!(
1057 "Loading chunk for {} from archive {} at offset {:x} (chunk offset={}, size={})",
1058 ekey, location.archive_id, location.offset, offset, size
1059 );
1060
1061 let raw_data = {
1063 let mut archives = storage.archives.write();
1064 let archive = archives
1065 .get_mut(&location.archive_id)
1066 .ok_or(CascError::ArchiveNotFound(location.archive_id))?;
1067
1068 archive.read_at(&location)?
1069 };
1070
1071 const CASC_ENTRY_HEADER_SIZE: usize = 30;
1073
1074 if raw_data.len() < CASC_ENTRY_HEADER_SIZE {
1075 return Err(CascError::InvalidArchiveFormat(format!(
1076 "Archive data too small: {} bytes",
1077 raw_data.len()
1078 )));
1079 }
1080
1081 let compressed_data = raw_data[CASC_ENTRY_HEADER_SIZE..].to_vec();
1083
1084 use std::io::{Cursor, Read};
1086 let cursor = Cursor::new(compressed_data);
1087 let mut stream = blte::create_streaming_reader(cursor, None)
1088 .map_err(|e| CascError::DecompressionError(e.to_string()))?;
1089
1090 if offset > 0 {
1092 let mut discard_buf = vec![0u8; 8192]; let mut remaining = offset;
1094
1095 while remaining > 0 {
1096 let to_read = (remaining as usize).min(discard_buf.len());
1097 let read = stream
1098 .read(&mut discard_buf[..to_read])
1099 .map_err(|e| CascError::DecompressionError(e.to_string()))?;
1100
1101 if read == 0 {
1102 break; }
1104
1105 remaining -= read as u64;
1106 }
1107 }
1108
1109 let mut chunk_data = vec![0u8; size];
1111 let actual_read = stream
1112 .read(&mut chunk_data)
1113 .map_err(|e| CascError::DecompressionError(e.to_string()))?;
1114
1115 chunk_data.truncate(actual_read);
1117
1118 debug!(
1119 "Loaded chunk for {} (offset={}, requested_size={}, actual_size={})",
1120 ekey, offset, size, actual_read
1121 );
1122
1123 Ok(chunk_data)
1124 }
1125}