// dbx_core/engine/database.rs
1//! Database struct definition — the core data structure
2
3use crate::engine::types::{BackgroundJob, TablePersistence};
4use crate::engine::{DeltaVariant, DurabilityLevel, WosVariant};
5use crate::monitoring::DbxMetrics;
6use crate::sql::optimizer::QueryOptimizer;
7use crate::sql::parser::SqlParser;
8use crate::sql::view::{SharedMaterializedViewRegistry, ViewRegistry};
9use crate::storage::encryption::EncryptionConfig;
10use crate::transaction::mvcc::manager::TransactionManager;
11use arrow::array::RecordBatch;
12use arrow::datatypes::Schema;
13use dashmap::DashMap;
14use std::collections::HashMap;
15use std::sync::{Arc, RwLock};
16
17// ════════════════════════════════════════════
18// CAS (Compare-And-Swap) Row Latch Lock Manager
19// ════════════════════════════════════════════
20
/// A Lock Striping mechanism to provide fine-grained, row-level locks
/// for concurrent CAS operations without memory bloat.
pub struct RowLockManager {
    /// Fixed pool of stripe mutexes; each (table, key) pair hashes onto one
    /// stripe, so the lock count stays bounded regardless of row count.
    stripes: Vec<std::sync::Mutex<()>>,
    /// `stripes.len() - 1`. The stripe count is a power of two, so
    /// `hash & mask` is a cheap modulo into the pool.
    mask: usize,
}
27
28impl RowLockManager {
29 /// Creates a new RowLockManager with `1 << power_of_two` stripes.
30 /// E.g., `power_of_two = 10` gives 1024 locks.
31 pub fn new(power_of_two: usize) -> Self {
32 let size = 1 << power_of_two;
33 let mut stripes = Vec::with_capacity(size);
34 for _ in 0..size {
35 stripes.push(std::sync::Mutex::new(()));
36 }
37 Self {
38 stripes,
39 mask: size - 1,
40 }
41 }
42
43 /// Acquires a lock for a specific (table, key) combination.
44 pub fn lock<'a>(&'a self, table: &str, key: &[u8]) -> std::sync::MutexGuard<'a, ()> {
45 let mut hasher = ahash::AHasher::default();
46 std::hash::Hash::hash(&table, &mut hasher);
47 std::hash::Hash::hash(&key, &mut hasher);
48 let hash = std::hash::Hasher::finish(&hasher) as usize;
49 self.stripes[hash & self.mask].lock().unwrap()
50 }
51}
52
/// DBX database engine
///
/// The main API managing the 5-Tier Hybrid Storage architecture.
///
/// # Data flow
///
/// - **INSERT**: writes to the Delta Store (Tier 1) first
/// - **GET**: looks up Delta → WOS in order (short-circuits on the first hit)
/// - **DELETE**: removes the key from every tier
/// - **flush()**: moves Delta Store data into the WOS
///
/// # Examples
///
/// ```rust
/// use dbx_core::Database;
///
/// # fn main() -> dbx_core::DbxResult<()> {
/// let db = Database::open_in_memory()?;
/// db.insert("users", b"user:1", b"Alice")?;
/// let value = db.get("users", b"user:1")?;
/// assert_eq!(value, Some(b"Alice".to_vec()));
/// # Ok(())
/// # }
/// ```
pub struct Database {
    /// Tier 1: Delta Store (in-memory write buffer) — row-based or columnar
    pub(crate) delta: DeltaVariant,

    /// Tier 3: in-memory WOS (used for tables pinned to Memory persistence)
    pub(crate) memory_wos: WosVariant,

    /// Tier 3: file-backed WOS (Some only for path-based DBs; used for tables
    /// marked File or left unspecified)
    pub(crate) file_wos: Option<WosVariant>,

    /// Unified storage path manager
    pub(crate) storage_manager: Arc<crate::storage::manager::StoragePathManager>,

    /// Per-table store choice (Memory | File). Consulted only when file_wos is Some.
    pub(crate) table_persistence: DashMap<String, TablePersistence>,

    /// Schema registry: table_name → Arrow Schema
    #[allow(dead_code)]
    pub(crate) schemas: Arc<RwLock<HashMap<String, Arc<Schema>>>>,

    /// SQL table registry: table_name → `Vec<RecordBatch>`
    pub(crate) tables: RwLock<HashMap<String, Vec<RecordBatch>>>,

    /// SQL table schemas: table_name → Schema
    pub(crate) table_schemas: Arc<RwLock<HashMap<String, Arc<Schema>>>>,

    /// Hash Index: fast O(1) key lookups
    pub(crate) index: Arc<crate::index::HashIndex>,

    /// Row ID counters: table_name → next_row_id
    pub(crate) row_counters: Arc<DashMap<String, std::sync::atomic::AtomicUsize>>,

    /// SQL parser (cached)
    pub(crate) sql_parser: SqlParser,
    /// SQL optimizer (cached)
    pub(crate) sql_optimizer: QueryOptimizer,

    /// Write-Ahead Log for crash recovery (optional, plain or encrypted)
    pub(crate) wal: Option<Arc<crate::wal::WriteAheadLog>>,

    /// Encrypted WAL (optional, used when encryption is enabled)
    pub(crate) encrypted_wal: Option<Arc<crate::wal::encrypted_wal::EncryptedWal>>,

    /// Encryption config (None = no encryption)
    pub(crate) encryption: RwLock<Option<EncryptionConfig>>,

    /// MVCC Transaction Manager
    pub(crate) tx_manager: Arc<TransactionManager>,

    /// Columnar Cache for OLAP queries (Tier 2)
    pub(crate) columnar_cache: Arc<crate::storage::columnar_cache::ColumnarCache>,

    /// GPU Manager for optional acceleration (Phase 6.4)
    pub(crate) gpu_manager: Option<Arc<crate::storage::gpu::GpuManager>>,

    /// Background job sender
    pub(crate) job_sender: Option<std::sync::mpsc::Sender<BackgroundJob>>,

    /// WAL durability policy
    pub durability: DurabilityLevel,

    /// Index registry: index_name → (table, column) mapping for DROP INDEX
    pub(crate) index_registry: RwLock<HashMap<String, (String, String)>>,

    /// Automation & Extensibility Engine (UDF, Triggers, Scheduler)
    pub(crate) automation_engine: Arc<crate::automation::ExecutionEngine>,

    /// Trigger Registry (for event matching)
    pub(crate) trigger_registry: crate::engine::automation_api::TriggerRegistry,

    /// SQL Trigger Executor
    pub(crate) trigger_executor: Arc<RwLock<crate::automation::TriggerExecutor>>,

    /// Stored Procedure Executor
    pub(crate) procedure_executor: Arc<RwLock<crate::automation::ProcedureExecutor>>,

    /// SQL Schedule Executor
    pub(crate) schedule_executor: Arc<RwLock<crate::automation::ScheduleExecutor>>,

    /// Parallel Execution Engine for multi-threaded query execution
    #[allow(dead_code)]
    pub(crate) parallel_engine: Arc<crate::engine::parallel_engine::ParallelExecutionEngine>,

    /// SQL View Registry — supports CREATE/DROP VIEW
    pub(crate) view_registry: ViewRegistry,

    /// Materialized View Registry — supports CREATE/REFRESH/DROP MATERIALIZED VIEW
    pub mat_view_registry: SharedMaterializedViewRegistry,

    /// Partition mapping information (table name → PartitionMap)
    pub(crate) partition_maps:
        Arc<RwLock<std::collections::HashMap<String, crate::storage::partition::PartitionMap>>>,

    /// Per-partition statistics ("table__partname" → PartitionStats)
    pub(crate) partition_stats: Arc<DashMap<String, crate::storage::partition::PartitionStats>>,

    /// Per-partition compression settings ("table__partname" → CompressionConfig)
    pub(crate) partition_compression:
        Arc<DashMap<String, crate::storage::compression::CompressionConfig>>,

    /// Partition lifecycle policies (table name → PartitionLifecycle)
    pub(crate) partition_lifecycle:
        Arc<DashMap<String, crate::storage::partition::PartitionLifecycle>>,

    /// Partition tier hints ("table__partname" → PartitionTierHint)
    pub(crate) partition_tier_hints:
        Arc<DashMap<String, crate::storage::partition::PartitionTierHint>>,

    /// First-write timestamp per partition sub-table ("table__partname" → UNIX timestamp secs).
    /// Recorded automatically on the INSERT path → used as the reference point
    /// for partition_needs_archive/delete decisions.
    pub(crate) partition_creation_times: Arc<DashMap<String, u64>>,

    /// Stop flag for the automatic lifecycle scheduler.
    /// When set to `true` the background thread exits on its own
    /// (handled automatically on Database Drop).
    pub(crate) lifecycle_stop_flag: Arc<std::sync::atomic::AtomicBool>,

    /// Whether the lifecycle scheduler has been started (prevents duplicate spawns)
    pub(crate) lifecycle_running: Arc<std::sync::atomic::AtomicBool>,

    /// Database Configuration (Parallelism, HTAP Sync)
    pub(crate) config: crate::engine::parallel_engine::DbConfig,

    /// Adaptive workload analyzer (analyzes the HTAP/OLAP ratio)
    pub(crate) workload_analyzer: Arc<RwLock<crate::engine::workload_analyzer::WorkloadAnalyzer>>,

    /// Replication Master (MVP)
    pub(crate) replication_master: Option<Arc<crate::replication::master::ReplicationMaster>>,

    /// Sharding router
    pub(crate) sharding_router: Arc<crate::sharding::router::ShardRouter>,

    /// Sharding scatter/gather processor
    pub(crate) scatter_gather: Arc<crate::sharding::scatter_gather::ScatterGather>,

    /// Metrics & Observability (Phase 0.4)
    pub(crate) metrics: Arc<DbxMetrics>,

    /// Row-level latch manager for atomic CAS operations (Phase 1.1)
    pub(crate) cas_locks: Arc<RowLockManager>,

    /// 5-Tier Storage Metadata Registry (Phase 6)
    pub(crate) metadata_registry: Arc<crate::storage::metadata::MetadataRegistry>,
}
220
221impl Database {
222 /// 테이블에 사용할 WOS. file_wos가 None이면(인메모리 전용 DB) 항상 memory_wos.
223 pub(crate) fn wos_for_table(&self, table: &str) -> &WosVariant {
224 match &self.file_wos {
225 None => &self.memory_wos,
226 Some(file) => {
227 let is_memory =
228 self.table_persistence.get(table).map(|r| *r) == Some(TablePersistence::Memory);
229 if is_memory { &self.memory_wos } else { file }
230 }
231 }
232 }
233
234 /// 스키마/인덱스 메타데이터 저장용 WOS. 파일이 있으면 파일, 없으면 메모리.
235 pub(crate) fn wos_for_metadata(&self) -> &WosVariant {
236 self.file_wos.as_ref().unwrap_or(&self.memory_wos)
237 }
238
239 /// 테이블별 저장소를 지정합니다 (파일 DB에서만 유효).
240 ///
241 /// `TablePersistence::File`은 파일에 저장(재시작 후 유지),
242 /// `TablePersistence::Memory`는 메모리만 사용(프로세스 종료 시 사라짐).
243 /// 인메모리 전용 DB(`open_in_memory`)에서는 `File` 지정 시 에러를 반환합니다.
244 pub fn set_table_persistence(
245 &self,
246 table: &str,
247 persistence: TablePersistence,
248 ) -> crate::error::DbxResult<()> {
249 if persistence == TablePersistence::File && self.file_wos.is_none() {
250 return Err(crate::error::DbxError::InvalidOperation {
251 message: "TablePersistence::File requires a file-backed database (open with path)"
252 .to_string(),
253 context: "Use Database::open(path) to enable per-table file persistence"
254 .to_string(),
255 });
256 }
257 self.table_persistence
258 .insert(table.to_string(), persistence);
259 Ok(())
260 }
261
262 /// 테이블에 지정된 저장소 종류를 반환합니다 (미지정 시 None).
263 pub fn table_persistence(&self, table: &str) -> Option<TablePersistence> {
264 self.table_persistence.get(table).map(|r| *r)
265 }
266
267 /// Replication Master를 활성화하고, WAL 변경사항을 구독할 수 있는 Receiver를 반환합니다.
268 ///
269 /// `capacity`는 브로드캐스트 채널의 버퍼 크기입니다.
270 /// 이미 활성화되어 있다면 새로운 Receiver만 생성하여 반환합니다.
271 pub fn enable_replication(
272 &mut self,
273 capacity: usize,
274 ) -> tokio::sync::broadcast::Receiver<crate::replication::protocol::ReplicationMessage> {
275 use crate::replication::master::ReplicationMaster;
276
277 if let Some(master) = &self.replication_master {
278 master.subscribe()
279 } else {
280 let (master, rx) = ReplicationMaster::new(capacity);
281 self.replication_master = Some(Arc::new(master));
282 rx
283 }
284 }
285
286 // ── Monitoring / Observability API (Phase 0.4) ────────────────────
287
288 /// Export all DBX metrics in Prometheus exposition text format.
289 ///
290 /// The returned string can be served at a `/metrics` HTTP endpoint
291 /// and consumed by Prometheus or any compatible scraper.
292 ///
293 /// # Example
294 ///
295 /// ```rust
296 /// use dbx_core::Database;
297 ///
298 /// # fn main() -> dbx_core::DbxResult<()> {
299 /// let db = Database::open_in_memory()?;
300 /// db.insert("users", b"k1", b"v1")?;
301 /// let text = db.export_metrics();
302 /// assert!(text.contains("dbx_inserts_total 1"));
303 /// # Ok(())
304 /// # }
305 /// ```
306 pub fn export_metrics(&self) -> String {
307 crate::monitoring::export_prometheus(&self.metrics)
308 }
309
310 /// Return a non-atomic snapshot of current metrics.
311 ///
312 /// Useful for programmatic inspection of counters and hit rates.
313 ///
314 /// # Example
315 ///
316 /// ```rust
317 /// use dbx_core::Database;
318 ///
319 /// # fn main() -> dbx_core::DbxResult<()> {
320 /// let db = Database::open_in_memory()?;
321 /// db.insert("users", b"k1", b"v1")?;
322 /// let snap = db.metrics_snapshot();
323 /// assert_eq!(snap.inserts_total, 1);
324 /// # Ok(())
325 /// # }
326 /// ```
327 pub fn metrics_snapshot(&self) -> crate::monitoring::MetricsSnapshot {
328 self.metrics.snapshot()
329 }
330
331 /// Reset all metrics counters and histograms to zero.
332 pub fn reset_metrics(&self) {
333 self.metrics.reset();
334 }
335
336 // ── Streaming Ingestion API ──────────────────────────────────────────────
337
338 /// 채널 기반 스트리밍 수집 파이프라인 생성
339 ///
340 /// INSERT / UPDATE / DELETE 이벤트를 [`StreamEvent`](crate::engine::StreamEvent)로
341 /// 전송하면 백그라운드에서 자동 배치 처리합니다.
342 ///
343 /// # 인수
344 ///
345 /// * `table` - 이벤트를 적용할 테이블 이름
346 /// * `batch_size` - 이 이벤트 수에 도달하면 즉시 flush
347 /// * `max_latency_ms` - 버퍼가 차지 않아도 이 밀리초마다 강제 flush
348 ///
349 /// # 예시
350 ///
351 /// ```rust,no_run
352 /// use dbx_core::{Database, engine::{StreamIngester, StreamEvent}};
353 /// use std::{sync::Arc, time::Duration};
354 ///
355 /// let db = Arc::new(Database::open_in_memory().unwrap());
356 /// let ingester = db.create_stream_ingester("orders", 1000, 100);
357 /// let tx = ingester.sender();
358 /// tx.send(vec![StreamEvent::Insert { key: "1".into(), value: b"data".to_vec() }]).unwrap();
359 /// ingester.flush().unwrap();
360 /// ```
361 pub fn create_stream_ingester(
362 self: &Arc<Self>,
363 table: &str,
364 batch_size: usize,
365 max_latency_ms: u64,
366 ) -> crate::engine::StreamIngester {
367 crate::engine::StreamIngester::new(
368 Arc::clone(self),
369 table,
370 batch_size,
371 std::time::Duration::from_millis(max_latency_ms),
372 )
373 }
374}