1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
//! Transaction Operations (MVCC & Savepoints)
//!
//! Extracted from database_legacy.rs
//! Provides ACID transactions with snapshot isolation
use crate::database::core::MoteDB;
use crate::types::{Row, RowId, Value, Timestamp, PartitionId};
use crate::txn::IsolationLevel;
use crate::{Result, StorageError};
/// Identifier of a transaction: the value returned by `txn_coordinator.begin`
/// and accepted by every transaction and savepoint method in this module.
type TransactionId = u64;
/// Transaction statistics
///
/// Assembled by `MoteDB::transaction_stats` from the statistics reported by
/// the transaction coordinator and by the version store.
#[derive(Debug, Clone)]
pub struct TransactionStats {
/// `active_transactions` as reported by the transaction coordinator.
pub active_transactions: u64,
/// `total_committed` as reported by the transaction coordinator.
pub total_committed: u64,
/// `total_versions` as reported by the version store.
pub total_versions: u64,
/// `total_rows` as reported by the version store.
pub total_rows_with_versions: u64,
/// `avg_versions_per_row` as reported by the version store.
pub avg_versions_per_row: f64,
}
impl MoteDB {
/// Begin a transaction with default isolation level (Read Committed)
///
/// # Returns
/// The `TransactionId` produced by `txn_coordinator.begin`.
///
/// # Errors
/// Propagates any error returned by `txn_coordinator.begin`.
///
/// # Example
/// ```ignore
/// let txn = db.begin_transaction()?;
/// db.insert_in_transaction(txn, 12345)?;
/// db.commit_transaction(txn)?;
/// ```
pub fn begin_transaction(&self) -> Result<TransactionId> {
// The default level is fixed here; `begin_transaction_with_isolation`
// lets the caller choose a different one.
self.txn_coordinator.begin(IsolationLevel::ReadCommitted)
}
/// Begin a transaction with specific isolation level
///
/// # Arguments
/// * `isolation` - forwarded unchanged to `txn_coordinator.begin`
///
/// # Returns
/// The `TransactionId` produced by `txn_coordinator.begin`.
///
/// # Errors
/// Propagates any error returned by `txn_coordinator.begin`.
///
/// # Example
/// ```ignore
/// let txn = db.begin_transaction_with_isolation(IsolationLevel::Serializable)?;
/// ```
pub fn begin_transaction_with_isolation(&self, isolation: IsolationLevel) -> Result<TransactionId> {
self.txn_coordinator.begin(isolation)
}
/// Insert a row within a transaction
///
/// Builds a single-value row (`Value::Timestamp`) tagged with the table name
/// `"_default"` and buffers it in the transaction's write set. The commit
/// methods are what later log and index the buffered rows.
///
/// # Arguments
/// * `txn_id` - transaction whose context receives the row
/// * `timestamp` - passed to `Timestamp::from_micros` to build the row's value
///
/// # Returns
/// The row ID allocated for the new row.
///
/// # Errors
/// Propagates the error from `txn_coordinator.get_context`. The lookup runs
/// before a row ID is allocated, so a failed lookup does not consume an ID.
///
/// # Example
/// ```ignore
/// let row_id = db.insert_in_transaction(txn, timestamp)?;
/// ```
pub fn insert_in_transaction(&self, txn_id: TransactionId, timestamp: i64) -> Result<RowId> {
    // 1. Resolve the transaction first so an unknown `txn_id` fails before
    //    the row-ID counter is advanced.
    let ctx = self.txn_coordinator.get_context(txn_id)?;

    // 2. Allocate row ID (the inner block releases the counter lock right away)
    let row_id = {
        let mut next_id = self.next_row_id.write();
        let id = *next_id;
        *next_id += 1;
        id
    };

    // 3. Build row data
    let row: Row = vec![Value::Timestamp(Timestamp::from_micros(timestamp))];

    // 4. Delta snapshot: record the write, together with any row already
    //    buffered under this ID, before the write set is updated.
    let old_value = ctx.write_set.read().get(&row_id).map(|(_, r)| r.clone());
    ctx.record_write_delta(row_id, "_default", old_value, row.clone());
    // Last use of `row`: move it into the write set instead of cloning again.
    ctx.write_set.write().insert(row_id, ("_default".to_string(), row));

    // 5. Add to read set for conflict detection
    ctx.read_set.write().insert(row_id);
    ctx.record_read_delta(row_id);

    Ok(row_id)
}
/// Insert a row to table within a transaction (MVCC-aware)
///
/// # Flow:
/// 1. Validate row against schema
/// 2. Resolve the transaction context (before allocating, so an unknown
///    transaction does not consume a row ID)
/// 3. Allocate row_id
/// 4. Add to transaction's write_set (local cache, uncommitted)
/// 5. On commit: WAL → Version Store → LSM → Index
///
/// # Arguments
/// * `txn_id` - transaction whose context receives the row
/// * `table_name` - table whose schema validates the row; stored with the row
/// * `row` - row values, moved into the write set
///
/// # Returns
/// The row ID allocated for the new row.
///
/// # Errors
/// - the error from `table_registry.get_table` when the table lookup fails
/// - `StorageError::InvalidData` when `schema.validate_row` rejects the row
/// - the error from `txn_coordinator.get_context`
///
/// # Example
/// ```ignore
/// let row = vec![Value::Text("Alice".into()), Value::Integer(30)];
/// let row_id = db.insert_row_to_table_txn(txn, "users", row)?;
/// ```
pub fn insert_row_to_table_txn(
    &self,
    txn_id: TransactionId,
    table_name: &str,
    row: Row,
) -> Result<RowId> {
    // 1. Get table schema and validate
    let schema = self.table_registry.get_table(table_name)?;
    schema.validate_row(&row).map_err(|e| {
        StorageError::InvalidData(format!(
            "Row validation failed for table '{}': {}",
            table_name, e
        ))
    })?;

    // 2. Resolve the transaction before touching the row-ID counter. It stays
    //    after validation so schema errors keep taking precedence.
    let ctx = self.txn_coordinator.get_context(txn_id)?;

    // 3. Allocate row ID (the inner block releases the counter lock right away)
    let row_id = {
        let mut next_id = self.next_row_id.write();
        let id = *next_id;
        *next_id += 1;
        id
    };

    // 4. Delta snapshot: record the write for active savepoints, together
    //    with any row already buffered under this ID.
    let old_value = ctx.write_set.read().get(&row_id).map(|(_, r)| r.clone());
    ctx.record_write_delta(row_id, table_name, old_value, row.clone());

    // Store (table_name, row) directly in write_set
    ctx.write_set.write().insert(row_id, (table_name.to_string(), row));

    // 5. Add to read set for conflict detection (Serializable isolation)
    ctx.read_set.write().insert(row_id);
    ctx.record_read_delta(row_id);

    Ok(row_id)
}
/// Query rows within a transaction (MVCC snapshot isolation)
///
/// Returns the IDs of rows in the timestamp range that are visible to this
/// transaction:
/// - candidates from the timestamp index for which
///   `version_store.get_visible_version` returns `Ok(Some(_))` under the
///   transaction's snapshot (candidates yielding `Ok(None)` or an error are
///   skipped), in index order;
/// - followed by rows from the transaction's own write set whose first value
///   is a `Value::Timestamp` within `start..=end` and that are not already in
///   the result.
///
/// # Arguments
/// * `txn_id` - transaction whose snapshot and write set are consulted
/// * `start`, `end` - inclusive bounds compared against `Timestamp::as_micros`
///
/// # Errors
/// Propagates errors from `txn_coordinator.get_context` and from the
/// timestamp index range lookup.
///
/// # Example
/// ```ignore
/// let row_ids = db.query_in_transaction(txn, 0, 1000000)?;
/// ```
pub fn query_in_transaction(&self, txn_id: TransactionId, start: i64, end: i64) -> Result<Vec<RowId>> {
    let ctx = self.txn_coordinator.get_context(txn_id)?;
    let snapshot = &ctx.snapshot;

    // 1. Get candidates from timestamp index
    // NOTE(review): a negative bound wraps to a very large u64 in these casts,
    // so the index range probably does not match the signed comparison used
    // for the write set below. Confirm whether negative timestamps are expected.
    let start_u64 = start as u64;
    let end_u64 = end as u64;
    let index_results = self.timestamp_index.read().range(&start_u64, &end_u64)?;

    // 2. Filter by visibility, consuming the index hits directly
    let mut results: Vec<RowId> = Vec::new();
    for (_, row_id) in index_results {
        if let Ok(Some(_row)) = self.version_store.get_visible_version(row_id, snapshot) {
            results.push(row_id);
        }
    }

    // 3. Also check transaction's own write set
    let write_set = ctx.write_set.read();
    if !write_set.is_empty() {
        // One hash set gives O(1) duplicate checks instead of rescanning
        // `results` for every buffered row.
        let mut seen: std::collections::HashSet<RowId> = results.iter().copied().collect();
        for (row_id, (_table_name, row_data)) in write_set.iter() {
            if let Some(Value::Timestamp(ts)) = row_data.first() {
                let ts_micros = ts.as_micros();
                // `insert` returns false when the ID is already present.
                if (start..=end).contains(&ts_micros) && seen.insert(*row_id) {
                    results.push(*row_id);
                }
            }
        }
    }

    Ok(results)
}
/// Query table rows within a transaction (MVCC snapshot isolation)
///
/// The result is built from two sources, in this order:
/// - rows scanned from the LSM engine in the table's key range whose stored
///   `timestamp` is `<=` the transaction's `snapshot.timestamp`;
/// - every row in THIS transaction's write set that is tagged with
///   `table_name` (uncommitted data).
///
/// LSM entries whose payload is not `ValueData::Inline`, or whose bytes fail
/// `decode_row_data`, are skipped silently.
///
/// NOTE(review): no check against the set of active transactions is made
/// here; visibility is decided by the timestamp comparison alone.
///
/// # Errors
/// Propagates errors from `txn_coordinator.get_context`, from opening the
/// streaming scan, and from any individual scan item.
///
/// # Example
/// ```ignore
/// let rows = db.query_table_in_transaction(txn, "users")?;
/// ```
pub fn query_table_in_transaction(
    &self,
    txn_id: TransactionId,
    table_name: &str,
) -> Result<Vec<(RowId, Row)>> {
    let ctx = self.txn_coordinator.get_context(txn_id)?;
    let snapshot = &ctx.snapshot;

    // Key range of the table: the prefix occupies the bits above the low 32,
    // which carry the row ID.
    let table_prefix = self.compute_table_prefix(table_name);
    let range_start = table_prefix << 32;
    let range_end = (table_prefix + 1) << 32;

    let mut visible_rows = Vec::new();

    // Committed data: stream the range rather than materialising the table.
    let scan = self.lsm_engine.scan_range_streaming(range_start, range_end)?;
    for entry in scan {
        let (key, stored) = entry?;
        if stored.timestamp <= snapshot.timestamp {
            if let crate::storage::lsm::ValueData::Inline(bytes) = &stored.data {
                match Self::decode_row_data(bytes) {
                    // Row ID lives in the low 32 bits of the composite key.
                    Ok(row) => visible_rows.push(((key & 0xFFFFFFFF) as RowId, row)),
                    // Undecodable entries are skipped, not reported.
                    Err(_) => {}
                }
            }
        }
    }

    // Uncommitted data: this transaction's own writes to the same table.
    let pending = ctx.write_set.read();
    visible_rows.extend(
        pending
            .iter()
            .filter(|(_, (owner, _))| owner == table_name)
            .map(|(row_id, (_, row))| (*row_id, row.clone())),
    );

    Ok(visible_rows)
}
/// Commit a transaction (simplified version)
///
/// Works on a cloned copy of the transaction's write set:
/// 1. logs every buffered row to the WAL via `wal.log_insert`,
/// 2. commits through `txn_coordinator.commit`,
/// 3. inserts each row whose first value is a `Value::Timestamp` into the
///    timestamp index.
///
/// Unlike `commit_transaction_full`, this path does not write to the LSM
/// engine or to column indexes.
///
/// # Errors
/// Stops at, and returns, the first error from `get_context`, `log_insert`,
/// `commit`, or the timestamp index insert. Steps already performed are not
/// undone here.
///
/// # Example
/// ```ignore
/// db.commit_transaction(txn)?;
/// ```
pub fn commit_transaction(&self, txn_id: TransactionId) -> Result<()> {
    let ctx = self.txn_coordinator.get_context(txn_id)?;
    // Clone so the rows stay available after the coordinator commit below.
    let pending_writes = ctx.write_set.read().clone();

    // WAL first: one insert record per buffered row, partitioned by row ID.
    let partition_count = self.num_partitions as u64;
    for (row_id, (table, row)) in pending_writes.iter() {
        let partition = (*row_id % partition_count) as PartitionId;
        self.wal.log_insert(table, partition, *row_id, row.clone())?;
    }

    // Coordinator commit; the returned commit timestamp is not needed here.
    let _committed_at = self.txn_coordinator.commit(txn_id)?;

    // Index rows that lead with a timestamp value.
    for (row_id, (_, row)) in pending_writes.iter() {
        if let Some(Value::Timestamp(ts)) = row.first() {
            let index_key = ts.as_micros() as u64;
            self.timestamp_index.write().insert(index_key, *row_id)?;
        }
    }

    Ok(())
}
/// Commit a transaction with full LSM+Index integration (ACID-compliant)
///
/// # Flow:
/// 1. WAL logging (durability)
/// 2. Transaction validation (conflict detection)
/// 3. Version Store commit (MVCC visibility)
/// 4. LSM Engine write (persistent storage)
/// 5. Index building (triggered by LSM flush callback)
///
/// # Example
/// ```ignore
/// db.commit_transaction_full(txn)?;
/// ```
pub fn commit_transaction_full(&self, txn_id: TransactionId) -> Result<()> {
let ctx = self.txn_coordinator.get_context(txn_id)?;
let write_set = ctx.write_set.read().clone();
if write_set.is_empty() {
// Empty transaction, just mark as committed
let _commit_ts = self.txn_coordinator.commit(txn_id)?;
return Ok(());
}
// Step 1: WAL logging (durability first)
for (row_id, (table_name, row_data)) in &write_set {
let partition = (*row_id % self.num_partitions as u64) as PartitionId;
self.wal.log_insert(table_name, partition, *row_id, row_data.clone())?;
}
// Step 2: Transaction validation & Version Store commit
let commit_ts = self.txn_coordinator.commit(txn_id)?;
// Step 3: Write to LSM Engine (persistent storage)
for (row_id, (table_name, row_data)) in &write_set {
// Serialize and write to LSM
let row_bytes = bincode::serialize(&row_data)?;
let value = crate::storage::lsm::Value::new(row_bytes, commit_ts);
let composite_key = self.make_composite_key(table_name, *row_id);
self.lsm_engine.put(composite_key, value)?;
}
// Step 4: Update in-memory indexes (Primary Key only for real-time queries)
for (row_id, (table_name, row_data)) in &write_set {
// Only update PRIMARY KEY index immediately
if let Ok(schema) = self.table_registry.get_table(table_name) {
if let Some(pk_col) = schema.primary_key() {
let index_name = format!("{}.{}", table_name, pk_col);
if self.column_indexes.contains_key(&index_name) {
if let Some(col_def) = schema.columns.iter().find(|c| c.name == pk_col) {
if let Some(value) = row_data.get(col_def.position) {
let _ = self.insert_column_value(table_name, pk_col, *row_id, value);
}
}
}
}
}
// Update timestamp index if present
if let Some(Value::Timestamp(ts)) = row_data.first() {
self.timestamp_index.write().insert(ts.as_micros() as u64, *row_id)?;
}
}
Ok(())
}
/// Rollback a transaction
///
/// Delegates entirely to `txn_coordinator.rollback`; this method touches no
/// other state itself.
///
/// # Errors
/// Propagates any error returned by `txn_coordinator.rollback`.
///
/// # Example
/// ```ignore
/// db.rollback_transaction(txn)?;
/// ```
pub fn rollback_transaction(&self, txn_id: TransactionId) -> Result<()> {
self.txn_coordinator.rollback(txn_id)
}
/// Get transaction statistics
///
/// Combines `txn_coordinator.stats()` (active and committed transaction
/// counts) with `version_store.stats()` (version and row counts, average
/// versions per row) into one `TransactionStats` value.
///
/// # Example
/// ```ignore
/// let stats = db.transaction_stats();
/// println!("Active: {}, Committed: {}", stats.active_transactions, stats.total_committed);
/// ```
pub fn transaction_stats(&self) -> TransactionStats {
    let coordinator = self.txn_coordinator.stats();
    let versions = self.version_store.stats();

    // Coordinator-side counters
    let active_transactions = coordinator.active_transactions;
    let total_committed = coordinator.total_committed;

    // Version-store counters; `total_rows` is exposed under a more specific name
    let total_versions = versions.total_versions;
    let total_rows_with_versions = versions.total_rows;
    let avg_versions_per_row = versions.avg_versions_per_row;

    TransactionStats {
        active_transactions,
        total_committed,
        total_versions,
        total_rows_with_versions,
        avg_versions_per_row,
    }
}
// ==================== Savepoint API ====================
/// Create a savepoint within the current transaction
///
/// Delegates to `txn_coordinator.create_savepoint`.
///
/// # Arguments
/// * `txn_id` - transaction the savepoint belongs to
/// * `name` - savepoint name, later passed to `rollback_to_savepoint` or
///   `release_savepoint`
///
/// # Errors
/// Propagates any error returned by `txn_coordinator.create_savepoint`.
///
/// # Example
/// ```ignore
/// let txn = db.begin_transaction()?;
/// db.insert_row_to_table_txn(txn, "users", row1)?;
/// db.create_savepoint(txn, "sp1".to_string())?;
/// db.insert_row_to_table_txn(txn, "users", row2)?;
/// db.rollback_to_savepoint(txn, "sp1")?; // row2 rolled back, row1 kept
/// db.commit_transaction_full(txn)?; // only row1 committed
/// ```
pub fn create_savepoint(&self, txn_id: TransactionId, name: String) -> Result<()> {
self.txn_coordinator.create_savepoint(txn_id, name)
}
/// Rollback to a savepoint, discarding all changes after it
///
/// This removes the savepoint and all savepoints created after it.
/// (That behaviour is implemented by `txn_coordinator.rollback_to_savepoint`,
/// to which this method delegates; it is not visible in this file.)
///
/// # Arguments
/// * `txn_id` - transaction the savepoint belongs to
/// * `name` - name given to `create_savepoint`
///
/// # Errors
/// Propagates any error returned by `txn_coordinator.rollback_to_savepoint`.
pub fn rollback_to_savepoint(&self, txn_id: TransactionId, name: &str) -> Result<()> {
self.txn_coordinator.rollback_to_savepoint(txn_id, name)
}
/// Release a savepoint without rolling back
///
/// Useful for cleaning up savepoints after critical sections complete successfully.
/// Delegates to `txn_coordinator.release_savepoint`.
///
/// # Arguments
/// * `txn_id` - transaction the savepoint belongs to
/// * `name` - name given to `create_savepoint`
///
/// # Errors
/// Propagates any error returned by `txn_coordinator.release_savepoint`.
pub fn release_savepoint(&self, txn_id: TransactionId, name: &str) -> Result<()> {
self.txn_coordinator.release_savepoint(txn_id, name)
}
}