// ormdb_server/database.rs
1//! Database wrapper combining StorageEngine and Catalog.
2
3use std::path::Path;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7
8use tokio::task::JoinHandle;
9use tracing::{info, warn};
10
11use ormdb_core::catalog::Catalog;
12use ormdb_core::metrics::SharedMetricsRegistry;
13use ormdb_core::query::{PlanCache, QueryExecutor, TableStatistics};
14use ormdb_core::replication::ChangeLog;
15use ormdb_core::storage::{
16    ColumnarStore, CompactionEngine, CompactionResult, RetentionPolicy, StorageConfig,
17    StorageEngine,
18};
19
20use crate::error::Error;
21
/// Database wrapper that provides access to storage and catalog.
pub struct Database {
    /// Versioned storage engine; shared via `Arc` with compaction engines
    /// and background tasks (see `storage_arc` / `compaction_engine`).
    storage: Arc<StorageEngine>,
    /// Schema catalog, backed by its own sled database (`_catalog_db`).
    catalog: Catalog,
    /// Per-table statistics, refreshed from storage + catalog at open and
    /// on demand via `refresh_statistics_if_stale`.
    statistics: TableStatistics,
    /// Query plan cache (bounded; sized at open).
    plan_cache: PlanCache,
    /// Columnar store; opened on the storage engine's sled `Db`.
    columnar: ColumnarStore,
    /// Replication changelog; opened on the storage engine's sled `Db`.
    changelog: ChangeLog,
    /// Keep the sled::Db handle alive for the catalog.
    _catalog_db: sled::Db,
    /// Retention policy for compaction.
    retention_policy: RetentionPolicy,
}
35
36impl Database {
37    /// Open a database at the given path.
38    pub fn open(data_path: &Path) -> Result<Self, Error> {
39        Self::open_with_retention(data_path, RetentionPolicy::default())
40    }
41
42    /// Open a database with a specific retention policy.
43    pub fn open_with_retention(data_path: &Path, retention_policy: RetentionPolicy) -> Result<Self, Error> {
44        // Create data directory if it doesn't exist
45        std::fs::create_dir_all(data_path).map_err(|e| {
46            Error::Database(format!("failed to create data directory: {}", e))
47        })?;
48
49        // Open storage engine
50        let storage_path = data_path.join("storage");
51        let storage = Arc::new(StorageEngine::open(StorageConfig::new(&storage_path))
52            .map_err(|e| Error::Database(format!("failed to open storage: {}", e)))?);
53
54        // Open catalog database (separate sled instance)
55        let catalog_path = data_path.join("catalog");
56        let catalog_db = sled::open(&catalog_path)
57            .map_err(|e| Error::Database(format!("failed to open catalog db: {}", e)))?;
58
59        let catalog = Catalog::open(&catalog_db)
60            .map_err(|e| Error::Database(format!("failed to open catalog: {}", e)))?;
61
62        // Open columnar store (uses storage sled Db)
63        let columnar = ColumnarStore::open(storage.db())
64            .map_err(|e| Error::Database(format!("failed to open columnar store: {}", e)))?;
65
66        // Open changelog (uses storage sled Db)
67        let changelog = ChangeLog::open(storage.db())
68            .map_err(|e| Error::Database(format!("failed to open changelog: {}", e)))?;
69
70        let db = Self {
71            storage,
72            catalog,
73            statistics: TableStatistics::new(),
74            plan_cache: PlanCache::new(1000), // 1000 entry cache
75            columnar,
76            changelog,
77            _catalog_db: catalog_db,
78            retention_policy,
79        };
80
81        if let Err(e) = db.statistics.refresh(&db.storage, &db.catalog) {
82            warn!(error = %e, "Failed to refresh statistics on startup");
83        }
84
85        Ok(db)
86    }
87
88    /// Get a reference to the storage engine.
89    pub fn storage(&self) -> &StorageEngine {
90        &self.storage
91    }
92
93    /// Get an Arc reference to the storage engine.
94    pub fn storage_arc(&self) -> Arc<StorageEngine> {
95        self.storage.clone()
96    }
97
98    /// Get a reference to the catalog.
99    pub fn catalog(&self) -> &Catalog {
100        &self.catalog
101    }
102
103    /// Get a reference to the table statistics.
104    pub fn statistics(&self) -> &TableStatistics {
105        &self.statistics
106    }
107
108    /// Refresh statistics if stale, returning true if a refresh ran.
109    pub fn refresh_statistics_if_stale(&self, threshold: Duration) -> Result<bool, Error> {
110        if self.statistics.is_stale(threshold.as_millis() as u64) {
111            self.statistics.refresh(&self.storage, &self.catalog)?;
112            return Ok(true);
113        }
114        Ok(false)
115    }
116
117    /// Get a reference to the plan cache.
118    pub fn plan_cache(&self) -> &PlanCache {
119        &self.plan_cache
120    }
121
122    /// Get a reference to the columnar store.
123    pub fn columnar(&self) -> &ColumnarStore {
124        &self.columnar
125    }
126
127    /// Get a reference to the changelog.
128    pub fn changelog(&self) -> &ChangeLog {
129        &self.changelog
130    }
131
132    /// Create a query executor for this database.
133    pub fn executor(&self) -> QueryExecutor<'_> {
134        QueryExecutor::new(&self.storage, &self.catalog)
135    }
136
137    /// Create a query executor with metrics tracking.
138    pub fn executor_with_metrics(&self, metrics: SharedMetricsRegistry) -> QueryExecutor<'_> {
139        QueryExecutor::with_metrics(&self.storage, &self.catalog, metrics)
140    }
141
142    /// Get the current schema version.
143    pub fn schema_version(&self) -> u64 {
144        self.catalog.current_version()
145    }
146
147    /// Flush all pending writes to disk.
148    pub fn flush(&self) -> Result<(), Error> {
149        self.storage
150            .flush()
151            .map_err(|e| Error::Database(format!("failed to flush storage: {}", e)))
152    }
153
154    /// Run a single compaction cycle manually.
155    pub fn compact(&self) -> CompactionResult {
156        let engine = CompactionEngine::new(self.storage.clone(), self.retention_policy.clone());
157        let result = engine.compact();
158        if result.did_cleanup() {
159            if let Err(e) = engine.compact_sled() {
160                warn!(error = %e, "Failed to run sled compaction");
161            }
162        }
163        result
164    }
165
166    /// Create a compaction engine for this database.
167    pub fn compaction_engine(&self) -> CompactionEngine {
168        CompactionEngine::new(self.storage.clone(), self.retention_policy.clone())
169    }
170
171    /// Get the retention policy.
172    pub fn retention_policy(&self) -> &RetentionPolicy {
173        &self.retention_policy
174    }
175}
176
/// Handle for a background compaction task.
pub struct CompactionTask {
    /// Join handle for the spawned tokio task.
    handle: JoinHandle<()>,
    /// Cooperative shutdown flag; the task checks it after each tick.
    stop_flag: Arc<AtomicBool>,
}
182
183impl CompactionTask {
184    /// Start a background compaction task.
185    pub fn start(database: Arc<Database>, interval: Duration) -> Self {
186        let stop_flag = Arc::new(AtomicBool::new(false));
187        let stop_flag_clone = stop_flag.clone();
188
189        let handle = tokio::spawn(async move {
190            info!(interval_secs = interval.as_secs(), "Background compaction task started");
191
192            let mut ticker = tokio::time::interval(interval);
193            ticker.tick().await; // Skip first immediate tick
194
195            loop {
196                ticker.tick().await;
197
198                if stop_flag_clone.load(Ordering::SeqCst) {
199                    info!("Background compaction task stopping");
200                    break;
201                }
202
203                // Run compaction
204                let result = database.compact();
205
206                if result.did_cleanup() {
207                    info!(
208                        versions_removed = result.versions_removed,
209                        tombstones_removed = result.tombstones_removed,
210                        bytes_reclaimed = result.bytes_reclaimed,
211                        duration_ms = result.duration.as_millis() as u64,
212                        "Background compaction completed"
213                    );
214                }
215            }
216        });
217
218        Self { handle, stop_flag }
219    }
220
221    /// Signal the compaction task to stop.
222    pub fn stop(&self) {
223        self.stop_flag.store(true, Ordering::SeqCst);
224    }
225
226    /// Wait for the compaction task to finish.
227    pub async fn join(self) {
228        self.stop();
229        if let Err(e) = self.handle.await {
230            warn!(error = %e, "Compaction task panicked");
231        }
232    }
233}
234
/// Thread-safe database handle, shared across connections and background tasks.
pub type SharedDatabase = Arc<Database>;
237
#[cfg(test)]
mod tests {
    use super::*;
    use ormdb_core::catalog::{EntityDef, FieldDef, FieldType, ScalarType, SchemaBundle};

    /// A freshly opened database has no schema applied (version 0).
    #[test]
    fn test_database_open() {
        let tmp = tempfile::tempdir().unwrap();
        let db = Database::open(tmp.path()).unwrap();

        assert_eq!(db.schema_version(), 0);
    }

    /// Applying a schema bumps the version and makes the entity visible.
    #[test]
    fn test_database_with_schema() {
        let tmp = tempfile::tempdir().unwrap();
        let db = Database::open(tmp.path()).unwrap();

        let user = EntityDef::new("User", "id")
            .with_field(FieldDef::new("id", FieldType::Scalar(ScalarType::Uuid)))
            .with_field(FieldDef::new("name", FieldType::Scalar(ScalarType::String)));
        let schema = SchemaBundle::new(1).with_entity(user);
        db.catalog().apply_schema(schema).unwrap();

        assert_eq!(db.schema_version(), 1);
        assert!(db.catalog().get_entity("User").unwrap().is_some());
    }

    /// Schema changes survive a close/reopen cycle.
    #[test]
    fn test_database_persistence() {
        let tmp = tempfile::tempdir().unwrap();

        // First session: apply a schema and flush it to disk.
        {
            let db = Database::open(tmp.path()).unwrap();
            let item = EntityDef::new("Item", "id")
                .with_field(FieldDef::new("id", FieldType::Scalar(ScalarType::Uuid)));
            db.catalog()
                .apply_schema(SchemaBundle::new(1).with_entity(item))
                .unwrap();
            db.flush().unwrap();
        }

        // Second session: the schema must still be present.
        {
            let db = Database::open(tmp.path()).unwrap();
            assert_eq!(db.schema_version(), 1);
            assert!(db.catalog().get_entity("Item").unwrap().is_some());
        }
    }
}