Skip to main content

dbx_core/transaction/
api.rs

1//! Typestate Transaction — 타입 안전 트랜잭션
2//!
3//! Typestate 패턴을 사용하여 트랜잭션 오용을 컴파일 타임에 방지.
4//! 트랜잭션 내 쓰기는 로컬 버퍼에 축적되며,
5//! `commit()` 시 메인 Delta Store에 원자적으로 반영됩니다.
6
7use crate::engine::Database;
8use crate::error::DbxResult;
9use crate::sql::builder::{Execute, Query, QueryOne, QueryOptional, QueryScalar};
10use std::collections::HashMap;
11use std::marker::PhantomData;
12
13/// 트랜잭션 상태 트레이트
14pub trait TxState {}
15
16/// Active 상태 — 트랜잭션 진행 중
17pub struct Active;
18
19/// Committed 상태 — 커밋 완료
20pub struct Committed;
21
22/// RolledBack 상태 — 롤백 완료
23pub struct RolledBack;
24
25impl TxState for Active {}
26impl TxState for Committed {}
27impl TxState for RolledBack {}
28
29/// 트랜잭션 내 쓰기 작업 로그
30#[derive(Debug, Clone)]
31enum TxOp {
32    /// Insert(table, key, value)
33    Insert(String, Vec<u8>, Vec<u8>),
34    /// Delete(table, key)
35    Delete(String, Vec<u8>),
36    /// Batch(table, rows)
37    Batch(String, Vec<(Vec<u8>, Vec<u8>)>),
38}
39
40/// Typestate Transaction
41///
42/// Active 상태에서만 쿼리/쓰기가 가능하며,
43/// commit/rollback 후에는 컴파일 타임에 사용 불가.
44pub struct Transaction<'a, S: TxState> {
45    db: &'a Database,
46    /// 트랜잭션 내 쓰기 작업 로그 (commit 시 일괄 적용)
47    ops: Vec<TxOp>,
48    /// 트랜잭션 내 로컬 읽기 버퍼 (Insert된 데이터의 읽기 지원)
49    local_buffer: HashMap<String, HashMap<Vec<u8>, Option<Vec<u8>>>>,
50    _state: PhantomData<S>,
51}
52
53impl Database {
54    /// 트랜잭션 시작
55    ///
56    /// 새로운 Active 트랜잭션을 생성합니다.
57    /// 트랜잭션 내 모든 쓰기는 로컬 버퍼에 축적되며,
58    /// `commit()` 호출 시에만 메인 스토리지에 반영됩니다.
59    pub fn begin(&self) -> DbxResult<Transaction<'_, Active>> {
60        Ok(Transaction {
61            db: self,
62            ops: Vec::new(),
63            local_buffer: HashMap::new(),
64            _state: PhantomData,
65        })
66    }
67}
68
69impl<'a> Transaction<'a, Active> {
70    // ════════════════════════════════════════════
71    // Query Methods (read-through to Database)
72    // ════════════════════════════════════════════
73
74    /// SELECT 쿼리 — 여러 행 반환
75    pub fn query<T: crate::api::FromRow>(&self, sql: impl Into<String>) -> Query<'_, T> {
76        self.db.query(sql)
77    }
78
79    /// SELECT 쿼리 — 단일 행 반환 (없으면 에러)
80    pub fn query_one<T: crate::api::FromRow>(&self, sql: impl Into<String>) -> QueryOne<'_, T> {
81        self.db.query_one(sql)
82    }
83
84    /// SELECT 쿼리 — 단일 행 반환 (없으면 None)
85    pub fn query_optional<T: crate::api::FromRow>(
86        &self,
87        sql: impl Into<String>,
88    ) -> QueryOptional<'_, T> {
89        self.db.query_optional(sql)
90    }
91
92    /// SELECT 쿼리 — 단일 스칼라 값 반환
93    pub fn query_scalar<T: crate::api::FromScalar>(
94        &self,
95        sql: impl Into<String>,
96    ) -> QueryScalar<'_, T> {
97        self.db.query_scalar(sql)
98    }
99
100    /// INSERT/UPDATE/DELETE — 영향받은 행 수 반환
101    pub fn execute(&self, sql: impl Into<String>) -> Execute<'_> {
102        self.db.execute(sql)
103    }
104
105    // ════════════════════════════════════════════
106    // Buffered Write Operations
107    // ════════════════════════════════════════════
108
109    /// 트랜잭션 내 INSERT — 로컬 버퍼에 저장 (commit 전까지 미반영)
110    pub fn insert(&mut self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
111        self.ops.push(TxOp::Insert(
112            table.to_string(),
113            key.to_vec(),
114            value.to_vec(),
115        ));
116        // 로컬 읽기 버퍼에도 반영 (트랜잭션 내 read-your-writes)
117        self.local_buffer
118            .entry(table.to_string())
119            .or_default()
120            .insert(key.to_vec(), Some(value.to_vec()));
121        Ok(())
122    }
123
124    /// 트랜잭션 내 BATCH INSERT — 여러 키-값 쌍을 일괄 삽입 (최적화됨)
125    pub fn insert_batch(&mut self, table: &str, rows: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
126        self.ops.push(TxOp::Batch(table.to_string(), rows.clone()));
127        // 로컬 읽기 버퍼에도 반영
128        let table_buf = self.local_buffer.entry(table.to_string()).or_default();
129        for (key, value) in rows {
130            table_buf.insert(key, Some(value));
131        }
132        Ok(())
133    }
134
135    /// 트랜잭션 내 DELETE — 로컬 버퍼에 tombstone 기록
136    pub fn delete(&mut self, table: &str, key: &[u8]) -> DbxResult<bool> {
137        self.ops.push(TxOp::Delete(table.to_string(), key.to_vec()));
138        // 로컬 버퍼에 tombstone (None)
139        self.local_buffer
140            .entry(table.to_string())
141            .or_default()
142            .insert(key.to_vec(), None);
143        Ok(true)
144    }
145
146    /// 트랜잭션 내 GET — 로컬 버퍼 우선, 없으면 메인 스토리지 조회
147    pub fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
148        // 1. 로컬 버퍼 확인 (read-your-writes)
149        if let Some(table_buf) = self.local_buffer.get(table)
150            && let Some(value_opt) = table_buf.get(key)
151        {
152            return Ok(value_opt.clone()); // Some(value) or None (tombstone)
153        }
154        // 2. 메인 스토리지 fallback
155        self.db.get(table, key)
156    }
157
158    /// 현재 트랜잭션의 보류 중인 연산 개수
159    pub fn pending_ops(&self) -> usize {
160        self.ops.len()
161    }
162
163    // ════════════════════════════════════════════
164    // Commit / Rollback
165    // ════════════════════════════════════════════
166
167    /// 트랜잭션 커밋 — 모든 버퍼링된 쓰기를 메인 스토리지에 원자적으로 반영
168    ///
169    /// # MVCC Dual-Write 전략
170    ///
171    /// 이 구현은 MVCC versioned key와 일반 key를 모두 저장하는 dual-write 전략을 사용합니다:
172    ///
173    /// 1. **insert_versioned()**: MVCC 타임스탬프가 포함된 versioned key로 저장
174    ///    - 향후 snapshot isolation, time-travel query 지원
175    ///    - 버전 히스토리 추적 가능
176    ///
177    /// 2. **insert()**: 일반 key로도 저장
178    ///    - get() 메서드와의 backward compatibility 보장
179    ///    - Fast-path 조회 성능 유지
180    ///    - 기존 코드와의 호환성 확보
181    ///
182    /// 이 전략은 약간의 저장 공간 오버헤드가 있지만, 다음 이점을 제공합니다:
183    /// - 기존 CRUD API와 완벽한 호환성
184    /// - MVCC 기능을 점진적으로 활성화 가능
185    /// - 성능 저하 없이 transaction isolation 지원
186    pub fn commit(self) -> DbxResult<Transaction<'a, Committed>> {
187        // Allocate a unique commit timestamp for this transaction
188        let commit_ts = self.db.allocate_commit_ts();
189
190        // ops를 순서대로 메인 Delta Store에 적용 (MVCC 버전 포함)
191        for op in &self.ops {
192            match op {
193                TxOp::Insert(table, key, value) => {
194                    // Use insert_versioned to include MVCC timestamp
195                    self.db
196                        .insert_versioned(table, key, Some(value), commit_ts)?;
197                    // Also insert with regular key for get() compatibility
198                    self.db.insert(table, key, value)?;
199                }
200                TxOp::Delete(table, key) => {
201                    // Use insert_versioned with None to create tombstone
202                    self.db.insert_versioned(table, key, None, commit_ts)?;
203                    // Also delete with regular key
204                    self.db.delete(table, key)?;
205                }
206                TxOp::Batch(table, rows) => {
207                    // Batch insert with versioning
208                    for (key, value) in rows {
209                        self.db
210                            .insert_versioned(table, key, Some(value), commit_ts)?;
211                        // Also insert with regular key
212                        self.db.insert(table, key, value)?;
213                    }
214                }
215            }
216        }
217        Ok(Transaction {
218            db: self.db,
219            ops: Vec::new(),
220            local_buffer: HashMap::new(),
221            _state: PhantomData,
222        })
223    }
224
225    /// 트랜잭션 롤백 — 모든 버퍼링된 쓰기를 폐기
226    pub fn rollback(self) -> DbxResult<Transaction<'a, RolledBack>> {
227        // ops 버퍼를 단순히 버림 — 메인 스토리지에는 아무것도 적용하지 않음
228        Ok(Transaction {
229            db: self.db,
230            ops: Vec::new(),
231            local_buffer: HashMap::new(),
232            _state: PhantomData,
233        })
234    }
235}
236
237// Committed/RolledBack 상태에서는 쿼리 불가 (컴파일 에러)
238impl<'a> Transaction<'a, Committed> {
239    /// 커밋된 트랜잭션은 더 이상 사용할 수 없음
240    pub fn is_committed(&self) -> bool {
241        true
242    }
243}
244
245impl<'a> Transaction<'a, RolledBack> {
246    /// 롤백된 트랜잭션은 더 이상 사용할 수 없음
247    pub fn is_rolled_back(&self) -> bool {
248        true
249    }
250}
251
252// ════════════════════════════════════════════
253// DatabaseTransaction Trait Implementation
254// ════════════════════════════════════════════
255
256impl crate::traits::DatabaseTransaction for Database {
257    fn begin(&self) -> DbxResult<Transaction<'_, Active>> {
258        // Reuse existing implementation
259        Database::begin(self)
260    }
261}
262
263#[cfg(test)]
264mod tests {
265    use crate::engine::Database;
266
267    #[test]
268    fn test_begin_commit() {
269        let db = Database::open_in_memory().unwrap();
270        let mut tx = db.begin().unwrap();
271
272        tx.insert("users", b"u1", b"Alice").unwrap();
273        tx.insert("users", b"u2", b"Bob").unwrap();
274
275        // 커밋 전: 메인 스토리지에는 없음
276        assert_eq!(db.get("users", b"u1").unwrap(), None);
277
278        // 트랜잭션 내 read-your-writes
279        assert_eq!(tx.get("users", b"u1").unwrap(), Some(b"Alice".to_vec()));
280
281        // 커밋
282        let committed = tx.commit().unwrap();
283        assert!(committed.is_committed());
284
285        // 커밋 후: 메인 스토리지에 반영됨
286        assert_eq!(db.get("users", b"u1").unwrap(), Some(b"Alice".to_vec()));
287        assert_eq!(db.get("users", b"u2").unwrap(), Some(b"Bob".to_vec()));
288    }
289
290    #[test]
291    fn test_begin_rollback() {
292        let db = Database::open_in_memory().unwrap();
293        let mut tx = db.begin().unwrap();
294
295        tx.insert("users", b"u1", b"Alice").unwrap();
296        tx.insert("users", b"u2", b"Bob").unwrap();
297
298        // 롤백
299        let rolled_back = tx.rollback().unwrap();
300        assert!(rolled_back.is_rolled_back());
301
302        // 롤백 후: 메인 스토리지에 반영 안 됨
303        assert_eq!(db.get("users", b"u1").unwrap(), None);
304        assert_eq!(db.get("users", b"u2").unwrap(), None);
305    }
306
307    #[test]
308    fn test_delete_in_transaction() {
309        let db = Database::open_in_memory().unwrap();
310
311        // 메인에 데이터 먼저 삽입
312        db.insert("users", b"u1", b"Alice").unwrap();
313        assert_eq!(db.get("users", b"u1").unwrap(), Some(b"Alice".to_vec()));
314
315        // 트랜잭션에서 삭제
316        let mut tx = db.begin().unwrap();
317        tx.delete("users", b"u1").unwrap();
318
319        // 트랜잭션 내에서 tombstone 확인
320        assert_eq!(tx.get("users", b"u1").unwrap(), None);
321
322        // 메인에는 아직 있음
323        assert_eq!(db.get("users", b"u1").unwrap(), Some(b"Alice".to_vec()));
324
325        // 커밋 후 삭제 반영
326        tx.commit().unwrap();
327        assert_eq!(db.get("users", b"u1").unwrap(), None);
328    }
329
330    #[test]
331    fn test_read_your_writes() {
332        let db = Database::open_in_memory().unwrap();
333
334        // 메인에 초기 데이터
335        db.insert("t", b"k1", b"old").unwrap();
336
337        let mut tx = db.begin().unwrap();
338
339        // 트랜잭션 내 덮어쓰기
340        tx.insert("t", b"k1", b"new").unwrap();
341        assert_eq!(tx.get("t", b"k1").unwrap(), Some(b"new".to_vec()));
342
343        // 메인 데이터를 트랜잭션에서도 조회 가능 (로컬 버퍼에 없는 키)
344        db.insert("t", b"k2", b"main_data").unwrap();
345        assert_eq!(tx.get("t", b"k2").unwrap(), Some(b"main_data".to_vec()));
346
347        tx.rollback().unwrap();
348        // 롤백 후 메인 데이터 원복
349        assert_eq!(db.get("t", b"k1").unwrap(), Some(b"old".to_vec()));
350    }
351
352    #[test]
353    fn test_pending_ops_count() {
354        let db = Database::open_in_memory().unwrap();
355        let mut tx = db.begin().unwrap();
356
357        assert_eq!(tx.pending_ops(), 0);
358        tx.insert("t", b"a", b"1").unwrap();
359        assert_eq!(tx.pending_ops(), 1);
360        tx.delete("t", b"b").unwrap();
361        assert_eq!(tx.pending_ops(), 2);
362        tx.insert("t", b"c", b"3").unwrap();
363        assert_eq!(tx.pending_ops(), 3);
364    }
365
366    #[test]
367    fn test_empty_transaction_commit() {
368        let db = Database::open_in_memory().unwrap();
369        let tx = db.begin().unwrap();
370        let committed = tx.commit().unwrap();
371        assert!(committed.is_committed());
372    }
373
374    #[test]
375    fn test_empty_transaction_rollback() {
376        let db = Database::open_in_memory().unwrap();
377        let tx = db.begin().unwrap();
378        let rolled_back = tx.rollback().unwrap();
379        assert!(rolled_back.is_rolled_back());
380    }
381}