Skip to main content

kevy_client/
transaction.rs

1//! `MULTI` / `EXEC` / `DISCARD` — Redis transactions.
2//!
3//! Wire flow (Remote): client sends `MULTI` → server `+OK`; client sends
4//! each queued command → server `+QUEUED`; client sends `EXEC` → server
5//! returns an array of `N` typed replies, one per queued command.
6//!
7//! Embedded mode rejects [`Connection::multi`] with
8//! `io::ErrorKind::Unsupported`: kevy-embedded has no MULTI dispatcher,
9//! and single-Connection embed access is already sequential (the inner
10//! mutex serialises every op), so the locking guarantee transactions
11//! add doesn't exist as a separate concept. Call methods directly
12//! instead.
13//!
14//! ```no_run
15//! use kevy_client::Connection;
16//!
17//! let mut conn = Connection::open("kevy://localhost:6379")?;
18//! let mut txn = conn.multi()?;
19//! txn.queue(&[b"SET", b"a", b"1"])?;
20//! txn.queue(&[b"INCR", b"counter"])?;
21//! let replies = txn.exec()?;
22//! assert_eq!(replies.len(), 2);
23//! # Ok::<(), std::io::Error>(())
24//! ```
25//!
26//! Each queued command's reply is the raw [`kevy_resp::Reply`] — callers
27//! parse the typed payload themselves. v1.4.0 deliberately keeps the
28//! `queue(&[verb, args ...])` raw shape; typed builders
29//! (`txn.set(k, v)?` → indexed reply on EXEC) are a v1.5.0 candidate.
30
31use std::io;
32
33use kevy_resp::Reply;
34use kevy_resp_client::RespClient;
35
36use crate::{Connection, string, unexpected};
37
38/// One in-flight `MULTI` block over a `Remote` connection.
39///
40/// Drop without calling [`Self::exec`] or [`Self::discard`] sends an
41/// implicit `DISCARD` so the underlying socket isn't left in MULTI mode.
42pub struct Transaction<'a> {
43    client: &'a mut RespClient,
44    /// `false` after `exec`/`discard` consumed the txn — suppresses the
45    /// implicit-DISCARD in Drop.
46    live: bool,
47}
48
49impl std::fmt::Debug for Transaction<'_> {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        f.debug_struct("Transaction")
52            .field("live", &self.live)
53            .finish_non_exhaustive()
54    }
55}
56
57impl Connection {
58    /// Start a `MULTI` block. Embedded backend returns
59    /// [`io::ErrorKind::Unsupported`].
60    pub fn multi(&mut self) -> io::Result<Transaction<'_>> {
61        match self {
62            Self::Embedded(_) => Err(io::Error::new(
63                io::ErrorKind::Unsupported,
64                "MULTI/EXEC is not implemented for the embedded backend; \
65                 call Connection methods directly (each is atomic on its own lock)",
66            )),
67            Self::Remote(client) => match client.request(&[b"MULTI".to_vec()])? {
68                Reply::Simple(s) if s == b"OK" => Ok(Transaction {
69                    client,
70                    live: true,
71                }),
72                Reply::Error(e) => Err(io::Error::other(string(e))),
73                other => Err(unexpected(other)),
74            },
75        }
76    }
77}
78
79impl<'a> Transaction<'a> {
80    /// Queue one command — verb + args as raw byte slices. The server
81    /// replies `+QUEUED` synchronously; errors propagate as `io::Error`.
82    pub fn queue(&mut self, parts: &[&[u8]]) -> io::Result<()> {
83        if parts.is_empty() {
84            return Err(io::Error::new(
85                io::ErrorKind::InvalidInput,
86                "Transaction::queue needs at least a verb",
87            ));
88        }
89        let argv: Vec<Vec<u8>> = parts.iter().map(|p| p.to_vec()).collect();
90        match self.client.request(&argv)? {
91            Reply::Simple(s) if s == b"QUEUED" => Ok(()),
92            Reply::Error(e) => Err(io::Error::other(string(e))),
93            other => Err(unexpected(other)),
94        }
95    }
96
97    /// `EXEC` — send EXEC, return the per-queued-command reply array.
98    /// Consumes the transaction handle.
99    pub fn exec(mut self) -> io::Result<Vec<Reply>> {
100        self.live = false;
101        match self.client.request(&[b"EXEC".to_vec()])? {
102            Reply::Array(items) => Ok(items),
103            // Redis returns a null bulk if EXEC was aborted (WATCH violation, etc.)
104            // We don't expose WATCH yet, but stay forward-compatible.
105            Reply::Nil => Ok(Vec::new()),
106            Reply::Error(e) => Err(io::Error::other(string(e))),
107            other => Err(unexpected(other)),
108        }
109    }
110
111    /// `DISCARD` — abandon the queued commands. Consumes the handle.
112    pub fn discard(mut self) -> io::Result<()> {
113        self.live = false;
114        match self.client.request(&[b"DISCARD".to_vec()])? {
115            Reply::Simple(s) if s == b"OK" => Ok(()),
116            Reply::Error(e) => Err(io::Error::other(string(e))),
117            other => Err(unexpected(other)),
118        }
119    }
120}
121
122impl Drop for Transaction<'_> {
123    fn drop(&mut self) {
124        // Implicit DISCARD if the caller dropped the handle without
125        // exec/discard. Best-effort: ignore any error since we're in Drop.
126        if self.live {
127            let _ = self.client.request(&[b"DISCARD".to_vec()]);
128        }
129    }
130}