Skip to main content

amaters_sdk_rust/
transaction.rs

//! Transaction support for AmateRS SDK.
//!
//! A `Transaction` buffers SET and DELETE operations locally and issues a
//! single atomic `execute_batch` RPC on `commit()`.  `rollback()` discards
//! the buffer without any network call.
//!
//! ## Cache note
//!
//! `AmateRSClient::execute_batch` does not invalidate the query cache (consistent
//! with the rest of `execute_batch` usage in this crate).  If the client has a
//! cache enabled, committing a transaction may leave stale entries for the keys
//! that were written.  Callers that require strict read-your-writes guarantees
//! should either disable the cache or call `client.cache().map(|c| c.invalidate(...))`
//! manually after commit.
15
16use crate::client::AmateRSClient;
17use crate::error::{Result, SdkError};
18use amaters_core::{CipherBlob, Key, Query};
19use std::sync::Arc;
20
21// ---------------------------------------------------------------------------
22// Internal state / operation types
23// ---------------------------------------------------------------------------
24
/// Where a [`Transaction`] currently sits in its lifecycle.
///
/// A transaction starts out `Active`; `commit()` and `rollback()` move it to
/// `Committed` and `RolledBack` respectively, after which every staging,
/// read, commit, or rollback call is rejected.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TransactionState {
    /// Still accepting staged writes; neither committed nor rolled back.
    Active,
    /// `commit()` completed successfully.
    Committed,
    /// `rollback()` discarded the buffered operations.
    RolledBack,
}
32
33/// A single buffered write operation.
34#[derive(Debug, Clone)]
35enum TransactionOp {
36    Set { key: Key, value: CipherBlob },
37    Delete { key: Key },
38}
39
40// ---------------------------------------------------------------------------
41// Public Transaction type
42// ---------------------------------------------------------------------------
43
44/// A buffered, commit-or-rollback transaction over [`AmateRSClient`].
45///
46/// All writes are staged locally in a `Vec<TransactionOp>` until `commit()` is
47/// called.  `commit()` issues a single `execute_batch` RPC so the writes are
48/// applied atomically.  `rollback()` discards the local buffer with no network
49/// call.
50///
51/// ## Reading inside a transaction
52///
53/// [`Transaction::get`] first inspects the local buffer using last-write-wins
54/// semantics (reverse scan).  A buffered `Delete` for the queried key returns
55/// `Ok(None)`.  If the key has not been written in this transaction the call
56/// falls through to the server.
57///
58/// ## Drop behaviour
59///
60/// Dropping a transaction that is still `Active` and has un-committed
61/// operations emits a `tracing::warn!` message.  The buffer is silently
62/// discarded (no rollback RPC is issued — rollback is always local).
63///
64/// ## Construction
65///
66/// Prefer the factory method [`AmateRSClient::transaction`] over constructing
67/// directly.
68pub struct Transaction {
69    collection: String,
70    ops: Vec<TransactionOp>,
71    client: Arc<AmateRSClient>,
72    state: TransactionState,
73}
74
75impl Transaction {
76    /// Create a new transaction bound to `collection`.
77    ///
78    /// Use [`AmateRSClient::transaction`] instead of calling this directly.
79    pub fn new(client: Arc<AmateRSClient>, collection: impl Into<String>) -> Self {
80        Self {
81            collection: collection.into(),
82            ops: Vec::new(),
83            client,
84            state: TransactionState::Active,
85        }
86    }
87
88    // -----------------------------------------------------------------------
89    // Write staging
90    // -----------------------------------------------------------------------
91
92    /// Stage a SET operation into the local buffer.
93    ///
94    /// The write is not applied to the server until [`Self::commit`] is called.
95    ///
96    /// # Errors
97    ///
98    /// Returns [`SdkError::InvalidState`] if the transaction is no longer
99    /// active (already committed or rolled back).
100    pub fn set(&mut self, key: Key, value: CipherBlob) -> Result<()> {
101        self.ensure_active()?;
102        self.ops.push(TransactionOp::Set { key, value });
103        Ok(())
104    }
105
106    /// Stage a DELETE operation into the local buffer.
107    ///
108    /// The delete is not applied to the server until [`Self::commit`] is called.
109    ///
110    /// # Errors
111    ///
112    /// Returns [`SdkError::InvalidState`] if the transaction is no longer active.
113    pub fn delete(&mut self, key: Key) -> Result<()> {
114        self.ensure_active()?;
115        self.ops.push(TransactionOp::Delete { key });
116        Ok(())
117    }
118
119    // -----------------------------------------------------------------------
120    // Read (local buffer + server fallthrough)
121    // -----------------------------------------------------------------------
122
123    /// Read a key, consulting the local buffer first (last-write-wins), then
124    /// the server.
125    ///
126    /// * A buffered `SET` returns the in-flight value without a server round-trip.
127    /// * A buffered `DELETE` returns `Ok(None)` without a server round-trip.
128    /// * If the key has not been touched in this transaction, the call falls
129    ///   through to `client.get()`.
130    ///
131    /// # Errors
132    ///
133    /// Returns [`SdkError::InvalidState`] if the transaction is no longer active,
134    /// or any error returned by the server fall-through.
135    pub async fn get(&self, key: &Key) -> Result<Option<CipherBlob>> {
136        self.ensure_active()?;
137
138        // Walk ops in reverse for the most recent write to this key.
139        for op in self.ops.iter().rev() {
140            match op {
141                TransactionOp::Set { key: k, value: v } if k == key => {
142                    return Ok(Some(v.clone()));
143                }
144                TransactionOp::Delete { key: k } if k == key => {
145                    return Ok(None);
146                }
147                _ => {}
148            }
149        }
150
151        // Fall through to the server.
152        self.client.get(&self.collection, key).await
153    }
154
155    // -----------------------------------------------------------------------
156    // Commit / rollback
157    // -----------------------------------------------------------------------
158
159    /// Commit all buffered operations atomically via `execute_batch`.
160    ///
161    /// On success the transaction transitions to the `Committed` state.
162    /// If the batch RPC fails, the state remains `Active` so the caller can
163    /// retry or roll back.
164    ///
165    /// An empty transaction commits instantly without a network round-trip.
166    ///
167    /// # Errors
168    ///
169    /// * [`SdkError::InvalidState`] — already committed or rolled back.
170    /// * Any `SdkError` returned by the underlying `execute_batch` RPC.
171    pub async fn commit(&mut self) -> Result<()> {
172        self.ensure_active()?;
173
174        if !self.ops.is_empty() {
175            let queries: Vec<Query> = self
176                .ops
177                .drain(..)
178                .map(|op| match op {
179                    TransactionOp::Set { key, value } => Query::Set {
180                        collection: self.collection.clone(),
181                        key,
182                        value,
183                    },
184                    TransactionOp::Delete { key } => Query::Delete {
185                        collection: self.collection.clone(),
186                        key,
187                    },
188                })
189                .collect();
190
191            self.client.execute_batch(queries).await?;
192        }
193
194        self.state = TransactionState::Committed;
195        Ok(())
196    }
197
198    /// Rollback by discarding the local buffer — no network call is made.
199    ///
200    /// # Errors
201    ///
202    /// Returns [`SdkError::InvalidState`] if the transaction is already
203    /// committed or rolled back.
204    pub fn rollback(&mut self) -> Result<()> {
205        self.ensure_active()?;
206        self.ops.clear();
207        self.state = TransactionState::RolledBack;
208        Ok(())
209    }
210
211    // -----------------------------------------------------------------------
212    // Introspection
213    // -----------------------------------------------------------------------
214
215    /// Number of operations currently staged in the local buffer.
216    pub fn pending_ops(&self) -> usize {
217        self.ops.len()
218    }
219
220    /// Returns `true` if the transaction is still active (not yet committed or
221    /// rolled back).
222    pub fn is_active(&self) -> bool {
223        self.state == TransactionState::Active
224    }
225
226    /// Returns the collection name this transaction is bound to.
227    pub fn collection(&self) -> &str {
228        &self.collection
229    }
230
231    // -----------------------------------------------------------------------
232    // Private helpers
233    // -----------------------------------------------------------------------
234
235    fn ensure_active(&self) -> Result<()> {
236        if self.state != TransactionState::Active {
237            Err(SdkError::InvalidState(
238                "transaction already committed or rolled back".to_string(),
239            ))
240        } else {
241            Ok(())
242        }
243    }
244}
245
246impl Drop for Transaction {
247    fn drop(&mut self) {
248        if self.state == TransactionState::Active && !self.ops.is_empty() {
249            tracing::warn!(
250                pending_ops = self.ops.len(),
251                collection = %self.collection,
252                "Transaction dropped with uncommitted operation(s) — changes discarded",
253            );
254        }
255    }
256}
257
258// ---------------------------------------------------------------------------
259// Tests
260// ---------------------------------------------------------------------------
261
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ClientConfig;
    use amaters_core::{CipherBlob, Key};

    /// Helper: build an offline client wrapped in `Arc` so that
    /// `Transaction::new` can be called without a live server.
    fn offline_client() -> Arc<AmateRSClient> {
        Arc::new(AmateRSClient::new_offline(ClientConfig::new(
            "http://127.0.0.1:50051",
        )))
    }

    /// Helper: a fresh active transaction on `collection`, backed by the
    /// offline client.
    fn offline_tx(collection: &str) -> Transaction {
        Transaction::new(offline_client(), collection)
    }

    /// Helper: assert that `err` is the `InvalidState` variant.
    fn assert_invalid_state(err: SdkError) {
        assert!(
            matches!(err, SdkError::InvalidState(_)),
            "expected InvalidState, got: {err}"
        );
    }

    // -----------------------------------------------------------------------
    // State-machine tests (no server required)
    // -----------------------------------------------------------------------

    #[test]
    fn test_transaction_rollback_clears_buffer() {
        let mut tx = offline_tx("users");

        tx.set(Key::from_str("k1"), CipherBlob::new(vec![1, 2, 3]))
            .expect("set should succeed on active tx");
        assert_eq!(tx.pending_ops(), 1);

        tx.rollback().expect("rollback should succeed on active tx");
        assert_eq!(tx.pending_ops(), 0);
        assert!(!tx.is_active());
    }

    #[tokio::test]
    async fn test_transaction_double_commit_returns_error() {
        // An empty transaction commits without a network call, so the state
        // machine can be exercised offline.
        let mut tx = offline_tx("users");

        // First commit: no ops → fast path, no RPC.
        tx.commit().await.expect("first commit should succeed");

        let second = tx
            .commit()
            .await
            .expect_err("second commit should return Err");
        assert_invalid_state(second);
    }

    #[tokio::test]
    async fn test_transaction_commit_then_rollback_is_error() {
        let mut tx = offline_tx("users");

        tx.commit().await.expect("commit should succeed");

        let rejected = tx
            .rollback()
            .expect_err("rollback after commit should return Err");
        assert_invalid_state(rejected);
    }

    #[tokio::test]
    async fn test_transaction_rollback_then_commit_is_error() {
        let mut tx = offline_tx("users");

        tx.rollback().expect("rollback should succeed on active tx");

        let rejected = tx
            .commit()
            .await
            .expect_err("commit after rollback should return Err");
        assert_invalid_state(rejected);
    }

    // -----------------------------------------------------------------------
    // Local-buffer read tests (no server required)
    // -----------------------------------------------------------------------

    #[tokio::test]
    async fn test_transaction_read_sees_local_set() {
        let mut tx = offline_tx("users");

        let key = Key::from_str("local_key");
        let staged = CipherBlob::new(vec![10, 20, 30]);
        tx.set(key.clone(), staged.clone())
            .expect("set should succeed");

        let fetched = tx.get(&key).await.expect("get should succeed (local hit)");
        assert_eq!(
            fetched.as_ref().map(|b| b.to_vec()),
            Some(staged.to_vec()),
            "get should return the locally staged value"
        );
    }

    #[tokio::test]
    async fn test_transaction_read_sees_local_delete_as_none() {
        let mut tx = offline_tx("users");

        let key = Key::from_str("will_delete");
        tx.set(key.clone(), CipherBlob::new(vec![1]))
            .expect("set should succeed");
        tx.delete(key.clone()).expect("delete should succeed");

        let fetched = tx
            .get(&key)
            .await
            .expect("get should succeed (local delete hit)");
        assert!(
            fetched.is_none(),
            "locally deleted key should appear as None"
        );
    }

    #[tokio::test]
    async fn test_transaction_read_last_write_wins() {
        let mut tx = offline_tx("users");

        let key = Key::from_str("overwritten");
        let older = CipherBlob::new(vec![1]);
        let newer = CipherBlob::new(vec![2]);
        tx.set(key.clone(), older).expect("first set");
        tx.set(key.clone(), newer.clone()).expect("second set");

        let fetched = tx.get(&key).await.expect("get should succeed");
        assert_eq!(
            fetched.as_ref().map(|b| b.to_vec()),
            Some(newer.to_vec()),
            "last write should win"
        );
    }

    // -----------------------------------------------------------------------
    // Drop / tracing tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_transaction_empty_drop_no_warn() {
        // An empty transaction dropped while Active should NOT emit a warning.
        // "No log was emitted" is not asserted here; this only verifies that
        // the Drop path runs without panicking.
        drop(offline_tx("noop")); // should not panic or warn
    }

    #[tracing_test::traced_test]
    #[test]
    fn test_transaction_drop_warns_uncommitted() {
        let mut tx = offline_tx("events");

        tx.set(Key::from_str("pending"), CipherBlob::new(vec![0xFF]))
            .expect("set should succeed");

        // Drop without commit/rollback — should trigger the warn! in Drop.
        drop(tx);

        assert!(
            logs_contain("Transaction dropped with uncommitted operation(s)"),
            "expected a tracing warn about uncommitted ops"
        );
    }
}