//! Transaction types and the transactional `Tree` API (begin, read, write, commit, rollback).
use crate::tree::tree_error::{TreeError, TreeResult};
use crate::{DataValue, Tree};
use std::collections::{HashMap, HashSet};
use std::time::{Duration, SystemTime};
/// Represents the current state of a database transaction.
///
/// The enum carries no data, so it is cheap to copy and supports full
/// equality (`Eq`) in addition to `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionStatus {
    /// The transaction has been started and is neither committed nor aborted.
    Active,
    /// The transaction has been committed.
    Committed,
    /// The transaction has been aborted.
    Aborted,
}
/// A version stamp that tracks the version and timestamp of a data item.
///
/// Stamps are comparable with `==`, so a stamp recorded at read time can be
/// checked directly against a later one.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VersionStamp {
    /// Version number of the data item. `Tree::get_tx` records `0` for a key
    /// that holds a value but has no tracked version yet.
    pub version: u64,
    /// Timestamp associated with this version. `Tree::get_tx` uses
    /// `SystemTime::UNIX_EPOCH` for the untracked-key default.
    pub timestamp: SystemTime,
}
/// Complete context and state information for a database transaction.
///
/// One context exists per transaction id; `Tree::get_tx` and
/// `Tree::commit_transaction` look it up in the transaction manager's
/// `active_transactions` map.
#[derive(Debug, Clone)]
pub struct TransactionContext {
/// Version stamp observed for each key read from the main tree by
/// `Tree::get_tx`. A key with a value but no tracked version is recorded
/// with version `0` and `SystemTime::UNIX_EPOCH`.
pub read_set: HashMap<Vec<u8>, VersionStamp>,
/// Uncommitted writes of this transaction, keyed by raw key bytes.
/// `Tree::get_tx` consults this map before the main tree, and
/// `Tree::commit_transaction` clones it and applies the entries to the tree.
pub write_set: HashMap<Vec<u8>, DataValue>,
/// Every key that `Tree::get_tx` read from the main tree on behalf of this
/// transaction, whether or not a value was found.
pub validation_set: HashSet<Vec<u8>>,
/// Current lifecycle state of the transaction.
pub status: TransactionStatus,
}
impl Tree {
/// Starts a new transaction and hands back its identifier.
///
/// The call is forwarded to the transaction manager's `begin_transaction`
/// while the manager lock is held. The returned id is what the other
/// `*_tx` methods, `commit_transaction` and `rollback_transaction` expect.
///
/// # Returns
/// - `Ok(u64)` - The id of the newly started transaction
/// - `Err(TreeError)` - If the transaction manager could not create it
///
/// # Panics
/// Panics if locking the transaction manager fails (e.g. a poisoned mutex).
pub fn begin_transaction(&mut self) -> TreeResult<u64> {
    self.tx_manager.lock().unwrap().begin_transaction()
}
/// Retrieves a value from the tree within the context of a transaction.
///
/// This method first checks the transaction's local write set for any uncommitted
/// changes. If no local changes are found, it falls back to reading from the main
/// tree storage. The read operation is recorded for transaction validation purposes.
///
/// # Arguments
/// - `tx_id` - The transaction ID
/// - `key` - The key to retrieve
///
/// # Returns
/// - `Ok(Some(Vec<u8>))` - The value if found and not expired
/// - `Ok(None)` - If the key doesn't exist or the value has expired
/// - `Err(TreeError)` - If the transaction is invalid or a read error occurs
pub fn get_tx(&mut self, tx_id: u64, key: &[u8]) -> TreeResult<Option<Vec<u8>>> {
let local_value = {
let tx_manager = self.tx_manager.lock().unwrap();
let active_txs = tx_manager.active_transactions.read().unwrap();
if let Some(tx_context) = active_txs.get(&tx_id) {
tx_context.write_set.get(key).cloned()
} else {
return Err(TreeError::transaction("Transaction not found"));
}
};
if let Some(value) = local_value {
if value.is_expired() {
return Ok(None);
}
return Ok(Some(value.data));
}
let result = self.get(key)?;
{
let tx_manager = self.tx_manager.lock().unwrap();
let mut active_txs = tx_manager.active_transactions.write().unwrap();
if let Some(tx_context) = active_txs.get_mut(&tx_id) {
tx_context.validation_set.insert(key.to_vec());
let key_versions = tx_manager.key_versions.read().unwrap();
if let Some(version_stamp) = key_versions.get(key) {
tx_context
.read_set
.insert(key.to_vec(), version_stamp.clone());
} else if result.is_some() {
use crate::tree::transaction::VersionStamp;
use std::time::SystemTime;
let default_version = VersionStamp {
version: 0,
timestamp: SystemTime::UNIX_EPOCH,
};
tx_context.read_set.insert(key.to_vec(), default_version);
}
}
}
Ok(result)
}
/// Retrieves and deserializes a typed value within the context of a transaction.
///
/// The raw bytes are fetched with [`get_tx`](Self::get_tx) using the UTF-8
/// bytes of `key`, so the lookup order (transaction write set first, then
/// the main tree) and the read tracking are exactly those of `get_tx`. The
/// bytes are then decoded with bincode using `self.settings.bincode_config`.
///
/// # Arguments
/// - `tx_id` - The transaction ID
/// - `key` - The string key to look up
///
/// # Type Parameters
/// - `T` - The type to deserialize to, must implement `bincode::Decode<()>`
///
/// # Returns
/// - `Ok(Some(T))` - The decoded value
/// - `Ok(None)` - If `get_tx` returned no value (missing or expired key)
/// - `Err(TreeError)` - If `get_tx` fails **or the stored bytes cannot be
///   decoded as `T`**; a decode failure is propagated, not mapped to `None`
pub fn get_typed_tx<T>(&mut self, tx_id: u64, key: &str) -> TreeResult<Option<T>>
where
    T: bincode::Decode<()>,
{
    let Some(value_bytes) = self.get_tx(tx_id, key.as_bytes())? else {
        return Ok(None);
    };
    // The second tuple element is the number of bytes consumed; it is not needed here.
    let (decoded, _) = bincode::decode_from_slice(&value_bytes, self.settings.bincode_config)?;
    Ok(Some(decoded))
}
/// Stages a key-value pair inside a transaction.
///
/// The value is wrapped in a `DataValue` (together with the optional TTL)
/// and handed to the transaction manager's `write_transaction`. This method
/// itself does not write to the main tree; `commit_transaction` applies the
/// transaction's write set later.
///
/// # Arguments
/// - `tx_id` - The transaction ID
/// - `key` - The key to store
/// - `value` - The value to associate with the key
/// - `ttl` - Optional time-to-live duration for the key-value pair
///
/// # Returns
/// - `Ok(())` - If the write was accepted by the transaction manager
/// - `Err(TreeError)` - If the transaction manager rejects the write
///
/// # Panics
/// Panics if locking the transaction manager fails (e.g. a poisoned mutex).
pub fn put_tx(
    &mut self,
    tx_id: u64,
    key: Vec<u8>,
    value: Vec<u8>,
    ttl: Option<Duration>,
) -> TreeResult<()> {
    // Build the entry before taking the lock, as the lock is only needed for
    // the hand-off to the manager.
    let staged_entry = DataValue::new(value, ttl);
    self.tx_manager
        .lock()
        .unwrap()
        .write_transaction(tx_id, key, staged_entry)
}
/// Stores a typed value within the context of a transaction without TTL.
///
/// The value is automatically serialized using bincode and added to the
/// transaction's local write set. Changes will only become visible after
/// a successful commit.
///
/// # Arguments
/// - `tx_id` - The transaction ID
/// - `key` - The string key to store the value under
/// - `value` - The value to store (must implement Encode trait)
///
/// # Type Parameters
/// - `T` - The type of value to store, must implement bincode::Encode
///
/// # Returns
/// - `Ok(())` - If the operation succeeds
/// - `Err(TreeError)` - If the transaction is invalid or serialization fails
pub fn put_typed_tx<T>(&mut self, tx_id: u64, key: &str, value: &T) -> TreeResult<()>
where
T: bincode::Encode,
{
self.put_typed_tx_with_ttl_optional(tx_id, key, value, None)
}
/// Stores a typed value within the context of a transaction with a TTL.
///
/// The value is serialized with bincode and staged in the transaction's
/// write set together with `ttl`.
///
/// The TTL is attached when this call stages the value (the `DataValue` is
/// created in `put_tx`), not at commit time: `commit_transaction` persists
/// the entry with only the time still remaining until its `expires_at`, and
/// skips it altogether if it has already expired.
/// NOTE(review): this assumes `DataValue::new` derives `expires_at` from the
/// current time plus `ttl` — confirm against `DataValue`.
///
/// # Arguments
/// - `tx_id` - The transaction ID
/// - `key` - The string key to store the value under
/// - `value` - The value to store (must implement Encode trait)
/// - `ttl` - Time-to-live duration for the value
///
/// # Type Parameters
/// - `T` - The type of value to store, must implement `bincode::Encode`
///
/// # Returns
/// - `Ok(())` - If the operation succeeds
/// - `Err(TreeError)` - If serialization fails or the write is rejected
pub fn put_typed_tx_with_ttl<T>(
    &mut self,
    tx_id: u64,
    key: &str,
    value: &T,
    ttl: Duration,
) -> TreeResult<()>
where
    T: bincode::Encode,
{
    let ttl = Some(ttl);
    self.put_typed_tx_with_ttl_optional(tx_id, key, value, ttl)
}
/// Shared implementation behind `put_typed_tx` and `put_typed_tx_with_ttl`.
///
/// Encodes `value` with bincode (using `self.settings.bincode_config`) and
/// stages the resulting bytes through `put_tx` under the UTF-8 bytes of
/// `key`, passing `ttl` along unchanged.
///
/// Returns an error if encoding fails or if `put_tx` fails.
fn put_typed_tx_with_ttl_optional<T>(
    &mut self,
    tx_id: u64,
    key: &str,
    value: &T,
    ttl: Option<Duration>,
) -> TreeResult<()>
where
    T: bincode::Encode,
{
    let payload = bincode::encode_to_vec(value, self.settings.bincode_config)?;
    let raw_key = key.as_bytes().to_vec();
    self.put_tx(tx_id, raw_key, payload, ttl)
}
/// Commits a transaction, applying its write set to the main tree.
///
/// The commit runs in three phases:
/// 1. Under the manager lock, the transaction is validated. If validation
///    fails it is rolled back and an error is returned. Otherwise its write
///    set is cloned.
/// 2. With the lock released, each pending entry is written to the tree.
///    Entries that have already expired are skipped; entries with an expiry
///    are stored with the time remaining until that expiry.
/// 3. Under the manager lock again, `apply_transaction_changes` and
///    `finalize_transaction` are called for the transaction.
///
/// NOTE(review): if a write in phase 2 fails, the `?` returns early with any
/// earlier writes already applied and phase 3 not run — confirm that callers
/// handle this (for example by rolling back).
///
/// # Arguments
/// - `tx_id` - The transaction ID to commit
///
/// # Returns
/// - `Ok(())` - If all three phases complete
/// - `Err(TreeError)` - If validation fails, the transaction is not found,
///   a tree write fails, or the manager fails to apply/finalize
///
/// # Panics
/// Panics if any of the transaction-manager locks cannot be acquired
/// (e.g. because a lock is poisoned).
pub fn commit_transaction(&mut self, tx_id: u64) -> TreeResult<()> {
    // Phase 1: validate and snapshot the pending writes.
    let pending_writes = {
        let manager = self.tx_manager.lock().unwrap();
        if !manager.validate_transaction(tx_id)? {
            manager.rollback_transaction(tx_id)?;
            return Err(TreeError::transaction("Transaction validation failed - conflicts detected"));
        }
        let transactions = manager.active_transactions.read().unwrap();
        match transactions.get(&tx_id) {
            Some(context) => context.write_set.clone(),
            None => return Err(TreeError::transaction("Transaction not found")),
        }
    };

    // Phase 2: apply the snapshot to the tree.
    for (key, value) in pending_writes {
        if value.is_expired() {
            continue;
        }
        // Entries without an expiry take the plain `put` path.
        let Some(deadline) = value.expires_at else {
            self.put(key, value.data)?;
            continue;
        };
        // `duration_since` fails when the deadline is already in the past;
        // such an entry is dropped, like the `is_expired` case above.
        if let Ok(remaining_ttl) = deadline.duration_since(SystemTime::now()) {
            self.put_to_tree(key, value.data, Some(remaining_ttl))?;
        }
    }

    // Phase 3: let the manager apply the changes and finalize the transaction.
    let manager = self.tx_manager.lock().unwrap();
    manager.apply_transaction_changes(tx_id)?;
    manager.finalize_transaction(tx_id)?;
    Ok(())
}
/// Abandons a transaction without applying its changes.
///
/// The request is forwarded to the transaction manager's
/// `rollback_transaction` while the manager lock is held. This method does
/// not write anything to the main tree.
///
/// # Arguments
/// - `tx_id` - The transaction ID to rollback
///
/// # Returns
/// - `Ok(())` - If the transaction manager rolled the transaction back
/// - `Err(TreeError)` - If the transaction manager reports an error
///
/// # Panics
/// Panics if locking the transaction manager fails (e.g. a poisoned mutex).
pub fn rollback_transaction(&mut self, tx_id: u64) -> TreeResult<()> {
    self.tx_manager.lock().unwrap().rollback_transaction(tx_id)
}
}