1use std::time::{Duration, SystemTime, UNIX_EPOCH};
4
5use alopex_core::kv::any::AnyKVTransaction;
6use alopex_core::{KVStore, KVTransaction, TxnMode};
7
8use crate::{Database, Error, Result};
9
10const TXN_META_PREFIX: &[u8] = b"__alopex_txn_meta__:";
11const TXN_WRITE_PREFIX: &[u8] = b"__alopex_txn_write__:";
12const TXN_WRITE_DELETE: u8 = 0;
13const TXN_WRITE_PUT: u8 = 1;
14
15#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct TransactionInfo {
18 pub txn_id: String,
20 pub started_at: SystemTime,
22 pub timeout_secs: u64,
24 pub is_expired: bool,
26}
27
28#[derive(Debug, Clone, Copy)]
29struct TxnMeta {
30 started_at_secs: u64,
31 timeout_secs: u64,
32}
33
34enum TxnWrite {
35 Put(Vec<u8>),
36 Delete,
37}
38
39pub struct TransactionManager;
41
42impl TransactionManager {
43 pub fn begin_with_timeout(db: &Database, timeout: Duration) -> Result<String> {
45 let txn_id = generate_txn_id();
46 let meta = TxnMeta {
47 started_at_secs: current_timestamp_secs(),
48 timeout_secs: timeout.as_secs(),
49 };
50 let mut txn = db.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
51 txn.put(txn_meta_key(&txn_id), encode_meta(meta))
52 .map_err(Error::Core)?;
53 txn.commit_self().map_err(Error::Core)?;
54 Ok(txn_id)
55 }
56
57 pub fn get_info(db: &Database, txn_id: &str) -> Result<TransactionInfo> {
59 let mut txn = db.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
60 let result = load_meta(&mut txn, txn_id).map(|meta| {
61 let started_at = UNIX_EPOCH + Duration::from_secs(meta.started_at_secs);
62 let is_expired = is_expired_from_meta(meta, current_timestamp_secs());
63 TransactionInfo {
64 txn_id: txn_id.to_string(),
65 started_at,
66 timeout_secs: meta.timeout_secs,
67 is_expired,
68 }
69 });
70 let commit_result = txn.commit_self().map_err(Error::Core);
71 match (result, commit_result) {
72 (Err(err), _) => Err(err),
73 (Ok(_), Err(err)) => Err(err),
74 (Ok(info), Ok(())) => Ok(info),
75 }
76 }
77
78 pub fn is_expired(db: &Database, txn_id: &str) -> Result<bool> {
80 let mut txn = db.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
81 let result = load_meta(&mut txn, txn_id)
82 .map(|meta| is_expired_from_meta(meta, current_timestamp_secs()));
83 let commit_result = txn.commit_self().map_err(Error::Core);
84 match (result, commit_result) {
85 (Err(err), _) => Err(err),
86 (Ok(_), Err(err)) => Err(err),
87 (Ok(value), Ok(())) => Ok(value),
88 }
89 }
90
91 pub fn get(db: &Database, txn_id: &str, key: &[u8]) -> Result<Option<Vec<u8>>> {
93 let mut txn = db.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
94 let result = (|| {
95 let _ = load_meta(&mut txn, txn_id)?;
96 if let Some(raw) = txn.get(&txn_write_key(txn_id, key)).map_err(Error::Core)? {
97 return Ok(match decode_write(txn_id, &raw)? {
98 TxnWrite::Put(value) => Some(value),
99 TxnWrite::Delete => None,
100 });
101 }
102 txn.get(&key.to_vec()).map_err(Error::Core)
103 })();
104 let commit_result = txn.commit_self().map_err(Error::Core);
105 match (result, commit_result) {
106 (Err(err), _) => Err(err),
107 (Ok(_), Err(err)) => Err(err),
108 (Ok(value), Ok(())) => Ok(value),
109 }
110 }
111
112 pub fn put(db: &Database, txn_id: &str, key: &[u8], value: &[u8]) -> Result<()> {
114 let mut txn = db.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
115 let _ = load_meta(&mut txn, txn_id)?;
116 let encoded = encode_write(TxnWrite::Put(value.to_vec()));
117 txn.put(txn_write_key(txn_id, key), encoded)
118 .map_err(Error::Core)?;
119 txn.commit_self().map_err(Error::Core)?;
120 Ok(())
121 }
122
123 pub fn delete(db: &Database, txn_id: &str, key: &[u8]) -> Result<()> {
125 let mut txn = db.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
126 let _ = load_meta(&mut txn, txn_id)?;
127 let encoded = encode_write(TxnWrite::Delete);
128 txn.put(txn_write_key(txn_id, key), encoded)
129 .map_err(Error::Core)?;
130 txn.commit_self().map_err(Error::Core)?;
131 Ok(())
132 }
133
134 pub fn commit(db: &Database, txn_id: &str) -> Result<()> {
136 let mut txn = db.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
137 let _ = load_meta(&mut txn, txn_id)?;
138 let prefix = txn_write_prefix(txn_id);
139 let staged: Vec<(Vec<u8>, Vec<u8>)> =
140 txn.scan_prefix(&prefix).map_err(Error::Core)?.collect();
141 for (staged_key, raw) in &staged {
142 let user_key = extract_user_key(txn_id, staged_key)?;
143 match decode_write(txn_id, raw)? {
144 TxnWrite::Put(value) => {
145 txn.put(user_key, value).map_err(Error::Core)?;
146 }
147 TxnWrite::Delete => {
148 txn.delete(user_key).map_err(Error::Core)?;
149 }
150 }
151 }
152 for (staged_key, _) in staged {
153 txn.delete(staged_key).map_err(Error::Core)?;
154 }
155 txn.delete(txn_meta_key(txn_id)).map_err(Error::Core)?;
156 txn.commit_self().map_err(Error::Core)?;
157 Ok(())
158 }
159
160 pub fn rollback(db: &Database, txn_id: &str) -> Result<()> {
162 let mut txn = db.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
163 let _ = load_meta(&mut txn, txn_id)?;
164 let prefix = txn_write_prefix(txn_id);
165 let staged: Vec<(Vec<u8>, Vec<u8>)> =
166 txn.scan_prefix(&prefix).map_err(Error::Core)?.collect();
167 for (staged_key, _) in staged {
168 txn.delete(staged_key).map_err(Error::Core)?;
169 }
170 txn.delete(txn_meta_key(txn_id)).map_err(Error::Core)?;
171 txn.commit_self().map_err(Error::Core)?;
172 Ok(())
173 }
174}
175
176fn current_timestamp_secs() -> u64 {
177 SystemTime::now()
178 .duration_since(UNIX_EPOCH)
179 .unwrap_or_default()
180 .as_secs()
181}
182
183fn generate_txn_id() -> String {
184 let nanos = SystemTime::now()
185 .duration_since(UNIX_EPOCH)
186 .unwrap_or_default()
187 .as_nanos();
188 format!("txn-{}-{}", nanos, std::process::id())
189}
190
191fn txn_meta_key(txn_id: &str) -> Vec<u8> {
192 let mut key = Vec::with_capacity(TXN_META_PREFIX.len() + txn_id.len());
193 key.extend_from_slice(TXN_META_PREFIX);
194 key.extend_from_slice(txn_id.as_bytes());
195 key
196}
197
198fn txn_write_prefix(txn_id: &str) -> Vec<u8> {
199 let mut key = Vec::with_capacity(TXN_WRITE_PREFIX.len() + txn_id.len() + 1);
200 key.extend_from_slice(TXN_WRITE_PREFIX);
201 key.extend_from_slice(txn_id.as_bytes());
202 key.push(b':');
203 key
204}
205
206fn txn_write_key(txn_id: &str, key: &[u8]) -> Vec<u8> {
207 let mut full = txn_write_prefix(txn_id);
208 full.extend_from_slice(key);
209 full
210}
211
212fn encode_meta(meta: TxnMeta) -> Vec<u8> {
213 let mut payload = Vec::with_capacity(16);
214 payload.extend_from_slice(&meta.started_at_secs.to_le_bytes());
215 payload.extend_from_slice(&meta.timeout_secs.to_le_bytes());
216 payload
217}
218
219fn decode_meta(txn_id: &str, raw: &[u8]) -> Result<TxnMeta> {
220 if raw.len() < 16 {
221 return Err(Error::Core(alopex_core::Error::InvalidFormat(format!(
222 "transaction metadata invalid: {}",
223 txn_id
224 ))));
225 }
226 let started_at_secs = u64::from_le_bytes(raw[0..8].try_into().unwrap());
227 let timeout_secs = u64::from_le_bytes(raw[8..16].try_into().unwrap());
228 Ok(TxnMeta {
229 started_at_secs,
230 timeout_secs,
231 })
232}
233
234fn load_meta(txn: &mut AnyKVTransaction<'_>, txn_id: &str) -> Result<TxnMeta> {
235 let Some(raw) = txn.get(&txn_meta_key(txn_id)).map_err(Error::Core)? else {
236 return Err(Error::InvalidTransactionId(txn_id.to_string()));
237 };
238 decode_meta(txn_id, &raw)
239}
240
241fn is_expired_from_meta(meta: TxnMeta, now_secs: u64) -> bool {
242 now_secs.saturating_sub(meta.started_at_secs) >= meta.timeout_secs
243}
244
245fn encode_write(entry: TxnWrite) -> Vec<u8> {
246 match entry {
247 TxnWrite::Put(value) => {
248 let mut payload = Vec::with_capacity(1 + value.len());
249 payload.push(TXN_WRITE_PUT);
250 payload.extend_from_slice(&value);
251 payload
252 }
253 TxnWrite::Delete => vec![TXN_WRITE_DELETE],
254 }
255}
256
257fn decode_write(txn_id: &str, raw: &[u8]) -> Result<TxnWrite> {
258 let Some((&tag, rest)) = raw.split_first() else {
259 return Err(Error::Core(alopex_core::Error::InvalidFormat(format!(
260 "transaction write entry invalid: {}",
261 txn_id
262 ))));
263 };
264 match tag {
265 TXN_WRITE_PUT => Ok(TxnWrite::Put(rest.to_vec())),
266 TXN_WRITE_DELETE => Ok(TxnWrite::Delete),
267 _ => Err(Error::Core(alopex_core::Error::InvalidFormat(format!(
268 "transaction write entry invalid: {}",
269 txn_id
270 )))),
271 }
272}
273
274fn extract_user_key(txn_id: &str, staged_key: &[u8]) -> Result<Vec<u8>> {
275 let prefix = txn_write_prefix(txn_id);
276 if !staged_key.starts_with(&prefix) {
277 return Err(Error::Core(alopex_core::Error::InvalidFormat(format!(
278 "transaction write key invalid: {}",
279 txn_id
280 ))));
281 }
282 Ok(staged_key[prefix.len()..].to_vec())
283}
284
285#[cfg(test)]
286mod tests {
287 use super::*;
288
289 fn create_test_db() -> Database {
290 Database::open_in_memory().unwrap()
291 }
292
293 #[test]
294 fn test_txn_put_get_commit() {
295 let db = create_test_db();
296 let txn_id = TransactionManager::begin_with_timeout(&db, Duration::from_secs(60)).unwrap();
297
298 TransactionManager::put(&db, &txn_id, b"alpha", b"beta").unwrap();
299 let value = TransactionManager::get(&db, &txn_id, b"alpha").unwrap();
300 assert_eq!(value, Some(b"beta".to_vec()));
301
302 TransactionManager::commit(&db, &txn_id).unwrap();
303
304 let mut verify_txn = db.begin(TxnMode::ReadOnly).unwrap();
305 let stored = verify_txn.get(b"alpha").unwrap();
306 verify_txn.commit().unwrap();
307 assert_eq!(stored, Some(b"beta".to_vec()));
308
309 let err = TransactionManager::get(&db, &txn_id, b"alpha").unwrap_err();
310 assert!(matches!(err, Error::InvalidTransactionId(_)));
311 }
312
313 #[test]
314 fn test_txn_rollback_discards_writes() {
315 let db = create_test_db();
316 let txn_id = TransactionManager::begin_with_timeout(&db, Duration::from_secs(60)).unwrap();
317
318 TransactionManager::put(&db, &txn_id, b"key", b"value").unwrap();
319 TransactionManager::rollback(&db, &txn_id).unwrap();
320
321 let mut verify_txn = db.begin(TxnMode::ReadOnly).unwrap();
322 let stored = verify_txn.get(b"key").unwrap();
323 verify_txn.commit().unwrap();
324 assert!(stored.is_none());
325
326 let err = TransactionManager::get(&db, &txn_id, b"key").unwrap_err();
327 assert!(matches!(err, Error::InvalidTransactionId(_)));
328 }
329
330 #[test]
331 fn test_txn_delete_marks_missing() {
332 let db = create_test_db();
333 {
334 let mut seed = db.begin(TxnMode::ReadWrite).unwrap();
335 seed.put(b"drop-me", b"payload").unwrap();
336 seed.commit().unwrap();
337 }
338
339 let txn_id = TransactionManager::begin_with_timeout(&db, Duration::from_secs(60)).unwrap();
340 TransactionManager::delete(&db, &txn_id, b"drop-me").unwrap();
341 let value = TransactionManager::get(&db, &txn_id, b"drop-me").unwrap();
342 assert!(value.is_none());
343 TransactionManager::rollback(&db, &txn_id).unwrap();
344 }
345
346 #[test]
347 fn test_txn_is_expired() {
348 let db = create_test_db();
349 let txn_id = TransactionManager::begin_with_timeout(&db, Duration::from_secs(0)).unwrap();
350 assert!(TransactionManager::is_expired(&db, &txn_id).unwrap());
351 }
352}