aurora_db/storage/
cold.rs

1use crate::error::{AuroraError, Result};
2use crate::types::{AuroraConfig, ColdStoreMode};
3use sled::Db;
4
5pub struct ColdStore {
6    db: Db,
7    #[allow(dead_code)]
8    db_path: String,
9}
10
11impl ColdStore {
12    pub fn new(path: &str) -> Result<Self> {
13        let config = AuroraConfig::default();
14        Self::with_config(
15            path,
16            config.cold_cache_capacity_mb,
17            config.cold_flush_interval_ms,
18            config.cold_mode,
19        )
20    }
21
22    pub fn with_config(
23        path: &str,
24        cache_capacity_mb: usize,
25        flush_interval_ms: Option<u64>,
26        mode: ColdStoreMode,
27    ) -> Result<Self> {
28        let db_path = if !path.ends_with(".db") {
29            format!("{}.db", path)
30        } else {
31            path.to_string()
32        };
33
34        let mut sled_config = sled::Config::new()
35            .path(&db_path)
36            .cache_capacity((cache_capacity_mb * 1024 * 1024) as u64)
37            .flush_every_ms(flush_interval_ms);
38
39        sled_config = match mode {
40            ColdStoreMode::HighThroughput => sled_config.mode(sled::Mode::HighThroughput),
41            ColdStoreMode::LowSpace => sled_config.mode(sled::Mode::LowSpace),
42        };
43
44        let db = sled_config.open()?;
45
46        Ok(Self { db, db_path })
47    }
48
49    pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
50        Ok(self.db.get(key.as_bytes())?.map(|ivec| ivec.to_vec()))
51    }
52
53    pub fn set(&self, key: String, value: Vec<u8>) -> Result<()> {
54        self.db.insert(key.as_bytes(), value)?;
55        self.db.flush()?;
56        Ok(())
57    }
58
59    pub fn delete(&self, key: &str) -> Result<()> {
60        self.db.remove(key.as_bytes())?;
61        Ok(())
62    }
63
64    pub fn scan(&self) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
65        self.db.iter().map(|result| {
66            result
67                .map_err(AuroraError::Storage)
68                .and_then(|(key, value)| {
69                    Ok((
70                        String::from_utf8(key.to_vec()).map_err(|_| {
71                            AuroraError::Protocol("Invalid UTF-8 in key".to_string())
72                        })?,
73                        value.to_vec(),
74                    ))
75                })
76        })
77    }
78
79    pub fn scan_prefix(
80        &self,
81        prefix: &str,
82    ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
83        self.db.scan_prefix(prefix.as_bytes()).map(|result| {
84            result
85                .map_err(AuroraError::Storage)
86                .and_then(|(key, value)| {
87                    Ok((
88                        String::from_utf8(key.to_vec()).map_err(|_| {
89                            AuroraError::Protocol("Invalid UTF-8 in key".to_string())
90                        })?,
91                        value.to_vec(),
92                    ))
93                })
94        })
95    }
96
97    pub fn batch_set(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
98        let batch = pairs
99            .into_iter()
100            .fold(sled::Batch::default(), |mut batch, (key, value)| {
101                batch.insert(key.as_bytes(), value);
102                batch
103            });
104
105        self.db.apply_batch(batch)?;
106        self.db.flush()?;
107        Ok(())
108    }
109
110    /// Flushes all buffered writes to disk to ensure durability.
111    pub fn flush(&self) -> Result<()> {
112        self.db.flush()?;
113        Ok(())
114    }
115
116    pub fn compact(&self) -> Result<()> {
117        self.db.flush()?;
118        Ok(())
119    }
120
121    pub fn get_stats(&self) -> Result<ColdStoreStats> {
122        Ok(ColdStoreStats {
123            size_on_disk: self.estimated_size(),
124            tree_count: self.db.tree_names().len() as u64,
125        })
126    }
127
128    pub fn estimated_size(&self) -> u64 {
129        self.db.size_on_disk().unwrap_or(0)
130    }
131}
132
133impl Drop for ColdStore {
134    fn drop(&mut self) {
135        if let Err(e) = self.db.flush() {
136            eprintln!("Error flushing database: {}", e);
137        }
138    }
139}
140
/// Point-in-time statistics about a cold store.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ColdStoreStats {
    /// On-disk size of the database as reported by the storage engine.
    pub size_on_disk: u64,
    /// Number of trees in the database.
    pub tree_count: u64,
}