aurora_db/storage/
cold.rs

1use crate::error::{AuroraError, Result};
2use crate::types::{AuroraConfig, ColdStoreMode};
3use sled::Db;
4
5pub struct ColdStore {
6    db: Db,
7    #[allow(dead_code)]
8    db_path: String,
9}
10
11impl ColdStore {
12    pub fn new(path: &str) -> Result<Self> {
13        let config = AuroraConfig::default();
14        Self::with_config(
15            path,
16            config.cold_cache_capacity_mb,
17            config.cold_flush_interval_ms,
18            config.cold_mode,
19        )
20    }
21
22    pub fn with_config(
23        path: &str,
24        cache_capacity_mb: usize,
25        flush_interval_ms: Option<u64>,
26        mode: ColdStoreMode,
27    ) -> Result<Self> {
28        let db_path = if !path.ends_with(".db") {
29            format!("{}.db", path)
30        } else {
31            path.to_string()
32        };
33
34        let mut sled_config = sled::Config::new()
35            .path(&db_path)
36            .cache_capacity((cache_capacity_mb * 1024 * 1024) as u64)
37            .flush_every_ms(flush_interval_ms);
38
39        sled_config = match mode {
40            ColdStoreMode::HighThroughput => sled_config.mode(sled::Mode::HighThroughput),
41            ColdStoreMode::LowSpace => sled_config.mode(sled::Mode::LowSpace),
42        };
43
44        let db = sled_config.open()?;
45
46        Ok(Self { db, db_path })
47    }
48
49    pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
50        Ok(self.db.get(key.as_bytes())?.map(|ivec| ivec.to_vec()))
51    }
52
53    /// Stores a value in cold storage.
54    /// Note: This operation does not automatically flush writes to disk.
55    /// Call `flush()` or `compact()` to ensure durability.
56    pub fn set(&self, key: String, value: Vec<u8>) -> Result<()> {
57        self.db.insert(key.as_bytes(), value)?;
58        Ok(())
59    }
60
61    pub fn delete(&self, key: &str) -> Result<()> {
62        self.db.remove(key.as_bytes())?;
63        Ok(())
64    }
65
66    pub fn scan(&self) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
67        self.db.iter().map(|result| {
68            result
69                .map_err(|e| AuroraError::Storage(e))
70                .and_then(|(key, value)| {
71                    Ok((
72                        String::from_utf8(key.to_vec()).map_err(|_| {
73                            AuroraError::Protocol("Invalid UTF-8 in key".to_string())
74                        })?,
75                        value.to_vec(),
76                    ))
77                })
78        })
79    }
80
81    pub fn scan_prefix(
82        &self,
83        prefix: &str,
84    ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
85        self.db.scan_prefix(prefix.as_bytes()).map(|result| {
86            result
87                .map_err(|e| AuroraError::Storage(e))
88                .and_then(|(key, value)| {
89                    Ok((
90                        String::from_utf8(key.to_vec()).map_err(|_| {
91                            AuroraError::Protocol("Invalid UTF-8 in key".to_string())
92                        })?,
93                        value.to_vec(),
94                    ))
95                })
96        })
97    }
98
99    /// Batch operations for better performance.
100    /// Note: This operation does not automatically flush writes to disk.
101    /// Call `flush()` or `compact()` to ensure durability.
102    pub fn batch_set(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
103        let batch = pairs
104            .into_iter()
105            .fold(sled::Batch::default(), |mut batch, (key, value)| {
106                batch.insert(key.as_bytes(), value);
107                batch
108            });
109
110        self.db.apply_batch(batch)?;
111        Ok(())
112    }
113
114    /// Flushes all buffered writes to disk to ensure durability.
115    pub fn flush(&self) -> Result<()> {
116        self.db.flush()?;
117        Ok(())
118    }
119
120    pub fn compact(&self) -> Result<()> {
121        self.db.flush()?;
122        Ok(())
123    }
124
125    pub fn get_stats(&self) -> Result<ColdStoreStats> {
126        Ok(ColdStoreStats {
127            size_on_disk: self.estimated_size(),
128            tree_count: self.db.tree_names().len() as u64,
129        })
130    }
131
132    pub fn estimated_size(&self) -> u64 {
133        self.db.size_on_disk().unwrap_or(0)
134    }
135}
136
137impl Drop for ColdStore {
138    fn drop(&mut self) {
139        if let Err(e) = self.db.flush() {
140            eprintln!("Error flushing database: {}", e);
141        }
142    }
143}
144
145#[derive(Debug)]
146pub struct ColdStoreStats {
147    pub size_on_disk: u64,
148    pub tree_count: u64,
149}