Skip to main content

dbx_core/engine/
crud.rs

1//! Database CRUD Operations — Create, Read, Update, Delete methods
2
3use crate::engine::Database;
4use crate::engine::types::{BackgroundJob, DurabilityLevel};
5use crate::error::{DbxError, DbxResult};
6use crate::storage::StorageBackend;
7
// ════════════════════════════════════════════
// ⚠️ MVCC Value Encoding Constants
// ════════════════════════════════════════════
// Magic headers for MVCC-versioned values.
// Every MVCC-encoded value starts with the 2-byte header [0x00, tag]
// (tag 0x01 = live value, tag 0x02 = tombstone). Readers only treat a value as MVCC-encoded
// when it starts with 0x00 followed by a known tag; anything else is passed through as a
// raw/legacy value.
// Caveat: this is a heuristic, not a guarantee. Ordinary text rarely begins with a NUL byte,
// but arbitrary binary payloads can. A raw (non-MVCC) value that happens to begin with
// [0x00, 0x01] or [0x00, 0x02] would be misread as an MVCC value or tombstone.
// If these constants ever change, keep crud.rs and snapshot.rs in sync.

/// 2-byte magic header marking a live MVCC value: `[0x00, 0x01]`.
pub(crate) const MVCC_VALUE_PREFIX: [u8; 2] = [0x00, 0x01];
/// 2-byte magic header marking an MVCC deletion (tombstone): `[0x00, 0x02]`.
pub(crate) const MVCC_TOMBSTONE_PREFIX: [u8; 2] = [0x00, 0x02];
/// Length of the MVCC magic header, derived from the prefix array so the two cannot drift.
pub(crate) const MVCC_PREFIX_LEN: usize = MVCC_VALUE_PREFIX.len();
22
23impl Database {
24    // ════════════════════════════════════════════
25    // WAL Helper
26    // ════════════════════════════════════════════
27
28    /// Append a WAL record if durability is enabled and a WAL backend exists.
29    #[inline]
30    fn append_to_wal(&self, record: &crate::wal::WalRecord) -> DbxResult<()> {
31        if self.durability == DurabilityLevel::None {
32            return Ok(());
33        }
34        if let Some(wal) = &self.wal {
35            wal.append(record)?;
36            if self.durability == DurabilityLevel::Full {
37                if let Some(tx) = &self.job_sender {
38                    let _ = tx.send(BackgroundJob::WalSync);
39                } else {
40                    wal.sync()?;
41                }
42            }
43        } else if let Some(encrypted_wal) = &self.encrypted_wal {
44            encrypted_wal.append(record)?;
45            if self.durability == DurabilityLevel::Full {
46                if let Some(tx) = &self.job_sender {
47                    let _ = tx.send(BackgroundJob::EncryptedWalSync);
48                } else {
49                    encrypted_wal.sync()?;
50                }
51            }
52        }
53
54        // ════════════════════════════════════════════
55        // Phase 5: Replication (Master 브로드캐스트)
56        // ════════════════════════════════════════════
57        if let Some(master) = &self.replication_master {
58            // bincode로 직렬화하여 전송 (임시)
59            if let Ok(data) = bincode::serialize(record) {
60                master.replicate(data);
61            }
62        }
63
64        Ok(())
65    }
66
67    // ════════════════════════════════════════════
68    // CRUD Operations
69    // ════════════════════════════════════════════
70
71    // ════════════════════════════════════════════
72    // CREATE Operations
73    // ════════════════════════════════════════════
74
75    // ════════════════════════════════════════════
76    // Phase 3.4: 자동 파티션 지원 라우터
77    // ════════════════════════════════════════════
78    fn route_partition_or_expand(&self, table: &str, key_str: &str) -> String {
79        let route_res = {
80            let maps = self.partition_maps.read().unwrap();
81            if let Some(map) = maps.get(table) {
82                use crate::storage::partition::PartitionValue;
83                let pv = PartitionValue::Text(key_str.to_string());
84                Some(map.route_or_expand(&pv))
85            } else {
86                None
87            }
88        };
89
90        if let Some(res) = route_res {
91            use crate::storage::partition::RouteResult;
92            match res {
93                RouteResult::Routed(sub_tbl) => sub_tbl,
94                RouteResult::NeedsExpansion {
95                    new_table,
96                    new_bounds,
97                } => {
98                    // 쓰기 락을 획득하여 Range 파티션 bounds 갱신
99                    let mut maps = self.partition_maps.write().unwrap();
100                    if let Some(map) = maps.get_mut(table) {
101                        use crate::storage::partition::PartitionType;
102                        if let PartitionType::Range { bounds, .. } = &mut map.partition_type {
103                            let last_hi = bounds.last().map(|(_, hi)| *hi).unwrap_or(0);
104                            let v = key_str.parse::<i64>().unwrap_or(0);
105                            // 다른 스레드에서 먼저 갱신했을 수 있으므로 재검사
106                            if v >= last_hi {
107                                bounds.push(new_bounds);
108                                map.num_partitions += 1;
109                            }
110
111                            // 갱신 후 다시 라우팅 시도
112                            use crate::storage::partition::PartitionValue;
113                            let pv = PartitionValue::Text(key_str.to_string());
114                            let final_res = map.route_or_expand(&pv);
115                            if let RouteResult::Routed(final_tbl) = final_res {
116                                final_tbl
117                            } else {
118                                new_table
119                            }
120                        } else {
121                            new_table
122                        }
123                    } else {
124                        table.to_string()
125                    }
126                }
127            }
128        } else {
129            table.to_string()
130        }
131    }
132
133    /// 키-값 쌍을 삽입합니다.
134    ///
135    /// 데이터는 먼저 Delta Store (Tier 1)에 쓰여집니다.
136    /// Flush 임계값을 초과하면 자동으로 WOS로 이동합니다.
137    ///
138    /// # 인자
139    ///
140    /// * `table` - 테이블 이름
141    /// * `key` - 키 (바이트 배열)
142    /// * `value` - 값 (바이트 배열)
143    pub fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
144        // ════════════════════════════════════════════
145        // Phase 3: 파티셔닝 (Partition Routing with Auto-Expand)
146        // ════════════════════════════════════════════
147        let original_table = table; // 파티션 자동 통계 비교용 원본 이름 보존
148        let key_str = String::from_utf8_lossy(key).into_owned();
149        let target_table = self.route_partition_or_expand(table, &key_str);
150        let table = target_table.as_str();
151        // Log to WAL first — only allocate record if WAL exists
152        #[cfg(feature = "wal")]
153        if self.durability != DurabilityLevel::None
154            && (self.wal.is_some() || self.encrypted_wal.is_some())
155        {
156            self.append_to_wal(&crate::wal::WalRecord::Insert {
157                table: table.to_string(),
158                key: key.to_vec(),
159                value: value.to_vec(),
160                ts: 0,
161            })?;
162        }
163
164        // 데이터 삽입
165        self.delta.insert(table, key, value)?;
166
167        // O(1) row_id 계산 + 인덱스 업데이트 — only when index exists
168        #[cfg(feature = "index")]
169        if self.has_index(table, "key") {
170            let counter = self
171                .row_counters
172                .entry(table.to_string())
173                .or_insert_with(|| std::sync::atomic::AtomicUsize::new(0));
174            let row_id = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
175            if let Some(tx) = &self.job_sender {
176                let _ = tx.send(BackgroundJob::IndexUpdate {
177                    table: table.to_string(),
178                    column: "key".to_string(),
179                    key: key.to_vec(),
180                    row_id,
181                });
182            } else {
183                self.index.update_on_insert(table, "key", key, row_id)?;
184            }
185        }
186
187        // Auto-flush if threshold exceeded
188        if self.delta.should_flush() {
189            self.flush()?;
190        }
191
192        // ════════════════════════════════════════════
193        // Phase 0: HTAP 실시간 동기화 (RealtimeSync)
194        // ════════════════════════════════════════════
195        use crate::storage::realtime_sync::SyncMode;
196        let sync_config = &self.config.sync;
197        match sync_config.mode {
198            SyncMode::Immediate => {
199                // 즉시 Columnar Cache 동기화
200                let _ = self.sync_columnar_cache(table);
201            }
202            SyncMode::Threshold(n) => {
203                // N건 이상일 때만 동기화 (이 로직은 보통 flush에서 처리되거나,
204                // delta row count를 기반으로 수행)
205                if self.delta.count(table).unwrap_or(0) >= n {
206                    let _ = self.sync_columnar_cache(table);
207                }
208            }
209            SyncMode::AsyncBatch { .. } => {
210                // 비동기 배치는 백그라운드 스레드에서 주기적으로 수행됨.
211                // 여기서는 nothing.
212            }
213        }
214
215        // ════════════════════════════════════════════
216        // Phase 3 Synergy: 파티셔닝 자동 통계 갱신
217        // ════════════════════════════════════════════
218        // 파티셔닝된 테이블이면 sub-table명이 original_table과 다름
219        if table != original_table {
220            // 1) row_count 자동 증가 (원자적)
221            self.partition_stats
222                .entry(table.to_string())
223                .and_modify(|s| s.row_count += 1)
224                .or_insert_with(|| crate::storage::partition::PartitionStats {
225                    row_count: 1,
226                    ..Default::default()
227                });
228
229            // 2) 첫 쓰기 시각 자동 기록 (이미 있으면 덮어쓰지 않음)
230            self.partition_creation_times
231                .entry(table.to_string())
232                .or_insert_with(|| {
233                    std::time::SystemTime::now()
234                        .duration_since(std::time::UNIX_EPOCH)
235                        .unwrap()
236                        .as_secs()
237                });
238        }
239
240        // Event-driven MV 갱신 알림 (MV가 없으면 zero overhead)
241        self.mat_view_registry.notify_change(table);
242
243        self.metrics.inc_inserts();
244        Ok(())
245    }
246
247    /// 여러 키-값 쌍을 일괄 삽입합니다 (최적화됨).
248    ///
249    /// `parallel_engine`의 `min_rows_for_parallel` 이상이면 Rayon 스레드 풀에서 병렬 삽입합니다.
250    pub fn insert_batch(&self, table: &str, rows: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
251        // ════════════════════════════════════════════
252        // Phase 3: 파티셔닝 (Partition Routing with Auto-Expand)
253        // ════════════════════════════════════════════
254        let target_table = if let Some((first_key, _)) = rows.first() {
255            let key_str = String::from_utf8_lossy(first_key).into_owned();
256            self.route_partition_or_expand(table, &key_str)
257        } else {
258            table.to_string()
259        };
260        let table = target_table.as_str();
261        #[cfg(feature = "wal")]
262        if self.durability != DurabilityLevel::None
263            && (self.wal.is_some() || self.encrypted_wal.is_some())
264        {
265            self.append_to_wal(&crate::wal::WalRecord::Batch {
266                table: table.to_string(),
267                rows: rows.clone(),
268                ts: 0,
269            })?;
270        }
271
272        // 병렬화 임계값 이상이면 parallel_engine 으로 병렬 삽입
273        if self.parallel_engine.should_parallelize_rows(rows.len()) {
274            use rayon::prelude::*;
275            let delta = &self.delta;
276            let table_owned = table.to_string();
277            // 각 (key, value) 쌍을 병렬로 삽입
278            // DeltaStore는 내부적으로 DashMap + SkipMap(Arc)이므로 공유 안전
279            let results: Vec<crate::error::DbxResult<()>> = self.parallel_engine.execute(|| {
280                rows.par_iter()
281                    .map(|(key, value)| delta.insert(&table_owned, key, value))
282                    .collect()
283            });
284            for r in results {
285                r?;
286            }
287        } else {
288            self.delta.insert_batch(table, rows)?;
289        }
290
291        // Auto-flush if threshold exceeded
292        if self.delta.should_flush() {
293            self.flush()?;
294        }
295
296        // ════════════════════════════════════════════
297        // Phase 0: HTAP 실시간 동기화 (RealtimeSync)
298        // ════════════════════════════════════════════
299        use crate::storage::realtime_sync::SyncMode;
300        let sync_config = &self.config.sync;
301        match sync_config.mode {
302            SyncMode::Immediate => {
303                let _ = self.sync_columnar_cache(table);
304            }
305            SyncMode::Threshold(n) => {
306                if self.delta.count(table).unwrap_or(0) >= n {
307                    let _ = self.sync_columnar_cache(table);
308                }
309            }
310            SyncMode::AsyncBatch { .. } => {}
311        }
312
313        // Event-driven MV 갱신 알림
314        self.mat_view_registry.notify_change(table);
315
316        Ok(())
317    }
318
319    /// Insert a versioned key-value pair for MVCC.
320    pub fn insert_versioned(
321        &self,
322        table: &str,
323        key: &[u8],
324        value: Option<&[u8]>,
325        commit_ts: u64,
326    ) -> DbxResult<()> {
327        // ════════════════════════════════════════════
328        // Phase 3: 파티셔닝 (Partition Routing)
329        // ════════════════════════════════════════════
330        let target_table = {
331            let maps = self.partition_maps.read().unwrap();
332            if let Some(map) = maps.get(table) {
333                use crate::storage::partition::PartitionValue;
334                let key_str = String::from_utf8_lossy(key).into_owned();
335                map.route_key(&PartitionValue::Text(key_str))
336            } else {
337                table.to_string()
338            }
339        };
340        let table = target_table.as_str();
341        let vk = crate::transaction::mvcc::version::VersionedKey::new(key.to_vec(), commit_ts);
342        let encoded_key = vk.encode();
343
344        // Encode value with prefix
345        // ⚠️ MVCC 매직 헤더 인코딩 — MVCC_VALUE_PREFIX / MVCC_TOMBSTONE_PREFIX 사용
346        let encoded_value = match value {
347            Some(v) => {
348                let mut bytes = Vec::with_capacity(v.len() + MVCC_PREFIX_LEN);
349                bytes.extend_from_slice(&MVCC_VALUE_PREFIX);
350                bytes.extend_from_slice(v);
351                bytes
352            }
353            None => MVCC_TOMBSTONE_PREFIX.to_vec(),
354        };
355
356        // Write to Delta Store
357        self.delta.insert(table, &encoded_key, &encoded_value)?;
358
359        // ════════════════════════════════════════════
360        // Phase 0: HTAP 실시간 동기화 (RealtimeSync)
361        // ════════════════════════════════════════════
362        use crate::storage::realtime_sync::SyncMode;
363        let sync_config = &self.config.sync;
364        match sync_config.mode {
365            SyncMode::Immediate => {
366                let _ = self.sync_columnar_cache(table);
367            }
368            SyncMode::Threshold(n) => {
369                if self.delta.count(table).unwrap_or(0) >= n {
370                    let _ = self.sync_columnar_cache(table);
371                }
372            }
373            SyncMode::AsyncBatch { .. } => {}
374        }
375
376        Ok(())
377    }
378
379    // ════════════════════════════════════════════
380    // READ Operations
381    // ════════════════════════════════════════════
382
383    /// Read a specific version of a key (Snapshot Read).
384    pub fn get_snapshot(
385        &self,
386        table: &str,
387        key: &[u8],
388        read_ts: u64,
389    ) -> DbxResult<Option<Option<Vec<u8>>>> {
390        let target_table = {
391            let maps = self.partition_maps.read().unwrap();
392            if let Some(map) = maps.get(table) {
393                use crate::storage::partition::PartitionValue;
394                let key_str = String::from_utf8_lossy(key).into_owned();
395                map.route_key(&PartitionValue::Text(key_str))
396            } else {
397                table.to_string()
398            }
399        };
400        let table = target_table.as_str();
401
402        let start_vk = crate::transaction::mvcc::version::VersionedKey::new(key.to_vec(), read_ts);
403        let start_bytes = start_vk.encode();
404
405        // Helper: returns Some(Some(v)), Some(None) (tombstone), or None (mismatch)
406        let check_entry = |entry_key: &[u8], entry_val: &[u8]| -> Option<Option<Vec<u8>>> {
407            let decoded =
408                crate::transaction::mvcc::version::VersionedKey::decode(entry_key).ok()?;
409            if decoded.user_key != key {
410                return None;
411            }
412            if decoded.commit_ts > read_ts {
413                return None;
414            }
415            if entry_val.is_empty() {
416                return Some(Some(entry_val.to_vec())); // Legacy empty value
417            }
418            // ⚠️ MVCC 매직 헤더 디코딩 — 2바이트 [0x00, tag] 확인
419            if entry_val.len() >= MVCC_PREFIX_LEN && entry_val[0] == 0x00 {
420                match entry_val[1] {
421                    0x01 => return Some(Some(entry_val[MVCC_PREFIX_LEN..].to_vec())),
422                    0x02 => return Some(None), // Tombstone
423                    _ => {}
424                }
425            }
426            // Legacy non-prefixed value
427            Some(Some(entry_val.to_vec()))
428        };
429
430        // 1. Check Delta Store
431        if let Some((k, v)) = self.delta.scan_one(table, start_bytes.clone()..)?
432            && let Some(result) = check_entry(&k, &v)
433        {
434            return Ok(Some(result));
435        }
436
437        // 2. Check WOS
438        if let Some((k, v)) = self.wos_for_table(table).scan_one(table, start_bytes..)?
439            && let Some(result) = check_entry(&k, &v)
440        {
441            return Ok(Some(result));
442        }
443
444        Ok(None)
445    }
446
447    /// Helper method for Snapshot: scan all versioned entries from Delta Store.
448    pub(crate) fn scan_delta_versioned(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
449        StorageBackend::scan(&self.delta, table, ..)
450    }
451
452    /// Helper method for Snapshot: scan all versioned entries from WOS.
453    pub(crate) fn scan_wos_versioned(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
454        self.wos_for_table(table).scan(table, ..)
455    }
456
457    /// Get the current timestamp from the transaction manager.
458    pub fn current_timestamp(&self) -> u64 {
459        self.tx_manager.current_ts()
460    }
461
462    /// Allocate a new commit timestamp for a transaction.
463    /// This increments the timestamp oracle and returns a unique timestamp.
464    pub fn allocate_commit_ts(&self) -> u64 {
465        self.tx_manager.allocate_commit_ts()
466    }
467
468    /// 키로 값을 조회합니다.
469    ///
470    /// 성능 최적화: MVCC feature가 비활성화되면 Fast-path만 사용하여
471    /// 최대 성능을 달성합니다.
472    #[inline(always)]
473    pub fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
474        let target_table = {
475            let maps = self.partition_maps.read().unwrap();
476            if let Some(map) = maps.get(table) {
477                use crate::storage::partition::PartitionValue;
478                let key_str = String::from_utf8_lossy(key).into_owned();
479                map.route_key(&PartitionValue::Text(key_str))
480            } else {
481                table.to_string()
482            }
483        };
484        let table = target_table.as_str();
485
486        // Fast-path: Delta → WOS 직접 조회 (MVCC 오버헤드 없음)
487        // MVCC feature가 활성화되어도 Fast-path를 우선 사용
488        // 일반 insert()로 저장된 데이터는 여기서 조회됨
489        if let Some(value) = self.delta.get(table, key)? {
490            self.metrics.inc_gets();
491            self.metrics.inc_delta_hit();
492            return Ok(Some(value));
493        }
494        self.metrics.inc_delta_miss();
495        if let Some(value) = self.wos_for_table(table).get(table, key)? {
496            self.metrics.inc_gets();
497            self.metrics.inc_wos_hit();
498            return Ok(Some(value));
499        }
500        self.metrics.inc_wos_miss();
501
502        // ════════════════════════════════════════════
503        // MVCC Fallback: Transaction Commit 후 데이터 조회
504        // ════════════════════════════════════════════
505        // Transaction::commit()은 insert_versioned()와 insert()를 모두 호출하므로
506        // 일반적으로 위의 Fast-path에서 데이터를 찾을 수 있습니다.
507        //
508        // 하지만 다음 경우에 이 Fallback이 필요합니다:
509        // 1. insert_versioned()만 호출된 경우 (일반 key 없음)
510        // 2. 향후 MVCC 전용 모드 지원 시
511        // 3. Snapshot isolation 구현 시
512        //
513        // 현재는 최신 타임스탬프로 조회하지만, 향후 snapshot_ts를 인자로 받아
514        // 특정 시점의 데이터를 조회할 수 있도록 확장 가능합니다.
515        let current_ts = self.tx_manager.allocate_commit_ts();
516        let vk = crate::transaction::mvcc::version::VersionedKey::new(key.to_vec(), current_ts);
517        let encoded_key = vk.encode();
518
519        // Delta에서 versioned key 조회
520        if let Some(value) = self.delta.get(table, &encoded_key)? {
521            return Ok(Self::decode_mvcc_value(value));
522        }
523
524        // WOS에서 versioned key 조회
525        if let Some(value) = self.wos_for_table(table).get(table, &encoded_key)? {
526            return Ok(Self::decode_mvcc_value(value));
527        }
528
529        // ── Metrics ──────────────────────────────────────────────────────────
530        self.metrics.inc_gets();
531        // No hit recorded for MVCC fallback path
532
533        Ok(None)
534    }
535
536    /// MVCC 값 디코딩 (Tombstone 필터링)
537    #[inline(always)]
538    fn decode_mvcc_value(v: Vec<u8>) -> Option<Vec<u8>> {
539        if v.len() < MVCC_PREFIX_LEN || v[0] != 0x00 {
540            return Some(v); // Legacy value
541        }
542
543        match v[1] {
544            0x01 => Some(v[MVCC_PREFIX_LEN..].to_vec()), // Value
545            0x02 => None,                                // Tombstone
546            _ => Some(v),                                // Unknown tag
547        }
548    }
549
550    /// VersionedKey 디코딩
551    #[inline(always)]
552    fn decode_versioned_key(k: Vec<u8>) -> Vec<u8> {
553        if k.len() <= 8 {
554            return k;
555        }
556
557        crate::transaction::mvcc::version::VersionedKey::decode(&k)
558            .map(|vk| vk.user_key)
559            .unwrap_or(k)
560    }
561
562    /// 테이블의 모든 키-값 쌍을 스캔합니다.
563    ///
564    /// Delta + WOS 를 `rayon::join()`으로 병렬 스캔합니다.
565    pub fn scan(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
566        use crate::storage::StorageBackend;
567
568        // Delta + WOS 병렬 스캔
569        let (delta_result, wos_result) = rayon::join(
570            || StorageBackend::scan(&self.delta, table, ..),
571            || self.wos_for_table(table).scan(table, ..),
572        );
573
574        let delta_entries = delta_result?;
575        let wos_entries = wos_result?;
576
577        // Fast-path: Delta가 비어있으면 WOS만 반환
578        if delta_entries.is_empty() {
579            return Ok(wos_entries
580                .into_iter()
581                .filter_map(|(k, v)| {
582                    let dk = Self::decode_versioned_key(k);
583                    Self::decode_mvcc_value(v).map(|dv| (dk, dv))
584                })
585                .collect());
586        }
587
588        // 2-way merge (both are already sorted)
589        let mut result = Vec::with_capacity(delta_entries.len() + wos_entries.len());
590
591        let mut i = 0;
592        let mut j = 0;
593
594        while i < delta_entries.len() && j < wos_entries.len() {
595            match delta_entries[i].0.cmp(&wos_entries[j].0) {
596                std::cmp::Ordering::Less => {
597                    if let Some(decoded_v) = Self::decode_mvcc_value(delta_entries[i].1.clone()) {
598                        let user_key = Self::decode_versioned_key(delta_entries[i].0.clone());
599                        result.push((user_key, decoded_v));
600                    }
601                    i += 1;
602                }
603                std::cmp::Ordering::Equal => {
604                    if let Some(decoded_v) = Self::decode_mvcc_value(delta_entries[i].1.clone()) {
605                        let user_key = Self::decode_versioned_key(delta_entries[i].0.clone());
606                        result.push((user_key, decoded_v));
607                    }
608                    i += 1;
609                    j += 1;
610                }
611                std::cmp::Ordering::Greater => {
612                    if let Some(decoded_v) = Self::decode_mvcc_value(wos_entries[j].1.clone()) {
613                        let user_key = Self::decode_versioned_key(wos_entries[j].0.clone());
614                        result.push((user_key, decoded_v));
615                    }
616                    j += 1;
617                }
618            }
619        }
620
621        while i < delta_entries.len() {
622            if let Some(decoded_v) = Self::decode_mvcc_value(delta_entries[i].1.clone()) {
623                let user_key = Self::decode_versioned_key(delta_entries[i].0.clone());
624                result.push((user_key, decoded_v));
625            }
626            i += 1;
627        }
628
629        while j < wos_entries.len() {
630            if let Some(decoded_v) = Self::decode_mvcc_value(wos_entries[j].1.clone()) {
631                let user_key = Self::decode_versioned_key(wos_entries[j].0.clone());
632                result.push((user_key, decoded_v));
633            }
634            j += 1;
635        }
636
637        Ok(result)
638    }
639
640    /// 테이블의 키 범위를 스캔합니다.
641    pub fn range(
642        &self,
643        table: &str,
644        start_key: &[u8],
645        end_key: &[u8],
646    ) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
647        let range = start_key.to_vec()..end_key.to_vec();
648
649        // Scan both Delta Store and WOS with range bounds
650        let mut merged = std::collections::BTreeMap::new();
651        for (k, v) in self.delta.scan(table, range.clone())? {
652            merged.insert(k, v);
653        }
654        for (k, v) in self.wos_for_table(table).scan(table, range)? {
655            merged.entry(k).or_insert(v);
656        }
657
658        Ok(merged.into_iter().collect())
659    }
660
661    /// 테이블의 행 개수를 반환합니다.
662    pub fn table_row_count(&self, table: &str) -> DbxResult<usize> {
663        self.count(table)
664    }
665
666    // ════════════════════════════════════════════
667    // DELETE Operations
668    // ════════════════════════════════════════════
669
670    /// 키를 삭제합니다.
671    pub fn delete(&self, table: &str, key: &[u8]) -> DbxResult<bool> {
672        let target_table = {
673            let maps = self.partition_maps.read().unwrap();
674            if let Some(map) = maps.get(table) {
675                use crate::storage::partition::PartitionValue;
676                let key_str = String::from_utf8_lossy(key).into_owned();
677                map.route_key(&PartitionValue::Text(key_str))
678            } else {
679                table.to_string()
680            }
681        };
682        let table = target_table.as_str();
683
684        #[cfg(feature = "wal")]
685        if self.durability != DurabilityLevel::None
686            && (self.wal.is_some() || self.encrypted_wal.is_some())
687        {
688            self.append_to_wal(&crate::wal::WalRecord::Delete {
689                table: table.to_string(),
690                key: key.to_vec(),
691                ts: 0,
692            })?;
693        }
694
695        #[cfg(feature = "index")]
696        if self.has_index(table, "key") {
697            let row_ids = self.index.lookup(table, "key", key)?;
698            for row_id in row_ids {
699                self.index.update_on_delete(table, "key", key, row_id)?;
700            }
701        }
702
703        // 1. Delete from legacy
704        let delta_deleted = self.delta.delete(table, key)?;
705        let wos_deleted = self.wos_for_table(table).delete(table, key)?;
706
707        // 2. Add versioned tombstone if it was a versioned key
708        #[cfg(feature = "mvcc")]
709        {
710            let commit_ts = self.tx_manager.allocate_commit_ts();
711            self.insert_versioned(table, key, None, commit_ts)?;
712        }
713
714        // Event-driven MV 갱신 알림
715        self.mat_view_registry.notify_change(table);
716
717        self.metrics.inc_deletes();
718        Ok(delta_deleted || wos_deleted)
719    }
720
721    // ════════════════════════════════════════════
722    // Helper Methods
723    // ════════════════════════════════════════════
724
725    /// Synchronize the Columnar Cache with the latest data from Delta Store.
726    ///
727    /// If the table has a schema in table_schemas, it will be synced as typed data.
728    /// Otherwise, it will be synced as raw Binary data.
729    pub fn sync_columnar_cache(&self, table: &str) -> DbxResult<usize> {
730        let base_table = if let Some(idx) = table.find("__shard_") {
731            &table[..idx]
732        } else {
733            table
734        };
735
736        // Check if table has a schema (SQL table) - case-insensitive
737        let schemas = self.table_schemas.read().unwrap();
738        let table_schema = schemas
739            .get(base_table)
740            .or_else(|| {
741                let table_lower = base_table.to_lowercase();
742                schemas
743                    .iter()
744                    .find(|(k, _)| k.to_lowercase() == table_lower)
745                    .map(|(_, v)| v)
746            })
747            .cloned();
748        drop(schemas);
749
750        // Scan from both Tier 1 (Delta) and Tier 3 (WOS)
751        let table_lower = table.to_lowercase();
752        let mut rows = self.delta.scan(&table_lower, ..)?;
753        let mut wos_rows = self.wos_for_table(&table_lower).scan(&table_lower, ..)?;
754        rows.append(&mut wos_rows);
755
756        self.columnar_cache
757            .sync_from_storage(table, rows, table_schema)
758    }
759
760    /// Sync data from multiple tiers (Delta and ROS) to GPU for merge operations.
761    pub fn sync_gpu_cache_multi_tier(&self, table: &str) -> DbxResult<()> {
762        let gpu = self
763            .gpu_manager
764            .as_ref()
765            .ok_or_else(|| DbxError::NotImplemented("GPU manager not available".to_string()))?;
766
767        // 1. Sync Delta data (Tier 1)
768        let delta_batches = self.columnar_cache.get_batches(table, None)?;
769        if let Some(batches) = delta_batches {
770            for batch in batches {
771                gpu.upload_batch_pinned(&format!("{}_delta", table), &batch)?;
772            }
773        }
774
775        // 2. Sync ROS data (Tier 5) - simplified: assuming ROS is already in SQL tables for now
776        let tables = self.tables.read().unwrap();
777        if let Some(batches) = tables.get(table) {
778            for batch in batches {
779                gpu.upload_batch_pinned(&format!("{}_ros", table), batch)?;
780            }
781        }
782
783        Ok(())
784    }
785
786    /// Legacy method to sync data from Columnar Cache to GPU.
787    pub fn sync_gpu_cache(&self, table: &str) -> DbxResult<()> {
788        self.sync_gpu_cache_multi_tier(table)
789    }
790
791    /// Execute an operation on GPU with automatic fallback to CPU on any error.
792    pub fn gpu_exec_with_fallback<T, F, C>(&self, gpu_op: F, cpu_op: C) -> DbxResult<T>
793    where
794        F: FnOnce(&crate::storage::gpu::GpuManager) -> DbxResult<T>,
795        C: FnOnce() -> DbxResult<T>,
796    {
797        if let Some(gpu) = &self.gpu_manager {
798            match gpu_op(gpu) {
799                Ok(val) => Ok(val),
800                Err(e) => {
801                    tracing::warn!("GPU execution failed, falling back to CPU: {:?}", e);
802                    cpu_op()
803                }
804            }
805        } else {
806            cpu_op()
807        }
808    }
809
810    // ════════════════════════════════════════════
811    // CAS (Compare-And-Swap) Operations
812    // ════════════════════════════════════════════
813
814    /// 값이 없을 때만 삽입 (Atomic CAS)
815    pub fn insert_if_not_exists(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<bool> {
816        let _guard = self.cas_locks.lock(table, key);
817
818        if self.get(table, key)?.is_none() {
819            self.insert(table, key, value)?;
820            Ok(true)
821        } else {
822            Ok(false)
823        }
824    }
825
826    /// 기존 값과 비교하여 일치할 때만 새로운 값으로 교체 (Atomic CAS)
827    pub fn compare_and_swap(
828        &self,
829        table: &str,
830        key: &[u8],
831        expected: &[u8],
832        new_value: &[u8],
833    ) -> DbxResult<bool> {
834        let _guard = self.cas_locks.lock(table, key);
835
836        if let Some(current) = self.get(table, key)?
837            && current == expected
838        {
839            self.insert(table, key, new_value)?;
840            return Ok(true);
841        }
842        Ok(false)
843    }
844
845    /// 기존 값이 존재할 때만 업데이트 (Atomic CAS)
846    pub fn update_if_exists(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<bool> {
847        let _guard = self.cas_locks.lock(table, key);
848
849        if self.get(table, key)?.is_some() {
850            self.insert(table, key, value)?;
851            Ok(true)
852        } else {
853            Ok(false)
854        }
855    }
856
857    /// 기존 값과 일치할 때만 삭제 (Atomic CAS)
858    pub fn delete_if_equals(&self, table: &str, key: &[u8], expected: &[u8]) -> DbxResult<bool> {
859        let _guard = self.cas_locks.lock(table, key);
860
861        if let Some(current) = self.get(table, key)?
862            && current == expected
863        {
864            self.delete(table, key)?;
865            return Ok(true);
866        }
867        Ok(false)
868    }
869}
870
871// ════════════════════════════════════════════
872// DatabaseCore Trait Implementation
873// ════════════════════════════════════════════
874
875impl crate::traits::DatabaseCore for Database {
876    fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
877        // Reuse existing implementation
878        Database::insert(self, table, key, value)
879    }
880
881    fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
882        // Reuse existing implementation
883        Database::get(self, table, key)
884    }
885
886    fn delete(&self, table: &str, key: &[u8]) -> DbxResult<()> {
887        // Reuse existing implementation
888        Database::delete(self, table, key).map(|_| ())
889    }
890
891    fn scan(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
892        // Reuse existing implementation
893        Database::scan(self, table)
894    }
895
896    fn flush(&self) -> DbxResult<()> {
897        // Reuse existing implementation
898        Database::flush(self)
899    }
900
901    fn insert_batch(&self, table: &str, entries: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
902        // Reuse existing implementation
903        Database::insert_batch(self, table, entries)
904    }
905
906    fn insert_if_not_exists(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<bool> {
907        Database::insert_if_not_exists(self, table, key, value)
908    }
909
910    fn compare_and_swap(
911        &self,
912        table: &str,
913        key: &[u8],
914        expected: &[u8],
915        new_value: &[u8],
916    ) -> DbxResult<bool> {
917        Database::compare_and_swap(self, table, key, expected, new_value)
918    }
919
920    fn update_if_exists(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<bool> {
921        Database::update_if_exists(self, table, key, value)
922    }
923
924    fn delete_if_equals(&self, table: &str, key: &[u8], expected: &[u8]) -> DbxResult<bool> {
925        Database::delete_if_equals(self, table, key, expected)
926    }
927}
928
929// ════════════════════════════════════════════
930// DatabaseSerde Trait Implementation
931// ════════════════════════════════════════════
932
933impl crate::traits::DatabaseSerde for Database {
934    fn insert_struct<T: serde::Serialize>(
935        &self,
936        table: &str,
937        key: &[u8],
938        data: &T,
939    ) -> DbxResult<()> {
940        let serialized = bincode::serialize(data).map_err(|e| {
941            crate::error::DbxError::Serialization(format!("Failed to serialize struct: {}", e))
942        })?;
943        self.insert(table, key, &serialized)
944    }
945
946    fn get_struct<T: serde::de::DeserializeOwned>(
947        &self,
948        table: &str,
949        key: &[u8],
950    ) -> DbxResult<Option<T>> {
951        if let Some(bytes) = self.get(table, key)? {
952            let deserialized = bincode::deserialize(&bytes).map_err(|e| {
953                crate::error::DbxError::Serialization(format!(
954                    "Failed to deserialize struct: {}",
955                    e
956                ))
957            })?;
958            Ok(Some(deserialized))
959        } else {
960            Ok(None)
961        }
962    }
963}