// aurora_db/storage/cold.rs

1use crate::error::{AqlError, ErrorCode, Result};
2use crate::types::{AuroraConfig, ColdStoreMode};
3use sled::Db;
4use std::sync::Arc;
5
6pub struct ColdStore {
7    db: Db,
8    #[allow(dead_code)]
9    db_path: String,
10}
11
12impl ColdStore {
13    pub fn new(path: &str) -> Result<Self> {
14        let config = AuroraConfig::default();
15        Self::with_config(
16            path,
17            config.cold_cache_capacity_mb,
18            config.cold_flush_interval_ms,
19            config.cold_mode,
20        )
21    }
22
23    pub fn with_config(
24        path: &str,
25        cache_capacity_mb: usize,
26        flush_interval_ms: Option<u64>,
27        mode: ColdStoreMode,
28    ) -> Result<Self> {
29        let db_path = if !path.ends_with(".db") {
30            format!("{}.db", path)
31        } else {
32            path.to_string()
33        };
34
35        let mut sled_config = sled::Config::new()
36            .path(&db_path)
37            .cache_capacity((cache_capacity_mb * 1024 * 1024) as u64)
38            .flush_every_ms(flush_interval_ms);
39
40        sled_config = match mode {
41            ColdStoreMode::HighThroughput => sled_config.mode(sled::Mode::HighThroughput),
42            ColdStoreMode::LowSpace => sled_config.mode(sled::Mode::LowSpace),
43        };
44
45        let db = sled_config.open().map_err(|e| {
46            let error_msg = e.to_string();
47
48            // Check for Windows "Access is denied" error (file locked by another process)
49            if error_msg.contains("Access is denied") || error_msg.contains("os error 5") {
50                let lock_hint = if std::path::Path::new(&db_path).exists() {
51                    "\n\nPossible solutions:\n\
52                    1. Close any other Aurora instances using this database\n\
53                    2. If no other instance is running, the previous instance may have crashed.\n\
54                       Delete the lock file in the database directory and try again\n\
55                    3. Run as administrator if permission is the issue"
56                } else {
57                    "\n\nThe database directory may require administrator privileges to create."
58                };
59
60                AqlError::new(
61                    ErrorCode::IoError,
62                    format!(
63                        "Cannot open database at '{}': file is locked or access denied.{}",
64                        db_path,
65                        lock_hint
66                    ),
67                )
68            } else {
69                AqlError::from(e)
70            }
71        })?;
72
73        Ok(Self { db, db_path })
74    }
75
76    /// Attempt to detect and remove stale lock files
77    ///
78    /// This is useful for recovering from crashes where the lock file wasn't properly cleaned up.
79    /// Only call this if you're sure no other process is using the database.
80    ///
81    /// # Safety
82    /// This function should only be used when you're certain the database is not in use by another process.
83    /// Calling this while another Aurora instance is running could lead to data corruption.
84    pub fn try_remove_stale_lock(db_path: &str) -> Result<bool> {
85        use std::path::Path;
86
87        let path = if !db_path.ends_with(".db") {
88            format!("{}.db", db_path)
89        } else {
90            db_path.to_string()
91        };
92
93        let db_dir = Path::new(&path);
94        if !db_dir.exists() {
95            return Ok(false);
96        }
97
98        // Sled uses a .lock file in the db directory
99        let lock_file = db_dir.join(".lock");
100        if lock_file.exists() {
101            std::fs::remove_file(&lock_file)?;
102            Ok(true)
103        } else {
104            Ok(false)
105        }
106    }
107
108    pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
109        Ok(self.db.get(key.as_bytes())?.map(|ivec| ivec.to_vec()))
110    }
111
112    pub fn set(&self, key: String, value: Vec<u8>) -> Result<()> {
113        self.db.insert(key.as_bytes(), value)?;
114        Ok(())
115    }
116
117    pub fn delete(&self, key: &str) -> Result<()> {
118        self.db.remove(key.as_bytes())?;
119        Ok(())
120    }
121
122    pub fn scan(&self) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
123        self.db.iter().map(|result| {
124            result.map_err(AqlError::from).and_then(|(key, value)| {
125                Ok((
126                    String::from_utf8(key.to_vec()).map_err(|_| {
127                        AqlError::new(ErrorCode::ProtocolError, "Invalid UTF-8 in key".to_string())
128                    })?,
129                    value.to_vec(),
130                ))
131            })
132        })
133    }
134
135    pub fn scan_prefix(
136        &self,
137        prefix: &str,
138    ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
139        self.db.scan_prefix(prefix.as_bytes()).map(|result| {
140            result.map_err(AqlError::from).and_then(|(key, value)| {
141                Ok((
142                    String::from_utf8(key.to_vec()).map_err(|_| {
143                        AqlError::new(ErrorCode::ProtocolError, "Invalid UTF-8 in key".to_string())
144                    })?,
145                    value.to_vec(),
146                ))
147            })
148        })
149    }
150
151    pub fn batch_set(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
152        let batch = pairs
153            .into_iter()
154            .fold(sled::Batch::default(), |mut batch, (key, value)| {
155                batch.insert(key.as_bytes(), value);
156                batch
157            });
158
159        self.db.apply_batch(batch)?;
160        Ok(())
161    }
162
163    // CHANGED: New optimized method for WriteBuffer
164    pub fn batch_set_arc(&self, pairs: Vec<(Arc<String>, Arc<Vec<u8>>)>) -> Result<()> {
165        let batch = pairs
166            .into_iter()
167            .fold(sled::Batch::default(), |mut batch, (key, value)| {
168                // Sled accepts &[u8], so we deref the Arc. 
169                // This avoids cloning the Vec<u8> before passing it to Sled.
170                batch.insert(key.as_bytes(), value.as_slice());
171                batch
172            });
173
174        self.db.apply_batch(batch)?;
175        // Note: No explicit flush here (Sled handles it based on flush_every_ms)
176        Ok(())
177    }
178
179    /// Flushes all buffered writes to disk to ensure durability.
180    pub fn flush(&self) -> Result<()> {
181        self.db.flush()?;
182        Ok(())
183    }
184
185    pub fn compact(&self) -> Result<()> {
186        self.db.flush()?;
187        Ok(())
188    }
189
190    pub fn get_stats(&self) -> Result<ColdStoreStats> {
191        Ok(ColdStoreStats {
192            size_on_disk: self.estimated_size(),
193            tree_count: self.db.tree_names().len() as u64,
194        })
195    }
196
197    pub fn estimated_size(&self) -> u64 {
198        self.db.size_on_disk().unwrap_or(0)
199    }
200}
201
202impl Drop for ColdStore {
203    fn drop(&mut self) {
204        if let Err(e) = self.db.flush() {
205            eprintln!("Error flushing database: {}", e);
206        }
207    }
208}
209
/// Snapshot of basic storage statistics for a `ColdStore`.
///
/// A plain value type: cheap to copy, comparable, and zero-initialized by
/// `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ColdStoreStats {
    /// Size of the database on disk in bytes (`0` when the size is unavailable).
    pub size_on_disk: u64,
    /// Number of trees present in the database.
    pub tree_count: u64,
}