kevy_client/transaction.rs
1//! `MULTI` / `EXEC` / `DISCARD` — Redis transactions.
2//!
3//! Wire flow (Remote): client sends `MULTI` → server `+OK`; client sends
4//! each queued command → server `+QUEUED`; client sends `EXEC` → server
5//! returns an array of `N` typed replies, one per queued command.
6//!
7//! Embedded mode rejects [`Connection::multi`] with
8//! `io::ErrorKind::Unsupported`: kevy-embedded has no MULTI dispatcher,
9//! and single-Connection embed access is already sequential (the inner
10//! mutex serialises every op), so the locking guarantee transactions
11//! add doesn't exist as a separate concept. Call methods directly
12//! instead.
13//!
14//! ```no_run
15//! use kevy_client::Connection;
16//!
17//! let mut conn = Connection::open("kevy://localhost:6379")?;
18//! let mut txn = conn.multi()?;
19//! txn.queue(&[b"SET", b"a", b"1"])?;
20//! txn.queue(&[b"INCR", b"counter"])?;
21//! let replies = txn.exec()?;
22//! assert_eq!(replies.len(), 2);
23//! # Ok::<(), std::io::Error>(())
24//! ```
25//!
26//! Each queued command's reply is the raw [`kevy_resp::Reply`] — callers
27//! parse the typed payload themselves. v1.4.0 deliberately keeps the
28//! `queue(&[verb, args ...])` raw shape; typed builders
29//! (`txn.set(k, v)?` → indexed reply on EXEC) are a v1.5.0 candidate.
30
31use std::io;
32
33use kevy_resp::Reply;
34use kevy_resp_client::RespClient;
35
36use crate::{Connection, string, unexpected};
37
38/// One in-flight `MULTI` block over a `Remote` connection.
39///
40/// Drop without calling [`Self::exec`] or [`Self::discard`] sends an
41/// implicit `DISCARD` so the underlying socket isn't left in MULTI mode.
42pub struct Transaction<'a> {
43 client: &'a mut RespClient,
44 /// `false` after `exec`/`discard` consumed the txn — suppresses the
45 /// implicit-DISCARD in Drop.
46 live: bool,
47}
48
49impl std::fmt::Debug for Transaction<'_> {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 f.debug_struct("Transaction")
52 .field("live", &self.live)
53 .finish_non_exhaustive()
54 }
55}
56
57impl Connection {
58 /// Start a `MULTI` block. Embedded backend returns
59 /// [`io::ErrorKind::Unsupported`].
60 pub fn multi(&mut self) -> io::Result<Transaction<'_>> {
61 match self {
62 Self::Embedded(_) => Err(io::Error::new(
63 io::ErrorKind::Unsupported,
64 "MULTI/EXEC is not implemented for the embedded backend; \
65 call Connection methods directly (each is atomic on its own lock)",
66 )),
67 Self::Remote(client) => match client.request(&[b"MULTI".to_vec()])? {
68 Reply::Simple(s) if s == b"OK" => Ok(Transaction {
69 client,
70 live: true,
71 }),
72 Reply::Error(e) => Err(io::Error::other(string(e))),
73 other => Err(unexpected(other)),
74 },
75 }
76 }
77}
78
79impl<'a> Transaction<'a> {
80 /// Queue one command — verb + args as raw byte slices. The server
81 /// replies `+QUEUED` synchronously; errors propagate as `io::Error`.
82 pub fn queue(&mut self, parts: &[&[u8]]) -> io::Result<()> {
83 if parts.is_empty() {
84 return Err(io::Error::new(
85 io::ErrorKind::InvalidInput,
86 "Transaction::queue needs at least a verb",
87 ));
88 }
89 let argv: Vec<Vec<u8>> = parts.iter().map(|p| p.to_vec()).collect();
90 match self.client.request(&argv)? {
91 Reply::Simple(s) if s == b"QUEUED" => Ok(()),
92 Reply::Error(e) => Err(io::Error::other(string(e))),
93 other => Err(unexpected(other)),
94 }
95 }
96
97 /// `EXEC` — send EXEC, return the per-queued-command reply array.
98 /// Consumes the transaction handle.
99 pub fn exec(mut self) -> io::Result<Vec<Reply>> {
100 self.live = false;
101 match self.client.request(&[b"EXEC".to_vec()])? {
102 Reply::Array(items) => Ok(items),
103 // Redis returns a null bulk if EXEC was aborted (WATCH violation, etc.)
104 // We don't expose WATCH yet, but stay forward-compatible.
105 Reply::Nil => Ok(Vec::new()),
106 Reply::Error(e) => Err(io::Error::other(string(e))),
107 other => Err(unexpected(other)),
108 }
109 }
110
111 /// `DISCARD` — abandon the queued commands. Consumes the handle.
112 pub fn discard(mut self) -> io::Result<()> {
113 self.live = false;
114 match self.client.request(&[b"DISCARD".to_vec()])? {
115 Reply::Simple(s) if s == b"OK" => Ok(()),
116 Reply::Error(e) => Err(io::Error::other(string(e))),
117 other => Err(unexpected(other)),
118 }
119 }
120}
121
122impl Drop for Transaction<'_> {
123 fn drop(&mut self) {
124 // Implicit DISCARD if the caller dropped the handle without
125 // exec/discard. Best-effort: ignore any error since we're in Drop.
126 if self.live {
127 let _ = self.client.request(&[b"DISCARD".to_vec()]);
128 }
129 }
130}