Skip to main content

dbx_core/engine/
database.rs

1//! Database struct definition — the core data structure
2
3use crate::engine::types::{BackgroundJob, TablePersistence};
4use crate::engine::{DeltaVariant, DurabilityLevel, WosVariant};
5use crate::monitoring::DbxMetrics;
6use crate::sql::optimizer::QueryOptimizer;
7use crate::sql::parser::SqlParser;
8use crate::sql::view::{SharedMaterializedViewRegistry, ViewRegistry};
9use crate::storage::encryption::EncryptionConfig;
10use crate::transaction::mvcc::manager::TransactionManager;
11use arrow::array::RecordBatch;
12use arrow::datatypes::Schema;
13use dashmap::DashMap;
14use std::collections::HashMap;
15use std::sync::{Arc, RwLock};
16
17// ════════════════════════════════════════════
18// CAS (Compare-And-Swap) Row Latch Lock Manager
19// ════════════════════════════════════════════
20
/// A Lock Striping mechanism to provide fine-grained, row-level locks
/// for concurrent CAS operations without memory bloat.
///
/// Instead of one lock per row, a fixed pool of mutexes ("stripes") is
/// shared by all rows: each `(table, key)` pair is hashed onto one stripe.
/// Distinct rows whose hashes select the same stripe share the same mutex.
pub struct RowLockManager {
    /// Fixed pool of stripe mutexes. The `()` payload means each mutex is
    /// used purely as a latch and protects no data of its own.
    stripes: Vec<std::sync::Mutex<()>>,
    /// Bit mask (stripe count minus one) applied to a hash to pick a stripe.
    mask: usize,
}
27
28impl RowLockManager {
29    /// Creates a new RowLockManager with `1 << power_of_two` stripes.
30    /// E.g., `power_of_two = 10` gives 1024 locks.
31    pub fn new(power_of_two: usize) -> Self {
32        let size = 1 << power_of_two;
33        let mut stripes = Vec::with_capacity(size);
34        for _ in 0..size {
35            stripes.push(std::sync::Mutex::new(()));
36        }
37        Self {
38            stripes,
39            mask: size - 1,
40        }
41    }
42
43    /// Acquires a lock for a specific (table, key) combination.
44    pub fn lock<'a>(&'a self, table: &str, key: &[u8]) -> std::sync::MutexGuard<'a, ()> {
45        let mut hasher = ahash::AHasher::default();
46        std::hash::Hash::hash(&table, &mut hasher);
47        std::hash::Hash::hash(&key, &mut hasher);
48        let hash = std::hash::Hasher::finish(&hasher) as usize;
49        self.stripes[hash & self.mask].lock().unwrap()
50    }
51}
52
/// DBX database engine
///
/// The main API that manages the 5-Tier Hybrid Storage architecture.
///
/// # Data flow
///
/// - **INSERT**: written to the Delta Store (Tier 1) first
/// - **GET**: looked up in Delta → WOS order (short-circuits on the first hit)
/// - **DELETE**: removed from every tier
/// - **flush()**: moves Delta Store data into the WOS
///
/// # Example
///
/// ```rust
/// use dbx_core::Database;
///
/// # fn main() -> dbx_core::DbxResult<()> {
/// let db = Database::open_in_memory()?;
/// db.insert("users", b"user:1", b"Alice")?;
/// let value = db.get("users", b"user:1")?;
/// assert_eq!(value, Some(b"Alice".to_vec()));
/// # Ok(())
/// # }
/// ```
pub struct Database {
    /// Tier 1: Delta Store (in-memory write buffer) — row-based or columnar
    pub(crate) delta: DeltaVariant,

    /// Tier 3: in-memory WOS (used for tables designated as `Memory`)
    pub(crate) memory_wos: WosVariant,

    /// Tier 3: file-backed WOS (`Some` only for path-based databases; used for
    /// tables designated as `File` or left unspecified)
    pub(crate) file_wos: Option<WosVariant>,

    /// Unified storage path manager
    pub(crate) storage_manager: Arc<crate::storage::manager::StoragePathManager>,

    /// Per-table storage designation (Memory | File). Only used when
    /// `file_wos` is `Some`.
    pub(crate) table_persistence: DashMap<String, TablePersistence>,

    /// Schema registry: table_name → Arrow Schema
    #[allow(dead_code)]
    pub(crate) schemas: Arc<RwLock<HashMap<String, Arc<Schema>>>>,

    /// SQL table registry: table_name → `Vec<RecordBatch>`
    pub(crate) tables: RwLock<HashMap<String, Vec<RecordBatch>>>,

    /// SQL table schemas: table_name → Schema
    pub(crate) table_schemas: Arc<RwLock<HashMap<String, Arc<Schema>>>>,

    /// Hash Index: fast O(1) key lookups
    pub(crate) index: Arc<crate::index::HashIndex>,

    /// Row ID counters: table_name → next_row_id
    pub(crate) row_counters: Arc<DashMap<String, std::sync::atomic::AtomicUsize>>,

    /// SQL parser (cached)
    pub(crate) sql_parser: SqlParser,
    /// SQL optimizer (cached)
    pub(crate) sql_optimizer: QueryOptimizer,

    /// Write-Ahead Log for crash recovery (optional, plain or encrypted)
    pub(crate) wal: Option<Arc<crate::wal::WriteAheadLog>>,

    /// Encrypted WAL (optional, used when encryption is enabled)
    pub(crate) encrypted_wal: Option<Arc<crate::wal::encrypted_wal::EncryptedWal>>,

    /// Encryption config (None = no encryption)
    pub(crate) encryption: RwLock<Option<EncryptionConfig>>,

    /// MVCC Transaction Manager
    pub(crate) tx_manager: Arc<TransactionManager>,

    /// Columnar Cache for OLAP queries (Tier 2)
    pub(crate) columnar_cache: Arc<crate::storage::columnar_cache::ColumnarCache>,

    /// GPU Manager for optional acceleration (Phase 6.4)
    pub(crate) gpu_manager: Option<Arc<crate::storage::gpu::GpuManager>>,

    /// Background job sender
    pub(crate) job_sender: Option<std::sync::mpsc::Sender<BackgroundJob>>,

    /// WAL durability policy
    pub durability: DurabilityLevel,

    /// Index registry: index_name → (table, column) mapping for DROP INDEX
    pub(crate) index_registry: RwLock<HashMap<String, (String, String)>>,

    /// Automation & Extensibility Engine (UDF, Triggers, Scheduler)
    pub(crate) automation_engine: Arc<crate::automation::ExecutionEngine>,

    /// Trigger Registry (for event matching)
    pub(crate) trigger_registry: crate::engine::automation_api::TriggerRegistry,

    /// SQL Trigger Executor
    pub(crate) trigger_executor: Arc<RwLock<crate::automation::TriggerExecutor>>,

    /// Stored Procedure Executor
    pub(crate) procedure_executor: Arc<RwLock<crate::automation::ProcedureExecutor>>,

    /// SQL Schedule Executor
    pub(crate) schedule_executor: Arc<RwLock<crate::automation::ScheduleExecutor>>,

    /// Parallel Execution Engine for multi-threaded query execution
    #[allow(dead_code)]
    pub(crate) parallel_engine: Arc<crate::engine::parallel_engine::ParallelExecutionEngine>,

    /// SQL View Registry — supports CREATE/DROP VIEW
    pub(crate) view_registry: ViewRegistry,

    /// Materialized View Registry — supports CREATE/REFRESH/DROP MATERIALIZED VIEW
    pub mat_view_registry: SharedMaterializedViewRegistry,

    /// Partition mapping information (table name → PartitionMap)
    pub(crate) partition_maps:
        Arc<RwLock<std::collections::HashMap<String, crate::storage::partition::PartitionMap>>>,

    /// Per-partition statistics ("table__partname" → PartitionStats)
    pub(crate) partition_stats: Arc<DashMap<String, crate::storage::partition::PartitionStats>>,

    /// Per-partition compression settings ("table__partname" → CompressionConfig)
    pub(crate) partition_compression:
        Arc<DashMap<String, crate::storage::compression::CompressionConfig>>,

    /// Partition lifecycle policy (table name → PartitionLifecycle)
    pub(crate) partition_lifecycle:
        Arc<DashMap<String, crate::storage::partition::PartitionLifecycle>>,

    /// Partition tier hints ("table__partname" → PartitionTierHint)
    pub(crate) partition_tier_hints:
        Arc<DashMap<String, crate::storage::partition::PartitionTierHint>>,

    /// First-write time of each partition sub-table ("table__partname" → UNIX timestamp secs)
    /// Recorded automatically on the INSERT path and used as the baseline for
    /// partition_needs_archive/delete
    pub(crate) partition_creation_times: Arc<DashMap<String, u64>>,

    /// Stop flag for the automatic lifecycle scheduler
    /// When set to `true` the background thread exits on its own (handled
    /// automatically when the Database is dropped)
    pub(crate) lifecycle_stop_flag: Arc<std::sync::atomic::AtomicBool>,

    /// Whether the lifecycle scheduler has been started (prevents duplicate spawns)
    pub(crate) lifecycle_running: Arc<std::sync::atomic::AtomicBool>,

    /// Database Configuration (Parallelism, HTAP Sync)
    pub(crate) config: crate::engine::parallel_engine::DbConfig,

    /// Adaptive workload analyzer (analyzes the HTAP/OLAP ratio)
    pub(crate) workload_analyzer: Arc<RwLock<crate::engine::workload_analyzer::WorkloadAnalyzer>>,

    /// Replication Master (MVP)
    pub(crate) replication_master: Option<Arc<crate::replication::master::ReplicationMaster>>,

    /// Sharding router
    pub(crate) sharding_router: Arc<crate::sharding::router::ShardRouter>,

    /// Sharding scatter/gather processor
    pub(crate) scatter_gather: Arc<crate::sharding::scatter_gather::ScatterGather>,

    /// Metrics & Observability (Phase 0.4)
    pub(crate) metrics: Arc<DbxMetrics>,

    /// Row-level latch manager for atomic CAS operations (Phase 1.1)
    pub(crate) cas_locks: Arc<RowLockManager>,

    /// 5-Tier Storage Metadata Registry (Phase 6)
    pub(crate) metadata_registry: Arc<crate::storage::metadata::MetadataRegistry>,
}
220
221impl Database {
222    /// 테이블에 사용할 WOS. file_wos가 None이면(인메모리 전용 DB) 항상 memory_wos.
223    pub(crate) fn wos_for_table(&self, table: &str) -> &WosVariant {
224        match &self.file_wos {
225            None => &self.memory_wos,
226            Some(file) => {
227                let is_memory =
228                    self.table_persistence.get(table).map(|r| *r) == Some(TablePersistence::Memory);
229                if is_memory { &self.memory_wos } else { file }
230            }
231        }
232    }
233
234    /// 스키마/인덱스 메타데이터 저장용 WOS. 파일이 있으면 파일, 없으면 메모리.
235    pub(crate) fn wos_for_metadata(&self) -> &WosVariant {
236        self.file_wos.as_ref().unwrap_or(&self.memory_wos)
237    }
238
239    /// 테이블별 저장소를 지정합니다 (파일 DB에서만 유효).
240    ///
241    /// `TablePersistence::File`은 파일에 저장(재시작 후 유지),
242    /// `TablePersistence::Memory`는 메모리만 사용(프로세스 종료 시 사라짐).
243    /// 인메모리 전용 DB(`open_in_memory`)에서는 `File` 지정 시 에러를 반환합니다.
244    pub fn set_table_persistence(
245        &self,
246        table: &str,
247        persistence: TablePersistence,
248    ) -> crate::error::DbxResult<()> {
249        if persistence == TablePersistence::File && self.file_wos.is_none() {
250            return Err(crate::error::DbxError::InvalidOperation {
251                message: "TablePersistence::File requires a file-backed database (open with path)"
252                    .to_string(),
253                context: "Use Database::open(path) to enable per-table file persistence"
254                    .to_string(),
255            });
256        }
257        self.table_persistence
258            .insert(table.to_string(), persistence);
259        Ok(())
260    }
261
262    /// 테이블에 지정된 저장소 종류를 반환합니다 (미지정 시 None).
263    pub fn table_persistence(&self, table: &str) -> Option<TablePersistence> {
264        self.table_persistence.get(table).map(|r| *r)
265    }
266
267    /// Replication Master를 활성화하고, WAL 변경사항을 구독할 수 있는 Receiver를 반환합니다.
268    ///
269    /// `capacity`는 브로드캐스트 채널의 버퍼 크기입니다.
270    /// 이미 활성화되어 있다면 새로운 Receiver만 생성하여 반환합니다.
271    pub fn enable_replication(
272        &mut self,
273        capacity: usize,
274    ) -> tokio::sync::broadcast::Receiver<crate::replication::protocol::ReplicationMessage> {
275        use crate::replication::master::ReplicationMaster;
276
277        if let Some(master) = &self.replication_master {
278            master.subscribe()
279        } else {
280            let (master, rx) = ReplicationMaster::new(capacity);
281            self.replication_master = Some(Arc::new(master));
282            rx
283        }
284    }
285
286    // ── Monitoring / Observability API (Phase 0.4) ────────────────────
287
288    /// Export all DBX metrics in Prometheus exposition text format.
289    ///
290    /// The returned string can be served at a `/metrics` HTTP endpoint
291    /// and consumed by Prometheus or any compatible scraper.
292    ///
293    /// # Example
294    ///
295    /// ```rust
296    /// use dbx_core::Database;
297    ///
298    /// # fn main() -> dbx_core::DbxResult<()> {
299    /// let db = Database::open_in_memory()?;
300    /// db.insert("users", b"k1", b"v1")?;
301    /// let text = db.export_metrics();
302    /// assert!(text.contains("dbx_inserts_total 1"));
303    /// # Ok(())
304    /// # }
305    /// ```
306    pub fn export_metrics(&self) -> String {
307        crate::monitoring::export_prometheus(&self.metrics)
308    }
309
310    /// Return a non-atomic snapshot of current metrics.
311    ///
312    /// Useful for programmatic inspection of counters and hit rates.
313    ///
314    /// # Example
315    ///
316    /// ```rust
317    /// use dbx_core::Database;
318    ///
319    /// # fn main() -> dbx_core::DbxResult<()> {
320    /// let db = Database::open_in_memory()?;
321    /// db.insert("users", b"k1", b"v1")?;
322    /// let snap = db.metrics_snapshot();
323    /// assert_eq!(snap.inserts_total, 1);
324    /// # Ok(())
325    /// # }
326    /// ```
327    pub fn metrics_snapshot(&self) -> crate::monitoring::MetricsSnapshot {
328        self.metrics.snapshot()
329    }
330
331    /// Reset all metrics counters and histograms to zero.
332    pub fn reset_metrics(&self) {
333        self.metrics.reset();
334    }
335
336    // ── Streaming Ingestion API ──────────────────────────────────────────────
337
338    /// 채널 기반 스트리밍 수집 파이프라인 생성
339    ///
340    /// INSERT / UPDATE / DELETE 이벤트를 [`StreamEvent`](crate::engine::StreamEvent)로
341    /// 전송하면 백그라운드에서 자동 배치 처리합니다.
342    ///
343    /// # 인수
344    ///
345    /// * `table` - 이벤트를 적용할 테이블 이름
346    /// * `batch_size` - 이 이벤트 수에 도달하면 즉시 flush
347    /// * `max_latency_ms` - 버퍼가 차지 않아도 이 밀리초마다 강제 flush
348    ///
349    /// # 예시
350    ///
351    /// ```rust,no_run
352    /// use dbx_core::{Database, engine::{StreamIngester, StreamEvent}};
353    /// use std::{sync::Arc, time::Duration};
354    ///
355    /// let db = Arc::new(Database::open_in_memory().unwrap());
356    /// let ingester = db.create_stream_ingester("orders", 1000, 100);
357    /// let tx = ingester.sender();
358    /// tx.send(vec![StreamEvent::Insert { key: "1".into(), value: b"data".to_vec() }]).unwrap();
359    /// ingester.flush().unwrap();
360    /// ```
361    pub fn create_stream_ingester(
362        self: &Arc<Self>,
363        table: &str,
364        batch_size: usize,
365        max_latency_ms: u64,
366    ) -> crate::engine::StreamIngester {
367        crate::engine::StreamIngester::new(
368            Arc::clone(self),
369            table,
370            batch_size,
371            std::time::Duration::from_millis(max_latency_ms),
372        )
373    }
374}