kevy_client/transaction.rs
1//! `MULTI` / `EXEC` / `DISCARD` — Redis transactions, with optional
2//! `WATCH`-driven optimistic concurrency (v1.5.0).
3//!
4//! Wire flow (Remote): client sends `MULTI` → server `+OK`; client sends
5//! each queued command → server `+QUEUED`; client sends `EXEC` → server
6//! returns an array of `N` typed replies, one per queued command. When
7//! `WATCH` was issued on the same `Connection` before `MULTI` and any
8//! watched key was modified between `WATCH` and `EXEC`, the server
9//! returns `Nil` (RESP null array) and the transaction aborts.
10//!
11//! Embedded mode rejects [`Connection::multi`] / [`Connection::watch`]
12//! / [`Connection::unwatch`] with `io::ErrorKind::Unsupported`:
13//! kevy-embedded has no MULTI dispatcher, and single-Connection embed
14//! access is already sequential (the inner mutex serialises every op),
15//! so the locking guarantee transactions add doesn't exist as a
16//! separate concept. Call methods directly instead.
17//!
18//! ```no_run
19//! use kevy_client::Connection;
20//!
21//! let mut conn = Connection::open("kevy://localhost:6379")?;
22//! conn.watch(&[b"counter"])?;
23//! let mut txn = conn.multi()?;
24//! txn.incr(b"counter")?
25//! .set(b"a", b"1")?;
26//! match txn.exec_watched()? {
27//! Some(replies) => assert_eq!(replies.len(), 2),
28//! None => { /* watched key changed — retry */ }
29//! }
30//! # Ok::<(), std::io::Error>(())
31//! ```
32//!
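//! By default each queued command's reply is the raw [`kevy_resp::Reply`]
//! ([`Transaction::exec`] / [`Transaction::exec_watched`]) and callers
//! match on the payload themselves. For in-order typed decoding use
//! [`Transaction::exec_typed`] / [`Transaction::exec_watched_typed`],
//! which return a [`TransactionReplies`] cursor (v1.7.0).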
36
37use std::io;
38
39use kevy_resp::Reply;
40use kevy_resp_client::RespClient;
41
42use crate::{Connection, string, unexpected, vec2, vec3};
43
44/// One in-flight `MULTI` block over a `Remote` connection.
45///
46/// Drop without calling [`Self::exec`] / [`Self::exec_watched`] /
47/// [`Self::discard`] sends an implicit `DISCARD` so the underlying
48/// socket isn't left in MULTI mode.
49pub struct Transaction<'a> {
50 client: &'a mut RespClient,
51 /// `false` after `exec`/`exec_watched`/`discard` consumed the txn —
52 /// suppresses the implicit-DISCARD in Drop.
53 live: bool,
54}
55
56impl std::fmt::Debug for Transaction<'_> {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct("Transaction")
59 .field("live", &self.live)
60 .finish_non_exhaustive()
61 }
62}
63
64impl Connection {
65 /// Start a `MULTI` block. Embedded backend returns
66 /// [`io::ErrorKind::Unsupported`].
67 pub fn multi(&mut self) -> io::Result<Transaction<'_>> {
68 match self {
69 Self::Embedded(_) => Err(io::Error::new(
70 io::ErrorKind::Unsupported,
71 "MULTI/EXEC is not implemented for the embedded backend; \
72 call Connection methods directly (each is atomic on its own lock)",
73 )),
74 Self::Remote(client) => match client.request(&[b"MULTI".to_vec()])? {
75 Reply::Simple(s) if s == b"OK" => Ok(Transaction {
76 client,
77 live: true,
78 }),
79 Reply::Error(e) => Err(io::Error::other(string(e))),
80 other => Err(unexpected(other)),
81 },
82 }
83 }
84
85 /// `WATCH key [key ...]` — mark keys for optimistic concurrency.
86 /// The next [`multi`](Self::multi) on this connection will abort
87 /// (EXEC returns Nil) if any watched key was modified between
88 /// this call and EXEC. Remote-only.
89 ///
90 /// Per RESP spec, WATCH must be sent **before** MULTI. Repeated
91 /// `watch` calls accumulate — the abort triggers on any of the
92 /// watched keys changing.
93 pub fn watch(&mut self, keys: &[&[u8]]) -> io::Result<()> {
94 if keys.is_empty() {
95 return Err(io::Error::new(
96 io::ErrorKind::InvalidInput,
97 "WATCH needs at least one key",
98 ));
99 }
100 match self {
101 Self::Embedded(_) => Err(io::Error::new(
102 io::ErrorKind::Unsupported,
103 "WATCH is a transaction primitive; embedded backend has no MULTI",
104 )),
105 Self::Remote(c) => {
106 let mut args = Vec::with_capacity(keys.len() + 1);
107 args.push(b"WATCH".to_vec());
108 args.extend(keys.iter().map(|k| k.to_vec()));
109 match c.request(&args)? {
110 Reply::Simple(s) if s == b"OK" => Ok(()),
111 Reply::Error(e) => Err(io::Error::other(string(e))),
112 other => Err(unexpected(other)),
113 }
114 }
115 }
116 }
117
118 /// `UNWATCH` — drop every WATCH set on this connection without
119 /// running a transaction. Remote-only.
120 pub fn unwatch(&mut self) -> io::Result<()> {
121 match self {
122 Self::Embedded(_) => Err(io::Error::new(
123 io::ErrorKind::Unsupported,
124 "UNWATCH is a transaction primitive; embedded backend has no MULTI",
125 )),
126 Self::Remote(c) => match c.request(&[b"UNWATCH".to_vec()])? {
127 Reply::Simple(s) if s == b"OK" => Ok(()),
128 Reply::Error(e) => Err(io::Error::other(string(e))),
129 other => Err(unexpected(other)),
130 },
131 }
132 }
133}
134
135impl<'a> Transaction<'a> {
136 /// Queue one command — verb + args as raw byte slices. The server
137 /// replies `+QUEUED` synchronously; errors propagate as `io::Error`.
138 pub fn queue(&mut self, parts: &[&[u8]]) -> io::Result<()> {
139 if parts.is_empty() {
140 return Err(io::Error::new(
141 io::ErrorKind::InvalidInput,
142 "Transaction::queue needs at least a verb",
143 ));
144 }
145 let argv: Vec<Vec<u8>> = parts.iter().map(|p| p.to_vec()).collect();
146 self.queue_argv(argv)
147 }
148
149 /// `EXEC` — send EXEC, return the per-queued-command reply array.
150 /// Consumes the transaction handle.
151 ///
152 /// When a `WATCH` violation aborts the transaction the server
153 /// returns Nil; this method collapses that into an empty `Vec`
154 /// for backward compatibility with v1.4.x. For new code, prefer
155 /// [`exec_watched`](Self::exec_watched), which distinguishes
156 /// "aborted by WATCH" (returns `None`) from "successful empty
157 /// transaction" (returns `Some(vec![])`).
158 pub fn exec(mut self) -> io::Result<Vec<Reply>> {
159 self.live = false;
160 match self.client.request(&[b"EXEC".to_vec()])? {
161 Reply::Array(items) => Ok(items),
162 Reply::Nil => Ok(Vec::new()),
163 Reply::Error(e) => Err(io::Error::other(string(e))),
164 other => Err(unexpected(other)),
165 }
166 }
167
168 /// Like [`exec`](Self::exec) but returns `None` when a `WATCH`
169 /// violation aborts the transaction (RESP Nil reply to EXEC).
170 /// Use this when you've called [`Connection::watch`] and need to
171 /// distinguish an abort from a successfully-empty queue.
172 pub fn exec_watched(mut self) -> io::Result<Option<Vec<Reply>>> {
173 self.live = false;
174 match self.client.request(&[b"EXEC".to_vec()])? {
175 Reply::Array(items) => Ok(Some(items)),
176 Reply::Nil => Ok(None),
177 Reply::Error(e) => Err(io::Error::other(string(e))),
178 other => Err(unexpected(other)),
179 }
180 }
181
182 /// Like [`exec`](Self::exec) but returns a [`TransactionReplies`]
183 /// cursor with typed extractors (`next_int`, `next_bulk`, …) so
184 /// callers don't hand-match every `Reply` themselves. Aborts with
185 /// `io::ErrorKind::InvalidData` ("transaction aborted by WATCH") if
186 /// the server replied Nil; use [`exec_watched_typed`](Self::exec_watched_typed)
187 /// to distinguish abort from successfully-empty.
188 ///
189 /// Consumes the handle. The cursor remembers how many replies are
190 /// left ([`TransactionReplies::remaining`]) so callers can sanity-
191 /// check arity at the end of the read sequence.
192 pub fn exec_typed(mut self) -> io::Result<TransactionReplies> {
193 self.live = false;
194 match self.client.request(&[b"EXEC".to_vec()])? {
195 Reply::Array(items) => Ok(TransactionReplies::new(items)),
196 Reply::Nil => Err(io::Error::new(
197 io::ErrorKind::InvalidData,
198 "transaction aborted by WATCH",
199 )),
200 Reply::Error(e) => Err(io::Error::other(string(e))),
201 other => Err(unexpected(other)),
202 }
203 }
204
205 /// Like [`exec_watched`](Self::exec_watched) but returns a typed
206 /// [`TransactionReplies`] cursor on commit; `None` on WATCH abort.
207 pub fn exec_watched_typed(mut self) -> io::Result<Option<TransactionReplies>> {
208 self.live = false;
209 match self.client.request(&[b"EXEC".to_vec()])? {
210 Reply::Array(items) => Ok(Some(TransactionReplies::new(items))),
211 Reply::Nil => Ok(None),
212 Reply::Error(e) => Err(io::Error::other(string(e))),
213 other => Err(unexpected(other)),
214 }
215 }
216
217 /// `DISCARD` — abandon the queued commands. Consumes the handle.
218 pub fn discard(mut self) -> io::Result<()> {
219 self.live = false;
220 match self.client.request(&[b"DISCARD".to_vec()])? {
221 Reply::Simple(s) if s == b"OK" => Ok(()),
222 Reply::Error(e) => Err(io::Error::other(string(e))),
223 other => Err(unexpected(other)),
224 }
225 }
226}
227
228// ─────────────────────────────────────────────────────────────────────────
// Typed builders (v1.5.0). Each mirrors the same-named Connection method's
// argument shape; on EXEC the matching index in the returned Vec carries
// that command's raw `Reply` (use `exec_typed` for typed, in-order decode).
232//
233// All builders return `&mut Self` so they can chain:
234// txn.set(k, v)?.incr(c)?.del(&[k2])?;
235// ─────────────────────────────────────────────────────────────────────────
236
237impl<'a> Transaction<'a> {
238 /// Queue `SET key value`.
239 pub fn set(&mut self, key: &[u8], value: &[u8]) -> io::Result<&mut Self> {
240 self.queue_argv(vec3(b"SET", key, value))?;
241 Ok(self)
242 }
243
244 /// Queue `GET key`.
245 pub fn get(&mut self, key: &[u8]) -> io::Result<&mut Self> {
246 self.queue_argv(vec2(b"GET", key))?;
247 Ok(self)
248 }
249
250 /// Queue `DEL key [key ...]`.
251 pub fn del(&mut self, keys: &[&[u8]]) -> io::Result<&mut Self> {
252 if keys.is_empty() {
253 return Err(io::Error::new(
254 io::ErrorKind::InvalidInput,
255 "Transaction::del needs at least one key",
256 ));
257 }
258 let mut args = Vec::with_capacity(keys.len() + 1);
259 args.push(b"DEL".to_vec());
260 args.extend(keys.iter().map(|k| k.to_vec()));
261 self.queue_argv(args)?;
262 Ok(self)
263 }
264
265 /// Queue `EXISTS key [key ...]`.
266 pub fn exists(&mut self, keys: &[&[u8]]) -> io::Result<&mut Self> {
267 if keys.is_empty() {
268 return Err(io::Error::new(
269 io::ErrorKind::InvalidInput,
270 "Transaction::exists needs at least one key",
271 ));
272 }
273 let mut args = Vec::with_capacity(keys.len() + 1);
274 args.push(b"EXISTS".to_vec());
275 args.extend(keys.iter().map(|k| k.to_vec()));
276 self.queue_argv(args)?;
277 Ok(self)
278 }
279
280 /// Queue `INCR key`.
281 pub fn incr(&mut self, key: &[u8]) -> io::Result<&mut Self> {
282 self.queue_argv(vec2(b"INCR", key))?;
283 Ok(self)
284 }
285
286 /// Queue `INCRBY key delta`.
287 pub fn incr_by(&mut self, key: &[u8], delta: i64) -> io::Result<&mut Self> {
288 let args = vec![
289 b"INCRBY".to_vec(),
290 key.to_vec(),
291 delta.to_string().into_bytes(),
292 ];
293 self.queue_argv(args)?;
294 Ok(self)
295 }
296
297 /// Queue `MGET key [key ...]`.
298 pub fn mget(&mut self, keys: &[&[u8]]) -> io::Result<&mut Self> {
299 if keys.is_empty() {
300 return Err(io::Error::new(
301 io::ErrorKind::InvalidInput,
302 "Transaction::mget needs at least one key",
303 ));
304 }
305 let mut args = Vec::with_capacity(keys.len() + 1);
306 args.push(b"MGET".to_vec());
307 args.extend(keys.iter().map(|k| k.to_vec()));
308 self.queue_argv(args)?;
309 Ok(self)
310 }
311
312 /// Queue `MSET key value [key value ...]`.
313 pub fn mset(&mut self, pairs: &[(&[u8], &[u8])]) -> io::Result<&mut Self> {
314 if pairs.is_empty() {
315 return Err(io::Error::new(
316 io::ErrorKind::InvalidInput,
317 "Transaction::mset needs at least one (key, value) pair",
318 ));
319 }
320 let mut args = Vec::with_capacity(pairs.len() * 2 + 1);
321 args.push(b"MSET".to_vec());
322 for (k, v) in pairs {
323 args.push(k.to_vec());
324 args.push(v.to_vec());
325 }
326 self.queue_argv(args)?;
327 Ok(self)
328 }
329
330 /// Send one already-materialised argv and parse the `+QUEUED` ack.
331 /// Shared back-end for `queue` + every typed builder.
332 fn queue_argv(&mut self, argv: Vec<Vec<u8>>) -> io::Result<()> {
333 match self.client.request(&argv)? {
334 Reply::Simple(s) if s == b"QUEUED" => Ok(()),
335 Reply::Error(e) => Err(io::Error::other(string(e))),
336 other => Err(unexpected(other)),
337 }
338 }
339}
340
341impl Drop for Transaction<'_> {
342 fn drop(&mut self) {
343 // Implicit DISCARD if the caller dropped the handle without
344 // exec/exec_watched/discard. Best-effort: ignore any error
345 // since we're in Drop.
346 if self.live {
347 let _ = self.client.request(&[b"DISCARD".to_vec()]);
348 }
349 }
350}
351
352// ─────────────────────────────────────────────────────────────────────────
353// Typed EXEC reply cursor (v1.7.0). Sits between the existing raw
354// `Vec<Reply>` API and the maximalist typestate-tuple alternative —
355// callers consume queued replies in order via per-typed extractors:
356//
357// let mut r = txn.exec_typed()?;
358// let counter: i64 = r.next_int()?; // INCR
359// let prior: Option<_> = r.next_bulk()?; // GET
360// let bulk_m: Vec<_> = r.next_array_of_bulks()?; // MGET
361// r.expect_empty()?; // arity gate
362//
363// Mismatch surfaces InvalidData with the actual variant in the message
364// so debugging doesn't require turning on RESP wire logging. The cursor
365// also exposes `raw()` as an escape hatch for verbs the typed helpers
366// don't cover (HGETALL → array of bulks; ZRANGE WITHSCORES → mixed
367// pairs; etc.).
368// ─────────────────────────────────────────────────────────────────────────
369
370/// Typed cursor over the per-queued-command replies of a successful
371/// `EXEC`. Produced by [`Transaction::exec_typed`] /
372/// [`Transaction::exec_watched_typed`]. Each `next_*` consumes one
373/// reply; if the variant doesn't match the extractor, an
374/// `io::ErrorKind::InvalidData` is returned and the cursor advances
375/// regardless (so a downstream `expect_empty` still works correctly).
376#[derive(Debug)]
377pub struct TransactionReplies {
378 iter: std::vec::IntoIter<Reply>,
379}
380
381impl TransactionReplies {
382 fn new(items: Vec<Reply>) -> Self {
383 Self { iter: items.into_iter() }
384 }
385
386 /// Number of replies still un-consumed.
387 pub fn remaining(&self) -> usize {
388 self.iter.len()
389 }
390
391 /// Error out if the cursor still has replies — useful at the end of
392 /// a typed read sequence to assert the queued-command count matched.
393 pub fn expect_empty(&mut self) -> io::Result<()> {
394 let left = self.remaining();
395 if left == 0 {
396 Ok(())
397 } else {
398 Err(io::Error::new(
399 io::ErrorKind::InvalidData,
400 format!("transaction reply cursor has {left} un-consumed replies"),
401 ))
402 }
403 }
404
405 /// Pop the next reply as a raw [`Reply`]. Escape hatch for verbs
406 /// the typed extractors don't cover.
407 pub fn raw(&mut self) -> io::Result<Reply> {
408 self.iter
409 .next()
410 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "exhausted"))
411 }
412
413 /// Expect `Reply::Simple(b"OK")` — `SET` / `MSET` ack.
414 pub fn next_ok(&mut self) -> io::Result<()> {
415 match self.raw()? {
416 Reply::Simple(s) if s == b"OK" => Ok(()),
417 other => Err(mismatch("Simple(OK)", &other)),
418 }
419 }
420
421 /// Expect `Reply::Simple(b"OK")` OR `Reply::Nil` — `SET key v NX/XX`
422 /// returns Nil when the condition is not met.
423 pub fn next_ok_or_nil(&mut self) -> io::Result<bool> {
424 match self.raw()? {
425 Reply::Simple(s) if s == b"OK" => Ok(true),
426 Reply::Nil => Ok(false),
427 other => Err(mismatch("Simple(OK) or Nil", &other)),
428 }
429 }
430
431 /// Expect `Reply::Int` — `INCR` / `DEL` / `EXISTS` / `INCRBY`.
432 pub fn next_int(&mut self) -> io::Result<i64> {
433 match self.raw()? {
434 Reply::Int(n) => Ok(n),
435 other => Err(mismatch("Int", &other)),
436 }
437 }
438
439 /// Expect `Reply::Bulk` (or `Nil` → `None`) — `GET`.
440 pub fn next_bulk(&mut self) -> io::Result<Option<Vec<u8>>> {
441 match self.raw()? {
442 Reply::Bulk(b) => Ok(Some(b)),
443 Reply::Nil => Ok(None),
444 other => Err(mismatch("Bulk or Nil", &other)),
445 }
446 }
447
448 /// Expect `Reply::Array` of `Bulk`/`Nil` entries — `MGET`. Returns
449 /// `Vec<Option<Vec<u8>>>` in request order.
450 pub fn next_array_of_bulks(&mut self) -> io::Result<Vec<Option<Vec<u8>>>> {
451 let items = match self.raw()? {
452 Reply::Array(v) => v,
453 Reply::Nil => return Ok(Vec::new()),
454 other => return Err(mismatch("Array", &other)),
455 };
456 items
457 .into_iter()
458 .map(|r| match r {
459 Reply::Bulk(b) => Ok(Some(b)),
460 Reply::Nil => Ok(None),
461 other => Err(mismatch("Array element Bulk/Nil", &other)),
462 })
463 .collect()
464 }
465
466 /// Expect `Reply::Simple` (any payload) — for verbs whose ack isn't
467 /// `OK` (e.g. `PING` → `+PONG`).
468 pub fn next_simple(&mut self) -> io::Result<Vec<u8>> {
469 match self.raw()? {
470 Reply::Simple(s) => Ok(s),
471 other => Err(mismatch("Simple", &other)),
472 }
473 }
474}
475
476fn mismatch(want: &str, got: &Reply) -> io::Error {
477 io::Error::new(
478 io::ErrorKind::InvalidData,
479 format!("transaction reply mismatch: expected {want}, got {got:?}"),
480 )
481}