aurora_db/storage/cold.rs

use crate::error::{AqlError, ErrorCode, Result};
use crate::types::{AuroraConfig, ColdStoreMode};
use sled::Db;
use std::sync::Arc;

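/// Cold (on-disk) storage layer backed by an embedded `sled` database.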
pub struct ColdStore {
    db: Db,
    #[allow(dead_code)]
    db_path: String,
}

impl ColdStore {
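    /// Opens a cold store at `path` using the cold-store defaults from `AuroraConfig`.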
    pub fn new(path: &str) -> Result<Self> {
        let config = AuroraConfig::default();
        Self::with_config(
            path,
            config.cold_cache_capacity_mb,
            config.cold_flush_interval_ms,
            config.cold_mode,
        )
    }

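    /// Opens a cold store with explicit cache capacity, flush interval, and sled mode.
    /// A `.db` suffix is appended to `path` if it is not already present.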
    pub fn with_config(
        path: &str,
        cache_capacity_mb: usize,
        flush_interval_ms: Option<u64>,
        mode: ColdStoreMode,
    ) -> Result<Self> {
        let db_path = if !path.ends_with(".db") {
            format!("{}.db", path)
        } else {
            path.to_string()
        };

        let mut sled_config = sled::Config::new()
            .path(&db_path)
            .cache_capacity((cache_capacity_mb * 1024 * 1024) as u64)
            .flush_every_ms(flush_interval_ms);

        sled_config = match mode {
            ColdStoreMode::HighThroughput => sled_config.mode(sled::Mode::HighThroughput),
            ColdStoreMode::LowSpace => sled_config.mode(sled::Mode::LowSpace),
        };

        let db = sled_config.open().map_err(|e| {
            let error_msg = e.to_string();

            if error_msg.contains("Access is denied") || error_msg.contains("os error 5") {
                let lock_hint = if std::path::Path::new(&db_path).exists() {
                    "\n\nPossible solutions:\n\
                     1. Close any other Aurora instances using this database\n\
                     2. If no other instance is running, the previous instance may have crashed.\n\
                        Delete the lock file in the database directory and try again\n\
                     3. Run as administrator if permission is the issue"
                } else {
                    "\n\nThe database directory may require administrator privileges to create."
                };

                AqlError::new(
                    ErrorCode::IoError,
                    format!(
                        "Cannot open database at '{}': file is locked or access denied.{}",
                        db_path, lock_hint
                    ),
                )
            } else {
                AqlError::from(e)
            }
        })?;

        Ok(Self { db, db_path })
    }

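    /// Attempts to remove a stale `.lock` file from the database directory
    /// (for example, after a crashed instance). Returns `Ok(true)` if one was removed.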
    pub fn try_remove_stale_lock(db_path: &str) -> Result<bool> {
        use std::path::Path;

        let path = if !db_path.ends_with(".db") {
            format!("{}.db", db_path)
        } else {
            db_path.to_string()
        };

        let db_dir = Path::new(&path);
        if !db_dir.exists() {
            return Ok(false);
        }

        let lock_file = db_dir.join(".lock");
        if lock_file.exists() {
            std::fs::remove_file(&lock_file)?;
            Ok(true)
        } else {
            Ok(false)
        }
    }

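    /// Returns the value stored under `key`, if any.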
    pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
        Ok(self.db.get(key.as_bytes())?.map(|ivec| ivec.to_vec()))
    }

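    /// Inserts or overwrites the value stored under `key`.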
    pub fn set(&self, key: String, value: Vec<u8>) -> Result<()> {
        self.db.insert(key.as_bytes(), value)?;
        Ok(())
    }

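    /// Removes the value stored under `key`, if present.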
    pub fn delete(&self, key: &str) -> Result<()> {
        self.db.remove(key.as_bytes())?;
        Ok(())
    }

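    /// Iterates over every key/value pair in the store, decoding keys as UTF-8.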
    pub fn scan(&self) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
        self.db.iter().map(|result| {
            result.map_err(AqlError::from).and_then(|(key, value)| {
                Ok((
                    String::from_utf8(key.to_vec()).map_err(|_| {
                        AqlError::new(ErrorCode::ProtocolError, "Invalid UTF-8 in key".to_string())
                    })?,
                    value.to_vec(),
                ))
            })
        })
    }

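    /// Iterates over all key/value pairs whose key starts with `prefix`.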
    pub fn scan_prefix(
        &self,
        prefix: &str,
    ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
        self.db.scan_prefix(prefix.as_bytes()).map(|result| {
            result.map_err(AqlError::from).and_then(|(key, value)| {
                Ok((
                    String::from_utf8(key.to_vec()).map_err(|_| {
                        AqlError::new(ErrorCode::ProtocolError, "Invalid UTF-8 in key".to_string())
                    })?,
                    value.to_vec(),
                ))
            })
        })
    }

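    /// Writes all pairs atomically in a single sled batch.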
    pub fn batch_set(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
        let batch = pairs
            .into_iter()
            .fold(sled::Batch::default(), |mut batch, (key, value)| {
                batch.insert(key.as_bytes(), value);
                batch
            });

        self.db.apply_batch(batch)?;
        Ok(())
    }

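    /// Atomic batch write that accepts `Arc`-wrapped keys and values, so callers
    /// can share the data without cloning it.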
    pub fn batch_set_arc(&self, pairs: Vec<(Arc<String>, Arc<Vec<u8>>)>) -> Result<()> {
        let batch = pairs
            .into_iter()
            .fold(sled::Batch::default(), |mut batch, (key, value)| {
                batch.insert(key.as_bytes(), value.as_slice());
                batch
            });

        self.db.apply_batch(batch)?;
        Ok(())
    }

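    /// Flushes pending writes to disk.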
    pub fn flush(&self) -> Result<()> {
        self.db.flush()?;
        Ok(())
    }

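    /// Currently equivalent to `flush`: no explicit compaction is performed here.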
    pub fn compact(&self) -> Result<()> {
        self.db.flush()?;
        Ok(())
    }

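    /// Reports the estimated on-disk size and the number of sled trees.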
    pub fn get_stats(&self) -> Result<ColdStoreStats> {
        Ok(ColdStoreStats {
            size_on_disk: self.estimated_size(),
            tree_count: self.db.tree_names().len() as u64,
        })
    }

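    /// Best-effort size of the database on disk, in bytes (0 if unavailable).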
    pub fn estimated_size(&self) -> u64 {
        self.db.size_on_disk().unwrap_or(0)
    }
}

impl Drop for ColdStore {
    fn drop(&mut self) {
        if let Err(e) = self.db.flush() {
            eprintln!("Error flushing database: {}", e);
        }
    }
}

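/// Summary statistics reported by [`ColdStore::get_stats`].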
#[derive(Debug)]
pub struct ColdStoreStats {
    pub size_on_disk: u64,
    pub tree_count: u64,
}