Skip to main content

alopex_embedded/
txn_manager.rs

1//! Transaction manager for CLI-driven KV transactions.
2
3use std::time::{Duration, SystemTime, UNIX_EPOCH};
4
5use alopex_core::kv::any::AnyKVTransaction;
6use alopex_core::{KVStore, KVTransaction, TxnMode};
7
8use crate::{Database, Error, Result};
9
10const TXN_META_PREFIX: &[u8] = b"__alopex_txn_meta__:";
11const TXN_WRITE_PREFIX: &[u8] = b"__alopex_txn_write__:";
12const TXN_WRITE_DELETE: u8 = 0;
13const TXN_WRITE_PUT: u8 = 1;
14
15/// Metadata for a persisted KV transaction.
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct TransactionInfo {
18    /// Transaction identifier.
19    pub txn_id: String,
20    /// Persisted transaction start time.
21    pub started_at: SystemTime,
22    /// Timeout duration in seconds.
23    pub timeout_secs: u64,
24    /// Whether the transaction is expired.
25    pub is_expired: bool,
26}
27
28#[derive(Debug, Clone, Copy)]
29struct TxnMeta {
30    started_at_secs: u64,
31    timeout_secs: u64,
32}
33
34enum TxnWrite {
35    Put(Vec<u8>),
36    Delete,
37}
38
39/// Manages persisted KV transactions for CLI usage.
40pub struct TransactionManager;
41
42impl TransactionManager {
43    /// Begins a new transaction with the given timeout and returns its ID.
44    pub fn begin_with_timeout(db: &Database, timeout: Duration) -> Result<String> {
45        let txn_id = generate_txn_id();
46        let meta = TxnMeta {
47            started_at_secs: current_timestamp_secs(),
48            timeout_secs: timeout.as_secs(),
49        };
50        let mut txn = db.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
51        txn.put(txn_meta_key(&txn_id), encode_meta(meta))
52            .map_err(Error::Core)?;
53        txn.commit_self().map_err(Error::Core)?;
54        Ok(txn_id)
55    }
56
57    /// Retrieves persisted transaction metadata.
58    pub fn get_info(db: &Database, txn_id: &str) -> Result<TransactionInfo> {
59        let mut txn = db.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
60        let result = load_meta(&mut txn, txn_id).map(|meta| {
61            let started_at = UNIX_EPOCH + Duration::from_secs(meta.started_at_secs);
62            let is_expired = is_expired_from_meta(meta, current_timestamp_secs());
63            TransactionInfo {
64                txn_id: txn_id.to_string(),
65                started_at,
66                timeout_secs: meta.timeout_secs,
67                is_expired,
68            }
69        });
70        let commit_result = txn.commit_self().map_err(Error::Core);
71        match (result, commit_result) {
72            (Err(err), _) => Err(err),
73            (Ok(_), Err(err)) => Err(err),
74            (Ok(info), Ok(())) => Ok(info),
75        }
76    }
77
78    /// Checks whether a transaction has expired.
79    pub fn is_expired(db: &Database, txn_id: &str) -> Result<bool> {
80        let mut txn = db.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
81        let result = load_meta(&mut txn, txn_id)
82            .map(|meta| is_expired_from_meta(meta, current_timestamp_secs()));
83        let commit_result = txn.commit_self().map_err(Error::Core);
84        match (result, commit_result) {
85            (Err(err), _) => Err(err),
86            (Ok(_), Err(err)) => Err(err),
87            (Ok(value), Ok(())) => Ok(value),
88        }
89    }
90
91    /// Retrieves a key within the specified transaction.
92    pub fn get(db: &Database, txn_id: &str, key: &[u8]) -> Result<Option<Vec<u8>>> {
93        let mut txn = db.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
94        let result = (|| {
95            let _ = load_meta(&mut txn, txn_id)?;
96            if let Some(raw) = txn.get(&txn_write_key(txn_id, key)).map_err(Error::Core)? {
97                return Ok(match decode_write(txn_id, &raw)? {
98                    TxnWrite::Put(value) => Some(value),
99                    TxnWrite::Delete => None,
100                });
101            }
102            txn.get(&key.to_vec()).map_err(Error::Core)
103        })();
104        let commit_result = txn.commit_self().map_err(Error::Core);
105        match (result, commit_result) {
106            (Err(err), _) => Err(err),
107            (Ok(_), Err(err)) => Err(err),
108            (Ok(value), Ok(())) => Ok(value),
109        }
110    }
111
112    /// Stages a put operation within the specified transaction.
113    pub fn put(db: &Database, txn_id: &str, key: &[u8], value: &[u8]) -> Result<()> {
114        let mut txn = db.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
115        let _ = load_meta(&mut txn, txn_id)?;
116        let encoded = encode_write(TxnWrite::Put(value.to_vec()));
117        txn.put(txn_write_key(txn_id, key), encoded)
118            .map_err(Error::Core)?;
119        txn.commit_self().map_err(Error::Core)?;
120        Ok(())
121    }
122
123    /// Stages a delete operation within the specified transaction.
124    pub fn delete(db: &Database, txn_id: &str, key: &[u8]) -> Result<()> {
125        let mut txn = db.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
126        let _ = load_meta(&mut txn, txn_id)?;
127        let encoded = encode_write(TxnWrite::Delete);
128        txn.put(txn_write_key(txn_id, key), encoded)
129            .map_err(Error::Core)?;
130        txn.commit_self().map_err(Error::Core)?;
131        Ok(())
132    }
133
134    /// Commits staged writes and finalizes the transaction.
135    pub fn commit(db: &Database, txn_id: &str) -> Result<()> {
136        let mut txn = db.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
137        let _ = load_meta(&mut txn, txn_id)?;
138        let prefix = txn_write_prefix(txn_id);
139        let staged: Vec<(Vec<u8>, Vec<u8>)> =
140            txn.scan_prefix(&prefix).map_err(Error::Core)?.collect();
141        for (staged_key, raw) in &staged {
142            let user_key = extract_user_key(txn_id, staged_key)?;
143            match decode_write(txn_id, raw)? {
144                TxnWrite::Put(value) => {
145                    txn.put(user_key, value).map_err(Error::Core)?;
146                }
147                TxnWrite::Delete => {
148                    txn.delete(user_key).map_err(Error::Core)?;
149                }
150            }
151        }
152        for (staged_key, _) in staged {
153            txn.delete(staged_key).map_err(Error::Core)?;
154        }
155        txn.delete(txn_meta_key(txn_id)).map_err(Error::Core)?;
156        txn.commit_self().map_err(Error::Core)?;
157        Ok(())
158    }
159
160    /// Rolls back staged writes and removes transaction metadata.
161    pub fn rollback(db: &Database, txn_id: &str) -> Result<()> {
162        let mut txn = db.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
163        let _ = load_meta(&mut txn, txn_id)?;
164        let prefix = txn_write_prefix(txn_id);
165        let staged: Vec<(Vec<u8>, Vec<u8>)> =
166            txn.scan_prefix(&prefix).map_err(Error::Core)?.collect();
167        for (staged_key, _) in staged {
168            txn.delete(staged_key).map_err(Error::Core)?;
169        }
170        txn.delete(txn_meta_key(txn_id)).map_err(Error::Core)?;
171        txn.commit_self().map_err(Error::Core)?;
172        Ok(())
173    }
174}
175
176fn current_timestamp_secs() -> u64 {
177    SystemTime::now()
178        .duration_since(UNIX_EPOCH)
179        .unwrap_or_default()
180        .as_secs()
181}
182
183fn generate_txn_id() -> String {
184    let nanos = SystemTime::now()
185        .duration_since(UNIX_EPOCH)
186        .unwrap_or_default()
187        .as_nanos();
188    format!("txn-{}-{}", nanos, std::process::id())
189}
190
191fn txn_meta_key(txn_id: &str) -> Vec<u8> {
192    let mut key = Vec::with_capacity(TXN_META_PREFIX.len() + txn_id.len());
193    key.extend_from_slice(TXN_META_PREFIX);
194    key.extend_from_slice(txn_id.as_bytes());
195    key
196}
197
198fn txn_write_prefix(txn_id: &str) -> Vec<u8> {
199    let mut key = Vec::with_capacity(TXN_WRITE_PREFIX.len() + txn_id.len() + 1);
200    key.extend_from_slice(TXN_WRITE_PREFIX);
201    key.extend_from_slice(txn_id.as_bytes());
202    key.push(b':');
203    key
204}
205
206fn txn_write_key(txn_id: &str, key: &[u8]) -> Vec<u8> {
207    let mut full = txn_write_prefix(txn_id);
208    full.extend_from_slice(key);
209    full
210}
211
212fn encode_meta(meta: TxnMeta) -> Vec<u8> {
213    let mut payload = Vec::with_capacity(16);
214    payload.extend_from_slice(&meta.started_at_secs.to_le_bytes());
215    payload.extend_from_slice(&meta.timeout_secs.to_le_bytes());
216    payload
217}
218
219fn decode_meta(txn_id: &str, raw: &[u8]) -> Result<TxnMeta> {
220    if raw.len() < 16 {
221        return Err(Error::Core(alopex_core::Error::InvalidFormat(format!(
222            "transaction metadata invalid: {}",
223            txn_id
224        ))));
225    }
226    let started_at_secs = u64::from_le_bytes(raw[0..8].try_into().unwrap());
227    let timeout_secs = u64::from_le_bytes(raw[8..16].try_into().unwrap());
228    Ok(TxnMeta {
229        started_at_secs,
230        timeout_secs,
231    })
232}
233
234fn load_meta(txn: &mut AnyKVTransaction<'_>, txn_id: &str) -> Result<TxnMeta> {
235    let Some(raw) = txn.get(&txn_meta_key(txn_id)).map_err(Error::Core)? else {
236        return Err(Error::InvalidTransactionId(txn_id.to_string()));
237    };
238    decode_meta(txn_id, &raw)
239}
240
241fn is_expired_from_meta(meta: TxnMeta, now_secs: u64) -> bool {
242    now_secs.saturating_sub(meta.started_at_secs) >= meta.timeout_secs
243}
244
245fn encode_write(entry: TxnWrite) -> Vec<u8> {
246    match entry {
247        TxnWrite::Put(value) => {
248            let mut payload = Vec::with_capacity(1 + value.len());
249            payload.push(TXN_WRITE_PUT);
250            payload.extend_from_slice(&value);
251            payload
252        }
253        TxnWrite::Delete => vec![TXN_WRITE_DELETE],
254    }
255}
256
257fn decode_write(txn_id: &str, raw: &[u8]) -> Result<TxnWrite> {
258    let Some((&tag, rest)) = raw.split_first() else {
259        return Err(Error::Core(alopex_core::Error::InvalidFormat(format!(
260            "transaction write entry invalid: {}",
261            txn_id
262        ))));
263    };
264    match tag {
265        TXN_WRITE_PUT => Ok(TxnWrite::Put(rest.to_vec())),
266        TXN_WRITE_DELETE => Ok(TxnWrite::Delete),
267        _ => Err(Error::Core(alopex_core::Error::InvalidFormat(format!(
268            "transaction write entry invalid: {}",
269            txn_id
270        )))),
271    }
272}
273
274fn extract_user_key(txn_id: &str, staged_key: &[u8]) -> Result<Vec<u8>> {
275    let prefix = txn_write_prefix(txn_id);
276    if !staged_key.starts_with(&prefix) {
277        return Err(Error::Core(alopex_core::Error::InvalidFormat(format!(
278            "transaction write key invalid: {}",
279            txn_id
280        ))));
281    }
282    Ok(staged_key[prefix.len()..].to_vec())
283}
284
285#[cfg(test)]
286mod tests {
287    use super::*;
288
289    fn create_test_db() -> Database {
290        Database::open_in_memory().unwrap()
291    }
292
293    #[test]
294    fn test_txn_put_get_commit() {
295        let db = create_test_db();
296        let txn_id = TransactionManager::begin_with_timeout(&db, Duration::from_secs(60)).unwrap();
297
298        TransactionManager::put(&db, &txn_id, b"alpha", b"beta").unwrap();
299        let value = TransactionManager::get(&db, &txn_id, b"alpha").unwrap();
300        assert_eq!(value, Some(b"beta".to_vec()));
301
302        TransactionManager::commit(&db, &txn_id).unwrap();
303
304        let mut verify_txn = db.begin(TxnMode::ReadOnly).unwrap();
305        let stored = verify_txn.get(b"alpha").unwrap();
306        verify_txn.commit().unwrap();
307        assert_eq!(stored, Some(b"beta".to_vec()));
308
309        let err = TransactionManager::get(&db, &txn_id, b"alpha").unwrap_err();
310        assert!(matches!(err, Error::InvalidTransactionId(_)));
311    }
312
313    #[test]
314    fn test_txn_rollback_discards_writes() {
315        let db = create_test_db();
316        let txn_id = TransactionManager::begin_with_timeout(&db, Duration::from_secs(60)).unwrap();
317
318        TransactionManager::put(&db, &txn_id, b"key", b"value").unwrap();
319        TransactionManager::rollback(&db, &txn_id).unwrap();
320
321        let mut verify_txn = db.begin(TxnMode::ReadOnly).unwrap();
322        let stored = verify_txn.get(b"key").unwrap();
323        verify_txn.commit().unwrap();
324        assert!(stored.is_none());
325
326        let err = TransactionManager::get(&db, &txn_id, b"key").unwrap_err();
327        assert!(matches!(err, Error::InvalidTransactionId(_)));
328    }
329
330    #[test]
331    fn test_txn_delete_marks_missing() {
332        let db = create_test_db();
333        {
334            let mut seed = db.begin(TxnMode::ReadWrite).unwrap();
335            seed.put(b"drop-me", b"payload").unwrap();
336            seed.commit().unwrap();
337        }
338
339        let txn_id = TransactionManager::begin_with_timeout(&db, Duration::from_secs(60)).unwrap();
340        TransactionManager::delete(&db, &txn_id, b"drop-me").unwrap();
341        let value = TransactionManager::get(&db, &txn_id, b"drop-me").unwrap();
342        assert!(value.is_none());
343        TransactionManager::rollback(&db, &txn_id).unwrap();
344    }
345
346    #[test]
347    fn test_txn_is_expired() {
348        let db = create_test_db();
349        let txn_id = TransactionManager::begin_with_timeout(&db, Duration::from_secs(0)).unwrap();
350        assert!(TransactionManager::is_expired(&db, &txn_id).unwrap());
351    }
352}