edgestore 1.0.0

Local-first embedded KV + vector database in Rust
use crate::error::EdgestoreError;
use crate::types::{Lsn, WalRecord, Operation};

#[derive(Debug, PartialEq)]
enum TxState {
    Active,
    Committed,
    RolledBack,
}

/// Multi-record atomic transaction batch.
pub struct Transaction {
    pub(crate) pending: Vec<WalRecord>,
    pub(crate) txid: u64,
    state: TxState,
}

impl Transaction {
    /// Create a new empty transaction with the given txid.
    pub fn new(txid: u64) -> Self {
        Transaction {
            pending: Vec::new(),
            txid,
            state: TxState::Active,
        }
    }

    /// Append a put operation to this transaction.
    pub fn put(&mut self, ns: &[u8], key: &[u8], val: &[u8], lsn: Lsn, timestamp: i64) -> Result<(), EdgestoreError> {
        if self.state != TxState::Active {
            return Err(EdgestoreError::InvalidOperation("transaction not active".to_string()));
        }
        let record = WalRecord {
            txid: self.txid,
            lsn,
            timestamp,
            ttl: 0,
            ns_len: ns.len() as u16,
            ns_bytes: ns.to_vec(),
            key_bytes: key.to_vec(),
            op: Operation::Put,
            value_hash: blake3::hash(val).into(),
            value_bytes: val.to_vec(),
        };
        self.pending.push(record);
        Ok(())
    }

    /// Append a put-with-TTL operation to this transaction.
    pub fn put_with_ttl(&mut self, ns: &[u8], key: &[u8], val: &[u8], ttl_secs: u32, lsn: Lsn, timestamp: i64) -> Result<(), EdgestoreError> {
        if self.state != TxState::Active {
            return Err(EdgestoreError::InvalidOperation("transaction not active".to_string()));
        }
        let record = WalRecord {
            txid: self.txid,
            lsn,
            timestamp,
            ttl: ttl_secs,
            ns_len: ns.len() as u16,
            ns_bytes: ns.to_vec(),
            key_bytes: key.to_vec(),
            op: Operation::Put,
            value_hash: blake3::hash(val).into(),
            value_bytes: val.to_vec(),
        };
        self.pending.push(record);
        Ok(())
    }

    /// Append a delete operation to this transaction.
    pub fn delete(&mut self, ns: &[u8], key: &[u8], lsn: Lsn, timestamp: i64) -> Result<(), EdgestoreError> {
        if self.state != TxState::Active {
            return Err(EdgestoreError::InvalidOperation("transaction not active".to_string()));
        }
        let record = WalRecord {
            txid: self.txid,
            lsn,
            timestamp,
            ttl: 0,
            ns_len: ns.len() as u16,
            ns_bytes: ns.to_vec(),
            key_bytes: key.to_vec(),
            op: Operation::Delete,
            value_hash: blake3::hash(b"").into(),
            value_bytes: vec![],
        };
        self.pending.push(record);
        Ok(())
    }

    /// Roll back this transaction, clearing all pending records.
    pub fn rollback_self(&mut self) {
        self.state = TxState::RolledBack;
        self.pending.clear();
    }

    /// Take ownership of pending records, marking this transaction committed.
    pub fn take_pending(&mut self) -> Result<Vec<WalRecord>, EdgestoreError> {
        match self.state {
            TxState::Active => {
                self.state = TxState::Committed;
                Ok(std::mem::take(&mut self.pending))
            }
            TxState::Committed => Err(EdgestoreError::InvalidOperation("already committed".to_string())),
            TxState::RolledBack => Err(EdgestoreError::InvalidOperation("already rolled back".to_string())),
        }
    }

    /// Returns true if this transaction is still active.
    pub fn is_active(&self) -> bool {
        self.state == TxState::Active
    }

    /// Convenience wrapper: commit via engine.
    pub fn commit(self, engine: &mut crate::engine::Engine) -> Result<Lsn, EdgestoreError> {
        engine.commit_transaction(self)
    }

    /// Convenience wrapper: rollback via engine.
    pub fn rollback(self, engine: &mut crate::engine::Engine) {
        engine.rollback_transaction(self)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_new_transaction_is_active() {
        let tx = Transaction::new(1);
        assert!(tx.is_active());
    }

    #[test]
    fn test_rollback_clears_pending() {
        let mut tx = Transaction::new(1);
        tx.put(b"ns", b"k", b"v", 1, 0).unwrap();
        assert_eq!(tx.pending.len(), 1);
        tx.rollback_self();
        assert_eq!(tx.pending.len(), 0);
        assert!(!tx.is_active());
        // subsequent put returns Err
        let result = tx.put(b"ns", b"k2", b"v2", 2, 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_take_pending_twice_returns_err() {
        let mut tx = Transaction::new(1);
        tx.put(b"ns", b"k", b"v", 1, 0).unwrap();
        let _ = tx.take_pending().unwrap();
        let result = tx.take_pending();
        assert!(result.is_err());
    }

    #[test]
    fn test_delete_appends_delete_record() {
        let mut tx = Transaction::new(42);
        tx.put(b"ns", b"k1", b"v1", 1, 0).unwrap();
        tx.delete(b"ns", b"k1", 2, 0).unwrap();
        assert_eq!(tx.pending.len(), 2);
        let del = &tx.pending[1];
        assert_eq!(del.op, Operation::Delete);
        assert_eq!(del.key_bytes, b"k1");
        assert!(del.value_bytes.is_empty());
        assert_eq!(del.txid, 42);
    }

    #[test]
    fn test_delete_on_inactive_tx_returns_err() {
        let mut tx = Transaction::new(1);
        tx.rollback_self();
        let result = tx.delete(b"ns", b"k", 1, 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_put_with_ttl_sets_ttl_field() {
        let mut tx = Transaction::new(5);
        tx.put_with_ttl(b"ns", b"key", b"val", 3600, 1, 0).unwrap();
        assert_eq!(tx.pending.len(), 1);
        assert_eq!(tx.pending[0].ttl, 3600);
        assert_eq!(tx.pending[0].op, Operation::Put);
        assert_eq!(tx.pending[0].value_bytes, b"val");
    }

    #[test]
    fn test_put_with_ttl_on_inactive_tx_returns_err() {
        let mut tx = Transaction::new(1);
        tx.rollback_self();
        let result = tx.put_with_ttl(b"ns", b"k", b"v", 60, 1, 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_take_pending_after_rollback_returns_err() {
        let mut tx = Transaction::new(1);
        tx.put(b"ns", b"k", b"v", 1, 0).unwrap();
        tx.rollback_self();
        let result = tx.take_pending();
        assert!(result.is_err());
    }
}