Skip to main content

kevy_client/
transaction.rs

1//! `MULTI` / `EXEC` / `DISCARD` — Redis transactions, with optional
2//! `WATCH`-driven optimistic concurrency (v1.5.0).
3//!
4//! Wire flow (Remote): client sends `MULTI` → server `+OK`; client sends
5//! each queued command → server `+QUEUED`; client sends `EXEC` → server
6//! returns an array of `N` typed replies, one per queued command. When
7//! `WATCH` was issued on the same `Connection` before `MULTI` and any
8//! watched key was modified between `WATCH` and `EXEC`, the server
9//! returns `Nil` (RESP null array) and the transaction aborts.
10//!
11//! Embedded mode rejects [`Connection::multi`] / [`Connection::watch`]
12//! / [`Connection::unwatch`] with `io::ErrorKind::Unsupported`:
13//! kevy-embedded has no MULTI dispatcher, and single-Connection embed
14//! access is already sequential (the inner mutex serialises every op),
15//! so the locking guarantee transactions add doesn't exist as a
16//! separate concept. Call methods directly instead.
17//!
18//! ```no_run
19//! use kevy_client::Connection;
20//!
21//! let mut conn = Connection::open("kevy://localhost:6379")?;
22//! conn.watch(&[b"counter"])?;
23//! let mut txn = conn.multi()?;
24//! txn.incr(b"counter")?
25//!    .set(b"a", b"1")?;
26//! match txn.exec_watched()? {
27//!     Some(replies) => assert_eq!(replies.len(), 2),
28//!     None => { /* watched key changed — retry */ }
29//! }
30//! # Ok::<(), std::io::Error>(())
31//! ```
32//!
33//! Each queued command's reply is the raw [`kevy_resp::Reply`] — callers
34//! parse the typed payload themselves. The reply-side decode (e.g.
35//! `let n: i64 = replies[0].as_int()?`) is a v1.6.0 candidate.
36
37use std::io;
38
39use kevy_resp::Reply;
40use kevy_resp_client::RespClient;
41
42use crate::{Connection, string, unexpected, vec2, vec3};
43
44/// One in-flight `MULTI` block over a `Remote` connection.
45///
46/// Drop without calling [`Self::exec`] / [`Self::exec_watched`] /
47/// [`Self::discard`] sends an implicit `DISCARD` so the underlying
48/// socket isn't left in MULTI mode.
49pub struct Transaction<'a> {
50    client: &'a mut RespClient,
51    /// `false` after `exec`/`exec_watched`/`discard` consumed the txn —
52    /// suppresses the implicit-DISCARD in Drop.
53    live: bool,
54}
55
56impl std::fmt::Debug for Transaction<'_> {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct("Transaction")
59            .field("live", &self.live)
60            .finish_non_exhaustive()
61    }
62}
63
64impl Connection {
65    /// Start a `MULTI` block. Embedded backend returns
66    /// [`io::ErrorKind::Unsupported`].
67    pub fn multi(&mut self) -> io::Result<Transaction<'_>> {
68        match self {
69            Self::Embedded(_) => Err(io::Error::new(
70                io::ErrorKind::Unsupported,
71                "MULTI/EXEC is not implemented for the embedded backend; \
72                 call Connection methods directly (each is atomic on its own lock)",
73            )),
74            Self::Remote(client) => match client.request(&[b"MULTI".to_vec()])? {
75                Reply::Simple(s) if s == b"OK" => Ok(Transaction {
76                    client,
77                    live: true,
78                }),
79                Reply::Error(e) => Err(io::Error::other(string(e))),
80                other => Err(unexpected(other)),
81            },
82        }
83    }
84
85    /// `WATCH key [key ...]` — mark keys for optimistic concurrency.
86    /// The next [`multi`](Self::multi) on this connection will abort
87    /// (EXEC returns Nil) if any watched key was modified between
88    /// this call and EXEC. Remote-only.
89    ///
90    /// Per RESP spec, WATCH must be sent **before** MULTI. Repeated
91    /// `watch` calls accumulate — the abort triggers on any of the
92    /// watched keys changing.
93    pub fn watch(&mut self, keys: &[&[u8]]) -> io::Result<()> {
94        if keys.is_empty() {
95            return Err(io::Error::new(
96                io::ErrorKind::InvalidInput,
97                "WATCH needs at least one key",
98            ));
99        }
100        match self {
101            Self::Embedded(_) => Err(io::Error::new(
102                io::ErrorKind::Unsupported,
103                "WATCH is a transaction primitive; embedded backend has no MULTI",
104            )),
105            Self::Remote(c) => {
106                let mut args = Vec::with_capacity(keys.len() + 1);
107                args.push(b"WATCH".to_vec());
108                args.extend(keys.iter().map(|k| k.to_vec()));
109                match c.request(&args)? {
110                    Reply::Simple(s) if s == b"OK" => Ok(()),
111                    Reply::Error(e) => Err(io::Error::other(string(e))),
112                    other => Err(unexpected(other)),
113                }
114            }
115        }
116    }
117
118    /// `UNWATCH` — drop every WATCH set on this connection without
119    /// running a transaction. Remote-only.
120    pub fn unwatch(&mut self) -> io::Result<()> {
121        match self {
122            Self::Embedded(_) => Err(io::Error::new(
123                io::ErrorKind::Unsupported,
124                "UNWATCH is a transaction primitive; embedded backend has no MULTI",
125            )),
126            Self::Remote(c) => match c.request(&[b"UNWATCH".to_vec()])? {
127                Reply::Simple(s) if s == b"OK" => Ok(()),
128                Reply::Error(e) => Err(io::Error::other(string(e))),
129                other => Err(unexpected(other)),
130            },
131        }
132    }
133}
134
135impl<'a> Transaction<'a> {
136    /// Queue one command — verb + args as raw byte slices. The server
137    /// replies `+QUEUED` synchronously; errors propagate as `io::Error`.
138    pub fn queue(&mut self, parts: &[&[u8]]) -> io::Result<()> {
139        if parts.is_empty() {
140            return Err(io::Error::new(
141                io::ErrorKind::InvalidInput,
142                "Transaction::queue needs at least a verb",
143            ));
144        }
145        let argv: Vec<Vec<u8>> = parts.iter().map(|p| p.to_vec()).collect();
146        self.queue_argv(argv)
147    }
148
149    /// `EXEC` — send EXEC, return the per-queued-command reply array.
150    /// Consumes the transaction handle.
151    ///
152    /// When a `WATCH` violation aborts the transaction the server
153    /// returns Nil; this method collapses that into an empty `Vec`
154    /// for backward compatibility with v1.4.x. For new code, prefer
155    /// [`exec_watched`](Self::exec_watched), which distinguishes
156    /// "aborted by WATCH" (returns `None`) from "successful empty
157    /// transaction" (returns `Some(vec![])`).
158    pub fn exec(mut self) -> io::Result<Vec<Reply>> {
159        self.live = false;
160        match self.client.request(&[b"EXEC".to_vec()])? {
161            Reply::Array(items) => Ok(items),
162            Reply::Nil => Ok(Vec::new()),
163            Reply::Error(e) => Err(io::Error::other(string(e))),
164            other => Err(unexpected(other)),
165        }
166    }
167
168    /// Like [`exec`](Self::exec) but returns `None` when a `WATCH`
169    /// violation aborts the transaction (RESP Nil reply to EXEC).
170    /// Use this when you've called [`Connection::watch`] and need to
171    /// distinguish an abort from a successfully-empty queue.
172    pub fn exec_watched(mut self) -> io::Result<Option<Vec<Reply>>> {
173        self.live = false;
174        match self.client.request(&[b"EXEC".to_vec()])? {
175            Reply::Array(items) => Ok(Some(items)),
176            Reply::Nil => Ok(None),
177            Reply::Error(e) => Err(io::Error::other(string(e))),
178            other => Err(unexpected(other)),
179        }
180    }
181
182    /// `DISCARD` — abandon the queued commands. Consumes the handle.
183    pub fn discard(mut self) -> io::Result<()> {
184        self.live = false;
185        match self.client.request(&[b"DISCARD".to_vec()])? {
186            Reply::Simple(s) if s == b"OK" => Ok(()),
187            Reply::Error(e) => Err(io::Error::other(string(e))),
188            other => Err(unexpected(other)),
189        }
190    }
191}
192
193// ─────────────────────────────────────────────────────────────────────────
194// Typed builders (v1.5.0). Each mirrors the same-named Connection method's
195// argument shape; on EXEC the matching index in the returned Vec carries
196// the typed payload (raw `Reply` — typed decode is a v1.6.0 candidate).
197//
198// All builders return `&mut Self` so they can chain:
199//     txn.set(k, v)?.incr(c)?.del(&[k2])?;
200// ─────────────────────────────────────────────────────────────────────────
201
202impl<'a> Transaction<'a> {
203    /// Queue `SET key value`.
204    pub fn set(&mut self, key: &[u8], value: &[u8]) -> io::Result<&mut Self> {
205        self.queue_argv(vec3(b"SET", key, value))?;
206        Ok(self)
207    }
208
209    /// Queue `GET key`.
210    pub fn get(&mut self, key: &[u8]) -> io::Result<&mut Self> {
211        self.queue_argv(vec2(b"GET", key))?;
212        Ok(self)
213    }
214
215    /// Queue `DEL key [key ...]`.
216    pub fn del(&mut self, keys: &[&[u8]]) -> io::Result<&mut Self> {
217        if keys.is_empty() {
218            return Err(io::Error::new(
219                io::ErrorKind::InvalidInput,
220                "Transaction::del needs at least one key",
221            ));
222        }
223        let mut args = Vec::with_capacity(keys.len() + 1);
224        args.push(b"DEL".to_vec());
225        args.extend(keys.iter().map(|k| k.to_vec()));
226        self.queue_argv(args)?;
227        Ok(self)
228    }
229
230    /// Queue `EXISTS key [key ...]`.
231    pub fn exists(&mut self, keys: &[&[u8]]) -> io::Result<&mut Self> {
232        if keys.is_empty() {
233            return Err(io::Error::new(
234                io::ErrorKind::InvalidInput,
235                "Transaction::exists needs at least one key",
236            ));
237        }
238        let mut args = Vec::with_capacity(keys.len() + 1);
239        args.push(b"EXISTS".to_vec());
240        args.extend(keys.iter().map(|k| k.to_vec()));
241        self.queue_argv(args)?;
242        Ok(self)
243    }
244
245    /// Queue `INCR key`.
246    pub fn incr(&mut self, key: &[u8]) -> io::Result<&mut Self> {
247        self.queue_argv(vec2(b"INCR", key))?;
248        Ok(self)
249    }
250
251    /// Queue `INCRBY key delta`.
252    pub fn incr_by(&mut self, key: &[u8], delta: i64) -> io::Result<&mut Self> {
253        let args = vec![
254            b"INCRBY".to_vec(),
255            key.to_vec(),
256            delta.to_string().into_bytes(),
257        ];
258        self.queue_argv(args)?;
259        Ok(self)
260    }
261
262    /// Queue `MGET key [key ...]`.
263    pub fn mget(&mut self, keys: &[&[u8]]) -> io::Result<&mut Self> {
264        if keys.is_empty() {
265            return Err(io::Error::new(
266                io::ErrorKind::InvalidInput,
267                "Transaction::mget needs at least one key",
268            ));
269        }
270        let mut args = Vec::with_capacity(keys.len() + 1);
271        args.push(b"MGET".to_vec());
272        args.extend(keys.iter().map(|k| k.to_vec()));
273        self.queue_argv(args)?;
274        Ok(self)
275    }
276
277    /// Queue `MSET key value [key value ...]`.
278    pub fn mset(&mut self, pairs: &[(&[u8], &[u8])]) -> io::Result<&mut Self> {
279        if pairs.is_empty() {
280            return Err(io::Error::new(
281                io::ErrorKind::InvalidInput,
282                "Transaction::mset needs at least one (key, value) pair",
283            ));
284        }
285        let mut args = Vec::with_capacity(pairs.len() * 2 + 1);
286        args.push(b"MSET".to_vec());
287        for (k, v) in pairs {
288            args.push(k.to_vec());
289            args.push(v.to_vec());
290        }
291        self.queue_argv(args)?;
292        Ok(self)
293    }
294
295    /// Send one already-materialised argv and parse the `+QUEUED` ack.
296    /// Shared back-end for `queue` + every typed builder.
297    fn queue_argv(&mut self, argv: Vec<Vec<u8>>) -> io::Result<()> {
298        match self.client.request(&argv)? {
299            Reply::Simple(s) if s == b"QUEUED" => Ok(()),
300            Reply::Error(e) => Err(io::Error::other(string(e))),
301            other => Err(unexpected(other)),
302        }
303    }
304}
305
306impl Drop for Transaction<'_> {
307    fn drop(&mut self) {
308        // Implicit DISCARD if the caller dropped the handle without
309        // exec/exec_watched/discard. Best-effort: ignore any error
310        // since we're in Drop.
311        if self.live {
312            let _ = self.client.request(&[b"DISCARD".to_vec()]);
313        }
314    }
315}