1use crate::error::{CascError, Result};
11use crate::types::{ArchiveLocation, EKey};
12use dashmap::DashMap;
13use futures::stream::{self, StreamExt};
14use std::collections::BTreeMap;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use tokio::fs::File;
18use tokio::io::{AsyncReadExt, AsyncSeekExt, BufReader};
19use tokio::sync::{RwLock, Semaphore};
20use tracing::{debug, info, trace, warn};
21
/// Tunables for [`AsyncIndexManager`].
#[derive(Debug, Clone)]
pub struct AsyncIndexConfig {
    /// Upper bound on the number of index files loaded at the same time.
    pub max_concurrent_files: usize,
    /// Read buffer size in bytes.
    ///
    /// NOTE(review): no code in this file reads this field; confirm whether it is
    /// consumed elsewhere or should be wired into the file readers.
    pub buffer_size: usize,
    /// Whether lookups consult and populate the in-memory lookup cache.
    pub enable_caching: bool,
    /// Number of cached lookups at which an existing entry is evicted before a new
    /// one is stored.
    pub max_cache_entries: usize,
    /// Whether `start_background_updates` is allowed to spawn the refresh task.
    pub enable_background_updates: bool,
}

impl Default for AsyncIndexConfig {
    /// Returns the stock configuration: 16 concurrent files, a 64 KiB buffer,
    /// caching enabled with room for 100 000 entries, and background updates on.
    fn default() -> Self {
        const KIB: usize = 1024;

        AsyncIndexConfig {
            enable_background_updates: true,
            max_cache_entries: 100_000,
            enable_caching: true,
            buffer_size: 64 * KIB,
            max_concurrent_files: 16,
        }
    }
}
48
/// Owns a set of per-bucket indices and answers encoding-key lookups against them.
///
/// All shared state lives behind `Arc`s so that an internal copy of the manager
/// (used by the background refresh task) observes the same indices and cache.
pub struct AsyncIndexManager {
    /// Configuration supplied at construction time.
    config: AsyncIndexConfig,
    /// Loaded indices, keyed by the bucket byte each index reports.
    bucket_indices: Arc<DashMap<u8, Arc<AsyncIndex>>>,
    /// Cache of previously resolved lookups; only used when `config.enable_caching` is set.
    lookup_cache: Arc<DashMap<EKey, ArchiveLocation>>,
    /// Limits how many index files are loaded concurrently; sized from
    /// `config.max_concurrent_files`.
    semaphore: Arc<Semaphore>,
    /// Handle of the background refresh task, if one has been started.
    update_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
}
62
63impl AsyncIndexManager {
64 pub fn new(config: AsyncIndexConfig) -> Self {
66 let semaphore = Arc::new(Semaphore::new(config.max_concurrent_files));
67
68 Self {
69 config,
70 bucket_indices: Arc::new(DashMap::new()),
71 lookup_cache: Arc::new(DashMap::new()),
72 semaphore,
73 update_handle: Arc::new(RwLock::new(None)),
74 }
75 }
76
77 pub async fn load_directory(&self, path: &Path) -> Result<usize> {
79 info!("Loading indices from {:?} with async operations", path);
80
81 let index_files = self.discover_index_files(path).await?;
83
84 if index_files.is_empty() {
85 info!("No index files found in {:?}", path);
86 return Ok(0);
87 }
88
89 info!(
90 "Found {} index files, loading in parallel",
91 index_files.len()
92 );
93
94 let results = stream::iter(index_files)
96 .map(|path| self.load_single_index(path))
97 .buffer_unordered(self.config.max_concurrent_files)
98 .collect::<Vec<_>>()
99 .await;
100
101 let mut loaded = 0;
103 for result in results {
104 match result {
105 Ok(bucket) => {
106 debug!("Successfully loaded index for bucket {:02x}", bucket);
107 loaded += 1;
108 }
109 Err(e) => {
110 warn!("Failed to load index: {}", e);
111 }
112 }
113 }
114
115 info!("Successfully loaded {} indices", loaded);
116 Ok(loaded)
117 }
118
119 async fn discover_index_files(&self, path: &Path) -> Result<Vec<PathBuf>> {
121 let mut index_files = Vec::new();
122
123 let mut entries = tokio::fs::read_dir(path).await?;
125
126 while let Some(entry) = entries.next_entry().await? {
127 let path = entry.path();
128 if let Some(ext) = path.extension() {
129 if ext == "idx" || ext == "index" {
130 index_files.push(path);
131 }
132 }
133 }
134
135 for subdir in &["data", "indices"] {
137 let subpath = path.join(subdir);
138 if subpath.exists() {
139 if let Ok(mut entries) = tokio::fs::read_dir(&subpath).await {
140 while let Some(entry) = entries.next_entry().await? {
141 let path = entry.path();
142 if let Some(ext) = path.extension() {
143 if ext == "idx" || ext == "index" {
144 index_files.push(path);
145 }
146 }
147 }
148 }
149 }
150 }
151
152 Ok(index_files)
153 }
154
155 async fn load_single_index(&self, path: PathBuf) -> Result<u8> {
157 let _permit = self.semaphore.acquire().await.unwrap();
158
159 debug!("Loading index from {:?}", path);
160
161 let index = if path.extension().and_then(|s| s.to_str()) == Some("idx") {
162 AsyncIndex::load_idx(&path).await?
163 } else {
164 AsyncIndex::load_index(&path).await?
165 };
166
167 let bucket = index.bucket();
168 self.bucket_indices.insert(bucket, Arc::new(index));
169
170 Ok(bucket)
171 }
172
173 pub async fn lookup(&self, ekey: &EKey) -> Option<ArchiveLocation> {
175 if self.config.enable_caching {
177 if let Some(location) = self.lookup_cache.get(ekey) {
178 trace!("Cache hit for {}", ekey);
179 return Some(*location);
180 }
181 }
182
183 let bucket = ekey.bucket_index();
185
186 if let Some(index) = self.bucket_indices.get(&bucket) {
187 if let Some(location) = index.lookup(ekey).await {
188 if self.config.enable_caching {
190 self.update_cache(*ekey, location);
191 }
192 return Some(location);
193 }
194 }
195
196 for entry in self.bucket_indices.iter() {
198 if let Some(location) = entry.value().lookup(ekey).await {
199 if self.config.enable_caching {
201 self.update_cache(*ekey, location);
202 }
203 return Some(location);
204 }
205 }
206
207 None
208 }
209
210 pub async fn lookup_batch(&self, ekeys: &[EKey]) -> Vec<Option<ArchiveLocation>> {
212 let futures = ekeys.iter().map(|ekey| self.lookup(ekey));
214
215 futures::future::join_all(futures).await
216 }
217
218 fn update_cache(&self, ekey: EKey, location: ArchiveLocation) {
220 if self.lookup_cache.len() >= self.config.max_cache_entries {
222 if let Some(entry) = self.lookup_cache.iter().next() {
224 self.lookup_cache.remove(entry.key());
225 }
226 }
227
228 self.lookup_cache.insert(ekey, location);
229 }
230
231 pub async fn start_background_updates(&self, path: PathBuf, interval: std::time::Duration) {
233 if !self.config.enable_background_updates {
234 return;
235 }
236
237 let manager = Arc::new(self.clone_config());
238
239 let handle = tokio::spawn(async move {
240 let mut interval = tokio::time::interval(interval);
241
242 loop {
243 interval.tick().await;
244
245 debug!("Running background index update");
246
247 if let Err(e) = manager.refresh_indices(&path).await {
248 warn!("Background index update failed: {}", e);
249 }
250 }
251 });
252
253 *self.update_handle.write().await = Some(handle);
254 }
255
256 async fn refresh_indices(&self, path: &Path) -> Result<()> {
258 let index_files = self.discover_index_files(path).await?;
260
261 for file_path in index_files {
262 if let Ok(index) = self.load_single_index(file_path).await {
264 debug!("Refreshed index for bucket {:02x}", index);
265 }
266 }
267
268 Ok(())
269 }
270
271 pub async fn stop_background_updates(&self) {
273 if let Some(handle) = self.update_handle.write().await.take() {
274 handle.abort();
275 }
276 }
277
278 pub async fn get_stats(&self) -> IndexStats {
280 let mut total_entries = 0;
281 let mut total_buckets = 0;
282
283 for entry in self.bucket_indices.iter() {
284 total_buckets += 1;
285 total_entries += entry.value().entry_count().await;
286 }
287
288 IndexStats {
289 total_entries,
290 total_buckets,
291 cache_size: self.lookup_cache.len(),
292 cache_hit_rate: 0.0, }
294 }
295
296 pub async fn clear_cache(&self) {
298 self.lookup_cache.clear();
299 }
300
301 fn clone_config(&self) -> Self {
303 Self {
304 config: self.config.clone(),
305 bucket_indices: self.bucket_indices.clone(),
306 lookup_cache: self.lookup_cache.clone(),
307 semaphore: self.semaphore.clone(),
308 update_handle: Arc::new(RwLock::new(None)),
309 }
310 }
311}
312
/// A single bucket's mapping from encoding key to archive location.
pub struct AsyncIndex {
    /// Bucket byte this index is registered under.
    bucket: u8,
    /// Entries of this index, guarded by an async read/write lock.
    entries: Arc<RwLock<BTreeMap<EKey, ArchiveLocation>>>,
}
318
319impl AsyncIndex {
320 pub fn new(bucket: u8) -> Self {
322 Self {
323 bucket,
324 entries: Arc::new(RwLock::new(BTreeMap::new())),
325 }
326 }
327
328 pub async fn load_idx(path: &Path) -> Result<Self> {
330 let file = File::open(path).await?;
331 let mut reader = BufReader::new(file);
332
333 let mut header_buf = vec![0u8; 8];
335 reader.read_exact(&mut header_buf).await?;
336
337 let bucket = Self::extract_bucket_from_path(path)?;
339
340 let index = Self::new(bucket);
341
342 index.parse_idx_entries(&mut reader).await?;
344
345 Ok(index)
346 }
347
348 pub async fn load_index(path: &Path) -> Result<Self> {
350 let file = File::open(path).await?;
351 let mut reader = BufReader::new(file);
352
353 let bucket = Self::extract_bucket_from_path(path)?;
354 let index = Self::new(bucket);
355
356 index.parse_index_entries(&mut reader).await?;
358
359 Ok(index)
360 }
361
362 async fn parse_idx_entries(&self, reader: &mut BufReader<File>) -> Result<()> {
364 let mut entries = BTreeMap::new();
365 let mut buffer = vec![0u8; 4096]; reader.seek(tokio::io::SeekFrom::Start(0x108)).await?;
369
370 while let Ok(n) = reader.read(&mut buffer).await {
371 if n == 0 {
372 break;
373 }
374
375 let mut offset = 0;
377 while offset + 25 <= n {
378 let key_bytes = &buffer[offset..offset + 9];
380 let mut full_key = [0u8; 16];
382 full_key[..9].copy_from_slice(key_bytes);
383 let ekey = EKey::new(full_key);
384
385 let archive_id = u16::from_le_bytes([buffer[offset + 9], buffer[offset + 10]]);
386 let archive_offset = u32::from_le_bytes([
387 buffer[offset + 11],
388 buffer[offset + 12],
389 buffer[offset + 13],
390 buffer[offset + 14],
391 ]);
392 let size = u32::from_le_bytes([
393 buffer[offset + 15],
394 buffer[offset + 16],
395 buffer[offset + 17],
396 buffer[offset + 18],
397 ]);
398
399 let location = ArchiveLocation {
400 archive_id,
401 offset: archive_offset as u64,
402 size,
403 };
404
405 entries.insert(ekey, location);
406 offset += 25;
407 }
408 }
409
410 *self.entries.write().await = entries;
411 Ok(())
412 }
413
414 async fn parse_index_entries(&self, _reader: &mut BufReader<File>) -> Result<()> {
416 let entries = BTreeMap::new();
418
419 *self.entries.write().await = entries;
423 Ok(())
424 }
425
426 fn extract_bucket_from_path(path: &Path) -> Result<u8> {
428 let filename = path
429 .file_stem()
430 .and_then(|s| s.to_str())
431 .ok_or_else(|| CascError::InvalidIndexFormat("Invalid filename".into()))?;
432
433 if filename.len() >= 2 {
435 if let Ok(bucket) = u8::from_str_radix(&filename[..2], 16) {
436 return Ok(bucket);
437 }
438 }
439
440 Ok(0)
442 }
443
444 pub async fn lookup(&self, ekey: &EKey) -> Option<ArchiveLocation> {
446 self.entries.read().await.get(ekey).copied()
447 }
448
449 pub fn bucket(&self) -> u8 {
451 self.bucket
452 }
453
454 pub async fn entry_count(&self) -> usize {
456 self.entries.read().await.len()
457 }
458
459 pub async fn add_entry(&self, ekey: EKey, location: ArchiveLocation) {
461 self.entries.write().await.insert(ekey, location);
462 }
463
464 pub async fn remove_entry(&self, ekey: &EKey) -> Option<ArchiveLocation> {
466 self.entries.write().await.remove(ekey)
467 }
468
469 pub async fn add_entries_batch(&self, entries: Vec<(EKey, ArchiveLocation)>) {
471 let mut map = self.entries.write().await;
472 for (ekey, location) in entries {
473 map.insert(ekey, location);
474 }
475 }
476}
477
/// Snapshot of index and cache sizes produced by `AsyncIndexManager::get_stats`.
#[derive(Debug, Clone)]
pub struct IndexStats {
    /// Sum of the entry counts of all loaded indices.
    pub total_entries: usize,
    /// Number of loaded bucket indices.
    pub total_buckets: usize,
    /// Number of entries currently held in the lookup cache.
    pub cache_size: usize,
    /// Cache hit rate; `AsyncIndexManager::get_stats` currently always reports `0.0`.
    pub cache_hit_rate: f64,
}
486
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `EKey` whose first nine bytes are `prefix` and whose remaining
    /// bytes are zero.
    fn ekey_with_prefix(prefix: [u8; 9]) -> EKey {
        let mut raw = [0u8; 16];
        raw[..9].copy_from_slice(&prefix);
        EKey::new(raw)
    }

    /// Shorthand for constructing an `ArchiveLocation`.
    fn location_of(archive_id: u16, offset: u64, size: u32) -> ArchiveLocation {
        ArchiveLocation {
            archive_id,
            offset,
            size,
        }
    }

    /// A freshly created index reports its bucket and holds no entries.
    #[tokio::test]
    async fn test_async_index_creation() {
        let fresh = AsyncIndex::new(0x00);

        assert_eq!(fresh.bucket(), 0x00);
        assert_eq!(fresh.entry_count().await, 0);
    }

    /// Adding, looking up and removing a single entry round-trips its location.
    #[tokio::test]
    async fn test_async_index_operations() {
        let idx = AsyncIndex::new(0x01);
        let key = ekey_with_prefix([1, 2, 3, 4, 5, 6, 7, 8, 9]);
        let expected = location_of(1, 100, 500);

        idx.add_entry(key, expected).await;
        assert_eq!(idx.entry_count().await, 1);

        assert_eq!(idx.lookup(&key).await, Some(expected));

        assert_eq!(idx.remove_entry(&key).await, Some(expected));
        assert_eq!(idx.entry_count().await, 0);
    }

    /// A new manager has no buckets and no entries.
    #[tokio::test]
    async fn test_manager_creation() {
        let manager = AsyncIndexManager::new(AsyncIndexConfig::default());

        let stats = manager.get_stats().await;
        assert_eq!(stats.total_entries, 0);
        assert_eq!(stats.total_buckets, 0);
    }

    /// Batch lookups return one result per key, in input order.
    #[tokio::test]
    async fn test_batch_lookup() {
        let manager = AsyncIndexManager::new(AsyncIndexConfig::default());

        let first_key = ekey_with_prefix([0, 1, 2, 3, 4, 5, 6, 7, 8]);
        let second_key = ekey_with_prefix([0, 9, 8, 7, 6, 5, 4, 3, 2]);
        let first_location = location_of(1, 100, 200);
        let second_location = location_of(2, 300, 400);

        let bucket_zero = AsyncIndex::new(0x00);
        bucket_zero.add_entry(first_key, first_location).await;
        bucket_zero.add_entry(second_key, second_location).await;
        manager.bucket_indices.insert(0x00, Arc::new(bucket_zero));

        let results = manager.lookup_batch(&[first_key, second_key]).await;

        assert_eq!(results.len(), 2);
        assert_eq!(results[0], Some(first_location));
        assert_eq!(results[1], Some(second_location));
    }
}