//! Database CRUD Operations — Create, Read, Update, Delete methods
//!
//! File: `dbx_core/engine/crud.rs`

use std::cmp::Ordering;

use crate::engine::Database;
use crate::engine::types::{BackgroundJob, DurabilityLevel};
use crate::error::{DbxError, DbxResult};
use crate::storage::StorageBackend;

// ════════════════════════════════════════════
// ⚠️ MVCC Value Encoding Constants
// ════════════════════════════════════════════
// Magic headers that tag MVCC-versioned values.
// Every MVCC value starts with the 2-byte header [0x00, tag]. A stored value that does not
// start with one of these headers (e.g. typical UTF-8 text, whose first byte is rarely
// 0x00) is treated as legacy, unprefixed data and returned as-is.
// CAUTION: this scheme is NOT collision-free. Arbitrary binary user data can legitimately
// begin with [0x00, 0x01] or [0x00, 0x02]; such a legacy value would be misread as an MVCC
// value (its first two bytes stripped) or as a tombstone (the value hidden) by the decoders.
// If these constants change, keep crud.rs and snapshot.rs in sync.

/// 2-byte magic header marking a live MVCC value: [0x00, 0x01]
pub(crate) const MVCC_VALUE_PREFIX: [u8; 2] = [0x00, 0x01];
/// 2-byte magic header marking an MVCC deletion (tombstone): [0x00, 0x02]
pub(crate) const MVCC_TOMBSTONE_PREFIX: [u8; 2] = [0x00, 0x02];
/// Length of the MVCC magic header, derived from the prefix so the two cannot drift apart.
pub(crate) const MVCC_PREFIX_LEN: usize = MVCC_VALUE_PREFIX.len();

23impl Database {
24    // ════════════════════════════════════════════
25    // WAL Helper
26    // ════════════════════════════════════════════
27
28    /// Append a WAL record if durability is enabled and a WAL backend exists.
29    #[inline]
30    fn append_to_wal(&self, record: &crate::wal::WalRecord) -> DbxResult<()> {
31        if self.durability == DurabilityLevel::None {
32            return Ok(());
33        }
34        if let Some(wal) = &self.wal {
35            wal.append(record)?;
36            if self.durability == DurabilityLevel::Full {
37                if let Some(tx) = &self.job_sender {
38                    let _ = tx.send(BackgroundJob::WalSync);
39                } else {
40                    wal.sync()?;
41                }
42            }
43        } else if let Some(encrypted_wal) = &self.encrypted_wal {
44            encrypted_wal.append(record)?;
45            if self.durability == DurabilityLevel::Full {
46                if let Some(tx) = &self.job_sender {
47                    let _ = tx.send(BackgroundJob::EncryptedWalSync);
48                } else {
49                    encrypted_wal.sync()?;
50                }
51            }
52        }
53        Ok(())
54    }
55
56    // ════════════════════════════════════════════
57    // CRUD Operations
58    // ════════════════════════════════════════════
59
60    // ════════════════════════════════════════════
61    // CREATE Operations
62    // ════════════════════════════════════════════
63
64    /// 키-값 쌍을 삽입합니다.
65    ///
66    /// 데이터는 먼저 Delta Store (Tier 1)에 쓰여집니다.
67    /// Flush 임계값을 초과하면 자동으로 WOS로 이동합니다.
68    ///
69    /// # 인자
70    ///
71    /// * `table` - 테이블 이름
72    /// * `key` - 키 (바이트 배열)
73    /// * `value` - 값 (바이트 배열)
74    pub fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
75        // Log to WAL first — only allocate record if WAL exists
76        #[cfg(feature = "wal")]
77        if self.durability != DurabilityLevel::None
78            && (self.wal.is_some() || self.encrypted_wal.is_some())
79        {
80            self.append_to_wal(&crate::wal::WalRecord::Insert {
81                table: table.to_string(),
82                key: key.to_vec(),
83                value: value.to_vec(),
84                ts: 0,
85            })?;
86        }
87
88        // 데이터 삽입
89        self.delta.insert(table, key, value)?;
90
91        // O(1) row_id 계산 + 인덱스 업데이트 — only when index exists
92        #[cfg(feature = "index")]
93        if self.has_index(table, "key") {
94            let counter = self
95                .row_counters
96                .entry(table.to_string())
97                .or_insert_with(|| std::sync::atomic::AtomicUsize::new(0));
98            let row_id = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
99            if let Some(tx) = &self.job_sender {
100                let _ = tx.send(BackgroundJob::IndexUpdate {
101                    table: table.to_string(),
102                    column: "key".to_string(),
103                    key: key.to_vec(),
104                    row_id,
105                });
106            } else {
107                self.index.update_on_insert(table, "key", key, row_id)?;
108            }
109        }
110
111        // Auto-flush if threshold exceeded
112        if self.delta.should_flush() {
113            self.flush()?;
114        }
115
116        Ok(())
117    }
118
119    /// 여러 키-값 쌍을 일괄 삽입합니다 (최적화됨).
120    pub fn insert_batch(&self, table: &str, rows: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
121        #[cfg(feature = "wal")]
122        if self.durability != DurabilityLevel::None
123            && (self.wal.is_some() || self.encrypted_wal.is_some())
124        {
125            self.append_to_wal(&crate::wal::WalRecord::Batch {
126                table: table.to_string(),
127                rows: rows.clone(),
128                ts: 0,
129            })?;
130        }
131
132        self.delta.insert_batch(table, rows)?;
133
134        // Auto-flush if threshold exceeded
135        if self.delta.should_flush() {
136            self.flush()?;
137        }
138
139        Ok(())
140    }
141
142    /// Insert a versioned key-value pair for MVCC.
143    pub fn insert_versioned(
144        &self,
145        table: &str,
146        key: &[u8],
147        value: Option<&[u8]>,
148        commit_ts: u64,
149    ) -> DbxResult<()> {
150        let vk = crate::transaction::mvcc::version::VersionedKey::new(key.to_vec(), commit_ts);
151        let encoded_key = vk.encode();
152
153        // Encode value with prefix
154        // ⚠️ MVCC 매직 헤더 인코딩 — MVCC_VALUE_PREFIX / MVCC_TOMBSTONE_PREFIX 사용
155        let encoded_value = match value {
156            Some(v) => {
157                let mut bytes = Vec::with_capacity(v.len() + MVCC_PREFIX_LEN);
158                bytes.extend_from_slice(&MVCC_VALUE_PREFIX);
159                bytes.extend_from_slice(v);
160                bytes
161            }
162            None => MVCC_TOMBSTONE_PREFIX.to_vec(),
163        };
164
165        // Write to Delta Store
166        self.delta.insert(table, &encoded_key, &encoded_value)?;
167
168        Ok(())
169    }
170
171    // ════════════════════════════════════════════
172    // READ Operations
173    // ════════════════════════════════════════════
174
175    /// Read a specific version of a key (Snapshot Read).
176    pub fn get_snapshot(
177        &self,
178        table: &str,
179        key: &[u8],
180        read_ts: u64,
181    ) -> DbxResult<Option<Option<Vec<u8>>>> {
182        let start_vk = crate::transaction::mvcc::version::VersionedKey::new(key.to_vec(), read_ts);
183        let start_bytes = start_vk.encode();
184
185        // Helper: returns Some(Some(v)), Some(None) (tombstone), or None (mismatch)
186        let check_entry = |entry_key: &[u8], entry_val: &[u8]| -> Option<Option<Vec<u8>>> {
187            let decoded =
188                crate::transaction::mvcc::version::VersionedKey::decode(entry_key).ok()?;
189            if decoded.user_key != key {
190                return None;
191            }
192            if decoded.commit_ts > read_ts {
193                return None;
194            }
195            if entry_val.is_empty() {
196                return Some(Some(entry_val.to_vec())); // Legacy empty value
197            }
198            // ⚠️ MVCC 매직 헤더 디코딩 — 2바이트 [0x00, tag] 확인
199            if entry_val.len() >= MVCC_PREFIX_LEN && entry_val[0] == 0x00 {
200                match entry_val[1] {
201                    0x01 => return Some(Some(entry_val[MVCC_PREFIX_LEN..].to_vec())),
202                    0x02 => return Some(None), // Tombstone
203                    _ => {}
204                }
205            }
206            // Legacy non-prefixed value
207            Some(Some(entry_val.to_vec()))
208        };
209
210        // 1. Check Delta Store
211        if let Some((k, v)) = self.delta.scan_one(table, start_bytes.clone()..)?
212            && let Some(result) = check_entry(&k, &v)
213        {
214            return Ok(Some(result));
215        }
216
217        // 2. Check WOS
218        if let Some((k, v)) = self.wos.scan_one(table, start_bytes..)?
219            && let Some(result) = check_entry(&k, &v)
220        {
221            return Ok(Some(result));
222        }
223
224        Ok(None)
225    }
226
227    /// Helper method for Snapshot: scan all versioned entries from Delta Store.
228    pub(crate) fn scan_delta_versioned(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
229        StorageBackend::scan(&self.delta, table, ..)
230    }
231
232    /// Helper method for Snapshot: scan all versioned entries from WOS.
233    pub(crate) fn scan_wos_versioned(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
234        self.wos.scan(table, ..)
235    }
236
237    /// Get the current timestamp from the transaction manager.
238    pub fn current_timestamp(&self) -> u64 {
239        self.tx_manager.current_ts()
240    }
241
242    /// Allocate a new commit timestamp for a transaction.
243    /// This increments the timestamp oracle and returns a unique timestamp.
244    pub fn allocate_commit_ts(&self) -> u64 {
245        self.tx_manager.allocate_commit_ts()
246    }
247
248    /// 키로 값을 조회합니다.
249    ///
250    /// 성능 최적화: MVCC feature가 비활성화되면 Fast-path만 사용하여
251    /// 최대 성능을 달성합니다.
252    #[inline(always)]
253    pub fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
254        // Fast-path: Delta → WOS 직접 조회 (MVCC 오버헤드 없음)
255        // MVCC feature가 활성화되어도 Fast-path를 우선 사용
256        // 일반 insert()로 저장된 데이터는 여기서 조회됨
257        if let Some(value) = self.delta.get(table, key)? {
258            return Ok(Some(value));
259        }
260        if let Some(value) = self.wos.get(table, key)? {
261            return Ok(Some(value));
262        }
263
264        // ════════════════════════════════════════════
265        // MVCC Fallback: Transaction Commit 후 데이터 조회
266        // ════════════════════════════════════════════
267        // Transaction::commit()은 insert_versioned()와 insert()를 모두 호출하므로
268        // 일반적으로 위의 Fast-path에서 데이터를 찾을 수 있습니다.
269        //
270        // 하지만 다음 경우에 이 Fallback이 필요합니다:
271        // 1. insert_versioned()만 호출된 경우 (일반 key 없음)
272        // 2. 향후 MVCC 전용 모드 지원 시
273        // 3. Snapshot isolation 구현 시
274        //
275        // 현재는 최신 타임스탬프로 조회하지만, 향후 snapshot_ts를 인자로 받아
276        // 특정 시점의 데이터를 조회할 수 있도록 확장 가능합니다.
277        let current_ts = self.tx_manager.allocate_commit_ts();
278        let vk = crate::transaction::mvcc::version::VersionedKey::new(key.to_vec(), current_ts);
279        let encoded_key = vk.encode();
280
281        // Delta에서 versioned key 조회
282        if let Some(value) = self.delta.get(table, &encoded_key)? {
283            return Ok(Self::decode_mvcc_value(value));
284        }
285
286        // WOS에서 versioned key 조회
287        if let Some(value) = self.wos.get(table, &encoded_key)? {
288            return Ok(Self::decode_mvcc_value(value));
289        }
290
291        Ok(None)
292    }
293
294    /// MVCC 값 디코딩 (Tombstone 필터링)
295    #[inline(always)]
296    fn decode_mvcc_value(v: Vec<u8>) -> Option<Vec<u8>> {
297        if v.len() < MVCC_PREFIX_LEN || v[0] != 0x00 {
298            return Some(v); // Legacy value
299        }
300
301        match v[1] {
302            0x01 => Some(v[MVCC_PREFIX_LEN..].to_vec()), // Value
303            0x02 => None,                                // Tombstone
304            _ => Some(v),                                // Unknown tag
305        }
306    }
307
308    /// VersionedKey 디코딩
309    #[inline(always)]
310    fn decode_versioned_key(k: Vec<u8>) -> Vec<u8> {
311        if k.len() <= 8 {
312            return k;
313        }
314
315        crate::transaction::mvcc::version::VersionedKey::decode(&k)
316            .map(|vk| vk.user_key)
317            .unwrap_or(k)
318    }
319
320    /// 테이블의 모든 키-값 쌍을 스캔합니다.
321    pub fn scan(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
322        // Fast-path: Delta가 비어있으면 WOS 직접 스캔 (merge 오버헤드 제거)
323        let delta_entries = self.delta.scan(table, ..)?;
324        if delta_entries.is_empty() {
325            return self.wos.scan(table, ..);
326        }
327
328        // 1. Collect from Delta Store and WOS
329        let wos_entries = self.wos.scan(table, ..)?;
330
331        // 2. Direct 2-way merge (both are already sorted)
332        let mut result = Vec::with_capacity(delta_entries.len() + wos_entries.len());
333
334        let mut i = 0;
335        let mut j = 0;
336
337        while i < delta_entries.len() && j < wos_entries.len() {
338            match delta_entries[i].0.cmp(&wos_entries[j].0) {
339                std::cmp::Ordering::Less => {
340                    // Delta key is smaller
341                    if let Some(decoded_v) = Self::decode_mvcc_value(delta_entries[i].1.clone()) {
342                        let user_key = Self::decode_versioned_key(delta_entries[i].0.clone());
343                        result.push((user_key, decoded_v));
344                    }
345                    i += 1;
346                }
347                std::cmp::Ordering::Equal => {
348                    // Same key - Delta takes priority
349                    if let Some(decoded_v) = Self::decode_mvcc_value(delta_entries[i].1.clone()) {
350                        let user_key = Self::decode_versioned_key(delta_entries[i].0.clone());
351                        result.push((user_key, decoded_v));
352                    }
353                    i += 1;
354                    j += 1; // Skip WOS entry
355                }
356                std::cmp::Ordering::Greater => {
357                    // WOS key is smaller
358                    if let Some(decoded_v) = Self::decode_mvcc_value(wos_entries[j].1.clone()) {
359                        let user_key = Self::decode_versioned_key(wos_entries[j].0.clone());
360                        result.push((user_key, decoded_v));
361                    }
362                    j += 1;
363                }
364            }
365        }
366
367        // 3. Process remaining Delta entries
368        while i < delta_entries.len() {
369            if let Some(decoded_v) = Self::decode_mvcc_value(delta_entries[i].1.clone()) {
370                let user_key = Self::decode_versioned_key(delta_entries[i].0.clone());
371                result.push((user_key, decoded_v));
372            }
373            i += 1;
374        }
375
376        // 4. Process remaining WOS entries
377        while j < wos_entries.len() {
378            if let Some(decoded_v) = Self::decode_mvcc_value(wos_entries[j].1.clone()) {
379                let user_key = Self::decode_versioned_key(wos_entries[j].0.clone());
380                result.push((user_key, decoded_v));
381            }
382            j += 1;
383        }
384
385        Ok(result)
386    }
387
388    /// 테이블의 키 범위를 스캔합니다.
389    pub fn range(
390        &self,
391        table: &str,
392        start_key: &[u8],
393        end_key: &[u8],
394    ) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
395        let range = start_key.to_vec()..end_key.to_vec();
396
397        // Scan both Delta Store and WOS with range bounds
398        let mut merged = std::collections::BTreeMap::new();
399        for (k, v) in self.delta.scan(table, range.clone())? {
400            merged.insert(k, v);
401        }
402        for (k, v) in self.wos.scan(table, range)? {
403            merged.entry(k).or_insert(v);
404        }
405
406        Ok(merged.into_iter().collect())
407    }
408
409    /// 테이블의 행 개수를 반환합니다.
410    pub fn table_row_count(&self, table: &str) -> DbxResult<usize> {
411        self.count(table)
412    }
413
414    // ════════════════════════════════════════════
415    // DELETE Operations
416    // ════════════════════════════════════════════
417
418    /// 키를 삭제합니다.
419    pub fn delete(&self, table: &str, key: &[u8]) -> DbxResult<bool> {
420        #[cfg(feature = "index")]
421        if self.has_index(table, "key") {
422            let row_ids = self.index.lookup(table, "key", key)?;
423            for row_id in row_ids {
424                self.index.update_on_delete(table, "key", key, row_id)?;
425            }
426        }
427
428        // 1. Delete from legacy
429        let delta_deleted = self.delta.delete(table, key)?;
430        let wos_deleted = self.wos.delete(table, key)?;
431
432        // 2. Add versioned tombstone if it was a versioned key
433        #[cfg(feature = "mvcc")]
434        {
435            let commit_ts = self.tx_manager.allocate_commit_ts();
436            self.insert_versioned(table, key, None, commit_ts)?;
437        }
438
439        Ok(delta_deleted || wos_deleted)
440    }
441
442    // ════════════════════════════════════════════
443    // Helper Methods
444    // ════════════════════════════════════════════
445
446    /// Synchronize the Columnar Cache with the latest data from Delta Store.
447    ///
448    /// If the table has a schema in table_schemas, it will be synced as typed data.
449    /// Otherwise, it will be synced as raw Binary data.
450    pub fn sync_columnar_cache(&self, table: &str) -> DbxResult<usize> {
451        // Check if table has a schema (SQL table) - case-insensitive
452        let schemas = self.table_schemas.read().unwrap();
453        let table_schema = schemas
454            .get(table)
455            .or_else(|| {
456                let table_lower = table.to_lowercase();
457                schemas
458                    .iter()
459                    .find(|(k, _)| k.to_lowercase() == table_lower)
460                    .map(|(_, v)| v)
461            })
462            .cloned();
463        drop(schemas);
464
465        // Scan from both Tier 1 (Delta) and Tier 3 (WOS)
466        let table_lower = table.to_lowercase();
467        let mut rows = self.delta.scan(&table_lower, ..)?;
468        let mut wos_rows = self.wos.scan(&table_lower, ..)?;
469        rows.append(&mut wos_rows);
470
471        self.columnar_cache
472            .sync_from_storage(table, rows, table_schema)
473    }
474
475    /// Sync data from multiple tiers (Delta and ROS) to GPU for merge operations.
476    pub fn sync_gpu_cache_multi_tier(&self, table: &str) -> DbxResult<()> {
477        let gpu = self
478            .gpu_manager
479            .as_ref()
480            .ok_or_else(|| DbxError::NotImplemented("GPU manager not available".to_string()))?;
481
482        // 1. Sync Delta data (Tier 1)
483        let delta_batches = self.columnar_cache.get_batches(table, None)?;
484        if let Some(batches) = delta_batches {
485            for batch in batches {
486                gpu.upload_batch_pinned(&format!("{}_delta", table), &batch)?;
487            }
488        }
489
490        // 2. Sync ROS data (Tier 5) - simplified: assuming ROS is already in SQL tables for now
491        let tables = self.tables.read().unwrap();
492        if let Some(batches) = tables.get(table) {
493            for batch in batches {
494                gpu.upload_batch_pinned(&format!("{}_ros", table), batch)?;
495            }
496        }
497
498        Ok(())
499    }
500
501    /// Legacy method to sync data from Columnar Cache to GPU.
502    pub fn sync_gpu_cache(&self, table: &str) -> DbxResult<()> {
503        self.sync_gpu_cache_multi_tier(table)
504    }
505
506    /// Execute an operation on GPU with automatic fallback to CPU on any error.
507    pub fn gpu_exec_with_fallback<T, F, C>(&self, gpu_op: F, cpu_op: C) -> DbxResult<T>
508    where
509        F: FnOnce(&crate::storage::gpu::GpuManager) -> DbxResult<T>,
510        C: FnOnce() -> DbxResult<T>,
511    {
512        if let Some(gpu) = &self.gpu_manager {
513            match gpu_op(gpu) {
514                Ok(val) => Ok(val),
515                Err(e) => {
516                    tracing::warn!("GPU execution failed, falling back to CPU: {:?}", e);
517                    cpu_op()
518                }
519            }
520        } else {
521            cpu_op()
522        }
523    }
524}
525
526// ════════════════════════════════════════════
527// DatabaseCore Trait Implementation
528// ════════════════════════════════════════════
529
530impl crate::traits::DatabaseCore for Database {
531    fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
532        // Reuse existing implementation
533        Database::insert(self, table, key, value)
534    }
535
536    fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
537        // Reuse existing implementation
538        Database::get(self, table, key)
539    }
540
541    fn delete(&self, table: &str, key: &[u8]) -> DbxResult<()> {
542        // Reuse existing implementation
543        Database::delete(self, table, key).map(|_| ())
544    }
545
546    fn scan(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
547        // Reuse existing implementation
548        Database::scan(self, table)
549    }
550
551    fn flush(&self) -> DbxResult<()> {
552        // Reuse existing implementation
553        Database::flush(self)
554    }
555
556    fn insert_batch(&self, table: &str, entries: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
557        // Reuse existing implementation
558        Database::insert_batch(self, table, entries)
559    }
560}