1use arrow::array::{ArrayRef, UInt64Array, UInt64Builder};
8use arrow::datatypes::{DataType, Field, Schema};
9use arrow::record_batch::RecordBatch;
10use llkv_column_map::store::{
11 CREATED_BY_COLUMN_NAME, DELETED_BY_COLUMN_NAME, FIELD_ID_META_KEY, ROW_ID_COLUMN_NAME,
12};
13use llkv_column_map::types::{FieldId, RowId};
14use llkv_result::Error;
15use rustc_hash::FxHashMap;
16use std::collections::HashMap;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::{Arc, Mutex};
19
/// Identifier assigned to transactions by [`TxnIdManager`].
pub type TxnId = u64;

/// Sentinel meaning "no transaction" (equal to `u64::MAX`).
///
/// [`RowVersion::new`] stores it in `deleted_by` for rows that have not been deleted.
pub const TXN_ID_NONE: TxnId = u64::MAX;

/// ID reserved for auto-commit work; [`TxnIdManager::status`] always reports it as
/// committed.
pub const TXN_ID_AUTO_COMMIT: TxnId = 1;

/// First ID allocated by a default-constructed [`TxnIdManager`]; the lowest ID that is not
/// reserved.
pub const TXN_ID_MIN_MULTI_STATEMENT: TxnId = TXN_ID_AUTO_COMMIT + 1;
31
32#[inline]
34pub fn is_reserved_txn_id(id: TxnId) -> bool {
35 id == TXN_ID_NONE || id <= TXN_ID_AUTO_COMMIT
36}
37
38#[inline]
40pub fn reserved_txn_id_message(id: TxnId) -> String {
41 match id {
42 TXN_ID_NONE => format!(
43 "Transaction ID {} (u64::MAX) is reserved for TXN_ID_NONE",
44 id
45 ),
46 0 => "Transaction ID 0 is invalid".to_string(),
47 TXN_ID_AUTO_COMMIT => "Transaction ID 1 is reserved for TXN_ID_AUTO_COMMIT".to_string(),
48 _ => format!("Transaction ID {} is reserved", id),
49 }
50}
51
52#[derive(Debug)]
54struct TxnIdManagerInner {
55 next_txn_id: AtomicU64,
57 last_committed: AtomicU64,
59 statuses: Mutex<FxHashMap<TxnId, TxnStatus>>,
61}
62
63impl TxnIdManagerInner {
64 fn new() -> Self {
65 Self::new_with_initial_txn_id(TXN_ID_AUTO_COMMIT + 1)
66 }
67
68 fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
69 Self::new_with_initial_state(next_txn_id, TXN_ID_AUTO_COMMIT)
70 }
71
72 fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
73 let mut statuses = FxHashMap::with_capacity_and_hasher(1, Default::default());
74 statuses.insert(TXN_ID_AUTO_COMMIT, TxnStatus::Committed);
75
76 Self {
77 next_txn_id: AtomicU64::new(next_txn_id),
78 last_committed: AtomicU64::new(last_committed),
79 statuses: Mutex::new(statuses),
80 }
81 }
82}
83
84#[derive(Clone, Debug)]
86pub struct TxnIdManager {
87 inner: Arc<TxnIdManagerInner>,
88}
89
90impl TxnIdManager {
91 pub fn new() -> Self {
93 Self {
94 inner: Arc::new(TxnIdManagerInner::new()),
95 }
96 }
97
98 pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
101 Self {
102 inner: Arc::new(TxnIdManagerInner::new_with_initial_txn_id(next_txn_id)),
103 }
104 }
105
106 pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
109 Self {
110 inner: Arc::new(TxnIdManagerInner::new_with_initial_state(
111 next_txn_id,
112 last_committed,
113 )),
114 }
115 }
116
117 pub fn current_next_txn_id(&self) -> TxnId {
119 self.inner.next_txn_id.load(Ordering::SeqCst)
120 }
121
122 pub fn begin_transaction(&self) -> TransactionSnapshot {
128 let snapshot_id = self.inner.last_committed.load(Ordering::SeqCst);
129 let txn_id = self.inner.next_txn_id.fetch_add(1, Ordering::SeqCst);
130
131 {
132 let mut guard = self
133 .inner
134 .statuses
135 .lock()
136 .expect("txn status lock poisoned");
137 guard.insert(txn_id, TxnStatus::Active);
138 }
139
140 TransactionSnapshot {
141 txn_id,
142 snapshot_id,
143 }
144 }
145
146 pub fn next_txn_id(&self) -> TxnId {
152 self.begin_transaction().txn_id
153 }
154
155 pub fn status(&self, txn_id: TxnId) -> TxnStatus {
157 if txn_id == TXN_ID_NONE {
158 return TxnStatus::None;
159 }
160 if txn_id == TXN_ID_AUTO_COMMIT {
161 return TxnStatus::Committed;
162 }
163
164 let guard = self
165 .inner
166 .statuses
167 .lock()
168 .expect("txn status lock poisoned");
169 guard.get(&txn_id).copied().unwrap_or(TxnStatus::Committed)
170 }
171
172 pub fn mark_committed(&self, txn_id: TxnId) {
174 {
175 let mut guard = self
176 .inner
177 .statuses
178 .lock()
179 .expect("txn status lock poisoned");
180 guard.insert(txn_id, TxnStatus::Committed);
181 }
182
183 let mut current = self.inner.last_committed.load(Ordering::SeqCst);
186 loop {
187 if txn_id <= current {
188 break;
189 }
190 match self.inner.last_committed.compare_exchange(
191 current,
192 txn_id,
193 Ordering::SeqCst,
194 Ordering::SeqCst,
195 ) {
196 Ok(_) => break,
197 Err(observed) => current = observed,
198 }
199 }
200 }
201
202 pub fn mark_aborted(&self, txn_id: TxnId) {
204 let mut guard = self
205 .inner
206 .statuses
207 .lock()
208 .expect("txn status lock poisoned");
209 guard.insert(txn_id, TxnStatus::Aborted);
210 }
211
212 pub fn last_committed(&self) -> TxnId {
214 self.inner.last_committed.load(Ordering::SeqCst)
215 }
216}
217
218impl Default for TxnIdManager {
219 fn default() -> Self {
220 Self::new()
221 }
222}
223
224#[derive(Debug, Clone, Copy, PartialEq, Eq)]
239pub struct RowVersion {
240 pub created_by: TxnId,
242 pub deleted_by: TxnId,
244}
245
246impl RowVersion {
247 pub fn new(created_by: TxnId) -> Self {
249 Self {
250 created_by,
251 deleted_by: TXN_ID_NONE,
252 }
253 }
254
255 pub fn delete(&mut self, deleted_by: TxnId) {
257 self.deleted_by = deleted_by;
258 }
259
260 pub fn is_visible(&self, snapshot_txn_id: TxnId) -> bool {
264 self.created_by <= snapshot_txn_id
265 && (self.deleted_by == TXN_ID_NONE || self.deleted_by > snapshot_txn_id)
266 }
267
268 pub fn is_visible_for(&self, manager: &TxnIdManager, snapshot: TransactionSnapshot) -> bool {
270 tracing::trace!(
271 "[MVCC] is_visible_for: created_by={}, deleted_by={}, snapshot.txn_id={}, snapshot.snapshot_id={}",
272 self.created_by,
273 self.deleted_by,
274 snapshot.txn_id,
275 snapshot.snapshot_id
276 );
277
278 if self.created_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
282 let visible = self.deleted_by != snapshot.txn_id;
283 tracing::trace!("[MVCC] created by current txn, visible={}", visible);
284 return visible;
285 }
286
287 let creator_status = manager.status(self.created_by);
289 tracing::trace!("[MVCC] creator_status={:?}", creator_status);
290 if !creator_status.is_committed() {
291 tracing::trace!("[MVCC] creator not committed, invisible");
292 return false;
293 }
294
295 if self.created_by > snapshot.snapshot_id {
296 tracing::trace!("[MVCC] created_by > snapshot_id, invisible");
297 return false;
298 }
299
300 match self.deleted_by {
301 TXN_ID_NONE => {
302 tracing::trace!("[MVCC] not deleted, visible");
303 true
304 }
305 tx if tx == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT => {
306 tracing::trace!("[MVCC] deleted by current txn, invisible");
307 false
308 }
309 tx => {
310 if !manager.status(tx).is_committed() {
311 tracing::trace!("[MVCC] deleter not committed, visible");
314 return true;
315 }
316 let visible = tx > snapshot.snapshot_id;
317 tracing::trace!("[MVCC] deleter committed, visible={}", visible);
318 visible
319 }
320 }
321 }
322}
323
324#[derive(Debug, Clone, Copy)]
334pub struct TransactionSnapshot {
335 pub txn_id: TxnId,
337 pub snapshot_id: TxnId,
339}
340
/// Lifecycle state of a transaction as tracked by [`TxnIdManager`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnStatus {
    /// Started and not yet committed or aborted.
    Active,
    /// Committed. This is also what the manager reports for IDs it has no record of.
    Committed,
    /// Aborted; its writes must not become visible.
    Aborted,
    /// Status of the [`TXN_ID_NONE`] sentinel.
    None,
}
349
350impl TxnStatus {
351 pub fn is_committed(self) -> bool {
352 matches!(self, TxnStatus::Committed)
353 }
354
355 pub fn is_active(self) -> bool {
356 matches!(self, TxnStatus::Active)
357 }
358
359 pub fn is_aborted(self) -> bool {
360 matches!(self, TxnStatus::Aborted)
361 }
362}
363
364pub fn build_insert_mvcc_columns(
370 row_count: usize,
371 start_row_id: RowId,
372 creator_txn_id: TxnId,
373 deleted_marker: TxnId,
374) -> (ArrayRef, ArrayRef, ArrayRef) {
375 let mut row_builder = UInt64Builder::with_capacity(row_count);
376 for offset in 0..row_count {
377 row_builder.append_value(start_row_id + offset as u64);
378 }
379
380 let mut created_builder = UInt64Builder::with_capacity(row_count);
381 let mut deleted_builder = UInt64Builder::with_capacity(row_count);
382 for _ in 0..row_count {
383 created_builder.append_value(creator_txn_id);
384 deleted_builder.append_value(deleted_marker);
385 }
386
387 (
388 Arc::new(row_builder.finish()) as ArrayRef,
389 Arc::new(created_builder.finish()) as ArrayRef,
390 Arc::new(deleted_builder.finish()) as ArrayRef,
391 )
392}
393
394pub fn build_mvcc_fields() -> Vec<Field> {
396 vec![
397 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
398 Field::new(CREATED_BY_COLUMN_NAME, DataType::UInt64, false),
399 Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
400 ]
401}
402
403pub fn build_field_with_metadata(
405 name: &str,
406 data_type: DataType,
407 nullable: bool,
408 field_id: FieldId,
409) -> Field {
410 let mut metadata = HashMap::with_capacity(1);
411 metadata.insert(FIELD_ID_META_KEY.to_string(), field_id.to_string());
412 Field::new(name, data_type, nullable).with_metadata(metadata)
413}
414
415pub fn build_delete_batch(
417 row_ids: Vec<RowId>,
418 deleted_by_txn_id: TxnId,
419) -> llkv_result::Result<RecordBatch> {
420 let row_count = row_ids.len();
421
422 let fields = vec![
423 Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
424 Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
425 ];
426
427 let arrays: Vec<ArrayRef> = vec![
428 Arc::new(UInt64Array::from(row_ids)),
429 Arc::new(UInt64Array::from(vec![deleted_by_txn_id; row_count])),
430 ];
431
432 RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).map_err(Error::Arrow)
433}
434
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::Array;

    #[test]
    fn test_txn_id_manager_allocates_monotonic_ids() {
        let manager = TxnIdManager::new();
        let snapshot1 = manager.begin_transaction();
        let snapshot2 = manager.begin_transaction();
        assert!(snapshot2.txn_id > snapshot1.txn_id);
    }

    /// A fresh manager must never hand out a reserved ID.
    #[test]
    fn test_fresh_manager_starts_after_reserved_ids() {
        let manager = TxnIdManager::new();
        assert_eq!(manager.current_next_txn_id(), TXN_ID_MIN_MULTI_STATEMENT);
        assert_eq!(manager.last_committed(), TXN_ID_AUTO_COMMIT);

        let first = manager.begin_transaction();
        assert_eq!(first.txn_id, TXN_ID_MIN_MULTI_STATEMENT);
        assert!(!is_reserved_txn_id(first.txn_id));
        assert_eq!(manager.status(first.txn_id), TxnStatus::Active);
    }

    #[test]
    fn test_reserved_txn_ids() {
        assert!(is_reserved_txn_id(0));
        assert!(is_reserved_txn_id(TXN_ID_AUTO_COMMIT));
        assert!(is_reserved_txn_id(TXN_ID_NONE));
        assert!(!is_reserved_txn_id(TXN_ID_MIN_MULTI_STATEMENT));

        assert_eq!(reserved_txn_id_message(0), "Transaction ID 0 is invalid");
        assert_eq!(
            reserved_txn_id_message(TXN_ID_AUTO_COMMIT),
            "Transaction ID 1 is reserved for TXN_ID_AUTO_COMMIT"
        );
    }

    #[test]
    fn test_status_of_sentinel_ids() {
        let manager = TxnIdManager::new();
        assert_eq!(manager.status(TXN_ID_NONE), TxnStatus::None);
        assert_eq!(manager.status(TXN_ID_AUTO_COMMIT), TxnStatus::Committed);
    }

    #[test]
    fn test_txn_status_predicates() {
        assert!(TxnStatus::Committed.is_committed());
        assert!(TxnStatus::Active.is_active());
        assert!(TxnStatus::Aborted.is_aborted());
        assert!(!TxnStatus::None.is_committed());
    }

    /// Commits arriving out of ID order must not pull the watermark backwards.
    #[test]
    fn test_last_committed_never_moves_backwards() {
        let manager = TxnIdManager::new();
        let older = manager.begin_transaction();
        let newer = manager.begin_transaction();

        manager.mark_committed(newer.txn_id);
        manager.mark_committed(older.txn_id);

        assert_eq!(manager.last_committed(), newer.txn_id);
        assert_eq!(manager.status(older.txn_id), TxnStatus::Committed);
    }

    #[test]
    fn test_row_visibility_simple() {
        let manager = TxnIdManager::new();
        let writer_snapshot = manager.begin_transaction();
        let mut row = RowVersion::new(writer_snapshot.txn_id);

        assert!(row.is_visible(writer_snapshot.txn_id));
        assert!(row.is_visible_for(&manager, writer_snapshot));

        manager.mark_committed(writer_snapshot.txn_id);
        let committed = manager.last_committed();
        assert!(row.is_visible(committed));

        let reader_snapshot = manager.begin_transaction();
        assert!(row.is_visible_for(&manager, reader_snapshot));

        let deleter_snapshot = manager.begin_transaction();
        row.delete(deleter_snapshot.txn_id);
        assert!(row.is_visible_for(&manager, reader_snapshot));

        manager.mark_committed(deleter_snapshot.txn_id);
        assert!(row.is_visible_for(&manager, reader_snapshot));

        let post_delete_snapshot = manager.begin_transaction();
        assert!(!row.is_visible_for(&manager, post_delete_snapshot));
    }

    /// Rows from active or aborted writers must stay hidden from other transactions.
    #[test]
    fn test_uncommitted_and_aborted_inserts_are_hidden() {
        let manager = TxnIdManager::new();
        let writer = manager.begin_transaction();
        let row = RowVersion::new(writer.txn_id);

        let reader = manager.begin_transaction();
        assert!(!row.is_visible_for(&manager, reader));

        manager.mark_aborted(writer.txn_id);
        let later_reader = manager.begin_transaction();
        assert!(!row.is_visible_for(&manager, later_reader));
    }

    #[test]
    fn test_own_delete_hides_row_from_creator() {
        let manager = TxnIdManager::new();
        let txn = manager.begin_transaction();
        let mut row = RowVersion::new(txn.txn_id);
        row.delete(txn.txn_id);
        assert!(!row.is_visible_for(&manager, txn));
    }

    #[test]
    fn test_build_insert_mvcc_columns_values() {
        let as_u64 = |array: &ArrayRef| -> Vec<u64> {
            let typed = array
                .as_any()
                .downcast_ref::<UInt64Array>()
                .expect("MVCC columns are UInt64");
            (0..typed.len()).map(|i| typed.value(i)).collect()
        };

        let (row_ids, created_by, deleted_by) =
            build_insert_mvcc_columns(3, 10, 5, TXN_ID_NONE);
        assert_eq!(as_u64(&row_ids), vec![10, 11, 12]);
        assert_eq!(as_u64(&created_by), vec![5, 5, 5]);
        assert_eq!(as_u64(&deleted_by), vec![TXN_ID_NONE; 3]);

        let (empty_rows, _, _) = build_insert_mvcc_columns(0, 10, 5, TXN_ID_NONE);
        assert!(empty_rows.is_empty());
    }

    #[test]
    fn test_build_delete_batch_shape() {
        let batch = build_delete_batch(vec![7, 8], 42).expect("delete batch should build");
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 2);
    }
}