// dbx_core/engine/database.rs
1//! Database struct definition — the core data structure
2
3use crate::engine::types::{BackgroundJob, TablePersistence};
4use crate::engine::{DeltaVariant, DurabilityLevel, WosVariant};
5use crate::monitoring::DbxMetrics;
6use crate::sql::optimizer::QueryOptimizer;
7use crate::sql::parser::SqlParser;
8use crate::sql::view::{SharedMaterializedViewRegistry, ViewRegistry};
9use crate::storage::encryption::EncryptionConfig;
10use crate::transaction::mvcc::manager::TransactionManager;
11use arrow::array::RecordBatch;
12use arrow::datatypes::Schema;
13use dashmap::DashMap;
14use std::collections::HashMap;
15use std::sync::{Arc, RwLock};
16
17// ════════════════════════════════════════════
18// CAS (Compare-And-Swap) Row Latch Lock Manager
19// ════════════════════════════════════════════
20
/// A Lock Striping mechanism to provide fine-grained, row-level locks
/// for concurrent CAS operations without memory bloat.
///
/// Instead of allocating one mutex per row, a fixed pool of mutexes
/// (`stripes`) is kept and each (table, key) pair is hashed onto one of
/// them. Distinct keys may share a stripe; that only over-serializes,
/// never under-locks.
pub struct RowLockManager {
    // Fixed pool of latch mutexes. They guard no data of their own —
    // they exist purely to serialize CAS operations on a row.
    stripes: Vec<std::sync::Mutex<()>>,
    // Equals stripes.len() - 1. Valid as a bitmask index only because the
    // stripe count is always a power of two (see `new`).
    mask: usize,
}
27
28impl RowLockManager {
29    /// Creates a new RowLockManager with `1 << power_of_two` stripes.
30    /// E.g., `power_of_two = 10` gives 1024 locks.
31    pub fn new(power_of_two: usize) -> Self {
32        let size = 1 << power_of_two;
33        let mut stripes = Vec::with_capacity(size);
34        for _ in 0..size {
35            stripes.push(std::sync::Mutex::new(()));
36        }
37        Self {
38            stripes,
39            mask: size - 1,
40        }
41    }
42
43    /// Acquires a lock for a specific (table, key) combination.
44    pub fn lock<'a>(&'a self, table: &str, key: &[u8]) -> std::sync::MutexGuard<'a, ()> {
45        let mut hasher = ahash::AHasher::default();
46        std::hash::Hash::hash(&table, &mut hasher);
47        std::hash::Hash::hash(&key, &mut hasher);
48        let hash = std::hash::Hasher::finish(&hasher) as usize;
49        self.stripes[hash & self.mask].lock().unwrap()
50    }
51}
52
/// DBX database engine.
///
/// The main API managing the 5-Tier Hybrid Storage architecture.
///
/// # Data flow
///
/// - **INSERT**: written to the Delta Store (Tier 1) first
/// - **GET**: looked up in Delta → WOS order (short-circuits on first hit)
/// - **DELETE**: removed from every tier
/// - **flush()**: moves Delta Store data into the WOS
///
/// # Examples
///
/// ```rust
/// use dbx_core::Database;
///
/// # fn main() -> dbx_core::DbxResult<()> {
/// let db = Database::open_in_memory()?;
/// db.insert("users", b"user:1", b"Alice")?;
/// let value = db.get("users", b"user:1")?;
/// assert_eq!(value, Some(b"Alice".to_vec()));
/// # Ok(())
/// # }
/// ```
pub struct Database {
    /// Tier 1: Delta Store (in-memory write buffer) — row-based or columnar
    pub(crate) delta: DeltaVariant,

    /// Tier 3: in-memory WOS (used for tables explicitly pinned to Memory)
    pub(crate) memory_wos: WosVariant,

    /// Tier 3: file-backed WOS. `Some` only for path-based databases; used
    /// for tables marked File or with no per-table setting.
    pub(crate) file_wos: Option<WosVariant>,

    /// Per-table storage designation (Memory | File). Only consulted when
    /// `file_wos` is `Some`.
    pub(crate) table_persistence: DashMap<String, TablePersistence>,

    /// Schema registry: table_name → Arrow Schema
    // NOTE(review): currently unread in this module (hence allow(dead_code));
    // confirm whether any external consumer relies on it before removing.
    #[allow(dead_code)]
    pub(crate) schemas: Arc<RwLock<HashMap<String, Arc<Schema>>>>,

    /// SQL table registry: table_name → `Vec<RecordBatch>`
    pub(crate) tables: RwLock<HashMap<String, Vec<RecordBatch>>>,

    /// SQL table schemas: table_name → Schema
    pub(crate) table_schemas: Arc<RwLock<HashMap<String, Arc<Schema>>>>,

    /// Hash Index: fast O(1) key lookups
    pub(crate) index: Arc<crate::index::HashIndex>,

    /// Row ID counters: table_name → next_row_id
    pub(crate) row_counters: Arc<DashMap<String, std::sync::atomic::AtomicUsize>>,

    /// SQL parser (cached)
    pub(crate) sql_parser: SqlParser,
    /// SQL optimizer (cached)
    pub(crate) sql_optimizer: QueryOptimizer,

    /// Write-Ahead Log for crash recovery (optional, plain or encrypted)
    pub(crate) wal: Option<Arc<crate::wal::WriteAheadLog>>,

    /// Encrypted WAL (optional, used when encryption is enabled)
    pub(crate) encrypted_wal: Option<Arc<crate::wal::encrypted_wal::EncryptedWal>>,

    /// Encryption config (None = no encryption)
    pub(crate) encryption: RwLock<Option<EncryptionConfig>>,

    /// MVCC Transaction Manager
    pub(crate) tx_manager: Arc<TransactionManager>,

    /// Columnar Cache for OLAP queries (Tier 2)
    pub(crate) columnar_cache: Arc<crate::storage::columnar_cache::ColumnarCache>,

    /// GPU Manager for optional acceleration (Phase 6.4)
    pub(crate) gpu_manager: Option<Arc<crate::storage::gpu::GpuManager>>,

    /// Background job sender
    pub(crate) job_sender: Option<std::sync::mpsc::Sender<BackgroundJob>>,

    /// WAL durability policy
    pub durability: DurabilityLevel,

    /// Index registry: index_name → (table, column) mapping for DROP INDEX
    pub(crate) index_registry: RwLock<HashMap<String, (String, String)>>,

    /// Automation & Extensibility Engine (UDF, Triggers, Scheduler)
    pub(crate) automation_engine: Arc<crate::automation::ExecutionEngine>,

    /// Trigger Registry (used for event matching)
    pub(crate) trigger_registry: crate::engine::automation_api::TriggerRegistry,

    /// SQL Trigger Executor
    pub(crate) trigger_executor: Arc<RwLock<crate::automation::TriggerExecutor>>,

    /// Stored Procedure Executor
    pub(crate) procedure_executor: Arc<RwLock<crate::automation::ProcedureExecutor>>,

    /// SQL Schedule Executor
    pub(crate) schedule_executor: Arc<RwLock<crate::automation::ScheduleExecutor>>,

    /// Parallel Execution Engine for multi-threaded query execution
    // NOTE(review): currently unread in this module (hence allow(dead_code)).
    #[allow(dead_code)]
    pub(crate) parallel_engine: Arc<crate::engine::parallel_engine::ParallelExecutionEngine>,

    /// SQL View Registry — supports CREATE/DROP VIEW
    pub(crate) view_registry: ViewRegistry,

    /// Materialized View Registry — supports CREATE/REFRESH/DROP MATERIALIZED VIEW
    pub mat_view_registry: SharedMaterializedViewRegistry,

    /// Partition mapping info (table name → PartitionMap)
    pub(crate) partition_maps:
        Arc<RwLock<std::collections::HashMap<String, crate::storage::partition::PartitionMap>>>,

    /// Per-partition statistics ("table__partname" → PartitionStats)
    pub(crate) partition_stats: Arc<DashMap<String, crate::storage::partition::PartitionStats>>,

    /// Per-partition compression settings ("table__partname" → CompressionConfig)
    pub(crate) partition_compression:
        Arc<DashMap<String, crate::storage::compression::CompressionConfig>>,

    /// Partition lifecycle policies (table name → PartitionLifecycle)
    pub(crate) partition_lifecycle:
        Arc<DashMap<String, crate::storage::partition::PartitionLifecycle>>,

    /// Partition tier hints ("table__partname" → PartitionTierHint)
    pub(crate) partition_tier_hints:
        Arc<DashMap<String, crate::storage::partition::PartitionTierHint>>,

    /// First-write timestamp per partition sub-table
    /// ("table__partname" → UNIX timestamp secs).
    /// Recorded automatically on the INSERT path → used as the reference
    /// point for partition_needs_archive/delete decisions.
    pub(crate) partition_creation_times: Arc<DashMap<String, u64>>,

    /// Stop flag for the automatic lifecycle scheduler.
    /// When set to `true` the background thread shuts itself down
    /// (handled automatically when the Database is dropped).
    pub(crate) lifecycle_stop_flag: Arc<std::sync::atomic::AtomicBool>,

    /// Whether the lifecycle scheduler has been started (prevents duplicate spawn)
    pub(crate) lifecycle_running: Arc<std::sync::atomic::AtomicBool>,

    /// Database Configuration (Parallelism, HTAP Sync)
    pub(crate) config: crate::engine::parallel_engine::DbConfig,

    /// Adaptive workload analyzer (HTAP/OLAP ratio analysis)
    pub(crate) workload_analyzer: Arc<RwLock<crate::engine::workload_analyzer::WorkloadAnalyzer>>,

    /// Replication Master (MVP)
    pub(crate) replication_master: Option<Arc<crate::replication::master::ReplicationMaster>>,

    /// Sharding router
    pub(crate) sharding_router: Arc<crate::sharding::router::ShardRouter>,

    /// Sharding scatter/gather processor
    pub(crate) scatter_gather: Arc<crate::sharding::scatter_gather::ScatterGather>,

    /// Metrics & Observability (Phase 0.4)
    pub(crate) metrics: Arc<DbxMetrics>,

    /// Row-level latch manager for atomic CAS operations (Phase 1.1)
    pub(crate) cas_locks: Arc<RowLockManager>,
}
214
215impl Database {
216    /// 테이블에 사용할 WOS. file_wos가 None이면(인메모리 전용 DB) 항상 memory_wos.
217    pub(crate) fn wos_for_table(&self, table: &str) -> &WosVariant {
218        match &self.file_wos {
219            None => &self.memory_wos,
220            Some(file) => {
221                let is_memory =
222                    self.table_persistence.get(table).map(|r| *r) == Some(TablePersistence::Memory);
223                if is_memory { &self.memory_wos } else { file }
224            }
225        }
226    }
227
228    /// 스키마/인덱스 메타데이터 저장용 WOS. 파일이 있으면 파일, 없으면 메모리.
229    pub(crate) fn wos_for_metadata(&self) -> &WosVariant {
230        self.file_wos.as_ref().unwrap_or(&self.memory_wos)
231    }
232
233    /// 테이블별 저장소를 지정합니다 (파일 DB에서만 유효).
234    ///
235    /// `TablePersistence::File`은 파일에 저장(재시작 후 유지),
236    /// `TablePersistence::Memory`는 메모리만 사용(프로세스 종료 시 사라짐).
237    /// 인메모리 전용 DB(`open_in_memory`)에서는 `File` 지정 시 에러를 반환합니다.
238    pub fn set_table_persistence(
239        &self,
240        table: &str,
241        persistence: TablePersistence,
242    ) -> crate::error::DbxResult<()> {
243        if persistence == TablePersistence::File && self.file_wos.is_none() {
244            return Err(crate::error::DbxError::InvalidOperation {
245                message: "TablePersistence::File requires a file-backed database (open with path)"
246                    .to_string(),
247                context: "Use Database::open(path) to enable per-table file persistence"
248                    .to_string(),
249            });
250        }
251        self.table_persistence
252            .insert(table.to_string(), persistence);
253        Ok(())
254    }
255
256    /// 테이블에 지정된 저장소 종류를 반환합니다 (미지정 시 None).
257    pub fn table_persistence(&self, table: &str) -> Option<TablePersistence> {
258        self.table_persistence.get(table).map(|r| *r)
259    }
260
261    /// Replication Master를 활성화하고, WAL 변경사항을 구독할 수 있는 Receiver를 반환합니다.
262    ///
263    /// `capacity`는 브로드캐스트 채널의 버퍼 크기입니다.
264    /// 이미 활성화되어 있다면 새로운 Receiver만 생성하여 반환합니다.
265    pub fn enable_replication(
266        &mut self,
267        capacity: usize,
268    ) -> tokio::sync::broadcast::Receiver<crate::replication::protocol::ReplicationMessage> {
269        use crate::replication::master::ReplicationMaster;
270
271        if let Some(master) = &self.replication_master {
272            master.subscribe()
273        } else {
274            let (master, rx) = ReplicationMaster::new(capacity);
275            self.replication_master = Some(Arc::new(master));
276            rx
277        }
278    }
279
280    // ── Monitoring / Observability API (Phase 0.4) ────────────────────
281
282    /// Export all DBX metrics in Prometheus exposition text format.
283    ///
284    /// The returned string can be served at a `/metrics` HTTP endpoint
285    /// and consumed by Prometheus or any compatible scraper.
286    ///
287    /// # Example
288    ///
289    /// ```rust
290    /// use dbx_core::Database;
291    ///
292    /// # fn main() -> dbx_core::DbxResult<()> {
293    /// let db = Database::open_in_memory()?;
294    /// db.insert("users", b"k1", b"v1")?;
295    /// let text = db.export_metrics();
296    /// assert!(text.contains("dbx_inserts_total 1"));
297    /// # Ok(())
298    /// # }
299    /// ```
300    pub fn export_metrics(&self) -> String {
301        crate::monitoring::export_prometheus(&self.metrics)
302    }
303
304    /// Return a non-atomic snapshot of current metrics.
305    ///
306    /// Useful for programmatic inspection of counters and hit rates.
307    ///
308    /// # Example
309    ///
310    /// ```rust
311    /// use dbx_core::Database;
312    ///
313    /// # fn main() -> dbx_core::DbxResult<()> {
314    /// let db = Database::open_in_memory()?;
315    /// db.insert("users", b"k1", b"v1")?;
316    /// let snap = db.metrics_snapshot();
317    /// assert_eq!(snap.inserts_total, 1);
318    /// # Ok(())
319    /// # }
320    /// ```
321    pub fn metrics_snapshot(&self) -> crate::monitoring::MetricsSnapshot {
322        self.metrics.snapshot()
323    }
324
325    /// Reset all metrics counters and histograms to zero.
326    pub fn reset_metrics(&self) {
327        self.metrics.reset();
328    }
329
330    // ── Streaming Ingestion API ──────────────────────────────────────────────
331
332    /// 채널 기반 스트리밍 수집 파이프라인 생성
333    ///
334    /// INSERT / UPDATE / DELETE 이벤트를 [`StreamEvent`](crate::engine::StreamEvent)로
335    /// 전송하면 백그라운드에서 자동 배치 처리합니다.
336    ///
337    /// # 인수
338    ///
339    /// * `table` - 이벤트를 적용할 테이블 이름
340    /// * `batch_size` - 이 이벤트 수에 도달하면 즉시 flush
341    /// * `max_latency_ms` - 버퍼가 차지 않아도 이 밀리초마다 강제 flush
342    ///
343    /// # 예시
344    ///
345    /// ```rust,no_run
346    /// use dbx_core::{Database, engine::{StreamIngester, StreamEvent}};
347    /// use std::{sync::Arc, time::Duration};
348    ///
349    /// let db = Arc::new(Database::open_in_memory().unwrap());
350    /// let ingester = db.create_stream_ingester("orders", 1000, 100);
351    /// let tx = ingester.sender();
352    /// tx.send(vec![StreamEvent::Insert { key: "1".into(), value: b"data".to_vec() }]).unwrap();
353    /// ingester.flush().unwrap();
354    /// ```
355    pub fn create_stream_ingester(
356        self: &Arc<Self>,
357        table: &str,
358        batch_size: usize,
359        max_latency_ms: u64,
360    ) -> crate::engine::StreamIngester {
361        crate::engine::StreamIngester::new(
362            Arc::clone(self),
363            table,
364            batch_size,
365            std::time::Duration::from_millis(max_latency_ms),
366        )
367    }
368}