//! Tiered segment storage: an in-memory hot tier, a local-disk warm tier,
//! and a pluggable cold-storage backend, with background demotion,
//! promotion, and compaction of segments.

use bytes::Bytes;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::{mpsc, Mutex, RwLock, Semaphore};
use tokio::time::interval;

use crate::{Error, Result};

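/// The tier a segment lives in, ordered from hottest (in-memory) to
/// coldest (object storage).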
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum StorageTier {
    /// In-memory tier for the newest and most frequently read segments.
    Hot,
    /// Local-disk tier for recent but less active segments.
    Warm,
    /// Cold-storage tier (object store or archive) for inactive segments.
    Cold,
}

impl StorageTier {
    /// Human-readable tier name.
    pub fn name(&self) -> &'static str {
        match self {
            StorageTier::Hot => "hot",
            StorageTier::Warm => "warm",
            StorageTier::Cold => "cold",
        }
    }

    /// The next-colder tier, or `None` if already at the coldest.
    pub fn demote(&self) -> Option<StorageTier> {
        match self {
            StorageTier::Hot => Some(StorageTier::Warm),
            StorageTier::Warm => Some(StorageTier::Cold),
            StorageTier::Cold => None,
        }
    }

    /// The next-hotter tier, or `None` if already at the hottest.
    pub fn promote(&self) -> Option<StorageTier> {
        match self {
            StorageTier::Hot => None,
            StorageTier::Warm => Some(StorageTier::Hot),
            StorageTier::Cold => Some(StorageTier::Warm),
        }
    }
}

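/// Configuration for [`TieredStorage`]: per-tier capacity and age limits,
/// migration cadence, and the promotion/compaction policy.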
#[derive(Debug, Clone)]
pub struct TieredStorageConfig {
    /// Maximum bytes held in the in-memory hot tier.
    pub hot_tier_max_bytes: u64,
    /// Maximum age of a segment in the hot tier before demotion.
    pub hot_tier_max_age: Duration,
    /// Maximum bytes held in the on-disk warm tier.
    pub warm_tier_max_bytes: u64,
    /// Maximum age of a segment in the warm tier before demotion.
    pub warm_tier_max_age: Duration,
    /// Directory backing the warm tier.
    pub warm_tier_path: PathBuf,
    /// Cold-storage backend configuration.
    pub cold_storage: ColdStorageConfig,
    /// How often the tier manager scans for migration candidates.
    pub migration_interval: Duration,
    /// Maximum number of concurrent migration tasks.
    pub migration_concurrency: usize,
    /// Whether frequently read segments may be promoted to hotter tiers.
    pub enable_promotion: bool,
    /// Access count at which a segment becomes eligible for promotion.
    pub promotion_threshold: u64,
    /// Dead-record ratio above which a segment is scheduled for compaction.
    pub compaction_threshold: f64,
}

impl Default for TieredStorageConfig {
    fn default() -> Self {
        Self {
            hot_tier_max_bytes: 1024 * 1024 * 1024, // 1 GiB
            hot_tier_max_age: Duration::from_secs(3600), // 1 hour
            warm_tier_max_bytes: 100 * 1024 * 1024 * 1024, // 100 GiB
            warm_tier_max_age: Duration::from_secs(86400 * 7), // 7 days
            warm_tier_path: PathBuf::from("/var/lib/rivven/warm"),
            cold_storage: ColdStorageConfig::default(),
            migration_interval: Duration::from_secs(60),
            migration_concurrency: 4,
            enable_promotion: true,
            promotion_threshold: 100,
            compaction_threshold: 0.5,
        }
    }
}

impl TieredStorageConfig {
    /// Preset favoring read latency: larger hot/warm tiers, faster scans.
    pub fn high_performance() -> Self {
        Self {
            hot_tier_max_bytes: 8 * 1024 * 1024 * 1024, // 8 GiB
            hot_tier_max_age: Duration::from_secs(7200), // 2 hours
            warm_tier_max_bytes: 500 * 1024 * 1024 * 1024, // 500 GiB
            migration_interval: Duration::from_secs(30),
            ..Default::default()
        }
    }

    /// Preset favoring cost: small hot/warm tiers, promotion disabled.
    pub fn cost_optimized() -> Self {
        Self {
            hot_tier_max_bytes: 256 * 1024 * 1024, // 256 MiB
            hot_tier_max_age: Duration::from_secs(300), // 5 minutes
            warm_tier_max_bytes: 10 * 1024 * 1024 * 1024, // 10 GiB
            warm_tier_max_age: Duration::from_secs(86400), // 1 day
            migration_interval: Duration::from_secs(120),
            enable_promotion: false,
            ..Default::default()
        }
    }
}

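/// Cold-storage backend selection. Only the local-filesystem and disabled
/// backends are implemented here; `S3` and `AzureBlob` are configuration
/// placeholders that `TieredStorage::new` currently rejects.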
#[derive(Debug, Clone)]
pub enum ColdStorageConfig {
    /// Local filesystem directory (useful for tests and single-node setups).
    LocalFs { path: PathBuf },
    /// S3-compatible object storage.
    S3 {
        endpoint: String,
        bucket: String,
        region: String,
        access_key: Option<String>,
        secret_key: Option<String>,
        use_path_style: bool,
    },
    /// Azure Blob Storage.
    AzureBlob {
        account: String,
        container: String,
        access_key: Option<String>,
    },
    /// No cold tier.
    Disabled,
}

impl Default for ColdStorageConfig {
    fn default() -> Self {
        ColdStorageConfig::LocalFs {
            path: PathBuf::from("/var/lib/rivven/cold"),
        }
    }
}

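/// Per-segment bookkeeping shared across tiers. Access statistics are
/// atomics so concurrent readers can update them without exclusive locks.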
#[derive(Debug)]
pub struct SegmentMetadata {
    /// Topic this segment belongs to.
    pub topic: String,
    /// Partition within the topic.
    pub partition: u32,
    /// First offset contained in the segment.
    pub base_offset: u64,
    /// One past the last offset contained in the segment.
    pub end_offset: u64,
    /// Serialized size of the segment in bytes.
    pub size_bytes: u64,
    /// Tier the segment currently lives in.
    pub tier: StorageTier,
    /// Creation time, seconds since the Unix epoch.
    pub created_at: u64,
    /// Last access time, seconds since the Unix epoch.
    pub last_accessed: AtomicU64,
    /// Number of reads recorded against this segment.
    pub access_count: AtomicU64,
    /// Records superseded or deleted since the segment was written.
    pub dead_records: AtomicU64,
    /// Total records in the segment.
    pub total_records: u64,
}

impl SegmentMetadata {
    pub fn new(
        topic: String,
        partition: u32,
        base_offset: u64,
        end_offset: u64,
        size_bytes: u64,
        tier: StorageTier,
    ) -> Self {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        Self {
            topic,
            partition,
            base_offset,
            end_offset,
            size_bytes,
            tier,
            created_at: now,
            last_accessed: AtomicU64::new(now),
            access_count: AtomicU64::new(0),
            dead_records: AtomicU64::new(0),
            total_records: end_offset - base_offset,
        }
    }

    /// Record a read: bump the access count and refresh the access time.
    pub fn record_access(&self) {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        self.last_accessed.store(now, Ordering::Relaxed);
        self.access_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Seconds since the segment was created.
    pub fn age_secs(&self) -> u64 {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        now.saturating_sub(self.created_at)
    }

    /// Seconds since the segment was last accessed.
    pub fn idle_secs(&self) -> u64 {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        now.saturating_sub(self.last_accessed.load(Ordering::Relaxed))
    }

    /// Fraction of records that are dead (superseded or deleted).
    pub fn compaction_ratio(&self) -> f64 {
        let dead = self.dead_records.load(Ordering::Relaxed);
        if self.total_records == 0 {
            0.0
        } else {
            dead as f64 / self.total_records as f64
        }
    }

    /// Stable key used to address this segment in cold storage.
    pub fn segment_key(&self) -> String {
        format!("{}/{}/{:020}", self.topic, self.partition, self.base_offset)
    }
}

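/// In-memory hot tier: whole segment payloads keyed by
/// `(topic, partition, base_offset)`, evicted in LRU order when an insert
/// would exceed `max_size`.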
#[derive(Debug)]
pub struct HotTier {
    segments: RwLock<HashMap<(String, u32, u64), Bytes>>,
    lru_order: Mutex<VecDeque<(String, u32, u64)>>,
    current_size: AtomicU64,
    max_size: u64,
}

impl HotTier {
    pub fn new(max_size: u64) -> Self {
        Self {
            segments: RwLock::new(HashMap::new()),
            lru_order: Mutex::new(VecDeque::new()),
            current_size: AtomicU64::new(0),
            max_size,
        }
    }

    /// Insert a segment, evicting LRU entries to make room. Returns `false`
    /// if the segment is larger than the whole tier and cannot be cached.
    pub async fn insert(&self, topic: &str, partition: u32, base_offset: u64, data: Bytes) -> bool {
        let size = data.len() as u64;

        if size > self.max_size {
            return false;
        }

        // Evict least-recently-used segments until the new one fits.
        while self.current_size.load(Ordering::Relaxed) + size > self.max_size {
            if !self.evict_one().await {
                break;
            }
        }

        let key = (topic.to_string(), partition, base_offset);

        {
            let mut segments = self.segments.write().await;
            if let Some(old) = segments.insert(key.clone(), data) {
                self.current_size
                    .fetch_sub(old.len() as u64, Ordering::Relaxed);
            }
        }

        {
            // Move (or add) the key to the back of the LRU queue.
            let mut lru = self.lru_order.lock().await;
            lru.retain(|k| k != &key);
            lru.push_back(key);
        }

        self.current_size.fetch_add(size, Ordering::Relaxed);
        true
    }

    pub async fn get(&self, topic: &str, partition: u32, base_offset: u64) -> Option<Bytes> {
        let key = (topic.to_string(), partition, base_offset);

        let data = {
            let segments = self.segments.read().await;
            segments.get(&key).cloned()
        };

        if data.is_some() {
            // Refresh the key's position in the LRU queue on a hit.
            let mut lru = self.lru_order.lock().await;
            lru.retain(|k| k != &key);
            lru.push_back(key);
        }

        data
    }

    pub async fn remove(&self, topic: &str, partition: u32, base_offset: u64) -> Option<Bytes> {
        let key = (topic.to_string(), partition, base_offset);

        let removed = {
            let mut segments = self.segments.write().await;
            segments.remove(&key)
        };

        if let Some(ref data) = removed {
            self.current_size
                .fetch_sub(data.len() as u64, Ordering::Relaxed);
            let mut lru = self.lru_order.lock().await;
            lru.retain(|k| k != &key);
        }

        removed
    }

    /// Evict the least-recently-used segment; returns `false` if empty.
    async fn evict_one(&self) -> bool {
        let to_evict = {
            let mut lru = self.lru_order.lock().await;
            lru.pop_front()
        };

        if let Some(key) = to_evict {
            let removed = {
                let mut segments = self.segments.write().await;
                segments.remove(&key)
            };

            if let Some(data) = removed {
                self.current_size
                    .fetch_sub(data.len() as u64, Ordering::Relaxed);
                return true;
            }
        }

        false
    }

    pub fn stats(&self) -> HotTierStats {
        HotTierStats {
            current_size: self.current_size.load(Ordering::Relaxed),
            max_size: self.max_size,
        }
    }
}

#[derive(Debug, Clone)]
pub struct HotTierStats {
    pub current_size: u64,
    pub max_size: u64,
}

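/// On-disk warm tier: one file per segment stored under
/// `<base_path>/<topic>/<partition>/<base_offset>.segment`.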
#[derive(Debug)]
pub struct WarmTier {
    base_path: PathBuf,
    segments: RwLock<BTreeMap<(String, u32, u64), Arc<SegmentMetadata>>>,
    current_size: AtomicU64,
    max_size: u64,
}

impl WarmTier {
    pub fn new(base_path: PathBuf, max_size: u64) -> Result<Self> {
        std::fs::create_dir_all(&base_path)?;

        Ok(Self {
            base_path,
            segments: RwLock::new(BTreeMap::new()),
            current_size: AtomicU64::new(0),
            max_size,
        })
    }

    fn segment_path(&self, topic: &str, partition: u32, base_offset: u64) -> PathBuf {
        self.base_path
            .join(topic)
            .join(format!("{}", partition))
            .join(format!("{:020}.segment", base_offset))
    }

    /// Persist a segment to disk and register its metadata.
    pub async fn store(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
        end_offset: u64,
        data: &[u8],
    ) -> Result<()> {
        let path = self.segment_path(topic, partition, base_offset);

        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }

        tokio::fs::write(&path, data).await?;

        let size = data.len() as u64;

        let metadata = Arc::new(SegmentMetadata::new(
            topic.to_string(),
            partition,
            base_offset,
            end_offset,
            size,
            StorageTier::Warm,
        ));

        {
            let mut segments = self.segments.write().await;
            segments.insert((topic.to_string(), partition, base_offset), metadata);
        }

        self.current_size.fetch_add(size, Ordering::Relaxed);

        Ok(())
    }

    /// Read a segment from disk, or `None` if it is not present.
    pub async fn read(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
    ) -> Result<Option<Bytes>> {
        let path = self.segment_path(topic, partition, base_offset);

        if !path.exists() {
            return Ok(None);
        }

        let file = std::fs::File::open(&path)?;
        // SAFETY: segment files are written once and then only read or
        // deleted, so a read-only mapping is not mutated underneath us.
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        let key = (topic.to_string(), partition, base_offset);
        if let Some(meta) = self.segments.read().await.get(&key) {
            meta.record_access();
        }

        Ok(Some(Bytes::copy_from_slice(&mmap)))
    }

    pub async fn remove(&self, topic: &str, partition: u32, base_offset: u64) -> Result<()> {
        let path = self.segment_path(topic, partition, base_offset);
        let key = (topic.to_string(), partition, base_offset);

        let size = {
            let mut segments = self.segments.write().await;
            segments.remove(&key).map(|m| m.size_bytes)
        };

        if let Some(size) = size {
            self.current_size.fetch_sub(size, Ordering::Relaxed);
        }

        if path.exists() {
            tokio::fs::remove_file(path).await?;
        }

        Ok(())
    }

    /// Segments older than `max_age`, eligible for demotion to cold storage.
    pub async fn get_demotion_candidates(&self, max_age: Duration) -> Vec<(String, u32, u64)> {
        let max_age_secs = max_age.as_secs();
        let segments = self.segments.read().await;

        segments
            .iter()
            .filter(|(_, meta)| meta.age_secs() > max_age_secs)
            .map(|(key, _)| key.clone())
            .collect()
    }

    pub async fn get_metadata(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
    ) -> Option<Arc<SegmentMetadata>> {
        let key = (topic.to_string(), partition, base_offset);
        self.segments.read().await.get(&key).cloned()
    }

    pub fn stats(&self) -> WarmTierStats {
        WarmTierStats {
            current_size: self.current_size.load(Ordering::Relaxed),
            max_size: self.max_size,
        }
    }
}

#[derive(Debug, Clone)]
pub struct WarmTierStats {
    pub current_size: u64,
    pub max_size: u64,
}

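/// Abstraction over a cold-storage backend (object store or local archive).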
#[async_trait::async_trait]
pub trait ColdStorageBackend: Send + Sync {
    /// Store an object under `key`, overwriting any existing object.
    async fn upload(&self, key: &str, data: &[u8]) -> Result<()>;

    /// Fetch an object, or `None` if it does not exist.
    async fn download(&self, key: &str) -> Result<Option<Bytes>>;

    /// Delete an object; deleting a missing object is not an error.
    async fn delete(&self, key: &str) -> Result<()>;

    /// List all keys under the given prefix.
    async fn list(&self, prefix: &str) -> Result<Vec<String>>;

    /// Check whether an object exists.
    async fn exists(&self, key: &str) -> Result<bool>;
}

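/// Cold-storage backend that writes objects as files under a base
/// directory, validating keys to prevent path traversal.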
pub struct LocalFsColdStorage {
    base_path: PathBuf,
}

impl LocalFsColdStorage {
    pub fn new(base_path: PathBuf) -> Result<Self> {
        std::fs::create_dir_all(&base_path)?;
        // Canonicalize so the escape check in `key_to_path` compares
        // against a resolved, symlink-free base.
        let base_path = base_path.canonicalize()?;
        Ok(Self { base_path })
    }

    /// Map a storage key to a filesystem path, rejecting keys that could
    /// escape the base directory.
    fn key_to_path(&self, key: &str) -> Result<PathBuf> {
        // Reject parent-directory components and absolute paths outright.
        if key.contains("..") || key.starts_with('/') || key.starts_with('\\') {
            return Err(Error::Other(format!(
                "Invalid key: path traversal attempt detected: {}",
                key
            )));
        }

        if key.contains('\0') {
            return Err(Error::Other("Invalid key: null byte not allowed".into()));
        }

        let path = self
            .base_path
            .join(key.replace('/', std::path::MAIN_SEPARATOR_STR));

        // If the path already exists, verify its canonical form is still
        // inside the base directory (guards against symlink tricks).
        if let Ok(canonical) = path.canonicalize() {
            if !canonical.starts_with(&self.base_path) {
                return Err(Error::Other(format!(
                    "Invalid key: path escapes base directory: {}",
                    key
                )));
            }
        }
        // Paths that do not exist yet are covered by the checks above.
        Ok(path)
    }
}

#[async_trait::async_trait]
impl ColdStorageBackend for LocalFsColdStorage {
    async fn upload(&self, key: &str, data: &[u8]) -> Result<()> {
        let path = self.key_to_path(key)?;
        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        tokio::fs::write(&path, data).await?;
        Ok(())
    }

    async fn download(&self, key: &str) -> Result<Option<Bytes>> {
        let path = self.key_to_path(key)?;
        if !path.exists() {
            return Ok(None);
        }
        let data = tokio::fs::read(&path).await?;
        Ok(Some(Bytes::from(data)))
    }

    async fn delete(&self, key: &str) -> Result<()> {
        let path = self.key_to_path(key)?;
        if path.exists() {
            tokio::fs::remove_file(path).await?;
        }
        Ok(())
    }

    async fn list(&self, prefix: &str) -> Result<Vec<String>> {
        let base = self.key_to_path(prefix)?;
        let mut keys = Vec::new();

        if !base.exists() {
            return Ok(keys);
        }

        fn walk_dir(
            dir: &std::path::Path,
            base: &std::path::Path,
            keys: &mut Vec<String>,
        ) -> std::io::Result<()> {
            if dir.is_dir() {
                for entry in std::fs::read_dir(dir)? {
                    let entry = entry?;
                    let path = entry.path();
                    if path.is_dir() {
                        walk_dir(&path, base, keys)?;
                    } else if let Ok(rel) = path.strip_prefix(base) {
                        keys.push(
                            rel.to_string_lossy()
                                .replace(std::path::MAIN_SEPARATOR, "/"),
                        );
                    }
                }
            }
            Ok(())
        }

        // Walk only the subtree under the prefix; keys are reported
        // relative to the base path so they retain the prefix.
        walk_dir(&base, &self.base_path, &mut keys)?;
        Ok(keys)
    }

    async fn exists(&self, key: &str) -> Result<bool> {
        Ok(self.key_to_path(key)?.exists())
    }
}

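/// No-op backend used when the cold tier is disabled: uploads fail, and
/// downloads and listings return nothing.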
pub struct DisabledColdStorage;

#[async_trait::async_trait]
impl ColdStorageBackend for DisabledColdStorage {
    async fn upload(&self, _key: &str, _data: &[u8]) -> Result<()> {
        Err(Error::Other("Cold storage is disabled".into()))
    }

    async fn download(&self, _key: &str) -> Result<Option<Bytes>> {
        Ok(None)
    }

    async fn delete(&self, _key: &str) -> Result<()> {
        Ok(())
    }

    async fn list(&self, _prefix: &str) -> Result<Vec<String>> {
        Ok(Vec::new())
    }

    async fn exists(&self, _key: &str) -> Result<bool> {
        Ok(false)
    }
}

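/// Work items processed asynchronously by the migration worker.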
#[derive(Debug)]
enum MigrationTask {
    Demote {
        topic: String,
        partition: u32,
        base_offset: u64,
        from_tier: StorageTier,
    },
    Promote {
        topic: String,
        partition: u32,
        base_offset: u64,
        to_tier: StorageTier,
    },
    Compact {
        topic: String,
        partition: u32,
        base_offset: u64,
    },
}

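/// Orchestrates the three tiers: writes land in the hot tier when they fit
/// (spilling to the warm tier otherwise), reads fall through hot → warm →
/// cold, and background tasks demote, promote, and compact segments.
///
/// A minimal usage sketch (error handling elided):
///
/// ```ignore
/// let storage = TieredStorage::new(TieredStorageConfig::default()).await?;
/// storage.write("topic", 0, 0, 10, Bytes::from("payload")).await?;
/// let segments = storage.read("topic", 0, 0, 1024 * 1024).await?;
/// storage.shutdown().await;
/// ```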
pub struct TieredStorage {
    config: TieredStorageConfig,
    hot_tier: Arc<HotTier>,
    warm_tier: Arc<WarmTier>,
    cold_storage: Arc<dyn ColdStorageBackend>,
    /// Authoritative index of every known segment, across all tiers.
    segment_index: RwLock<BTreeMap<(String, u32, u64), Arc<SegmentMetadata>>>,
    /// Queue feeding the background migration worker.
    migration_tx: mpsc::Sender<MigrationTask>,
    stats: Arc<TieredStorageStats>,
    /// Broadcast channel used to stop the background tasks.
    shutdown: tokio::sync::broadcast::Sender<()>,
}

impl TieredStorage {
    pub async fn new(config: TieredStorageConfig) -> Result<Arc<Self>> {
        let hot_tier = Arc::new(HotTier::new(config.hot_tier_max_bytes));
        let warm_tier = Arc::new(WarmTier::new(
            config.warm_tier_path.clone(),
            config.warm_tier_max_bytes,
        )?);

        let cold_storage: Arc<dyn ColdStorageBackend> = match &config.cold_storage {
            ColdStorageConfig::LocalFs { path } => Arc::new(LocalFsColdStorage::new(path.clone())?),
            ColdStorageConfig::Disabled => Arc::new(DisabledColdStorage),
            ColdStorageConfig::S3 { .. } => {
                return Err(Error::Other("S3 cold storage not yet implemented".into()));
            }
            ColdStorageConfig::AzureBlob { .. } => {
                return Err(Error::Other(
                    "Azure Blob cold storage not yet implemented".into(),
                ));
            }
        };

        let (migration_tx, migration_rx) = mpsc::channel(1024);
        let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);

        let storage = Arc::new(Self {
            config: config.clone(),
            hot_tier,
            warm_tier,
            cold_storage,
            segment_index: RwLock::new(BTreeMap::new()),
            migration_tx,
            stats: Arc::new(TieredStorageStats::new()),
            shutdown: shutdown_tx,
        });

        // Spawn the background tasks before handing the storage out.
        storage.clone().start_migration_worker(migration_rx);
        storage.clone().start_tier_manager();

        Ok(storage)
    }

    /// Write a segment. The payload goes to the hot tier when it fits;
    /// otherwise it spills directly to the warm tier.
    pub async fn write(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
        end_offset: u64,
        data: Bytes,
    ) -> Result<()> {
        let size = data.len() as u64;

        let inserted = self
            .hot_tier
            .insert(topic, partition, base_offset, data.clone())
            .await;

        if !inserted {
            self.warm_tier
                .store(topic, partition, base_offset, end_offset, &data)
                .await?;
            self.stats.warm_writes.fetch_add(1, Ordering::Relaxed);
        } else {
            self.stats.hot_writes.fetch_add(1, Ordering::Relaxed);
        }

        let metadata = Arc::new(SegmentMetadata::new(
            topic.to_string(),
            partition,
            base_offset,
            end_offset,
            size,
            if inserted {
                StorageTier::Hot
            } else {
                StorageTier::Warm
            },
        ));

        {
            let mut index = self.segment_index.write().await;
            index.insert((topic.to_string(), partition, base_offset), metadata);
        }

        self.stats
            .total_bytes_written
            .fetch_add(size, Ordering::Relaxed);

        Ok(())
    }

    /// Read segments overlapping `start_offset`, falling through
    /// hot → warm → cold, until `max_bytes` has been collected.
    pub async fn read(
        &self,
        topic: &str,
        partition: u32,
        start_offset: u64,
        max_bytes: usize,
    ) -> Result<Vec<(u64, Bytes)>> {
        let _start = Instant::now();
        let mut results = Vec::new();
        let mut bytes_collected = 0;

        // Snapshot the matching segment metadata so the index lock is not
        // held across tier I/O.
        let segments = {
            let index = self.segment_index.read().await;
            index
                .range((topic.to_string(), partition, 0)..(topic.to_string(), partition, u64::MAX))
                .filter(|(_, meta)| meta.end_offset > start_offset)
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect::<Vec<_>>()
        };

        for ((_, _, base_offset), metadata) in segments {
            if bytes_collected >= max_bytes {
                break;
            }

            metadata.record_access();

            // Try the hot tier first.
            let data = match metadata.tier {
                StorageTier::Hot => {
                    if let Some(data) = self.hot_tier.get(topic, partition, base_offset).await {
                        self.stats.hot_reads.fetch_add(1, Ordering::Relaxed);
                        Some(data)
                    } else {
                        None
                    }
                }
                _ => None,
            };

            // Fall back to the warm tier, then to cold storage.
            let data = match data {
                Some(d) => d,
                None => {
                    if let Some(data) = self.warm_tier.read(topic, partition, base_offset).await? {
                        self.stats.warm_reads.fetch_add(1, Ordering::Relaxed);

                        // Frequently read warm segments may be promoted to hot.
                        if self.config.enable_promotion {
                            let access_count = metadata.access_count.load(Ordering::Relaxed);
                            if access_count >= self.config.promotion_threshold {
                                let _ = self
                                    .migration_tx
                                    .send(MigrationTask::Promote {
                                        topic: topic.to_string(),
                                        partition,
                                        base_offset,
                                        to_tier: StorageTier::Hot,
                                    })
                                    .await;
                            }
                        }

                        data
                    } else {
                        let key = metadata.segment_key();
                        if let Some(data) = self.cold_storage.download(&key).await? {
                            self.stats.cold_reads.fetch_add(1, Ordering::Relaxed);

                            // Frequently read cold segments may be promoted to warm.
                            if self.config.enable_promotion {
                                let access_count = metadata.access_count.load(Ordering::Relaxed);
                                if access_count >= self.config.promotion_threshold {
                                    let _ = self
                                        .migration_tx
                                        .send(MigrationTask::Promote {
                                            topic: topic.to_string(),
                                            partition,
                                            base_offset,
                                            to_tier: StorageTier::Warm,
                                        })
                                        .await;
                                }
                            }

                            data
                        } else {
                            // Segment is indexed but missing from every tier.
                            continue;
                        }
                    }
                }
            };

            results.push((base_offset, data.clone()));
            bytes_collected += data.len();
        }

        self.stats
            .total_bytes_read
            .fetch_add(bytes_collected as u64, Ordering::Relaxed);

        Ok(results)
    }

    pub async fn get_segment_metadata(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
    ) -> Option<Arc<SegmentMetadata>> {
        self.segment_index
            .read()
            .await
            .get(&(topic.to_string(), partition, base_offset))
            .cloned()
    }

    /// Enqueue demotion of every hot-tier segment of a partition.
    pub async fn flush_hot_tier(&self, topic: &str, partition: u32) -> Result<()> {
        let segments: Vec<_> = {
            let index = self.segment_index.read().await;
            index
                .range((topic.to_string(), partition, 0)..(topic.to_string(), partition, u64::MAX))
                .filter(|(_, meta)| meta.tier == StorageTier::Hot)
                .map(|(k, _)| k.2)
                .collect()
        };

        for base_offset in segments {
            let _ = self
                .migration_tx
                .send(MigrationTask::Demote {
                    topic: topic.to_string(),
                    partition,
                    base_offset,
                    from_tier: StorageTier::Hot,
                })
                .await;
        }

        Ok(())
    }

    pub fn stats(&self) -> TieredStorageStatsSnapshot {
        TieredStorageStatsSnapshot {
            hot_tier: self.hot_tier.stats(),
            warm_tier: self.warm_tier.stats(),
            hot_reads: self.stats.hot_reads.load(Ordering::Relaxed),
            warm_reads: self.stats.warm_reads.load(Ordering::Relaxed),
            cold_reads: self.stats.cold_reads.load(Ordering::Relaxed),
            hot_writes: self.stats.hot_writes.load(Ordering::Relaxed),
            warm_writes: self.stats.warm_writes.load(Ordering::Relaxed),
            cold_writes: self.stats.cold_writes.load(Ordering::Relaxed),
            total_bytes_read: self.stats.total_bytes_read.load(Ordering::Relaxed),
            total_bytes_written: self.stats.total_bytes_written.load(Ordering::Relaxed),
            migrations_completed: self.stats.migrations_completed.load(Ordering::Relaxed),
            migrations_failed: self.stats.migrations_failed.load(Ordering::Relaxed),
        }
    }

    fn start_migration_worker(self: Arc<Self>, mut rx: mpsc::Receiver<MigrationTask>) {
        let semaphore = Arc::new(Semaphore::new(self.config.migration_concurrency));
        let mut shutdown_rx = self.shutdown.subscribe();

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    Some(task) = rx.recv() => {
                        // Bound in-flight migrations; the semaphore is never
                        // closed, so acquire_owned cannot fail here.
                        let permit = semaphore.clone().acquire_owned().await.unwrap();
                        let storage = self.clone();

                        tokio::spawn(async move {
                            let result = storage.execute_migration(task).await;
                            if result.is_ok() {
                                storage.stats.migrations_completed.fetch_add(1, Ordering::Relaxed);
                            } else {
                                storage.stats.migrations_failed.fetch_add(1, Ordering::Relaxed);
                            }
                            drop(permit);
                        });
                    }
                    _ = shutdown_rx.recv() => {
                        break;
                    }
                }
            }
        });
    }

    fn start_tier_manager(self: Arc<Self>) {
        let mut shutdown_rx = self.shutdown.subscribe();
        let migration_interval = self.config.migration_interval;

        tokio::spawn(async move {
            let mut ticker = interval(migration_interval);

            loop {
                tokio::select! {
                    _ = ticker.tick() => {
                        if let Err(e) = self.check_tier_migrations().await {
                            tracing::warn!("Tier migration check failed: {}", e);
                        }
                    }
                    _ = shutdown_rx.recv() => {
                        break;
                    }
                }
            }
        });
    }

    /// Scan the segment index and enqueue demotions for over-age segments
    /// and compactions for segments with too many dead records.
    async fn check_tier_migrations(&self) -> Result<()> {
        let hot_max_age = self.config.hot_tier_max_age;
        let warm_max_age = self.config.warm_tier_max_age;

        // Hot segments past their maximum age are demoted to warm.
        let hot_candidates: Vec<_> = {
            let index = self.segment_index.read().await;
            index
                .iter()
                .filter(|(_, meta)| {
                    meta.tier == StorageTier::Hot
                        && Duration::from_secs(meta.age_secs()) > hot_max_age
                })
                .map(|(k, _)| k.clone())
                .collect()
        };

        for (topic, partition, base_offset) in hot_candidates {
            let _ = self
                .migration_tx
                .send(MigrationTask::Demote {
                    topic,
                    partition,
                    base_offset,
                    from_tier: StorageTier::Hot,
                })
                .await;
        }

        // Warm segments past their maximum age are demoted to cold.
        let warm_candidates = self.warm_tier.get_demotion_candidates(warm_max_age).await;

        for (topic, partition, base_offset) in warm_candidates {
            let _ = self
                .migration_tx
                .send(MigrationTask::Demote {
                    topic,
                    partition,
                    base_offset,
                    from_tier: StorageTier::Warm,
                })
                .await;
        }

        // Segments whose dead-record ratio exceeds the threshold are compacted.
        let compaction_threshold = self.config.compaction_threshold;
        let compaction_candidates: Vec<_> = {
            let index = self.segment_index.read().await;
            index
                .iter()
                .filter(|(_, meta)| meta.compaction_ratio() > compaction_threshold)
                .map(|(k, _)| k.clone())
                .collect()
        };

        for (topic, partition, base_offset) in compaction_candidates {
            let _ = self
                .migration_tx
                .send(MigrationTask::Compact {
                    topic,
                    partition,
                    base_offset,
                })
                .await;
        }

        Ok(())
    }

    async fn execute_migration(&self, task: MigrationTask) -> Result<()> {
        match task {
            MigrationTask::Demote {
                topic,
                partition,
                base_offset,
                from_tier,
            } => {
                self.demote_segment(&topic, partition, base_offset, from_tier)
                    .await
            }
            MigrationTask::Promote {
                topic,
                partition,
                base_offset,
                to_tier,
            } => {
                self.promote_segment(&topic, partition, base_offset, to_tier)
                    .await
            }
            MigrationTask::Compact {
                topic,
                partition,
                base_offset,
            } => self.compact_segment(&topic, partition, base_offset).await,
        }
    }

    async fn demote_segment(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
        from_tier: StorageTier,
    ) -> Result<()> {
        let to_tier = match from_tier.demote() {
            Some(t) => t,
            None => return Ok(()), // already at the coldest tier
        };

        // Fetch the data from the source tier. The hot tier is drained
        // immediately; the warm copy is removed only after the cold upload
        // succeeds.
        let data = match from_tier {
            StorageTier::Hot => self.hot_tier.remove(topic, partition, base_offset).await,
            StorageTier::Warm => self.warm_tier.read(topic, partition, base_offset).await?,
            StorageTier::Cold => None,
        };

        let data = match data {
            Some(d) => d,
            None => return Ok(()), // nothing to migrate
        };

        let metadata = self
            .get_segment_metadata(topic, partition, base_offset)
            .await;
        let end_offset = metadata
            .as_ref()
            .map(|m| m.end_offset)
            .unwrap_or(base_offset);

        match to_tier {
            StorageTier::Warm => {
                self.warm_tier
                    .store(topic, partition, base_offset, end_offset, &data)
                    .await?;
            }
            StorageTier::Cold => {
                let key = format!("{}/{}/{:020}", topic, partition, base_offset);
                self.cold_storage.upload(&key, &data).await?;
                self.stats.cold_writes.fetch_add(1, Ordering::Relaxed);

                self.warm_tier.remove(topic, partition, base_offset).await?;
            }
            StorageTier::Hot => unreachable!(),
        }

        // Re-index the segment under its new tier, carrying the counters over.
        if let Some(meta) = metadata {
            let new_meta = Arc::new(SegmentMetadata {
                topic: meta.topic.clone(),
                partition: meta.partition,
                base_offset: meta.base_offset,
                end_offset: meta.end_offset,
                size_bytes: meta.size_bytes,
                tier: to_tier,
                created_at: meta.created_at,
                last_accessed: AtomicU64::new(meta.last_accessed.load(Ordering::Relaxed)),
                access_count: AtomicU64::new(meta.access_count.load(Ordering::Relaxed)),
                dead_records: AtomicU64::new(meta.dead_records.load(Ordering::Relaxed)),
                total_records: meta.total_records,
            });

            let mut index = self.segment_index.write().await;
            index.insert((topic.to_string(), partition, base_offset), new_meta);
        }

        tracing::debug!(
            "Demoted segment {}/{}/{} from {:?} to {:?}",
            topic,
            partition,
            base_offset,
            from_tier,
            to_tier
        );

        Ok(())
    }

    async fn promote_segment(
        &self,
        topic: &str,
        partition: u32,
        base_offset: u64,
        to_tier: StorageTier,
    ) -> Result<()> {
        let metadata = match self
            .get_segment_metadata(topic, partition, base_offset)
            .await
        {
            Some(m) => m,
            None => return Ok(()),
        };

        let from_tier = metadata.tier;

        let data = match from_tier {
            StorageTier::Cold => {
                let key = metadata.segment_key();
                self.cold_storage.download(&key).await?
            }
            StorageTier::Warm => self.warm_tier.read(topic, partition, base_offset).await?,
            StorageTier::Hot => return Ok(()), // already at the hottest tier
        };

        let data = match data {
            Some(d) => d,
            None => return Ok(()),
        };

        match to_tier {
            StorageTier::Hot => {
                self.hot_tier
                    .insert(topic, partition, base_offset, data)
                    .await;
            }
            StorageTier::Warm => {
                self.warm_tier
                    .store(topic, partition, base_offset, metadata.end_offset, &data)
                    .await?;
            }
            StorageTier::Cold => unreachable!(),
        }

        let new_meta = Arc::new(SegmentMetadata {
            topic: metadata.topic.clone(),
            partition: metadata.partition,
            base_offset: metadata.base_offset,
            end_offset: metadata.end_offset,
            size_bytes: metadata.size_bytes,
            tier: to_tier,
            created_at: metadata.created_at,
            last_accessed: AtomicU64::new(metadata.last_accessed.load(Ordering::Relaxed)),
            // Reset the access count so the segment must earn any further
            // promotion instead of being re-promoted immediately.
            access_count: AtomicU64::new(0),
            dead_records: AtomicU64::new(metadata.dead_records.load(Ordering::Relaxed)),
            total_records: metadata.total_records,
        });

        {
            let mut index = self.segment_index.write().await;
            index.insert((topic.to_string(), partition, base_offset), new_meta);
        }

        tracing::debug!(
            "Promoted segment {}/{}/{} from {:?} to {:?}",
            topic,
            partition,
            base_offset,
            from_tier,
            to_tier
        );

        Ok(())
    }

    /// Key-based compaction: parse the segment's length-prefixed records,
    /// keep only the latest record per key, drop tombstones (empty values),
    /// and rewrite the segment in place if that reduces the record count.
    async fn compact_segment(&self, topic: &str, partition: u32, base_offset: u64) -> Result<()> {
        let metadata = match self
            .get_segment_metadata(topic, partition, base_offset)
            .await
        {
            Some(m) => m,
            None => {
                tracing::debug!(
                    "Segment not found for compaction: {}/{}/{}",
                    topic,
                    partition,
                    base_offset
                );
                return Ok(());
            }
        };

        let data = match metadata.tier {
            StorageTier::Hot => self.hot_tier.get(topic, partition, base_offset).await,
            StorageTier::Warm => self.warm_tier.read(topic, partition, base_offset).await?,
            StorageTier::Cold => {
                let key = metadata.segment_key();
                self.cold_storage.download(&key).await?
            }
        };

        let data = match data {
            Some(d) => d,
            None => {
                tracing::debug!(
                    "Segment data not found for compaction: {}/{}/{}",
                    topic,
                    partition,
                    base_offset
                );
                return Ok(());
            }
        };

        // Parse the segment: each record is a little-endian u32 length
        // prefix followed by a serialized message.
        let mut messages: Vec<crate::Message> = Vec::new();
        let mut cursor = 0;

        while cursor < data.len() {
            if cursor + 4 > data.len() {
                break;
            }
            let len = u32::from_le_bytes([
                data[cursor],
                data[cursor + 1],
                data[cursor + 2],
                data[cursor + 3],
            ]) as usize;
            cursor += 4;

            if cursor + len > data.len() {
                tracing::warn!(
                    "Truncated message in segment {}/{}/{}",
                    topic,
                    partition,
                    base_offset
                );
                break;
            }

            match crate::Message::from_bytes(&data[cursor..cursor + len]) {
                Ok(msg) => messages.push(msg),
                Err(e) => {
                    tracing::warn!("Failed to deserialize message in compaction: {}", e);
                }
            }
            cursor += len;
        }

        if messages.is_empty() {
            tracing::debug!(
                "No messages to compact in segment {}/{}/{}",
                topic,
                partition,
                base_offset
            );
            return Ok(());
        }

        let original_count = messages.len();

        // Keep only the latest message per key. Keyless messages get a
        // synthetic key derived from their offset so they are never
        // collapsed together.
        let mut key_to_message: HashMap<Option<Bytes>, crate::Message> = HashMap::new();

        for msg in messages {
            if msg.key.is_some() {
                key_to_message.insert(msg.key.clone(), msg);
            } else {
                key_to_message.insert(Some(Bytes::from(msg.offset.to_le_bytes().to_vec())), msg);
            }
        }

        // Drop tombstones: a message with an empty value deletes its key.
        let compacted: Vec<_> = key_to_message
            .into_values()
            .filter(|msg| !msg.value.is_empty())
            .collect();

        let compacted_count = compacted.len();

        if compacted_count >= original_count {
            tracing::debug!(
                "Skipping compaction for {}/{}/{}: no reduction ({} -> {})",
                topic,
                partition,
                base_offset,
                original_count,
                compacted_count
            );
            return Ok(());
        }

        // Re-serialize the survivors in the same length-prefixed format.
        let mut compacted_data = Vec::new();
        let mut new_end_offset = base_offset;

        for msg in &compacted {
            let msg_bytes = msg.to_bytes()?;
            compacted_data.extend_from_slice(&(msg_bytes.len() as u32).to_le_bytes());
            compacted_data.extend_from_slice(&msg_bytes);
            new_end_offset = new_end_offset.max(msg.offset + 1);
        }

        let compacted_bytes = Bytes::from(compacted_data);
        let compacted_size = compacted_bytes.len() as u64;
        let reduction_ratio = 1.0 - (compacted_count as f64 / original_count as f64);

        tracing::info!(
            "Compacted segment {}/{}/{}: {} -> {} messages ({:.1}% reduction)",
            topic,
            partition,
            base_offset,
            original_count,
            compacted_count,
            reduction_ratio * 100.0
        );

        // Replace the segment in whichever tier currently holds it.
        match metadata.tier {
            StorageTier::Hot => {
                self.hot_tier.remove(topic, partition, base_offset).await;
                self.hot_tier
                    .insert(topic, partition, base_offset, compacted_bytes)
                    .await;
            }
            StorageTier::Warm => {
                self.warm_tier.remove(topic, partition, base_offset).await?;
                self.warm_tier
                    .store(
                        topic,
                        partition,
                        base_offset,
                        new_end_offset,
                        &compacted_bytes,
                    )
                    .await?;
            }
            StorageTier::Cold => {
                let key = metadata.segment_key();
                self.cold_storage.upload(&key, &compacted_bytes).await?;
            }
        }

        let new_meta = Arc::new(SegmentMetadata {
            topic: metadata.topic.clone(),
            partition: metadata.partition,
            base_offset: metadata.base_offset,
            end_offset: new_end_offset,
            size_bytes: compacted_size,
            tier: metadata.tier,
            created_at: metadata.created_at,
            last_accessed: AtomicU64::new(metadata.last_accessed.load(Ordering::Relaxed)),
            access_count: AtomicU64::new(metadata.access_count.load(Ordering::Relaxed)),
            dead_records: AtomicU64::new(0), // all dead records were just purged
            total_records: compacted_count as u64,
        });

        {
            let mut index = self.segment_index.write().await;
            index.insert((topic.to_string(), partition, base_offset), new_meta);
        }

        Ok(())
    }

    /// Signal the background migration worker and tier manager to stop.
    pub async fn shutdown(&self) {
        let _ = self.shutdown.send(());
    }
}

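/// Cumulative activity counters, updated with relaxed atomic operations.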
pub struct TieredStorageStats {
    pub hot_reads: AtomicU64,
    pub warm_reads: AtomicU64,
    pub cold_reads: AtomicU64,
    pub hot_writes: AtomicU64,
    pub warm_writes: AtomicU64,
    pub cold_writes: AtomicU64,
    pub total_bytes_read: AtomicU64,
    pub total_bytes_written: AtomicU64,
    pub migrations_completed: AtomicU64,
    pub migrations_failed: AtomicU64,
}

impl TieredStorageStats {
    fn new() -> Self {
        Self {
            hot_reads: AtomicU64::new(0),
            warm_reads: AtomicU64::new(0),
            cold_reads: AtomicU64::new(0),
            hot_writes: AtomicU64::new(0),
            warm_writes: AtomicU64::new(0),
            cold_writes: AtomicU64::new(0),
            total_bytes_read: AtomicU64::new(0),
            total_bytes_written: AtomicU64::new(0),
            migrations_completed: AtomicU64::new(0),
            migrations_failed: AtomicU64::new(0),
        }
    }
}

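/// Point-in-time copy of [`TieredStorageStats`] plus per-tier occupancy.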
#[derive(Debug, Clone)]
pub struct TieredStorageStatsSnapshot {
    pub hot_tier: HotTierStats,
    pub warm_tier: WarmTierStats,
    pub hot_reads: u64,
    pub warm_reads: u64,
    pub cold_reads: u64,
    pub hot_writes: u64,
    pub warm_writes: u64,
    pub cold_writes: u64,
    pub total_bytes_read: u64,
    pub total_bytes_written: u64,
    pub migrations_completed: u64,
    pub migrations_failed: u64,
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_hot_tier_insert_and_get() {
        let hot = HotTier::new(1024 * 1024);

        let data = Bytes::from("test data");
        hot.insert("topic1", 0, 0, data.clone()).await;

        let retrieved = hot.get("topic1", 0, 0).await;
        assert_eq!(retrieved, Some(data));
    }

    #[tokio::test]
    async fn test_hot_tier_lru_eviction() {
        // A tiny tier that can hold at most two 40-byte segments.
        let hot = HotTier::new(100);

        hot.insert("topic1", 0, 0, Bytes::from(vec![0u8; 40])).await;
        hot.insert("topic1", 0, 1, Bytes::from(vec![1u8; 40])).await;
        hot.insert("topic1", 0, 2, Bytes::from(vec![2u8; 40])).await;

        // The least-recently-used segment was evicted to make room.
        assert!(hot.get("topic1", 0, 0).await.is_none());
        assert!(hot.get("topic1", 0, 1).await.is_some());
        assert!(hot.get("topic1", 0, 2).await.is_some());
    }

    #[tokio::test]
    async fn test_warm_tier_store_and_read() {
        let temp_dir = TempDir::new().unwrap();
        let warm = WarmTier::new(temp_dir.path().to_path_buf(), 1024 * 1024 * 1024).unwrap();

        let data = b"warm tier test data";
        warm.store("topic1", 0, 0, 100, data).await.unwrap();

        let retrieved = warm.read("topic1", 0, 0).await.unwrap();
        assert_eq!(retrieved, Some(Bytes::from(&data[..])));
    }

    #[tokio::test]
    async fn test_local_fs_cold_storage() {
        let temp_dir = TempDir::new().unwrap();
        let cold = LocalFsColdStorage::new(temp_dir.path().to_path_buf()).unwrap();

        let key = "topic1/0/00000000000000000000";
        let data = b"cold storage test data";

        cold.upload(key, data).await.unwrap();
        assert!(cold.exists(key).await.unwrap());

        let retrieved = cold.download(key).await.unwrap();
        assert_eq!(retrieved, Some(Bytes::from(&data[..])));

        cold.delete(key).await.unwrap();
        assert!(!cold.exists(key).await.unwrap());
    }

    #[tokio::test]
    async fn test_tiered_storage_write_and_read() {
        let temp_dir = TempDir::new().unwrap();

        let config = TieredStorageConfig {
            hot_tier_max_bytes: 1024 * 1024,
            warm_tier_path: temp_dir.path().join("warm"),
            cold_storage: ColdStorageConfig::LocalFs {
                path: temp_dir.path().join("cold"),
            },
            // Long interval so background migration stays out of the test.
            migration_interval: Duration::from_secs(3600),
            ..Default::default()
        };

        let storage = TieredStorage::new(config).await.unwrap();

        let data = Bytes::from("test message data");
        storage
            .write("topic1", 0, 0, 10, data.clone())
            .await
            .unwrap();

        let results = storage.read("topic1", 0, 0, 1024).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, data);

        let stats = storage.stats();
        assert_eq!(stats.hot_writes, 1);
        assert_eq!(stats.hot_reads, 1);

        storage.shutdown().await;
    }

    #[tokio::test]
    async fn test_storage_tier_demote_promote() {
        assert_eq!(StorageTier::Hot.demote(), Some(StorageTier::Warm));
        assert_eq!(StorageTier::Warm.demote(), Some(StorageTier::Cold));
        assert_eq!(StorageTier::Cold.demote(), None);

        assert_eq!(StorageTier::Hot.promote(), None);
        assert_eq!(StorageTier::Warm.promote(), Some(StorageTier::Hot));
        assert_eq!(StorageTier::Cold.promote(), Some(StorageTier::Warm));
    }

    #[tokio::test]
    async fn test_segment_metadata() {
        let meta = SegmentMetadata::new("topic1".to_string(), 0, 0, 100, 1024, StorageTier::Hot);

        assert_eq!(meta.segment_key(), "topic1/0/00000000000000000000");
        assert!(meta.age_secs() <= 1);

        meta.record_access();
        assert_eq!(meta.access_count.load(Ordering::Relaxed), 1);

        meta.dead_records.store(50, Ordering::Relaxed);
        assert!((meta.compaction_ratio() - 0.5).abs() < 0.01);
    }

    #[tokio::test]
    async fn test_segment_compaction() {
        use crate::Message;

        let temp_dir = TempDir::new().unwrap();

        let config = TieredStorageConfig {
            hot_tier_max_bytes: 10 * 1024 * 1024,
            warm_tier_path: temp_dir.path().join("warm"),
            cold_storage: ColdStorageConfig::LocalFs {
                path: temp_dir.path().join("cold"),
            },
            migration_interval: Duration::from_secs(3600),
            compaction_threshold: 0.1,
            ..Default::default()
        };

        let storage = TieredStorage::new(config).await.unwrap();

        // Build a segment of length-prefixed messages:
        // A=value1, B=value1, A=value2 (overwrites A), B="" (tombstone for B).
        let mut segment_data = Vec::new();

        let msg1 = Message::with_key(Bytes::from("A"), Bytes::from("value1"));
        let msg1_bytes = msg1.to_bytes().unwrap();
        segment_data.extend_from_slice(&(msg1_bytes.len() as u32).to_le_bytes());
        segment_data.extend_from_slice(&msg1_bytes);

        let msg2 = Message::with_key(Bytes::from("B"), Bytes::from("value1"));
        let msg2_bytes = msg2.to_bytes().unwrap();
        segment_data.extend_from_slice(&(msg2_bytes.len() as u32).to_le_bytes());
        segment_data.extend_from_slice(&msg2_bytes);

        let msg3 = Message::with_key(Bytes::from("A"), Bytes::from("value2"));
        let msg3_bytes = msg3.to_bytes().unwrap();
        segment_data.extend_from_slice(&(msg3_bytes.len() as u32).to_le_bytes());
        segment_data.extend_from_slice(&msg3_bytes);

        let msg4 = Message::with_key(Bytes::from("B"), Bytes::from(""));
        let msg4_bytes = msg4.to_bytes().unwrap();
        segment_data.extend_from_slice(&(msg4_bytes.len() as u32).to_le_bytes());
        segment_data.extend_from_slice(&msg4_bytes);

        let segment_bytes = Bytes::from(segment_data);
        storage
            .write("compaction-test", 0, 0, 4, segment_bytes)
            .await
            .unwrap();

        let meta = storage
            .get_segment_metadata("compaction-test", 0, 0)
            .await
            .unwrap();

        // Mark the overwritten records dead so the segment qualifies.
        meta.dead_records.store(2, Ordering::Relaxed);

        storage
            .compact_segment("compaction-test", 0, 0)
            .await
            .unwrap();

        let meta_after = storage
            .get_segment_metadata("compaction-test", 0, 0)
            .await
            .unwrap();
        assert!(
            meta_after.total_records < 4,
            "Compaction should reduce message count"
        );

        storage.shutdown().await;
    }

    #[tokio::test]
    async fn test_compaction_preserves_keyless_messages() {
        use crate::Message;

        let temp_dir = TempDir::new().unwrap();

        let config = TieredStorageConfig {
            hot_tier_max_bytes: 10 * 1024 * 1024,
            warm_tier_path: temp_dir.path().join("warm"),
            cold_storage: ColdStorageConfig::LocalFs {
                path: temp_dir.path().join("cold"),
            },
            migration_interval: Duration::from_secs(3600),
            ..Default::default()
        };

        let storage = TieredStorage::new(config).await.unwrap();

        // Five keyless messages with distinct offsets; none may be collapsed.
        let mut segment_data = Vec::new();

        for i in 0..5 {
            let mut msg = Message::new(Bytes::from(format!("value{}", i)));
            msg.offset = i;
            let msg_bytes = msg.to_bytes().unwrap();
            segment_data.extend_from_slice(&(msg_bytes.len() as u32).to_le_bytes());
            segment_data.extend_from_slice(&msg_bytes);
        }

        let segment_bytes = Bytes::from(segment_data);
        storage
            .write("keyless-test", 0, 0, 5, segment_bytes)
            .await
            .unwrap();

        storage.compact_segment("keyless-test", 0, 0).await.unwrap();

        let meta_after = storage
            .get_segment_metadata("keyless-test", 0, 0)
            .await
            .unwrap();
        assert_eq!(
            meta_after.total_records, 5,
            "Keyless messages should all be preserved"
        );

        storage.shutdown().await;
    }

    #[tokio::test]
    async fn test_local_fs_cold_storage_path_traversal_protection() {
        let temp_dir = TempDir::new().unwrap();
        let storage = LocalFsColdStorage::new(temp_dir.path().to_path_buf()).unwrap();

        // Well-formed keys are accepted.
        assert!(storage.key_to_path("valid/key/path").is_ok());
        assert!(storage.key_to_path("simple-key").is_ok());
        assert!(storage.key_to_path("key_with_underscores").is_ok());

        // Parent-directory traversal is rejected.
        assert!(storage.key_to_path("../escape").is_err());
        assert!(storage.key_to_path("valid/../escape").is_err());
        assert!(storage.key_to_path("..").is_err());
        assert!(storage.key_to_path("foo/../../bar").is_err());

        // Absolute paths are rejected.
        assert!(storage.key_to_path("/etc/passwd").is_err());
        assert!(storage.key_to_path("\\Windows\\System32").is_err());

        // Null bytes are rejected.
        assert!(storage.key_to_path("valid\0.txt").is_err());
    }

    #[tokio::test]
    async fn test_local_fs_cold_storage_operations_with_safe_keys() {
        let temp_dir = TempDir::new().unwrap();
        let storage = LocalFsColdStorage::new(temp_dir.path().to_path_buf()).unwrap();

        let data = b"test data";
        storage.upload("test/key", data).await.unwrap();

        let downloaded = storage.download("test/key").await.unwrap();
        assert_eq!(downloaded, Some(Bytes::from_static(data)));

        assert!(storage.exists("test/key").await.unwrap());
        assert!(!storage.exists("nonexistent").await.unwrap());

        storage.delete("test/key").await.unwrap();
        assert!(!storage.exists("test/key").await.unwrap());
    }

    #[tokio::test]
    async fn test_local_fs_cold_storage_rejects_malicious_upload() {
        let temp_dir = TempDir::new().unwrap();
        let storage = LocalFsColdStorage::new(temp_dir.path().to_path_buf()).unwrap();

        let result = storage.upload("../malicious", b"pwned").await;
        assert!(result.is_err());

        // Nothing may have been written outside the base directory.
        let escaped_path = temp_dir.path().parent().unwrap().join("malicious");
        assert!(!escaped_path.exists());
    }
}