zipora 3.1.2

High-performance Rust implementation providing advanced data structures and compression algorithms with memory safety guarantees. Features LRU page cache, sophisticated caching layer, fiber-based concurrency, real-time compression, secure memory pools, SIMD optimizations, and complete C FFI for migration from C++.
//! Multi-shard architecture for reduced contention

use super::*;
use crate::error::Result;
use std::sync::Arc;
use std::time::{Duration, Instant};

/// Multi-shard LRU page cache for high concurrency
pub struct LruPageCache {
    /// Configuration
    config: PageCacheConfig,
    
    /// Cache shards
    shards: Vec<Arc<SingleLruPageCache>>,
    
    /// Global statistics aggregator
    global_stats: CacheStatistics,
    
    /// Buffer pool for reuse
    buffer_pool: Arc<BufferPool>,
}

impl LruPageCache {
    /// Create new multi-shard cache
    pub fn new(config: PageCacheConfig) -> Result<Self> {
        config.validate()?;
        
        let num_shards = config.num_shards as usize;
        let mut shards = Vec::with_capacity(num_shards);
        
        // Create per-shard configuration
        let shard_capacity = config.capacity / num_shards;
        let mut shard_config = config.clone();
        shard_config.capacity = shard_capacity;
        shard_config.num_shards = 1; // Each shard is single-threaded
        
        // Create all shards
        for shard_id in 0..num_shards {
            let shard = Arc::new(SingleLruPageCache::new(shard_config.clone())?);
            shards.push(shard);
        }
        
        // Create buffer pool
        let buffer_pool_size = std::cmp::max(64, num_shards * 16);
        let buffer_pool = Arc::new(BufferPool::new(buffer_pool_size));
        
        Ok(Self {
            config,
            shards,
            global_stats: CacheStatistics::new(),
            buffer_pool,
        })
    }
    
    /// Register a file for caching across all shards
    pub fn register_file(&self, fd: i32) -> Result<FileId> {
        // Use first shard to generate file ID
        self.shards[0].register_file(fd)
    }
    
    /// Read data with automatic shard selection
    pub fn read(&self, file_id: FileId, offset: u64, length: usize) -> Result<CacheBuffer> {
        let start_page = (offset / PAGE_SIZE as u64) as PageId;
        let shard_id = get_shard_id(file_id, start_page, self.config.num_shards) as usize;
        
        let mut buffer = self.buffer_pool.get();
        let shard = &self.shards[shard_id];
        
        let data = shard.read(file_id, offset, length, &mut buffer)?;
        
        // Update global statistics
        self.global_stats.record_bytes_read(length as u64);
        self.global_stats.record_hit(buffer.hit_type());
        
        Ok(buffer)
    }
    
    /// Batch read operations across shards
    pub fn read_batch(&self, requests: Vec<(FileId, u64, usize)>) -> Result<Vec<CacheBuffer>> {
        let mut results = Vec::with_capacity(requests.len());
        
        // Group requests by shard
        let mut shard_requests: Vec<Vec<(usize, FileId, u64, usize)>> = 
            vec![Vec::new(); self.shards.len()];
        
        for (idx, &(file_id, offset, length)) in requests.iter().enumerate() {
            let start_page = (offset / PAGE_SIZE as u64) as PageId;
            let shard_id = get_shard_id(file_id, start_page, self.config.num_shards) as usize;
            shard_requests[shard_id].push((idx, file_id, offset, length));
        }
        
        // Execute requests in parallel per shard
        results.resize_with(requests.len(), || CacheBuffer::new());
        
        for (shard_id, shard_reqs) in shard_requests.iter().enumerate() {
            let shard = &self.shards[shard_id];
            
            for &(idx, file_id, offset, length) in shard_reqs {
                let mut buffer = self.buffer_pool.get();
                match shard.read(file_id, offset, length, &mut buffer) {
                    Ok(_) => {
                        self.global_stats.record_bytes_read(length as u64);
                        self.global_stats.record_hit(buffer.hit_type());
                        results[idx] = buffer;
                    }
                    Err(e) => {
                        self.global_stats.record_load_failure();
                        return Err(e);
                    }
                }
            }
        }
        
        Ok(results)
    }
    
    /// Read with cache-aware blob store integration
    pub fn read_blob_record(&self, file_id: FileId, record_id: u64, base_offset: u64) -> Result<CacheBuffer> {
        // This would integrate with existing blob store offset calculations
        // For now, use simple offset calculation
        let offset = base_offset + record_id * 1024; // Placeholder
        let length = 1024; // Placeholder
        
        self.read(file_id, offset, length)
    }
    
    /// Prefetch pages for better cache warming
    pub fn prefetch(&self, file_id: FileId, offset: u64, length: usize) -> Result<()> {
        let start_page = (offset / PAGE_SIZE as u64) as PageId;
        let end_page = ((offset + length as u64 - 1) / PAGE_SIZE as u64) as PageId;
        
        // Issue prefetch requests for each page
        for page_id in start_page..=end_page {
            let shard_id = get_shard_id(file_id, page_id, self.config.num_shards) as usize;
            let page_offset = (page_id as u64) * (PAGE_SIZE as u64);
            
            // Async prefetch would go here
            // For now, just trigger a read to warm the cache
            let _ = self.read(file_id, page_offset, PAGE_SIZE);
        }
        
        Ok(())
    }
    
    /// Invalidate cached pages for a file
    pub fn invalidate_file(&self, file_id: FileId) -> Result<()> {
        // Would need to invalidate across all shards
        // This is a complex operation requiring coordination
        for shard in &self.shards {
            // TODO: Implement per-shard invalidation
        }
        
        Ok(())
    }
    
    /// Get aggregated statistics across all shards
    pub fn stats(&self) -> CacheStatsSnapshot {
        let mut aggregated = self.global_stats.snapshot();
        
        // Aggregate shard-specific statistics
        for shard in &self.shards {
            let shard_stats = shard.stats().snapshot();
            
            // Add shard stats to global stats
            for i in 0..7 {
                aggregated.hit_counts[i] += shard_stats.hit_counts[i];
            }
            
            aggregated.total_hits += shard_stats.total_hits;
            aggregated.total_misses += shard_stats.total_misses;
            aggregated.evictions += shard_stats.evictions;
            aggregated.hash_collisions += shard_stats.hash_collisions;
            aggregated.memory_used += shard_stats.memory_used;
            aggregated.lock_contentions += shard_stats.lock_contentions;
            aggregated.lock_acquisitions += shard_stats.lock_acquisitions;
        }
        
        // Recalculate derived metrics
        aggregated.hit_ratio = if aggregated.total_hits + aggregated.total_misses > 0 {
            aggregated.total_hits as f64 / (aggregated.total_hits + aggregated.total_misses) as f64
        } else {
            0.0
        };
        aggregated.miss_ratio = 1.0 - aggregated.hit_ratio;
        
        aggregated.memory_utilization = if aggregated.memory_allocated > 0 {
            aggregated.memory_used as f64 / aggregated.memory_allocated as f64
        } else {
            0.0
        };
        
        aggregated.lock_contention_ratio = if aggregated.lock_acquisitions > 0 {
            aggregated.lock_contentions as f64 / aggregated.lock_acquisitions as f64
        } else {
            0.0
        };
        
        aggregated
    }
    
    /// Get per-shard statistics
    pub fn shard_stats(&self) -> Vec<CacheStatsSnapshot> {
        self.shards.iter().map(|shard| shard.stats().snapshot()).collect()
    }
    
    /// Get buffer pool statistics
    pub fn buffer_pool_stats(&self) -> BufferPoolStats {
        self.buffer_pool.stats()
    }
    
    /// Get configuration
    pub fn config(&self) -> &PageCacheConfig {
        &self.config
    }
    
    /// Get number of shards
    pub fn shard_count(&self) -> usize {
        self.shards.len()
    }
    
    /// Get shard for specific file and page
    pub fn get_shard(&self, file_id: FileId, page_id: PageId) -> &Arc<SingleLruPageCache> {
        let shard_id = get_shard_id(file_id, page_id, self.config.num_shards) as usize;
        &self.shards[shard_id]
    }
    
    /// Perform maintenance across all shards
    pub fn maintenance(&self) -> Result<()> {
        for shard in &self.shards {
            // TODO: Implement shard maintenance
            // - Defragmentation
            // - Statistics collection
            // - Background cleanup
        }
        
        self.global_stats.record_maintenance_cycle();
        Ok(())
    }
    
    /// Resize cache capacity (if supported)
    pub fn resize(&mut self, new_capacity: usize) -> Result<()> {
        // This would be a complex operation requiring careful coordination
        // For now, return error as resize is not supported
        Err(CacheError::HardwareUnsupported.into())
    }
    
    /// Flush all dirty pages (if write caching is implemented)
    pub fn flush(&self) -> Result<()> {
        // TODO: Implement flush for write-through/write-back caching
        Ok(())
    }
}

/// Shard selection strategy
pub enum ShardStrategy {
    /// Hash-based selection (default)
    Hash,
    /// Round-robin selection
    RoundRobin,
    /// Load-based selection
    LoadBalanced,
    /// Custom selection function
    Custom(fn(FileId, PageId, u32) -> u32),
}

impl ShardStrategy {
    /// Select shard for given file and page
    pub fn select_shard(&self, file_id: FileId, page_id: PageId, num_shards: u32) -> u32 {
        match self {
            ShardStrategy::Hash => get_shard_id(file_id, page_id, num_shards),
            ShardStrategy::RoundRobin => {
                // Simple round-robin based on page ID
                page_id % num_shards
            }
            ShardStrategy::LoadBalanced => {
                // TODO: Implement load-based selection
                get_shard_id(file_id, page_id, num_shards)
            }
            ShardStrategy::Custom(func) => func(file_id, page_id, num_shards),
        }
    }
}

/// Cache operation context for tracking across shards
pub struct CacheOpContext {
    /// Operation ID for tracking
    pub op_id: u64,
    
    /// Start time
    pub start_time: Instant,
    
    /// Shards involved
    pub shards_used: Vec<usize>,
    
    /// Total bytes requested
    pub bytes_requested: usize,
    
    /// Cache hit information
    pub hit_info: Vec<CacheHitType>,
}

impl CacheOpContext {
    pub fn new(op_id: u64, bytes_requested: usize) -> Self {
        Self {
            op_id,
            start_time: Instant::now(),
            shards_used: Vec::new(),
            bytes_requested,
            hit_info: Vec::new(),
        }
    }
    
    pub fn add_shard(&mut self, shard_id: usize, hit_type: CacheHitType) {
        self.shards_used.push(shard_id);
        self.hit_info.push(hit_type);
    }
    
    pub fn duration(&self) -> Duration {
        self.start_time.elapsed()
    }
    
    pub fn overall_hit_type(&self) -> CacheHitType {
        if self.hit_info.is_empty() {
            CacheHitType::Hit
        } else if self.hit_info.len() == 1 {
            self.hit_info[0]
        } else {
            CacheHitType::Mix
        }
    }
}