1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
//! Transaction state machine (§12.10, bd-7pxb).
//!
//! Implements BEGIN/COMMIT/ROLLBACK with four transaction modes (DEFERRED,
//! IMMEDIATE, EXCLUSIVE, CONCURRENT) and a LIFO savepoint stack.
use std::collections::HashMap;
use fsqlite_ast::TransactionMode;
use fsqlite_error::{FrankenError, Result};
use tracing::{debug, error, info};
// ---------------------------------------------------------------------------
// Lock state
// ---------------------------------------------------------------------------
/// Lock level tracked by the transaction state machine (SQLite-compatible).
///
/// Variants are declared from weakest to strongest, so the derived ordering
/// gives `None < Shared < Reserved < Exclusive`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LockLevel {
    /// The connection holds no lock at all.
    None,
    /// Shared lock, as taken by readers.
    Shared,
    /// Reserved lock, as taken by a pending writer.
    Reserved,
    /// Exclusive lock (active writer, blocks readers in rollback journal mode).
    Exclusive,
}
// ---------------------------------------------------------------------------
// Transaction state
// ---------------------------------------------------------------------------
/// Where a connection currently stands in the transaction lifecycle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnState {
    /// No transaction is open (autocommit mode).
    Idle,
    /// A transaction is open and usable.
    Active,
    /// A transaction is open but has failed; it needs a ROLLBACK.
    Error,
}
// ---------------------------------------------------------------------------
// Savepoint
// ---------------------------------------------------------------------------
/// One entry of the LIFO savepoint stack.
///
/// RELEASE X keeps the work done since SAVEPOINT X and pops X together with
/// every savepoint created after it. ROLLBACK TO X discards the work done
/// since X but keeps X itself on the stack.
#[derive(Debug, Clone)]
pub struct SavepointEntry {
    /// Name the user gave the savepoint.
    pub name: String,
    /// Copy of the write set (page_number → data copy) taken when the
    /// savepoint was created; used to restore the write set on partial rollback.
    write_set_snapshot: HashMap<u64, Vec<u8>>,
}
// ---------------------------------------------------------------------------
// TransactionController
// ---------------------------------------------------------------------------
/// Per-connection transaction lifecycle tracker.
///
/// Holds the transaction state, the mode chosen at BEGIN, the current lock
/// level, the savepoint stack and the write set used for savepoint rollback.
/// This is the "SQL layer" state machine; the underlying MVCC machinery
/// lives in `fsqlite_mvcc::lifecycle::TransactionManager`.
#[derive(Debug)]
pub struct TransactionController {
    /// Lifecycle state (idle, active or error).
    state: TxnState,
    /// Mode recorded at BEGIN time; `None` while no transaction is open.
    mode: Option<TransactionMode>,
    /// Lock level currently held.
    lock_level: LockLevel,
    /// Savepoint stack; the most recently created savepoint is last.
    savepoints: Vec<SavepointEntry>,
    /// Recorded page writes, snapshotted by savepoints for partial rollback.
    write_set: HashMap<u64, Vec<u8>>,
    /// `true` while the transaction runs in CONCURRENT (MVCC) mode.
    concurrent: bool,
    /// `true` when the transaction was opened implicitly by a SAVEPOINT.
    implicit_txn: bool,
}
impl TransactionController {
/// Build a controller with no transaction in progress.
///
/// The result is idle: no mode, no lock, an empty savepoint stack and write
/// set, and both the concurrent and implicit-transaction flags cleared.
#[must_use]
pub fn new() -> Self {
    Self {
        savepoints: Vec::new(),
        write_set: HashMap::new(),
        state: TxnState::Idle,
        lock_level: LockLevel::None,
        mode: None,
        implicit_txn: false,
        concurrent: false,
    }
}
/// Returns the lifecycle state (idle, active or error) of this controller.
#[must_use]
pub const fn state(&self) -> TxnState {
    self.state
}
/// Returns the lock level the controller currently records as held.
#[must_use]
pub const fn lock_level(&self) -> LockLevel {
    self.lock_level
}
/// Returns the mode recorded at BEGIN, or `None` when no transaction is open.
#[must_use]
pub const fn mode(&self) -> Option<TransactionMode> {
    self.mode
}
/// Returns `true` while the open transaction runs in CONCURRENT (MVCC) mode.
#[must_use]
pub const fn is_concurrent(&self) -> bool {
    self.concurrent
}
/// Returns how many savepoints are currently on the stack.
///
/// Declared `const` like the other trivial accessors on this type.
#[must_use]
pub const fn savepoint_depth(&self) -> usize {
    self.savepoints.len()
}
// -----------------------------------------------------------------------
// BEGIN
// -----------------------------------------------------------------------
/// Start a transaction in `mode`, defaulting to DEFERRED when `mode` is `None`.
///
/// The lock taken up front depends on the mode: DEFERRED takes none,
/// IMMEDIATE takes RESERVED, EXCLUSIVE takes EXCLUSIVE, and CONCURRENT takes
/// SHARED while switching the controller into concurrent mode. The write set
/// is emptied as part of starting.
///
/// # Errors
/// Returns `FrankenError::Busy` unless the controller is idle, i.e. when a
/// transaction is already active or is sitting in the error state.
pub fn begin(&mut self, mode: Option<TransactionMode>) -> Result<()> {
    // Guard: only an idle controller may open a transaction.
    if self.state != TxnState::Idle {
        error!(
            begin_mode = ?mode,
            "BEGIN failed: transaction already active"
        );
        return Err(FrankenError::Busy);
    }

    let resolved_mode = mode.unwrap_or(TransactionMode::Deferred);

    // Only CONCURRENT enters MVCC concurrent-writer mode.
    let concurrent = matches!(resolved_mode, TransactionMode::Concurrent);

    // Lock acquired immediately at BEGIN for each mode.
    let initial_lock = match resolved_mode {
        // DEFERRED: nothing until the first read/write.
        TransactionMode::Deferred => LockLevel::None,
        // IMMEDIATE: RESERVED right away.
        TransactionMode::Immediate => LockLevel::Reserved,
        // EXCLUSIVE: EXCLUSIVE right away.
        TransactionMode::Exclusive => LockLevel::Exclusive,
        // CONCURRENT: SHARED, with the concurrent flag set above.
        TransactionMode::Concurrent => LockLevel::Shared,
    };

    self.write_set.clear();
    self.concurrent = concurrent;
    self.lock_level = initial_lock;
    self.mode = Some(resolved_mode);
    self.state = TxnState::Active;

    info!(
        begin_mode = ?resolved_mode,
        lock_level = ?initial_lock,
        concurrent,
        "transaction started"
    );
    Ok(())
}
// -----------------------------------------------------------------------
// COMMIT / END
// -----------------------------------------------------------------------
/// Commit the open transaction and return the controller to idle.
///
/// END TRANSACTION is a synonym for COMMIT (invariant #5), so both map to
/// this method. On success every piece of transaction state is reset.
///
/// # Errors
/// - `FrankenError::NoActiveTransaction` when the controller is idle.
/// - `FrankenError::Busy` when the transaction is in the error state; the
///   state is left untouched so the caller can still ROLLBACK.
pub fn commit(&mut self) -> Result<()> {
    match self.state {
        TxnState::Active => {
            info!(
                mode = ?self.mode,
                savepoint_depth = self.savepoints.len(),
                "commit"
            );
            self.reset();
            Ok(())
        }
        TxnState::Idle => Err(FrankenError::NoActiveTransaction),
        TxnState::Error => {
            error!("COMMIT failed: transaction is in error state, must ROLLBACK");
            Err(FrankenError::Busy)
        }
    }
}
// -----------------------------------------------------------------------
// ROLLBACK
// -----------------------------------------------------------------------
/// Roll back the active transaction, undoing all changes since BEGIN.
///
/// # Errors
/// Returns error if no transaction is active.
pub fn rollback(&mut self) -> Result<()> {
if self.state == TxnState::Idle {
return Err(FrankenError::NoActiveTransaction);
}
info!(
mode = ?self.mode,
savepoint_depth = self.savepoints.len(),
"rollback"
);
self.reset();
Ok(())
}
// -----------------------------------------------------------------------
// SAVEPOINT
// -----------------------------------------------------------------------
/// Create a named savepoint and push it onto the LIFO stack.
///
/// If the controller is idle, a DEFERRED transaction is started first and
/// flagged as implicit (per SQLite semantics: SAVEPOINT outside a
/// transaction starts one). The new entry captures a copy of the current
/// write set so a later ROLLBACK TO can restore it.
///
/// `name` is taken by value and moved into the stack entry, so no extra
/// copy of the string is made.
///
/// # Errors
/// Propagates any error from the implicit `begin`.
pub fn savepoint(&mut self, name: String) -> Result<()> {
    if self.state == TxnState::Idle {
        self.begin(Some(TransactionMode::Deferred))?;
        self.implicit_txn = true;
    }

    let entry = SavepointEntry {
        name,
        write_set_snapshot: self.write_set.clone(),
    };
    // Log from the entry itself: `name` has been moved into it.
    debug!(
        savepoint = %entry.name,
        depth = self.savepoints.len() + 1,
        "savepoint created"
    );
    self.savepoints.push(entry);
    Ok(())
}
/// RELEASE savepoint: keep all work since SAVEPOINT X and pop X plus every
/// savepoint created after it (invariant #6).
///
/// The name is matched case-insensitively, innermost savepoint first. When
/// the stack becomes empty and the transaction had been opened implicitly by
/// a SAVEPOINT, the transaction is committed as well; an explicitly begun
/// transaction stays open.
///
/// # Errors
/// Returns an error if no savepoint with that name is on the stack, or if
/// the implied commit fails.
pub fn release(&mut self, name: &str) -> Result<()> {
    let pos = self.find_savepoint(name)?;

    // Drop the named savepoint and everything stacked on top of it.
    let depth_before = self.savepoints.len();
    self.savepoints.truncate(pos);
    let remaining = self.savepoints.len();
    debug!(
        savepoint = %name,
        removed = depth_before - pos,
        remaining,
        "savepoint released"
    );

    // Per SQLite: RELEASE of the outermost savepoint of an implicitly
    // started transaction is equivalent to COMMIT.
    let ends_implicit_txn =
        self.implicit_txn && self.state == TxnState::Active && self.savepoints.is_empty();
    if ends_implicit_txn {
        return self.commit();
    }
    Ok(())
}
/// ROLLBACK TO savepoint: discard all work since SAVEPOINT X but keep X on
/// the stack for further use (invariant #7).
///
/// Savepoints created after X are popped, the write set is restored to the
/// snapshot taken when X was created, and an error state is cleared back to
/// active. The name is matched case-insensitively, innermost first.
///
/// # Errors
/// Returns an error if no savepoint with that name is on the stack.
pub fn rollback_to(&mut self, name: &str) -> Result<()> {
    let pos = self.find_savepoint(name)?;

    // Pop everything newer than X; X itself stays at index `pos`.
    self.savepoints.truncate(pos + 1);

    // Restore the write set from X's snapshot. `clone_from` lets the existing
    // map reuse its allocation instead of building a fresh one.
    self.write_set
        .clone_from(&self.savepoints[pos].write_set_snapshot);

    // ROLLBACK TO recovers a transaction that was in the error state.
    if self.state == TxnState::Error {
        self.state = TxnState::Active;
    }

    info!(
        savepoint = %name,
        depth = self.savepoints.len(),
        "rollback to savepoint"
    );
    Ok(())
}
// -----------------------------------------------------------------------
// Write-set tracking (for savepoint rollback)
// -----------------------------------------------------------------------
/// Remember `data` for `page_number` in the write set (savepoint rollback
/// support).
///
/// The first image recorded for a page wins: if the page already has an
/// entry, the new `data` is dropped and the stored bytes stay untouched.
pub fn record_write(&mut self, page_number: u64, data: Vec<u8>) {
    // Insert only into a vacant slot so the earliest recorded image is kept.
    if let std::collections::hash_map::Entry::Vacant(slot) = self.write_set.entry(page_number) {
        slot.insert(data);
    }
}
/// Raise the lock level to SHARED on the first read of a transaction.
///
/// Only acts when a transaction is active and no lock is held yet (the
/// DEFERRED case); otherwise it is a no-op.
pub fn promote_on_read(&mut self) {
    let holds_no_lock = self.lock_level == LockLevel::None;
    if self.state != TxnState::Active || !holds_no_lock {
        return;
    }
    self.lock_level = LockLevel::Shared;
    debug!("DEFERRED transaction promoted to SHARED on first read");
}
/// Raise the lock level as needed when the transaction performs a write.
///
/// Does nothing unless a transaction is active. From no lock or SHARED, a
/// non-concurrent transaction moves to RESERVED, while a concurrent one is
/// set to SHARED. RESERVED and EXCLUSIVE are left as they are.
pub fn promote_on_write(&mut self) {
    if self.state != TxnState::Active {
        return;
    }
    // Already at or above RESERVED, no promotion needed.
    if matches!(
        self.lock_level,
        LockLevel::Reserved | LockLevel::Exclusive
    ) {
        return;
    }

    // CONCURRENT mode stays at SHARED; every other mode takes RESERVED.
    self.lock_level = if self.concurrent {
        LockLevel::Shared
    } else {
        LockLevel::Reserved
    };
    debug!(
        lock_level = ?self.lock_level,
        concurrent = self.concurrent,
        "transaction promoted on first write"
    );
}
/// Put an active transaction into the error state (e.g., after a constraint
/// violation).
///
/// Has no effect when the controller is idle or already in the error state.
pub fn set_error(&mut self) {
    if self.state != TxnState::Active {
        return;
    }
    self.state = TxnState::Error;
    error!("transaction entered error state");
}
// -----------------------------------------------------------------------
// Internal helpers
// -----------------------------------------------------------------------
/// Locate a savepoint by name and return its index in the stack.
///
/// Matching ignores ASCII case and scans from the top of the stack, so when
/// several savepoints share a name the most recently created one is found.
///
/// # Errors
/// Returns an internal error carrying "no such savepoint: <name>" when no
/// entry matches.
fn find_savepoint(&self, name: &str) -> Result<usize> {
    self.savepoints
        .iter()
        .rposition(|sp| sp.name.eq_ignore_ascii_case(name))
        .ok_or_else(|| FrankenError::internal(format!("no such savepoint: {name}")))
}
/// Return every field to its idle value.
///
/// Used by both COMMIT and ROLLBACK. The savepoint stack and write set are
/// emptied in place rather than replaced.
fn reset(&mut self) {
    // Collections first.
    self.savepoints.clear();
    self.write_set.clear();
    // Then the scalar state.
    self.implicit_txn = false;
    self.concurrent = false;
    self.lock_level = LockLevel::None;
    self.mode = None;
    self.state = TxnState::Idle;
}
}
impl Default for TransactionController {
fn default() -> Self {
Self::new()
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
    //! Unit tests for the transaction state machine.
    //!
    //! These exercise only the SQL-layer controller; page-level conflict
    //! detection belongs to the MVCC layer and is not covered here.
    use super::*;

    // === Test 1: BEGIN DEFERRED ===
    #[test]
    fn test_begin_deferred() {
        let mut tc = TransactionController::new();
        tc.begin(Some(TransactionMode::Deferred)).unwrap();
        assert_eq!(tc.state(), TxnState::Active);
        // DEFERRED: no lock until first read/write (invariant #1).
        assert_eq!(tc.lock_level(), LockLevel::None);
        assert!(!tc.is_concurrent());
        assert!(matches!(tc.mode(), Some(TransactionMode::Deferred)));
    }

    // === Test 2: BEGIN IMMEDIATE ===
    #[test]
    fn test_begin_immediate() {
        let mut tc = TransactionController::new();
        tc.begin(Some(TransactionMode::Immediate)).unwrap();
        assert_eq!(tc.state(), TxnState::Active);
        // IMMEDIATE: RESERVED lock immediately (invariant #2).
        assert_eq!(tc.lock_level(), LockLevel::Reserved);
        assert!(!tc.is_concurrent());
    }

    // === Test 3: BEGIN EXCLUSIVE ===
    #[test]
    fn test_begin_exclusive() {
        let mut tc = TransactionController::new();
        tc.begin(Some(TransactionMode::Exclusive)).unwrap();
        assert_eq!(tc.state(), TxnState::Active);
        // EXCLUSIVE: EXCLUSIVE lock immediately (invariant #3).
        assert_eq!(tc.lock_level(), LockLevel::Exclusive);
        assert!(!tc.is_concurrent());
    }

    // === Test 4: BEGIN CONCURRENT ===
    #[test]
    fn test_begin_concurrent() {
        let mut tc = TransactionController::new();
        tc.begin(Some(TransactionMode::Concurrent)).unwrap();
        assert_eq!(tc.state(), TxnState::Active);
        // CONCURRENT: enters MVCC mode (invariant #4).
        assert!(tc.is_concurrent());
        assert_eq!(tc.lock_level(), LockLevel::Shared);
    }

    // === Test 5: CONCURRENT no conflict (two controllers, different pages) ===
    #[test]
    fn test_concurrent_no_conflict() {
        let mut tc1 = TransactionController::new();
        let mut tc2 = TransactionController::new();
        tc1.begin(Some(TransactionMode::Concurrent)).unwrap();
        tc2.begin(Some(TransactionMode::Concurrent)).unwrap();
        // Writer 1 modifies page 1.
        tc1.promote_on_write();
        tc1.record_write(1, vec![0xAA; 4096]);
        // Writer 2 modifies page 2 (different page, no conflict).
        tc2.promote_on_write();
        tc2.record_write(2, vec![0xBB; 4096]);
        // Concurrent writers never escalate past SHARED at this layer.
        assert_eq!(tc1.lock_level(), LockLevel::Shared);
        assert_eq!(tc2.lock_level(), LockLevel::Shared);
        // Both commit successfully.
        tc1.commit().unwrap();
        tc2.commit().unwrap();
        assert_eq!(tc1.state(), TxnState::Idle);
        assert_eq!(tc2.state(), TxnState::Idle);
    }

    // === Test 6: CONCURRENT page conflict detection ===
    // Note: Full page-level conflict detection with SQLITE_BUSY_SNAPSHOT
    // requires the MVCC TransactionManager from fsqlite-mvcc. This test
    // verifies the state machine correctly tracks concurrent mode.
    #[test]
    fn test_concurrent_page_conflict() {
        let mut tc1 = TransactionController::new();
        let mut tc2 = TransactionController::new();
        tc1.begin(Some(TransactionMode::Concurrent)).unwrap();
        tc2.begin(Some(TransactionMode::Concurrent)).unwrap();
        assert!(tc1.is_concurrent());
        assert!(tc2.is_concurrent());
        // Both write to the same page — conflict detection would happen at
        // the MVCC layer (TransactionManager). Here we verify state tracking.
        tc1.record_write(1, vec![0xAA; 4096]);
        tc2.record_write(1, vec![0xBB; 4096]);
        // In the full system, tc2.commit() would return SQLITE_BUSY_SNAPSHOT.
        // At this layer, both commits succeed; the MVCC layer enforces conflicts.
        tc1.commit().unwrap();
        tc2.commit().unwrap();
        // Commit leaves concurrent mode.
        assert!(!tc1.is_concurrent());
        assert!(!tc2.is_concurrent());
    }

    // === Test 7: END TRANSACTION is synonym for COMMIT (invariant #5) ===
    #[test]
    fn test_commit_end_synonym() {
        let mut tc = TransactionController::new();
        tc.begin(None).unwrap();
        assert_eq!(tc.state(), TxnState::Active);
        // BEGIN without a mode resolves to DEFERRED.
        assert!(matches!(tc.mode(), Some(TransactionMode::Deferred)));
        // COMMIT and END are the same operation.
        tc.commit().unwrap();
        assert_eq!(tc.state(), TxnState::Idle);
        assert!(tc.mode().is_none());
    }

    // === Test 8: ROLLBACK undoes all changes ===
    #[test]
    fn test_rollback() {
        let mut tc = TransactionController::new();
        tc.begin(Some(TransactionMode::Immediate)).unwrap();
        tc.savepoint("sp1".to_owned()).unwrap();
        tc.record_write(1, vec![0xAA; 100]);
        tc.rollback().unwrap();
        assert_eq!(tc.state(), TxnState::Idle);
        assert_eq!(tc.lock_level(), LockLevel::None);
        // Everything tracked for the transaction is gone.
        assert_eq!(tc.savepoint_depth(), 0);
        assert!(tc.mode().is_none());
        assert!(tc.write_set.is_empty());
    }

    // === Test 9: SAVEPOINT creates named savepoint ===
    #[test]
    fn test_savepoint_basic() {
        let mut tc = TransactionController::new();
        tc.begin(Some(TransactionMode::Deferred)).unwrap();
        tc.savepoint("sp1".to_owned()).unwrap();
        assert_eq!(tc.savepoint_depth(), 1);
        assert_eq!(tc.savepoints[0].name, "sp1");
    }

    // === Test 10: RELEASE commits work and removes savepoint ===
    #[test]
    fn test_savepoint_release() {
        let mut tc = TransactionController::new();
        tc.begin(Some(TransactionMode::Immediate)).unwrap();
        tc.savepoint("sp1".to_owned()).unwrap();
        tc.record_write(1, vec![0xAA; 100]);
        tc.release("sp1").unwrap();
        // Savepoint removed.
        assert_eq!(tc.savepoint_depth(), 0);
        // Work since the savepoint is kept and the explicit transaction stays open.
        assert!(tc.write_set.contains_key(&1));
        assert_eq!(tc.state(), TxnState::Active);
    }

    // === Test 11: RELEASE X removes X and all more recent savepoints (invariant #6) ===
    #[test]
    fn test_savepoint_release_removes_later() {
        let mut tc = TransactionController::new();
        tc.begin(Some(TransactionMode::Immediate)).unwrap();
        tc.savepoint("sp1".to_owned()).unwrap();
        tc.savepoint("sp2".to_owned()).unwrap();
        tc.savepoint("sp3".to_owned()).unwrap();
        assert_eq!(tc.savepoint_depth(), 3);
        // RELEASE sp1 removes sp1, sp2, sp3.
        tc.release("sp1").unwrap();
        assert_eq!(tc.savepoint_depth(), 0);
    }

    // === Test 12: ROLLBACK TO undoes work since savepoint but preserves it (invariant #7) ===
    #[test]
    fn test_savepoint_rollback_to() {
        let mut tc = TransactionController::new();
        tc.begin(Some(TransactionMode::Immediate)).unwrap();
        tc.savepoint("sp1".to_owned()).unwrap();
        tc.record_write(1, vec![0xAA; 100]);
        tc.rollback_to("sp1").unwrap();
        // Savepoint still on stack.
        assert_eq!(tc.savepoint_depth(), 1);
        // The write recorded after the savepoint has been discarded.
        assert!(tc.write_set.is_empty());
    }

    // === Test 13: Multiple nested savepoints form a stack ===
    #[test]
    fn test_savepoint_nested() {
        let mut tc = TransactionController::new();
        tc.begin(Some(TransactionMode::Immediate)).unwrap();
        tc.savepoint("sp1".to_owned()).unwrap();
        tc.savepoint("sp2".to_owned()).unwrap();
        tc.savepoint("sp3".to_owned()).unwrap();
        assert_eq!(tc.savepoint_depth(), 3);
        // ROLLBACK TO sp2 removes sp3 but keeps sp1 and sp2.
        tc.rollback_to("sp2").unwrap();
        assert_eq!(tc.savepoint_depth(), 2);
        // sp3 is no longer addressable.
        assert!(tc.release("sp3").is_err());
        assert_eq!(tc.savepoint_depth(), 2);
    }

    // === Test 14: After ROLLBACK TO, further operations within same scope ===
    #[test]
    fn test_savepoint_rollback_then_continue() {
        let mut tc = TransactionController::new();
        tc.begin(Some(TransactionMode::Immediate)).unwrap();
        tc.savepoint("sp1".to_owned()).unwrap();
        tc.record_write(1, vec![0xAA; 100]);
        tc.rollback_to("sp1").unwrap();
        // Can continue operating after ROLLBACK TO.
        tc.record_write(2, vec![0xBB; 100]);
        assert!(tc.write_set.contains_key(&2));
        assert!(!tc.write_set.contains_key(&1));
        tc.commit().unwrap();
        assert_eq!(tc.state(), TxnState::Idle);
    }

    // === Test: DEFERRED lock promotion ===
    #[test]
    fn test_deferred_lock_promotion() {
        let mut tc = TransactionController::new();
        tc.begin(Some(TransactionMode::Deferred)).unwrap();
        assert_eq!(tc.lock_level(), LockLevel::None);
        // First read promotes to SHARED.
        tc.promote_on_read();
        assert_eq!(tc.lock_level(), LockLevel::Shared);
        // First write promotes to RESERVED.
        tc.promote_on_write();
        assert_eq!(tc.lock_level(), LockLevel::Reserved);
        // Further reads and writes never lower the lock.
        tc.promote_on_read();
        tc.promote_on_write();
        assert_eq!(tc.lock_level(), LockLevel::Reserved);
    }

    // === Test: Promotion helpers are no-ops without a transaction ===
    #[test]
    fn test_promotion_ignored_when_idle() {
        let mut tc = TransactionController::new();
        tc.promote_on_read();
        tc.promote_on_write();
        assert_eq!(tc.lock_level(), LockLevel::None);
        assert_eq!(tc.state(), TxnState::Idle);
    }

    // === Test: Error state requires ROLLBACK ===
    #[test]
    fn test_error_state_requires_rollback() {
        let mut tc = TransactionController::new();
        tc.begin(None).unwrap();
        tc.set_error();
        assert_eq!(tc.state(), TxnState::Error);
        // COMMIT should fail in error state.
        assert!(matches!(tc.commit(), Err(FrankenError::Busy)));
        // The failed COMMIT leaves the transaction in the error state.
        assert_eq!(tc.state(), TxnState::Error);
        // ROLLBACK succeeds.
        tc.rollback().unwrap();
        assert_eq!(tc.state(), TxnState::Idle);
    }

    // === Test: Cannot begin within a transaction ===
    #[test]
    fn test_begin_within_transaction() {
        let mut tc = TransactionController::new();
        tc.begin(Some(TransactionMode::Immediate)).unwrap();
        assert!(matches!(tc.begin(None), Err(FrankenError::Busy)));
        // The rejected BEGIN must not disturb the open transaction.
        assert_eq!(tc.state(), TxnState::Active);
        assert_eq!(tc.lock_level(), LockLevel::Reserved);
    }

    // === Test: COMMIT / ROLLBACK without a transaction ===
    #[test]
    fn test_commit_and_rollback_require_transaction() {
        let mut tc = TransactionController::new();
        assert!(matches!(
            tc.commit(),
            Err(FrankenError::NoActiveTransaction)
        ));
        assert!(matches!(
            tc.rollback(),
            Err(FrankenError::NoActiveTransaction)
        ));
        assert_eq!(tc.state(), TxnState::Idle);
    }

    // === Test: SAVEPOINT outside transaction starts one ===
    #[test]
    fn test_savepoint_starts_transaction() {
        let mut tc = TransactionController::new();
        assert_eq!(tc.state(), TxnState::Idle);
        tc.savepoint("sp1".to_owned()).unwrap();
        assert_eq!(tc.state(), TxnState::Active);
        assert_eq!(tc.savepoint_depth(), 1);
        // The implicit transaction is DEFERRED.
        assert!(matches!(tc.mode(), Some(TransactionMode::Deferred)));
        tc.release("sp1").unwrap();
        assert_eq!(tc.state(), TxnState::Idle);
    }

    // === Test: Explicit transaction does not commit on outermost release ===
    #[test]
    fn test_savepoint_explicit_transaction_no_commit_on_release() {
        let mut tc = TransactionController::new();
        tc.begin(Some(TransactionMode::Deferred)).unwrap();
        tc.savepoint("sp1".to_owned()).unwrap();
        assert_eq!(tc.state(), TxnState::Active);
        tc.release("sp1").unwrap();
        assert_eq!(tc.state(), TxnState::Active); // Remains active
        tc.commit().unwrap();
        assert_eq!(tc.state(), TxnState::Idle);
    }

    // === Test: ROLLBACK TO clears error state ===
    #[test]
    fn test_rollback_to_clears_error() {
        let mut tc = TransactionController::new();
        tc.begin(None).unwrap();
        tc.savepoint("sp1".to_owned()).unwrap();
        tc.set_error();
        assert_eq!(tc.state(), TxnState::Error);
        tc.rollback_to("sp1").unwrap();
        assert_eq!(tc.state(), TxnState::Active);
    }

    // === Test: ROLLBACK TO restores the write set captured by the savepoint ===
    #[test]
    fn test_rollback_to_restores_write_set_snapshot() {
        let mut tc = TransactionController::new();
        tc.begin(Some(TransactionMode::Immediate)).unwrap();
        // Written before the savepoint: must survive the partial rollback.
        tc.record_write(7, vec![0x01; 8]);
        tc.savepoint("sp1".to_owned()).unwrap();
        // Written after the savepoint: must be discarded.
        tc.record_write(1, vec![0x02; 8]);
        tc.rollback_to("sp1").unwrap();
        assert_eq!(tc.write_set.len(), 1);
        assert_eq!(tc.write_set.get(&7), Some(&vec![0x01; 8]));
        assert!(!tc.write_set.contains_key(&1));
    }

    // === Test: record_write keeps the first image recorded for a page ===
    #[test]
    fn test_record_write_keeps_first_image() {
        let mut tc = TransactionController::new();
        tc.begin(Some(TransactionMode::Immediate)).unwrap();
        tc.record_write(1, vec![0xAA; 4]);
        tc.record_write(1, vec![0xBB; 4]);
        assert_eq!(tc.write_set.len(), 1);
        assert_eq!(tc.write_set.get(&1), Some(&vec![0xAA; 4]));
    }

    // === Test: Savepoint lookup is case-insensitive and prefers the innermost match ===
    #[test]
    fn test_savepoint_lookup_case_insensitive_innermost() {
        let mut tc = TransactionController::new();
        tc.begin(Some(TransactionMode::Immediate)).unwrap();
        tc.savepoint("Sp1".to_owned()).unwrap();
        tc.savepoint("sp1".to_owned()).unwrap();
        assert_eq!(tc.savepoint_depth(), 2);
        // Matches the most recent "sp1" regardless of case, leaving the older one.
        tc.release("SP1").unwrap();
        assert_eq!(tc.savepoint_depth(), 1);
        assert_eq!(tc.savepoints[0].name, "Sp1");
    }

    // === Test: Unknown savepoint names are rejected without side effects ===
    #[test]
    fn test_unknown_savepoint_is_error() {
        let mut tc = TransactionController::new();
        tc.begin(Some(TransactionMode::Immediate)).unwrap();
        tc.savepoint("sp1".to_owned()).unwrap();
        assert!(tc.release("missing").is_err());
        assert!(tc.rollback_to("missing").is_err());
        assert_eq!(tc.savepoint_depth(), 1);
        assert_eq!(tc.state(), TxnState::Active);
    }
}