1use arrow::array::{ArrayRef, UInt64Array, UInt64Builder};
8use arrow::datatypes::{DataType, Field, Schema};
9use arrow::record_batch::RecordBatch;
10use croaring::Treemap;
11use llkv_column_map::store::{
12 CREATED_BY_COLUMN_NAME, DELETED_BY_COLUMN_NAME, FIELD_ID_META_KEY, ROW_ID_COLUMN_NAME,
13};
14use llkv_result::Error;
15use llkv_types::{FieldId, RowId};
16use rustc_hash::FxHashMap;
17use std::collections::HashMap;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::{Arc, Mutex};
20
/// Transaction identifier. Ids are compared numerically by the visibility rules.
pub type TxnId = u64;

/// Sentinel meaning "no transaction" (e.g. a row that has not been deleted).
pub const TXN_ID_NONE: TxnId = u64::MAX;

/// Pseudo-transaction used for auto-commit writes; always reported as committed.
pub const TXN_ID_AUTO_COMMIT: TxnId = 1;

/// Smallest id that may be assigned to a multi-statement transaction.
pub const TXN_ID_MIN_MULTI_STATEMENT: TxnId = TXN_ID_AUTO_COMMIT + 1;
32
33#[inline]
35pub fn is_reserved_txn_id(id: TxnId) -> bool {
36 id == TXN_ID_NONE || id <= TXN_ID_AUTO_COMMIT
37}
38
39#[inline]
41pub fn reserved_txn_id_message(id: TxnId) -> String {
42 match id {
43 TXN_ID_NONE => format!(
44 "Transaction ID {} (u64::MAX) is reserved for TXN_ID_NONE",
45 id
46 ),
47 0 => "Transaction ID 0 is invalid".to_string(),
48 TXN_ID_AUTO_COMMIT => "Transaction ID 1 is reserved for TXN_ID_AUTO_COMMIT".to_string(),
49 _ => format!("Transaction ID {} is reserved", id),
50 }
51}
52
/// Shared state behind [`TxnIdManager`]; clones of the manager point at the
/// same instance through an `Arc`.
#[derive(Debug)]
struct TxnIdManagerInner {
    /// Id that the next `begin_transaction` call will hand out.
    next_txn_id: AtomicU64,
    /// Largest id passed to `mark_committed` (or the initial value).
    /// New snapshots use it as their `snapshot_id`.
    last_committed: AtomicU64,
    /// Explicitly recorded per-transaction status. Ids absent from the map
    /// are reported as committed by `TxnIdManager::status`.
    statuses: Mutex<FxHashMap<TxnId, TxnStatus>>,
}
63
64impl TxnIdManagerInner {
65 fn new() -> Self {
66 Self::new_with_initial_txn_id(TXN_ID_AUTO_COMMIT + 1)
67 }
68
69 fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
70 Self::new_with_initial_state(next_txn_id, TXN_ID_AUTO_COMMIT)
71 }
72
73 fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
74 let mut statuses = FxHashMap::with_capacity_and_hasher(1, Default::default());
75 statuses.insert(TXN_ID_AUTO_COMMIT, TxnStatus::Committed);
76
77 Self {
78 next_txn_id: AtomicU64::new(next_txn_id),
79 last_committed: AtomicU64::new(last_committed),
80 statuses: Mutex::new(statuses),
81 }
82 }
83}
84
/// Allocates transaction ids and tracks their status (active / committed /
/// aborted).
///
/// Cloning is cheap: clones share the same state through an `Arc`.
#[derive(Clone, Debug)]
pub struct TxnIdManager {
    inner: Arc<TxnIdManagerInner>,
}
90
91impl TxnIdManager {
92 pub fn new() -> Self {
94 Self {
95 inner: Arc::new(TxnIdManagerInner::new()),
96 }
97 }
98
99 pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
102 Self {
103 inner: Arc::new(TxnIdManagerInner::new_with_initial_txn_id(next_txn_id)),
104 }
105 }
106
107 pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
110 Self {
111 inner: Arc::new(TxnIdManagerInner::new_with_initial_state(
112 next_txn_id,
113 last_committed,
114 )),
115 }
116 }
117
118 pub fn current_next_txn_id(&self) -> TxnId {
120 self.inner.next_txn_id.load(Ordering::SeqCst)
121 }
122
123 pub fn begin_transaction(&self) -> TransactionSnapshot {
129 let snapshot_id = self.inner.last_committed.load(Ordering::SeqCst);
130 let txn_id = self.inner.next_txn_id.fetch_add(1, Ordering::SeqCst);
131
132 {
133 let mut guard = self
134 .inner
135 .statuses
136 .lock()
137 .expect("txn status lock poisoned");
138 guard.insert(txn_id, TxnStatus::Active);
139 }
140
141 TransactionSnapshot {
142 txn_id,
143 snapshot_id,
144 }
145 }
146
147 pub fn next_txn_id(&self) -> TxnId {
153 self.begin_transaction().txn_id
154 }
155
156 pub fn status(&self, txn_id: TxnId) -> TxnStatus {
158 if txn_id == TXN_ID_NONE {
159 return TxnStatus::None;
160 }
161 if txn_id == TXN_ID_AUTO_COMMIT {
162 return TxnStatus::Committed;
163 }
164
165 let guard = self
166 .inner
167 .statuses
168 .lock()
169 .expect("txn status lock poisoned");
170 guard.get(&txn_id).copied().unwrap_or(TxnStatus::Committed)
171 }
172
173 pub fn mark_committed(&self, txn_id: TxnId) {
175 {
176 let mut guard = self
177 .inner
178 .statuses
179 .lock()
180 .expect("txn status lock poisoned");
181 guard.insert(txn_id, TxnStatus::Committed);
182 }
183
184 let mut current = self.inner.last_committed.load(Ordering::SeqCst);
187 loop {
188 if txn_id <= current {
189 break;
190 }
191 match self.inner.last_committed.compare_exchange(
192 current,
193 txn_id,
194 Ordering::SeqCst,
195 Ordering::SeqCst,
196 ) {
197 Ok(_) => break,
198 Err(observed) => current = observed,
199 }
200 }
201 }
202
203 pub fn has_other_active_transactions(&self, txn_id: TxnId) -> bool {
205 let guard = self
206 .inner
207 .statuses
208 .lock()
209 .expect("txn status lock poisoned");
210 guard
211 .iter()
212 .any(|(&other_id, status)| other_id != txn_id && status.is_active())
213 }
214
215 pub fn mark_aborted(&self, txn_id: TxnId) {
217 let mut guard = self
218 .inner
219 .statuses
220 .lock()
221 .expect("txn status lock poisoned");
222 guard.insert(txn_id, TxnStatus::Aborted);
223 }
224
225 pub fn last_committed(&self) -> TxnId {
227 self.inner.last_committed.load(Ordering::SeqCst)
228 }
229}
230
231impl Default for TxnIdManager {
232 fn default() -> Self {
233 Self::new()
234 }
235}
236
/// MVCC version stamp for a row: which transaction created it and which, if
/// any, deleted it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowVersion {
    /// Id of the transaction that inserted the row.
    pub created_by: TxnId,
    /// Id of the transaction that deleted the row; `TXN_ID_NONE` while the
    /// row has not been deleted.
    pub deleted_by: TxnId,
}
258
impl RowVersion {
    /// Creates the stamp for a row inserted by `created_by`. The row starts
    /// out not deleted (`deleted_by == TXN_ID_NONE`).
    pub fn new(created_by: TxnId) -> Self {
        Self {
            created_by,
            deleted_by: TXN_ID_NONE,
        }
    }

    /// Marks the row as deleted by `deleted_by`, overwriting any previous
    /// deleter.
    pub fn delete(&mut self, deleted_by: TxnId) {
        self.deleted_by = deleted_by;
    }

    /// Id-threshold visibility check that ignores transaction status.
    ///
    /// Visible when the row was created at or before `snapshot_txn_id` and is
    /// either not deleted or was deleted by a larger id.
    pub fn is_visible(&self, snapshot_txn_id: TxnId) -> bool {
        self.created_by <= snapshot_txn_id
            && (self.deleted_by == TXN_ID_NONE || self.deleted_by > snapshot_txn_id)
    }

    /// Snapshot visibility check that uses commit status from `manager`.
    ///
    /// Rules, in evaluation order:
    /// 1. A row created by the snapshot's own transaction is visible unless that
    ///    same transaction deleted it. `TXN_ID_AUTO_COMMIT` never counts as "own".
    /// 2. A row whose creator is not committed is invisible.
    /// 3. A row created after `snapshot.snapshot_id` is invisible.
    /// 4. Otherwise:
    ///    - an undeleted row is visible;
    ///    - a row deleted by the own transaction is invisible;
    ///    - a row deleted by an uncommitted transaction stays visible;
    ///    - a row deleted by a committed transaction is visible only if the
    ///      deleter's id is greater than `snapshot.snapshot_id`.
    pub fn is_visible_for(&self, manager: &TxnIdManager, snapshot: TransactionSnapshot) -> bool {
        tracing::trace!(
            "[MVCC] is_visible_for: created_by={}, deleted_by={}, snapshot.txn_id={}, snapshot.snapshot_id={}",
            self.created_by,
            self.deleted_by,
            snapshot.txn_id,
            snapshot.snapshot_id
        );

        // Rule 1: the transaction's own writes, checked before any commit-status lookup.
        if self.created_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
            let visible = self.deleted_by != snapshot.txn_id;
            tracing::trace!("[MVCC] created by current txn, visible={}", visible);
            return visible;
        }

        // Rule 2: active or aborted creators hide the row.
        let creator_status = manager.status(self.created_by);
        tracing::trace!("[MVCC] creator_status={:?}", creator_status);
        if !creator_status.is_committed() {
            tracing::trace!("[MVCC] creator not committed, invisible");
            return false;
        }

        // Rule 3: committed, but after this snapshot was taken.
        if self.created_by > snapshot.snapshot_id {
            tracing::trace!("[MVCC] created_by > snapshot_id, invisible");
            return false;
        }

        // Rule 4: deletion state decides.
        match self.deleted_by {
            TXN_ID_NONE => {
                tracing::trace!("[MVCC] not deleted, visible");
                true
            }
            tx if tx == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT => {
                tracing::trace!("[MVCC] deleted by current txn, invisible");
                false
            }
            tx => {
                // A pending or aborted delete does not hide the row.
                if !manager.status(tx).is_committed() {
                    tracing::trace!("[MVCC] deleter not committed, visible");
                    return true;
                }
                let visible = tx > snapshot.snapshot_id;
                tracing::trace!("[MVCC] deleter committed, visible={}", visible);
                visible
            }
        }
    }

    /// Visibility check used for foreign-key validation.
    ///
    /// Identical to [`RowVersion::is_visible_for`], except that rows created or
    /// deleted by the snapshot's own transaction are always visible.
    ///
    /// NOTE(review): presumably this lets referential checks inside the
    /// deleting transaction still find the row. Confirm with callers.
    pub fn is_visible_for_fk_check(
        &self,
        manager: &TxnIdManager,
        snapshot: TransactionSnapshot,
    ) -> bool {
        tracing::trace!(
            "[MVCC-FK] is_visible_for_fk_check: created_by={}, deleted_by={}, snapshot.txn_id={}, snapshot.snapshot_id={}",
            self.created_by,
            self.deleted_by,
            snapshot.txn_id,
            snapshot.snapshot_id
        );

        // Own inserts are visible regardless of any later own delete.
        if self.created_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
            tracing::trace!("[MVCC-FK] created by current txn, visible");
            return true;
        }

        let creator_status = manager.status(self.created_by);
        tracing::trace!("[MVCC-FK] creator_status={:?}", creator_status);
        if !creator_status.is_committed() {
            tracing::trace!("[MVCC-FK] creator not committed, invisible");
            return false;
        }

        if self.created_by > snapshot.snapshot_id {
            tracing::trace!("[MVCC-FK] created_by > snapshot_id, invisible");
            return false;
        }

        // Differs from `is_visible_for`: an own delete does not hide the row.
        if self.deleted_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
            tracing::trace!("[MVCC-FK] deleted by current txn, but still visible for FK check");
            return true;
        }

        match self.deleted_by {
            TXN_ID_NONE => {
                tracing::trace!("[MVCC-FK] not deleted, visible");
                true
            }
            tx => {
                // A pending or aborted delete does not hide the row.
                if !manager.status(tx).is_committed() {
                    tracing::trace!("[MVCC-FK] deleter not committed, visible");
                    return true;
                }
                let visible = tx > snapshot.snapshot_id;
                tracing::trace!("[MVCC-FK] deleter committed, visible={}", visible);
                visible
            }
        }
    }
}
403
/// The view a transaction reads through, as returned by
/// `TxnIdManager::begin_transaction`.
#[derive(Debug, Clone, Copy)]
pub struct TransactionSnapshot {
    /// Id of the transaction that owns this snapshot.
    pub txn_id: TxnId,
    /// `last_committed` at the moment the snapshot was taken. Rows created by
    /// larger ids are invisible to it.
    pub snapshot_id: TxnId,
}
420
/// Lifecycle state of a transaction as reported by `TxnIdManager::status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnStatus {
    /// Begun and not yet committed or aborted.
    Active,
    /// Committed. Also reported for ids the manager holds no entry for.
    Committed,
    /// Rolled back. `RowVersion::is_visible_for` hides rows it created.
    Aborted,
    /// Reported for `TXN_ID_NONE`.
    None,
}
429
430impl TxnStatus {
431 pub fn is_committed(self) -> bool {
432 matches!(self, TxnStatus::Committed)
433 }
434
435 pub fn is_active(self) -> bool {
436 matches!(self, TxnStatus::Active)
437 }
438
439 pub fn is_aborted(self) -> bool {
440 matches!(self, TxnStatus::Aborted)
441 }
442}
443
444pub fn build_insert_mvcc_columns(
450 row_count: usize,
451 start_row_id: RowId,
452 creator_txn_id: TxnId,
453 deleted_marker: TxnId,
454) -> (ArrayRef, ArrayRef, ArrayRef) {
455 let mut row_builder = UInt64Builder::with_capacity(row_count);
456 for offset in 0..row_count {
457 row_builder.append_value(start_row_id + offset as u64);
458 }
459
460 let mut created_builder = UInt64Builder::with_capacity(row_count);
461 let mut deleted_builder = UInt64Builder::with_capacity(row_count);
462 for _ in 0..row_count {
463 created_builder.append_value(creator_txn_id);
464 deleted_builder.append_value(deleted_marker);
465 }
466
467 (
468 Arc::new(row_builder.finish()) as ArrayRef,
469 Arc::new(created_builder.finish()) as ArrayRef,
470 Arc::new(deleted_builder.finish()) as ArrayRef,
471 )
472}
473
474pub fn build_mvcc_fields() -> Vec<Field> {
476 vec![
477 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
478 Field::new(CREATED_BY_COLUMN_NAME, DataType::UInt64, false),
479 Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
480 ]
481}
482
483pub fn build_field_with_metadata(
485 name: &str,
486 data_type: DataType,
487 nullable: bool,
488 field_id: FieldId,
489) -> Field {
490 let mut metadata = HashMap::with_capacity(1);
491 metadata.insert(FIELD_ID_META_KEY.to_string(), field_id.to_string());
492 Field::new(name, data_type, nullable).with_metadata(metadata)
493}
494
495pub fn build_delete_batch(
497 row_ids: Treemap,
498 deleted_by_txn_id: TxnId,
499) -> llkv_result::Result<RecordBatch> {
500 let row_count = row_ids.cardinality() as usize;
501
502 let fields = vec![
503 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
504 Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
505 ];
506
507 let arrays: Vec<ArrayRef> = vec![
508 Arc::new(UInt64Array::from_iter_values(row_ids.iter())),
509 Arc::new(UInt64Array::from(vec![deleted_by_txn_id; row_count])),
510 ];
511
512 RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).map_err(Error::Arrow)
513}
514
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_txn_id_manager_allocates_monotonic_ids() {
        let manager = TxnIdManager::new();
        let snapshot1 = manager.begin_transaction();
        let snapshot2 = manager.begin_transaction();
        assert!(snapshot2.txn_id > snapshot1.txn_id);
    }

    #[test]
    fn test_row_visibility_simple() {
        let manager = TxnIdManager::new();
        let writer_snapshot = manager.begin_transaction();
        let mut row = RowVersion::new(writer_snapshot.txn_id);

        assert!(row.is_visible(writer_snapshot.txn_id));
        assert!(row.is_visible_for(&manager, writer_snapshot));

        manager.mark_committed(writer_snapshot.txn_id);
        let committed = manager.last_committed();
        assert!(row.is_visible(committed));

        let reader_snapshot = manager.begin_transaction();
        assert!(row.is_visible_for(&manager, reader_snapshot));

        let deleter_snapshot = manager.begin_transaction();
        row.delete(deleter_snapshot.txn_id);
        assert!(row.is_visible_for(&manager, reader_snapshot));

        manager.mark_committed(deleter_snapshot.txn_id);
        assert!(row.is_visible_for(&manager, reader_snapshot));

        let post_delete_snapshot = manager.begin_transaction();
        assert!(!row.is_visible_for(&manager, post_delete_snapshot));
    }

    /// The reserved ids are exactly 0, `TXN_ID_AUTO_COMMIT`, and `TXN_ID_NONE`.
    #[test]
    fn test_reserved_txn_ids() {
        assert!(is_reserved_txn_id(0));
        assert!(is_reserved_txn_id(TXN_ID_AUTO_COMMIT));
        assert!(is_reserved_txn_id(TXN_ID_NONE));
        assert!(!is_reserved_txn_id(TXN_ID_MIN_MULTI_STATEMENT));
        assert_eq!(reserved_txn_id_message(0), "Transaction ID 0 is invalid");
        assert_eq!(
            reserved_txn_id_message(TXN_ID_AUTO_COMMIT),
            "Transaction ID 1 is reserved for TXN_ID_AUTO_COMMIT"
        );
    }

    /// Rows written by a rolled-back transaction must never become visible.
    #[test]
    fn test_aborted_creator_is_invisible() {
        let manager = TxnIdManager::new();
        let writer = manager.begin_transaction();
        let row = RowVersion::new(writer.txn_id);
        manager.mark_aborted(writer.txn_id);

        assert!(manager.status(writer.txn_id).is_aborted());
        let reader = manager.begin_transaction();
        assert!(!row.is_visible_for(&manager, reader));
    }

    /// An own delete hides the row for normal reads, but not for FK checks.
    #[test]
    fn test_fk_check_sees_rows_deleted_by_current_txn() {
        let manager = TxnIdManager::new();
        let writer = manager.begin_transaction();
        let mut row = RowVersion::new(writer.txn_id);
        manager.mark_committed(writer.txn_id);

        let deleter = manager.begin_transaction();
        row.delete(deleter.txn_id);
        assert!(!row.is_visible_for(&manager, deleter));
        assert!(row.is_visible_for_fk_check(&manager, deleter));
    }

    /// Committing transactions out of id order must not lower `last_committed`.
    #[test]
    fn test_last_committed_never_regresses() {
        let manager = TxnIdManager::new();
        let first = manager.begin_transaction();
        let second = manager.begin_transaction();

        manager.mark_committed(second.txn_id);
        manager.mark_committed(first.txn_id);

        assert_eq!(manager.last_committed(), second.txn_id);
        assert!(manager.status(first.txn_id).is_committed());
        assert!(manager.status(second.txn_id).is_committed());
    }

    #[test]
    fn test_has_other_active_transactions() {
        let manager = TxnIdManager::new();
        let a = manager.begin_transaction();
        assert!(!manager.has_other_active_transactions(a.txn_id));

        let b = manager.begin_transaction();
        assert!(manager.has_other_active_transactions(a.txn_id));

        manager.mark_committed(b.txn_id);
        assert!(!manager.has_other_active_transactions(a.txn_id));
    }

    #[test]
    fn test_build_insert_mvcc_columns() {
        use arrow::array::Array;

        let (row_ids, created_by, deleted_by) = build_insert_mvcc_columns(3, 10, 7, TXN_ID_NONE);
        let as_u64 = |array: &ArrayRef| -> Vec<u64> {
            array
                .as_any()
                .downcast_ref::<UInt64Array>()
                .expect("MVCC columns are UInt64")
                .values()
                .to_vec()
        };

        assert_eq!(as_u64(&row_ids), vec![10, 11, 12]);
        assert_eq!(as_u64(&created_by), vec![7, 7, 7]);
        assert_eq!(as_u64(&deleted_by), vec![TXN_ID_NONE; 3]);
    }

    #[test]
    fn test_build_delete_batch() {
        let mut row_ids = Treemap::new();
        row_ids.add(5);
        row_ids.add(2);

        let batch = build_delete_batch(row_ids, 9).expect("valid delete batch");
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 2);
    }
}