// rivven_core/storage/tiered.rs

//! Tiered Storage System for Rivven
//!
//! Implements a hot/warm/cold tiered storage architecture:
//! - **Hot Tier**: In-memory buffer + NVMe/SSD for recent data (sub-ms access)
//! - **Warm Tier**: Local disk storage for medium-aged data (ms access)
//! - **Cold Tier**: Object storage (S3/MinIO/Azure Blob) for archival (100ms+ access)
//!
//! Features:
//! - Automatic tier promotion/demotion based on access patterns
//! - LRU cache for hot tier with size limits
//! - Asynchronous background compaction and migration
//! - Memory-mapped reads from the warm tier (copied into `Bytes` on return)
//! - Pluggable cold storage backends
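//!
//! # Example
//!
//! A minimal sketch of the write/read path. The module path is assumed from
//! this file's location; topic names, offsets, and sizes are illustrative.
//!
//! ```no_run
//! use bytes::Bytes;
//! use rivven_core::storage::tiered::{TieredStorage, TieredStorageConfig};
//!
//! # async fn demo() -> rivven_core::Result<()> {
//! let storage = TieredStorage::new(TieredStorageConfig::default()).await?;
//! // New writes always land in the hot tier first.
//! storage.write("events", 0, 0, 100, Bytes::from_static(b"batch")).await?;
//! // Reads fall through hot -> warm -> cold transparently.
//! let _batches = storage.read("events", 0, 0, 1024 * 1024).await?;
//! # Ok(())
//! # }
//! ```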

use bytes::Bytes;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::{mpsc, Mutex, RwLock, Semaphore};
use tokio::time::interval;

use crate::{Error, Result};

/// Storage tier classification
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum StorageTier {
    /// Hot tier: In-memory + fast SSD, < 1ms access
    Hot,
    /// Warm tier: Local disk, mmap'd, 1-10ms access
    Warm,
    /// Cold tier: Object storage, 100ms+ access
    Cold,
}

impl StorageTier {
    /// Get tier name for metrics/logging
    pub fn name(&self) -> &'static str {
        match self {
            StorageTier::Hot => "hot",
            StorageTier::Warm => "warm",
            StorageTier::Cold => "cold",
        }
    }

    /// Get next cooler tier (for demotion)
    pub fn demote(&self) -> Option<StorageTier> {
        match self {
            StorageTier::Hot => Some(StorageTier::Warm),
            StorageTier::Warm => Some(StorageTier::Cold),
            StorageTier::Cold => None,
        }
    }

    /// Get next hotter tier (for promotion)
    pub fn promote(&self) -> Option<StorageTier> {
        match self {
            StorageTier::Hot => None,
            StorageTier::Warm => Some(StorageTier::Hot),
            StorageTier::Cold => Some(StorageTier::Warm),
        }
    }
}
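
// A small sanity check of the tier transition chain; a sketch added for
// illustration, exercising only the methods defined above.
#[cfg(test)]
mod tier_transition_example {
    use super::*;

    #[test]
    fn demote_promote_chain() {
        // Hot -> Warm -> Cold, then no cooler tier.
        assert_eq!(StorageTier::Hot.demote(), Some(StorageTier::Warm));
        assert_eq!(StorageTier::Warm.demote(), Some(StorageTier::Cold));
        assert_eq!(StorageTier::Cold.demote(), None);
        // Promotion walks the chain in the opposite direction.
        assert_eq!(StorageTier::Cold.promote(), Some(StorageTier::Warm));
        assert_eq!(StorageTier::Hot.promote(), None);
    }
}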

/// Configuration for tiered storage
#[derive(Debug, Clone)]
pub struct TieredStorageConfig {
    /// Maximum size of hot tier in bytes
    pub hot_tier_max_bytes: u64,
    /// Maximum age of data in hot tier before demotion
    pub hot_tier_max_age: Duration,
    /// Maximum size of warm tier in bytes
    pub warm_tier_max_bytes: u64,
    /// Maximum age of data in warm tier before demotion
    pub warm_tier_max_age: Duration,
    /// Path for warm tier storage
    pub warm_tier_path: PathBuf,
    /// Cold storage backend configuration
    pub cold_storage: ColdStorageConfig,
    /// How often to run tier migration
    pub migration_interval: Duration,
    /// Number of concurrent migration operations
    pub migration_concurrency: usize,
    /// Enable access-based promotion (promote frequently accessed cold data)
    pub enable_promotion: bool,
    /// Access count threshold for promotion
    pub promotion_threshold: u64,
    /// Compaction threshold (ratio of dead records to total)
    pub compaction_threshold: f64,
}

impl Default for TieredStorageConfig {
    fn default() -> Self {
        Self {
            hot_tier_max_bytes: 1024 * 1024 * 1024,            // 1 GB
            hot_tier_max_age: Duration::from_secs(3600),       // 1 hour
            warm_tier_max_bytes: 100 * 1024 * 1024 * 1024,     // 100 GB
            warm_tier_max_age: Duration::from_secs(86400 * 7), // 7 days
            warm_tier_path: PathBuf::from("/var/lib/rivven/warm"),
            cold_storage: ColdStorageConfig::default(),
            migration_interval: Duration::from_secs(60),
            migration_concurrency: 4,
            enable_promotion: true,
            promotion_threshold: 100,
            compaction_threshold: 0.5,
        }
    }
}

impl TieredStorageConfig {
    /// High-performance config for low-latency workloads
    pub fn high_performance() -> Self {
        Self {
            hot_tier_max_bytes: 8 * 1024 * 1024 * 1024,    // 8 GB
            hot_tier_max_age: Duration::from_secs(7200),   // 2 hours
            warm_tier_max_bytes: 500 * 1024 * 1024 * 1024, // 500 GB
            migration_interval: Duration::from_secs(30),
            ..Default::default()
        }
    }

    /// Cost-optimized config for archival workloads
    pub fn cost_optimized() -> Self {
        Self {
            hot_tier_max_bytes: 256 * 1024 * 1024,         // 256 MB
            hot_tier_max_age: Duration::from_secs(300),    // 5 minutes
            warm_tier_max_bytes: 10 * 1024 * 1024 * 1024,  // 10 GB
            warm_tier_max_age: Duration::from_secs(86400), // 1 day
            migration_interval: Duration::from_secs(120),
            enable_promotion: false,
            ..Default::default()
        }
    }
}
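
// Sketch: presets can be customized further with struct-update syntax, the
// same pattern the presets themselves use.
#[cfg(test)]
mod config_example {
    use super::*;

    #[test]
    fn customize_preset() {
        let cfg = TieredStorageConfig {
            migration_interval: Duration::from_secs(15),
            ..TieredStorageConfig::high_performance()
        };
        assert_eq!(cfg.hot_tier_max_bytes, 8 * 1024 * 1024 * 1024);
        assert!(cfg.enable_promotion); // inherited from the default config
    }
}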

/// Cold storage backend configuration
#[derive(Debug, Clone)]
pub enum ColdStorageConfig {
    /// Local filesystem (for development/testing)
    LocalFs { path: PathBuf },
    /// S3-compatible object storage
    S3 {
        endpoint: String,
        bucket: String,
        region: String,
        access_key: Option<String>,
        secret_key: Option<String>,
        use_path_style: bool,
    },
    /// Azure Blob Storage
    AzureBlob {
        account: String,
        container: String,
        access_key: Option<String>,
    },
    /// Disabled (warm tier is final)
    Disabled,
}

impl Default for ColdStorageConfig {
    fn default() -> Self {
        ColdStorageConfig::LocalFs {
            path: PathBuf::from("/var/lib/rivven/cold"),
        }
    }
}

/// Metadata about a stored segment
#[derive(Debug)]
pub struct SegmentMetadata {
    /// Topic name
    pub topic: String,
    /// Partition ID
    pub partition: u32,
    /// Base offset of segment
    pub base_offset: u64,
    /// End offset (exclusive)
    pub end_offset: u64,
    /// Size in bytes
    pub size_bytes: u64,
    /// Current storage tier
    pub tier: StorageTier,
    /// Creation timestamp
    pub created_at: u64,
    /// Last accessed timestamp
    pub last_accessed: AtomicU64,
    /// Access count for promotion decisions
    pub access_count: AtomicU64,
    /// Number of deleted/compacted records
    pub dead_records: AtomicU64,
    /// Total records
    pub total_records: u64,
}

impl SegmentMetadata {
    pub fn new(
        topic: String,
        partition: u32,
        base_offset: u64,
        end_offset: u64,
        size_bytes: u64,
        tier: StorageTier,
    ) -> Self {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        Self {
            topic,
            partition,
            base_offset,
            end_offset,
            size_bytes,
            tier,
            created_at: now,
            last_accessed: AtomicU64::new(now),
            access_count: AtomicU64::new(0),
            dead_records: AtomicU64::new(0),
            total_records: end_offset - base_offset,
        }
    }

    /// Record an access and update statistics
    pub fn record_access(&self) {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        self.last_accessed.store(now, Ordering::Relaxed);
        self.access_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Get age in seconds
    pub fn age_secs(&self) -> u64 {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        now.saturating_sub(self.created_at)
    }

    /// Get seconds since last access
    pub fn idle_secs(&self) -> u64 {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        now.saturating_sub(self.last_accessed.load(Ordering::Relaxed))
    }

    /// Calculate compaction ratio (dead/total)
    pub fn compaction_ratio(&self) -> f64 {
        let dead = self.dead_records.load(Ordering::Relaxed);
        if self.total_records == 0 {
            0.0
        } else {
            dead as f64 / self.total_records as f64
        }
    }

    /// Build segment key for storage
    pub fn segment_key(&self) -> String {
        format!("{}/{}/{:020}", self.topic, self.partition, self.base_offset)
    }
}
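
// Sketch: how the metadata counters above drive promotion and compaction
// decisions; the values are illustrative.
#[cfg(test)]
mod segment_metadata_example {
    use super::*;

    #[test]
    fn access_and_compaction_ratio() {
        let meta = SegmentMetadata::new("t".into(), 0, 100, 200, 4096, StorageTier::Hot);
        assert_eq!(meta.total_records, 100);

        meta.record_access();
        assert_eq!(meta.access_count.load(Ordering::Relaxed), 1);

        // Marking over half the records dead pushes the ratio past the
        // default 0.5 compaction threshold.
        meta.dead_records.store(60, Ordering::Relaxed);
        assert!(meta.compaction_ratio() > 0.5);

        assert_eq!(meta.segment_key(), "t/0/00000000000000000100");
    }
}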

/// Hot tier: In-memory LRU cache
#[derive(Debug)]
pub struct HotTier {
    /// Segment data keyed by (topic, partition, base_offset)
    segments: RwLock<HashMap<(String, u32, u64), Bytes>>,
    /// LRU order tracking
    lru_order: Mutex<VecDeque<(String, u32, u64)>>,
    /// Current size in bytes
    current_size: AtomicU64,
    /// Maximum size in bytes
    max_size: u64,
}

impl HotTier {
    pub fn new(max_size: u64) -> Self {
        Self {
            segments: RwLock::new(HashMap::new()),
            lru_order: Mutex::new(VecDeque::new()),
            current_size: AtomicU64::new(0),
            max_size,
        }
    }

    /// Insert data into hot tier
    pub async fn insert(&self, topic: &str, partition: u32, base_offset: u64, data: Bytes) -> bool {
        let size = data.len() as u64;

        // Check if it fits
        if size > self.max_size {
            return false;
        }

        // Evict until we have space
        while self.current_size.load(Ordering::Relaxed) + size > self.max_size {
            if !self.evict_one().await {
                break;
            }
        }

        let key = (topic.to_string(), partition, base_offset);

        // Insert data
        {
            let mut segments = self.segments.write().await;
            if let Some(old) = segments.insert(key.clone(), data) {
                self.current_size
                    .fetch_sub(old.len() as u64, Ordering::Relaxed);
            }
        }

        // Update LRU
        {
            let mut lru = self.lru_order.lock().await;
            lru.retain(|k| k != &key);
            lru.push_back(key);
        }

        self.current_size.fetch_add(size, Ordering::Relaxed);
        true
    }

    /// Get data from hot tier
    pub async fn get(&self, topic: &str, partition: u32, base_offset: u64) -> Option<Bytes> {
        let key = (topic.to_string(), partition, base_offset);

        let data = {
            let segments = self.segments.read().await;
            segments.get(&key).cloned()
        };

        if data.is_some() {
            // Update LRU on access
            let mut lru = self.lru_order.lock().await;
            lru.retain(|k| k != &key);
            lru.push_back(key);
        }

        data
    }

    /// Remove data from hot tier
    pub async fn remove(&self, topic: &str, partition: u32, base_offset: u64) -> Option<Bytes> {
        let key = (topic.to_string(), partition, base_offset);

        let removed = {
            let mut segments = self.segments.write().await;
            segments.remove(&key)
        };

        if let Some(ref data) = removed {
            self.current_size
                .fetch_sub(data.len() as u64, Ordering::Relaxed);
            let mut lru = self.lru_order.lock().await;
            lru.retain(|k| k != &key);
        }

        removed
    }

    /// Evict least recently used segment
    async fn evict_one(&self) -> bool {
        let to_evict = {
            let mut lru = self.lru_order.lock().await;
            lru.pop_front()
        };

        if let Some(key) = to_evict {
            let removed = {
                let mut segments = self.segments.write().await;
                segments.remove(&key)
            };

            if let Some(data) = removed {
                self.current_size
                    .fetch_sub(data.len() as u64, Ordering::Relaxed);
                return true;
            }
        }

        false
    }

    /// Get current usage statistics
    pub fn stats(&self) -> HotTierStats {
        HotTierStats {
            current_size: self.current_size.load(Ordering::Relaxed),
            max_size: self.max_size,
        }
    }
}

#[derive(Debug, Clone)]
pub struct HotTierStats {
    pub current_size: u64,
    pub max_size: u64,
}

/// Warm tier: Memory-mapped local disk storage
#[derive(Debug)]
pub struct WarmTier {
    /// Base path for warm tier storage
    base_path: PathBuf,
    /// Segment metadata index
    segments: RwLock<BTreeMap<(String, u32, u64), Arc<SegmentMetadata>>>,
    /// Current total size
    current_size: AtomicU64,
    /// Maximum size
    max_size: u64,
}

impl WarmTier {
    pub fn new(base_path: PathBuf, max_size: u64) -> Result<Self> {
        std::fs::create_dir_all(&base_path)?;

        Ok(Self {
            base_path,
            segments: RwLock::new(BTreeMap::new()),
            current_size: AtomicU64::new(0),
            max_size,
        })
    }

    /// Get segment file path
    fn segment_path(&self, topic: &str, partition: u32, base_offset: u64) -> PathBuf {
        self.base_path
            .join(topic)
            .join(format!("{}", partition))
            .join(format!("{:020}.segment", base_offset))
    }

    /// Store segment data
    pub async fn store(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
        end_offset: u64,
        data: &[u8],
    ) -> Result<()> {
        let path = self.segment_path(topic, partition, base_offset);

        // Ensure directory exists
        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }

        // Write segment file
        tokio::fs::write(&path, data).await?;

        let size = data.len() as u64;

        // Update metadata
        let metadata = Arc::new(SegmentMetadata::new(
            topic.to_string(),
            partition,
            base_offset,
            end_offset,
            size,
            StorageTier::Warm,
        ));

        {
            let mut segments = self.segments.write().await;
            if let Some(old) =
                segments.insert((topic.to_string(), partition, base_offset), metadata)
            {
                // Overwrite: release the replaced segment's size so the
                // running total stays accurate.
                self.current_size
                    .fetch_sub(old.size_bytes, Ordering::Relaxed);
            }
        }

        self.current_size.fetch_add(size, Ordering::Relaxed);

        Ok(())
    }

    /// Read segment data using mmap
    pub async fn read(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
    ) -> Result<Option<Bytes>> {
        let path = self.segment_path(topic, partition, base_offset);

        if !path.exists() {
            return Ok(None);
        }

        // Memory map the file for efficient reading
        let file = std::fs::File::open(&path)?;
        // SAFETY: File is opened read-only and remains valid for mmap lifetime.
        // The mmap is only used for reading and copied to Bytes before return.
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        // Update access stats
        let key = (topic.to_string(), partition, base_offset);
        if let Some(meta) = self.segments.read().await.get(&key) {
            meta.record_access();
        }

        // Convert to Bytes (this copies, but allows the mmap to be dropped).
        // For true zero-copy, we'd need to keep the mmap alive; see the
        // sketch after this impl.
        Ok(Some(Bytes::copy_from_slice(&mmap)))
    }

    /// Remove segment
    pub async fn remove(&self, topic: &str, partition: u32, base_offset: u64) -> Result<()> {
        let path = self.segment_path(topic, partition, base_offset);
        let key = (topic.to_string(), partition, base_offset);

        let size = {
            let mut segments = self.segments.write().await;
            segments.remove(&key).map(|m| m.size_bytes)
        };

        if let Some(size) = size {
            self.current_size.fetch_sub(size, Ordering::Relaxed);
        }

        if path.exists() {
            tokio::fs::remove_file(path).await?;
        }

        Ok(())
    }

    /// Get segments that should be demoted to cold tier
    pub async fn get_demotion_candidates(&self, max_age: Duration) -> Vec<(String, u32, u64)> {
        let max_age_secs = max_age.as_secs();
        let segments = self.segments.read().await;

        segments
            .iter()
            .filter(|(_, meta)| meta.age_secs() > max_age_secs)
            .map(|(key, _)| key.clone())
            .collect()
    }

    /// Get segment metadata
    pub async fn get_metadata(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
    ) -> Option<Arc<SegmentMetadata>> {
        let key = (topic.to_string(), partition, base_offset);
        self.segments.read().await.get(&key).cloned()
    }

    pub fn stats(&self) -> WarmTierStats {
        WarmTierStats {
            current_size: self.current_size.load(Ordering::Relaxed),
            max_size: self.max_size,
        }
    }
}
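
// Sketch: a zero-copy alternative to the `Bytes::copy_from_slice` above,
// assuming a `bytes` crate new enough to provide `Bytes::from_owner` (1.9+).
// The mapping then stays alive as long as any `Bytes` handle does.
#[allow(dead_code)]
fn mmap_to_bytes_zero_copy(file: &std::fs::File) -> Result<Bytes> {
    // SAFETY: same contract as `WarmTier::read`: the file is opened read-only
    // and must not be truncated or mutated while the mapping is alive.
    let mmap = unsafe { memmap2::Mmap::map(file)? };
    Ok(Bytes::from_owner(mmap))
}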

#[derive(Debug, Clone)]
pub struct WarmTierStats {
    pub current_size: u64,
    pub max_size: u64,
}

/// Cold storage backend trait
#[async_trait::async_trait]
pub trait ColdStorageBackend: Send + Sync {
    /// Upload segment to cold storage
    async fn upload(&self, key: &str, data: &[u8]) -> Result<()>;

    /// Download segment from cold storage
    async fn download(&self, key: &str) -> Result<Option<Bytes>>;

    /// Delete segment from cold storage
    async fn delete(&self, key: &str) -> Result<()>;

    /// List all segment keys with prefix
    async fn list(&self, prefix: &str) -> Result<Vec<String>>;

    /// Check if segment exists
    async fn exists(&self, key: &str) -> Result<bool>;
}
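
// Sketch of a custom backend behind the trait above: an in-memory map, handy
// for tests. This is a hypothetical example type, not one of the shipped
// backends.
#[cfg(test)]
#[allow(dead_code)]
pub struct InMemoryColdStorage {
    objects: Mutex<HashMap<String, Bytes>>,
}

#[cfg(test)]
#[async_trait::async_trait]
impl ColdStorageBackend for InMemoryColdStorage {
    async fn upload(&self, key: &str, data: &[u8]) -> Result<()> {
        self.objects
            .lock()
            .await
            .insert(key.to_string(), Bytes::copy_from_slice(data));
        Ok(())
    }

    async fn download(&self, key: &str) -> Result<Option<Bytes>> {
        Ok(self.objects.lock().await.get(key).cloned())
    }

    async fn delete(&self, key: &str) -> Result<()> {
        self.objects.lock().await.remove(key);
        Ok(())
    }

    async fn list(&self, prefix: &str) -> Result<Vec<String>> {
        Ok(self
            .objects
            .lock()
            .await
            .keys()
            .filter(|k| k.starts_with(prefix))
            .cloned()
            .collect())
    }

    async fn exists(&self, key: &str) -> Result<bool> {
        Ok(self.objects.lock().await.contains_key(key))
    }
}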

/// Local filesystem cold storage (for dev/testing)
pub struct LocalFsColdStorage {
    base_path: PathBuf,
}

impl LocalFsColdStorage {
    pub fn new(base_path: PathBuf) -> Result<Self> {
        std::fs::create_dir_all(&base_path)?;
        // Canonicalize base path to prevent path traversal
        let base_path = base_path.canonicalize()?;
        Ok(Self { base_path })
    }

    /// Convert a key to a safe filesystem path, preventing path traversal attacks.
    ///
    /// # Security
    /// - Rejects keys containing `..` components
    /// - Rejects absolute paths
    /// - Validates resulting path stays within base_path
    fn key_to_path(&self, key: &str) -> Result<PathBuf> {
        // Security: Reject keys with path traversal attempts
        if key.contains("..") || key.starts_with('/') || key.starts_with('\\') {
            return Err(Error::Other(format!(
                "Invalid key: path traversal attempt detected: {}",
                key
            )));
        }

        // Also reject any key with null bytes (could bypass checks in some systems)
        if key.contains('\0') {
            return Err(Error::Other("Invalid key: null byte not allowed".into()));
        }

        let path = self
            .base_path
            .join(key.replace('/', std::path::MAIN_SEPARATOR_STR));

        // Double-check: ensure the resolved path is under base_path.
        // This catches edge cases like symlinks.
        if let Ok(canonical) = path.canonicalize() {
            if !canonical.starts_with(&self.base_path) {
                return Err(Error::Other(format!(
                    "Invalid key: path escapes base directory: {}",
                    key
                )));
            }
        }
        // If canonicalize fails (file doesn't exist yet), the path should still be
        // safe because we've already rejected .. and absolute paths

        Ok(path)
    }
}

#[async_trait::async_trait]
impl ColdStorageBackend for LocalFsColdStorage {
    async fn upload(&self, key: &str, data: &[u8]) -> Result<()> {
        let path = self.key_to_path(key)?;
        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        tokio::fs::write(&path, data).await?;
        Ok(())
    }

    async fn download(&self, key: &str) -> Result<Option<Bytes>> {
        let path = self.key_to_path(key)?;
        if !path.exists() {
            return Ok(None);
        }
        let data = tokio::fs::read(&path).await?;
        Ok(Some(Bytes::from(data)))
    }

    async fn delete(&self, key: &str) -> Result<()> {
        let path = self.key_to_path(key)?;
        if path.exists() {
            tokio::fs::remove_file(path).await?;
        }
        Ok(())
    }

    async fn list(&self, prefix: &str) -> Result<Vec<String>> {
        let base = self.key_to_path(prefix)?;
        let mut keys = Vec::new();

        if !base.exists() {
            return Ok(keys);
        }

        fn walk_dir(
            dir: &std::path::Path,
            base: &std::path::Path,
            keys: &mut Vec<String>,
        ) -> std::io::Result<()> {
            if dir.is_dir() {
                for entry in std::fs::read_dir(dir)? {
                    let entry = entry?;
                    let path = entry.path();
                    if path.is_dir() {
                        walk_dir(&path, base, keys)?;
                    } else if let Ok(rel) = path.strip_prefix(base) {
                        keys.push(
                            rel.to_string_lossy()
                                .replace(std::path::MAIN_SEPARATOR, "/"),
                        );
                    }
                }
            }
            Ok(())
        }

        // Walk only the subtree for the requested prefix; keys are still
        // reported relative to base_path so they round-trip through
        // key_to_path.
        walk_dir(&base, &self.base_path, &mut keys)?;
        Ok(keys)
    }

    async fn exists(&self, key: &str) -> Result<bool> {
        Ok(self.key_to_path(key)?.exists())
    }
}
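
// Sketch: exercising the traversal guards in `key_to_path` through the
// public trait methods.
#[cfg(test)]
mod key_to_path_example {
    use super::*;

    #[tokio::test]
    async fn rejects_traversal_keys() {
        let dir = tempfile::tempdir().unwrap();
        let cold = LocalFsColdStorage::new(dir.path().to_path_buf()).unwrap();

        // `..` components and absolute paths are rejected outright.
        assert!(cold.upload("../escape", b"x").await.is_err());
        assert!(cold.upload("/abs/path", b"x").await.is_err());
        // Well-formed segment keys pass through.
        assert!(cold.upload("topic/0/00000000000000000000", b"x").await.is_ok());
    }
}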

/// Disabled cold storage (warm tier is final)
pub struct DisabledColdStorage;

#[async_trait::async_trait]
impl ColdStorageBackend for DisabledColdStorage {
    async fn upload(&self, _key: &str, _data: &[u8]) -> Result<()> {
        Err(Error::Other("Cold storage is disabled".into()))
    }

    async fn download(&self, _key: &str) -> Result<Option<Bytes>> {
        Ok(None)
    }

    async fn delete(&self, _key: &str) -> Result<()> {
        Ok(())
    }

    async fn list(&self, _prefix: &str) -> Result<Vec<String>> {
        Ok(Vec::new())
    }

    async fn exists(&self, _key: &str) -> Result<bool> {
        Ok(false)
    }
}

/// Migration task
#[derive(Debug)]
enum MigrationTask {
    Demote {
        topic: String,
        partition: u32,
        base_offset: u64,
        from_tier: StorageTier,
    },
    Promote {
        topic: String,
        partition: u32,
        base_offset: u64,
        to_tier: StorageTier,
    },
    Compact {
        topic: String,
        partition: u32,
        base_offset: u64,
    },
}

/// Main tiered storage manager
pub struct TieredStorage {
    config: TieredStorageConfig,
    hot_tier: Arc<HotTier>,
    warm_tier: Arc<WarmTier>,
    cold_storage: Arc<dyn ColdStorageBackend>,
    /// Global segment index across all tiers
    segment_index: RwLock<BTreeMap<(String, u32, u64), Arc<SegmentMetadata>>>,
    /// Migration task queue
    migration_tx: mpsc::Sender<MigrationTask>,
    /// Statistics
    stats: Arc<TieredStorageStats>,
    /// Shutdown signal
    shutdown: tokio::sync::broadcast::Sender<()>,
}

impl TieredStorage {
    /// Create new tiered storage system
    pub async fn new(config: TieredStorageConfig) -> Result<Arc<Self>> {
        let hot_tier = Arc::new(HotTier::new(config.hot_tier_max_bytes));
        let warm_tier = Arc::new(WarmTier::new(
            config.warm_tier_path.clone(),
            config.warm_tier_max_bytes,
        )?);

        let cold_storage: Arc<dyn ColdStorageBackend> = match &config.cold_storage {
            ColdStorageConfig::LocalFs { path } => Arc::new(LocalFsColdStorage::new(path.clone())?),
            ColdStorageConfig::Disabled => Arc::new(DisabledColdStorage),
            // S3 and Azure would be implemented with their respective SDKs
            ColdStorageConfig::S3 { .. } => {
                // Would use the aws-sdk-s3 crate
                return Err(Error::Other("S3 cold storage not yet implemented".into()));
            }
            ColdStorageConfig::AzureBlob { .. } => {
                // Would use the azure_storage_blobs crate
                return Err(Error::Other(
                    "Azure Blob cold storage not yet implemented".into(),
                ));
            }
        };

        let (migration_tx, migration_rx) = mpsc::channel(1024);
        let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);

        let storage = Arc::new(Self {
            config: config.clone(),
            hot_tier,
            warm_tier,
            cold_storage,
            segment_index: RwLock::new(BTreeMap::new()),
            migration_tx,
            stats: Arc::new(TieredStorageStats::new()),
            shutdown: shutdown_tx,
        });

        // Start background migration worker
        storage.clone().start_migration_worker(migration_rx);

        // Start background tier manager
        storage.clone().start_tier_manager();

        Ok(storage)
    }

    /// Write messages to storage (always starts in hot tier)
    pub async fn write(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
        end_offset: u64,
        data: Bytes,
    ) -> Result<()> {
        let size = data.len() as u64;

        // Always write to hot tier first
        let inserted = self
            .hot_tier
            .insert(topic, partition, base_offset, data.clone())
            .await;

        if !inserted {
            // Hot tier is full and can't evict; write directly to warm
            self.warm_tier
                .store(topic, partition, base_offset, end_offset, &data)
                .await?;
            self.stats.warm_writes.fetch_add(1, Ordering::Relaxed);
        } else {
            self.stats.hot_writes.fetch_add(1, Ordering::Relaxed);
        }

        // Update segment index
        let metadata = Arc::new(SegmentMetadata::new(
            topic.to_string(),
            partition,
            base_offset,
            end_offset,
            size,
            if inserted {
                StorageTier::Hot
            } else {
                StorageTier::Warm
            },
        ));

        {
            let mut index = self.segment_index.write().await;
            index.insert((topic.to_string(), partition, base_offset), metadata);
        }

        self.stats
            .total_bytes_written
            .fetch_add(size, Ordering::Relaxed);

        Ok(())
    }

    /// Read messages from storage
    pub async fn read(
        &self,
        topic: &str,
        partition: u32,
        start_offset: u64,
        max_bytes: usize,
    ) -> Result<Vec<(u64, Bytes)>> {
        let mut results = Vec::new();
        let mut bytes_collected = 0;

        // Find relevant segments
        let segments = {
            let index = self.segment_index.read().await;
            index
                .range((topic.to_string(), partition, 0)..(topic.to_string(), partition, u64::MAX))
                .filter(|(_, meta)| meta.end_offset > start_offset)
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect::<Vec<_>>()
        };

        for ((_, _, base_offset), metadata) in segments {
            if bytes_collected >= max_bytes {
                break;
            }

            metadata.record_access();

            // Try to read from each tier in order
            let data = match metadata.tier {
                StorageTier::Hot => {
                    if let Some(data) = self.hot_tier.get(topic, partition, base_offset).await {
                        self.stats.hot_reads.fetch_add(1, Ordering::Relaxed);
                        Some(data)
                    } else {
                        // Might have been evicted; fall through to warm
                        None
                    }
                }
                _ => None,
            };

            let data = match data {
                Some(d) => d,
                None => {
                    // Try warm tier
                    if let Some(data) = self.warm_tier.read(topic, partition, base_offset).await? {
                        self.stats.warm_reads.fetch_add(1, Ordering::Relaxed);

                        // Consider promotion if frequently accessed
                        if self.config.enable_promotion {
                            let access_count = metadata.access_count.load(Ordering::Relaxed);
                            if access_count >= self.config.promotion_threshold {
                                let _ = self
                                    .migration_tx
                                    .send(MigrationTask::Promote {
                                        topic: topic.to_string(),
                                        partition,
                                        base_offset,
                                        to_tier: StorageTier::Hot,
                                    })
                                    .await;
                            }
                        }

                        data
                    } else {
                        // Try cold tier
                        let key = metadata.segment_key();
                        if let Some(data) = self.cold_storage.download(&key).await? {
                            self.stats.cold_reads.fetch_add(1, Ordering::Relaxed);

                            // Consider promotion
                            if self.config.enable_promotion {
                                let access_count = metadata.access_count.load(Ordering::Relaxed);
                                if access_count >= self.config.promotion_threshold {
                                    let _ = self
                                        .migration_tx
                                        .send(MigrationTask::Promote {
                                            topic: topic.to_string(),
                                            partition,
                                            base_offset,
                                            to_tier: StorageTier::Warm,
                                        })
                                        .await;
                                }
                            }

                            data
                        } else {
                            continue; // Segment not found
                        }
                    }
                }
            };

            results.push((base_offset, data.clone()));
            bytes_collected += data.len();
        }

        self.stats
            .total_bytes_read
            .fetch_add(bytes_collected as u64, Ordering::Relaxed);

        Ok(results)
    }

    /// Get metadata for a specific segment
    pub async fn get_segment_metadata(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
    ) -> Option<Arc<SegmentMetadata>> {
        self.segment_index
            .read()
            .await
            .get(&(topic.to_string(), partition, base_offset))
            .cloned()
    }

    /// Force demote segments from hot to warm
    pub async fn flush_hot_tier(&self, topic: &str, partition: u32) -> Result<()> {
        let segments: Vec<_> = {
            let index = self.segment_index.read().await;
            index
                .range((topic.to_string(), partition, 0)..(topic.to_string(), partition, u64::MAX))
                .filter(|(_, meta)| meta.tier == StorageTier::Hot)
                .map(|(k, _)| k.2)
                .collect()
        };

        for base_offset in segments {
            let _ = self
                .migration_tx
                .send(MigrationTask::Demote {
                    topic: topic.to_string(),
                    partition,
                    base_offset,
                    from_tier: StorageTier::Hot,
                })
                .await;
        }

        Ok(())
    }

    /// Get storage statistics
    pub fn stats(&self) -> TieredStorageStatsSnapshot {
        TieredStorageStatsSnapshot {
            hot_tier: self.hot_tier.stats(),
            warm_tier: self.warm_tier.stats(),
            hot_reads: self.stats.hot_reads.load(Ordering::Relaxed),
            warm_reads: self.stats.warm_reads.load(Ordering::Relaxed),
            cold_reads: self.stats.cold_reads.load(Ordering::Relaxed),
            hot_writes: self.stats.hot_writes.load(Ordering::Relaxed),
            warm_writes: self.stats.warm_writes.load(Ordering::Relaxed),
            cold_writes: self.stats.cold_writes.load(Ordering::Relaxed),
            total_bytes_read: self.stats.total_bytes_read.load(Ordering::Relaxed),
            total_bytes_written: self.stats.total_bytes_written.load(Ordering::Relaxed),
            migrations_completed: self.stats.migrations_completed.load(Ordering::Relaxed),
            migrations_failed: self.stats.migrations_failed.load(Ordering::Relaxed),
        }
    }

    /// Start the background migration worker
    fn start_migration_worker(self: Arc<Self>, mut rx: mpsc::Receiver<MigrationTask>) {
        let semaphore = Arc::new(Semaphore::new(self.config.migration_concurrency));
        let mut shutdown_rx = self.shutdown.subscribe();

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    Some(task) = rx.recv() => {
                        let permit = semaphore.clone().acquire_owned().await.unwrap();
                        let storage = self.clone();

                        tokio::spawn(async move {
                            let result = storage.execute_migration(task).await;
                            if result.is_ok() {
                                storage.stats.migrations_completed.fetch_add(1, Ordering::Relaxed);
                            } else {
                                storage.stats.migrations_failed.fetch_add(1, Ordering::Relaxed);
                            }
                            drop(permit);
                        });
                    }
                    _ = shutdown_rx.recv() => {
                        break;
                    }
                }
            }
        });
    }

    /// Start the background tier manager (checks for demotions)
    fn start_tier_manager(self: Arc<Self>) {
        let mut shutdown_rx = self.shutdown.subscribe();
        let migration_interval = self.config.migration_interval;

        tokio::spawn(async move {
            let mut ticker = interval(migration_interval);

            loop {
                tokio::select! {
                    _ = ticker.tick() => {
                        if let Err(e) = self.check_tier_migrations().await {
                            tracing::warn!("Tier migration check failed: {}", e);
                        }
                    }
                    _ = shutdown_rx.recv() => {
                        break;
                    }
                }
            }
        });
    }

    /// Check and queue tier migrations
    async fn check_tier_migrations(&self) -> Result<()> {
        let hot_max_age = self.config.hot_tier_max_age;
        let warm_max_age = self.config.warm_tier_max_age;

        // Check hot tier for demotions
        let hot_candidates: Vec<_> = {
            let index = self.segment_index.read().await;
            index
                .iter()
                .filter(|(_, meta)| {
                    meta.tier == StorageTier::Hot
                        && Duration::from_secs(meta.age_secs()) > hot_max_age
                })
                .map(|(k, _)| k.clone())
                .collect()
        };

        for (topic, partition, base_offset) in hot_candidates {
            let _ = self
                .migration_tx
                .send(MigrationTask::Demote {
                    topic,
                    partition,
                    base_offset,
                    from_tier: StorageTier::Hot,
                })
                .await;
        }

        // Check warm tier for demotions to cold
        let warm_candidates = self.warm_tier.get_demotion_candidates(warm_max_age).await;

        for (topic, partition, base_offset) in warm_candidates {
            let _ = self
                .migration_tx
                .send(MigrationTask::Demote {
                    topic,
                    partition,
                    base_offset,
                    from_tier: StorageTier::Warm,
                })
                .await;
        }

        // Check for compaction candidates
        let compaction_threshold = self.config.compaction_threshold;
        let compaction_candidates: Vec<_> = {
            let index = self.segment_index.read().await;
            index
                .iter()
                .filter(|(_, meta)| meta.compaction_ratio() > compaction_threshold)
                .map(|(k, _)| k.clone())
                .collect()
        };

        for (topic, partition, base_offset) in compaction_candidates {
            let _ = self
                .migration_tx
                .send(MigrationTask::Compact {
                    topic,
                    partition,
                    base_offset,
                })
                .await;
        }

        Ok(())
    }

    /// Execute a migration task
    async fn execute_migration(&self, task: MigrationTask) -> Result<()> {
        match task {
            MigrationTask::Demote {
                topic,
                partition,
                base_offset,
                from_tier,
            } => {
                self.demote_segment(&topic, partition, base_offset, from_tier)
                    .await
            }
            MigrationTask::Promote {
                topic,
                partition,
                base_offset,
                to_tier,
            } => {
                self.promote_segment(&topic, partition, base_offset, to_tier)
                    .await
            }
            MigrationTask::Compact {
                topic,
                partition,
                base_offset,
            } => self.compact_segment(&topic, partition, base_offset).await,
        }
    }
    /// Demote a segment to a cooler tier
    async fn demote_segment(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
        from_tier: StorageTier,
    ) -> Result<()> {
        let to_tier = match from_tier.demote() {
            Some(t) => t,
            None => return Ok(()), // Already at coldest tier
        };

        // Get segment data from current tier
        let data = match from_tier {
            StorageTier::Hot => self.hot_tier.remove(topic, partition, base_offset).await,
            StorageTier::Warm => self.warm_tier.read(topic, partition, base_offset).await?,
            StorageTier::Cold => None,
        };

        let data = match data {
            Some(d) => d,
            None => return Ok(()), // Segment already gone
        };

        // Get metadata for end_offset
        let metadata = self
            .get_segment_metadata(topic, partition, base_offset)
            .await;
        let end_offset = metadata
            .as_ref()
            .map(|m| m.end_offset)
            .unwrap_or(base_offset);

        // Write to new tier
        match to_tier {
            StorageTier::Warm => {
                self.warm_tier
                    .store(topic, partition, base_offset, end_offset, &data)
                    .await?;
            }
            StorageTier::Cold => {
                let key = format!("{}/{}/{:020}", topic, partition, base_offset);
                self.cold_storage.upload(&key, &data).await?;
                self.stats.cold_writes.fetch_add(1, Ordering::Relaxed);

                // Remove from warm tier
                self.warm_tier.remove(topic, partition, base_offset).await?;
            }
            StorageTier::Hot => unreachable!(),
        }

        // Update metadata
        if let Some(meta) = metadata {
            // Create new metadata with the updated tier (SegmentMetadata.tier is not atomic)
            let new_meta = Arc::new(SegmentMetadata {
                topic: meta.topic.clone(),
                partition: meta.partition,
                base_offset: meta.base_offset,
                end_offset: meta.end_offset,
                size_bytes: meta.size_bytes,
                tier: to_tier,
                created_at: meta.created_at,
                last_accessed: AtomicU64::new(meta.last_accessed.load(Ordering::Relaxed)),
                access_count: AtomicU64::new(meta.access_count.load(Ordering::Relaxed)),
                dead_records: AtomicU64::new(meta.dead_records.load(Ordering::Relaxed)),
                total_records: meta.total_records,
            });

            let mut index = self.segment_index.write().await;
            index.insert((topic.to_string(), partition, base_offset), new_meta);
        }

        tracing::debug!(
            "Demoted segment {}/{}/{} from {:?} to {:?}",
            topic,
            partition,
            base_offset,
            from_tier,
            to_tier
        );

        Ok(())
    }

    /// Promote a segment to a hotter tier
    async fn promote_segment(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
        to_tier: StorageTier,
    ) -> Result<()> {
        // Get current tier from metadata
        let metadata = match self
            .get_segment_metadata(topic, partition, base_offset)
            .await
        {
            Some(m) => m,
            None => return Ok(()),
        };

        let from_tier = metadata.tier;

        // Get data from current tier
        let data = match from_tier {
            StorageTier::Cold => {
                let key = metadata.segment_key();
                self.cold_storage.download(&key).await?
            }
            StorageTier::Warm => self.warm_tier.read(topic, partition, base_offset).await?,
            StorageTier::Hot => return Ok(()), // Already hot
        };

        let data = match data {
            Some(d) => d,
            None => return Ok(()),
        };

        // Write to new tier
        match to_tier {
            StorageTier::Hot => {
                self.hot_tier
                    .insert(topic, partition, base_offset, data)
                    .await;
            }
            StorageTier::Warm => {
                self.warm_tier
                    .store(topic, partition, base_offset, metadata.end_offset, &data)
                    .await?;
            }
            StorageTier::Cold => unreachable!(),
        }

        // Update metadata
        let new_meta = Arc::new(SegmentMetadata {
            topic: metadata.topic.clone(),
            partition: metadata.partition,
            base_offset: metadata.base_offset,
            end_offset: metadata.end_offset,
            size_bytes: metadata.size_bytes,
            tier: to_tier,
            created_at: metadata.created_at,
            last_accessed: AtomicU64::new(metadata.last_accessed.load(Ordering::Relaxed)),
            access_count: AtomicU64::new(0), // Reset access count after promotion
            dead_records: AtomicU64::new(metadata.dead_records.load(Ordering::Relaxed)),
            total_records: metadata.total_records,
        });

        {
            let mut index = self.segment_index.write().await;
            index.insert((topic.to_string(), partition, base_offset), new_meta);
        }

        tracing::debug!(
            "Promoted segment {}/{}/{} from {:?} to {:?}",
            topic,
            partition,
            base_offset,
            from_tier,
            to_tier
        );

        Ok(())
    }

    /// Compact a segment (remove tombstones, keep latest per key)
    ///
    /// Kafka-style log compaction:
    /// 1. Read all messages from the segment
    /// 2. Keep only the latest message per key
    /// 3. Remove tombstones (empty value = deletion marker)
    /// 4. Write the compacted segment
    /// 5. Update the index and replace the old segment
    async fn compact_segment(&self, topic: &str, partition: u32, base_offset: u64) -> Result<()> {
        // Get segment metadata
        let metadata = match self
            .get_segment_metadata(topic, partition, base_offset)
            .await
        {
            Some(m) => m,
            None => {
                tracing::debug!(
                    "Segment not found for compaction: {}/{}/{}",
                    topic,
                    partition,
                    base_offset
                );
                return Ok(());
            }
        };

        // Read segment data from current tier
        let data = match metadata.tier {
            StorageTier::Hot => self.hot_tier.get(topic, partition, base_offset).await,
            StorageTier::Warm => self.warm_tier.read(topic, partition, base_offset).await?,
            StorageTier::Cold => {
                let key = metadata.segment_key();
                self.cold_storage.download(&key).await?
            }
        };

        let data = match data {
            Some(d) => d,
            None => {
                tracing::debug!(
                    "Segment data not found for compaction: {}/{}/{}",
                    topic,
                    partition,
                    base_offset
                );
                return Ok(());
            }
        };

        // Parse messages from the segment.
        // Segment format: length-prefixed postcard-serialized Messages
        let mut messages: Vec<crate::Message> = Vec::new();
        let mut cursor = 0;

        while cursor < data.len() {
            // Read message length (4 bytes)
            if cursor + 4 > data.len() {
                break;
            }
            let len = u32::from_le_bytes([
                data[cursor],
                data[cursor + 1],
                data[cursor + 2],
                data[cursor + 3],
            ]) as usize;
            cursor += 4;

            if cursor + len > data.len() {
                tracing::warn!(
                    "Truncated message in segment {}/{}/{}",
                    topic,
                    partition,
                    base_offset
                );
                break;
            }

            // Deserialize message
            match crate::Message::from_bytes(&data[cursor..cursor + len]) {
                Ok(msg) => messages.push(msg),
                Err(e) => {
                    tracing::warn!("Failed to deserialize message in compaction: {}", e);
                }
            }
            cursor += len;
        }

        if messages.is_empty() {
            tracing::debug!(
                "No messages to compact in segment {}/{}/{}",
                topic,
                partition,
                base_offset
            );
            return Ok(());
        }

        let original_count = messages.len();

        // Compact: keep the latest value per key, remove tombstones
        let mut key_to_message: HashMap<Option<Bytes>, crate::Message> = HashMap::new();

        for msg in messages {
            // For keyed messages, keep the latest.
            // For keyless messages, always keep (append-only semantics).
            if msg.key.is_some() {
                key_to_message.insert(msg.key.clone(), msg);
            } else {
                // Keyless messages use the offset as a synthetic key to preserve all
                key_to_message.insert(Some(Bytes::from(msg.offset.to_le_bytes().to_vec())), msg);
            }
        }

        // Filter out tombstones (empty value = deletion marker)
        let mut compacted: Vec<_> = key_to_message
            .into_values()
            .filter(|msg| !msg.value.is_empty()) // Non-empty value = not a tombstone
            .collect();

        // HashMap iteration order is arbitrary; restore offset order so the
        // rewritten segment remains a valid, ordered log.
        compacted.sort_by_key(|msg| msg.offset);

        let compacted_count = compacted.len();

        // Only rewrite if compaction actually removed records
        if compacted_count >= original_count {
            tracing::debug!(
                "Skipping compaction for {}/{}/{}: no reduction ({} -> {})",
                topic,
                partition,
                base_offset,
                original_count,
                compacted_count
            );
            return Ok(());
        }

        // Serialize compacted messages
        let mut compacted_data = Vec::new();
        let mut new_end_offset = base_offset;

        for msg in &compacted {
            let msg_bytes = msg.to_bytes()?;
            compacted_data.extend_from_slice(&(msg_bytes.len() as u32).to_le_bytes());
            compacted_data.extend_from_slice(&msg_bytes);
            new_end_offset = new_end_offset.max(msg.offset + 1);
        }

        let compacted_bytes = Bytes::from(compacted_data);
        let compacted_size = compacted_bytes.len() as u64;
        let reduction_ratio = 1.0 - (compacted_count as f64 / original_count as f64);

        tracing::info!(
            "Compacted segment {}/{}/{}: {} -> {} messages ({:.1}% reduction)",
            topic,
            partition,
            base_offset,
            original_count,
            compacted_count,
            reduction_ratio * 100.0
        );

        // Write the compacted segment back to its current tier
        match metadata.tier {
            StorageTier::Hot => {
                // Remove old and insert new
                self.hot_tier.remove(topic, partition, base_offset).await;
                self.hot_tier
                    .insert(topic, partition, base_offset, compacted_bytes)
                    .await;
            }
            StorageTier::Warm => {
                // Remove and re-store
                self.warm_tier.remove(topic, partition, base_offset).await?;
                self.warm_tier
                    .store(
                        topic,
                        partition,
                        base_offset,
                        new_end_offset,
                        &compacted_bytes,
                    )
                    .await?;
            }
            StorageTier::Cold => {
                // Upload the compacted version (overwrites the old object)
                let key = metadata.segment_key();
                self.cold_storage.upload(&key, &compacted_bytes).await?;
            }
        }

        // Update metadata with the new size and reset dead records
        let new_meta = Arc::new(SegmentMetadata {
            topic: metadata.topic.clone(),
            partition: metadata.partition,
            base_offset: metadata.base_offset,
            end_offset: new_end_offset,
            size_bytes: compacted_size,
            tier: metadata.tier,
            created_at: metadata.created_at,
            last_accessed: AtomicU64::new(metadata.last_accessed.load(Ordering::Relaxed)),
            access_count: AtomicU64::new(metadata.access_count.load(Ordering::Relaxed)),
            dead_records: AtomicU64::new(0), // Reset after compaction
            total_records: compacted_count as u64,
        });

        {
            let mut index = self.segment_index.write().await;
            index.insert((topic.to_string(), partition, base_offset), new_meta);
        }

        Ok(())
    }

    /// Shutdown the tiered storage system
    pub async fn shutdown(&self) {
        let _ = self.shutdown.send(());
    }
}
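
// Sketch of the segment wire format `compact_segment` parses: each record is
// a little-endian u32 length prefix followed by the serialized message bytes.
// Raw byte payloads stand in for serialized `Message`s here.
#[cfg(test)]
mod framing_example {
    #[test]
    fn roundtrip_length_prefixed_frames() {
        let payloads: Vec<&[u8]> = vec![&b"alpha"[..], &b"beta"[..]];

        // Encode: length prefix, then payload.
        let mut buf = Vec::new();
        for p in &payloads {
            buf.extend_from_slice(&(p.len() as u32).to_le_bytes());
            buf.extend_from_slice(p);
        }

        // Decode with the same cursor walk as `compact_segment`.
        let mut cursor = 0;
        let mut decoded: Vec<&[u8]> = Vec::new();
        while cursor + 4 <= buf.len() {
            let len = u32::from_le_bytes(buf[cursor..cursor + 4].try_into().unwrap()) as usize;
            cursor += 4;
            decoded.push(&buf[cursor..cursor + len]);
            cursor += len;
        }

        assert_eq!(decoded, payloads);
    }
}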
1575
1576/// Statistics for tiered storage
pub struct TieredStorageStats {
    pub hot_reads: AtomicU64,
    pub warm_reads: AtomicU64,
    pub cold_reads: AtomicU64,
    pub hot_writes: AtomicU64,
    pub warm_writes: AtomicU64,
    pub cold_writes: AtomicU64,
    pub total_bytes_read: AtomicU64,
    pub total_bytes_written: AtomicU64,
    pub migrations_completed: AtomicU64,
    pub migrations_failed: AtomicU64,
}

impl TieredStorageStats {
    fn new() -> Self {
        Self {
            hot_reads: AtomicU64::new(0),
            warm_reads: AtomicU64::new(0),
            cold_reads: AtomicU64::new(0),
            hot_writes: AtomicU64::new(0),
            warm_writes: AtomicU64::new(0),
            cold_writes: AtomicU64::new(0),
            total_bytes_read: AtomicU64::new(0),
            total_bytes_written: AtomicU64::new(0),
            migrations_completed: AtomicU64::new(0),
            migrations_failed: AtomicU64::new(0),
        }
    }
}

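// How call sites are expected to record events -- a sketch, assuming the
// read/write paths elsewhere in this file bump the matching counter with a
// relaxed atomic add:
//
//     self.stats.hot_reads.fetch_add(1, Ordering::Relaxed);
//     self.stats.total_bytes_read.fetch_add(data.len() as u64, Ordering::Relaxed);

/// A point-in-time copy of the counters in [`TieredStorageStats`], plus
/// per-tier stats, with the atomics flattened to plain `u64`s.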
#[derive(Debug, Clone)]
pub struct TieredStorageStatsSnapshot {
    pub hot_tier: HotTierStats,
    pub warm_tier: WarmTierStats,
    pub hot_reads: u64,
    pub warm_reads: u64,
    pub cold_reads: u64,
    pub hot_writes: u64,
    pub warm_writes: u64,
    pub cold_writes: u64,
    pub total_bytes_read: u64,
    pub total_bytes_written: u64,
    pub migrations_completed: u64,
    pub migrations_failed: u64,
}
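
// Example of deriving a metric from a snapshot -- a consumer-side sketch,
// not an API provided by this module:
//
//     let s = storage.stats();
//     let tiered_reads = s.hot_reads + s.warm_reads + s.cold_reads;
//     let hot_hit_ratio = if tiered_reads > 0 {
//         s.hot_reads as f64 / tiered_reads as f64
//     } else {
//         0.0
//     };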

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_hot_tier_insert_and_get() {
        let hot = HotTier::new(1024 * 1024); // 1 MB

        let data = Bytes::from("test data");
        hot.insert("topic1", 0, 0, data.clone()).await;

        let retrieved = hot.get("topic1", 0, 0).await;
        assert_eq!(retrieved, Some(data));
    }

    #[tokio::test]
    async fn test_hot_tier_lru_eviction() {
        let hot = HotTier::new(100); // Tiny capacity (100 bytes) to force eviction

        // Insert more data than fits
        hot.insert("topic1", 0, 0, Bytes::from(vec![0u8; 40])).await;
        hot.insert("topic1", 0, 1, Bytes::from(vec![1u8; 40])).await;
        hot.insert("topic1", 0, 2, Bytes::from(vec![2u8; 40])).await;

        // The oldest (least recently used) segment should be evicted
        assert!(hot.get("topic1", 0, 0).await.is_none());
        // Later segments should still be there
        assert!(hot.get("topic1", 0, 1).await.is_some());
        assert!(hot.get("topic1", 0, 2).await.is_some());
    }
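
    // A recency sketch, assuming `get` refreshes a segment's LRU position
    // (the hot tier is documented as an LRU cache). If eviction ignored
    // recency, offset 0 would be evicted here instead of offset 1.
    #[tokio::test]
    async fn test_hot_tier_lru_touch_on_get() {
        let hot = HotTier::new(100);

        hot.insert("topic1", 0, 0, Bytes::from(vec![0u8; 40])).await;
        hot.insert("topic1", 0, 1, Bytes::from(vec![1u8; 40])).await;

        // Touch offset 0 so offset 1 becomes the least recently used entry
        assert!(hot.get("topic1", 0, 0).await.is_some());

        hot.insert("topic1", 0, 2, Bytes::from(vec![2u8; 40])).await;

        assert!(hot.get("topic1", 0, 0).await.is_some());
        assert!(hot.get("topic1", 0, 1).await.is_none());
    }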

    #[tokio::test]
    async fn test_warm_tier_store_and_read() {
        let temp_dir = TempDir::new().unwrap();
        let warm = WarmTier::new(temp_dir.path().to_path_buf(), 1024 * 1024 * 1024).unwrap();

        let data = b"warm tier test data";
        warm.store("topic1", 0, 0, 100, data).await.unwrap();

        let retrieved = warm.read("topic1", 0, 0).await.unwrap();
        assert_eq!(retrieved, Some(Bytes::from(&data[..])));
    }

    #[tokio::test]
    async fn test_local_fs_cold_storage() {
        let temp_dir = TempDir::new().unwrap();
        let cold = LocalFsColdStorage::new(temp_dir.path().to_path_buf()).unwrap();

        let key = "topic1/0/00000000000000000000";
        let data = b"cold storage test data";

        cold.upload(key, data).await.unwrap();
        assert!(cold.exists(key).await.unwrap());

        let retrieved = cold.download(key).await.unwrap();
        assert_eq!(retrieved, Some(Bytes::from(&data[..])));

        cold.delete(key).await.unwrap();
        assert!(!cold.exists(key).await.unwrap());
    }

    #[tokio::test]
    async fn test_tiered_storage_write_and_read() {
        let temp_dir = TempDir::new().unwrap();

        let config = TieredStorageConfig {
            hot_tier_max_bytes: 1024 * 1024,
            warm_tier_path: temp_dir.path().join("warm"),
            cold_storage: ColdStorageConfig::LocalFs {
                path: temp_dir.path().join("cold"),
            },
            migration_interval: Duration::from_secs(3600), // Long enough that auto-migration stays idle
            ..Default::default()
        };

        let storage = TieredStorage::new(config).await.unwrap();

        // Write data
        let data = Bytes::from("test message data");
        storage
            .write("topic1", 0, 0, 10, data.clone())
            .await
            .unwrap();

        // Read data back
        let results = storage.read("topic1", 0, 0, 1024).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, data);

        // Check stats
        let stats = storage.stats();
        assert_eq!(stats.hot_writes, 1);
        assert_eq!(stats.hot_reads, 1);

        storage.shutdown().await;
    }

    #[test]
    fn test_storage_tier_demote_promote() {
        assert_eq!(StorageTier::Hot.demote(), Some(StorageTier::Warm));
        assert_eq!(StorageTier::Warm.demote(), Some(StorageTier::Cold));
        assert_eq!(StorageTier::Cold.demote(), None);

        assert_eq!(StorageTier::Hot.promote(), None);
        assert_eq!(StorageTier::Warm.promote(), Some(StorageTier::Hot));
        assert_eq!(StorageTier::Cold.promote(), Some(StorageTier::Warm));
    }
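
    // Tier names feed metric and log labels; a quick sanity check of
    // `StorageTier::name`.
    #[test]
    fn test_storage_tier_names() {
        assert_eq!(StorageTier::Hot.name(), "hot");
        assert_eq!(StorageTier::Warm.name(), "warm");
        assert_eq!(StorageTier::Cold.name(), "cold");
    }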

    #[test]
    fn test_segment_metadata() {
        let meta = SegmentMetadata::new("topic1".to_string(), 0, 0, 100, 1024, StorageTier::Hot);

        assert_eq!(meta.segment_key(), "topic1/0/00000000000000000000");
        assert!(meta.age_secs() <= 1);

        meta.record_access();
        assert_eq!(meta.access_count.load(Ordering::Relaxed), 1);

        // 50 dead records out of 100 total => compaction ratio of ~0.5
        meta.dead_records.store(50, Ordering::Relaxed);
        assert!((meta.compaction_ratio() - 0.5).abs() < 0.01);
    }
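
    // The 20-digit zero padding (wide enough for any u64) keeps lexicographic
    // key order in object-store listings aligned with numeric offset order.
    // A sketch assuming the padding format asserted above:
    #[test]
    fn test_segment_key_zero_padding() {
        let meta = SegmentMetadata::new("t".to_string(), 3, 42, 100, 0, StorageTier::Cold);
        assert_eq!(meta.segment_key(), "t/3/00000000000000000042");
    }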

    #[tokio::test]
    async fn test_segment_compaction() {
        use crate::Message;

        let temp_dir = TempDir::new().unwrap();

        let config = TieredStorageConfig {
            hot_tier_max_bytes: 10 * 1024 * 1024, // 10 MB
            warm_tier_path: temp_dir.path().join("warm"),
            cold_storage: ColdStorageConfig::LocalFs {
                path: temp_dir.path().join("cold"),
            },
            migration_interval: Duration::from_secs(3600),
            compaction_threshold: 0.1, // Low threshold for testing
            ..Default::default()
        };

        let storage = TieredStorage::new(config).await.unwrap();

        // Create messages with duplicate keys
        let mut segment_data = Vec::new();

        // Message 1: key=A, value=v1 (superseded by message 3)
        let msg1 = Message::with_key(Bytes::from("A"), Bytes::from("value1"));
        let msg1_bytes = msg1.to_bytes().unwrap();
        segment_data.extend_from_slice(&(msg1_bytes.len() as u32).to_le_bytes());
        segment_data.extend_from_slice(&msg1_bytes);

        // Message 2: key=B, value=v1
        let msg2 = Message::with_key(Bytes::from("B"), Bytes::from("value1"));
        let msg2_bytes = msg2.to_bytes().unwrap();
        segment_data.extend_from_slice(&(msg2_bytes.len() as u32).to_le_bytes());
        segment_data.extend_from_slice(&msg2_bytes);

        // Message 3: key=A, value=v2 (update)
        let msg3 = Message::with_key(Bytes::from("A"), Bytes::from("value2"));
        let msg3_bytes = msg3.to_bytes().unwrap();
        segment_data.extend_from_slice(&(msg3_bytes.len() as u32).to_le_bytes());
        segment_data.extend_from_slice(&msg3_bytes);

        // Message 4: key=B, value="" (tombstone/delete)
        let msg4 = Message::with_key(Bytes::from("B"), Bytes::from(""));
        let msg4_bytes = msg4.to_bytes().unwrap();
        segment_data.extend_from_slice(&(msg4_bytes.len() as u32).to_le_bytes());
        segment_data.extend_from_slice(&msg4_bytes);

        // Write segment
        let segment_bytes = Bytes::from(segment_data);
        storage
            .write("compaction-test", 0, 0, 4, segment_bytes)
            .await
            .unwrap();

        // Get metadata and trigger compaction
        let meta = storage
            .get_segment_metadata("compaction-test", 0, 0)
            .await
            .unwrap();

        // Mark two of the four records dead so the segment crosses the
        // compaction threshold
        meta.dead_records.store(2, Ordering::Relaxed);

        // Run compaction
        storage
            .compact_segment("compaction-test", 0, 0)
            .await
            .unwrap();

        // Verify the compacted segment has fewer messages
        let meta_after = storage
            .get_segment_metadata("compaction-test", 0, 0)
            .await
            .unwrap();
        assert!(
            meta_after.total_records < 4,
            "Compaction should reduce message count"
        );

        storage.shutdown().await;
    }

    #[tokio::test]
    async fn test_compaction_preserves_keyless_messages() {
        use crate::Message;

        let temp_dir = TempDir::new().unwrap();

        let config = TieredStorageConfig {
            hot_tier_max_bytes: 10 * 1024 * 1024,
            warm_tier_path: temp_dir.path().join("warm"),
            cold_storage: ColdStorageConfig::LocalFs {
                path: temp_dir.path().join("cold"),
            },
            migration_interval: Duration::from_secs(3600),
            ..Default::default()
        };

        let storage = TieredStorage::new(config).await.unwrap();

        // Create keyless messages (should all be preserved)
        let mut segment_data = Vec::new();

        for i in 0..5 {
            let mut msg = Message::new(Bytes::from(format!("value{}", i)));
            msg.offset = i;
            let msg_bytes = msg.to_bytes().unwrap();
            segment_data.extend_from_slice(&(msg_bytes.len() as u32).to_le_bytes());
            segment_data.extend_from_slice(&msg_bytes);
        }

        let segment_bytes = Bytes::from(segment_data);
        storage
            .write("keyless-test", 0, 0, 5, segment_bytes)
            .await
            .unwrap();

        // Run compaction
        storage.compact_segment("keyless-test", 0, 0).await.unwrap();

        // All keyless messages should be preserved (no dedup for keyless)
        let meta_after = storage
            .get_segment_metadata("keyless-test", 0, 0)
            .await
            .unwrap();
        assert_eq!(
            meta_after.total_records, 5,
            "Keyless messages should all be preserved"
        );

        storage.shutdown().await;
    }

    #[test]
    fn test_local_fs_cold_storage_path_traversal_protection() {
        let temp_dir = TempDir::new().unwrap();
        let storage = LocalFsColdStorage::new(temp_dir.path().to_path_buf()).unwrap();

        // Valid keys should work
        assert!(storage.key_to_path("valid/key/path").is_ok());
        assert!(storage.key_to_path("simple-key").is_ok());
        assert!(storage.key_to_path("key_with_underscores").is_ok());

        // Path traversal attempts should fail
        assert!(storage.key_to_path("../escape").is_err());
        assert!(storage.key_to_path("valid/../escape").is_err());
        assert!(storage.key_to_path("..").is_err());
        assert!(storage.key_to_path("foo/../../bar").is_err());

        // Absolute and rooted Windows-style paths should fail
        assert!(storage.key_to_path("/etc/passwd").is_err());
        assert!(storage.key_to_path("\\Windows\\System32").is_err());

        // Null bytes should fail (they could bypass checks on some systems)
        assert!(storage.key_to_path("valid\0.txt").is_err());
    }

    #[tokio::test]
    async fn test_local_fs_cold_storage_operations_with_safe_keys() {
        let temp_dir = TempDir::new().unwrap();
        let storage = LocalFsColdStorage::new(temp_dir.path().to_path_buf()).unwrap();

        // Upload should work with safe keys
        let data = b"test data";
        storage.upload("test/key", data).await.unwrap();

        // Download should work
        let downloaded = storage.download("test/key").await.unwrap();
        assert_eq!(downloaded, Some(Bytes::from_static(data)));

        // Exists should work
        assert!(storage.exists("test/key").await.unwrap());
        assert!(!storage.exists("nonexistent").await.unwrap());

        // Delete should work
        storage.delete("test/key").await.unwrap();
        assert!(!storage.exists("test/key").await.unwrap());
    }

    #[tokio::test]
    async fn test_local_fs_cold_storage_rejects_malicious_upload() {
        let temp_dir = TempDir::new().unwrap();
        let storage = LocalFsColdStorage::new(temp_dir.path().to_path_buf()).unwrap();

        // Attempting to upload with path traversal should fail
        let result = storage.upload("../malicious", b"pwned").await;
        assert!(result.is_err());

        // The file should NOT exist outside the storage directory
        let escaped_path = temp_dir.path().parent().unwrap().join("malicious");
        assert!(!escaped_path.exists());
    }
}
1937}