Skip to main content

kevy_client/
transaction.rs

1//! `MULTI` / `EXEC` / `DISCARD` — Redis transactions, with optional
2//! `WATCH`-driven optimistic concurrency (v1.5.0).
3//!
4//! Wire flow (Remote): client sends `MULTI` → server `+OK`; client sends
5//! each queued command → server `+QUEUED`; client sends `EXEC` → server
6//! returns an array of `N` typed replies, one per queued command. When
7//! `WATCH` was issued on the same `Connection` before `MULTI` and any
8//! watched key was modified between `WATCH` and `EXEC`, the server
9//! returns `Nil` (RESP null array) and the transaction aborts.
10//!
11//! Embedded mode rejects [`Connection::multi`] / [`Connection::watch`]
12//! / [`Connection::unwatch`] with `io::ErrorKind::Unsupported`:
13//! kevy-embedded has no MULTI dispatcher, and single-Connection embed
14//! access is already sequential (the inner mutex serialises every op),
15//! so the locking guarantee transactions add doesn't exist as a
16//! separate concept. Call methods directly instead.
17//!
18//! ```no_run
19//! use kevy_client::Connection;
20//!
21//! let mut conn = Connection::open("kevy://localhost:6379")?;
22//! conn.watch(&[b"counter"])?;
23//! let mut txn = conn.multi()?;
24//! txn.incr(b"counter")?
25//!    .set(b"a", b"1")?;
26//! match txn.exec_watched()? {
27//!     Some(replies) => assert_eq!(replies.len(), 2),
28//!     None => { /* watched key changed — retry */ }
29//! }
30//! # Ok::<(), std::io::Error>(())
31//! ```
32//!
33//! Each queued command's reply is the raw [`kevy_resp::Reply`] — callers
34//! parse the typed payload themselves. The reply-side decode (e.g.
35//! `let n: i64 = replies[0].as_int()?`) is a v1.6.0 candidate.
36
37use std::io;
38
39use kevy_resp::Reply;
40use kevy_resp_client::RespClient;
41
42use crate::{Connection, string, unexpected, vec2, vec3};
43
44/// One in-flight `MULTI` block over a `Remote` connection.
45///
46/// Drop without calling [`Self::exec`] / [`Self::exec_watched`] /
47/// [`Self::discard`] sends an implicit `DISCARD` so the underlying
48/// socket isn't left in MULTI mode.
49pub struct Transaction<'a> {
50    client: &'a mut RespClient,
51    /// `false` after `exec`/`exec_watched`/`discard` consumed the txn —
52    /// suppresses the implicit-DISCARD in Drop.
53    live: bool,
54}
55
56impl std::fmt::Debug for Transaction<'_> {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct("Transaction")
59            .field("live", &self.live)
60            .finish_non_exhaustive()
61    }
62}
63
64impl Connection {
65    /// Start a `MULTI` block. Embedded backend returns
66    /// [`io::ErrorKind::Unsupported`].
67    pub fn multi(&mut self) -> io::Result<Transaction<'_>> {
68        match self {
69            Self::Embedded(_) => Err(io::Error::new(
70                io::ErrorKind::Unsupported,
71                "MULTI/EXEC is not implemented for the embedded backend; \
72                 call Connection methods directly (each is atomic on its own lock)",
73            )),
74            Self::Remote(client) => match client.request(&[b"MULTI".to_vec()])? {
75                Reply::Simple(s) if s == b"OK" => Ok(Transaction {
76                    client,
77                    live: true,
78                }),
79                Reply::Error(e) => Err(io::Error::other(string(e))),
80                other => Err(unexpected(other)),
81            },
82        }
83    }
84
85    /// `WATCH key [key ...]` — mark keys for optimistic concurrency.
86    /// The next [`multi`](Self::multi) on this connection will abort
87    /// (EXEC returns Nil) if any watched key was modified between
88    /// this call and EXEC. Remote-only.
89    ///
90    /// Per RESP spec, WATCH must be sent **before** MULTI. Repeated
91    /// `watch` calls accumulate — the abort triggers on any of the
92    /// watched keys changing.
93    pub fn watch(&mut self, keys: &[&[u8]]) -> io::Result<()> {
94        if keys.is_empty() {
95            return Err(io::Error::new(
96                io::ErrorKind::InvalidInput,
97                "WATCH needs at least one key",
98            ));
99        }
100        match self {
101            Self::Embedded(_) => Err(io::Error::new(
102                io::ErrorKind::Unsupported,
103                "WATCH is a transaction primitive; embedded backend has no MULTI",
104            )),
105            Self::Remote(c) => {
106                let mut args = Vec::with_capacity(keys.len() + 1);
107                args.push(b"WATCH".to_vec());
108                args.extend(keys.iter().map(|k| k.to_vec()));
109                match c.request(&args)? {
110                    Reply::Simple(s) if s == b"OK" => Ok(()),
111                    Reply::Error(e) => Err(io::Error::other(string(e))),
112                    other => Err(unexpected(other)),
113                }
114            }
115        }
116    }
117
118    /// `UNWATCH` — drop every WATCH set on this connection without
119    /// running a transaction. Remote-only.
120    pub fn unwatch(&mut self) -> io::Result<()> {
121        match self {
122            Self::Embedded(_) => Err(io::Error::new(
123                io::ErrorKind::Unsupported,
124                "UNWATCH is a transaction primitive; embedded backend has no MULTI",
125            )),
126            Self::Remote(c) => match c.request(&[b"UNWATCH".to_vec()])? {
127                Reply::Simple(s) if s == b"OK" => Ok(()),
128                Reply::Error(e) => Err(io::Error::other(string(e))),
129                other => Err(unexpected(other)),
130            },
131        }
132    }
133}
134
135impl<'a> Transaction<'a> {
136    /// Queue one command — verb + args as raw byte slices. The server
137    /// replies `+QUEUED` synchronously; errors propagate as `io::Error`.
138    pub fn queue(&mut self, parts: &[&[u8]]) -> io::Result<()> {
139        if parts.is_empty() {
140            return Err(io::Error::new(
141                io::ErrorKind::InvalidInput,
142                "Transaction::queue needs at least a verb",
143            ));
144        }
145        let argv: Vec<Vec<u8>> = parts.iter().map(|p| p.to_vec()).collect();
146        self.queue_argv(argv)
147    }
148
149    /// `EXEC` — send EXEC, return the per-queued-command reply array.
150    /// Consumes the transaction handle.
151    ///
152    /// When a `WATCH` violation aborts the transaction the server
153    /// returns Nil; this method collapses that into an empty `Vec`
154    /// for backward compatibility with v1.4.x. For new code, prefer
155    /// [`exec_watched`](Self::exec_watched), which distinguishes
156    /// "aborted by WATCH" (returns `None`) from "successful empty
157    /// transaction" (returns `Some(vec![])`).
158    pub fn exec(mut self) -> io::Result<Vec<Reply>> {
159        self.live = false;
160        match self.client.request(&[b"EXEC".to_vec()])? {
161            Reply::Array(items) => Ok(items),
162            Reply::Nil => Ok(Vec::new()),
163            Reply::Error(e) => Err(io::Error::other(string(e))),
164            other => Err(unexpected(other)),
165        }
166    }
167
168    /// Like [`exec`](Self::exec) but returns `None` when a `WATCH`
169    /// violation aborts the transaction (RESP Nil reply to EXEC).
170    /// Use this when you've called [`Connection::watch`] and need to
171    /// distinguish an abort from a successfully-empty queue.
172    pub fn exec_watched(mut self) -> io::Result<Option<Vec<Reply>>> {
173        self.live = false;
174        match self.client.request(&[b"EXEC".to_vec()])? {
175            Reply::Array(items) => Ok(Some(items)),
176            Reply::Nil => Ok(None),
177            Reply::Error(e) => Err(io::Error::other(string(e))),
178            other => Err(unexpected(other)),
179        }
180    }
181
182    /// Like [`exec`](Self::exec) but returns a [`TransactionReplies`]
183    /// cursor with typed extractors (`next_int`, `next_bulk`, …) so
184    /// callers don't hand-match every `Reply` themselves. Aborts with
185    /// `io::ErrorKind::InvalidData` ("transaction aborted by WATCH") if
186    /// the server replied Nil; use [`exec_watched_typed`](Self::exec_watched_typed)
187    /// to distinguish abort from successfully-empty.
188    ///
189    /// Consumes the handle. The cursor remembers how many replies are
190    /// left ([`TransactionReplies::remaining`]) so callers can sanity-
191    /// check arity at the end of the read sequence.
192    pub fn exec_typed(mut self) -> io::Result<TransactionReplies> {
193        self.live = false;
194        match self.client.request(&[b"EXEC".to_vec()])? {
195            Reply::Array(items) => Ok(TransactionReplies::new(items)),
196            Reply::Nil => Err(io::Error::new(
197                io::ErrorKind::InvalidData,
198                "transaction aborted by WATCH",
199            )),
200            Reply::Error(e) => Err(io::Error::other(string(e))),
201            other => Err(unexpected(other)),
202        }
203    }
204
205    /// Like [`exec_watched`](Self::exec_watched) but returns a typed
206    /// [`TransactionReplies`] cursor on commit; `None` on WATCH abort.
207    pub fn exec_watched_typed(mut self) -> io::Result<Option<TransactionReplies>> {
208        self.live = false;
209        match self.client.request(&[b"EXEC".to_vec()])? {
210            Reply::Array(items) => Ok(Some(TransactionReplies::new(items))),
211            Reply::Nil => Ok(None),
212            Reply::Error(e) => Err(io::Error::other(string(e))),
213            other => Err(unexpected(other)),
214        }
215    }
216
217    /// `DISCARD` — abandon the queued commands. Consumes the handle.
218    pub fn discard(mut self) -> io::Result<()> {
219        self.live = false;
220        match self.client.request(&[b"DISCARD".to_vec()])? {
221            Reply::Simple(s) if s == b"OK" => Ok(()),
222            Reply::Error(e) => Err(io::Error::other(string(e))),
223            other => Err(unexpected(other)),
224        }
225    }
226}
227
228// ─────────────────────────────────────────────────────────────────────────
229// Typed builders (v1.5.0). Each mirrors the same-named Connection method's
230// argument shape; on EXEC the matching index in the returned Vec carries
231// the typed payload (raw `Reply` — typed decode is a v1.6.0 candidate).
232//
233// All builders return `&mut Self` so they can chain:
234//     txn.set(k, v)?.incr(c)?.del(&[k2])?;
235// ─────────────────────────────────────────────────────────────────────────
236
237impl<'a> Transaction<'a> {
238    /// Queue `SET key value`.
239    pub fn set(&mut self, key: &[u8], value: &[u8]) -> io::Result<&mut Self> {
240        self.queue_argv(vec3(b"SET", key, value))?;
241        Ok(self)
242    }
243
244    /// Queue `GET key`.
245    pub fn get(&mut self, key: &[u8]) -> io::Result<&mut Self> {
246        self.queue_argv(vec2(b"GET", key))?;
247        Ok(self)
248    }
249
250    /// Queue `DEL key [key ...]`.
251    pub fn del(&mut self, keys: &[&[u8]]) -> io::Result<&mut Self> {
252        if keys.is_empty() {
253            return Err(io::Error::new(
254                io::ErrorKind::InvalidInput,
255                "Transaction::del needs at least one key",
256            ));
257        }
258        let mut args = Vec::with_capacity(keys.len() + 1);
259        args.push(b"DEL".to_vec());
260        args.extend(keys.iter().map(|k| k.to_vec()));
261        self.queue_argv(args)?;
262        Ok(self)
263    }
264
265    /// Queue `EXISTS key [key ...]`.
266    pub fn exists(&mut self, keys: &[&[u8]]) -> io::Result<&mut Self> {
267        if keys.is_empty() {
268            return Err(io::Error::new(
269                io::ErrorKind::InvalidInput,
270                "Transaction::exists needs at least one key",
271            ));
272        }
273        let mut args = Vec::with_capacity(keys.len() + 1);
274        args.push(b"EXISTS".to_vec());
275        args.extend(keys.iter().map(|k| k.to_vec()));
276        self.queue_argv(args)?;
277        Ok(self)
278    }
279
280    /// Queue `INCR key`.
281    pub fn incr(&mut self, key: &[u8]) -> io::Result<&mut Self> {
282        self.queue_argv(vec2(b"INCR", key))?;
283        Ok(self)
284    }
285
286    /// Queue `INCRBY key delta`.
287    pub fn incr_by(&mut self, key: &[u8], delta: i64) -> io::Result<&mut Self> {
288        let args = vec![
289            b"INCRBY".to_vec(),
290            key.to_vec(),
291            delta.to_string().into_bytes(),
292        ];
293        self.queue_argv(args)?;
294        Ok(self)
295    }
296
297    /// Queue `MGET key [key ...]`.
298    pub fn mget(&mut self, keys: &[&[u8]]) -> io::Result<&mut Self> {
299        if keys.is_empty() {
300            return Err(io::Error::new(
301                io::ErrorKind::InvalidInput,
302                "Transaction::mget needs at least one key",
303            ));
304        }
305        let mut args = Vec::with_capacity(keys.len() + 1);
306        args.push(b"MGET".to_vec());
307        args.extend(keys.iter().map(|k| k.to_vec()));
308        self.queue_argv(args)?;
309        Ok(self)
310    }
311
312    /// Queue `MSET key value [key value ...]`.
313    pub fn mset(&mut self, pairs: &[(&[u8], &[u8])]) -> io::Result<&mut Self> {
314        if pairs.is_empty() {
315            return Err(io::Error::new(
316                io::ErrorKind::InvalidInput,
317                "Transaction::mset needs at least one (key, value) pair",
318            ));
319        }
320        let mut args = Vec::with_capacity(pairs.len() * 2 + 1);
321        args.push(b"MSET".to_vec());
322        for (k, v) in pairs {
323            args.push(k.to_vec());
324            args.push(v.to_vec());
325        }
326        self.queue_argv(args)?;
327        Ok(self)
328    }
329
330    /// Send one already-materialised argv and parse the `+QUEUED` ack.
331    /// Shared back-end for `queue` + every typed builder.
332    fn queue_argv(&mut self, argv: Vec<Vec<u8>>) -> io::Result<()> {
333        match self.client.request(&argv)? {
334            Reply::Simple(s) if s == b"QUEUED" => Ok(()),
335            Reply::Error(e) => Err(io::Error::other(string(e))),
336            other => Err(unexpected(other)),
337        }
338    }
339}
340
341impl Drop for Transaction<'_> {
342    fn drop(&mut self) {
343        // Implicit DISCARD if the caller dropped the handle without
344        // exec/exec_watched/discard. Best-effort: ignore any error
345        // since we're in Drop.
346        if self.live {
347            let _ = self.client.request(&[b"DISCARD".to_vec()]);
348        }
349    }
350}
351
352// ─────────────────────────────────────────────────────────────────────────
353// Typed EXEC reply cursor (v1.7.0). Sits between the existing raw
354// `Vec<Reply>` API and the maximalist typestate-tuple alternative —
355// callers consume queued replies in order via per-typed extractors:
356//
357//     let mut r = txn.exec_typed()?;
358//     let counter: i64       = r.next_int()?;        // INCR
359//     let prior:   Option<_> = r.next_bulk()?;       // GET
360//     let bulk_m:  Vec<_>    = r.next_array_of_bulks()?;  // MGET
361//     r.expect_empty()?;                              // arity gate
362//
363// Mismatch surfaces InvalidData with the actual variant in the message
364// so debugging doesn't require turning on RESP wire logging. The cursor
365// also exposes `raw()` as an escape hatch for verbs the typed helpers
366// don't cover (HGETALL → array of bulks; ZRANGE WITHSCORES → mixed
367// pairs; etc.).
368// ─────────────────────────────────────────────────────────────────────────
369
370/// Typed cursor over the per-queued-command replies of a successful
371/// `EXEC`. Produced by [`Transaction::exec_typed`] /
372/// [`Transaction::exec_watched_typed`]. Each `next_*` consumes one
373/// reply; if the variant doesn't match the extractor, an
374/// `io::ErrorKind::InvalidData` is returned and the cursor advances
375/// regardless (so a downstream `expect_empty` still works correctly).
376#[derive(Debug)]
377pub struct TransactionReplies {
378    iter: std::vec::IntoIter<Reply>,
379}
380
381impl TransactionReplies {
382    fn new(items: Vec<Reply>) -> Self {
383        Self { iter: items.into_iter() }
384    }
385
386    /// Number of replies still un-consumed.
387    pub fn remaining(&self) -> usize {
388        self.iter.len()
389    }
390
391    /// Error out if the cursor still has replies — useful at the end of
392    /// a typed read sequence to assert the queued-command count matched.
393    pub fn expect_empty(&mut self) -> io::Result<()> {
394        let left = self.remaining();
395        if left == 0 {
396            Ok(())
397        } else {
398            Err(io::Error::new(
399                io::ErrorKind::InvalidData,
400                format!("transaction reply cursor has {left} un-consumed replies"),
401            ))
402        }
403    }
404
405    /// Pop the next reply as a raw [`Reply`]. Escape hatch for verbs
406    /// the typed extractors don't cover.
407    pub fn raw(&mut self) -> io::Result<Reply> {
408        self.iter
409            .next()
410            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "exhausted"))
411    }
412
413    /// Expect `Reply::Simple(b"OK")` — `SET` / `MSET` ack.
414    pub fn next_ok(&mut self) -> io::Result<()> {
415        match self.raw()? {
416            Reply::Simple(s) if s == b"OK" => Ok(()),
417            other => Err(mismatch("Simple(OK)", &other)),
418        }
419    }
420
421    /// Expect `Reply::Simple(b"OK")` OR `Reply::Nil` — `SET key v NX/XX`
422    /// returns Nil when the condition is not met.
423    pub fn next_ok_or_nil(&mut self) -> io::Result<bool> {
424        match self.raw()? {
425            Reply::Simple(s) if s == b"OK" => Ok(true),
426            Reply::Nil => Ok(false),
427            other => Err(mismatch("Simple(OK) or Nil", &other)),
428        }
429    }
430
431    /// Expect `Reply::Int` — `INCR` / `DEL` / `EXISTS` / `INCRBY`.
432    pub fn next_int(&mut self) -> io::Result<i64> {
433        match self.raw()? {
434            Reply::Int(n) => Ok(n),
435            other => Err(mismatch("Int", &other)),
436        }
437    }
438
439    /// Expect `Reply::Bulk` (or `Nil` → `None`) — `GET`.
440    pub fn next_bulk(&mut self) -> io::Result<Option<Vec<u8>>> {
441        match self.raw()? {
442            Reply::Bulk(b) => Ok(Some(b)),
443            Reply::Nil => Ok(None),
444            other => Err(mismatch("Bulk or Nil", &other)),
445        }
446    }
447
448    /// Expect `Reply::Array` of `Bulk`/`Nil` entries — `MGET`. Returns
449    /// `Vec<Option<Vec<u8>>>` in request order.
450    pub fn next_array_of_bulks(&mut self) -> io::Result<Vec<Option<Vec<u8>>>> {
451        let items = match self.raw()? {
452            Reply::Array(v) => v,
453            Reply::Nil => return Ok(Vec::new()),
454            other => return Err(mismatch("Array", &other)),
455        };
456        items
457            .into_iter()
458            .map(|r| match r {
459                Reply::Bulk(b) => Ok(Some(b)),
460                Reply::Nil => Ok(None),
461                other => Err(mismatch("Array element Bulk/Nil", &other)),
462            })
463            .collect()
464    }
465
466    /// Expect `Reply::Simple` (any payload) — for verbs whose ack isn't
467    /// `OK` (e.g. `PING` → `+PONG`).
468    pub fn next_simple(&mut self) -> io::Result<Vec<u8>> {
469        match self.raw()? {
470            Reply::Simple(s) => Ok(s),
471            other => Err(mismatch("Simple", &other)),
472        }
473    }
474}
475
476fn mismatch(want: &str, got: &Reply) -> io::Error {
477    io::Error::new(
478        io::ErrorKind::InvalidData,
479        format!("transaction reply mismatch: expected {want}, got {got:?}"),
480    )
481}