use crate::error::{CascError, Result};
use crate::types::{ArchiveLocation, EKey};
use dashmap::DashMap;
use futures::stream::{self, StreamExt};
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt, BufReader};
use tokio::sync::{RwLock, Semaphore};
use tracing::{debug, info, trace, warn};
/// Tuning options for `AsyncIndexManager`.
#[derive(Clone, Debug)]
pub struct AsyncIndexConfig {
    /// Upper bound on how many index files the manager loads at the same time.
    pub max_concurrent_files: usize,
    /// Read buffer size in bytes.
    /// NOTE(review): not read by the index loaders in this file — confirm it is used elsewhere.
    pub buffer_size: usize,
    /// Whether lookups consult and populate the in-memory lookup cache.
    pub enable_caching: bool,
    /// Target maximum number of entries kept in the lookup cache.
    pub max_cache_entries: usize,
    /// Whether the manager may spawn a periodic background refresh task.
    pub enable_background_updates: bool,
}

impl Default for AsyncIndexConfig {
    /// 16 concurrent file loads, 64 KiB buffers, a 100 000-entry cache, and both caching and
    /// background updates switched on.
    fn default() -> Self {
        const KIB: usize = 1024;
        let (max_concurrent_files, max_cache_entries) = (16, 100_000);
        Self {
            max_concurrent_files,
            buffer_size: 64 * KIB,
            enable_caching: true,
            max_cache_entries,
            enable_background_updates: true,
        }
    }
}
/// Loads per-bucket [`AsyncIndex`] files and answers key lookups across them.
///
/// A lookup consults the optional lookup cache first, then the index registered for the key's
/// own bucket, and finally every other loaded index. All shared state lives behind `Arc`s so a
/// background refresh task can operate on the same maps.
pub struct AsyncIndexManager {
    /// Options supplied at construction.
    config: AsyncIndexConfig,
    /// Loaded indices keyed by bucket id.
    bucket_indices: Arc<DashMap<u8, Arc<AsyncIndex>>>,
    /// Previously resolved `EKey -> ArchiveLocation` pairs, bounded by `max_cache_entries`.
    lookup_cache: Arc<DashMap<EKey, ArchiveLocation>>,
    /// Limits how many index files are being loaded at once.
    semaphore: Arc<Semaphore>,
    /// Handle of the running background refresh task, if one was started.
    update_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Lookups answered directly from `lookup_cache`.
    cache_hits: Arc<AtomicU64>,
    /// Lookups that probed `lookup_cache` and missed.
    cache_misses: Arc<AtomicU64>,
}

impl AsyncIndexManager {
    /// Creates an empty manager using `config`.
    ///
    /// `max_concurrent_files` is clamped to at least 1: a semaphore with zero permits would make
    /// every index load wait forever.
    pub fn new(config: AsyncIndexConfig) -> Self {
        let permits = config.max_concurrent_files.max(1);
        Self {
            config,
            bucket_indices: Arc::new(DashMap::new()),
            lookup_cache: Arc::new(DashMap::new()),
            semaphore: Arc::new(Semaphore::new(permits)),
            update_handle: Arc::new(RwLock::new(None)),
            cache_hits: Arc::new(AtomicU64::new(0)),
            cache_misses: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Discovers and loads every index file under `path`, returning how many loaded successfully.
    ///
    /// Files are loaded concurrently, at most `max_concurrent_files` (minimum 1) at a time. A file
    /// that fails to load is logged and skipped instead of aborting the whole directory.
    ///
    /// # Errors
    /// Returns an error if `path` itself cannot be listed.
    pub async fn load_directory(&self, path: &Path) -> Result<usize> {
        info!("Loading indices from {:?} with async operations", path);
        let index_files = self.discover_index_files(path).await?;
        if index_files.is_empty() {
            info!("No index files found in {:?}", path);
            return Ok(0);
        }
        info!(
            "Found {} index files, loading in parallel",
            index_files.len()
        );
        let results = stream::iter(index_files)
            .map(|file| self.load_single_index(file))
            .buffer_unordered(self.config.max_concurrent_files.max(1))
            .collect::<Vec<_>>()
            .await;
        let mut loaded = 0;
        for result in results {
            match result {
                Ok(bucket) => {
                    debug!("Successfully loaded index for bucket {:02x}", bucket);
                    loaded += 1;
                }
                Err(e) => warn!("Failed to load index: {}", e),
            }
        }
        info!("Successfully loaded {} indices", loaded);
        Ok(loaded)
    }

    /// Lists `*.idx` / `*.index` files directly inside `path` and inside its optional `data/`
    /// and `indices/` subdirectories (not recursive).
    ///
    /// # Errors
    /// Fails if `path` cannot be listed, or if iterating any opened directory fails part-way.
    /// A subdirectory that is missing or cannot be opened is skipped.
    async fn discover_index_files(&self, path: &Path) -> Result<Vec<PathBuf>> {
        let mut index_files = Vec::new();
        let entries = tokio::fs::read_dir(path).await?;
        Self::collect_index_files(entries, &mut index_files).await?;
        for subdir in ["data", "indices"] {
            // Opening the directory directly (instead of a blocking `Path::exists` probe first)
            // keeps this async-friendly; a missing subdirectory simply fails to open.
            if let Ok(entries) = tokio::fs::read_dir(path.join(subdir)).await {
                Self::collect_index_files(entries, &mut index_files).await?;
            }
        }
        Ok(index_files)
    }

    /// Drains `entries`, appending every path that [`Self::is_index_file`] accepts to `out`.
    async fn collect_index_files(
        mut entries: tokio::fs::ReadDir,
        out: &mut Vec<PathBuf>,
    ) -> Result<()> {
        while let Some(entry) = entries.next_entry().await? {
            let path = entry.path();
            if Self::is_index_file(&path) {
                out.push(path);
            }
        }
        Ok(())
    }

    /// Returns `true` when `path` has an `idx` or `index` extension (case-sensitive).
    fn is_index_file(path: &Path) -> bool {
        matches!(
            path.extension().and_then(|ext| ext.to_str()),
            Some("idx" | "index")
        )
    }

    /// Loads one index file and registers it under its bucket, replacing any index already
    /// stored for that bucket. Returns the bucket id.
    async fn load_single_index(&self, path: PathBuf) -> Result<u8> {
        let _permit = self
            .semaphore
            .acquire()
            .await
            .expect("index semaphore is never closed");
        debug!("Loading index from {:?}", path);
        let index = if path.extension().and_then(|s| s.to_str()) == Some("idx") {
            AsyncIndex::load_idx(&path).await?
        } else {
            AsyncIndex::load_index(&path).await?
        };
        let bucket = index.bucket();
        self.bucket_indices.insert(bucket, Arc::new(index));
        Ok(bucket)
    }

    /// Resolves `ekey` to its archive location, or `None` if no loaded index contains it.
    ///
    /// Order: lookup cache (when enabled), the index for `ekey.bucket_index()`, then every other
    /// loaded index in map iteration order. A successful index hit is cached when enabled.
    pub async fn lookup(&self, ekey: &EKey) -> Option<ArchiveLocation> {
        if self.config.enable_caching {
            // Copy the value out so the cache shard guard is released at the end of the statement.
            let cached = self.lookup_cache.get(ekey).map(|entry| *entry.value());
            if let Some(location) = cached {
                self.cache_hits.fetch_add(1, Ordering::Relaxed);
                trace!("Cache hit for {}", ekey);
                return Some(location);
            }
            self.cache_misses.fetch_add(1, Ordering::Relaxed);
        }

        let bucket = ekey.bucket_index();
        // Clone the `Arc` out of the map before awaiting: a DashMap guard held across `.await`
        // keeps its shard locked and can deadlock a concurrent `insert` (e.g. from a refresh).
        let primary = self
            .bucket_indices
            .get(&bucket)
            .map(|entry| Arc::clone(entry.value()));
        if let Some(index) = primary {
            if let Some(location) = index.lookup(ekey).await {
                self.cache_if_enabled(*ekey, location);
                return Some(location);
            }
        }

        // Fall back to every other bucket; the primary one was already checked above.
        let others: Vec<Arc<AsyncIndex>> = self
            .bucket_indices
            .iter()
            .filter(|entry| *entry.key() != bucket)
            .map(|entry| Arc::clone(entry.value()))
            .collect();
        for index in others {
            if let Some(location) = index.lookup(ekey).await {
                self.cache_if_enabled(*ekey, location);
                return Some(location);
            }
        }
        None
    }

    /// Looks up every key concurrently; `result[i]` corresponds to `ekeys[i]`.
    pub async fn lookup_batch(&self, ekeys: &[EKey]) -> Vec<Option<ArchiveLocation>> {
        let futures = ekeys.iter().map(|ekey| self.lookup(ekey));
        futures::future::join_all(futures).await
    }

    /// Records `ekey -> location` in the lookup cache when caching is enabled.
    fn cache_if_enabled(&self, ekey: EKey, location: ArchiveLocation) {
        if self.config.enable_caching {
            self.update_cache(ekey, location);
        }
    }

    /// Inserts `ekey -> location` into the cache, evicting an arbitrary entry when a new key
    /// would exceed `max_cache_entries`.
    ///
    /// The bound is approximate under concurrency: the length check and the insert are not
    /// atomic. A `max_cache_entries` of 0 disables caching entirely.
    fn update_cache(&self, ekey: EKey, location: ArchiveLocation) {
        let capacity = self.config.max_cache_entries;
        if capacity == 0 {
            return;
        }
        if !self.lookup_cache.contains_key(&ekey) && self.lookup_cache.len() >= capacity {
            // Copy the victim key out and drop the iterator before removing. `remove` needs a
            // write lock on the very shard the iterator guard has read-locked, so removing
            // while the guard is alive deadlocks.
            let victim = self.lookup_cache.iter().next().map(|entry| *entry.key());
            if let Some(victim) = victim {
                self.lookup_cache.remove(&victim);
            }
        }
        self.lookup_cache.insert(ekey, location);
    }

    /// Spawns a task that re-scans `path` every `interval` and reloads the indices it finds.
    ///
    /// Does nothing when `enable_background_updates` is off or `interval` is zero (tokio's
    /// `interval` panics on a zero period). The first refresh runs immediately, because tokio's
    /// interval completes its first tick at once. Any previously started task is aborted and
    /// replaced.
    ///
    /// NOTE(review): refreshes do not invalidate `lookup_cache`, so cached locations survive a
    /// reload — confirm that is intended.
    pub async fn start_background_updates(&self, path: PathBuf, interval: std::time::Duration) {
        if !self.config.enable_background_updates {
            return;
        }
        if interval.is_zero() {
            warn!("Background index updates need a non-zero interval; not starting");
            return;
        }
        let manager = self.clone_config();
        let handle = tokio::spawn(async move {
            let mut ticker = tokio::time::interval(interval);
            loop {
                ticker.tick().await;
                debug!("Running background index update");
                if let Err(e) = manager.refresh_indices(&path).await {
                    warn!("Background index update failed: {}", e);
                }
            }
        });
        let mut slot = self.update_handle.write().await;
        if let Some(previous) = slot.replace(handle) {
            // Without this the old task would keep running, untracked, forever.
            previous.abort();
        }
    }

    /// Re-discovers index files under `path` and reloads them one by one.
    ///
    /// # Errors
    /// Fails only if `path` cannot be listed; per-file failures are logged and skipped.
    async fn refresh_indices(&self, path: &Path) -> Result<()> {
        let index_files = self.discover_index_files(path).await?;
        for file_path in index_files {
            match self.load_single_index(file_path).await {
                Ok(bucket) => debug!("Refreshed index for bucket {:02x}", bucket),
                Err(e) => warn!("Failed to refresh index: {}", e),
            }
        }
        Ok(())
    }

    /// Aborts the background refresh task, if one is running.
    pub async fn stop_background_updates(&self) {
        if let Some(handle) = self.update_handle.write().await.take() {
            handle.abort();
        }
    }

    /// Returns entry/bucket totals, the current cache size and the cumulative cache hit rate
    /// (hits / cache probes, or 0.0 before any probe).
    pub async fn get_stats(&self) -> IndexStats {
        // Snapshot the Arcs first so no DashMap guard is held while awaiting entry counts.
        let indices: Vec<Arc<AsyncIndex>> = self
            .bucket_indices
            .iter()
            .map(|entry| Arc::clone(entry.value()))
            .collect();
        let mut total_entries = 0;
        for index in &indices {
            total_entries += index.entry_count().await;
        }
        // Relaxed is enough: these are independent statistics, not used for synchronization.
        let hits = self.cache_hits.load(Ordering::Relaxed);
        let misses = self.cache_misses.load(Ordering::Relaxed);
        let probes = hits.saturating_add(misses);
        let cache_hit_rate = if probes == 0 {
            0.0
        } else {
            hits as f64 / probes as f64
        };
        IndexStats {
            total_entries,
            total_buckets: indices.len(),
            cache_size: self.lookup_cache.len(),
            cache_hit_rate,
        }
    }

    /// Empties the lookup cache. The hit/miss counters are cumulative and are not reset.
    pub async fn clear_cache(&self) {
        self.lookup_cache.clear();
    }

    /// Creates a second handle that shares the config, maps, semaphore and counters, but has its
    /// own empty `update_handle` slot.
    fn clone_config(&self) -> Self {
        Self {
            config: self.config.clone(),
            bucket_indices: Arc::clone(&self.bucket_indices),
            lookup_cache: Arc::clone(&self.lookup_cache),
            semaphore: Arc::clone(&self.semaphore),
            update_handle: Arc::new(RwLock::new(None)),
            cache_hits: Arc::clone(&self.cache_hits),
            cache_misses: Arc::clone(&self.cache_misses),
        }
    }
}
/// In-memory index for a single bucket, mapping encoding keys to archive locations.
///
/// The table sits behind an async `RwLock`. Lookups share the read lock; loading and the
/// `add_*`/`remove_entry` mutators take the write lock.
pub struct AsyncIndex {
    /// Bucket id supplied at construction (the file loaders derive it from the file name).
    bucket: u8,
    /// Key -> location table, ordered by key.
    entries: Arc<RwLock<BTreeMap<EKey, ArchiveLocation>>>,
}

impl AsyncIndex {
    /// Absolute file offset at which `.idx` records start.
    const IDX_ENTRIES_OFFSET: u64 = 0x108;
    /// Distance in bytes from the start of one `.idx` record to the next.
    const IDX_RECORD_STRIDE: usize = 25;
    /// Key bytes stored per record; the remaining bytes of the 16-byte key are zero-filled.
    const IDX_KEY_LEN: usize = 9;

    /// Creates an empty index for `bucket`.
    pub fn new(bucket: u8) -> Self {
        Self {
            bucket,
            entries: Arc::new(RwLock::new(BTreeMap::new())),
        }
    }

    /// Loads an `.idx` file.
    ///
    /// The bucket id is taken from the first two characters of the file stem (0 if they are not
    /// hex). Records are decoded from offset `0x108` onward.
    ///
    /// # Errors
    /// Fails if the file cannot be opened or read, is shorter than its 8-byte header, or has no
    /// UTF-8 file stem.
    pub async fn load_idx(path: &Path) -> Result<Self> {
        let file = File::open(path).await?;
        let mut reader = BufReader::new(file);
        // The 8 header bytes are read (which rejects truncated files) but not interpreted.
        let mut header = [0u8; 8];
        reader.read_exact(&mut header).await?;
        let bucket = Self::extract_bucket_from_path(path)?;
        let index = Self::new(bucket);
        index.parse_idx_entries(&mut reader).await?;
        Ok(index)
    }

    /// Loads a `.index` file.
    ///
    /// NOTE(review): `.index` parsing is not implemented; this always yields an empty index for
    /// the bucket named by the file. Loaded into a manager, it replaces any index already
    /// registered for that bucket — confirm before relying on `.index` files.
    ///
    /// # Errors
    /// Fails if the file cannot be opened or has no UTF-8 file stem.
    pub async fn load_index(path: &Path) -> Result<Self> {
        let file = File::open(path).await?;
        let mut reader = BufReader::new(file);
        let bucket = Self::extract_bucket_from_path(path)?;
        let index = Self::new(bucket);
        index.parse_index_entries(&mut reader).await?;
        Ok(index)
    }

    /// Replaces this index's entries with the `.idx` records read from `reader`.
    ///
    /// Record layout as decoded here (NOTE(review): carried over from the original parser —
    /// confirm against whatever writes these files): bytes `0..9` key prefix, `9..11` archive
    /// id (LE u16), `11..15` offset (LE u32), `15..19` size (LE u32). Bytes `19..25` are
    /// skipped. A trailing partial record is ignored, and for duplicate keys the later record wins.
    ///
    /// The whole record area is read up front so every record stays aligned to the stride.
    /// Parsing the chunks of a streaming read would split records at chunk boundaries and
    /// misalign every record after the first split.
    ///
    /// # Errors
    /// Seek and read failures are propagated rather than treated as end of file.
    async fn parse_idx_entries(&self, reader: &mut BufReader<File>) -> Result<()> {
        reader
            .seek(tokio::io::SeekFrom::Start(Self::IDX_ENTRIES_OFFSET))
            .await?;
        let mut raw = Vec::new();
        reader.read_to_end(&mut raw).await?;

        let mut entries = BTreeMap::new();
        entries.extend(
            raw.chunks_exact(Self::IDX_RECORD_STRIDE)
                .map(Self::decode_idx_record),
        );
        *self.entries.write().await = entries;
        Ok(())
    }

    /// Decodes one `.idx` record (see [`Self::parse_idx_entries`] for the layout).
    ///
    /// # Panics
    /// Panics if `record` is shorter than 19 bytes; callers pass full-stride records.
    fn decode_idx_record(record: &[u8]) -> (EKey, ArchiveLocation) {
        let mut key = [0u8; 16];
        key[..Self::IDX_KEY_LEN].copy_from_slice(&record[..Self::IDX_KEY_LEN]);
        let archive_id = u16::from_le_bytes([record[9], record[10]]);
        let offset = u32::from_le_bytes([record[11], record[12], record[13], record[14]]);
        let size = u32::from_le_bytes([record[15], record[16], record[17], record[18]]);
        let location = ArchiveLocation {
            archive_id,
            offset: u64::from(offset),
            size,
        };
        (EKey::new(key), location)
    }

    /// Placeholder `.index` parser: clears the entries without reading `_reader`.
    async fn parse_index_entries(&self, _reader: &mut BufReader<File>) -> Result<()> {
        *self.entries.write().await = BTreeMap::new();
        Ok(())
    }

    /// Derives a bucket id from the first two characters of the file stem, parsed as hex.
    ///
    /// Falls back to 0 when the stem is shorter than two characters or they are not both ASCII
    /// hex digits. `str::get` is used instead of slicing so a non-ASCII stem cannot panic on a
    /// char boundary.
    ///
    /// # Errors
    /// Returns `InvalidIndexFormat` if the path has no file stem or the stem is not UTF-8.
    fn extract_bucket_from_path(path: &Path) -> Result<u8> {
        let filename = path
            .file_stem()
            .and_then(|s| s.to_str())
            .ok_or_else(|| CascError::InvalidIndexFormat("Invalid filename".into()))?;
        let bucket = filename
            .get(..2)
            .filter(|prefix| prefix.bytes().all(|b| b.is_ascii_hexdigit()))
            .and_then(|prefix| u8::from_str_radix(prefix, 16).ok())
            .unwrap_or(0);
        Ok(bucket)
    }

    /// Returns the location stored for `ekey`, if any.
    pub async fn lookup(&self, ekey: &EKey) -> Option<ArchiveLocation> {
        self.entries.read().await.get(ekey).copied()
    }

    /// Bucket id this index was created for.
    pub fn bucket(&self) -> u8 {
        self.bucket
    }

    /// Number of entries currently stored.
    pub async fn entry_count(&self) -> usize {
        self.entries.read().await.len()
    }

    /// Inserts or overwrites the location for `ekey`.
    pub async fn add_entry(&self, ekey: EKey, location: ArchiveLocation) {
        self.entries.write().await.insert(ekey, location);
    }

    /// Removes `ekey`, returning its previous location if it was present.
    pub async fn remove_entry(&self, ekey: &EKey) -> Option<ArchiveLocation> {
        self.entries.write().await.remove(ekey)
    }

    /// Inserts all pairs under a single write lock; later duplicates overwrite earlier ones.
    pub async fn add_entries_batch(&self, entries: Vec<(EKey, ArchiveLocation)>) {
        self.entries.write().await.extend(entries);
    }
}
/// Point-in-time statistics describing loaded indices and the lookup cache.
#[derive(Clone, Debug)]
pub struct IndexStats {
    /// Sum of the entry counts of all loaded indices.
    pub total_entries: usize,
    /// Number of loaded bucket indices.
    pub total_buckets: usize,
    /// Number of entries currently held in the lookup cache.
    pub cache_size: usize,
    /// Cache hit rate as computed by whoever produced these stats.
    pub cache_hit_rate: f64,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a 16-byte `EKey` whose first nine bytes are `prefix` and whose tail is zero.
    fn key_from_prefix(prefix: [u8; 9]) -> EKey {
        let mut raw = [0u8; 16];
        raw[..prefix.len()].copy_from_slice(&prefix);
        EKey::new(raw)
    }

    /// Shorthand constructor for an `ArchiveLocation`.
    fn loc(archive_id: u16, offset: u64, size: u32) -> ArchiveLocation {
        ArchiveLocation {
            archive_id,
            offset,
            size,
        }
    }

    #[tokio::test]
    async fn test_async_index_creation() {
        let index = AsyncIndex::new(0x00);
        assert_eq!(index.bucket(), 0x00);
        assert_eq!(index.entry_count().await, 0);
    }

    #[tokio::test]
    async fn test_async_index_operations() {
        let index = AsyncIndex::new(0x01);
        let ekey = key_from_prefix([1, 2, 3, 4, 5, 6, 7, 8, 9]);
        let location = loc(1, 100, 500);

        index.add_entry(ekey, location).await;
        assert_eq!(index.entry_count().await, 1);
        assert_eq!(index.lookup(&ekey).await, Some(location));

        assert_eq!(index.remove_entry(&ekey).await, Some(location));
        assert_eq!(index.entry_count().await, 0);
    }

    #[tokio::test]
    async fn test_manager_creation() {
        let manager = AsyncIndexManager::new(AsyncIndexConfig::default());
        let stats = manager.get_stats().await;
        assert_eq!(stats.total_entries, 0);
        assert_eq!(stats.total_buckets, 0);
    }

    #[tokio::test]
    async fn test_batch_lookup() {
        let manager = AsyncIndexManager::new(AsyncIndexConfig::default());
        let index = AsyncIndex::new(0x00);

        let (first_key, second_key) = (
            key_from_prefix([0, 1, 2, 3, 4, 5, 6, 7, 8]),
            key_from_prefix([0, 9, 8, 7, 6, 5, 4, 3, 2]),
        );
        let (first_loc, second_loc) = (loc(1, 100, 200), loc(2, 300, 400));

        index.add_entry(first_key, first_loc).await;
        index.add_entry(second_key, second_loc).await;
        manager.bucket_indices.insert(0x00, Arc::new(index));

        let results = manager.lookup_batch(&[first_key, second_key]).await;
        assert_eq!(results, vec![Some(first_loc), Some(second_loc)]);
    }
}