1use std::path::Path;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7
8use tokio::task::JoinHandle;
9use tracing::{info, warn};
10
11use ormdb_core::catalog::Catalog;
12use ormdb_core::metrics::SharedMetricsRegistry;
13use ormdb_core::query::{PlanCache, QueryExecutor, TableStatistics};
14use ormdb_core::replication::ChangeLog;
15use ormdb_core::storage::{
16 ColumnarStore, CompactionEngine, CompactionResult, RetentionPolicy, StorageConfig,
17 StorageEngine,
18};
19
20use crate::error::Error;
21
/// Top-level database handle bundling the storage engine with the schema
/// catalog and the auxiliary subsystems built on top of it.
pub struct Database {
    /// Primary row store; `Arc` so background tasks can hold a shared handle.
    storage: Arc<StorageEngine>,
    /// Schema catalog (entity/field definitions, versioned).
    catalog: Catalog,
    /// Per-table statistics used by the query planner.
    statistics: TableStatistics,
    /// Cache of compiled query plans.
    plan_cache: PlanCache,
    /// Column-oriented store layered over the storage engine's db.
    columnar: ColumnarStore,
    /// Replication change log, also layered over the storage engine's db.
    changelog: ChangeLog,
    /// Held only to keep the catalog's sled handle alive for our lifetime.
    _catalog_db: sled::Db,
    /// Governs what compaction is allowed to reclaim.
    retention_policy: RetentionPolicy,
}
35
36impl Database {
37 pub fn open(data_path: &Path) -> Result<Self, Error> {
39 Self::open_with_retention(data_path, RetentionPolicy::default())
40 }
41
42 pub fn open_with_retention(data_path: &Path, retention_policy: RetentionPolicy) -> Result<Self, Error> {
44 std::fs::create_dir_all(data_path).map_err(|e| {
46 Error::Database(format!("failed to create data directory: {}", e))
47 })?;
48
49 let storage_path = data_path.join("storage");
51 let storage = Arc::new(StorageEngine::open(StorageConfig::new(&storage_path))
52 .map_err(|e| Error::Database(format!("failed to open storage: {}", e)))?);
53
54 let catalog_path = data_path.join("catalog");
56 let catalog_db = sled::open(&catalog_path)
57 .map_err(|e| Error::Database(format!("failed to open catalog db: {}", e)))?;
58
59 let catalog = Catalog::open(&catalog_db)
60 .map_err(|e| Error::Database(format!("failed to open catalog: {}", e)))?;
61
62 let columnar = ColumnarStore::open(storage.db())
64 .map_err(|e| Error::Database(format!("failed to open columnar store: {}", e)))?;
65
66 let changelog = ChangeLog::open(storage.db())
68 .map_err(|e| Error::Database(format!("failed to open changelog: {}", e)))?;
69
70 let db = Self {
71 storage,
72 catalog,
73 statistics: TableStatistics::new(),
74 plan_cache: PlanCache::new(1000), columnar,
76 changelog,
77 _catalog_db: catalog_db,
78 retention_policy,
79 };
80
81 if let Err(e) = db.statistics.refresh(&db.storage, &db.catalog) {
82 warn!(error = %e, "Failed to refresh statistics on startup");
83 }
84
85 Ok(db)
86 }
87
88 pub fn storage(&self) -> &StorageEngine {
90 &self.storage
91 }
92
93 pub fn storage_arc(&self) -> Arc<StorageEngine> {
95 self.storage.clone()
96 }
97
98 pub fn catalog(&self) -> &Catalog {
100 &self.catalog
101 }
102
103 pub fn statistics(&self) -> &TableStatistics {
105 &self.statistics
106 }
107
108 pub fn refresh_statistics_if_stale(&self, threshold: Duration) -> Result<bool, Error> {
110 if self.statistics.is_stale(threshold.as_millis() as u64) {
111 self.statistics.refresh(&self.storage, &self.catalog)?;
112 return Ok(true);
113 }
114 Ok(false)
115 }
116
117 pub fn plan_cache(&self) -> &PlanCache {
119 &self.plan_cache
120 }
121
122 pub fn columnar(&self) -> &ColumnarStore {
124 &self.columnar
125 }
126
127 pub fn changelog(&self) -> &ChangeLog {
129 &self.changelog
130 }
131
132 pub fn executor(&self) -> QueryExecutor<'_> {
134 QueryExecutor::new(&self.storage, &self.catalog)
135 }
136
137 pub fn executor_with_metrics(&self, metrics: SharedMetricsRegistry) -> QueryExecutor<'_> {
139 QueryExecutor::with_metrics(&self.storage, &self.catalog, metrics)
140 }
141
142 pub fn schema_version(&self) -> u64 {
144 self.catalog.current_version()
145 }
146
147 pub fn flush(&self) -> Result<(), Error> {
149 self.storage
150 .flush()
151 .map_err(|e| Error::Database(format!("failed to flush storage: {}", e)))
152 }
153
154 pub fn compact(&self) -> CompactionResult {
156 let engine = CompactionEngine::new(self.storage.clone(), self.retention_policy.clone());
157 let result = engine.compact();
158 if result.did_cleanup() {
159 if let Err(e) = engine.compact_sled() {
160 warn!(error = %e, "Failed to run sled compaction");
161 }
162 }
163 result
164 }
165
166 pub fn compaction_engine(&self) -> CompactionEngine {
168 CompactionEngine::new(self.storage.clone(), self.retention_policy.clone())
169 }
170
171 pub fn retention_policy(&self) -> &RetentionPolicy {
173 &self.retention_policy
174 }
175}
176
/// Handle to the background compaction loop spawned by [`CompactionTask::start`].
pub struct CompactionTask {
    /// Join handle for the spawned tokio task.
    handle: JoinHandle<()>,
    /// Cooperative shutdown flag polled by the loop once per tick.
    stop_flag: Arc<AtomicBool>,
}
182
183impl CompactionTask {
184 pub fn start(database: Arc<Database>, interval: Duration) -> Self {
186 let stop_flag = Arc::new(AtomicBool::new(false));
187 let stop_flag_clone = stop_flag.clone();
188
189 let handle = tokio::spawn(async move {
190 info!(interval_secs = interval.as_secs(), "Background compaction task started");
191
192 let mut ticker = tokio::time::interval(interval);
193 ticker.tick().await; loop {
196 ticker.tick().await;
197
198 if stop_flag_clone.load(Ordering::SeqCst) {
199 info!("Background compaction task stopping");
200 break;
201 }
202
203 let result = database.compact();
205
206 if result.did_cleanup() {
207 info!(
208 versions_removed = result.versions_removed,
209 tombstones_removed = result.tombstones_removed,
210 bytes_reclaimed = result.bytes_reclaimed,
211 duration_ms = result.duration.as_millis() as u64,
212 "Background compaction completed"
213 );
214 }
215 }
216 });
217
218 Self { handle, stop_flag }
219 }
220
221 pub fn stop(&self) {
223 self.stop_flag.store(true, Ordering::SeqCst);
224 }
225
226 pub async fn join(self) {
228 self.stop();
229 if let Err(e) = self.handle.await {
230 warn!(error = %e, "Compaction task panicked");
231 }
232 }
233}
234
/// Shared, thread-safe handle to a [`Database`].
pub type SharedDatabase = Arc<Database>;
237
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_database_open() {
        // A freshly created database starts at schema version zero.
        let tmp = tempfile::tempdir().unwrap();
        let database = Database::open(tmp.path()).unwrap();

        assert_eq!(database.schema_version(), 0);
    }

    #[test]
    fn test_database_with_schema() {
        use ormdb_core::catalog::{EntityDef, FieldDef, FieldType, ScalarType, SchemaBundle};

        let tmp = tempfile::tempdir().unwrap();
        let database = Database::open(tmp.path()).unwrap();

        // Build a single-entity schema and apply it.
        let id_field = FieldDef::new("id", FieldType::Scalar(ScalarType::Uuid));
        let name_field = FieldDef::new("name", FieldType::Scalar(ScalarType::String));
        let user_entity = EntityDef::new("User", "id")
            .with_field(id_field)
            .with_field(name_field);
        let schema = SchemaBundle::new(1).with_entity(user_entity);

        database.catalog().apply_schema(schema).unwrap();

        // Applying version 1 bumps the catalog and exposes the entity.
        assert_eq!(database.schema_version(), 1);
        let user_def = database.catalog().get_entity("User").unwrap();
        assert!(user_def.is_some());
    }

    #[test]
    fn test_database_persistence() {
        use ormdb_core::catalog::{EntityDef, FieldDef, FieldType, ScalarType, SchemaBundle};

        let tmp = tempfile::tempdir().unwrap();

        // First session: apply a schema and flush it to disk.
        {
            let database = Database::open(tmp.path()).unwrap();
            let item_entity = EntityDef::new("Item", "id")
                .with_field(FieldDef::new("id", FieldType::Scalar(ScalarType::Uuid)));
            database
                .catalog()
                .apply_schema(SchemaBundle::new(1).with_entity(item_entity))
                .unwrap();
            database.flush().unwrap();
        }

        // Second session: the schema must survive a reopen.
        {
            let database = Database::open(tmp.path()).unwrap();
            assert_eq!(database.schema_version(), 1);
            let item_def = database.catalog().get_entity("Item").unwrap();
            assert!(item_def.is_some());
        }
    }
}