// rivven_core/storage/tiered.rs
1//! Tiered Storage System for Rivven
2//!
3//! Implements a hot/warm/cold tiered storage architecture:
4//! - **Hot Tier**: In-memory buffer + NVMe/SSD for recent data (sub-ms access)
5//! - **Warm Tier**: Local disk storage for medium-aged data (ms access)
6//! - **Cold Tier**: Object storage (S3/MinIO/Azure Blob) for archival (100ms+ access)
7//!
8//! Features:
9//! - Automatic tier promotion/demotion based on access patterns
10//! - LRU cache for hot tier with size limits
11//! - Asynchronous background compaction and migration
12//! - Zero-copy reads from memory-mapped warm tier
13//! - Pluggable cold storage backends
14
15use bytes::Bytes;
16use std::collections::{BTreeMap, HashMap, VecDeque};
17use std::path::PathBuf;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::Arc;
20use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
21use tokio::sync::{mpsc, Mutex, RwLock, Semaphore};
22use tokio::time::interval;
23
24use crate::{Error, Result};
25
/// Storage tier classification
///
/// Variants are declared hot → warm → cold; the derived `PartialOrd`/`Ord`
/// depend on this declaration order, so do not reorder variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum StorageTier {
    /// Hot tier: In-memory + fast SSD, < 1ms access
    Hot,
    /// Warm tier: Local disk, mmap'd, 1-10ms access
    Warm,
    /// Cold tier: Object storage, 100ms+ access
    Cold,
}
36
37impl StorageTier {
38    /// Get tier name for metrics/logging
39    pub fn name(&self) -> &'static str {
40        match self {
41            StorageTier::Hot => "hot",
42            StorageTier::Warm => "warm",
43            StorageTier::Cold => "cold",
44        }
45    }
46
47    /// Get next cooler tier (for demotion)
48    pub fn demote(&self) -> Option<StorageTier> {
49        match self {
50            StorageTier::Hot => Some(StorageTier::Warm),
51            StorageTier::Warm => Some(StorageTier::Cold),
52            StorageTier::Cold => None,
53        }
54    }
55
56    /// Get next hotter tier (for promotion)
57    pub fn promote(&self) -> Option<StorageTier> {
58        match self {
59            StorageTier::Hot => None,
60            StorageTier::Warm => Some(StorageTier::Hot),
61            StorageTier::Cold => Some(StorageTier::Warm),
62        }
63    }
64}
65
/// Configuration for tiered storage
///
/// Every `#[serde(default = ...)]` hook uses the same helper as
/// `Default::default()`, so a partially specified config file deserializes
/// to the same values as the programmatic default.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TieredStorageConfig {
    /// Enable tiered storage (default: false)
    #[serde(default)]
    pub enabled: bool,
    /// Maximum size of hot tier in bytes (default: 1 GiB)
    #[serde(default = "default_hot_tier_max_bytes")]
    pub hot_tier_max_bytes: u64,
    /// Maximum age of data in hot tier before demotion (seconds, default: 3600 = 1 hour)
    #[serde(default = "default_hot_tier_max_age_secs")]
    pub hot_tier_max_age_secs: u64,
    /// Maximum size of warm tier in bytes (default: 100 GiB)
    #[serde(default = "default_warm_tier_max_bytes")]
    pub warm_tier_max_bytes: u64,
    /// Maximum age of data in warm tier before demotion (seconds, default: 7 days)
    #[serde(default = "default_warm_tier_max_age_secs")]
    pub warm_tier_max_age_secs: u64,
    /// Path for warm tier storage (default: "/var/lib/rivven/warm")
    #[serde(default = "default_warm_tier_path")]
    pub warm_tier_path: String,
    /// Cold storage backend configuration (default: local filesystem)
    #[serde(default)]
    pub cold_storage: ColdStorageConfig,
    /// How often to run tier migration (seconds, default: 60)
    #[serde(default = "default_migration_interval_secs")]
    pub migration_interval_secs: u64,
    /// Number of concurrent migration operations (default: 4)
    #[serde(default = "default_migration_concurrency")]
    pub migration_concurrency: usize,
    /// Enable access-based promotion (promote frequently accessed cold data; default: true)
    #[serde(default = "default_enable_promotion")]
    pub enable_promotion: bool,
    /// Access count threshold for promotion (default: 100)
    #[serde(default = "default_promotion_threshold")]
    pub promotion_threshold: u64,
    /// Compaction threshold (ratio of dead bytes to total, default: 0.5)
    #[serde(default = "default_compaction_threshold")]
    pub compaction_threshold: f64,
}
106
// Serde default helpers for `TieredStorageConfig`. Each returns the same
// value used by `Default::default()`.

/// 1 GiB hot-tier capacity.
fn default_hot_tier_max_bytes() -> u64 {
    1 << 30
}

/// 1 hour maximum hot-tier age.
fn default_hot_tier_max_age_secs() -> u64 {
    60 * 60
}

/// 100 GiB warm-tier capacity.
fn default_warm_tier_max_bytes() -> u64 {
    100 * (1 << 30)
}

/// 7 days maximum warm-tier age.
fn default_warm_tier_max_age_secs() -> u64 {
    7 * 24 * 60 * 60
}

/// Default on-disk location for warm-tier segment files.
fn default_warm_tier_path() -> String {
    String::from("/var/lib/rivven/warm")
}

/// Run a migration pass every minute.
fn default_migration_interval_secs() -> u64 {
    60
}

/// Up to four concurrent tier-migration operations.
fn default_migration_concurrency() -> usize {
    4
}

/// Access-based promotion is on by default.
fn default_enable_promotion() -> bool {
    true
}

/// Promote after 100 accesses.
fn default_promotion_threshold() -> u64 {
    100
}

/// Compact once half the records are dead.
fn default_compaction_threshold() -> f64 {
    0.5
}
137
138impl Default for TieredStorageConfig {
139    fn default() -> Self {
140        Self {
141            enabled: false,
142            hot_tier_max_bytes: default_hot_tier_max_bytes(),
143            hot_tier_max_age_secs: default_hot_tier_max_age_secs(),
144            warm_tier_max_bytes: default_warm_tier_max_bytes(),
145            warm_tier_max_age_secs: default_warm_tier_max_age_secs(),
146            warm_tier_path: default_warm_tier_path(),
147            cold_storage: ColdStorageConfig::default(),
148            migration_interval_secs: default_migration_interval_secs(),
149            migration_concurrency: default_migration_concurrency(),
150            enable_promotion: default_enable_promotion(),
151            promotion_threshold: default_promotion_threshold(),
152            compaction_threshold: default_compaction_threshold(),
153        }
154    }
155}
156
157impl TieredStorageConfig {
158    /// Get hot tier max age as Duration
159    pub fn hot_tier_max_age(&self) -> Duration {
160        Duration::from_secs(self.hot_tier_max_age_secs)
161    }
162
163    /// Get warm tier max age as Duration
164    pub fn warm_tier_max_age(&self) -> Duration {
165        Duration::from_secs(self.warm_tier_max_age_secs)
166    }
167
168    /// Get warm tier path as PathBuf
169    pub fn warm_tier_path_buf(&self) -> PathBuf {
170        PathBuf::from(&self.warm_tier_path)
171    }
172
173    /// Get migration interval as Duration
174    pub fn migration_interval(&self) -> Duration {
175        Duration::from_secs(self.migration_interval_secs)
176    }
177
178    /// High-performance config for low-latency workloads
179    pub fn high_performance() -> Self {
180        Self {
181            enabled: true,
182            hot_tier_max_bytes: 8 * 1024 * 1024 * 1024, // 8 GB
183            hot_tier_max_age_secs: 7200,                // 2 hours
184            warm_tier_max_bytes: 500 * 1024 * 1024 * 1024, // 500 GB
185            migration_interval_secs: 30,
186            ..Default::default()
187        }
188    }
189
190    /// Cost-optimized config for archival workloads
191    pub fn cost_optimized() -> Self {
192        Self {
193            enabled: true,
194            hot_tier_max_bytes: 256 * 1024 * 1024, // 256 MB
195            hot_tier_max_age_secs: 300,            // 5 minutes
196            warm_tier_max_bytes: 10 * 1024 * 1024 * 1024, // 10 GB
197            warm_tier_max_age_secs: 86400,         // 1 day
198            migration_interval_secs: 120,
199            enable_promotion: false,
200            ..Default::default()
201        }
202    }
203
204    /// Testing config for integration tests (fast migration, small tiers)
205    pub fn testing() -> Self {
206        Self {
207            enabled: true,
208            hot_tier_max_bytes: 1024 * 1024,       // 1 MB
209            hot_tier_max_age_secs: 5,              // 5 seconds
210            warm_tier_max_bytes: 10 * 1024 * 1024, // 10 MB
211            warm_tier_max_age_secs: 10,            // 10 seconds
212            migration_interval_secs: 1,
213            migration_concurrency: 2,
214            enable_promotion: true,
215            promotion_threshold: 3,
216            compaction_threshold: 0.3,
217            ..Default::default()
218        }
219    }
220}
221
/// Cold storage backend configuration
///
/// Serialized with an internal `type` tag in snake_case, e.g.
/// `{ "type": "local_fs", "path": "..." }` or `{ "type": "s3", ... }`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ColdStorageConfig {
    /// Local filesystem (for development/testing)
    LocalFs {
        /// Root directory for cold segment files (default: "/var/lib/rivven/cold")
        #[serde(default = "default_cold_storage_path")]
        path: String,
    },
    /// S3-compatible object storage (AWS S3, MinIO, Cloudflare R2, etc.)
    S3 {
        /// S3 endpoint URL (e.g., `https://s3.us-east-1.amazonaws.com` or `http://localhost:9000` for MinIO)
        endpoint: Option<String>,
        /// S3 bucket name
        bucket: String,
        /// AWS region (e.g., "us-east-1")
        region: String,
        /// AWS access key ID (optional, uses default credential chain if not provided)
        access_key: Option<String>,
        /// AWS secret access key
        secret_key: Option<String>,
        /// Use path-style URLs (required for MinIO and some S3-compatible services)
        #[serde(default)]
        use_path_style: bool,
    },
    /// Google Cloud Storage
    Gcs {
        /// GCS bucket name
        bucket: String,
        /// Path to service account key JSON file (optional, uses default credentials if not provided)
        service_account_path: Option<String>,
    },
    /// Azure Blob Storage
    AzureBlob {
        /// Azure storage account name
        account: String,
        /// Azure container name
        container: String,
        /// Azure storage access key (optional, uses DefaultAzureCredential if not provided)
        access_key: Option<String>,
    },
    /// Disabled (warm tier is final)
    Disabled,
}
266
/// Default root directory for the local-filesystem cold storage backend.
fn default_cold_storage_path() -> String {
    String::from("/var/lib/rivven/cold")
}
270
271impl Default for ColdStorageConfig {
272    fn default() -> Self {
273        ColdStorageConfig::LocalFs {
274            path: default_cold_storage_path(),
275        }
276    }
277}
278
279impl ColdStorageConfig {
280    /// Get path as PathBuf for LocalFs variant
281    pub fn local_fs_path(&self) -> Option<PathBuf> {
282        match self {
283            ColdStorageConfig::LocalFs { path } => Some(PathBuf::from(path)),
284            _ => None,
285        }
286    }
287}
288
/// Metadata about a stored segment
///
/// Timestamps are seconds since the Unix epoch. The atomic fields are
/// updated with relaxed ordering — they are approximate access statistics,
/// not synchronization primitives.
#[derive(Debug)]
pub struct SegmentMetadata {
    /// Topic name
    pub topic: String,
    /// Partition ID
    pub partition: u32,
    /// Base offset of segment
    pub base_offset: u64,
    /// End offset (exclusive)
    pub end_offset: u64,
    /// Size in bytes
    pub size_bytes: u64,
    /// Current storage tier
    pub tier: StorageTier,
    /// Creation timestamp (unix seconds)
    pub created_at: u64,
    /// Last accessed timestamp (unix seconds, updated on every read)
    pub last_accessed: AtomicU64,
    /// Access count for promotion decisions
    pub access_count: AtomicU64,
    /// Number of deleted/compacted records
    pub dead_records: AtomicU64,
    /// Total records (derived from the offset range at construction)
    pub total_records: u64,
}
315
316impl SegmentMetadata {
317    pub fn new(
318        topic: String,
319        partition: u32,
320        base_offset: u64,
321        end_offset: u64,
322        size_bytes: u64,
323        tier: StorageTier,
324    ) -> Self {
325        let now = SystemTime::now()
326            .duration_since(UNIX_EPOCH)
327            .unwrap_or_default()
328            .as_secs();
329
330        Self {
331            topic,
332            partition,
333            base_offset,
334            end_offset,
335            size_bytes,
336            tier,
337            created_at: now,
338            last_accessed: AtomicU64::new(now),
339            access_count: AtomicU64::new(0),
340            dead_records: AtomicU64::new(0),
341            total_records: (end_offset - base_offset),
342        }
343    }
344
345    /// Record an access and update statistics
346    pub fn record_access(&self) {
347        let now = SystemTime::now()
348            .duration_since(UNIX_EPOCH)
349            .unwrap_or_default()
350            .as_secs();
351        self.last_accessed.store(now, Ordering::Relaxed);
352        self.access_count.fetch_add(1, Ordering::Relaxed);
353    }
354
355    /// Get age in seconds
356    pub fn age_secs(&self) -> u64 {
357        let now = SystemTime::now()
358            .duration_since(UNIX_EPOCH)
359            .unwrap_or_default()
360            .as_secs();
361        now.saturating_sub(self.created_at)
362    }
363
364    /// Get seconds since last access
365    pub fn idle_secs(&self) -> u64 {
366        let now = SystemTime::now()
367            .duration_since(UNIX_EPOCH)
368            .unwrap_or_default()
369            .as_secs();
370        now.saturating_sub(self.last_accessed.load(Ordering::Relaxed))
371    }
372
373    /// Calculate compaction ratio (dead/total)
374    pub fn compaction_ratio(&self) -> f64 {
375        let dead = self.dead_records.load(Ordering::Relaxed);
376        if self.total_records == 0 {
377            0.0
378        } else {
379            dead as f64 / self.total_records as f64
380        }
381    }
382
383    /// Build segment key for storage
384    pub fn segment_key(&self) -> String {
385        format!("{}/{}/{:020}", self.topic, self.partition, self.base_offset)
386    }
387}
388
/// Hot tier: In-memory LRU cache
///
/// Capacity accounting (`current_size`) is kept in an atomic so inserts can
/// reserve space without holding either lock; the map and the LRU queue are
/// guarded separately.
#[derive(Debug)]
pub struct HotTier {
    /// Segment data keyed by (topic, partition, base_offset)
    segments: RwLock<HashMap<(String, u32, u64), Bytes>>,
    /// LRU order tracking (front = least recently used, back = most recent)
    lru_order: Mutex<VecDeque<(String, u32, u64)>>,
    /// Current size in bytes (sum of stored payload lengths)
    current_size: AtomicU64,
    /// Maximum size in bytes
    max_size: u64,
}
401
impl HotTier {
    /// Create an empty hot tier with a capacity of `max_size` bytes.
    pub fn new(max_size: u64) -> Self {
        Self {
            segments: RwLock::new(HashMap::new()),
            lru_order: Mutex::new(VecDeque::new()),
            current_size: AtomicU64::new(0),
            max_size,
        }
    }

    /// Insert data into hot tier
    ///
    /// Returns `true` on success. Returns `false` when the payload alone
    /// exceeds the tier capacity, or when evicting LRU entries cannot free
    /// enough space. Replacing an existing key releases the old payload's
    /// reserved bytes.
    pub async fn insert(&self, topic: &str, partition: u32, base_offset: u64, data: Bytes) -> bool {
        let size = data.len() as u64;

        // Check if it fits at all
        if size > self.max_size {
            return false;
        }

        // Atomically reserve capacity using CAS loop to eliminate TOCTOU race
        loop {
            let current = self.current_size.load(Ordering::Acquire);
            if current + size > self.max_size {
                // Try to evict to make space
                if !self.evict_one().await {
                    return false; // Cannot make space
                }
                continue; // Re-check after eviction
            }
            // Attempt to reserve space atomically
            if self
                .current_size
                .compare_exchange_weak(current, current + size, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                break; // Space successfully reserved
            }
            // CAS failed (another concurrent insert modified current_size) — retry
        }

        let key = (topic.to_string(), partition, base_offset);

        // Insert data — space already reserved via CAS
        {
            let mut segments = self.segments.write().await;
            if let Some(old) = segments.insert(key.clone(), data) {
                // Replaced an existing entry: release its space (reservation covers new entry)
                self.current_size
                    .fetch_sub(old.len() as u64, Ordering::Relaxed);
            }
        }

        // Update LRU
        {
            let mut lru = self.lru_order.lock().await;
            lru.retain(|k| k != &key);
            lru.push_back(key);
        }

        true
    }

    /// Get data from hot tier
    ///
    /// A hit moves the key to the back of the LRU queue (most recent).
    /// `Bytes::clone` here is a cheap refcount bump, not a payload copy.
    pub async fn get(&self, topic: &str, partition: u32, base_offset: u64) -> Option<Bytes> {
        let key = (topic.to_string(), partition, base_offset);

        let data = {
            let segments = self.segments.read().await;
            segments.get(&key).cloned()
        };

        if data.is_some() {
            // Update LRU on access
            let mut lru = self.lru_order.lock().await;
            lru.retain(|k| k != &key);
            lru.push_back(key);
        }

        data
    }

    /// Remove data from hot tier
    ///
    /// Returns the removed payload (if any) so callers can demote it to a
    /// cooler tier. Releases the entry's reserved bytes and drops its LRU slot.
    pub async fn remove(&self, topic: &str, partition: u32, base_offset: u64) -> Option<Bytes> {
        let key = (topic.to_string(), partition, base_offset);

        let removed = {
            let mut segments = self.segments.write().await;
            segments.remove(&key)
        };

        if let Some(ref data) = removed {
            self.current_size
                .fetch_sub(data.len() as u64, Ordering::Relaxed);
            let mut lru = self.lru_order.lock().await;
            lru.retain(|k| k != &key);
        }

        removed
    }

    /// Evict least recently used segment
    ///
    /// Returns `true` only if a segment was actually removed and its bytes
    /// released. Returns `false` when the LRU queue is empty, or when the
    /// popped key was concurrently removed from the map (stale LRU entry).
    async fn evict_one(&self) -> bool {
        let to_evict = {
            let mut lru = self.lru_order.lock().await;
            lru.pop_front()
        };

        if let Some(key) = to_evict {
            let removed = {
                let mut segments = self.segments.write().await;
                segments.remove(&key)
            };

            if let Some(data) = removed {
                self.current_size
                    .fetch_sub(data.len() as u64, Ordering::Relaxed);
                return true;
            }
        }

        false
    }

    /// Get current usage statistics
    ///
    /// Non-blocking snapshot; `current_size` may be momentarily stale under
    /// concurrent inserts/evictions.
    pub fn stats(&self) -> HotTierStats {
        HotTierStats {
            current_size: self.current_size.load(Ordering::Relaxed),
            max_size: self.max_size,
        }
    }
}
533
/// Point-in-time usage snapshot of the hot tier.
#[derive(Debug, Clone)]
pub struct HotTierStats {
    /// Bytes currently resident in the hot tier.
    pub current_size: u64,
    /// Configured hot-tier capacity in bytes.
    pub max_size: u64,
}
539
/// Warm tier: Memory-mapped local disk storage
///
/// Segment payloads live in files under `base_path`; `segments` is an
/// in-memory metadata index over them. `current_size` tracks the sum of
/// indexed segment sizes (not the actual on-disk usage).
#[derive(Debug)]
pub struct WarmTier {
    /// Base path for warm tier storage
    base_path: PathBuf,
    /// Segment metadata index keyed by (topic, partition, base_offset)
    segments: RwLock<BTreeMap<(String, u32, u64), Arc<SegmentMetadata>>>,
    /// Current total size of indexed segments, in bytes
    current_size: AtomicU64,
    /// Maximum size in bytes (soft limit — see `store`)
    max_size: u64,
}
552
impl WarmTier {
    /// Create a warm tier rooted at `base_path` (created if missing) with a
    /// soft capacity of `max_size` bytes.
    ///
    /// Uses blocking `std::fs` — intended to be called during startup, not
    /// from a hot async path.
    pub fn new(base_path: PathBuf, max_size: u64) -> Result<Self> {
        std::fs::create_dir_all(&base_path)?;

        Ok(Self {
            base_path,
            segments: RwLock::new(BTreeMap::new()),
            current_size: AtomicU64::new(0),
            max_size,
        })
    }

    /// Get segment file path: `{base}/{topic}/{partition}/{offset:020}.segment`
    /// (zero-padded so directory listings sort by offset).
    fn segment_path(&self, topic: &str, partition: u32, base_offset: u64) -> PathBuf {
        self.base_path
            .join(topic)
            .join(format!("{}", partition))
            .join(format!("{:020}.segment", base_offset))
    }

    /// Store segment data
    ///
    /// Evicts oldest segments until the new payload fits, then writes the
    /// file and registers metadata. Errors if eviction cannot free enough
    /// space.
    ///
    /// NOTE(review): the size check + eviction loop is not atomic with
    /// respect to concurrent `store` calls, so two writers can both pass the
    /// check — `max_size` is a soft limit that can be transiently exceeded.
    pub async fn store(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
        end_offset: u64,
        data: &[u8],
    ) -> Result<()> {
        let size = data.len() as u64;

        // Enforce max_size: evict oldest segments until we have space
        while self.current_size.load(Ordering::Relaxed) + size > self.max_size {
            if !self.evict_oldest().await {
                // Cannot evict anything more — reject the write
                return Err(crate::Error::Other(format!(
                    "warm tier full ({} / {} bytes), cannot store segment",
                    self.current_size.load(Ordering::Relaxed),
                    self.max_size
                )));
            }
        }

        let path = self.segment_path(topic, partition, base_offset);

        // Ensure directory exists
        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }

        // Write segment file
        tokio::fs::write(&path, data).await?;

        // Update metadata
        let metadata = Arc::new(SegmentMetadata::new(
            topic.to_string(),
            partition,
            base_offset,
            end_offset,
            size,
            StorageTier::Warm,
        ));

        {
            let mut segments = self.segments.write().await;
            segments.insert((topic.to_string(), partition, base_offset), metadata);
        }

        self.current_size.fetch_add(size, Ordering::Relaxed);

        Ok(())
    }

    /// Read segment data using mmap for zero-copy
    ///
    /// Returns `Ok(None)` if the segment file does not exist. Records an
    /// access on the segment's metadata (used for promotion decisions).
    ///
    /// NOTE(review): `File::open` and the mmap are blocking calls executed
    /// on the async executor thread; consider `spawn_blocking` if segment
    /// files are large — confirm acceptable for expected sizes.
    pub async fn read(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
    ) -> Result<Option<Bytes>> {
        let path = self.segment_path(topic, partition, base_offset);

        if !path.exists() {
            return Ok(None);
        }

        // Memory map the file for efficient reading
        let file = std::fs::File::open(&path)?;
        // SAFETY: File is opened read-only and remains valid for mmap lifetime.
        // The mmap is only used for reading and copied to Bytes before return.
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        // Update access stats
        let key = (topic.to_string(), partition, base_offset);
        if let Some(meta) = self.segments.read().await.get(&key) {
            meta.record_access();
        }

        // Convert to Bytes (this copies, but allows the mmap to be dropped)
        // For true zero-copy, we'd need to return an Arc<Mmap>
        Ok(Some(Bytes::copy_from_slice(&mmap)))
    }

    /// Remove segment
    ///
    /// Drops the metadata entry, releases its accounted bytes, and deletes
    /// the file if present. Succeeds even if the segment was never indexed.
    pub async fn remove(&self, topic: &str, partition: u32, base_offset: u64) -> Result<()> {
        let path = self.segment_path(topic, partition, base_offset);
        let key = (topic.to_string(), partition, base_offset);

        let size = {
            let mut segments = self.segments.write().await;
            segments.remove(&key).map(|m| m.size_bytes)
        };

        if let Some(size) = size {
            self.current_size.fetch_sub(size, Ordering::Relaxed);
        }

        if path.exists() {
            tokio::fs::remove_file(path).await?;
        }

        Ok(())
    }

    /// Get segments that should be demoted to cold tier
    ///
    /// Returns the keys of all segments older (by creation time) than
    /// `max_age`, regardless of access recency.
    pub async fn get_demotion_candidates(&self, max_age: Duration) -> Vec<(String, u32, u64)> {
        let max_age_secs = max_age.as_secs();
        let segments = self.segments.read().await;

        segments
            .iter()
            .filter(|(_, meta)| meta.age_secs() > max_age_secs)
            .map(|(key, _)| key.clone())
            .collect()
    }

    /// Get segments metadata
    ///
    /// Returns a shared handle to the metadata, or `None` if the segment is
    /// not indexed in this tier.
    pub async fn get_metadata(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
    ) -> Option<Arc<SegmentMetadata>> {
        let key = (topic.to_string(), partition, base_offset);
        self.segments.read().await.get(&key).cloned()
    }

    /// Non-blocking usage snapshot (may be momentarily stale under
    /// concurrent stores/evictions).
    pub fn stats(&self) -> WarmTierStats {
        WarmTierStats {
            current_size: self.current_size.load(Ordering::Relaxed),
            max_size: self.max_size,
        }
    }

    /// Evict the oldest (by creation time) segment from warm tier to free space.
    /// Returns true if a segment was evicted.
    ///
    /// Note: the evicted data is dropped, not demoted — removal errors are
    /// intentionally ignored (best effort).
    async fn evict_oldest(&self) -> bool {
        let to_evict = {
            let segments = self.segments.read().await;
            segments
                .iter()
                .min_by_key(|(_, meta)| meta.created_at)
                .map(|(key, _)| key.clone())
        };

        if let Some((topic, partition, base_offset)) = to_evict {
            tracing::debug!(
                topic = %topic,
                partition,
                base_offset,
                "Evicting warm tier segment to free space"
            );
            // Ignore removal errors — best effort
            let _ = self.remove(&topic, partition, base_offset).await;
            true
        } else {
            false
        }
    }
}
733
/// Point-in-time usage snapshot of the warm tier.
#[derive(Debug, Clone)]
pub struct WarmTierStats {
    /// Bytes currently accounted to indexed warm-tier segments.
    pub current_size: u64,
    /// Configured warm-tier capacity in bytes (soft limit).
    pub max_size: u64,
}
739
/// Cold storage backend trait
///
/// Abstracts the archival tier so different object stores (local FS, S3,
/// GCS, Azure Blob) can be plugged in behind one async interface. Keys are
/// '/'-separated strings (see `SegmentMetadata::segment_key`).
#[async_trait::async_trait]
pub trait ColdStorageBackend: Send + Sync {
    /// Upload segment to cold storage
    async fn upload(&self, key: &str, data: &[u8]) -> Result<()>;

    /// Download segment from cold storage; `Ok(None)` when the key is absent.
    async fn download(&self, key: &str) -> Result<Option<Bytes>>;

    /// Delete segment from cold storage
    async fn delete(&self, key: &str) -> Result<()>;

    /// List all segment keys with prefix
    async fn list(&self, prefix: &str) -> Result<Vec<String>>;

    /// Check if segment exists
    async fn exists(&self, key: &str) -> Result<bool>;
}
758
/// Local filesystem cold storage (for dev/testing)
///
/// Maps storage keys onto files under a canonicalized base directory; key
/// validation in `key_to_path` keeps all paths inside it.
pub struct LocalFsColdStorage {
    /// Canonicalized root directory for all stored objects.
    base_path: PathBuf,
}
763
764impl LocalFsColdStorage {
765    pub fn new(base_path: PathBuf) -> Result<Self> {
766        std::fs::create_dir_all(&base_path)?;
767        // Canonicalize base path to prevent path traversal
768        let base_path = base_path.canonicalize()?;
769        Ok(Self { base_path })
770    }
771
772    /// Convert a key to a safe filesystem path, preventing path traversal attacks.
773    ///
774    /// # Security
775    /// - Rejects keys containing `..` components
776    /// - Rejects absolute paths
777    /// - Validates resulting path stays within base_path
778    fn key_to_path(&self, key: &str) -> Result<PathBuf> {
779        // Security: Reject keys with path traversal attempts
780        if key.contains("..") || key.starts_with('/') || key.starts_with('\\') {
781            return Err(Error::Other(format!(
782                "Invalid key: path traversal attempt detected: {}",
783                key
784            )));
785        }
786
787        // Also reject any key with null bytes (could bypass checks in some systems)
788        if key.contains('\0') {
789            return Err(Error::Other("Invalid key: null byte not allowed".into()));
790        }
791
792        let path = self
793            .base_path
794            .join(key.replace('/', std::path::MAIN_SEPARATOR_STR));
795
796        // Double-check: ensure the resolved path is under base_path
797        // This catches edge cases like symlinks
798        if let Ok(canonical) = path.canonicalize() {
799            if !canonical.starts_with(&self.base_path) {
800                return Err(Error::Other(format!(
801                    "Invalid key: path escapes base directory: {}",
802                    key
803                )));
804            }
805        }
806        // If canonicalize fails (file doesn't exist yet), the path should still be
807        // safe because we've already rejected .. and absolute paths
808
809        Ok(path)
810    }
811}
812
813#[async_trait::async_trait]
814impl ColdStorageBackend for LocalFsColdStorage {
815    async fn upload(&self, key: &str, data: &[u8]) -> Result<()> {
816        let path = self.key_to_path(key)?;
817        if let Some(parent) = path.parent() {
818            tokio::fs::create_dir_all(parent).await?;
819        }
820        tokio::fs::write(&path, data).await?;
821        Ok(())
822    }
823
824    async fn download(&self, key: &str) -> Result<Option<Bytes>> {
825        let path = self.key_to_path(key)?;
826        if !path.exists() {
827            return Ok(None);
828        }
829        let data = tokio::fs::read(&path).await?;
830        Ok(Some(Bytes::from(data)))
831    }
832
833    async fn delete(&self, key: &str) -> Result<()> {
834        let path = self.key_to_path(key)?;
835        if path.exists() {
836            tokio::fs::remove_file(path).await?;
837        }
838        Ok(())
839    }
840
841    async fn list(&self, prefix: &str) -> Result<Vec<String>> {
842        let base = self.key_to_path(prefix)?;
843        let mut keys = Vec::new();
844
845        if !base.exists() {
846            return Ok(keys);
847        }
848
849        fn walk_dir(
850            dir: &std::path::Path,
851            base: &std::path::Path,
852            keys: &mut Vec<String>,
853        ) -> std::io::Result<()> {
854            if dir.is_dir() {
855                for entry in std::fs::read_dir(dir)? {
856                    let entry = entry?;
857                    let path = entry.path();
858                    if path.is_dir() {
859                        walk_dir(&path, base, keys)?;
860                    } else if let Ok(rel) = path.strip_prefix(base) {
861                        keys.push(
862                            rel.to_string_lossy()
863                                .replace(std::path::MAIN_SEPARATOR, "/"),
864                        );
865                    }
866                }
867            }
868            Ok(())
869        }
870
871        walk_dir(&self.base_path, &self.base_path, &mut keys)?;
872        Ok(keys)
873    }
874
875    async fn exists(&self, key: &str) -> Result<bool> {
876        Ok(self.key_to_path(key)?.exists())
877    }
878}
879
/// Disabled cold storage (warm tier is final)
///
/// Stateless no-op backend: uploads fail, downloads and existence checks
/// always miss (see the `ColdStorageBackend` impl below).
pub struct DisabledColdStorage;
882
883#[async_trait::async_trait]
884impl ColdStorageBackend for DisabledColdStorage {
885    async fn upload(&self, _key: &str, _data: &[u8]) -> Result<()> {
886        Err(Error::Other("Cold storage is disabled".into()))
887    }
888
889    async fn download(&self, _key: &str) -> Result<Option<Bytes>> {
890        Ok(None)
891    }
892
893    async fn delete(&self, _key: &str) -> Result<()> {
894        Ok(())
895    }
896
897    async fn list(&self, _prefix: &str) -> Result<Vec<String>> {
898        Ok(Vec::new())
899    }
900
901    async fn exists(&self, _key: &str) -> Result<bool> {
902        Ok(false)
903    }
904}
905
906// ============================================================================
907// Cloud Storage Backends (S3, GCS, Azure)
908// ============================================================================
909
/// Object Store based cold storage backend
///
/// Provides a unified interface for S3, GCS, Azure Blob Storage, and MinIO
/// using the `object_store` crate.
#[cfg(feature = "cloud-storage")]
pub struct ObjectStoreColdStorage {
    /// Underlying object store implementation, selected by the constructor used.
    store: Arc<dyn object_store::ObjectStore>,
    /// Optional prefix for all keys (e.g., "rivven/segments/")
    prefix: String,
}
920
#[cfg(feature = "cloud-storage")]
impl ObjectStoreColdStorage {
    /// Create a new S3-compatible cold storage backend.
    ///
    /// * `endpoint` — custom endpoint for S3-compatible stores; `None` uses
    ///   the default endpoint for `region`.
    /// * `access_key`/`secret_key` — explicit credentials; when either is
    ///   `None`, the builder's default credential resolution is used.
    /// * `use_path_style` — `true` forces path-style addressing
    ///   (`endpoint/bucket/key`); `false` selects virtual-hosted style
    ///   (`bucket.endpoint/key`).
    #[cfg(feature = "s3")]
    pub fn s3(
        bucket: &str,
        region: &str,
        endpoint: Option<&str>,
        access_key: Option<&str>,
        secret_key: Option<&str>,
        use_path_style: bool,
    ) -> Result<Self> {
        use object_store::aws::AmazonS3Builder;

        let mut builder = AmazonS3Builder::new()
            .with_bucket_name(bucket)
            .with_region(region);

        if let Some(endpoint) = endpoint {
            builder = builder.with_endpoint(endpoint);
        }

        if let (Some(key), Some(secret)) = (access_key, secret_key) {
            builder = builder
                .with_access_key_id(key)
                .with_secret_access_key(secret);
        }

        // Set the addressing style explicitly in both directions.
        // Previously the builder was only touched when `use_path_style` was
        // true, so `use_path_style = false` silently kept the builder's
        // default instead of requesting virtual-hosted style.
        builder = builder.with_virtual_hosted_style_request(!use_path_style);

        let store = builder
            .build()
            .map_err(|e| Error::Other(format!("Failed to create S3 client: {}", e)))?;

        Ok(Self {
            store: Arc::new(store),
            prefix: String::new(),
        })
    }

    /// Create a new MinIO cold storage backend (S3-compatible).
    #[cfg(feature = "s3")]
    pub fn minio(endpoint: &str, bucket: &str, access_key: &str, secret_key: &str) -> Result<Self> {
        Self::s3(
            bucket,
            "us-east-1", // MinIO doesn't care about region
            Some(endpoint),
            Some(access_key),
            Some(secret_key),
            true, // MinIO requires path-style
        )
    }

    /// Create a new Google Cloud Storage backend.
    ///
    /// When `service_account_path` is `None`, the builder's default
    /// credential resolution applies.
    #[cfg(feature = "gcs")]
    pub fn gcs(bucket: &str, service_account_path: Option<&std::path::Path>) -> Result<Self> {
        use object_store::gcp::GoogleCloudStorageBuilder;

        let mut builder = GoogleCloudStorageBuilder::new().with_bucket_name(bucket);

        if let Some(path) = service_account_path {
            builder = builder.with_service_account_path(path.to_string_lossy());
        }

        let store = builder
            .build()
            .map_err(|e| Error::Other(format!("Failed to create GCS client: {}", e)))?;

        Ok(Self {
            store: Arc::new(store),
            prefix: String::new(),
        })
    }

    /// Create a new Azure Blob Storage backend.
    ///
    /// When `access_key` is `None`, the builder's default credential
    /// resolution applies.
    #[cfg(feature = "azure")]
    pub fn azure(account: &str, container: &str, access_key: Option<&str>) -> Result<Self> {
        use object_store::azure::MicrosoftAzureBuilder;

        let mut builder = MicrosoftAzureBuilder::new()
            .with_account(account)
            .with_container_name(container);

        if let Some(key) = access_key {
            builder = builder.with_access_key(key);
        }

        let store = builder
            .build()
            .map_err(|e| Error::Other(format!("Failed to create Azure Blob client: {}", e)))?;

        Ok(Self {
            store: Arc::new(store),
            prefix: String::new(),
        })
    }

    /// Set a prefix applied to all keys; normalized to end with '/'.
    pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
        self.prefix = prefix.into();
        if !self.prefix.is_empty() && !self.prefix.ends_with('/') {
            self.prefix.push('/');
        }
        self
    }

    /// Map a logical key to its prefixed object-store path.
    fn full_path(&self, key: &str) -> object_store::path::Path {
        object_store::path::Path::from(format!("{}{}", self.prefix, key))
    }
}
1033
#[cfg(feature = "cloud-storage")]
#[async_trait::async_trait]
impl ColdStorageBackend for ObjectStoreColdStorage {
    /// Store `data` under the prefixed `key`, overwriting any existing object.
    async fn upload(&self, key: &str, data: &[u8]) -> Result<()> {
        use object_store::ObjectStore;

        let location = self.full_path(key);
        let payload: object_store::PutPayload = data.to_vec().into();

        self.store
            .put(&location, payload)
            .await
            .map_err(|e| Error::Other(format!("Failed to upload to object store: {}", e)))?;

        Ok(())
    }

    /// Fetch the object at `key`; `Ok(None)` when it does not exist.
    async fn download(&self, key: &str) -> Result<Option<Bytes>> {
        use object_store::ObjectStore;

        let location = self.full_path(key);

        // Treat NotFound as a miss; surface every other failure.
        let response = match self.store.get(&location).await {
            Ok(r) => r,
            Err(object_store::Error::NotFound { .. }) => return Ok(None),
            Err(e) => {
                return Err(Error::Other(format!(
                    "Failed to download from object store: {}",
                    e
                )))
            }
        };

        let bytes = response
            .bytes()
            .await
            .map_err(|e| Error::Other(format!("Failed to read object: {}", e)))?;
        Ok(Some(bytes))
    }

    /// Delete the object at `key`; a missing object counts as success.
    async fn delete(&self, key: &str) -> Result<()> {
        use object_store::ObjectStore;

        let location = self.full_path(key);

        match self.store.delete(&location).await {
            Ok(()) | Err(object_store::Error::NotFound { .. }) => Ok(()),
            Err(e) => Err(Error::Other(format!(
                "Failed to delete from object store: {}",
                e
            ))),
        }
    }

    /// List object keys under `prefix`, returned relative to this backend's
    /// configured prefix.
    async fn list(&self, prefix: &str) -> Result<Vec<String>> {
        use futures::StreamExt;
        use object_store::ObjectStore;

        let listing_root = self.full_path(prefix);
        let mut entries = self.store.list(Some(&listing_root));
        let mut keys = Vec::new();

        while let Some(entry) = entries.next().await {
            let meta =
                entry.map_err(|e| Error::Other(format!("Failed to list objects: {}", e)))?;
            let absolute = meta.location.to_string();
            // Report keys relative to our prefix when possible.
            if let Some(relative) = absolute.strip_prefix(&self.prefix) {
                keys.push(relative.to_string());
            } else {
                keys.push(absolute);
            }
        }

        Ok(keys)
    }

    /// Existence probe via a HEAD request (no payload transfer).
    async fn exists(&self, key: &str) -> Result<bool> {
        use object_store::ObjectStore;

        let location = self.full_path(key);

        match self.store.head(&location).await {
            Ok(_) => Ok(true),
            Err(object_store::Error::NotFound { .. }) => Ok(false),
            Err(e) => Err(Error::Other(format!(
                "Failed to check object existence: {}",
                e
            ))),
        }
    }
}
1131
/// Migration task
///
/// Unit of work consumed by the background migration worker. Produced by the
/// read path (promotion requests), `flush_hot_tier`, and the periodic
/// `check_tier_migrations` scan.
#[derive(Debug)]
enum MigrationTask {
    /// Move a segment one tier cooler (hot→warm or warm→cold).
    Demote {
        topic: String,
        partition: u32,
        base_offset: u64,
        /// Tier the segment is believed to live in when the task is queued.
        from_tier: StorageTier,
    },
    /// Copy a frequently-accessed segment to a hotter tier.
    Promote {
        topic: String,
        partition: u32,
        base_offset: u64,
        /// Destination tier for the promoted segment.
        to_tier: StorageTier,
    },
    /// Rewrite a segment to drop dead records (log compaction).
    Compact {
        topic: String,
        partition: u32,
        base_offset: u64,
    },
}
1153
/// Main tiered storage manager
///
/// Owns the three tiers plus the background tasks that migrate segments
/// between them. Constructed via `TieredStorage::new`, which also spawns the
/// migration worker and the periodic tier manager.
pub struct TieredStorage {
    /// Tier sizing, aging, and migration configuration.
    config: TieredStorageConfig,
    /// In-memory hot tier (capacity-bounded; inserts may be refused).
    hot_tier: Arc<HotTier>,
    /// On-disk warm tier.
    warm_tier: Arc<WarmTier>,
    /// Pluggable archival backend (may be `DisabledColdStorage`).
    cold_storage: Arc<dyn ColdStorageBackend>,
    /// Global segment index across all tiers, keyed by
    /// (topic, partition, base_offset).
    segment_index: RwLock<BTreeMap<(String, u32, u64), Arc<SegmentMetadata>>>,
    /// Migration task queue (bounded; sends are best-effort).
    migration_tx: mpsc::Sender<MigrationTask>,
    /// Statistics
    stats: Arc<TieredStorageStats>,
    /// Shutdown signal
    shutdown: tokio::sync::broadcast::Sender<()>,
}
1169
1170impl std::fmt::Debug for TieredStorage {
1171    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1172        f.debug_struct("TieredStorage")
1173            .field("config", &self.config)
1174            .field("hot_tier", &self.hot_tier)
1175            .field("warm_tier", &self.warm_tier)
1176            .field("cold_storage", &"<dyn ColdStorageBackend>")
1177            .finish()
1178    }
1179}
1180
1181impl TieredStorage {
    /// Create new tiered storage system
    ///
    /// Builds the hot and warm tiers, selects a cold-storage backend from
    /// `config.cold_storage`, then spawns the two background tasks
    /// (migration worker and tier manager) before returning the shared handle.
    ///
    /// # Errors
    ///
    /// Fails if the warm tier or cold-storage backend cannot be initialized,
    /// or if the configured backend requires a feature flag that was not
    /// compiled in.
    pub async fn new(config: TieredStorageConfig) -> Result<Arc<Self>> {
        let hot_tier = Arc::new(HotTier::new(config.hot_tier_max_bytes));
        let warm_tier = Arc::new(WarmTier::new(
            config.warm_tier_path_buf(),
            config.warm_tier_max_bytes,
        )?);

        // Select the cold backend. Variants whose feature flag is missing
        // fail loudly here instead of silently disabling cold storage.
        let cold_storage: Arc<dyn ColdStorageBackend> = match &config.cold_storage {
            ColdStorageConfig::LocalFs { path } => {
                Arc::new(LocalFsColdStorage::new(PathBuf::from(path))?)
            }
            ColdStorageConfig::Disabled => Arc::new(DisabledColdStorage),

            #[cfg(feature = "s3")]
            ColdStorageConfig::S3 {
                endpoint,
                bucket,
                region,
                access_key,
                secret_key,
                use_path_style,
            } => Arc::new(ObjectStoreColdStorage::s3(
                bucket,
                region,
                endpoint.as_deref(),
                access_key.as_deref(),
                secret_key.as_deref(),
                *use_path_style,
            )?),

            #[cfg(not(feature = "s3"))]
            ColdStorageConfig::S3 { .. } => {
                return Err(Error::Other(
                    "S3 cold storage requires the 's3' feature flag".into(),
                ));
            }

            #[cfg(feature = "gcs")]
            ColdStorageConfig::Gcs {
                bucket,
                service_account_path,
            } => Arc::new(ObjectStoreColdStorage::gcs(
                bucket,
                service_account_path.as_ref().map(std::path::Path::new),
            )?),

            #[cfg(not(feature = "gcs"))]
            ColdStorageConfig::Gcs { .. } => {
                return Err(Error::Other(
                    "GCS cold storage requires the 'gcs' feature flag".into(),
                ));
            }

            #[cfg(feature = "azure")]
            ColdStorageConfig::AzureBlob {
                account,
                container,
                access_key,
            } => Arc::new(ObjectStoreColdStorage::azure(
                account,
                container,
                access_key.as_deref(),
            )?),

            #[cfg(not(feature = "azure"))]
            ColdStorageConfig::AzureBlob { .. } => {
                return Err(Error::Other(
                    "Azure Blob cold storage requires the 'azure' feature flag".into(),
                ));
            }
        };

        // Bounded migration queue: when full, producers drop requests
        // (best-effort) rather than blocking the data path.
        let (migration_tx, migration_rx) = mpsc::channel(1024);
        let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);

        let storage = Arc::new(Self {
            config: config.clone(),
            hot_tier,
            warm_tier,
            cold_storage,
            segment_index: RwLock::new(BTreeMap::new()),
            migration_tx,
            stats: Arc::new(TieredStorageStats::new()),
            shutdown: shutdown_tx,
        });

        // Start background migration worker
        storage.clone().start_migration_worker(migration_rx);

        // Start background tier manager
        storage.clone().start_tier_manager();

        Ok(storage)
    }
1277
    /// Write messages to storage (always starts in hot tier)
    ///
    /// `data` covers offsets `[base_offset, end_offset]` for the given
    /// topic/partition. The bytes land in the hot tier when it accepts the
    /// insert; otherwise they go straight to the warm tier. The segment
    /// index entry is created after the data write completes.
    ///
    /// # Errors
    ///
    /// Returns an error only when the warm-tier fallback write fails.
    pub async fn write(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
        end_offset: u64,
        data: Bytes,
    ) -> Result<()> {
        let size = data.len() as u64;

        // Always write to hot tier first
        let inserted = self
            .hot_tier
            .insert(topic, partition, base_offset, data.clone())
            .await;

        if !inserted {
            // Hot tier full and can't evict, write directly to warm
            self.warm_tier
                .store(topic, partition, base_offset, end_offset, &data)
                .await?;
            self.stats.warm_writes.fetch_add(1, Ordering::Relaxed);
        } else {
            self.stats.hot_writes.fetch_add(1, Ordering::Relaxed);
        }

        // Update segment index
        // NOTE(review): the index is updated after the data write, so a
        // concurrent read issued in between will not yet see this segment.
        let metadata = Arc::new(SegmentMetadata::new(
            topic.to_string(),
            partition,
            base_offset,
            end_offset,
            size,
            if inserted {
                StorageTier::Hot
            } else {
                StorageTier::Warm
            },
        ));

        {
            // Scope the write lock tightly around the single insert.
            let mut index = self.segment_index.write().await;
            index.insert((topic.to_string(), partition, base_offset), metadata);
        }

        self.stats
            .total_bytes_written
            .fetch_add(size, Ordering::Relaxed);

        Ok(())
    }
1330
1331    /// Read messages from storage
1332    pub async fn read(
1333        &self,
1334        topic: &str,
1335        partition: u32,
1336        start_offset: u64,
1337        max_bytes: usize,
1338    ) -> Result<Vec<(u64, Bytes)>> {
1339        let _start = Instant::now();
1340        let mut results = Vec::new();
1341        let mut bytes_collected = 0;
1342
1343        // Find relevant segments
1344        let segments = {
1345            let index = self.segment_index.read().await;
1346            index
1347                .range((topic.to_string(), partition, 0)..(topic.to_string(), partition, u64::MAX))
1348                .filter(|(_, meta)| meta.end_offset > start_offset)
1349                .map(|(k, v)| (k.clone(), v.clone()))
1350                .collect::<Vec<_>>()
1351        };
1352
1353        for ((_, _, base_offset), metadata) in segments {
1354            if bytes_collected >= max_bytes {
1355                break;
1356            }
1357
1358            metadata.record_access();
1359
1360            // Try to read from each tier in order
1361            let data = match metadata.tier {
1362                StorageTier::Hot => {
1363                    if let Some(data) = self.hot_tier.get(topic, partition, base_offset).await {
1364                        self.stats.hot_reads.fetch_add(1, Ordering::Relaxed);
1365                        Some(data)
1366                    } else {
1367                        // Might have been evicted, try warm
1368                        None
1369                    }
1370                }
1371                _ => None,
1372            };
1373
1374            let data = match data {
1375                Some(d) => d,
1376                None => {
1377                    // Try warm tier
1378                    if let Some(data) = self.warm_tier.read(topic, partition, base_offset).await? {
1379                        self.stats.warm_reads.fetch_add(1, Ordering::Relaxed);
1380
1381                        // Consider promotion if frequently accessed
1382                        if self.config.enable_promotion {
1383                            let access_count = metadata.access_count.load(Ordering::Relaxed);
1384                            if access_count >= self.config.promotion_threshold {
1385                                let _ = self
1386                                    .migration_tx
1387                                    .send(MigrationTask::Promote {
1388                                        topic: topic.to_string(),
1389                                        partition,
1390                                        base_offset,
1391                                        to_tier: StorageTier::Hot,
1392                                    })
1393                                    .await;
1394                            }
1395                        }
1396
1397                        data
1398                    } else {
1399                        // Try cold tier
1400                        let key = metadata.segment_key();
1401                        if let Some(data) = self.cold_storage.download(&key).await? {
1402                            self.stats.cold_reads.fetch_add(1, Ordering::Relaxed);
1403
1404                            // Consider promotion
1405                            if self.config.enable_promotion {
1406                                let access_count = metadata.access_count.load(Ordering::Relaxed);
1407                                if access_count >= self.config.promotion_threshold {
1408                                    let _ = self
1409                                        .migration_tx
1410                                        .send(MigrationTask::Promote {
1411                                            topic: topic.to_string(),
1412                                            partition,
1413                                            base_offset,
1414                                            to_tier: StorageTier::Warm,
1415                                        })
1416                                        .await;
1417                                }
1418                            }
1419
1420                            data
1421                        } else {
1422                            continue; // Segment not found
1423                        }
1424                    }
1425                }
1426            };
1427
1428            results.push((base_offset, data.clone()));
1429            bytes_collected += data.len();
1430        }
1431
1432        self.stats
1433            .total_bytes_read
1434            .fetch_add(bytes_collected as u64, Ordering::Relaxed);
1435
1436        Ok(results)
1437    }
1438
1439    /// Get metadata for a specific segment
1440    pub async fn get_segment_metadata(
1441        &self,
1442        topic: &str,
1443        partition: u32,
1444        base_offset: u64,
1445    ) -> Option<Arc<SegmentMetadata>> {
1446        self.segment_index
1447            .read()
1448            .await
1449            .get(&(topic.to_string(), partition, base_offset))
1450            .cloned()
1451    }
1452
1453    /// Force demote segments from hot to warm
1454    pub async fn flush_hot_tier(&self, topic: &str, partition: u32) -> Result<()> {
1455        let segments: Vec<_> = {
1456            let index = self.segment_index.read().await;
1457            index
1458                .range((topic.to_string(), partition, 0)..(topic.to_string(), partition, u64::MAX))
1459                .filter(|(_, meta)| meta.tier == StorageTier::Hot)
1460                .map(|(k, _)| k.2)
1461                .collect()
1462        };
1463
1464        for base_offset in segments {
1465            let _ = self
1466                .migration_tx
1467                .send(MigrationTask::Demote {
1468                    topic: topic.to_string(),
1469                    partition,
1470                    base_offset,
1471                    from_tier: StorageTier::Hot,
1472                })
1473                .await;
1474        }
1475
1476        Ok(())
1477    }
1478
1479    /// Get storage statistics
1480    pub fn stats(&self) -> TieredStorageStatsSnapshot {
1481        TieredStorageStatsSnapshot {
1482            hot_tier: self.hot_tier.stats(),
1483            warm_tier: self.warm_tier.stats(),
1484            hot_reads: self.stats.hot_reads.load(Ordering::Relaxed),
1485            warm_reads: self.stats.warm_reads.load(Ordering::Relaxed),
1486            cold_reads: self.stats.cold_reads.load(Ordering::Relaxed),
1487            hot_writes: self.stats.hot_writes.load(Ordering::Relaxed),
1488            warm_writes: self.stats.warm_writes.load(Ordering::Relaxed),
1489            cold_writes: self.stats.cold_writes.load(Ordering::Relaxed),
1490            total_bytes_read: self.stats.total_bytes_read.load(Ordering::Relaxed),
1491            total_bytes_written: self.stats.total_bytes_written.load(Ordering::Relaxed),
1492            migrations_completed: self.stats.migrations_completed.load(Ordering::Relaxed),
1493            migrations_failed: self.stats.migrations_failed.load(Ordering::Relaxed),
1494        }
1495    }
1496
    /// Start the background migration worker
    ///
    /// Spawns a dispatcher that consumes `MigrationTask`s from `rx` and runs
    /// each on its own spawned task, bounded by `migration_concurrency`
    /// semaphore permits. Outcomes are tallied into the stats counters.
    /// Runs until the shutdown broadcast fires.
    fn start_migration_worker(self: Arc<Self>, mut rx: mpsc::Receiver<MigrationTask>) {
        let semaphore = Arc::new(Semaphore::new(self.config.migration_concurrency));
        let mut shutdown_rx = self.shutdown.subscribe();

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    // NOTE(review): if every sender drops, recv() yields None,
                    // this branch is disabled, and the loop then only waits
                    // for the shutdown signal.
                    Some(task) = rx.recv() => {
                        // SAFETY: acquire_owned only fails if semaphore is closed, which never
                        // happens since we own the Arc<Semaphore> in this function scope.
                        // Acquiring here (before spawning) also backpressures the dispatcher.
                        let permit = semaphore.clone().acquire_owned().await.expect("semaphore closed unexpectedly");
                        let storage = self.clone();

                        tokio::spawn(async move {
                            let result = storage.execute_migration(task).await;
                            if result.is_ok() {
                                storage.stats.migrations_completed.fetch_add(1, Ordering::Relaxed);
                            } else {
                                storage.stats.migrations_failed.fetch_add(1, Ordering::Relaxed);
                            }
                            // Release the concurrency permit once the migration finishes.
                            drop(permit);
                        });
                    }
                    _ = shutdown_rx.recv() => {
                        break;
                    }
                }
            }
        });
    }
1528
1529    /// Start the background tier manager (checks for demotions)
1530    fn start_tier_manager(self: Arc<Self>) {
1531        let mut shutdown_rx = self.shutdown.subscribe();
1532        let migration_interval = self.config.migration_interval();
1533
1534        tokio::spawn(async move {
1535            let mut ticker = interval(migration_interval);
1536
1537            loop {
1538                tokio::select! {
1539                    _ = ticker.tick() => {
1540                        if let Err(e) = self.check_tier_migrations().await {
1541                            tracing::warn!("Tier migration check failed: {}", e);
1542                        }
1543                    }
1544                    _ = shutdown_rx.recv() => {
1545                        break;
1546                    }
1547                }
1548            }
1549        });
1550    }
1551
1552    /// Check and queue tier migrations
1553    async fn check_tier_migrations(&self) -> Result<()> {
1554        let hot_max_age = self.config.hot_tier_max_age();
1555        let warm_max_age = self.config.warm_tier_max_age();
1556
1557        // Check hot tier for demotions
1558        let hot_candidates: Vec<_> = {
1559            let index = self.segment_index.read().await;
1560            index
1561                .iter()
1562                .filter(|(_, meta)| {
1563                    meta.tier == StorageTier::Hot
1564                        && Duration::from_secs(meta.age_secs()) > hot_max_age
1565                })
1566                .map(|(k, _)| k.clone())
1567                .collect()
1568        };
1569
1570        for (topic, partition, base_offset) in hot_candidates {
1571            let _ = self
1572                .migration_tx
1573                .send(MigrationTask::Demote {
1574                    topic,
1575                    partition,
1576                    base_offset,
1577                    from_tier: StorageTier::Hot,
1578                })
1579                .await;
1580        }
1581
1582        // Check warm tier for demotions to cold
1583        let warm_candidates = self.warm_tier.get_demotion_candidates(warm_max_age).await;
1584
1585        for (topic, partition, base_offset) in warm_candidates {
1586            let _ = self
1587                .migration_tx
1588                .send(MigrationTask::Demote {
1589                    topic,
1590                    partition,
1591                    base_offset,
1592                    from_tier: StorageTier::Warm,
1593                })
1594                .await;
1595        }
1596
1597        // Check for compaction candidates
1598        let compaction_threshold = self.config.compaction_threshold;
1599        let compaction_candidates: Vec<_> = {
1600            let index = self.segment_index.read().await;
1601            index
1602                .iter()
1603                .filter(|(_, meta)| meta.compaction_ratio() > compaction_threshold)
1604                .map(|(k, _)| k.clone())
1605                .collect()
1606        };
1607
1608        for (topic, partition, base_offset) in compaction_candidates {
1609            let _ = self
1610                .migration_tx
1611                .send(MigrationTask::Compact {
1612                    topic,
1613                    partition,
1614                    base_offset,
1615                })
1616                .await;
1617        }
1618
1619        Ok(())
1620    }
1621
1622    /// Execute a migration task
1623    async fn execute_migration(&self, task: MigrationTask) -> Result<()> {
1624        match task {
1625            MigrationTask::Demote {
1626                topic,
1627                partition,
1628                base_offset,
1629                from_tier,
1630            } => {
1631                self.demote_segment(&topic, partition, base_offset, from_tier)
1632                    .await
1633            }
1634            MigrationTask::Promote {
1635                topic,
1636                partition,
1637                base_offset,
1638                to_tier,
1639            } => {
1640                self.promote_segment(&topic, partition, base_offset, to_tier)
1641                    .await
1642            }
1643            MigrationTask::Compact {
1644                topic,
1645                partition,
1646                base_offset,
1647            } => self.compact_segment(&topic, partition, base_offset).await,
1648        }
1649    }
1650
1651    /// Demote a segment to a cooler tier
1652    async fn demote_segment(
1653        &self,
1654        topic: &str,
1655        partition: u32,
1656        base_offset: u64,
1657        from_tier: StorageTier,
1658    ) -> Result<()> {
1659        let to_tier = match from_tier.demote() {
1660            Some(t) => t,
1661            None => return Ok(()), // Already at coldest tier
1662        };
1663
1664        // Get segment data from current tier
1665        let data = match from_tier {
1666            StorageTier::Hot => self.hot_tier.remove(topic, partition, base_offset).await,
1667            StorageTier::Warm => self.warm_tier.read(topic, partition, base_offset).await?,
1668            StorageTier::Cold => None,
1669        };
1670
1671        let data = match data {
1672            Some(d) => d,
1673            None => return Ok(()), // Segment already gone
1674        };
1675
1676        // Get metadata for end_offset
1677        let metadata = self
1678            .get_segment_metadata(topic, partition, base_offset)
1679            .await;
1680        let end_offset = metadata
1681            .as_ref()
1682            .map(|m| m.end_offset)
1683            .unwrap_or(base_offset);
1684
1685        // Write to new tier
1686        match to_tier {
1687            StorageTier::Warm => {
1688                self.warm_tier
1689                    .store(topic, partition, base_offset, end_offset, &data)
1690                    .await?;
1691            }
1692            StorageTier::Cold => {
1693                let key = format!("{}/{}/{:020}", topic, partition, base_offset);
1694                self.cold_storage.upload(&key, &data).await?;
1695                self.stats.cold_writes.fetch_add(1, Ordering::Relaxed);
1696
1697                // Remove from warm tier
1698                self.warm_tier.remove(topic, partition, base_offset).await?;
1699            }
1700            StorageTier::Hot => unreachable!(),
1701        }
1702
1703        // Update metadata
1704        if let Some(meta) = metadata {
1705            // Create new metadata with updated tier (since SegmentMetadata.tier is not atomic)
1706            let new_meta = Arc::new(SegmentMetadata {
1707                topic: meta.topic.clone(),
1708                partition: meta.partition,
1709                base_offset: meta.base_offset,
1710                end_offset: meta.end_offset,
1711                size_bytes: meta.size_bytes,
1712                tier: to_tier,
1713                created_at: meta.created_at,
1714                last_accessed: AtomicU64::new(meta.last_accessed.load(Ordering::Relaxed)),
1715                access_count: AtomicU64::new(meta.access_count.load(Ordering::Relaxed)),
1716                dead_records: AtomicU64::new(meta.dead_records.load(Ordering::Relaxed)),
1717                total_records: meta.total_records,
1718            });
1719
1720            let mut index = self.segment_index.write().await;
1721            index.insert((topic.to_string(), partition, base_offset), new_meta);
1722        }
1723
1724        tracing::debug!(
1725            "Demoted segment {}/{}/{} from {:?} to {:?}",
1726            topic,
1727            partition,
1728            base_offset,
1729            from_tier,
1730            to_tier
1731        );
1732
1733        Ok(())
1734    }
1735
1736    /// Promote a segment to a hotter tier
1737    async fn promote_segment(
1738        &self,
1739        topic: &str,
1740        partition: u32,
1741        base_offset: u64,
1742        to_tier: StorageTier,
1743    ) -> Result<()> {
1744        // Get current tier from metadata
1745        let metadata = match self
1746            .get_segment_metadata(topic, partition, base_offset)
1747            .await
1748        {
1749            Some(m) => m,
1750            None => return Ok(()),
1751        };
1752
1753        let from_tier = metadata.tier;
1754
1755        // Get data from current tier
1756        let data = match from_tier {
1757            StorageTier::Cold => {
1758                let key = metadata.segment_key();
1759                self.cold_storage.download(&key).await?
1760            }
1761            StorageTier::Warm => self.warm_tier.read(topic, partition, base_offset).await?,
1762            StorageTier::Hot => return Ok(()), // Already hot
1763        };
1764
1765        let data = match data {
1766            Some(d) => d,
1767            None => return Ok(()),
1768        };
1769
1770        // Write to new tier
1771        match to_tier {
1772            StorageTier::Hot => {
1773                self.hot_tier
1774                    .insert(topic, partition, base_offset, data)
1775                    .await;
1776            }
1777            StorageTier::Warm => {
1778                self.warm_tier
1779                    .store(topic, partition, base_offset, metadata.end_offset, &data)
1780                    .await?;
1781            }
1782            StorageTier::Cold => unreachable!(),
1783        }
1784
1785        // Update metadata
1786        let new_meta = Arc::new(SegmentMetadata {
1787            topic: metadata.topic.clone(),
1788            partition: metadata.partition,
1789            base_offset: metadata.base_offset,
1790            end_offset: metadata.end_offset,
1791            size_bytes: metadata.size_bytes,
1792            tier: to_tier,
1793            created_at: metadata.created_at,
1794            last_accessed: AtomicU64::new(metadata.last_accessed.load(Ordering::Relaxed)),
1795            access_count: AtomicU64::new(0), // Reset access count after promotion
1796            dead_records: AtomicU64::new(metadata.dead_records.load(Ordering::Relaxed)),
1797            total_records: metadata.total_records,
1798        });
1799
1800        {
1801            let mut index = self.segment_index.write().await;
1802            index.insert((topic.to_string(), partition, base_offset), new_meta);
1803        }
1804
1805        tracing::debug!(
1806            "Promoted segment {}/{}/{} from {:?} to {:?}",
1807            topic,
1808            partition,
1809            base_offset,
1810            from_tier,
1811            to_tier
1812        );
1813
1814        Ok(())
1815    }
1816
1817    /// Compact a segment (remove tombstones, keep latest per key)
1818    ///
1819    /// Kafka-style log compaction:
1820    /// 1. Read all messages from segment
1821    /// 2. Keep only the latest message per key
1822    /// 3. Remove tombstones (null value = deletion marker)
1823    /// 4. Write compacted segment
1824    /// 5. Update index and delete old segment
1825    async fn compact_segment(&self, topic: &str, partition: u32, base_offset: u64) -> Result<()> {
1826        use std::collections::HashMap;
1827
1828        // Get segment metadata
1829        let metadata = match self
1830            .get_segment_metadata(topic, partition, base_offset)
1831            .await
1832        {
1833            Some(m) => m,
1834            None => {
1835                tracing::debug!(
1836                    "Segment not found for compaction: {}/{}/{}",
1837                    topic,
1838                    partition,
1839                    base_offset
1840                );
1841                return Ok(());
1842            }
1843        };
1844
1845        // Read segment data from current tier
1846        let data = match metadata.tier {
1847            StorageTier::Hot => self.hot_tier.get(topic, partition, base_offset).await,
1848            StorageTier::Warm => self.warm_tier.read(topic, partition, base_offset).await?,
1849            StorageTier::Cold => {
1850                let key = metadata.segment_key();
1851                self.cold_storage.download(&key).await?
1852            }
1853        };
1854
1855        let data = match data {
1856            Some(d) => d,
1857            None => {
1858                tracing::debug!(
1859                    "Segment data not found for compaction: {}/{}/{}",
1860                    topic,
1861                    partition,
1862                    base_offset
1863                );
1864                return Ok(());
1865            }
1866        };
1867
1868        // Parse messages from segment
1869        // Segment format: length-prefixed postcard serialized Messages
1870        let mut messages: Vec<crate::Message> = Vec::new();
1871        let mut cursor = 0;
1872
1873        while cursor < data.len() {
1874            // Read message length (4 bytes)
1875            if cursor + 4 > data.len() {
1876                break;
1877            }
1878            let len = u32::from_be_bytes([
1879                data[cursor],
1880                data[cursor + 1],
1881                data[cursor + 2],
1882                data[cursor + 3],
1883            ]) as usize;
1884            cursor += 4;
1885
1886            if cursor + len > data.len() {
1887                tracing::warn!(
1888                    "Truncated message in segment {}/{}/{}",
1889                    topic,
1890                    partition,
1891                    base_offset
1892                );
1893                break;
1894            }
1895
1896            // Deserialize message
1897            match crate::Message::from_bytes(&data[cursor..cursor + len]) {
1898                Ok(msg) => messages.push(msg),
1899                Err(e) => {
1900                    tracing::warn!("Failed to deserialize message in compaction: {}", e);
1901                }
1902            }
1903            cursor += len;
1904        }
1905
1906        if messages.is_empty() {
1907            tracing::debug!(
1908                "No messages to compact in segment {}/{}/{}",
1909                topic,
1910                partition,
1911                base_offset
1912            );
1913            return Ok(());
1914        }
1915
1916        let original_count = messages.len();
1917
1918        // Compact: keep latest value per key, remove tombstones
1919        let mut key_to_message: HashMap<Option<Bytes>, crate::Message> = HashMap::new();
1920
1921        for msg in messages {
1922            // For keyed messages, keep the latest
1923            // For keyless messages, always keep (append-only semantics)
1924            if msg.key.is_some() {
1925                key_to_message.insert(msg.key.clone(), msg);
1926            } else {
1927                // Keyless messages use offset as synthetic key to preserve all
1928                key_to_message.insert(Some(Bytes::from(msg.offset.to_be_bytes().to_vec())), msg);
1929            }
1930        }
1931
1932        // Filter out tombstones (empty value = deletion marker)
1933        let compacted: Vec<_> = key_to_message
1934            .into_values()
1935            .filter(|msg| !msg.value.is_empty()) // Non-empty value = not a tombstone
1936            .collect();
1937
1938        let compacted_count = compacted.len();
1939
1940        // Only write if compaction reduced size significantly
1941        if compacted_count >= original_count {
1942            tracing::debug!(
1943                "Skipping compaction for {}/{}/{}: no reduction ({} -> {})",
1944                topic,
1945                partition,
1946                base_offset,
1947                original_count,
1948                compacted_count
1949            );
1950            return Ok(());
1951        }
1952
1953        // Serialize compacted messages
1954        let mut compacted_data = Vec::new();
1955        let mut new_end_offset = base_offset;
1956
1957        for msg in &compacted {
1958            let msg_bytes = msg.to_bytes()?;
1959            compacted_data.extend_from_slice(&(msg_bytes.len() as u32).to_be_bytes());
1960            compacted_data.extend_from_slice(&msg_bytes);
1961            new_end_offset = new_end_offset.max(msg.offset + 1);
1962        }
1963
1964        let compacted_bytes = Bytes::from(compacted_data);
1965        let compacted_size = compacted_bytes.len() as u64;
1966        let reduction_ratio = 1.0 - (compacted_count as f64 / original_count as f64);
1967
1968        tracing::info!(
1969            "Compacted segment {}/{}/{}: {} -> {} messages ({:.1}% reduction)",
1970            topic,
1971            partition,
1972            base_offset,
1973            original_count,
1974            compacted_count,
1975            reduction_ratio * 100.0
1976        );
1977
1978        // Write compacted segment to current tier
1979        match metadata.tier {
1980            StorageTier::Hot => {
1981                // Remove old and insert new
1982                self.hot_tier.remove(topic, partition, base_offset).await;
1983                self.hot_tier
1984                    .insert(topic, partition, base_offset, compacted_bytes)
1985                    .await;
1986            }
1987            StorageTier::Warm => {
1988                // Remove and re-store
1989                self.warm_tier.remove(topic, partition, base_offset).await?;
1990                self.warm_tier
1991                    .store(
1992                        topic,
1993                        partition,
1994                        base_offset,
1995                        new_end_offset,
1996                        &compacted_bytes,
1997                    )
1998                    .await?;
1999            }
2000            StorageTier::Cold => {
2001                // Upload compacted version
2002                let key = metadata.segment_key();
2003                self.cold_storage.upload(&key, &compacted_bytes).await?;
2004            }
2005        }
2006
2007        // Update metadata with new size and reset dead records
2008        let new_meta = Arc::new(SegmentMetadata {
2009            topic: metadata.topic.clone(),
2010            partition: metadata.partition,
2011            base_offset: metadata.base_offset,
2012            end_offset: new_end_offset,
2013            size_bytes: compacted_size,
2014            tier: metadata.tier,
2015            created_at: metadata.created_at,
2016            last_accessed: AtomicU64::new(metadata.last_accessed.load(Ordering::Relaxed)),
2017            access_count: AtomicU64::new(metadata.access_count.load(Ordering::Relaxed)),
2018            dead_records: AtomicU64::new(0), // Reset after compaction
2019            total_records: compacted_count as u64,
2020        });
2021
2022        {
2023            let mut index = self.segment_index.write().await;
2024            index.insert((topic.to_string(), partition, base_offset), new_meta);
2025        }
2026
2027        Ok(())
2028    }
2029
2030    /// Shutdown the tiered storage system
2031    pub async fn shutdown(&self) {
2032        let _ = self.shutdown.send(());
2033    }
2034}
2035
/// Statistics for tiered storage
///
/// Cumulative counters updated via atomic operations, so a shared reference
/// can be mutated concurrently from multiple tasks; read with `load` for a
/// point-in-time value.
#[derive(Debug, Default)]
pub struct TieredStorageStats {
    /// Reads served from the hot tier
    pub hot_reads: AtomicU64,
    /// Reads served from the warm tier
    pub warm_reads: AtomicU64,
    /// Reads served from cold storage
    pub cold_reads: AtomicU64,
    /// Writes that landed in the hot tier
    pub hot_writes: AtomicU64,
    /// Writes that landed in the warm tier
    pub warm_writes: AtomicU64,
    /// Writes that landed in cold storage
    pub cold_writes: AtomicU64,
    /// Total payload bytes read across all tiers
    pub total_bytes_read: AtomicU64,
    /// Total payload bytes written across all tiers
    pub total_bytes_written: AtomicU64,
    /// Tier migrations that completed successfully
    pub migrations_completed: AtomicU64,
    /// Tier migrations that failed
    pub migrations_failed: AtomicU64,
}

impl TieredStorageStats {
    /// Create a stats instance with every counter zeroed.
    fn new() -> Self {
        // `AtomicU64::default()` is 0, so the derived `Default` matches the
        // previous field-by-field `AtomicU64::new(0)` initialization exactly.
        Self::default()
    }
}
2066
/// Point-in-time, plain-`u64` snapshot of the `TieredStorageStats` counters
/// together with per-tier stats; cloneable and readable without atomics.
#[derive(Debug, Clone)]
pub struct TieredStorageStatsSnapshot {
    // Hot tier statistics at snapshot time
    pub hot_tier: HotTierStats,
    // Warm tier statistics at snapshot time
    pub warm_tier: WarmTierStats,
    // Reads served from the hot tier
    pub hot_reads: u64,
    // Reads served from the warm tier
    pub warm_reads: u64,
    // Reads served from cold storage
    pub cold_reads: u64,
    // Writes that landed in the hot tier
    pub hot_writes: u64,
    // Writes that landed in the warm tier
    pub warm_writes: u64,
    // Writes that landed in cold storage
    pub cold_writes: u64,
    // Total payload bytes read across all tiers
    pub total_bytes_read: u64,
    // Total payload bytes written across all tiers
    pub total_bytes_written: u64,
    // Tier migrations that completed successfully
    pub migrations_completed: u64,
    // Tier migrations that failed
    pub migrations_failed: u64,
}
2082
#[cfg(test)]
mod tests {
    //! Unit tests covering each tier in isolation (hot/warm/cold), the
    //! combined `TieredStorage` read/write path, compaction semantics, and
    //! path-traversal hardening of the local-filesystem cold backend.

    use super::*;
    use tempfile::TempDir;

    /// A value inserted into the hot tier can be read back unchanged.
    #[tokio::test]
    async fn test_hot_tier_insert_and_get() {
        let hot = HotTier::new(1024 * 1024); // 1 MB

        let data = Bytes::from("test data");
        hot.insert("topic1", 0, 0, data.clone()).await;

        let retrieved = hot.get("topic1", 0, 0).await;
        assert_eq!(retrieved, Some(data));
    }

    /// Exceeding the hot tier's byte budget evicts the least-recently-used
    /// segment first.
    #[tokio::test]
    async fn test_hot_tier_lru_eviction() {
        let hot = HotTier::new(100); // Very small

        // Insert more data than fits
        hot.insert("topic1", 0, 0, Bytes::from(vec![0u8; 40])).await;
        hot.insert("topic1", 0, 1, Bytes::from(vec![1u8; 40])).await;
        hot.insert("topic1", 0, 2, Bytes::from(vec![2u8; 40])).await;

        // First segment should be evicted
        assert!(hot.get("topic1", 0, 0).await.is_none());
        // Later segments should still be there
        assert!(hot.get("topic1", 0, 1).await.is_some());
        assert!(hot.get("topic1", 0, 2).await.is_some());
    }

    /// Data stored in the warm (on-disk) tier round-trips byte-for-byte.
    #[tokio::test]
    async fn test_warm_tier_store_and_read() {
        let temp_dir = TempDir::new().unwrap();
        let warm = WarmTier::new(temp_dir.path().to_path_buf(), 1024 * 1024 * 1024).unwrap();

        let data = b"warm tier test data";
        warm.store("topic1", 0, 0, 100, data).await.unwrap();

        let retrieved = warm.read("topic1", 0, 0).await.unwrap();
        assert_eq!(retrieved, Some(Bytes::from(&data[..])));
    }

    /// upload/exists/download/delete lifecycle on the local-FS cold backend.
    #[tokio::test]
    async fn test_local_fs_cold_storage() {
        let temp_dir = TempDir::new().unwrap();
        let cold = LocalFsColdStorage::new(temp_dir.path().to_path_buf()).unwrap();

        let key = "topic1/0/00000000000000000000";
        let data = b"cold storage test data";

        cold.upload(key, data).await.unwrap();
        assert!(cold.exists(key).await.unwrap());

        let retrieved = cold.download(key).await.unwrap();
        assert_eq!(retrieved, Some(Bytes::from(&data[..])));

        cold.delete(key).await.unwrap();
        assert!(!cold.exists(key).await.unwrap());
    }

    /// A write through the combined `TieredStorage` lands in the hot tier
    /// and is immediately readable; hot read/write counters reflect it.
    #[tokio::test]
    async fn test_tiered_storage_write_and_read() {
        let temp_dir = TempDir::new().unwrap();

        let config = TieredStorageConfig {
            enabled: true,
            hot_tier_max_bytes: 1024 * 1024,
            warm_tier_path: temp_dir.path().join("warm").to_string_lossy().to_string(),
            cold_storage: ColdStorageConfig::LocalFs {
                path: temp_dir.path().join("cold").to_string_lossy().to_string(),
            },
            migration_interval_secs: 3600, // Disable auto migration
            ..Default::default()
        };

        let storage = TieredStorage::new(config).await.unwrap();

        // Write data
        let data = Bytes::from("test message data");
        storage
            .write("topic1", 0, 0, 10, data.clone())
            .await
            .unwrap();

        // Read data back
        let results = storage.read("topic1", 0, 0, 1024).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, data);

        // Check stats
        let stats = storage.stats();
        assert_eq!(stats.hot_writes, 1);
        assert_eq!(stats.hot_reads, 1);

        storage.shutdown().await;
    }

    /// Tier transitions form a chain: Hot <-> Warm <-> Cold, with no
    /// promotion above Hot and no demotion below Cold.
    #[tokio::test]
    async fn test_storage_tier_demote_promote() {
        assert_eq!(StorageTier::Hot.demote(), Some(StorageTier::Warm));
        assert_eq!(StorageTier::Warm.demote(), Some(StorageTier::Cold));
        assert_eq!(StorageTier::Cold.demote(), None);

        assert_eq!(StorageTier::Hot.promote(), None);
        assert_eq!(StorageTier::Warm.promote(), Some(StorageTier::Hot));
        assert_eq!(StorageTier::Cold.promote(), Some(StorageTier::Warm));
    }

    /// Segment metadata: key formatting, age, access counting, and the
    /// dead-record ratio used to trigger compaction.
    #[tokio::test]
    async fn test_segment_metadata() {
        let meta = SegmentMetadata::new("topic1".to_string(), 0, 0, 100, 1024, StorageTier::Hot);

        assert_eq!(meta.segment_key(), "topic1/0/00000000000000000000");
        assert!(meta.age_secs() <= 1);

        meta.record_access();
        assert_eq!(meta.access_count.load(Ordering::Relaxed), 1);

        meta.dead_records.store(50, Ordering::Relaxed);
        assert!((meta.compaction_ratio() - 0.5).abs() < 0.01);
    }

    /// Compaction deduplicates keyed messages (latest wins) and drops
    /// tombstones (empty value), reducing the record count.
    #[tokio::test]
    async fn test_segment_compaction() {
        use crate::Message;

        let temp_dir = TempDir::new().unwrap();

        let config = TieredStorageConfig {
            enabled: true,
            hot_tier_max_bytes: 10 * 1024 * 1024, // 10 MB
            warm_tier_path: temp_dir.path().join("warm").to_string_lossy().to_string(),
            cold_storage: ColdStorageConfig::LocalFs {
                path: temp_dir.path().join("cold").to_string_lossy().to_string(),
            },
            migration_interval_secs: 3600,
            compaction_threshold: 0.1, // Low threshold for testing
            ..Default::default()
        };

        let storage = TieredStorage::new(config).await.unwrap();

        // Create messages with duplicate keys
        // (length-prefixed serialization matches the segment wire format)
        let mut segment_data = Vec::new();

        // Message 1: key=A, value=v1
        let msg1 = Message::with_key(Bytes::from("A"), Bytes::from("value1"));
        let msg1_bytes = msg1.to_bytes().unwrap();
        segment_data.extend_from_slice(&(msg1_bytes.len() as u32).to_be_bytes());
        segment_data.extend_from_slice(&msg1_bytes);

        // Message 2: key=B, value=v1
        let msg2 = Message::with_key(Bytes::from("B"), Bytes::from("value1"));
        let msg2_bytes = msg2.to_bytes().unwrap();
        segment_data.extend_from_slice(&(msg2_bytes.len() as u32).to_be_bytes());
        segment_data.extend_from_slice(&msg2_bytes);

        // Message 3: key=A, value=v2 (update)
        let msg3 = Message::with_key(Bytes::from("A"), Bytes::from("value2"));
        let msg3_bytes = msg3.to_bytes().unwrap();
        segment_data.extend_from_slice(&(msg3_bytes.len() as u32).to_be_bytes());
        segment_data.extend_from_slice(&msg3_bytes);

        // Message 4: key=B, value="" (tombstone/delete)
        let msg4 = Message::with_key(Bytes::from("B"), Bytes::from(""));
        let msg4_bytes = msg4.to_bytes().unwrap();
        segment_data.extend_from_slice(&(msg4_bytes.len() as u32).to_be_bytes());
        segment_data.extend_from_slice(&msg4_bytes);

        // Write segment
        let segment_bytes = Bytes::from(segment_data);
        storage
            .write("compaction-test", 0, 0, 4, segment_bytes)
            .await
            .unwrap();

        // Get metadata and trigger compaction
        let meta = storage
            .get_segment_metadata("compaction-test", 0, 0)
            .await
            .unwrap();

        // Simulate dead records (2 out of 4 will be removed)
        meta.dead_records.store(2, Ordering::Relaxed);

        // Run compaction
        storage
            .compact_segment("compaction-test", 0, 0)
            .await
            .unwrap();

        // Verify compacted segment has fewer messages
        let meta_after = storage
            .get_segment_metadata("compaction-test", 0, 0)
            .await
            .unwrap();
        assert!(
            meta_after.total_records < 4,
            "Compaction should reduce message count"
        );

        storage.shutdown().await;
    }

    /// Keyless messages are never deduplicated or dropped by compaction
    /// (append-only semantics).
    #[tokio::test]
    async fn test_compaction_preserves_keyless_messages() {
        use crate::Message;

        let temp_dir = TempDir::new().unwrap();

        let config = TieredStorageConfig {
            enabled: true,
            hot_tier_max_bytes: 10 * 1024 * 1024,
            warm_tier_path: temp_dir.path().join("warm").to_string_lossy().to_string(),
            cold_storage: ColdStorageConfig::LocalFs {
                path: temp_dir.path().join("cold").to_string_lossy().to_string(),
            },
            migration_interval_secs: 3600,
            ..Default::default()
        };

        let storage = TieredStorage::new(config).await.unwrap();

        // Create keyless messages (should all be preserved)
        let mut segment_data = Vec::new();

        for i in 0..5 {
            let mut msg = Message::new(Bytes::from(format!("value{}", i)));
            msg.offset = i;
            let msg_bytes = msg.to_bytes().unwrap();
            segment_data.extend_from_slice(&(msg_bytes.len() as u32).to_be_bytes());
            segment_data.extend_from_slice(&msg_bytes);
        }

        let segment_bytes = Bytes::from(segment_data);
        storage
            .write("keyless-test", 0, 0, 5, segment_bytes)
            .await
            .unwrap();

        // Run compaction
        storage.compact_segment("keyless-test", 0, 0).await.unwrap();

        // All keyless messages should be preserved (no dedup for keyless)
        let meta_after = storage
            .get_segment_metadata("keyless-test", 0, 0)
            .await
            .unwrap();
        assert_eq!(
            meta_after.total_records, 5,
            "Keyless messages should all be preserved"
        );

        storage.shutdown().await;
    }

    /// `key_to_path` rejects traversal sequences, absolute paths, and null
    /// bytes so a crafted key cannot escape the storage root.
    #[tokio::test]
    async fn test_local_fs_cold_storage_path_traversal_protection() {
        let temp_dir = TempDir::new().unwrap();
        let storage = LocalFsColdStorage::new(temp_dir.path().to_path_buf()).unwrap();

        // Valid keys should work
        assert!(storage.key_to_path("valid/key/path").is_ok());
        assert!(storage.key_to_path("simple-key").is_ok());
        assert!(storage.key_to_path("key_with_underscores").is_ok());

        // Path traversal attempts should fail
        assert!(storage.key_to_path("../escape").is_err());
        assert!(storage.key_to_path("valid/../escape").is_err());
        assert!(storage.key_to_path("..").is_err());
        assert!(storage.key_to_path("foo/../../bar").is_err());

        // Absolute paths should fail
        assert!(storage.key_to_path("/etc/passwd").is_err());
        assert!(storage.key_to_path("\\Windows\\System32").is_err());

        // Null bytes should fail (could bypass checks on some systems)
        assert!(storage.key_to_path("valid\0.txt").is_err());
    }

    /// The key validation does not break normal operation with safe keys.
    #[tokio::test]
    async fn test_local_fs_cold_storage_operations_with_safe_keys() {
        let temp_dir = TempDir::new().unwrap();
        let storage = LocalFsColdStorage::new(temp_dir.path().to_path_buf()).unwrap();

        // Upload should work with safe keys
        let data = b"test data";
        storage.upload("test/key", data).await.unwrap();

        // Download should work
        let downloaded = storage.download("test/key").await.unwrap();
        assert_eq!(downloaded, Some(Bytes::from_static(data)));

        // Exists should work
        assert!(storage.exists("test/key").await.unwrap());
        assert!(!storage.exists("nonexistent").await.unwrap());

        // Delete should work
        storage.delete("test/key").await.unwrap();
        assert!(!storage.exists("test/key").await.unwrap());
    }

    /// A traversal key must fail the upload AND leave no file outside the
    /// storage root on disk.
    #[tokio::test]
    async fn test_local_fs_cold_storage_rejects_malicious_upload() {
        let temp_dir = TempDir::new().unwrap();
        let storage = LocalFsColdStorage::new(temp_dir.path().to_path_buf()).unwrap();

        // Attempting to upload with path traversal should fail
        let result = storage.upload("../malicious", b"pwned").await;
        assert!(result.is_err());

        // The file should NOT exist outside the storage directory
        let escaped_path = temp_dir.path().parent().unwrap().join("malicious");
        assert!(!escaped_path.exists());
    }
}