1use bytes::Bytes;
16use std::collections::{BTreeMap, HashMap, VecDeque};
17use std::path::PathBuf;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::Arc;
20use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
21use tokio::sync::{mpsc, Mutex, RwLock, Semaphore};
22use tokio::time::interval;
23
24use crate::{Error, Result};
25
/// Tier a segment currently resides in.
///
/// Derived ordering is hottest-first: `Hot < Warm < Cold`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum StorageTier {
    Hot,
    Warm,
    Cold,
}

impl StorageTier {
    /// Lowercase label for logging and metrics.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::Hot => "hot",
            Self::Warm => "warm",
            Self::Cold => "cold",
        }
    }

    /// The next colder tier, or `None` when already coldest.
    pub fn demote(&self) -> Option<StorageTier> {
        match *self {
            Self::Hot => Some(Self::Warm),
            Self::Warm => Some(Self::Cold),
            Self::Cold => None,
        }
    }

    /// The next hotter tier, or `None` when already hottest.
    pub fn promote(&self) -> Option<StorageTier> {
        match *self {
            Self::Hot => None,
            Self::Warm => Some(Self::Hot),
            Self::Cold => Some(Self::Warm),
        }
    }
}
65
/// Configuration for the hot/warm/cold tiered storage engine.
///
/// All fields have serde defaults, so a partial config deserializes
/// cleanly; see the `default_*` functions for the default values.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TieredStorageConfig {
    /// Enables tiered storage (serde default: `false`).
    #[serde(default)]
    pub enabled: bool,
    /// Capacity of the in-memory hot tier, in bytes.
    #[serde(default = "default_hot_tier_max_bytes")]
    pub hot_tier_max_bytes: u64,
    /// Maximum age of a hot-tier segment, in seconds.
    #[serde(default = "default_hot_tier_max_age_secs")]
    pub hot_tier_max_age_secs: u64,
    /// Capacity of the on-disk warm tier, in bytes.
    #[serde(default = "default_warm_tier_max_bytes")]
    pub warm_tier_max_bytes: u64,
    /// Maximum age of a warm-tier segment, in seconds.
    #[serde(default = "default_warm_tier_max_age_secs")]
    pub warm_tier_max_age_secs: u64,
    /// Root directory for warm-tier segment files.
    #[serde(default = "default_warm_tier_path")]
    pub warm_tier_path: String,
    /// Cold-tier backend selection (local FS by default).
    #[serde(default)]
    pub cold_storage: ColdStorageConfig,
    /// Seconds between migration scans.
    #[serde(default = "default_migration_interval_secs")]
    pub migration_interval_secs: u64,
    /// Maximum number of concurrent migration tasks.
    #[serde(default = "default_migration_concurrency")]
    pub migration_concurrency: usize,
    /// When true, frequently read segments may be promoted to hotter tiers.
    #[serde(default = "default_enable_promotion")]
    pub enable_promotion: bool,
    /// Access count at or above which a segment is queued for promotion.
    #[serde(default = "default_promotion_threshold")]
    pub promotion_threshold: u64,
    /// Dead-record ratio trigger — presumably compared against
    /// `SegmentMetadata::compaction_ratio`; the consumer is not in this
    /// chunk, confirm against the migration worker.
    #[serde(default = "default_compaction_threshold")]
    pub compaction_threshold: f64,
}
106
// Serde default providers for `TieredStorageConfig`.

fn default_hot_tier_max_bytes() -> u64 {
    1 << 30 // 1 GiB
}

fn default_hot_tier_max_age_secs() -> u64 {
    60 * 60 // 1 hour
}

fn default_warm_tier_max_bytes() -> u64 {
    100 << 30 // 100 GiB
}

fn default_warm_tier_max_age_secs() -> u64 {
    7 * 24 * 60 * 60 // 7 days
}

fn default_warm_tier_path() -> String {
    String::from("/var/lib/rivven/warm")
}

fn default_migration_interval_secs() -> u64 {
    60
}

fn default_migration_concurrency() -> usize {
    4
}

fn default_enable_promotion() -> bool {
    true
}

fn default_promotion_threshold() -> u64 {
    100
}

fn default_compaction_threshold() -> f64 {
    0.5
}
137
impl Default for TieredStorageConfig {
    /// Disabled by default; every other field takes its serde default so
    /// `Default` and a fully-defaulted deserialization agree.
    fn default() -> Self {
        Self {
            enabled: false,
            hot_tier_max_bytes: default_hot_tier_max_bytes(),
            hot_tier_max_age_secs: default_hot_tier_max_age_secs(),
            warm_tier_max_bytes: default_warm_tier_max_bytes(),
            warm_tier_max_age_secs: default_warm_tier_max_age_secs(),
            warm_tier_path: default_warm_tier_path(),
            cold_storage: ColdStorageConfig::default(),
            migration_interval_secs: default_migration_interval_secs(),
            migration_concurrency: default_migration_concurrency(),
            enable_promotion: default_enable_promotion(),
            promotion_threshold: default_promotion_threshold(),
            compaction_threshold: default_compaction_threshold(),
        }
    }
}
156
157impl TieredStorageConfig {
158 pub fn hot_tier_max_age(&self) -> Duration {
160 Duration::from_secs(self.hot_tier_max_age_secs)
161 }
162
163 pub fn warm_tier_max_age(&self) -> Duration {
165 Duration::from_secs(self.warm_tier_max_age_secs)
166 }
167
168 pub fn warm_tier_path_buf(&self) -> PathBuf {
170 PathBuf::from(&self.warm_tier_path)
171 }
172
173 pub fn migration_interval(&self) -> Duration {
175 Duration::from_secs(self.migration_interval_secs)
176 }
177
178 pub fn high_performance() -> Self {
180 Self {
181 enabled: true,
182 hot_tier_max_bytes: 8 * 1024 * 1024 * 1024, hot_tier_max_age_secs: 7200, warm_tier_max_bytes: 500 * 1024 * 1024 * 1024, migration_interval_secs: 30,
186 ..Default::default()
187 }
188 }
189
190 pub fn cost_optimized() -> Self {
192 Self {
193 enabled: true,
194 hot_tier_max_bytes: 256 * 1024 * 1024, hot_tier_max_age_secs: 300, warm_tier_max_bytes: 10 * 1024 * 1024 * 1024, warm_tier_max_age_secs: 86400, migration_interval_secs: 120,
199 enable_promotion: false,
200 ..Default::default()
201 }
202 }
203
204 pub fn testing() -> Self {
206 Self {
207 enabled: true,
208 hot_tier_max_bytes: 1024 * 1024, hot_tier_max_age_secs: 5, warm_tier_max_bytes: 10 * 1024 * 1024, warm_tier_max_age_secs: 10, migration_interval_secs: 1,
213 migration_concurrency: 2,
214 enable_promotion: true,
215 promotion_threshold: 3,
216 compaction_threshold: 0.3,
217 ..Default::default()
218 }
219 }
220}
221
/// Cold-tier backend selection, serde-tagged by `type` in snake_case
/// (e.g. `type = "local_fs"`, `type = "s3"`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ColdStorageConfig {
    /// Plain directory on the local filesystem.
    LocalFs {
        #[serde(default = "default_cold_storage_path")]
        path: String,
    },
    /// Amazon S3 or an S3-compatible service (requires the `s3` feature).
    S3 {
        /// Custom endpoint URL (e.g. MinIO); `None` presumably means the
        /// default AWS endpoint — confirm against the object_store docs.
        endpoint: Option<String>,
        bucket: String,
        region: String,
        /// Explicit credentials; only applied when both key and secret
        /// are present (see `ObjectStoreColdStorage::s3`).
        access_key: Option<String>,
        secret_key: Option<String>,
        /// When true, path-style requests are used instead of
        /// virtual-hosted-style (needed by many S3-compatible servers).
        #[serde(default)]
        use_path_style: bool,
    },
    /// Google Cloud Storage (requires the `gcs` feature).
    Gcs {
        bucket: String,
        /// Optional path to a service-account credentials file.
        service_account_path: Option<String>,
    },
    /// Azure Blob Storage (requires the `azure` feature).
    AzureBlob {
        account: String,
        container: String,
        access_key: Option<String>,
    },
    /// No cold tier: uploads error, downloads behave as if empty.
    Disabled,
}
266
/// Default on-disk location for the local-filesystem cold tier.
fn default_cold_storage_path() -> String {
    String::from("/var/lib/rivven/cold")
}
270
impl Default for ColdStorageConfig {
    /// Defaults to a local-filesystem cold tier at the default path.
    fn default() -> Self {
        ColdStorageConfig::LocalFs {
            path: default_cold_storage_path(),
        }
    }
}
278
279impl ColdStorageConfig {
280 pub fn local_fs_path(&self) -> Option<PathBuf> {
282 match self {
283 ColdStorageConfig::LocalFs { path } => Some(PathBuf::from(path)),
284 _ => None,
285 }
286 }
287}
288
/// Bookkeeping for one stored segment, shared across tiers via `Arc`.
///
/// Access statistics are atomics so readers can update them through a
/// shared reference without taking a write lock.
#[derive(Debug)]
pub struct SegmentMetadata {
    /// Topic the segment belongs to.
    pub topic: String,
    /// Partition number within the topic.
    pub partition: u32,
    /// Offset of the first record in the segment.
    pub base_offset: u64,
    /// End of the offset span (exclusive, judging by the
    /// `total_records = end - base` derivation in `new`).
    pub end_offset: u64,
    /// Segment payload size in bytes.
    pub size_bytes: u64,
    /// Tier recorded for this segment. Set at construction;
    /// NOTE(review): not mutated anywhere in this chunk — confirm the
    /// migration worker replaces index entries after moving a segment.
    pub tier: StorageTier,
    /// Creation time, seconds since the Unix epoch.
    pub created_at: u64,
    /// Last read time, seconds since the Unix epoch.
    pub last_accessed: AtomicU64,
    /// Number of recorded reads (see `record_access`).
    pub access_count: AtomicU64,
    /// Records considered dead; feeds `compaction_ratio`.
    pub dead_records: AtomicU64,
    /// Total records, derived from the offset span at construction.
    pub total_records: u64,
}
315
316impl SegmentMetadata {
317 pub fn new(
318 topic: String,
319 partition: u32,
320 base_offset: u64,
321 end_offset: u64,
322 size_bytes: u64,
323 tier: StorageTier,
324 ) -> Self {
325 let now = SystemTime::now()
326 .duration_since(UNIX_EPOCH)
327 .unwrap_or_default()
328 .as_secs();
329
330 Self {
331 topic,
332 partition,
333 base_offset,
334 end_offset,
335 size_bytes,
336 tier,
337 created_at: now,
338 last_accessed: AtomicU64::new(now),
339 access_count: AtomicU64::new(0),
340 dead_records: AtomicU64::new(0),
341 total_records: (end_offset - base_offset),
342 }
343 }
344
345 pub fn record_access(&self) {
347 let now = SystemTime::now()
348 .duration_since(UNIX_EPOCH)
349 .unwrap_or_default()
350 .as_secs();
351 self.last_accessed.store(now, Ordering::Relaxed);
352 self.access_count.fetch_add(1, Ordering::Relaxed);
353 }
354
355 pub fn age_secs(&self) -> u64 {
357 let now = SystemTime::now()
358 .duration_since(UNIX_EPOCH)
359 .unwrap_or_default()
360 .as_secs();
361 now.saturating_sub(self.created_at)
362 }
363
364 pub fn idle_secs(&self) -> u64 {
366 let now = SystemTime::now()
367 .duration_since(UNIX_EPOCH)
368 .unwrap_or_default()
369 .as_secs();
370 now.saturating_sub(self.last_accessed.load(Ordering::Relaxed))
371 }
372
373 pub fn compaction_ratio(&self) -> f64 {
375 let dead = self.dead_records.load(Ordering::Relaxed);
376 if self.total_records == 0 {
377 0.0
378 } else {
379 dead as f64 / self.total_records as f64
380 }
381 }
382
383 pub fn segment_key(&self) -> String {
385 format!("{}/{}/{:020}", self.topic, self.partition, self.base_offset)
386 }
387}
388
/// In-memory, LRU-evicting tier holding raw segment bytes.
#[derive(Debug)]
pub struct HotTier {
    /// Segment payloads keyed by (topic, partition, base_offset).
    segments: RwLock<HashMap<(String, u32, u64), Bytes>>,
    /// Keys from least- to most-recently used; the front is evicted first.
    lru_order: Mutex<VecDeque<(String, u32, u64)>>,
    /// Sum of stored payload sizes in bytes.
    current_size: AtomicU64,
    /// Capacity ceiling in bytes.
    max_size: u64,
}
401
impl HotTier {
    /// Creates an empty hot tier capped at `max_size` bytes.
    pub fn new(max_size: u64) -> Self {
        Self {
            segments: RwLock::new(HashMap::new()),
            lru_order: Mutex::new(VecDeque::new()),
            current_size: AtomicU64::new(0),
            max_size,
        }
    }

    /// Inserts a segment, evicting least-recently-used entries until it
    /// fits; returns `false` when the segment cannot be admitted.
    pub async fn insert(&self, topic: &str, partition: u32, base_offset: u64, data: Bytes) -> bool {
        let size = data.len() as u64;

        // A segment larger than the entire tier can never fit.
        if size > self.max_size {
            return false;
        }

        // Reserve `size` bytes with a CAS loop so concurrent inserts
        // cannot jointly exceed `max_size`; evict until the reservation
        // fits or there is nothing left to evict.
        loop {
            let current = self.current_size.load(Ordering::Acquire);
            if current + size > self.max_size {
                if !self.evict_one().await {
                    return false;
                }
                continue;
            }
            if self
                .current_size
                .compare_exchange_weak(current, current + size, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                break;
            }
        }

        let key = (topic.to_string(), partition, base_offset);

        {
            let mut segments = self.segments.write().await;
            // Re-inserting an existing key releases the old payload's
            // bytes from the accounting so the reservation stays balanced.
            if let Some(old) = segments.insert(key.clone(), data) {
                self.current_size
                    .fetch_sub(old.len() as u64, Ordering::Relaxed);
            }
        }

        {
            // Move the key to the most-recently-used (back) position.
            let mut lru = self.lru_order.lock().await;
            lru.retain(|k| k != &key);
            lru.push_back(key);
        }

        true
    }

    /// Returns a cheap clone of the segment's bytes, refreshing its LRU
    /// position on a hit.
    pub async fn get(&self, topic: &str, partition: u32, base_offset: u64) -> Option<Bytes> {
        let key = (topic.to_string(), partition, base_offset);

        let data = {
            let segments = self.segments.read().await;
            segments.get(&key).cloned()
        };

        if data.is_some() {
            // Touch the LRU queue: remove and re-append as most recent.
            let mut lru = self.lru_order.lock().await;
            lru.retain(|k| k != &key);
            lru.push_back(key);
        }

        data
    }

    /// Removes a segment, returning its bytes and releasing its size
    /// from the accounting.
    pub async fn remove(&self, topic: &str, partition: u32, base_offset: u64) -> Option<Bytes> {
        let key = (topic.to_string(), partition, base_offset);

        let removed = {
            let mut segments = self.segments.write().await;
            segments.remove(&key)
        };

        if let Some(ref data) = removed {
            self.current_size
                .fetch_sub(data.len() as u64, Ordering::Relaxed);
            let mut lru = self.lru_order.lock().await;
            lru.retain(|k| k != &key);
        }

        removed
    }

    /// Evicts the least-recently-used segment; returns whether any bytes
    /// were actually freed.
    async fn evict_one(&self) -> bool {
        let to_evict = {
            let mut lru = self.lru_order.lock().await;
            lru.pop_front()
        };

        if let Some(key) = to_evict {
            let removed = {
                let mut segments = self.segments.write().await;
                segments.remove(&key)
            };

            if let Some(data) = removed {
                self.current_size
                    .fetch_sub(data.len() as u64, Ordering::Relaxed);
                return true;
            }
        }

        false
    }

    /// Current size/capacity snapshot (relaxed load; may be stale).
    pub fn stats(&self) -> HotTierStats {
        HotTierStats {
            current_size: self.current_size.load(Ordering::Relaxed),
            max_size: self.max_size,
        }
    }
}
533
/// Point-in-time size snapshot of the hot tier.
#[derive(Debug, Clone)]
pub struct HotTierStats {
    /// Bytes currently stored.
    pub current_size: u64,
    /// Configured capacity in bytes.
    pub max_size: u64,
}
539
/// On-disk tier storing one file per segment under `base_path`.
#[derive(Debug)]
pub struct WarmTier {
    /// Root directory for segment files.
    base_path: PathBuf,
    /// Metadata index; `BTreeMap` keeps keys ordered for range scans.
    segments: RwLock<BTreeMap<(String, u32, u64), Arc<SegmentMetadata>>>,
    /// Sum of stored segment sizes in bytes.
    current_size: AtomicU64,
    /// Capacity ceiling in bytes.
    max_size: u64,
}
552
553impl WarmTier {
554 pub fn new(base_path: PathBuf, max_size: u64) -> Result<Self> {
555 std::fs::create_dir_all(&base_path)?;
556
557 Ok(Self {
558 base_path,
559 segments: RwLock::new(BTreeMap::new()),
560 current_size: AtomicU64::new(0),
561 max_size,
562 })
563 }
564
565 fn segment_path(&self, topic: &str, partition: u32, base_offset: u64) -> PathBuf {
567 self.base_path
568 .join(topic)
569 .join(format!("{}", partition))
570 .join(format!("{:020}.segment", base_offset))
571 }
572
573 pub async fn store(
575 &self,
576 topic: &str,
577 partition: u32,
578 base_offset: u64,
579 end_offset: u64,
580 data: &[u8],
581 ) -> Result<()> {
582 let size = data.len() as u64;
583
584 while self.current_size.load(Ordering::Relaxed) + size > self.max_size {
586 if !self.evict_oldest().await {
587 return Err(crate::Error::Other(format!(
589 "warm tier full ({} / {} bytes), cannot store segment",
590 self.current_size.load(Ordering::Relaxed),
591 self.max_size
592 )));
593 }
594 }
595
596 let path = self.segment_path(topic, partition, base_offset);
597
598 if let Some(parent) = path.parent() {
600 tokio::fs::create_dir_all(parent).await?;
601 }
602
603 tokio::fs::write(&path, data).await?;
605
606 let metadata = Arc::new(SegmentMetadata::new(
608 topic.to_string(),
609 partition,
610 base_offset,
611 end_offset,
612 size,
613 StorageTier::Warm,
614 ));
615
616 {
617 let mut segments = self.segments.write().await;
618 segments.insert((topic.to_string(), partition, base_offset), metadata);
619 }
620
621 self.current_size.fetch_add(size, Ordering::Relaxed);
622
623 Ok(())
624 }
625
626 pub async fn read(
628 &self,
629 topic: &str,
630 partition: u32,
631 base_offset: u64,
632 ) -> Result<Option<Bytes>> {
633 let path = self.segment_path(topic, partition, base_offset);
634
635 if !path.exists() {
636 return Ok(None);
637 }
638
639 let file = std::fs::File::open(&path)?;
641 let mmap = unsafe { memmap2::Mmap::map(&file)? };
644
645 let key = (topic.to_string(), partition, base_offset);
647 if let Some(meta) = self.segments.read().await.get(&key) {
648 meta.record_access();
649 }
650
651 Ok(Some(Bytes::copy_from_slice(&mmap)))
654 }
655
656 pub async fn remove(&self, topic: &str, partition: u32, base_offset: u64) -> Result<()> {
658 let path = self.segment_path(topic, partition, base_offset);
659 let key = (topic.to_string(), partition, base_offset);
660
661 let size = {
662 let mut segments = self.segments.write().await;
663 segments.remove(&key).map(|m| m.size_bytes)
664 };
665
666 if let Some(size) = size {
667 self.current_size.fetch_sub(size, Ordering::Relaxed);
668 }
669
670 if path.exists() {
671 tokio::fs::remove_file(path).await?;
672 }
673
674 Ok(())
675 }
676
677 pub async fn get_demotion_candidates(&self, max_age: Duration) -> Vec<(String, u32, u64)> {
679 let max_age_secs = max_age.as_secs();
680 let segments = self.segments.read().await;
681
682 segments
683 .iter()
684 .filter(|(_, meta)| meta.age_secs() > max_age_secs)
685 .map(|(key, _)| key.clone())
686 .collect()
687 }
688
689 pub async fn get_metadata(
691 &self,
692 topic: &str,
693 partition: u32,
694 base_offset: u64,
695 ) -> Option<Arc<SegmentMetadata>> {
696 let key = (topic.to_string(), partition, base_offset);
697 self.segments.read().await.get(&key).cloned()
698 }
699
700 pub fn stats(&self) -> WarmTierStats {
701 WarmTierStats {
702 current_size: self.current_size.load(Ordering::Relaxed),
703 max_size: self.max_size,
704 }
705 }
706
707 async fn evict_oldest(&self) -> bool {
710 let to_evict = {
711 let segments = self.segments.read().await;
712 segments
713 .iter()
714 .min_by_key(|(_, meta)| meta.created_at)
715 .map(|(key, _)| key.clone())
716 };
717
718 if let Some((topic, partition, base_offset)) = to_evict {
719 tracing::debug!(
720 topic = %topic,
721 partition,
722 base_offset,
723 "Evicting warm tier segment to free space"
724 );
725 let _ = self.remove(&topic, partition, base_offset).await;
727 true
728 } else {
729 false
730 }
731 }
732}
733
/// Point-in-time size snapshot of the warm tier.
#[derive(Debug, Clone)]
pub struct WarmTierStats {
    /// Bytes currently stored.
    pub current_size: u64,
    /// Configured capacity in bytes.
    pub max_size: u64,
}
739
/// Pluggable backend for the cold storage tier.
///
/// Keys are `/`-separated logical paths (see `SegmentMetadata::segment_key`).
#[async_trait::async_trait]
pub trait ColdStorageBackend: Send + Sync {
    /// Stores `data` under `key`.
    async fn upload(&self, key: &str, data: &[u8]) -> Result<()>;

    /// Fetches the object at `key`, or `None` when it does not exist.
    async fn download(&self, key: &str) -> Result<Option<Bytes>>;

    /// Deletes the object at `key`; the provided implementations treat a
    /// missing key as success.
    async fn delete(&self, key: &str) -> Result<()>;

    /// Lists keys under `prefix`.
    async fn list(&self, prefix: &str) -> Result<Vec<String>>;

    /// Returns whether an object exists at `key`.
    async fn exists(&self, key: &str) -> Result<bool>;
}
758
/// Cold storage backed by a local directory; keys map to relative paths.
pub struct LocalFsColdStorage {
    /// Canonicalized root directory (see `new`); all keys resolve below it.
    base_path: PathBuf,
}
763
impl LocalFsColdStorage {
    /// Creates the backend rooted at `base_path`, creating the directory
    /// and canonicalizing it so later escape checks compare against a
    /// fully resolved path.
    pub fn new(base_path: PathBuf) -> Result<Self> {
        std::fs::create_dir_all(&base_path)?;
        let base_path = base_path.canonicalize()?;
        Ok(Self { base_path })
    }

    /// Maps a storage key to an on-disk path, rejecting path traversal.
    ///
    /// Defense in depth: syntactic checks (`..`, absolute prefixes, NUL)
    /// run first; when the target already exists, its canonical form must
    /// also stay inside `base_path`. NOTE(review): nonexistent targets
    /// (the upload case) rely on the syntactic checks alone, because
    /// `canonicalize` fails for paths that do not exist yet.
    fn key_to_path(&self, key: &str) -> Result<PathBuf> {
        if key.contains("..") || key.starts_with('/') || key.starts_with('\\') {
            return Err(Error::Other(format!(
                "Invalid key: path traversal attempt detected: {}",
                key
            )));
        }

        if key.contains('\0') {
            return Err(Error::Other("Invalid key: null byte not allowed".into()));
        }

        // Keys use '/' separators; convert to the platform separator.
        let path = self
            .base_path
            .join(key.replace('/', std::path::MAIN_SEPARATOR_STR));

        if let Ok(canonical) = path.canonicalize() {
            if !canonical.starts_with(&self.base_path) {
                return Err(Error::Other(format!(
                    "Invalid key: path escapes base directory: {}",
                    key
                )));
            }
        }
        Ok(path)
    }
}
812
813#[async_trait::async_trait]
814impl ColdStorageBackend for LocalFsColdStorage {
815 async fn upload(&self, key: &str, data: &[u8]) -> Result<()> {
816 let path = self.key_to_path(key)?;
817 if let Some(parent) = path.parent() {
818 tokio::fs::create_dir_all(parent).await?;
819 }
820 tokio::fs::write(&path, data).await?;
821 Ok(())
822 }
823
824 async fn download(&self, key: &str) -> Result<Option<Bytes>> {
825 let path = self.key_to_path(key)?;
826 if !path.exists() {
827 return Ok(None);
828 }
829 let data = tokio::fs::read(&path).await?;
830 Ok(Some(Bytes::from(data)))
831 }
832
833 async fn delete(&self, key: &str) -> Result<()> {
834 let path = self.key_to_path(key)?;
835 if path.exists() {
836 tokio::fs::remove_file(path).await?;
837 }
838 Ok(())
839 }
840
841 async fn list(&self, prefix: &str) -> Result<Vec<String>> {
842 let base = self.key_to_path(prefix)?;
843 let mut keys = Vec::new();
844
845 if !base.exists() {
846 return Ok(keys);
847 }
848
849 fn walk_dir(
850 dir: &std::path::Path,
851 base: &std::path::Path,
852 keys: &mut Vec<String>,
853 ) -> std::io::Result<()> {
854 if dir.is_dir() {
855 for entry in std::fs::read_dir(dir)? {
856 let entry = entry?;
857 let path = entry.path();
858 if path.is_dir() {
859 walk_dir(&path, base, keys)?;
860 } else if let Ok(rel) = path.strip_prefix(base) {
861 keys.push(
862 rel.to_string_lossy()
863 .replace(std::path::MAIN_SEPARATOR, "/"),
864 );
865 }
866 }
867 }
868 Ok(())
869 }
870
871 walk_dir(&self.base_path, &self.base_path, &mut keys)?;
872 Ok(keys)
873 }
874
875 async fn exists(&self, key: &str) -> Result<bool> {
876 Ok(self.key_to_path(key)?.exists())
877 }
878}
879
/// No-op backend used when cold storage is turned off.
///
/// Reads behave as if the store is empty; uploads fail loudly so a
/// misconfigured demotion is noticed instead of silently losing data.
pub struct DisabledColdStorage;

#[async_trait::async_trait]
impl ColdStorageBackend for DisabledColdStorage {
    // Uploading with cold storage disabled is an error: the data would
    // otherwise be dropped.
    async fn upload(&self, _key: &str, _data: &[u8]) -> Result<()> {
        Err(Error::Other("Cold storage is disabled".into()))
    }

    // Nothing is ever stored, so every download misses.
    async fn download(&self, _key: &str) -> Result<Option<Bytes>> {
        Ok(None)
    }

    // Deleting from an empty store trivially succeeds.
    async fn delete(&self, _key: &str) -> Result<()> {
        Ok(())
    }

    async fn list(&self, _prefix: &str) -> Result<Vec<String>> {
        Ok(Vec::new())
    }

    async fn exists(&self, _key: &str) -> Result<bool> {
        Ok(false)
    }
}
905
/// Cold backend over the `object_store` crate (S3/GCS/Azure); only
/// compiled with the `cloud-storage` feature.
#[cfg(feature = "cloud-storage")]
pub struct ObjectStoreColdStorage {
    /// Shared handle to the underlying object-store client.
    store: Arc<dyn object_store::ObjectStore>,
    /// Key prefix applied to every operation; empty, or ends with '/'
    /// (enforced by `with_prefix`).
    prefix: String,
}
920
#[cfg(feature = "cloud-storage")]
impl ObjectStoreColdStorage {
    /// Builds an S3 (or S3-compatible) backend.
    ///
    /// Explicit credentials are applied only when both `access_key` and
    /// `secret_key` are present; otherwise the builder's own credential
    /// resolution presumably applies — confirm against object_store docs.
    /// `use_path_style` disables virtual-hosted-style requests, which
    /// many S3-compatible servers require.
    #[cfg(feature = "s3")]
    pub fn s3(
        bucket: &str,
        region: &str,
        endpoint: Option<&str>,
        access_key: Option<&str>,
        secret_key: Option<&str>,
        use_path_style: bool,
    ) -> Result<Self> {
        use object_store::aws::AmazonS3Builder;

        let mut builder = AmazonS3Builder::new()
            .with_bucket_name(bucket)
            .with_region(region);

        if let Some(endpoint) = endpoint {
            builder = builder.with_endpoint(endpoint);
        }

        // Only apply credentials when both halves are configured.
        if let (Some(key), Some(secret)) = (access_key, secret_key) {
            builder = builder
                .with_access_key_id(key)
                .with_secret_access_key(secret);
        }

        if use_path_style {
            builder = builder.with_virtual_hosted_style_request(false);
        }

        let store = builder
            .build()
            .map_err(|e| Error::Other(format!("Failed to create S3 client: {}", e)))?;

        Ok(Self {
            store: Arc::new(store),
            prefix: String::new(),
        })
    }

    /// Convenience constructor for MinIO: path-style S3 against a custom
    /// endpoint with a fixed "us-east-1" placeholder region (assumed to
    /// be ignored by MinIO — TODO confirm).
    #[cfg(feature = "s3")]
    pub fn minio(endpoint: &str, bucket: &str, access_key: &str, secret_key: &str) -> Result<Self> {
        Self::s3(
            bucket,
            "us-east-1",
            Some(endpoint),
            Some(access_key),
            Some(secret_key),
            true,
        )
    }

    /// Builds a Google Cloud Storage backend; a service-account file is
    /// used when a path is supplied.
    #[cfg(feature = "gcs")]
    pub fn gcs(bucket: &str, service_account_path: Option<&std::path::Path>) -> Result<Self> {
        use object_store::gcp::GoogleCloudStorageBuilder;

        let mut builder = GoogleCloudStorageBuilder::new().with_bucket_name(bucket);

        if let Some(path) = service_account_path {
            builder = builder.with_service_account_path(path.to_string_lossy());
        }

        let store = builder
            .build()
            .map_err(|e| Error::Other(format!("Failed to create GCS client: {}", e)))?;

        Ok(Self {
            store: Arc::new(store),
            prefix: String::new(),
        })
    }

    /// Builds an Azure Blob Storage backend for `account`/`container`.
    #[cfg(feature = "azure")]
    pub fn azure(account: &str, container: &str, access_key: Option<&str>) -> Result<Self> {
        use object_store::azure::MicrosoftAzureBuilder;

        let mut builder = MicrosoftAzureBuilder::new()
            .with_account(account)
            .with_container_name(container);

        if let Some(key) = access_key {
            builder = builder.with_access_key(key);
        }

        let store = builder
            .build()
            .map_err(|e| Error::Other(format!("Failed to create Azure Blob client: {}", e)))?;

        Ok(Self {
            store: Arc::new(store),
            prefix: String::new(),
        })
    }

    /// Sets a key prefix for all operations; a trailing '/' is appended
    /// when missing so prefix + key concatenates cleanly.
    pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
        self.prefix = prefix.into();
        if !self.prefix.is_empty() && !self.prefix.ends_with('/') {
            self.prefix.push('/');
        }
        self
    }

    /// Full object path: configured prefix followed by `key`.
    fn full_path(&self, key: &str) -> object_store::path::Path {
        object_store::path::Path::from(format!("{}{}", self.prefix, key))
    }
}
1033
#[cfg(feature = "cloud-storage")]
#[async_trait::async_trait]
impl ColdStorageBackend for ObjectStoreColdStorage {
    /// Puts `data` at the prefixed key.
    async fn upload(&self, key: &str, data: &[u8]) -> Result<()> {
        use object_store::ObjectStore;

        let path = self.full_path(key);
        let payload = object_store::PutPayload::from(data.to_vec());

        self.store
            .put(&path, payload)
            .await
            .map_err(|e| Error::Other(format!("Failed to upload to object store: {}", e)))?;

        Ok(())
    }

    /// Gets the object at the prefixed key; NotFound maps to `Ok(None)`.
    async fn download(&self, key: &str) -> Result<Option<Bytes>> {
        use object_store::ObjectStore;

        let path = self.full_path(key);

        match self.store.get(&path).await {
            Ok(result) => {
                let bytes = result
                    .bytes()
                    .await
                    .map_err(|e| Error::Other(format!("Failed to read object: {}", e)))?;
                Ok(Some(bytes))
            }
            Err(object_store::Error::NotFound { .. }) => Ok(None),
            Err(e) => Err(Error::Other(format!(
                "Failed to download from object store: {}",
                e
            ))),
        }
    }

    /// Deletes the object; an already-missing object counts as success.
    async fn delete(&self, key: &str) -> Result<()> {
        use object_store::ObjectStore;

        let path = self.full_path(key);

        match self.store.delete(&path).await {
            Ok(()) => Ok(()),
            Err(object_store::Error::NotFound { .. }) => Ok(()),
            Err(e) => Err(Error::Other(format!(
                "Failed to delete from object store: {}",
                e
            ))),
        }
    }

    /// Lists objects under the prefixed `prefix`, stripping the storage
    /// prefix so returned keys match what callers uploaded.
    async fn list(&self, prefix: &str) -> Result<Vec<String>> {
        use futures::StreamExt;
        use object_store::ObjectStore;

        let full_prefix = self.full_path(prefix);
        let mut stream = self.store.list(Some(&full_prefix));
        let mut keys = Vec::new();

        while let Some(result) = stream.next().await {
            match result {
                Ok(meta) => {
                    let key = meta.location.to_string();
                    // Fall back to the raw key if the prefix does not
                    // strip (e.g. path normalization differences).
                    if let Some(relative) = key.strip_prefix(&self.prefix) {
                        keys.push(relative.to_string());
                    } else {
                        keys.push(key);
                    }
                }
                Err(e) => {
                    return Err(Error::Other(format!("Failed to list objects: {}", e)));
                }
            }
        }

        Ok(keys)
    }

    /// HEADs the object to test existence; NotFound maps to `false`.
    async fn exists(&self, key: &str) -> Result<bool> {
        use object_store::ObjectStore;

        let path = self.full_path(key);

        match self.store.head(&path).await {
            Ok(_) => Ok(true),
            Err(object_store::Error::NotFound { .. }) => Ok(false),
            Err(e) => Err(Error::Other(format!(
                "Failed to check object existence: {}",
                e
            ))),
        }
    }
}
1131
/// Work items sent to the background migration worker.
///
/// NOTE(review): the handlers live past the end of this chunk; variant
/// semantics below are inferred from the names and the enqueue sites in
/// `read`/`flush_hot_tier` — confirm against the worker implementation.
#[derive(Debug)]
enum MigrationTask {
    /// Move a segment out of `from_tier` to a colder tier.
    Demote {
        topic: String,
        partition: u32,
        base_offset: u64,
        from_tier: StorageTier,
    },
    /// Move a frequently accessed segment into the hotter `to_tier`
    /// (enqueued by `read` when the promotion threshold is crossed).
    Promote {
        topic: String,
        partition: u32,
        base_offset: u64,
        to_tier: StorageTier,
    },
    /// Compact a segment (presumably dropping dead records).
    Compact {
        topic: String,
        partition: u32,
        base_offset: u64,
    },
}
1153
/// Orchestrates the hot/warm/cold tiers behind a single read/write API.
pub struct TieredStorage {
    /// Effective configuration (cloned at construction).
    config: TieredStorageConfig,
    /// In-memory LRU tier.
    hot_tier: Arc<HotTier>,
    /// On-disk tier.
    warm_tier: Arc<WarmTier>,
    /// Pluggable cold backend (local FS, cloud, or disabled).
    cold_storage: Arc<dyn ColdStorageBackend>,
    /// Global segment-metadata index; ordered map enables per-partition
    /// range scans in `read`/`flush_hot_tier`.
    segment_index: RwLock<BTreeMap<(String, u32, u64), Arc<SegmentMetadata>>>,
    /// Queue feeding the background migration worker (bounded; senders
    /// are best-effort).
    migration_tx: mpsc::Sender<MigrationTask>,
    /// Shared counters exposed via `stats()`.
    stats: Arc<TieredStorageStats>,
    /// Broadcast channel used to signal background tasks to stop.
    shutdown: tokio::sync::broadcast::Sender<()>,
}
1169
/// Manual `Debug`: `cold_storage` is a trait object without a `Debug`
/// bound, so it is rendered as a fixed placeholder string.
impl std::fmt::Debug for TieredStorage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TieredStorage")
            .field("config", &self.config)
            .field("hot_tier", &self.hot_tier)
            .field("warm_tier", &self.warm_tier)
            .field("cold_storage", &"<dyn ColdStorageBackend>")
            .finish()
    }
}
1180
1181impl TieredStorage {
    /// Builds the tiered storage stack from `config` and spawns the
    /// background migration worker and tier-manager tasks.
    ///
    /// Cloud cold-storage variants (`S3`, `Gcs`, `AzureBlob`) require the
    /// matching cargo feature; selecting one without the feature compiled
    /// in returns an error instead of silently degrading.
    pub async fn new(config: TieredStorageConfig) -> Result<Arc<Self>> {
        let hot_tier = Arc::new(HotTier::new(config.hot_tier_max_bytes));
        let warm_tier = Arc::new(WarmTier::new(
            config.warm_tier_path_buf(),
            config.warm_tier_max_bytes,
        )?);

        // Choose the cold backend; each cloud variant exists in two
        // cfg-gated arms so the match stays exhaustive either way.
        let cold_storage: Arc<dyn ColdStorageBackend> = match &config.cold_storage {
            ColdStorageConfig::LocalFs { path } => {
                Arc::new(LocalFsColdStorage::new(PathBuf::from(path))?)
            }
            ColdStorageConfig::Disabled => Arc::new(DisabledColdStorage),

            #[cfg(feature = "s3")]
            ColdStorageConfig::S3 {
                endpoint,
                bucket,
                region,
                access_key,
                secret_key,
                use_path_style,
            } => Arc::new(ObjectStoreColdStorage::s3(
                bucket,
                region,
                endpoint.as_deref(),
                access_key.as_deref(),
                secret_key.as_deref(),
                *use_path_style,
            )?),

            #[cfg(not(feature = "s3"))]
            ColdStorageConfig::S3 { .. } => {
                return Err(Error::Other(
                    "S3 cold storage requires the 's3' feature flag".into(),
                ));
            }

            #[cfg(feature = "gcs")]
            ColdStorageConfig::Gcs {
                bucket,
                service_account_path,
            } => Arc::new(ObjectStoreColdStorage::gcs(
                bucket,
                service_account_path.as_ref().map(std::path::Path::new),
            )?),

            #[cfg(not(feature = "gcs"))]
            ColdStorageConfig::Gcs { .. } => {
                return Err(Error::Other(
                    "GCS cold storage requires the 'gcs' feature flag".into(),
                ));
            }

            #[cfg(feature = "azure")]
            ColdStorageConfig::AzureBlob {
                account,
                container,
                access_key,
            } => Arc::new(ObjectStoreColdStorage::azure(
                account,
                container,
                access_key.as_deref(),
            )?),

            #[cfg(not(feature = "azure"))]
            ColdStorageConfig::AzureBlob { .. } => {
                return Err(Error::Other(
                    "Azure Blob cold storage requires the 'azure' feature flag".into(),
                ));
            }
        };

        // Bounded queue: migration requests beyond 1024 in flight are
        // dropped by the best-effort senders elsewhere in this type.
        let (migration_tx, migration_rx) = mpsc::channel(1024);
        let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);

        let storage = Arc::new(Self {
            config: config.clone(),
            hot_tier,
            warm_tier,
            cold_storage,
            segment_index: RwLock::new(BTreeMap::new()),
            migration_tx,
            stats: Arc::new(TieredStorageStats::new()),
            shutdown: shutdown_tx,
        });

        // Background task consuming `MigrationTask`s from the queue.
        storage.clone().start_migration_worker(migration_rx);

        // Background tier manager (defined past this chunk; presumably
        // runs periodic scans on `migration_interval` — confirm).
        storage.clone().start_tier_manager();

        Ok(storage)
    }
1277
    /// Stores a new segment, preferring the in-memory hot tier and
    /// falling back to the warm tier when the hot tier cannot admit it,
    /// then registers the segment in the global index.
    pub async fn write(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
        end_offset: u64,
        data: Bytes,
    ) -> Result<()> {
        let size = data.len() as u64;

        // `Bytes::clone` is a cheap refcount bump; the original handle is
        // still needed for the warm-tier fallback below.
        let inserted = self
            .hot_tier
            .insert(topic, partition, base_offset, data.clone())
            .await;

        if !inserted {
            // Hot tier rejected the segment (too large, or eviction could
            // not free room): persist it directly to the warm tier.
            self.warm_tier
                .store(topic, partition, base_offset, end_offset, &data)
                .await?;
            self.stats.warm_writes.fetch_add(1, Ordering::Relaxed);
        } else {
            self.stats.hot_writes.fetch_add(1, Ordering::Relaxed);
        }

        // Index the segment under the tier it actually landed in.
        let metadata = Arc::new(SegmentMetadata::new(
            topic.to_string(),
            partition,
            base_offset,
            end_offset,
            size,
            if inserted {
                StorageTier::Hot
            } else {
                StorageTier::Warm
            },
        ));

        {
            let mut index = self.segment_index.write().await;
            index.insert((topic.to_string(), partition, base_offset), metadata);
        }

        self.stats
            .total_bytes_written
            .fetch_add(size, Ordering::Relaxed);

        Ok(())
    }
1330
    /// Reads segments overlapping `start_offset` for a partition, trying
    /// hot → warm → cold per segment, until `max_bytes` is collected.
    ///
    /// Returns `(base_offset, data)` pairs. Segments found on colder
    /// tiers may be queued for promotion (best-effort) once their access
    /// count reaches `promotion_threshold`. A segment missing from every
    /// tier is skipped rather than treated as an error.
    pub async fn read(
        &self,
        topic: &str,
        partition: u32,
        start_offset: u64,
        max_bytes: usize,
    ) -> Result<Vec<(u64, Bytes)>> {
        let _start = Instant::now();
        let mut results = Vec::new();
        let mut bytes_collected = 0;

        // Snapshot matching metadata so the index lock is not held across
        // tier I/O below.
        let segments = {
            let index = self.segment_index.read().await;
            index
                .range((topic.to_string(), partition, 0)..(topic.to_string(), partition, u64::MAX))
                .filter(|(_, meta)| meta.end_offset > start_offset)
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect::<Vec<_>>()
        };

        for ((_, _, base_offset), metadata) in segments {
            if bytes_collected >= max_bytes {
                break;
            }

            metadata.record_access();

            // Try the hot tier only when the index says the segment is
            // hot; a miss (e.g. evicted since indexing) falls through to
            // the colder tiers.
            let data = match metadata.tier {
                StorageTier::Hot => {
                    if let Some(data) = self.hot_tier.get(topic, partition, base_offset).await {
                        self.stats.hot_reads.fetch_add(1, Ordering::Relaxed);
                        Some(data)
                    } else {
                        None
                    }
                }
                _ => None,
            };

            let data = match data {
                Some(d) => d,
                None => {
                    // Warm tier next; on a hit, optionally queue promotion
                    // to hot (send is best-effort — a full queue drops it).
                    if let Some(data) = self.warm_tier.read(topic, partition, base_offset).await? {
                        self.stats.warm_reads.fetch_add(1, Ordering::Relaxed);

                        if self.config.enable_promotion {
                            let access_count = metadata.access_count.load(Ordering::Relaxed);
                            if access_count >= self.config.promotion_threshold {
                                let _ = self
                                    .migration_tx
                                    .send(MigrationTask::Promote {
                                        topic: topic.to_string(),
                                        partition,
                                        base_offset,
                                        to_tier: StorageTier::Hot,
                                    })
                                    .await;
                            }
                        }

                        data
                    } else {
                        // Cold storage last; cold hits promote only one
                        // step, to warm.
                        let key = metadata.segment_key();
                        if let Some(data) = self.cold_storage.download(&key).await? {
                            self.stats.cold_reads.fetch_add(1, Ordering::Relaxed);

                            if self.config.enable_promotion {
                                let access_count = metadata.access_count.load(Ordering::Relaxed);
                                if access_count >= self.config.promotion_threshold {
                                    let _ = self
                                        .migration_tx
                                        .send(MigrationTask::Promote {
                                            topic: topic.to_string(),
                                            partition,
                                            base_offset,
                                            to_tier: StorageTier::Warm,
                                        })
                                        .await;
                                }
                            }

                            data
                        } else {
                            // Not found on any tier: skip this segment.
                            continue;
                        }
                    }
                }
            };

            results.push((base_offset, data.clone()));
            bytes_collected += data.len();
        }

        self.stats
            .total_bytes_read
            .fetch_add(bytes_collected as u64, Ordering::Relaxed);

        Ok(results)
    }
1438
1439 pub async fn get_segment_metadata(
1441 &self,
1442 topic: &str,
1443 partition: u32,
1444 base_offset: u64,
1445 ) -> Option<Arc<SegmentMetadata>> {
1446 self.segment_index
1447 .read()
1448 .await
1449 .get(&(topic.to_string(), partition, base_offset))
1450 .cloned()
1451 }
1452
1453 pub async fn flush_hot_tier(&self, topic: &str, partition: u32) -> Result<()> {
1455 let segments: Vec<_> = {
1456 let index = self.segment_index.read().await;
1457 index
1458 .range((topic.to_string(), partition, 0)..(topic.to_string(), partition, u64::MAX))
1459 .filter(|(_, meta)| meta.tier == StorageTier::Hot)
1460 .map(|(k, _)| k.2)
1461 .collect()
1462 };
1463
1464 for base_offset in segments {
1465 let _ = self
1466 .migration_tx
1467 .send(MigrationTask::Demote {
1468 topic: topic.to_string(),
1469 partition,
1470 base_offset,
1471 from_tier: StorageTier::Hot,
1472 })
1473 .await;
1474 }
1475
1476 Ok(())
1477 }
1478
1479 pub fn stats(&self) -> TieredStorageStatsSnapshot {
1481 TieredStorageStatsSnapshot {
1482 hot_tier: self.hot_tier.stats(),
1483 warm_tier: self.warm_tier.stats(),
1484 hot_reads: self.stats.hot_reads.load(Ordering::Relaxed),
1485 warm_reads: self.stats.warm_reads.load(Ordering::Relaxed),
1486 cold_reads: self.stats.cold_reads.load(Ordering::Relaxed),
1487 hot_writes: self.stats.hot_writes.load(Ordering::Relaxed),
1488 warm_writes: self.stats.warm_writes.load(Ordering::Relaxed),
1489 cold_writes: self.stats.cold_writes.load(Ordering::Relaxed),
1490 total_bytes_read: self.stats.total_bytes_read.load(Ordering::Relaxed),
1491 total_bytes_written: self.stats.total_bytes_written.load(Ordering::Relaxed),
1492 migrations_completed: self.stats.migrations_completed.load(Ordering::Relaxed),
1493 migrations_failed: self.stats.migrations_failed.load(Ordering::Relaxed),
1494 }
1495 }
1496
    /// Spawn the background worker that executes queued migration tasks.
    ///
    /// Tasks arrive on `rx`; each one runs on its own spawned task, with
    /// overall parallelism capped by a semaphore of `migration_concurrency`
    /// permits. Success/failure counts are recorded in `stats`. The loop exits
    /// when a shutdown signal is broadcast (or the broadcast sender is dropped).
    fn start_migration_worker(self: Arc<Self>, mut rx: mpsc::Receiver<MigrationTask>) {
        let semaphore = Arc::new(Semaphore::new(self.config.migration_concurrency));
        let mut shutdown_rx = self.shutdown.subscribe();

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    // If the channel closes (recv() -> None) this branch is
                    // disabled and only the shutdown branch remains.
                    Some(task) = rx.recv() => {
                        // Acquire a permit before spawning so at most
                        // `migration_concurrency` migrations run at once.
                        // NOTE(review): this await sits inside the select arm,
                        // so a shutdown signal cannot interrupt the wait for a
                        // permit — confirm that is acceptable.
                        // The semaphore is never closed here, so expect() only
                        // fires on a broken invariant.
                        let permit = semaphore.clone().acquire_owned().await.expect("semaphore closed unexpectedly");
                        let storage = self.clone();

                        tokio::spawn(async move {
                            let result = storage.execute_migration(task).await;
                            if result.is_ok() {
                                storage.stats.migrations_completed.fetch_add(1, Ordering::Relaxed);
                            } else {
                                storage.stats.migrations_failed.fetch_add(1, Ordering::Relaxed);
                            }
                            // Release the permit only after the migration finishes.
                            drop(permit);
                        });
                    }
                    _ = shutdown_rx.recv() => {
                        break;
                    }
                }
            }
        });
    }
1528
1529 fn start_tier_manager(self: Arc<Self>) {
1531 let mut shutdown_rx = self.shutdown.subscribe();
1532 let migration_interval = self.config.migration_interval();
1533
1534 tokio::spawn(async move {
1535 let mut ticker = interval(migration_interval);
1536
1537 loop {
1538 tokio::select! {
1539 _ = ticker.tick() => {
1540 if let Err(e) = self.check_tier_migrations().await {
1541 tracing::warn!("Tier migration check failed: {}", e);
1542 }
1543 }
1544 _ = shutdown_rx.recv() => {
1545 break;
1546 }
1547 }
1548 }
1549 });
1550 }
1551
1552 async fn check_tier_migrations(&self) -> Result<()> {
1554 let hot_max_age = self.config.hot_tier_max_age();
1555 let warm_max_age = self.config.warm_tier_max_age();
1556
1557 let hot_candidates: Vec<_> = {
1559 let index = self.segment_index.read().await;
1560 index
1561 .iter()
1562 .filter(|(_, meta)| {
1563 meta.tier == StorageTier::Hot
1564 && Duration::from_secs(meta.age_secs()) > hot_max_age
1565 })
1566 .map(|(k, _)| k.clone())
1567 .collect()
1568 };
1569
1570 for (topic, partition, base_offset) in hot_candidates {
1571 let _ = self
1572 .migration_tx
1573 .send(MigrationTask::Demote {
1574 topic,
1575 partition,
1576 base_offset,
1577 from_tier: StorageTier::Hot,
1578 })
1579 .await;
1580 }
1581
1582 let warm_candidates = self.warm_tier.get_demotion_candidates(warm_max_age).await;
1584
1585 for (topic, partition, base_offset) in warm_candidates {
1586 let _ = self
1587 .migration_tx
1588 .send(MigrationTask::Demote {
1589 topic,
1590 partition,
1591 base_offset,
1592 from_tier: StorageTier::Warm,
1593 })
1594 .await;
1595 }
1596
1597 let compaction_threshold = self.config.compaction_threshold;
1599 let compaction_candidates: Vec<_> = {
1600 let index = self.segment_index.read().await;
1601 index
1602 .iter()
1603 .filter(|(_, meta)| meta.compaction_ratio() > compaction_threshold)
1604 .map(|(k, _)| k.clone())
1605 .collect()
1606 };
1607
1608 for (topic, partition, base_offset) in compaction_candidates {
1609 let _ = self
1610 .migration_tx
1611 .send(MigrationTask::Compact {
1612 topic,
1613 partition,
1614 base_offset,
1615 })
1616 .await;
1617 }
1618
1619 Ok(())
1620 }
1621
1622 async fn execute_migration(&self, task: MigrationTask) -> Result<()> {
1624 match task {
1625 MigrationTask::Demote {
1626 topic,
1627 partition,
1628 base_offset,
1629 from_tier,
1630 } => {
1631 self.demote_segment(&topic, partition, base_offset, from_tier)
1632 .await
1633 }
1634 MigrationTask::Promote {
1635 topic,
1636 partition,
1637 base_offset,
1638 to_tier,
1639 } => {
1640 self.promote_segment(&topic, partition, base_offset, to_tier)
1641 .await
1642 }
1643 MigrationTask::Compact {
1644 topic,
1645 partition,
1646 base_offset,
1647 } => self.compact_segment(&topic, partition, base_offset).await,
1648 }
1649 }
1650
1651 async fn demote_segment(
1653 &self,
1654 topic: &str,
1655 partition: u32,
1656 base_offset: u64,
1657 from_tier: StorageTier,
1658 ) -> Result<()> {
1659 let to_tier = match from_tier.demote() {
1660 Some(t) => t,
1661 None => return Ok(()), };
1663
1664 let data = match from_tier {
1666 StorageTier::Hot => self.hot_tier.remove(topic, partition, base_offset).await,
1667 StorageTier::Warm => self.warm_tier.read(topic, partition, base_offset).await?,
1668 StorageTier::Cold => None,
1669 };
1670
1671 let data = match data {
1672 Some(d) => d,
1673 None => return Ok(()), };
1675
1676 let metadata = self
1678 .get_segment_metadata(topic, partition, base_offset)
1679 .await;
1680 let end_offset = metadata
1681 .as_ref()
1682 .map(|m| m.end_offset)
1683 .unwrap_or(base_offset);
1684
1685 match to_tier {
1687 StorageTier::Warm => {
1688 self.warm_tier
1689 .store(topic, partition, base_offset, end_offset, &data)
1690 .await?;
1691 }
1692 StorageTier::Cold => {
1693 let key = format!("{}/{}/{:020}", topic, partition, base_offset);
1694 self.cold_storage.upload(&key, &data).await?;
1695 self.stats.cold_writes.fetch_add(1, Ordering::Relaxed);
1696
1697 self.warm_tier.remove(topic, partition, base_offset).await?;
1699 }
1700 StorageTier::Hot => unreachable!(),
1701 }
1702
1703 if let Some(meta) = metadata {
1705 let new_meta = Arc::new(SegmentMetadata {
1707 topic: meta.topic.clone(),
1708 partition: meta.partition,
1709 base_offset: meta.base_offset,
1710 end_offset: meta.end_offset,
1711 size_bytes: meta.size_bytes,
1712 tier: to_tier,
1713 created_at: meta.created_at,
1714 last_accessed: AtomicU64::new(meta.last_accessed.load(Ordering::Relaxed)),
1715 access_count: AtomicU64::new(meta.access_count.load(Ordering::Relaxed)),
1716 dead_records: AtomicU64::new(meta.dead_records.load(Ordering::Relaxed)),
1717 total_records: meta.total_records,
1718 });
1719
1720 let mut index = self.segment_index.write().await;
1721 index.insert((topic.to_string(), partition, base_offset), new_meta);
1722 }
1723
1724 tracing::debug!(
1725 "Demoted segment {}/{}/{} from {:?} to {:?}",
1726 topic,
1727 partition,
1728 base_offset,
1729 from_tier,
1730 to_tier
1731 );
1732
1733 Ok(())
1734 }
1735
    /// Copy a frequently-accessed segment into a higher tier.
    ///
    /// Reads the segment from the tier the index says currently holds it and
    /// writes it into `to_tier`, then swaps the index entry. The lower-tier
    /// copy is left in place — presumably the higher tier acts as a cache over
    /// it; confirm against the demotion path before changing this.
    async fn promote_segment(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
        to_tier: StorageTier,
    ) -> Result<()> {
        // Without an index entry there is nothing to promote.
        let metadata = match self
            .get_segment_metadata(topic, partition, base_offset)
            .await
        {
            Some(m) => m,
            None => return Ok(()),
        };

        let from_tier = metadata.tier;

        // Fetch the bytes from the current owning tier.
        let data = match from_tier {
            StorageTier::Cold => {
                let key = metadata.segment_key();
                self.cold_storage.download(&key).await?
            }
            StorageTier::Warm => self.warm_tier.read(topic, partition, base_offset).await?,
            // Already in the top tier; nothing to do.
            StorageTier::Hot => return Ok(()),
        };

        let data = match data {
            Some(d) => d,
            None => return Ok(()),
        };

        match to_tier {
            StorageTier::Hot => {
                self.hot_tier
                    .insert(topic, partition, base_offset, data)
                    .await;
            }
            StorageTier::Warm => {
                self.warm_tier
                    .store(topic, partition, base_offset, metadata.end_offset, &data)
                    .await?;
            }
            // promote() never targets Cold.
            StorageTier::Cold => unreachable!(),
        }

        // Rebuild the metadata entry with the new tier. Unlike demotion,
        // access_count is reset to zero — presumably so the segment must
        // re-earn any further promotion instead of being promoted again
        // immediately; confirm intent before "fixing".
        let new_meta = Arc::new(SegmentMetadata {
            topic: metadata.topic.clone(),
            partition: metadata.partition,
            base_offset: metadata.base_offset,
            end_offset: metadata.end_offset,
            size_bytes: metadata.size_bytes,
            tier: to_tier,
            created_at: metadata.created_at,
            last_accessed: AtomicU64::new(metadata.last_accessed.load(Ordering::Relaxed)),
            access_count: AtomicU64::new(0),
            dead_records: AtomicU64::new(metadata.dead_records.load(Ordering::Relaxed)),
            total_records: metadata.total_records,
        });

        {
            let mut index = self.segment_index.write().await;
            index.insert((topic.to_string(), partition, base_offset), new_meta);
        }

        tracing::debug!(
            "Promoted segment {}/{}/{} from {:?} to {:?}",
            topic,
            partition,
            base_offset,
            from_tier,
            to_tier
        );

        Ok(())
    }
1816
1817 async fn compact_segment(&self, topic: &str, partition: u32, base_offset: u64) -> Result<()> {
1826 use std::collections::HashMap;
1827
1828 let metadata = match self
1830 .get_segment_metadata(topic, partition, base_offset)
1831 .await
1832 {
1833 Some(m) => m,
1834 None => {
1835 tracing::debug!(
1836 "Segment not found for compaction: {}/{}/{}",
1837 topic,
1838 partition,
1839 base_offset
1840 );
1841 return Ok(());
1842 }
1843 };
1844
1845 let data = match metadata.tier {
1847 StorageTier::Hot => self.hot_tier.get(topic, partition, base_offset).await,
1848 StorageTier::Warm => self.warm_tier.read(topic, partition, base_offset).await?,
1849 StorageTier::Cold => {
1850 let key = metadata.segment_key();
1851 self.cold_storage.download(&key).await?
1852 }
1853 };
1854
1855 let data = match data {
1856 Some(d) => d,
1857 None => {
1858 tracing::debug!(
1859 "Segment data not found for compaction: {}/{}/{}",
1860 topic,
1861 partition,
1862 base_offset
1863 );
1864 return Ok(());
1865 }
1866 };
1867
1868 let mut messages: Vec<crate::Message> = Vec::new();
1871 let mut cursor = 0;
1872
1873 while cursor < data.len() {
1874 if cursor + 4 > data.len() {
1876 break;
1877 }
1878 let len = u32::from_be_bytes([
1879 data[cursor],
1880 data[cursor + 1],
1881 data[cursor + 2],
1882 data[cursor + 3],
1883 ]) as usize;
1884 cursor += 4;
1885
1886 if cursor + len > data.len() {
1887 tracing::warn!(
1888 "Truncated message in segment {}/{}/{}",
1889 topic,
1890 partition,
1891 base_offset
1892 );
1893 break;
1894 }
1895
1896 match crate::Message::from_bytes(&data[cursor..cursor + len]) {
1898 Ok(msg) => messages.push(msg),
1899 Err(e) => {
1900 tracing::warn!("Failed to deserialize message in compaction: {}", e);
1901 }
1902 }
1903 cursor += len;
1904 }
1905
1906 if messages.is_empty() {
1907 tracing::debug!(
1908 "No messages to compact in segment {}/{}/{}",
1909 topic,
1910 partition,
1911 base_offset
1912 );
1913 return Ok(());
1914 }
1915
1916 let original_count = messages.len();
1917
1918 let mut key_to_message: HashMap<Option<Bytes>, crate::Message> = HashMap::new();
1920
1921 for msg in messages {
1922 if msg.key.is_some() {
1925 key_to_message.insert(msg.key.clone(), msg);
1926 } else {
1927 key_to_message.insert(Some(Bytes::from(msg.offset.to_be_bytes().to_vec())), msg);
1929 }
1930 }
1931
1932 let compacted: Vec<_> = key_to_message
1934 .into_values()
1935 .filter(|msg| !msg.value.is_empty()) .collect();
1937
1938 let compacted_count = compacted.len();
1939
1940 if compacted_count >= original_count {
1942 tracing::debug!(
1943 "Skipping compaction for {}/{}/{}: no reduction ({} -> {})",
1944 topic,
1945 partition,
1946 base_offset,
1947 original_count,
1948 compacted_count
1949 );
1950 return Ok(());
1951 }
1952
1953 let mut compacted_data = Vec::new();
1955 let mut new_end_offset = base_offset;
1956
1957 for msg in &compacted {
1958 let msg_bytes = msg.to_bytes()?;
1959 compacted_data.extend_from_slice(&(msg_bytes.len() as u32).to_be_bytes());
1960 compacted_data.extend_from_slice(&msg_bytes);
1961 new_end_offset = new_end_offset.max(msg.offset + 1);
1962 }
1963
1964 let compacted_bytes = Bytes::from(compacted_data);
1965 let compacted_size = compacted_bytes.len() as u64;
1966 let reduction_ratio = 1.0 - (compacted_count as f64 / original_count as f64);
1967
1968 tracing::info!(
1969 "Compacted segment {}/{}/{}: {} -> {} messages ({:.1}% reduction)",
1970 topic,
1971 partition,
1972 base_offset,
1973 original_count,
1974 compacted_count,
1975 reduction_ratio * 100.0
1976 );
1977
1978 match metadata.tier {
1980 StorageTier::Hot => {
1981 self.hot_tier.remove(topic, partition, base_offset).await;
1983 self.hot_tier
1984 .insert(topic, partition, base_offset, compacted_bytes)
1985 .await;
1986 }
1987 StorageTier::Warm => {
1988 self.warm_tier.remove(topic, partition, base_offset).await?;
1990 self.warm_tier
1991 .store(
1992 topic,
1993 partition,
1994 base_offset,
1995 new_end_offset,
1996 &compacted_bytes,
1997 )
1998 .await?;
1999 }
2000 StorageTier::Cold => {
2001 let key = metadata.segment_key();
2003 self.cold_storage.upload(&key, &compacted_bytes).await?;
2004 }
2005 }
2006
2007 let new_meta = Arc::new(SegmentMetadata {
2009 topic: metadata.topic.clone(),
2010 partition: metadata.partition,
2011 base_offset: metadata.base_offset,
2012 end_offset: new_end_offset,
2013 size_bytes: compacted_size,
2014 tier: metadata.tier,
2015 created_at: metadata.created_at,
2016 last_accessed: AtomicU64::new(metadata.last_accessed.load(Ordering::Relaxed)),
2017 access_count: AtomicU64::new(metadata.access_count.load(Ordering::Relaxed)),
2018 dead_records: AtomicU64::new(0), total_records: compacted_count as u64,
2020 });
2021
2022 {
2023 let mut index = self.segment_index.write().await;
2024 index.insert((topic.to_string(), partition, base_offset), new_meta);
2025 }
2026
2027 Ok(())
2028 }
2029
2030 pub async fn shutdown(&self) {
2032 let _ = self.shutdown.send(());
2033 }
2034}
2035
/// Lock-free counters for tiered-storage activity, updated with relaxed
/// atomics from the request path and from background migration tasks.
pub struct TieredStorageStats {
    // Read hits served from each tier.
    pub hot_reads: AtomicU64,
    pub warm_reads: AtomicU64,
    pub cold_reads: AtomicU64,
    // Segment writes landing in each tier.
    pub hot_writes: AtomicU64,
    pub warm_writes: AtomicU64,
    pub cold_writes: AtomicU64,
    // Total payload bytes moved through reads and writes.
    pub total_bytes_read: AtomicU64,
    pub total_bytes_written: AtomicU64,
    // Outcomes of background migration tasks.
    pub migrations_completed: AtomicU64,
    pub migrations_failed: AtomicU64,
}
2049
2050impl TieredStorageStats {
2051 fn new() -> Self {
2052 Self {
2053 hot_reads: AtomicU64::new(0),
2054 warm_reads: AtomicU64::new(0),
2055 cold_reads: AtomicU64::new(0),
2056 hot_writes: AtomicU64::new(0),
2057 warm_writes: AtomicU64::new(0),
2058 cold_writes: AtomicU64::new(0),
2059 total_bytes_read: AtomicU64::new(0),
2060 total_bytes_written: AtomicU64::new(0),
2061 migrations_completed: AtomicU64::new(0),
2062 migrations_failed: AtomicU64::new(0),
2063 }
2064 }
2065}
2066
/// Plain-value snapshot of [`TieredStorageStats`] plus per-tier stats,
/// produced by `TieredStorage::stats()` for reporting.
#[derive(Debug, Clone)]
pub struct TieredStorageStatsSnapshot {
    // Per-tier internal stats at snapshot time.
    pub hot_tier: HotTierStats,
    pub warm_tier: WarmTierStats,
    // Read hits served from each tier.
    pub hot_reads: u64,
    pub warm_reads: u64,
    pub cold_reads: u64,
    // Segment writes landing in each tier.
    pub hot_writes: u64,
    pub warm_writes: u64,
    pub cold_writes: u64,
    // Total payload bytes moved through reads and writes.
    pub total_bytes_read: u64,
    pub total_bytes_written: u64,
    // Outcomes of background migration tasks.
    pub migrations_completed: u64,
    pub migrations_failed: u64,
}
2082
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    // Basic round-trip through the in-memory hot tier.
    #[tokio::test]
    async fn test_hot_tier_insert_and_get() {
        let hot = HotTier::new(1024 * 1024);
        let data = Bytes::from("test data");
        hot.insert("topic1", 0, 0, data.clone()).await;

        let retrieved = hot.get("topic1", 0, 0).await;
        assert_eq!(retrieved, Some(data));
    }

    // With a 100-byte budget, the third 40-byte insert must evict the
    // least-recently-used entry (the first one inserted).
    #[tokio::test]
    async fn test_hot_tier_lru_eviction() {
        let hot = HotTier::new(100);
        hot.insert("topic1", 0, 0, Bytes::from(vec![0u8; 40])).await;
        hot.insert("topic1", 0, 1, Bytes::from(vec![1u8; 40])).await;
        hot.insert("topic1", 0, 2, Bytes::from(vec![2u8; 40])).await;

        assert!(hot.get("topic1", 0, 0).await.is_none());
        assert!(hot.get("topic1", 0, 1).await.is_some());
        assert!(hot.get("topic1", 0, 2).await.is_some());
    }

    // Round-trip through the disk-backed warm tier.
    #[tokio::test]
    async fn test_warm_tier_store_and_read() {
        let temp_dir = TempDir::new().unwrap();
        let warm = WarmTier::new(temp_dir.path().to_path_buf(), 1024 * 1024 * 1024).unwrap();

        let data = b"warm tier test data";
        warm.store("topic1", 0, 0, 100, data).await.unwrap();

        let retrieved = warm.read("topic1", 0, 0).await.unwrap();
        assert_eq!(retrieved, Some(Bytes::from(&data[..])));
    }

    // upload / exists / download / delete lifecycle on the local-FS cold backend.
    #[tokio::test]
    async fn test_local_fs_cold_storage() {
        let temp_dir = TempDir::new().unwrap();
        let cold = LocalFsColdStorage::new(temp_dir.path().to_path_buf()).unwrap();

        let key = "topic1/0/00000000000000000000";
        let data = b"cold storage test data";

        cold.upload(key, data).await.unwrap();
        assert!(cold.exists(key).await.unwrap());

        let retrieved = cold.download(key).await.unwrap();
        assert_eq!(retrieved, Some(Bytes::from(&data[..])));

        cold.delete(key).await.unwrap();
        assert!(!cold.exists(key).await.unwrap());
    }

    // End-to-end write/read through the full TieredStorage facade; a fresh
    // write should land in (and be read back from) the hot tier.
    #[tokio::test]
    async fn test_tiered_storage_write_and_read() {
        let temp_dir = TempDir::new().unwrap();

        let config = TieredStorageConfig {
            enabled: true,
            hot_tier_max_bytes: 1024 * 1024,
            warm_tier_path: temp_dir.path().join("warm").to_string_lossy().to_string(),
            cold_storage: ColdStorageConfig::LocalFs {
                path: temp_dir.path().join("cold").to_string_lossy().to_string(),
            },
            // Long interval so background migration never fires mid-test.
            migration_interval_secs: 3600,
            ..Default::default()
        };

        let storage = TieredStorage::new(config).await.unwrap();

        let data = Bytes::from("test message data");
        storage
            .write("topic1", 0, 0, 10, data.clone())
            .await
            .unwrap();

        let results = storage.read("topic1", 0, 0, 1024).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, data);

        // Both the write and the read should have been served by the hot tier.
        let stats = storage.stats();
        assert_eq!(stats.hot_writes, 1);
        assert_eq!(stats.hot_reads, 1);

        storage.shutdown().await;
    }

    // Pure enum logic: demote/promote walk the tier ladder and stop at the ends.
    #[tokio::test]
    async fn test_storage_tier_demote_promote() {
        assert_eq!(StorageTier::Hot.demote(), Some(StorageTier::Warm));
        assert_eq!(StorageTier::Warm.demote(), Some(StorageTier::Cold));
        assert_eq!(StorageTier::Cold.demote(), None);

        assert_eq!(StorageTier::Hot.promote(), None);
        assert_eq!(StorageTier::Warm.promote(), Some(StorageTier::Hot));
        assert_eq!(StorageTier::Cold.promote(), Some(StorageTier::Warm));
    }

    // SegmentMetadata helpers: key formatting, age, access counting, and the
    // dead-record ratio that drives compaction.
    #[tokio::test]
    async fn test_segment_metadata() {
        let meta = SegmentMetadata::new("topic1".to_string(), 0, 0, 100, 1024, StorageTier::Hot);

        assert_eq!(meta.segment_key(), "topic1/0/00000000000000000000");
        assert!(meta.age_secs() <= 1);

        meta.record_access();
        assert_eq!(meta.access_count.load(Ordering::Relaxed), 1);

        // 50 dead of 100 total records -> ratio ~0.5.
        meta.dead_records.store(50, Ordering::Relaxed);
        assert!((meta.compaction_ratio() - 0.5).abs() < 0.01);
    }

    // Keyed compaction: duplicate key "A" collapses to its latest value and
    // the empty-value tombstone for "B" is dropped, reducing the count.
    #[tokio::test]
    async fn test_segment_compaction() {
        use crate::Message;

        let temp_dir = TempDir::new().unwrap();

        let config = TieredStorageConfig {
            enabled: true,
            hot_tier_max_bytes: 10 * 1024 * 1024,
            warm_tier_path: temp_dir.path().join("warm").to_string_lossy().to_string(),
            cold_storage: ColdStorageConfig::LocalFs {
                path: temp_dir.path().join("cold").to_string_lossy().to_string(),
            },
            migration_interval_secs: 3600,
            compaction_threshold: 0.1,
            ..Default::default()
        };

        let storage = TieredStorage::new(config).await.unwrap();

        // Build a segment by hand in the length-prefixed wire format.
        let mut segment_data = Vec::new();

        let msg1 = Message::with_key(Bytes::from("A"), Bytes::from("value1"));
        let msg1_bytes = msg1.to_bytes().unwrap();
        segment_data.extend_from_slice(&(msg1_bytes.len() as u32).to_be_bytes());
        segment_data.extend_from_slice(&msg1_bytes);

        let msg2 = Message::with_key(Bytes::from("B"), Bytes::from("value1"));
        let msg2_bytes = msg2.to_bytes().unwrap();
        segment_data.extend_from_slice(&(msg2_bytes.len() as u32).to_be_bytes());
        segment_data.extend_from_slice(&msg2_bytes);

        // Overwrites key "A".
        let msg3 = Message::with_key(Bytes::from("A"), Bytes::from("value2"));
        let msg3_bytes = msg3.to_bytes().unwrap();
        segment_data.extend_from_slice(&(msg3_bytes.len() as u32).to_be_bytes());
        segment_data.extend_from_slice(&msg3_bytes);

        // Tombstone (empty value) for key "B".
        let msg4 = Message::with_key(Bytes::from("B"), Bytes::from(""));
        let msg4_bytes = msg4.to_bytes().unwrap();
        segment_data.extend_from_slice(&(msg4_bytes.len() as u32).to_be_bytes());
        segment_data.extend_from_slice(&msg4_bytes);

        let segment_bytes = Bytes::from(segment_data);
        storage
            .write("compaction-test", 0, 0, 4, segment_bytes)
            .await
            .unwrap();

        let meta = storage
            .get_segment_metadata("compaction-test", 0, 0)
            .await
            .unwrap();

        // Mark dead records so the segment qualifies for compaction.
        meta.dead_records.store(2, Ordering::Relaxed);

        storage
            .compact_segment("compaction-test", 0, 0)
            .await
            .unwrap();

        let meta_after = storage
            .get_segment_metadata("compaction-test", 0, 0)
            .await
            .unwrap();
        assert!(
            meta_after.total_records < 4,
            "Compaction should reduce message count"
        );

        storage.shutdown().await;
    }

    // Messages without keys must never be deduplicated away by compaction.
    #[tokio::test]
    async fn test_compaction_preserves_keyless_messages() {
        use crate::Message;

        let temp_dir = TempDir::new().unwrap();

        let config = TieredStorageConfig {
            enabled: true,
            hot_tier_max_bytes: 10 * 1024 * 1024,
            warm_tier_path: temp_dir.path().join("warm").to_string_lossy().to_string(),
            cold_storage: ColdStorageConfig::LocalFs {
                path: temp_dir.path().join("cold").to_string_lossy().to_string(),
            },
            migration_interval_secs: 3600,
            ..Default::default()
        };

        let storage = TieredStorage::new(config).await.unwrap();

        // Five keyless messages at distinct offsets.
        let mut segment_data = Vec::new();

        for i in 0..5 {
            let mut msg = Message::new(Bytes::from(format!("value{}", i)));
            msg.offset = i;
            let msg_bytes = msg.to_bytes().unwrap();
            segment_data.extend_from_slice(&(msg_bytes.len() as u32).to_be_bytes());
            segment_data.extend_from_slice(&msg_bytes);
        }

        let segment_bytes = Bytes::from(segment_data);
        storage
            .write("keyless-test", 0, 0, 5, segment_bytes)
            .await
            .unwrap();

        storage.compact_segment("keyless-test", 0, 0).await.unwrap();

        let meta_after = storage
            .get_segment_metadata("keyless-test", 0, 0)
            .await
            .unwrap();
        assert_eq!(
            meta_after.total_records, 5,
            "Keyless messages should all be preserved"
        );

        storage.shutdown().await;
    }

    // key_to_path must accept normal relative keys and reject anything that
    // could escape the storage root.
    #[tokio::test]
    async fn test_local_fs_cold_storage_path_traversal_protection() {
        let temp_dir = TempDir::new().unwrap();
        let storage = LocalFsColdStorage::new(temp_dir.path().to_path_buf()).unwrap();

        // Safe keys.
        assert!(storage.key_to_path("valid/key/path").is_ok());
        assert!(storage.key_to_path("simple-key").is_ok());
        assert!(storage.key_to_path("key_with_underscores").is_ok());

        // Parent-directory traversal.
        assert!(storage.key_to_path("../escape").is_err());
        assert!(storage.key_to_path("valid/../escape").is_err());
        assert!(storage.key_to_path("..").is_err());
        assert!(storage.key_to_path("foo/../../bar").is_err());

        // Absolute paths (Unix and Windows style).
        assert!(storage.key_to_path("/etc/passwd").is_err());
        assert!(storage.key_to_path("\\Windows\\System32").is_err());

        // Embedded NUL byte.
        assert!(storage.key_to_path("valid\0.txt").is_err());
    }

    // Normal operations still work once keys pass validation.
    #[tokio::test]
    async fn test_local_fs_cold_storage_operations_with_safe_keys() {
        let temp_dir = TempDir::new().unwrap();
        let storage = LocalFsColdStorage::new(temp_dir.path().to_path_buf()).unwrap();

        let data = b"test data";
        storage.upload("test/key", data).await.unwrap();

        let downloaded = storage.download("test/key").await.unwrap();
        assert_eq!(downloaded, Some(Bytes::from_static(data)));

        assert!(storage.exists("test/key").await.unwrap());
        assert!(!storage.exists("nonexistent").await.unwrap());

        storage.delete("test/key").await.unwrap();
        assert!(!storage.exists("test/key").await.unwrap());
    }

    // A traversal key must fail the upload AND leave no file outside the root.
    #[tokio::test]
    async fn test_local_fs_cold_storage_rejects_malicious_upload() {
        let temp_dir = TempDir::new().unwrap();
        let storage = LocalFsColdStorage::new(temp_dir.path().to_path_buf()).unwrap();

        let result = storage.upload("../malicious", b"pwned").await;
        assert!(result.is_err());

        let escaped_path = temp_dir.path().parent().unwrap().join("malicious");
        assert!(!escaped_path.exists());
    }
}