1use arrow::array::{ArrayRef, UInt64Array, UInt64Builder};
8use arrow::datatypes::{DataType, Field, Schema};
9use arrow::record_batch::RecordBatch;
10use croaring::Treemap;
11use llkv_column_map::store::{
12 CREATED_BY_COLUMN_NAME, DELETED_BY_COLUMN_NAME, FIELD_ID_META_KEY, ROW_ID_COLUMN_NAME,
13};
14use llkv_result::Error;
15use llkv_types::{FieldId, RowId};
16use rustc_hash::FxHashMap;
17use std::collections::HashMap;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::{Arc, Mutex};
20
/// Identifier assigned to a transaction; a plain `u64`.
pub type TxnId = u64;

/// Sentinel meaning "no transaction" (`u64::MAX`).
///
/// `RowVersion::new` stores it in `deleted_by`, and `TxnIdManager::status` answers
/// `TxnStatus::None` for it.
pub const TXN_ID_NONE: TxnId = TxnId::MAX;

/// Transaction ID reserved for auto-commit; `TxnIdManager::status` always reports it as
/// committed.
pub const TXN_ID_AUTO_COMMIT: TxnId = 1;

/// First ID after the reserved auto-commit ID (`TXN_ID_AUTO_COMMIT + 1`), i.e. the smallest
/// ID that `is_reserved_txn_id` does not reject.
pub const TXN_ID_MIN_MULTI_STATEMENT: TxnId = TXN_ID_AUTO_COMMIT + 1;
32
33#[inline]
35pub fn is_reserved_txn_id(id: TxnId) -> bool {
36 id == TXN_ID_NONE || id <= TXN_ID_AUTO_COMMIT
37}
38
39#[inline]
41pub fn reserved_txn_id_message(id: TxnId) -> String {
42 match id {
43 TXN_ID_NONE => format!(
44 "Transaction ID {} (u64::MAX) is reserved for TXN_ID_NONE",
45 id
46 ),
47 0 => "Transaction ID 0 is invalid".to_string(),
48 TXN_ID_AUTO_COMMIT => "Transaction ID 1 is reserved for TXN_ID_AUTO_COMMIT".to_string(),
49 _ => format!("Transaction ID {} is reserved", id),
50 }
51}
52
/// State shared by every clone of `TxnIdManager` (held behind an `Arc`).
#[derive(Debug)]
struct TxnIdManagerInner {
    /// ID that the next `begin_transaction` call will hand out.
    next_txn_id: AtomicU64,
    /// Commit watermark: starts at the value given to the constructor and is only ever
    /// raised by `mark_committed`.
    last_committed: AtomicU64,
    /// Explicitly recorded status per transaction ID. IDs missing from the map are
    /// reported as committed by `TxnIdManager::status`.
    statuses: Mutex<FxHashMap<TxnId, TxnStatus>>,
}
63
64impl TxnIdManagerInner {
65 fn new() -> Self {
66 Self::new_with_initial_txn_id(TXN_ID_AUTO_COMMIT + 1)
67 }
68
69 fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
70 Self::new_with_initial_state(next_txn_id, TXN_ID_AUTO_COMMIT)
71 }
72
73 fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
74 let mut statuses = FxHashMap::with_capacity_and_hasher(1, Default::default());
75 statuses.insert(TXN_ID_AUTO_COMMIT, TxnStatus::Committed);
76
77 Self {
78 next_txn_id: AtomicU64::new(next_txn_id),
79 last_committed: AtomicU64::new(last_committed),
80 statuses: Mutex::new(statuses),
81 }
82 }
83}
84
/// Handle for allocating transaction IDs and tracking their status.
///
/// The state lives behind an `Arc`, so cloning the manager yields another handle to the
/// same counters and status table.
#[derive(Clone, Debug)]
pub struct TxnIdManager {
    /// Shared counters and status table.
    inner: Arc<TxnIdManagerInner>,
}
90
91impl TxnIdManager {
92 pub fn new() -> Self {
94 Self {
95 inner: Arc::new(TxnIdManagerInner::new()),
96 }
97 }
98
99 pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
102 Self {
103 inner: Arc::new(TxnIdManagerInner::new_with_initial_txn_id(next_txn_id)),
104 }
105 }
106
107 pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
110 Self {
111 inner: Arc::new(TxnIdManagerInner::new_with_initial_state(
112 next_txn_id,
113 last_committed,
114 )),
115 }
116 }
117
118 pub fn current_next_txn_id(&self) -> TxnId {
120 self.inner.next_txn_id.load(Ordering::SeqCst)
121 }
122
123 pub fn begin_transaction(&self) -> TransactionSnapshot {
129 let snapshot_id = self.inner.last_committed.load(Ordering::SeqCst);
130 let txn_id = self.inner.next_txn_id.fetch_add(1, Ordering::SeqCst);
131
132 {
133 let mut guard = self
134 .inner
135 .statuses
136 .lock()
137 .expect("txn status lock poisoned");
138 guard.insert(txn_id, TxnStatus::Active);
139 }
140
141 TransactionSnapshot {
142 txn_id,
143 snapshot_id,
144 }
145 }
146
147 pub fn next_txn_id(&self) -> TxnId {
153 self.begin_transaction().txn_id
154 }
155
156 pub fn status(&self, txn_id: TxnId) -> TxnStatus {
158 if txn_id == TXN_ID_NONE {
159 return TxnStatus::None;
160 }
161 if txn_id == TXN_ID_AUTO_COMMIT {
162 return TxnStatus::Committed;
163 }
164
165 let guard = self
166 .inner
167 .statuses
168 .lock()
169 .expect("txn status lock poisoned");
170 guard.get(&txn_id).copied().unwrap_or(TxnStatus::Committed)
171 }
172
173 pub fn mark_committed(&self, txn_id: TxnId) {
175 {
176 let mut guard = self
177 .inner
178 .statuses
179 .lock()
180 .expect("txn status lock poisoned");
181 guard.insert(txn_id, TxnStatus::Committed);
182 }
183
184 let mut current = self.inner.last_committed.load(Ordering::SeqCst);
187 loop {
188 if txn_id <= current {
189 break;
190 }
191 match self.inner.last_committed.compare_exchange(
192 current,
193 txn_id,
194 Ordering::SeqCst,
195 Ordering::SeqCst,
196 ) {
197 Ok(_) => break,
198 Err(observed) => current = observed,
199 }
200 }
201 }
202
203 pub fn mark_aborted(&self, txn_id: TxnId) {
205 let mut guard = self
206 .inner
207 .statuses
208 .lock()
209 .expect("txn status lock poisoned");
210 guard.insert(txn_id, TxnStatus::Aborted);
211 }
212
213 pub fn last_committed(&self) -> TxnId {
215 self.inner.last_committed.load(Ordering::SeqCst)
216 }
217}
218
impl Default for TxnIdManager {
    /// Same as `TxnIdManager::new()`.
    fn default() -> Self {
        Self::new()
    }
}
224
/// Version metadata of a row: which transaction created it and which one deleted it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowVersion {
    /// ID of the transaction that created the row.
    pub created_by: TxnId,
    /// ID of the transaction that deleted the row; `TXN_ID_NONE` while the row is not
    /// deleted (this is what `RowVersion::new` stores).
    pub deleted_by: TxnId,
}
246
247impl RowVersion {
248 pub fn new(created_by: TxnId) -> Self {
250 Self {
251 created_by,
252 deleted_by: TXN_ID_NONE,
253 }
254 }
255
256 pub fn delete(&mut self, deleted_by: TxnId) {
258 self.deleted_by = deleted_by;
259 }
260
261 pub fn is_visible(&self, snapshot_txn_id: TxnId) -> bool {
265 self.created_by <= snapshot_txn_id
266 && (self.deleted_by == TXN_ID_NONE || self.deleted_by > snapshot_txn_id)
267 }
268
269 pub fn is_visible_for(&self, manager: &TxnIdManager, snapshot: TransactionSnapshot) -> bool {
271 tracing::trace!(
272 "[MVCC] is_visible_for: created_by={}, deleted_by={}, snapshot.txn_id={}, snapshot.snapshot_id={}",
273 self.created_by,
274 self.deleted_by,
275 snapshot.txn_id,
276 snapshot.snapshot_id
277 );
278
279 if self.created_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
283 let visible = self.deleted_by != snapshot.txn_id;
284 tracing::trace!("[MVCC] created by current txn, visible={}", visible);
285 return visible;
286 }
287
288 let creator_status = manager.status(self.created_by);
290 tracing::trace!("[MVCC] creator_status={:?}", creator_status);
291 if !creator_status.is_committed() {
292 tracing::trace!("[MVCC] creator not committed, invisible");
293 return false;
294 }
295
296 if self.created_by > snapshot.snapshot_id {
297 tracing::trace!("[MVCC] created_by > snapshot_id, invisible");
298 return false;
299 }
300
301 match self.deleted_by {
302 TXN_ID_NONE => {
303 tracing::trace!("[MVCC] not deleted, visible");
304 true
305 }
306 tx if tx == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT => {
307 tracing::trace!("[MVCC] deleted by current txn, invisible");
308 false
309 }
310 tx => {
311 if !manager.status(tx).is_committed() {
312 tracing::trace!("[MVCC] deleter not committed, visible");
315 return true;
316 }
317 let visible = tx > snapshot.snapshot_id;
318 tracing::trace!("[MVCC] deleter committed, visible={}", visible);
319 visible
320 }
321 }
322 }
323
324 pub fn is_visible_for_fk_check(
334 &self,
335 manager: &TxnIdManager,
336 snapshot: TransactionSnapshot,
337 ) -> bool {
338 tracing::trace!(
339 "[MVCC-FK] is_visible_for_fk_check: created_by={}, deleted_by={}, snapshot.txn_id={}, snapshot.snapshot_id={}",
340 self.created_by,
341 self.deleted_by,
342 snapshot.txn_id,
343 snapshot.snapshot_id
344 );
345
346 if self.created_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
349 tracing::trace!("[MVCC-FK] created by current txn, visible");
350 return true;
351 }
352
353 let creator_status = manager.status(self.created_by);
355 tracing::trace!("[MVCC-FK] creator_status={:?}", creator_status);
356 if !creator_status.is_committed() {
357 tracing::trace!("[MVCC-FK] creator not committed, invisible");
358 return false;
359 }
360
361 if self.created_by > snapshot.snapshot_id {
362 tracing::trace!("[MVCC-FK] created_by > snapshot_id, invisible");
363 return false;
364 }
365
366 if self.deleted_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
368 tracing::trace!("[MVCC-FK] deleted by current txn, but still visible for FK check");
369 return true;
370 }
371
372 match self.deleted_by {
373 TXN_ID_NONE => {
374 tracing::trace!("[MVCC-FK] not deleted, visible");
375 true
376 }
377 tx => {
378 if !manager.status(tx).is_committed() {
379 tracing::trace!("[MVCC-FK] deleter not committed, visible");
382 return true;
383 }
384 let visible = tx > snapshot.snapshot_id;
385 tracing::trace!("[MVCC-FK] deleter committed, visible={}", visible);
386 visible
387 }
388 }
389 }
390}
391
/// What a transaction learns when it starts; produced by `TxnIdManager::begin_transaction`.
#[derive(Debug, Clone, Copy)]
pub struct TransactionSnapshot {
    /// ID allocated to this transaction.
    pub txn_id: TxnId,
    /// Commit watermark (`last_committed`) observed when the transaction began.
    pub snapshot_id: TxnId,
}
408
/// Status of a transaction as reported by `TxnIdManager::status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnStatus {
    /// Recorded by `begin_transaction` when the ID is allocated.
    Active,
    /// Recorded by `mark_committed`; also what `status` reports for the auto-commit ID
    /// and for IDs with no recorded status.
    Committed,
    /// Recorded by `mark_aborted`.
    Aborted,
    /// Reported for the `TXN_ID_NONE` sentinel.
    None,
}
417
418impl TxnStatus {
419 pub fn is_committed(self) -> bool {
420 matches!(self, TxnStatus::Committed)
421 }
422
423 pub fn is_active(self) -> bool {
424 matches!(self, TxnStatus::Active)
425 }
426
427 pub fn is_aborted(self) -> bool {
428 matches!(self, TxnStatus::Aborted)
429 }
430}
431
432pub fn build_insert_mvcc_columns(
438 row_count: usize,
439 start_row_id: RowId,
440 creator_txn_id: TxnId,
441 deleted_marker: TxnId,
442) -> (ArrayRef, ArrayRef, ArrayRef) {
443 let mut row_builder = UInt64Builder::with_capacity(row_count);
444 for offset in 0..row_count {
445 row_builder.append_value(start_row_id + offset as u64);
446 }
447
448 let mut created_builder = UInt64Builder::with_capacity(row_count);
449 let mut deleted_builder = UInt64Builder::with_capacity(row_count);
450 for _ in 0..row_count {
451 created_builder.append_value(creator_txn_id);
452 deleted_builder.append_value(deleted_marker);
453 }
454
455 (
456 Arc::new(row_builder.finish()) as ArrayRef,
457 Arc::new(created_builder.finish()) as ArrayRef,
458 Arc::new(deleted_builder.finish()) as ArrayRef,
459 )
460}
461
462pub fn build_mvcc_fields() -> Vec<Field> {
464 vec![
465 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
466 Field::new(CREATED_BY_COLUMN_NAME, DataType::UInt64, false),
467 Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
468 ]
469}
470
471pub fn build_field_with_metadata(
473 name: &str,
474 data_type: DataType,
475 nullable: bool,
476 field_id: FieldId,
477) -> Field {
478 let mut metadata = HashMap::with_capacity(1);
479 metadata.insert(FIELD_ID_META_KEY.to_string(), field_id.to_string());
480 Field::new(name, data_type, nullable).with_metadata(metadata)
481}
482
483pub fn build_delete_batch(
485 row_ids: Treemap,
486 deleted_by_txn_id: TxnId,
487) -> llkv_result::Result<RecordBatch> {
488 let row_count = row_ids.cardinality() as usize;
489
490 let fields = vec![
491 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
492 Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
493 ];
494
495 let arrays: Vec<ArrayRef> = vec![
496 Arc::new(UInt64Array::from_iter_values(row_ids.iter())),
497 Arc::new(UInt64Array::from(vec![deleted_by_txn_id; row_count])),
498 ];
499
500 RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).map_err(Error::Arrow)
501}
502
#[cfg(test)]
mod tests {
    use super::*;

    /// Consecutive transactions receive strictly increasing IDs.
    #[test]
    fn test_txn_id_manager_allocates_monotonic_ids() {
        let txns = TxnIdManager::new();
        let first = txns.begin_transaction();
        let second = txns.begin_transaction();
        assert!(first.txn_id < second.txn_id);
    }

    /// Walks one row through create → commit → delete → commit and checks what each
    /// snapshot can see along the way.
    #[test]
    fn test_row_visibility_simple() {
        let txns = TxnIdManager::new();

        // The writer sees its own uncommitted row.
        let writer = txns.begin_transaction();
        let mut row = RowVersion::new(writer.txn_id);
        assert!(row.is_visible(writer.txn_id));
        assert!(row.is_visible_for(&txns, writer));

        // After the commit, the row is visible at the new watermark.
        txns.mark_committed(writer.txn_id);
        assert!(row.is_visible(txns.last_committed()));

        // A reader that starts after the commit sees the row.
        let reader = txns.begin_transaction();
        assert!(row.is_visible_for(&txns, reader));

        // An uncommitted delete does not hide the row from the reader.
        let deleter = txns.begin_transaction();
        row.delete(deleter.txn_id);
        assert!(row.is_visible_for(&txns, reader));

        // The reader's snapshot predates the delete, so the row stays visible to it.
        txns.mark_committed(deleter.txn_id);
        assert!(row.is_visible_for(&txns, reader));

        // A snapshot taken after the delete committed no longer sees the row.
        let late_reader = txns.begin_transaction();
        assert!(!row.is_visible_for(&txns, late_reader));
    }
}