1use arrow::array::{ArrayRef, UInt64Array, UInt64Builder};
8use arrow::datatypes::{DataType, Field, Schema};
9use arrow::record_batch::RecordBatch;
10use llkv_column_map::store::{
11 CREATED_BY_COLUMN_NAME, DELETED_BY_COLUMN_NAME, FIELD_ID_META_KEY, ROW_ID_COLUMN_NAME,
12};
13use llkv_column_map::types::{FieldId, RowId};
14use llkv_result::Error;
15use rustc_hash::FxHashMap;
16use std::collections::HashMap;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::{Arc, Mutex};
19
/// Identifier assigned to a transaction.
pub type TxnId = u64;

/// Sentinel meaning "no transaction"; a row version whose `deleted_by` holds this
/// value has not been deleted.
pub const TXN_ID_NONE: TxnId = TxnId::MAX;

/// Reserved transaction ID used for auto-commit operations.
pub const TXN_ID_AUTO_COMMIT: TxnId = 1;

/// Smallest transaction ID that is not reserved.
pub const TXN_ID_MIN_MULTI_STATEMENT: TxnId = TXN_ID_AUTO_COMMIT + 1;

/// Returns `true` when `id` is reserved: `0`, [`TXN_ID_AUTO_COMMIT`], or [`TXN_ID_NONE`].
///
/// Every ID in `TXN_ID_MIN_MULTI_STATEMENT..TXN_ID_NONE` is usable; everything else is reserved.
#[inline]
pub fn is_reserved_txn_id(id: TxnId) -> bool {
    !(TXN_ID_MIN_MULTI_STATEMENT..TXN_ID_NONE).contains(&id)
}

/// Builds a human-readable explanation of why `id` cannot be used as a transaction ID.
///
/// Intended for IDs for which [`is_reserved_txn_id`] returns `true`; any other value
/// falls through to a generic "is reserved" message.
#[inline]
pub fn reserved_txn_id_message(id: TxnId) -> String {
    if id == TXN_ID_NONE {
        format!(
            "Transaction ID {} (u64::MAX) is reserved for TXN_ID_NONE",
            id
        )
    } else if id == 0 {
        String::from("Transaction ID 0 is invalid")
    } else if id == TXN_ID_AUTO_COMMIT {
        String::from("Transaction ID 1 is reserved for TXN_ID_AUTO_COMMIT")
    } else {
        format!("Transaction ID {} is reserved", id)
    }
}
51
52#[derive(Debug)]
54struct TxnIdManagerInner {
55 next_txn_id: AtomicU64,
57 last_committed: AtomicU64,
59 statuses: Mutex<FxHashMap<TxnId, TxnStatus>>,
61}
62
63impl TxnIdManagerInner {
64 fn new() -> Self {
65 Self::new_with_initial_txn_id(TXN_ID_AUTO_COMMIT + 1)
66 }
67
68 fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
69 Self::new_with_initial_state(next_txn_id, TXN_ID_AUTO_COMMIT)
70 }
71
72 fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
73 let mut statuses = FxHashMap::with_capacity_and_hasher(1, Default::default());
74 statuses.insert(TXN_ID_AUTO_COMMIT, TxnStatus::Committed);
75
76 Self {
77 next_txn_id: AtomicU64::new(next_txn_id),
78 last_committed: AtomicU64::new(last_committed),
79 statuses: Mutex::new(statuses),
80 }
81 }
82}
83
84#[derive(Clone, Debug)]
86pub struct TxnIdManager {
87 inner: Arc<TxnIdManagerInner>,
88}
89
90impl TxnIdManager {
91 pub fn new() -> Self {
93 Self {
94 inner: Arc::new(TxnIdManagerInner::new()),
95 }
96 }
97
98 pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
101 Self {
102 inner: Arc::new(TxnIdManagerInner::new_with_initial_txn_id(next_txn_id)),
103 }
104 }
105
106 pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
109 Self {
110 inner: Arc::new(TxnIdManagerInner::new_with_initial_state(
111 next_txn_id,
112 last_committed,
113 )),
114 }
115 }
116
117 pub fn current_next_txn_id(&self) -> TxnId {
119 self.inner.next_txn_id.load(Ordering::SeqCst)
120 }
121
122 pub fn begin_transaction(&self) -> TransactionSnapshot {
128 let snapshot_id = self.inner.last_committed.load(Ordering::SeqCst);
129 let txn_id = self.inner.next_txn_id.fetch_add(1, Ordering::SeqCst);
130
131 {
132 let mut guard = self
133 .inner
134 .statuses
135 .lock()
136 .expect("txn status lock poisoned");
137 guard.insert(txn_id, TxnStatus::Active);
138 }
139
140 TransactionSnapshot {
141 txn_id,
142 snapshot_id,
143 }
144 }
145
146 pub fn next_txn_id(&self) -> TxnId {
152 self.begin_transaction().txn_id
153 }
154
155 pub fn status(&self, txn_id: TxnId) -> TxnStatus {
157 if txn_id == TXN_ID_NONE {
158 return TxnStatus::None;
159 }
160 if txn_id == TXN_ID_AUTO_COMMIT {
161 return TxnStatus::Committed;
162 }
163
164 let guard = self
165 .inner
166 .statuses
167 .lock()
168 .expect("txn status lock poisoned");
169 guard.get(&txn_id).copied().unwrap_or(TxnStatus::Committed)
170 }
171
172 pub fn mark_committed(&self, txn_id: TxnId) {
174 {
175 let mut guard = self
176 .inner
177 .statuses
178 .lock()
179 .expect("txn status lock poisoned");
180 guard.insert(txn_id, TxnStatus::Committed);
181 }
182
183 let mut current = self.inner.last_committed.load(Ordering::SeqCst);
186 loop {
187 if txn_id <= current {
188 break;
189 }
190 match self.inner.last_committed.compare_exchange(
191 current,
192 txn_id,
193 Ordering::SeqCst,
194 Ordering::SeqCst,
195 ) {
196 Ok(_) => break,
197 Err(observed) => current = observed,
198 }
199 }
200 }
201
202 pub fn mark_aborted(&self, txn_id: TxnId) {
204 let mut guard = self
205 .inner
206 .statuses
207 .lock()
208 .expect("txn status lock poisoned");
209 guard.insert(txn_id, TxnStatus::Aborted);
210 }
211
212 pub fn last_committed(&self) -> TxnId {
214 self.inner.last_committed.load(Ordering::SeqCst)
215 }
216}
217
218impl Default for TxnIdManager {
219 fn default() -> Self {
220 Self::new()
221 }
222}
223
224#[derive(Debug, Clone, Copy, PartialEq, Eq)]
239pub struct RowVersion {
240 pub created_by: TxnId,
242 pub deleted_by: TxnId,
244}
245
246impl RowVersion {
247 pub fn new(created_by: TxnId) -> Self {
249 Self {
250 created_by,
251 deleted_by: TXN_ID_NONE,
252 }
253 }
254
255 pub fn delete(&mut self, deleted_by: TxnId) {
257 self.deleted_by = deleted_by;
258 }
259
260 pub fn is_visible(&self, snapshot_txn_id: TxnId) -> bool {
264 self.created_by <= snapshot_txn_id
265 && (self.deleted_by == TXN_ID_NONE || self.deleted_by > snapshot_txn_id)
266 }
267
268 pub fn is_visible_for(&self, manager: &TxnIdManager, snapshot: TransactionSnapshot) -> bool {
270 tracing::trace!(
271 "[MVCC] is_visible_for: created_by={}, deleted_by={}, snapshot.txn_id={}, snapshot.snapshot_id={}",
272 self.created_by,
273 self.deleted_by,
274 snapshot.txn_id,
275 snapshot.snapshot_id
276 );
277
278 if self.created_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
282 let visible = self.deleted_by != snapshot.txn_id;
283 tracing::trace!("[MVCC] created by current txn, visible={}", visible);
284 return visible;
285 }
286
287 let creator_status = manager.status(self.created_by);
289 tracing::trace!("[MVCC] creator_status={:?}", creator_status);
290 if !creator_status.is_committed() {
291 tracing::trace!("[MVCC] creator not committed, invisible");
292 return false;
293 }
294
295 if self.created_by > snapshot.snapshot_id {
296 tracing::trace!("[MVCC] created_by > snapshot_id, invisible");
297 return false;
298 }
299
300 match self.deleted_by {
301 TXN_ID_NONE => {
302 tracing::trace!("[MVCC] not deleted, visible");
303 true
304 }
305 tx if tx == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT => {
306 tracing::trace!("[MVCC] deleted by current txn, invisible");
307 false
308 }
309 tx => {
310 if !manager.status(tx).is_committed() {
311 tracing::trace!("[MVCC] deleter not committed, visible");
314 return true;
315 }
316 let visible = tx > snapshot.snapshot_id;
317 tracing::trace!("[MVCC] deleter committed, visible={}", visible);
318 visible
319 }
320 }
321 }
322
323 pub fn is_visible_for_fk_check(
333 &self,
334 manager: &TxnIdManager,
335 snapshot: TransactionSnapshot,
336 ) -> bool {
337 tracing::trace!(
338 "[MVCC-FK] is_visible_for_fk_check: created_by={}, deleted_by={}, snapshot.txn_id={}, snapshot.snapshot_id={}",
339 self.created_by,
340 self.deleted_by,
341 snapshot.txn_id,
342 snapshot.snapshot_id
343 );
344
345 if self.created_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
348 tracing::trace!("[MVCC-FK] created by current txn, visible");
349 return true;
350 }
351
352 let creator_status = manager.status(self.created_by);
354 tracing::trace!("[MVCC-FK] creator_status={:?}", creator_status);
355 if !creator_status.is_committed() {
356 tracing::trace!("[MVCC-FK] creator not committed, invisible");
357 return false;
358 }
359
360 if self.created_by > snapshot.snapshot_id {
361 tracing::trace!("[MVCC-FK] created_by > snapshot_id, invisible");
362 return false;
363 }
364
365 if self.deleted_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
367 tracing::trace!("[MVCC-FK] deleted by current txn, but still visible for FK check");
368 return true;
369 }
370
371 match self.deleted_by {
372 TXN_ID_NONE => {
373 tracing::trace!("[MVCC-FK] not deleted, visible");
374 true
375 }
376 tx => {
377 if !manager.status(tx).is_committed() {
378 tracing::trace!("[MVCC-FK] deleter not committed, visible");
381 return true;
382 }
383 let visible = tx > snapshot.snapshot_id;
384 tracing::trace!("[MVCC-FK] deleter committed, visible={}", visible);
385 visible
386 }
387 }
388 }
389}
390
391#[derive(Debug, Clone, Copy)]
401pub struct TransactionSnapshot {
402 pub txn_id: TxnId,
404 pub snapshot_id: TxnId,
406}
407
/// Lifecycle state of a transaction as reported by `TxnIdManager::status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnStatus {
    /// Begun and not yet committed or aborted.
    Active,
    /// Committed.
    Committed,
    /// Rolled back.
    Aborted,
    /// Reported for the `TXN_ID_NONE` sentinel.
    None,
}

impl TxnStatus {
    /// Returns `true` only for [`TxnStatus::Committed`].
    pub fn is_committed(self) -> bool {
        self == Self::Committed
    }

    /// Returns `true` only for [`TxnStatus::Active`].
    pub fn is_active(self) -> bool {
        self == Self::Active
    }

    /// Returns `true` only for [`TxnStatus::Aborted`].
    pub fn is_aborted(self) -> bool {
        self == Self::Aborted
    }
}
430
431pub fn build_insert_mvcc_columns(
437 row_count: usize,
438 start_row_id: RowId,
439 creator_txn_id: TxnId,
440 deleted_marker: TxnId,
441) -> (ArrayRef, ArrayRef, ArrayRef) {
442 let mut row_builder = UInt64Builder::with_capacity(row_count);
443 for offset in 0..row_count {
444 row_builder.append_value(start_row_id + offset as u64);
445 }
446
447 let mut created_builder = UInt64Builder::with_capacity(row_count);
448 let mut deleted_builder = UInt64Builder::with_capacity(row_count);
449 for _ in 0..row_count {
450 created_builder.append_value(creator_txn_id);
451 deleted_builder.append_value(deleted_marker);
452 }
453
454 (
455 Arc::new(row_builder.finish()) as ArrayRef,
456 Arc::new(created_builder.finish()) as ArrayRef,
457 Arc::new(deleted_builder.finish()) as ArrayRef,
458 )
459}
460
461pub fn build_mvcc_fields() -> Vec<Field> {
463 vec![
464 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
465 Field::new(CREATED_BY_COLUMN_NAME, DataType::UInt64, false),
466 Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
467 ]
468}
469
470pub fn build_field_with_metadata(
472 name: &str,
473 data_type: DataType,
474 nullable: bool,
475 field_id: FieldId,
476) -> Field {
477 let mut metadata = HashMap::with_capacity(1);
478 metadata.insert(FIELD_ID_META_KEY.to_string(), field_id.to_string());
479 Field::new(name, data_type, nullable).with_metadata(metadata)
480}
481
482pub fn build_delete_batch(
484 row_ids: Vec<RowId>,
485 deleted_by_txn_id: TxnId,
486) -> llkv_result::Result<RecordBatch> {
487 let row_count = row_ids.len();
488
489 let fields = vec![
490 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
491 Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
492 ];
493
494 let arrays: Vec<ArrayRef> = vec![
495 Arc::new(UInt64Array::from(row_ids)),
496 Arc::new(UInt64Array::from(vec![deleted_by_txn_id; row_count])),
497 ];
498
499 RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).map_err(Error::Arrow)
500}
501
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_txn_id_manager_allocates_monotonic_ids() {
        let manager = TxnIdManager::new();
        let snapshot1 = manager.begin_transaction();
        let snapshot2 = manager.begin_transaction();
        assert!(snapshot2.txn_id > snapshot1.txn_id);
    }

    #[test]
    fn test_row_visibility_simple() {
        let manager = TxnIdManager::new();
        let writer_snapshot = manager.begin_transaction();
        let mut row = RowVersion::new(writer_snapshot.txn_id);

        assert!(row.is_visible(writer_snapshot.txn_id));
        assert!(row.is_visible_for(&manager, writer_snapshot));

        manager.mark_committed(writer_snapshot.txn_id);
        let committed = manager.last_committed();
        assert!(row.is_visible(committed));

        let reader_snapshot = manager.begin_transaction();
        assert!(row.is_visible_for(&manager, reader_snapshot));

        let deleter_snapshot = manager.begin_transaction();
        row.delete(deleter_snapshot.txn_id);
        assert!(row.is_visible_for(&manager, reader_snapshot));

        manager.mark_committed(deleter_snapshot.txn_id);
        assert!(row.is_visible_for(&manager, reader_snapshot));

        let post_delete_snapshot = manager.begin_transaction();
        assert!(!row.is_visible_for(&manager, post_delete_snapshot));
    }

    #[test]
    fn test_reserved_txn_ids() {
        assert!(is_reserved_txn_id(0));
        assert!(is_reserved_txn_id(TXN_ID_AUTO_COMMIT));
        assert!(is_reserved_txn_id(TXN_ID_NONE));
        assert!(!is_reserved_txn_id(TXN_ID_MIN_MULTI_STATEMENT));
        assert_eq!(reserved_txn_id_message(0), "Transaction ID 0 is invalid");
    }

    #[test]
    fn test_status_transitions() {
        let manager = TxnIdManager::new();
        assert_eq!(manager.status(TXN_ID_NONE), TxnStatus::None);
        assert_eq!(manager.status(TXN_ID_AUTO_COMMIT), TxnStatus::Committed);

        let aborted = manager.begin_transaction();
        assert!(manager.status(aborted.txn_id).is_active());
        manager.mark_aborted(aborted.txn_id);
        assert!(manager.status(aborted.txn_id).is_aborted());

        let committed = manager.begin_transaction();
        manager.mark_committed(committed.txn_id);
        assert!(manager.status(committed.txn_id).is_committed());
    }

    #[test]
    fn test_last_committed_never_regresses() {
        let manager = TxnIdManager::new();
        let older = manager.begin_transaction();
        let newer = manager.begin_transaction();

        manager.mark_committed(newer.txn_id);
        manager.mark_committed(older.txn_id);

        assert_eq!(manager.last_committed(), newer.txn_id);
    }

    #[test]
    fn test_rows_from_aborted_writer_are_invisible() {
        let manager = TxnIdManager::new();
        let writer = manager.begin_transaction();
        let row = RowVersion::new(writer.txn_id);
        manager.mark_aborted(writer.txn_id);

        let reader = manager.begin_transaction();
        assert!(!row.is_visible_for(&manager, reader));
    }

    #[test]
    fn test_self_deleted_row_still_visible_for_fk_check() {
        let manager = TxnIdManager::new();
        let writer = manager.begin_transaction();
        let mut row = RowVersion::new(writer.txn_id);
        manager.mark_committed(writer.txn_id);

        let deleter = manager.begin_transaction();
        row.delete(deleter.txn_id);

        assert!(!row.is_visible_for(&manager, deleter));
        assert!(row.is_visible_for_fk_check(&manager, deleter));
    }
}