amaters_sdk_rust/transaction.rs
1//! Transaction support for AmateRS SDK.
2//!
3//! A `Transaction` buffers SET and DELETE operations locally and issues a
4//! single atomic `execute_batch` RPC on `commit()`. `rollback()` discards
5//! the buffer without any network call.
6//!
7//! ## Cache note
8//!
9//! `AmateRSClient::execute_batch` does not invalidate the query cache (consistent
10//! with the rest of `execute_batch` usage in this crate). If the client has a
11//! cache enabled, committing a transaction may leave stale entries for the keys
12//! that were written. Callers that require strict read-your-writes guarantees
13//! should either disable the cache or call `client.cache().map(|c| c.invalidate(...))`
14//! manually after commit.
15
16use crate::client::AmateRSClient;
17use crate::error::{Result, SdkError};
18use amaters_core::{CipherBlob, Key, Query};
19use std::sync::Arc;
20
21// ---------------------------------------------------------------------------
22// Internal state / operation types
23// ---------------------------------------------------------------------------
24
/// Where a [`Transaction`] currently sits in its lifecycle.
///
/// A transaction starts out `Active`; `commit()` and `rollback()` move it to
/// one of the two terminal states, after which further operations are rejected.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TransactionState {
    /// Still accepting staged operations; neither committed nor rolled back.
    Active,
    /// `commit()` completed successfully.
    Committed,
    /// `rollback()` discarded the buffer.
    RolledBack,
}
32
33/// A single buffered write operation.
34#[derive(Debug, Clone)]
35enum TransactionOp {
36 Set { key: Key, value: CipherBlob },
37 Delete { key: Key },
38}
39
40// ---------------------------------------------------------------------------
41// Public Transaction type
42// ---------------------------------------------------------------------------
43
44/// A buffered, commit-or-rollback transaction over [`AmateRSClient`].
45///
46/// All writes are staged locally in a `Vec<TransactionOp>` until `commit()` is
47/// called. `commit()` issues a single `execute_batch` RPC so the writes are
48/// applied atomically. `rollback()` discards the local buffer with no network
49/// call.
50///
51/// ## Reading inside a transaction
52///
53/// [`Transaction::get`] first inspects the local buffer using last-write-wins
54/// semantics (reverse scan). A buffered `Delete` for the queried key returns
55/// `Ok(None)`. If the key has not been written in this transaction the call
56/// falls through to the server.
57///
58/// ## Drop behaviour
59///
60/// Dropping a transaction that is still `Active` and has un-committed
61/// operations emits a `tracing::warn!` message. The buffer is silently
62/// discarded (no rollback RPC is issued — rollback is always local).
63///
64/// ## Construction
65///
66/// Prefer the factory method [`AmateRSClient::transaction`] over constructing
67/// directly.
68pub struct Transaction {
69 collection: String,
70 ops: Vec<TransactionOp>,
71 client: Arc<AmateRSClient>,
72 state: TransactionState,
73}
74
75impl Transaction {
76 /// Create a new transaction bound to `collection`.
77 ///
78 /// Use [`AmateRSClient::transaction`] instead of calling this directly.
79 pub fn new(client: Arc<AmateRSClient>, collection: impl Into<String>) -> Self {
80 Self {
81 collection: collection.into(),
82 ops: Vec::new(),
83 client,
84 state: TransactionState::Active,
85 }
86 }
87
88 // -----------------------------------------------------------------------
89 // Write staging
90 // -----------------------------------------------------------------------
91
92 /// Stage a SET operation into the local buffer.
93 ///
94 /// The write is not applied to the server until [`Self::commit`] is called.
95 ///
96 /// # Errors
97 ///
98 /// Returns [`SdkError::InvalidState`] if the transaction is no longer
99 /// active (already committed or rolled back).
100 pub fn set(&mut self, key: Key, value: CipherBlob) -> Result<()> {
101 self.ensure_active()?;
102 self.ops.push(TransactionOp::Set { key, value });
103 Ok(())
104 }
105
106 /// Stage a DELETE operation into the local buffer.
107 ///
108 /// The delete is not applied to the server until [`Self::commit`] is called.
109 ///
110 /// # Errors
111 ///
112 /// Returns [`SdkError::InvalidState`] if the transaction is no longer active.
113 pub fn delete(&mut self, key: Key) -> Result<()> {
114 self.ensure_active()?;
115 self.ops.push(TransactionOp::Delete { key });
116 Ok(())
117 }
118
119 // -----------------------------------------------------------------------
120 // Read (local buffer + server fallthrough)
121 // -----------------------------------------------------------------------
122
123 /// Read a key, consulting the local buffer first (last-write-wins), then
124 /// the server.
125 ///
126 /// * A buffered `SET` returns the in-flight value without a server round-trip.
127 /// * A buffered `DELETE` returns `Ok(None)` without a server round-trip.
128 /// * If the key has not been touched in this transaction, the call falls
129 /// through to `client.get()`.
130 ///
131 /// # Errors
132 ///
133 /// Returns [`SdkError::InvalidState`] if the transaction is no longer active,
134 /// or any error returned by the server fall-through.
135 pub async fn get(&self, key: &Key) -> Result<Option<CipherBlob>> {
136 self.ensure_active()?;
137
138 // Walk ops in reverse for the most recent write to this key.
139 for op in self.ops.iter().rev() {
140 match op {
141 TransactionOp::Set { key: k, value: v } if k == key => {
142 return Ok(Some(v.clone()));
143 }
144 TransactionOp::Delete { key: k } if k == key => {
145 return Ok(None);
146 }
147 _ => {}
148 }
149 }
150
151 // Fall through to the server.
152 self.client.get(&self.collection, key).await
153 }
154
155 // -----------------------------------------------------------------------
156 // Commit / rollback
157 // -----------------------------------------------------------------------
158
159 /// Commit all buffered operations atomically via `execute_batch`.
160 ///
161 /// On success the transaction transitions to the `Committed` state.
162 /// If the batch RPC fails, the state remains `Active` so the caller can
163 /// retry or roll back.
164 ///
165 /// An empty transaction commits instantly without a network round-trip.
166 ///
167 /// # Errors
168 ///
169 /// * [`SdkError::InvalidState`] — already committed or rolled back.
170 /// * Any `SdkError` returned by the underlying `execute_batch` RPC.
171 pub async fn commit(&mut self) -> Result<()> {
172 self.ensure_active()?;
173
174 if !self.ops.is_empty() {
175 let queries: Vec<Query> = self
176 .ops
177 .drain(..)
178 .map(|op| match op {
179 TransactionOp::Set { key, value } => Query::Set {
180 collection: self.collection.clone(),
181 key,
182 value,
183 },
184 TransactionOp::Delete { key } => Query::Delete {
185 collection: self.collection.clone(),
186 key,
187 },
188 })
189 .collect();
190
191 self.client.execute_batch(queries).await?;
192 }
193
194 self.state = TransactionState::Committed;
195 Ok(())
196 }
197
198 /// Rollback by discarding the local buffer — no network call is made.
199 ///
200 /// # Errors
201 ///
202 /// Returns [`SdkError::InvalidState`] if the transaction is already
203 /// committed or rolled back.
204 pub fn rollback(&mut self) -> Result<()> {
205 self.ensure_active()?;
206 self.ops.clear();
207 self.state = TransactionState::RolledBack;
208 Ok(())
209 }
210
211 // -----------------------------------------------------------------------
212 // Introspection
213 // -----------------------------------------------------------------------
214
215 /// Number of operations currently staged in the local buffer.
216 pub fn pending_ops(&self) -> usize {
217 self.ops.len()
218 }
219
220 /// Returns `true` if the transaction is still active (not yet committed or
221 /// rolled back).
222 pub fn is_active(&self) -> bool {
223 self.state == TransactionState::Active
224 }
225
226 /// Returns the collection name this transaction is bound to.
227 pub fn collection(&self) -> &str {
228 &self.collection
229 }
230
231 // -----------------------------------------------------------------------
232 // Private helpers
233 // -----------------------------------------------------------------------
234
235 fn ensure_active(&self) -> Result<()> {
236 if self.state != TransactionState::Active {
237 Err(SdkError::InvalidState(
238 "transaction already committed or rolled back".to_string(),
239 ))
240 } else {
241 Ok(())
242 }
243 }
244}
245
246impl Drop for Transaction {
247 fn drop(&mut self) {
248 if self.state == TransactionState::Active && !self.ops.is_empty() {
249 tracing::warn!(
250 pending_ops = self.ops.len(),
251 collection = %self.collection,
252 "Transaction dropped with uncommitted operation(s) — changes discarded",
253 );
254 }
255 }
256}
257
258// ---------------------------------------------------------------------------
259// Tests
260// ---------------------------------------------------------------------------
261
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ClientConfig;
    use amaters_core::{CipherBlob, Key};

    /// Stable prefix of the message logged by `Drop` for an abandoned
    /// transaction; shared by the positive and negative drop tests.
    const DROP_WARNING: &str = "Transaction dropped with uncommitted operation(s)";

    /// Helper: build an offline client wrapped in `Arc` so `Transaction::new`
    /// can be called without a live server.
    fn offline_client() -> Arc<AmateRSClient> {
        let config = ClientConfig::new("http://127.0.0.1:50051");
        Arc::new(AmateRSClient::new_offline(config))
    }

    // -----------------------------------------------------------------------
    // State-machine tests (no server required)
    // -----------------------------------------------------------------------

    #[test]
    fn test_transaction_rollback_clears_buffer() {
        let mut tx = Transaction::new(offline_client(), "users");

        let key = Key::from_str("k1");
        let val = CipherBlob::new(vec![1, 2, 3]);
        tx.set(key, val).expect("set should succeed on active tx");
        assert_eq!(tx.pending_ops(), 1);

        tx.rollback().expect("rollback should succeed on active tx");
        assert_eq!(tx.pending_ops(), 0);
        assert!(!tx.is_active());
    }

    #[test]
    fn test_transaction_staging_after_rollback_is_error() {
        let mut tx = Transaction::new(offline_client(), "users");
        tx.rollback().expect("rollback should succeed on active tx");

        let set_err = tx
            .set(Key::from_str("late"), CipherBlob::new(vec![9]))
            .expect_err("set after rollback should return Err");
        assert!(
            matches!(set_err, SdkError::InvalidState(_)),
            "expected InvalidState, got: {set_err}"
        );

        let delete_err = tx
            .delete(Key::from_str("late"))
            .expect_err("delete after rollback should return Err");
        assert!(
            matches!(delete_err, SdkError::InvalidState(_)),
            "expected InvalidState, got: {delete_err}"
        );

        assert_eq!(tx.pending_ops(), 0, "rejected ops must not be buffered");
    }

    #[tokio::test]
    async fn test_transaction_double_commit_returns_error() {
        // An empty transaction commits without a network call, so the
        // state machine can be exercised offline.
        let mut tx = Transaction::new(offline_client(), "users");

        // First commit: no ops → fast path, no RPC.
        tx.commit().await.expect("first commit should succeed");

        let err = tx
            .commit()
            .await
            .expect_err("second commit should return Err");
        assert!(
            matches!(err, SdkError::InvalidState(_)),
            "expected InvalidState, got: {err}"
        );
    }

    #[tokio::test]
    async fn test_transaction_commit_then_rollback_is_error() {
        let mut tx = Transaction::new(offline_client(), "users");

        tx.commit().await.expect("commit should succeed");

        let err = tx
            .rollback()
            .expect_err("rollback after commit should return Err");
        assert!(
            matches!(err, SdkError::InvalidState(_)),
            "expected InvalidState, got: {err}"
        );
    }

    #[tokio::test]
    async fn test_transaction_rollback_then_commit_is_error() {
        let mut tx = Transaction::new(offline_client(), "users");

        tx.rollback().expect("rollback should succeed on active tx");

        let err = tx
            .commit()
            .await
            .expect_err("commit after rollback should return Err");
        assert!(
            matches!(err, SdkError::InvalidState(_)),
            "expected InvalidState, got: {err}"
        );
    }

    // -----------------------------------------------------------------------
    // Local-buffer read tests (no server required)
    // -----------------------------------------------------------------------

    #[tokio::test]
    async fn test_transaction_read_sees_local_set() {
        let mut tx = Transaction::new(offline_client(), "users");

        let key = Key::from_str("local_key");
        let val = CipherBlob::new(vec![10, 20, 30]);
        tx.set(key.clone(), val.clone())
            .expect("set should succeed");

        let result = tx.get(&key).await.expect("get should succeed (local hit)");
        assert_eq!(
            result.as_ref().map(|b| b.to_vec()),
            Some(val.to_vec()),
            "get should return the locally staged value"
        );
    }

    #[tokio::test]
    async fn test_transaction_read_sees_local_delete_as_none() {
        let mut tx = Transaction::new(offline_client(), "users");

        let key = Key::from_str("will_delete");
        let val = CipherBlob::new(vec![1]);
        tx.set(key.clone(), val).expect("set should succeed");
        tx.delete(key.clone()).expect("delete should succeed");

        let result = tx
            .get(&key)
            .await
            .expect("get should succeed (local delete hit)");
        assert!(
            result.is_none(),
            "locally deleted key should appear as None"
        );
    }

    #[tokio::test]
    async fn test_transaction_read_last_write_wins() {
        let mut tx = Transaction::new(offline_client(), "users");

        let key = Key::from_str("overwritten");
        let v1 = CipherBlob::new(vec![1]);
        let v2 = CipherBlob::new(vec![2]);
        tx.set(key.clone(), v1).expect("first set");
        tx.set(key.clone(), v2.clone()).expect("second set");

        let result = tx.get(&key).await.expect("get should succeed");
        assert_eq!(
            result.as_ref().map(|b| b.to_vec()),
            Some(v2.to_vec()),
            "last write should win"
        );
    }

    // -----------------------------------------------------------------------
    // Drop / tracing tests
    // -----------------------------------------------------------------------

    #[tracing_test::traced_test]
    #[test]
    fn test_transaction_empty_drop_no_warn() {
        // An empty transaction dropped while Active must NOT emit the
        // "uncommitted operation(s)" warning.
        let tx = Transaction::new(offline_client(), "noop");
        drop(tx);

        assert!(
            !logs_contain(DROP_WARNING),
            "dropping an empty transaction should not warn"
        );
    }

    #[tracing_test::traced_test]
    #[test]
    fn test_transaction_drop_warns_uncommitted() {
        let mut tx = Transaction::new(offline_client(), "events");

        let key = Key::from_str("pending");
        let val = CipherBlob::new(vec![0xFF]);
        tx.set(key, val).expect("set should succeed");

        // Drop without commit/rollback — should trigger the warn! in Drop.
        drop(tx);

        assert!(
            logs_contain(DROP_WARNING),
            "expected a tracing warn about uncommitted ops"
        );
    }
}