reifydb_transaction/multi/transaction/serializable/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4// This file includes and modifies code from the skipdb project (https://github.com/al8n/skipdb),
5// originally licensed under the Apache License, Version 2.0.
6// Original copyright:
7//   Copyright (c) 2024 Al Liu
8//
9// The original Apache License can be found at:
10//   http://www.apache.org/licenses/LICENSE-2.0
11
12use std::{ops::Deref, sync::Arc};
13
14use TransactionSingleVersion::SingleVersionLock;
15pub use command::*;
16pub use query::*;
17use reifydb_core::{CommitVersion, EncodedKey, EncodedKeyRange, event::EventBus};
18use reifydb_store_transaction::{
19	MultiVersionContains, MultiVersionGet, MultiVersionRange, MultiVersionRangeRev, TransactionStore,
20};
21use reifydb_type::util::hex;
22use tracing::instrument;
23
24use crate::multi::transaction::version::StandardVersionProvider;
25
26#[allow(clippy::module_inception)]
27mod command;
28pub(crate) mod query;
29
30use crate::{
31	multi::transaction::{Committed, TransactionManager},
32	single::{TransactionSingleVersion, TransactionSvl},
33};
34
35pub struct TransactionSerializable(Arc<Inner>);
36
37pub struct Inner {
38	pub(crate) tm: TransactionManager<StandardVersionProvider>,
39	pub(crate) store: TransactionStore,
40	pub(crate) event_bus: EventBus,
41}
42
43impl Deref for TransactionSerializable {
44	type Target = Inner;
45
46	fn deref(&self) -> &Self::Target {
47		&self.0
48	}
49}
50
51impl Clone for TransactionSerializable {
52	fn clone(&self) -> Self {
53		Self(self.0.clone())
54	}
55}
56
57impl Inner {
58	fn new(store: TransactionStore, single: TransactionSingleVersion, event_bus: EventBus) -> Self {
59		let tm = TransactionManager::new(StandardVersionProvider::new(single).unwrap()).unwrap();
60
61		Self {
62			tm,
63			store,
64			event_bus,
65		}
66	}
67
68	fn version(&self) -> crate::Result<CommitVersion> {
69		self.tm.version()
70	}
71}
72
73impl TransactionSerializable {
74	pub fn testing() -> Self {
75		let store = TransactionStore::testing_memory();
76		let event_bus = EventBus::new();
77		Self::new(store.clone(), SingleVersionLock(TransactionSvl::new(store, event_bus.clone())), event_bus)
78	}
79}
80
81impl TransactionSerializable {
82	#[instrument(level = "debug", skip(store, single, event_bus))]
83	pub fn new(store: TransactionStore, single: TransactionSingleVersion, event_bus: EventBus) -> Self {
84		Self(Arc::new(Inner::new(store, single, event_bus)))
85	}
86}
87
88impl TransactionSerializable {
89	#[instrument(level = "trace", skip(self))]
90	pub fn version(&self) -> crate::Result<CommitVersion> {
91		self.0.version()
92	}
93
94	#[instrument(level = "debug", skip(self))]
95	pub fn begin_query(&self) -> crate::Result<QueryTransaction> {
96		QueryTransaction::new(self.clone(), None)
97	}
98}
99
100impl TransactionSerializable {
101	#[instrument(level = "debug", skip(self))]
102	pub fn begin_command(&self) -> crate::Result<CommandTransaction> {
103		CommandTransaction::new(self.clone())
104	}
105}
106
107pub enum Transaction {
108	Query(QueryTransaction),
109	Command(CommandTransaction),
110}
111
112impl TransactionSerializable {
113	#[instrument(level = "trace", skip(self), fields(key_hex = %hex::encode(key.as_ref()), version = version.0))]
114	pub fn get(&self, key: &EncodedKey, version: CommitVersion) -> Result<Option<Committed>, reifydb_type::Error> {
115		Ok(self.store.get(key, version)?.map(|sv| sv.into()))
116	}
117
118	#[instrument(level = "trace", skip(self), fields(key_hex = %hex::encode(key.as_ref()), version = version.0))]
119	pub fn contains_key(&self, key: &EncodedKey, version: CommitVersion) -> Result<bool, reifydb_type::Error> {
120		self.store.contains(key, version)
121	}
122
123	#[instrument(level = "trace", skip(self), fields(version = version.0, batch_size = batch_size))]
124	pub fn range_batched(
125		&self,
126		range: EncodedKeyRange,
127		version: CommitVersion,
128		batch_size: u64,
129	) -> reifydb_type::Result<<TransactionStore as MultiVersionRange>::RangeIter<'_>> {
130		self.store.range_batched(range, version, batch_size)
131	}
132
133	pub fn range(
134		&self,
135		range: EncodedKeyRange,
136		version: CommitVersion,
137	) -> reifydb_type::Result<<TransactionStore as MultiVersionRange>::RangeIter<'_>> {
138		self.range_batched(range, version, 1024)
139	}
140
141	pub fn range_rev_batched(
142		&self,
143		range: EncodedKeyRange,
144		version: CommitVersion,
145		batch_size: u64,
146	) -> reifydb_type::Result<<TransactionStore as MultiVersionRangeRev>::RangeIterRev<'_>> {
147		self.store.range_rev_batched(range, version, batch_size)
148	}
149
150	pub fn range_rev(
151		&self,
152		range: EncodedKeyRange,
153		version: CommitVersion,
154	) -> reifydb_type::Result<<TransactionStore as MultiVersionRangeRev>::RangeIterRev<'_>> {
155		self.range_rev_batched(range, version, 1024)
156	}
157}