Skip to main content

dbx_core/engine/
crud.rs

1//! Database CRUD Operations — Create, Read, Update, Delete methods
2
3use crate::engine::Database;
4use crate::engine::types::{BackgroundJob, DurabilityLevel};
5use crate::error::{DbxError, DbxResult};
6use crate::storage::StorageBackend;
7
// ════════════════════════════════════════════
// ⚠️ MVCC Value Encoding Constants
// ════════════════════════════════════════════
// Every value written by `insert_versioned` begins with a two-byte magic header
// `[0x00, tag]`. The decoders in this file treat a stored value as MVCC-encoded
// only when it is at least two bytes long and starts with one of the headers
// below; any other value is returned unchanged as a legacy (unencoded) value.
//
// Caveat: this is a heuristic, not a guarantee. Arbitrary binary values (or text
// containing a NUL byte) can legitimately start with `[0x00, 0x01]` or
// `[0x00, 0x02]`, and such legacy values would be misread as MVCC data.
//
// Changing these constants requires updating both crud.rs and snapshot.rs.

/// Two-byte header marking a live MVCC value: `[0x00, 0x01]`.
pub(crate) const MVCC_VALUE_PREFIX: [u8; 2] = [0x00, 0x01];
/// Two-byte header marking an MVCC deletion (tombstone): `[0x00, 0x02]`.
pub(crate) const MVCC_TOMBSTONE_PREFIX: [u8; 2] = [0x00, 0x02];
/// Length in bytes of the MVCC magic headers, derived from the header arrays.
pub(crate) const MVCC_PREFIX_LEN: usize = MVCC_VALUE_PREFIX.len();
22
23impl Database {
24    // ════════════════════════════════════════════
25    // WAL Helper
26    // ════════════════════════════════════════════
27
28    /// Append a WAL record if durability is enabled and a WAL backend exists.
29    #[inline]
30    fn append_to_wal(&self, record: &crate::wal::WalRecord) -> DbxResult<()> {
31        if self.durability == DurabilityLevel::None {
32            return Ok(());
33        }
34        if let Some(wal) = &self.wal {
35            wal.append(record)?;
36            if self.durability == DurabilityLevel::Full {
37                if let Some(tx) = &self.job_sender {
38                    let _ = tx.send(BackgroundJob::WalSync);
39                } else {
40                    wal.sync()?;
41                }
42            }
43        } else if let Some(encrypted_wal) = &self.encrypted_wal {
44            encrypted_wal.append(record)?;
45            if self.durability == DurabilityLevel::Full {
46                if let Some(tx) = &self.job_sender {
47                    let _ = tx.send(BackgroundJob::EncryptedWalSync);
48                } else {
49                    encrypted_wal.sync()?;
50                }
51            }
52        }
53
54        // ════════════════════════════════════════════
55        // Phase 5: Replication (Master 브로드캐스트)
56        // ════════════════════════════════════════════
57        if let Some(master) = &self.replication_master {
58            // bincode로 직렬화하여 전송 (임시)
59            if let Ok(data) = bincode::serialize(record) {
60                master.replicate(data);
61            }
62        }
63
64        Ok(())
65    }
66
67    // ════════════════════════════════════════════
68    // CRUD Operations
69    // ════════════════════════════════════════════
70
71    // ════════════════════════════════════════════
72    // CREATE Operations
73    // ════════════════════════════════════════════
74
    // ════════════════════════════════════════════
    // Phase 3.4: partition router with automatic range expansion
    // ════════════════════════════════════════════
    /// Resolves the physical table that a row keyed by `key_str` should be written to.
    ///
    /// - If `table` has no entry in `partition_maps`, returns `table` unchanged.
    /// - If the partition map routes the key directly (`RouteResult::Routed`),
    ///   returns that sub-table name.
    /// - If the map reports `RouteResult::NeedsExpansion`, takes the write lock on
    ///   `partition_maps` and, for a `Range` partition whose last upper bound the key
    ///   still reaches, appends `new_bounds` and bumps `num_partitions`. It then
    ///   re-routes; if routing still fails, it returns the `new_table` suggested by
    ///   `NeedsExpansion`.
    ///
    /// Panics if the `partition_maps` lock is poisoned (`unwrap` on `read`/`write`).
    fn route_partition_or_expand(&self, table: &str, key_str: &str) -> String {
        // Route under the read lock. The guard is dropped at the end of this block so
        // the expansion path below can acquire the write lock.
        let route_res = {
            let maps = self.partition_maps.read().unwrap();
            if let Some(map) = maps.get(table) {
                use crate::storage::partition::PartitionValue;
                let pv = PartitionValue::Text(key_str.to_string());
                Some(map.route_or_expand(&pv))
            } else {
                None
            }
        };

        if let Some(res) = route_res {
            use crate::storage::partition::RouteResult;
            match res {
                RouteResult::Routed(sub_tbl) => sub_tbl,
                RouteResult::NeedsExpansion {
                    new_table,
                    new_bounds,
                } => {
                    // Take the write lock to update the Range partition bounds.
                    let mut maps = self.partition_maps.write().unwrap();
                    if let Some(map) = maps.get_mut(table) {
                        use crate::storage::partition::PartitionType;
                        if let PartitionType::Range { bounds, .. } = &mut map.partition_type {
                            // Upper bound of the last range, or 0 when no ranges exist yet.
                            let last_hi = bounds.last().map(|(_, hi)| *hi).unwrap_or(0);
                            // Keys that do not parse as i64 are treated as 0.
                            let v = key_str.parse::<i64>().unwrap_or(0);
                            // Re-check under the write lock: another thread may have
                            // already expanded the bounds since the read-locked route.
                            if v >= last_hi {
                                bounds.push(new_bounds);
                                map.num_partitions += 1;
                            }

                            // Retry routing against the updated map.
                            use crate::storage::partition::PartitionValue;
                            let pv = PartitionValue::Text(key_str.to_string());
                            let final_res = map.route_or_expand(&pv);
                            if let RouteResult::Routed(final_tbl) = final_res {
                                final_tbl
                            } else {
                                // Still not routable; use the name proposed by NeedsExpansion.
                                new_table
                            }
                        } else {
                            // Not a Range partition: bounds cannot be extended here.
                            new_table
                        }
                    } else {
                        // The map disappeared between the read and write locks.
                        table.to_string()
                    }
                }
            }
        } else {
            table.to_string()
        }
    }
132
133    /// 키-값 쌍을 삽입합니다.
134    ///
135    /// 데이터는 먼저 Delta Store (Tier 1)에 쓰여집니다.
136    /// Flush 임계값을 초과하면 자동으로 WOS로 이동합니다.
137    ///
138    /// # 인자
139    ///
140    /// * `table` - 테이블 이름
141    /// * `key` - 키 (바이트 배열)
142    /// * `value` - 값 (바이트 배열)
143    pub fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
144        // ════════════════════════════════════════════
145        // Phase 3: 파티셔닝 (Partition Routing with Auto-Expand)
146        // ════════════════════════════════════════════
147        let original_table = table; // 파티션 자동 통계 비교용 원본 이름 보존
148        let key_str = String::from_utf8_lossy(key).into_owned();
149        let target_table = self.route_partition_or_expand(table, &key_str);
150        let table = target_table.as_str();
151        // Log to WAL first — only allocate record if WAL exists
152        #[cfg(feature = "wal")]
153        if self.durability != DurabilityLevel::None
154            && (self.wal.is_some() || self.encrypted_wal.is_some())
155        {
156            self.append_to_wal(&crate::wal::WalRecord::Insert {
157                table: table.to_string(),
158                key: key.to_vec(),
159                value: value.to_vec(),
160                ts: 0,
161            })?;
162        }
163
164        // 데이터 삽입
165        self.delta.insert(table, key, value)?;
166
167        // O(1) row_id 계산 + 인덱스 업데이트 — only when index exists
168        #[cfg(feature = "index")]
169        if self.has_index(table, "key") {
170            let counter = self
171                .row_counters
172                .entry(table.to_string())
173                .or_insert_with(|| std::sync::atomic::AtomicUsize::new(0));
174            let row_id = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
175            if let Some(tx) = &self.job_sender {
176                let _ = tx.send(BackgroundJob::IndexUpdate {
177                    table: table.to_string(),
178                    column: "key".to_string(),
179                    key: key.to_vec(),
180                    row_id,
181                });
182            } else {
183                self.index.update_on_insert(table, "key", key, row_id)?;
184            }
185        }
186
187        // Auto-flush if threshold exceeded
188        if self.delta.should_flush() {
189            self.flush()?;
190        }
191
192        // ════════════════════════════════════════════
193        // Phase 0: HTAP 실시간 동기화 (RealtimeSync)
194        // ════════════════════════════════════════════
195        use crate::storage::realtime_sync::SyncMode;
196        let sync_config = &self.config.sync;
197        match sync_config.mode {
198            SyncMode::Immediate => {
199                // 즉시 Columnar Cache 동기화
200                let _ = self.sync_columnar_cache(table);
201            }
202            SyncMode::Threshold(n) => {
203                // N건 이상일 때만 동기화 (이 로직은 보통 flush에서 처리되거나,
204                // delta row count를 기반으로 수행)
205                if self.delta.count(table).unwrap_or(0) >= n {
206                    let _ = self.sync_columnar_cache(table);
207                }
208            }
209            SyncMode::AsyncBatch { .. } => {
210                // 비동기 배치는 백그라운드 스레드에서 주기적으로 수행됨.
211                // 여기서는 nothing.
212            }
213        }
214
215        // ════════════════════════════════════════════
216        // Phase 3 Synergy: 파티셔닝 자동 통계 갱신
217        // ════════════════════════════════════════════
218        // 파티셔닝된 테이블이면 sub-table명이 original_table과 다름
219        if table != original_table {
220            // 1) row_count 자동 증가 (원자적)
221            self.partition_stats
222                .entry(table.to_string())
223                .and_modify(|s| s.row_count += 1)
224                .or_insert_with(|| crate::storage::partition::PartitionStats {
225                    row_count: 1,
226                    ..Default::default()
227                });
228
229            // 2) 첫 쓰기 시각 자동 기록 (이미 있으면 덮어쓰지 않음)
230            self.partition_creation_times
231                .entry(table.to_string())
232                .or_insert_with(|| {
233                    std::time::SystemTime::now()
234                        .duration_since(std::time::UNIX_EPOCH)
235                        .unwrap()
236                        .as_secs()
237                });
238        }
239
240        // Event-driven MV 갱신 알림 (MV가 없으면 zero overhead)
241        self.mat_view_registry.notify_change(table);
242
243        self.metrics.inc_inserts();
244        Ok(())
245    }
246
247    /// 여러 키-값 쌍을 일괄 삽입합니다 (최적화됨).
248    ///
249    /// `parallel_engine`의 `min_rows_for_parallel` 이상이면 Rayon 스레드 풀에서 병렬 삽입합니다.
250    pub fn insert_batch(&self, table: &str, rows: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
251        // ════════════════════════════════════════════
252        // Phase 3: 파티셔닝 (Partition Routing with Auto-Expand)
253        // ════════════════════════════════════════════
254        let target_table = if let Some((first_key, _)) = rows.first() {
255            let key_str = String::from_utf8_lossy(first_key).into_owned();
256            self.route_partition_or_expand(table, &key_str)
257        } else {
258            table.to_string()
259        };
260        let table = target_table.as_str();
261        #[cfg(feature = "wal")]
262        if self.durability != DurabilityLevel::None
263            && (self.wal.is_some() || self.encrypted_wal.is_some())
264        {
265            self.append_to_wal(&crate::wal::WalRecord::Batch {
266                table: table.to_string(),
267                rows: rows.clone(),
268                ts: 0,
269            })?;
270        }
271
272        // 병렬화 임계값 이상이면 parallel_engine 으로 병렬 삽입
273        if self.parallel_engine.should_parallelize_rows(rows.len()) {
274            use rayon::prelude::*;
275            let delta = &self.delta;
276            let table_owned = table.to_string();
277            // 각 (key, value) 쌍을 병렬로 삽입
278            // DeltaStore는 내부적으로 DashMap + SkipMap(Arc)이므로 공유 안전
279            let results: Vec<crate::error::DbxResult<()>> = self.parallel_engine.execute(|| {
280                rows.par_iter()
281                    .map(|(key, value)| delta.insert(&table_owned, key, value))
282                    .collect()
283            });
284            for r in results {
285                r?;
286            }
287        } else {
288            self.delta.insert_batch(table, rows)?;
289        }
290
291        // Auto-flush if threshold exceeded
292        if self.delta.should_flush() {
293            self.flush()?;
294        }
295
296        // ════════════════════════════════════════════
297        // Phase 0: HTAP 실시간 동기화 (RealtimeSync)
298        // ════════════════════════════════════════════
299        use crate::storage::realtime_sync::SyncMode;
300        let sync_config = &self.config.sync;
301        match sync_config.mode {
302            SyncMode::Immediate => {
303                let _ = self.sync_columnar_cache(table);
304            }
305            SyncMode::Threshold(n) => {
306                if self.delta.count(table).unwrap_or(0) >= n {
307                    let _ = self.sync_columnar_cache(table);
308                }
309            }
310            SyncMode::AsyncBatch { .. } => {}
311        }
312
313        // Event-driven MV 갱신 알림
314        self.mat_view_registry.notify_change(table);
315
316        Ok(())
317    }
318
319    /// Insert a versioned key-value pair for MVCC.
320    pub fn insert_versioned(
321        &self,
322        table: &str,
323        key: &[u8],
324        value: Option<&[u8]>,
325        commit_ts: u64,
326    ) -> DbxResult<()> {
327        // ════════════════════════════════════════════
328        // Phase 3: 파티셔닝 (Partition Routing)
329        // ════════════════════════════════════════════
330        let target_table = {
331            let maps = self.partition_maps.read().unwrap();
332            if let Some(map) = maps.get(table) {
333                use crate::storage::partition::PartitionValue;
334                let key_str = String::from_utf8_lossy(key).into_owned();
335                map.route_key(&PartitionValue::Text(key_str))
336            } else {
337                table.to_string()
338            }
339        };
340        let table = target_table.as_str();
341        let vk = crate::transaction::mvcc::version::VersionedKey::new(key.to_vec(), commit_ts);
342        let encoded_key = vk.encode();
343
344        // Encode value with prefix
345        // ⚠️ MVCC 매직 헤더 인코딩 — MVCC_VALUE_PREFIX / MVCC_TOMBSTONE_PREFIX 사용
346        let encoded_value = match value {
347            Some(v) => {
348                let mut bytes = Vec::with_capacity(v.len() + MVCC_PREFIX_LEN);
349                bytes.extend_from_slice(&MVCC_VALUE_PREFIX);
350                bytes.extend_from_slice(v);
351                bytes
352            }
353            None => MVCC_TOMBSTONE_PREFIX.to_vec(),
354        };
355
356        // Write to Delta Store
357        self.delta.insert(table, &encoded_key, &encoded_value)?;
358
359        // ════════════════════════════════════════════
360        // Phase 0: HTAP 실시간 동기화 (RealtimeSync)
361        // ════════════════════════════════════════════
362        use crate::storage::realtime_sync::SyncMode;
363        let sync_config = &self.config.sync;
364        match sync_config.mode {
365            SyncMode::Immediate => {
366                let _ = self.sync_columnar_cache(table);
367            }
368            SyncMode::Threshold(n) => {
369                if self.delta.count(table).unwrap_or(0) >= n {
370                    let _ = self.sync_columnar_cache(table);
371                }
372            }
373            SyncMode::AsyncBatch { .. } => {}
374        }
375
376        Ok(())
377    }
378
379    // ════════════════════════════════════════════
380    // READ Operations
381    // ════════════════════════════════════════════
382
383    /// Read a specific version of a key (Snapshot Read).
384    pub fn get_snapshot(
385        &self,
386        table: &str,
387        key: &[u8],
388        read_ts: u64,
389    ) -> DbxResult<Option<Option<Vec<u8>>>> {
390        let start_vk = crate::transaction::mvcc::version::VersionedKey::new(key.to_vec(), read_ts);
391        let start_bytes = start_vk.encode();
392
393        // Helper: returns Some(Some(v)), Some(None) (tombstone), or None (mismatch)
394        let check_entry = |entry_key: &[u8], entry_val: &[u8]| -> Option<Option<Vec<u8>>> {
395            let decoded =
396                crate::transaction::mvcc::version::VersionedKey::decode(entry_key).ok()?;
397            if decoded.user_key != key {
398                return None;
399            }
400            if decoded.commit_ts > read_ts {
401                return None;
402            }
403            if entry_val.is_empty() {
404                return Some(Some(entry_val.to_vec())); // Legacy empty value
405            }
406            // ⚠️ MVCC 매직 헤더 디코딩 — 2바이트 [0x00, tag] 확인
407            if entry_val.len() >= MVCC_PREFIX_LEN && entry_val[0] == 0x00 {
408                match entry_val[1] {
409                    0x01 => return Some(Some(entry_val[MVCC_PREFIX_LEN..].to_vec())),
410                    0x02 => return Some(None), // Tombstone
411                    _ => {}
412                }
413            }
414            // Legacy non-prefixed value
415            Some(Some(entry_val.to_vec()))
416        };
417
418        // 1. Check Delta Store
419        if let Some((k, v)) = self.delta.scan_one(table, start_bytes.clone()..)?
420            && let Some(result) = check_entry(&k, &v)
421        {
422            return Ok(Some(result));
423        }
424
425        // 2. Check WOS
426        if let Some((k, v)) = self.wos_for_table(table).scan_one(table, start_bytes..)?
427            && let Some(result) = check_entry(&k, &v)
428        {
429            return Ok(Some(result));
430        }
431
432        Ok(None)
433    }
434
435    /// Helper method for Snapshot: scan all versioned entries from Delta Store.
436    pub(crate) fn scan_delta_versioned(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
437        StorageBackend::scan(&self.delta, table, ..)
438    }
439
440    /// Helper method for Snapshot: scan all versioned entries from WOS.
441    pub(crate) fn scan_wos_versioned(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
442        self.wos_for_table(table).scan(table, ..)
443    }
444
445    /// Get the current timestamp from the transaction manager.
446    pub fn current_timestamp(&self) -> u64 {
447        self.tx_manager.current_ts()
448    }
449
450    /// Allocate a new commit timestamp for a transaction.
451    /// This increments the timestamp oracle and returns a unique timestamp.
452    pub fn allocate_commit_ts(&self) -> u64 {
453        self.tx_manager.allocate_commit_ts()
454    }
455
456    /// 키로 값을 조회합니다.
457    ///
458    /// 성능 최적화: MVCC feature가 비활성화되면 Fast-path만 사용하여
459    /// 최대 성능을 달성합니다.
460    #[inline(always)]
461    pub fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
462        // Fast-path: Delta → WOS 직접 조회 (MVCC 오버헤드 없음)
463        // MVCC feature가 활성화되어도 Fast-path를 우선 사용
464        // 일반 insert()로 저장된 데이터는 여기서 조회됨
465        if let Some(value) = self.delta.get(table, key)? {
466            self.metrics.inc_gets();
467            self.metrics.inc_delta_hit();
468            return Ok(Some(value));
469        }
470        self.metrics.inc_delta_miss();
471        if let Some(value) = self.wos_for_table(table).get(table, key)? {
472            self.metrics.inc_gets();
473            self.metrics.inc_wos_hit();
474            return Ok(Some(value));
475        }
476        self.metrics.inc_wos_miss();
477
478        // ════════════════════════════════════════════
479        // MVCC Fallback: Transaction Commit 후 데이터 조회
480        // ════════════════════════════════════════════
481        // Transaction::commit()은 insert_versioned()와 insert()를 모두 호출하므로
482        // 일반적으로 위의 Fast-path에서 데이터를 찾을 수 있습니다.
483        //
484        // 하지만 다음 경우에 이 Fallback이 필요합니다:
485        // 1. insert_versioned()만 호출된 경우 (일반 key 없음)
486        // 2. 향후 MVCC 전용 모드 지원 시
487        // 3. Snapshot isolation 구현 시
488        //
489        // 현재는 최신 타임스탬프로 조회하지만, 향후 snapshot_ts를 인자로 받아
490        // 특정 시점의 데이터를 조회할 수 있도록 확장 가능합니다.
491        let current_ts = self.tx_manager.allocate_commit_ts();
492        let vk = crate::transaction::mvcc::version::VersionedKey::new(key.to_vec(), current_ts);
493        let encoded_key = vk.encode();
494
495        // Delta에서 versioned key 조회
496        if let Some(value) = self.delta.get(table, &encoded_key)? {
497            return Ok(Self::decode_mvcc_value(value));
498        }
499
500        // WOS에서 versioned key 조회
501        if let Some(value) = self.wos_for_table(table).get(table, &encoded_key)? {
502            return Ok(Self::decode_mvcc_value(value));
503        }
504
505        // ── Metrics ──────────────────────────────────────────────────────────
506        self.metrics.inc_gets();
507        // No hit recorded for MVCC fallback path
508
509        Ok(None)
510    }
511
512    /// MVCC 값 디코딩 (Tombstone 필터링)
513    #[inline(always)]
514    fn decode_mvcc_value(v: Vec<u8>) -> Option<Vec<u8>> {
515        if v.len() < MVCC_PREFIX_LEN || v[0] != 0x00 {
516            return Some(v); // Legacy value
517        }
518
519        match v[1] {
520            0x01 => Some(v[MVCC_PREFIX_LEN..].to_vec()), // Value
521            0x02 => None,                                // Tombstone
522            _ => Some(v),                                // Unknown tag
523        }
524    }
525
526    /// VersionedKey 디코딩
527    #[inline(always)]
528    fn decode_versioned_key(k: Vec<u8>) -> Vec<u8> {
529        if k.len() <= 8 {
530            return k;
531        }
532
533        crate::transaction::mvcc::version::VersionedKey::decode(&k)
534            .map(|vk| vk.user_key)
535            .unwrap_or(k)
536    }
537
538    /// 테이블의 모든 키-값 쌍을 스캔합니다.
539    ///
540    /// Delta + WOS 를 `rayon::join()`으로 병렬 스캔합니다.
541    pub fn scan(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
542        use crate::storage::StorageBackend;
543
544        // Delta + WOS 병렬 스캔
545        let (delta_result, wos_result) = rayon::join(
546            || StorageBackend::scan(&self.delta, table, ..),
547            || self.wos_for_table(table).scan(table, ..),
548        );
549
550        let delta_entries = delta_result?;
551        let wos_entries = wos_result?;
552
553        // Fast-path: Delta가 비어있으면 WOS만 반환
554        if delta_entries.is_empty() {
555            return Ok(wos_entries
556                .into_iter()
557                .filter_map(|(k, v)| {
558                    let dk = Self::decode_versioned_key(k);
559                    Self::decode_mvcc_value(v).map(|dv| (dk, dv))
560                })
561                .collect());
562        }
563
564        // 2-way merge (both are already sorted)
565        let mut result = Vec::with_capacity(delta_entries.len() + wos_entries.len());
566
567        let mut i = 0;
568        let mut j = 0;
569
570        while i < delta_entries.len() && j < wos_entries.len() {
571            match delta_entries[i].0.cmp(&wos_entries[j].0) {
572                std::cmp::Ordering::Less => {
573                    if let Some(decoded_v) = Self::decode_mvcc_value(delta_entries[i].1.clone()) {
574                        let user_key = Self::decode_versioned_key(delta_entries[i].0.clone());
575                        result.push((user_key, decoded_v));
576                    }
577                    i += 1;
578                }
579                std::cmp::Ordering::Equal => {
580                    if let Some(decoded_v) = Self::decode_mvcc_value(delta_entries[i].1.clone()) {
581                        let user_key = Self::decode_versioned_key(delta_entries[i].0.clone());
582                        result.push((user_key, decoded_v));
583                    }
584                    i += 1;
585                    j += 1;
586                }
587                std::cmp::Ordering::Greater => {
588                    if let Some(decoded_v) = Self::decode_mvcc_value(wos_entries[j].1.clone()) {
589                        let user_key = Self::decode_versioned_key(wos_entries[j].0.clone());
590                        result.push((user_key, decoded_v));
591                    }
592                    j += 1;
593                }
594            }
595        }
596
597        while i < delta_entries.len() {
598            if let Some(decoded_v) = Self::decode_mvcc_value(delta_entries[i].1.clone()) {
599                let user_key = Self::decode_versioned_key(delta_entries[i].0.clone());
600                result.push((user_key, decoded_v));
601            }
602            i += 1;
603        }
604
605        while j < wos_entries.len() {
606            if let Some(decoded_v) = Self::decode_mvcc_value(wos_entries[j].1.clone()) {
607                let user_key = Self::decode_versioned_key(wos_entries[j].0.clone());
608                result.push((user_key, decoded_v));
609            }
610            j += 1;
611        }
612
613        Ok(result)
614    }
615
616    /// 테이블의 키 범위를 스캔합니다.
617    pub fn range(
618        &self,
619        table: &str,
620        start_key: &[u8],
621        end_key: &[u8],
622    ) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
623        let range = start_key.to_vec()..end_key.to_vec();
624
625        // Scan both Delta Store and WOS with range bounds
626        let mut merged = std::collections::BTreeMap::new();
627        for (k, v) in self.delta.scan(table, range.clone())? {
628            merged.insert(k, v);
629        }
630        for (k, v) in self.wos_for_table(table).scan(table, range)? {
631            merged.entry(k).or_insert(v);
632        }
633
634        Ok(merged.into_iter().collect())
635    }
636
637    /// 테이블의 행 개수를 반환합니다.
638    pub fn table_row_count(&self, table: &str) -> DbxResult<usize> {
639        self.count(table)
640    }
641
642    // ════════════════════════════════════════════
643    // DELETE Operations
644    // ════════════════════════════════════════════
645
646    /// 키를 삭제합니다.
647    pub fn delete(&self, table: &str, key: &[u8]) -> DbxResult<bool> {
648        #[cfg(feature = "index")]
649        if self.has_index(table, "key") {
650            let row_ids = self.index.lookup(table, "key", key)?;
651            for row_id in row_ids {
652                self.index.update_on_delete(table, "key", key, row_id)?;
653            }
654        }
655
656        // 1. Delete from legacy
657        let delta_deleted = self.delta.delete(table, key)?;
658        let wos_deleted = self.wos_for_table(table).delete(table, key)?;
659
660        // 2. Add versioned tombstone if it was a versioned key
661        #[cfg(feature = "mvcc")]
662        {
663            let commit_ts = self.tx_manager.allocate_commit_ts();
664            self.insert_versioned(table, key, None, commit_ts)?;
665        }
666
667        // Event-driven MV 갱신 알림
668        self.mat_view_registry.notify_change(table);
669
670        self.metrics.inc_deletes();
671        Ok(delta_deleted || wos_deleted)
672    }
673
674    // ════════════════════════════════════════════
675    // Helper Methods
676    // ════════════════════════════════════════════
677
678    /// Synchronize the Columnar Cache with the latest data from Delta Store.
679    ///
680    /// If the table has a schema in table_schemas, it will be synced as typed data.
681    /// Otherwise, it will be synced as raw Binary data.
682    pub fn sync_columnar_cache(&self, table: &str) -> DbxResult<usize> {
683        let base_table = if let Some(idx) = table.find("__shard_") {
684            &table[..idx]
685        } else {
686            table
687        };
688
689        // Check if table has a schema (SQL table) - case-insensitive
690        let schemas = self.table_schemas.read().unwrap();
691        let table_schema = schemas
692            .get(base_table)
693            .or_else(|| {
694                let table_lower = base_table.to_lowercase();
695                schemas
696                    .iter()
697                    .find(|(k, _)| k.to_lowercase() == table_lower)
698                    .map(|(_, v)| v)
699            })
700            .cloned();
701        drop(schemas);
702
703        // Scan from both Tier 1 (Delta) and Tier 3 (WOS)
704        let table_lower = table.to_lowercase();
705        let mut rows = self.delta.scan(&table_lower, ..)?;
706        let mut wos_rows = self.wos_for_table(&table_lower).scan(&table_lower, ..)?;
707        rows.append(&mut wos_rows);
708
709        self.columnar_cache
710            .sync_from_storage(table, rows, table_schema)
711    }
712
713    /// Sync data from multiple tiers (Delta and ROS) to GPU for merge operations.
714    pub fn sync_gpu_cache_multi_tier(&self, table: &str) -> DbxResult<()> {
715        let gpu = self
716            .gpu_manager
717            .as_ref()
718            .ok_or_else(|| DbxError::NotImplemented("GPU manager not available".to_string()))?;
719
720        // 1. Sync Delta data (Tier 1)
721        let delta_batches = self.columnar_cache.get_batches(table, None)?;
722        if let Some(batches) = delta_batches {
723            for batch in batches {
724                gpu.upload_batch_pinned(&format!("{}_delta", table), &batch)?;
725            }
726        }
727
728        // 2. Sync ROS data (Tier 5) - simplified: assuming ROS is already in SQL tables for now
729        let tables = self.tables.read().unwrap();
730        if let Some(batches) = tables.get(table) {
731            for batch in batches {
732                gpu.upload_batch_pinned(&format!("{}_ros", table), batch)?;
733            }
734        }
735
736        Ok(())
737    }
738
739    /// Legacy method to sync data from Columnar Cache to GPU.
740    pub fn sync_gpu_cache(&self, table: &str) -> DbxResult<()> {
741        self.sync_gpu_cache_multi_tier(table)
742    }
743
744    /// Execute an operation on GPU with automatic fallback to CPU on any error.
745    pub fn gpu_exec_with_fallback<T, F, C>(&self, gpu_op: F, cpu_op: C) -> DbxResult<T>
746    where
747        F: FnOnce(&crate::storage::gpu::GpuManager) -> DbxResult<T>,
748        C: FnOnce() -> DbxResult<T>,
749    {
750        if let Some(gpu) = &self.gpu_manager {
751            match gpu_op(gpu) {
752                Ok(val) => Ok(val),
753                Err(e) => {
754                    tracing::warn!("GPU execution failed, falling back to CPU: {:?}", e);
755                    cpu_op()
756                }
757            }
758        } else {
759            cpu_op()
760        }
761    }
762
763    // ════════════════════════════════════════════
764    // CAS (Compare-And-Swap) Operations
765    // ════════════════════════════════════════════
766
767    /// 값이 없을 때만 삽입 (Atomic CAS)
768    pub fn insert_if_not_exists(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<bool> {
769        let _guard = self.cas_locks.lock(table, key);
770
771        if self.get(table, key)?.is_none() {
772            self.insert(table, key, value)?;
773            Ok(true)
774        } else {
775            Ok(false)
776        }
777    }
778
779    /// 기존 값과 비교하여 일치할 때만 새로운 값으로 교체 (Atomic CAS)
780    pub fn compare_and_swap(
781        &self,
782        table: &str,
783        key: &[u8],
784        expected: &[u8],
785        new_value: &[u8],
786    ) -> DbxResult<bool> {
787        let _guard = self.cas_locks.lock(table, key);
788
789        if let Some(current) = self.get(table, key)?
790            && current == expected
791        {
792            self.insert(table, key, new_value)?;
793            return Ok(true);
794        }
795        Ok(false)
796    }
797
798    /// 기존 값이 존재할 때만 업데이트 (Atomic CAS)
799    pub fn update_if_exists(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<bool> {
800        let _guard = self.cas_locks.lock(table, key);
801
802        if self.get(table, key)?.is_some() {
803            self.insert(table, key, value)?;
804            Ok(true)
805        } else {
806            Ok(false)
807        }
808    }
809
810    /// 기존 값과 일치할 때만 삭제 (Atomic CAS)
811    pub fn delete_if_equals(&self, table: &str, key: &[u8], expected: &[u8]) -> DbxResult<bool> {
812        let _guard = self.cas_locks.lock(table, key);
813
814        if let Some(current) = self.get(table, key)?
815            && current == expected
816        {
817            self.delete(table, key)?;
818            return Ok(true);
819        }
820        Ok(false)
821    }
822}
823
824// ════════════════════════════════════════════
825// DatabaseCore Trait Implementation
826// ════════════════════════════════════════════
827
828impl crate::traits::DatabaseCore for Database {
829    fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
830        // Reuse existing implementation
831        Database::insert(self, table, key, value)
832    }
833
834    fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
835        // Reuse existing implementation
836        Database::get(self, table, key)
837    }
838
839    fn delete(&self, table: &str, key: &[u8]) -> DbxResult<()> {
840        // Reuse existing implementation
841        Database::delete(self, table, key).map(|_| ())
842    }
843
844    fn scan(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
845        // Reuse existing implementation
846        Database::scan(self, table)
847    }
848
849    fn flush(&self) -> DbxResult<()> {
850        // Reuse existing implementation
851        Database::flush(self)
852    }
853
854    fn insert_batch(&self, table: &str, entries: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
855        // Reuse existing implementation
856        Database::insert_batch(self, table, entries)
857    }
858
859    fn insert_if_not_exists(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<bool> {
860        Database::insert_if_not_exists(self, table, key, value)
861    }
862
863    fn compare_and_swap(
864        &self,
865        table: &str,
866        key: &[u8],
867        expected: &[u8],
868        new_value: &[u8],
869    ) -> DbxResult<bool> {
870        Database::compare_and_swap(self, table, key, expected, new_value)
871    }
872
873    fn update_if_exists(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<bool> {
874        Database::update_if_exists(self, table, key, value)
875    }
876
877    fn delete_if_equals(&self, table: &str, key: &[u8], expected: &[u8]) -> DbxResult<bool> {
878        Database::delete_if_equals(self, table, key, expected)
879    }
880}
881
882// ════════════════════════════════════════════
883// DatabaseSerde Trait Implementation
884// ════════════════════════════════════════════
885
886impl crate::traits::DatabaseSerde for Database {
887    fn insert_struct<T: serde::Serialize>(
888        &self,
889        table: &str,
890        key: &[u8],
891        data: &T,
892    ) -> DbxResult<()> {
893        let serialized = bincode::serialize(data).map_err(|e| {
894            crate::error::DbxError::Serialization(format!("Failed to serialize struct: {}", e))
895        })?;
896        self.insert(table, key, &serialized)
897    }
898
899    fn get_struct<T: serde::de::DeserializeOwned>(
900        &self,
901        table: &str,
902        key: &[u8],
903    ) -> DbxResult<Option<T>> {
904        if let Some(bytes) = self.get(table, key)? {
905            let deserialized = bincode::deserialize(&bytes).map_err(|e| {
906                crate::error::DbxError::Serialization(format!(
907                    "Failed to deserialize struct: {}",
908                    e
909                ))
910            })?;
911            Ok(Some(deserialized))
912        } else {
913            Ok(None)
914        }
915    }
916}