kevy_client/transaction.rs
1//! `MULTI` / `EXEC` / `DISCARD` — Redis transactions, with optional
2//! `WATCH`-driven optimistic concurrency (v1.5.0).
3//!
4//! Wire flow (Remote): client sends `MULTI` → server `+OK`; client sends
5//! each queued command → server `+QUEUED`; client sends `EXEC` → server
6//! returns an array of `N` typed replies, one per queued command. When
7//! `WATCH` was issued on the same `Connection` before `MULTI` and any
8//! watched key was modified between `WATCH` and `EXEC`, the server
9//! returns `Nil` (RESP null array) and the transaction aborts.
10//!
11//! Embedded mode rejects [`Connection::multi`] / [`Connection::watch`]
12//! / [`Connection::unwatch`] with `io::ErrorKind::Unsupported`:
13//! kevy-embedded has no MULTI dispatcher, and single-Connection embed
14//! access is already sequential (the inner mutex serialises every op),
15//! so the locking guarantee transactions add doesn't exist as a
16//! separate concept. Call methods directly instead.
17//!
18//! ```no_run
19//! use kevy_client::Connection;
20//!
21//! let mut conn = Connection::open("kevy://localhost:6379")?;
22//! conn.watch(&[b"counter"])?;
23//! let mut txn = conn.multi()?;
24//! txn.incr(b"counter")?
25//! .set(b"a", b"1")?;
26//! match txn.exec_watched()? {
27//! Some(replies) => assert_eq!(replies.len(), 2),
28//! None => { /* watched key changed — retry */ }
29//! }
30//! # Ok::<(), std::io::Error>(())
31//! ```
32//!
33//! Each queued command's reply is the raw [`kevy_resp::Reply`] — callers
34//! parse the typed payload themselves. The reply-side decode (e.g.
35//! `let n: i64 = replies[0].as_int()?`) is a v1.6.0 candidate.
36
37use std::io;
38
39use kevy_resp::Reply;
40use kevy_resp_client::RespClient;
41
42use crate::{Connection, string, unexpected, vec2, vec3};
43
44/// One in-flight `MULTI` block over a `Remote` connection.
45///
46/// Drop without calling [`Self::exec`] / [`Self::exec_watched`] /
47/// [`Self::discard`] sends an implicit `DISCARD` so the underlying
48/// socket isn't left in MULTI mode.
49pub struct Transaction<'a> {
50 client: &'a mut RespClient,
51 /// `false` after `exec`/`exec_watched`/`discard` consumed the txn —
52 /// suppresses the implicit-DISCARD in Drop.
53 live: bool,
54}
55
56impl std::fmt::Debug for Transaction<'_> {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct("Transaction")
59 .field("live", &self.live)
60 .finish_non_exhaustive()
61 }
62}
63
64impl Connection {
65 /// Start a `MULTI` block. Embedded backend returns
66 /// [`io::ErrorKind::Unsupported`].
67 pub fn multi(&mut self) -> io::Result<Transaction<'_>> {
68 match self {
69 Self::Embedded(_) => Err(io::Error::new(
70 io::ErrorKind::Unsupported,
71 "MULTI/EXEC is not implemented for the embedded backend; \
72 call Connection methods directly (each is atomic on its own lock)",
73 )),
74 Self::Remote(client) => match client.request(&[b"MULTI".to_vec()])? {
75 Reply::Simple(s) if s == b"OK" => Ok(Transaction {
76 client,
77 live: true,
78 }),
79 Reply::Error(e) => Err(io::Error::other(string(e))),
80 other => Err(unexpected(other)),
81 },
82 }
83 }
84
85 /// `WATCH key [key ...]` — mark keys for optimistic concurrency.
86 /// The next [`multi`](Self::multi) on this connection will abort
87 /// (EXEC returns Nil) if any watched key was modified between
88 /// this call and EXEC. Remote-only.
89 ///
90 /// Per RESP spec, WATCH must be sent **before** MULTI. Repeated
91 /// `watch` calls accumulate — the abort triggers on any of the
92 /// watched keys changing.
93 pub fn watch(&mut self, keys: &[&[u8]]) -> io::Result<()> {
94 if keys.is_empty() {
95 return Err(io::Error::new(
96 io::ErrorKind::InvalidInput,
97 "WATCH needs at least one key",
98 ));
99 }
100 match self {
101 Self::Embedded(_) => Err(io::Error::new(
102 io::ErrorKind::Unsupported,
103 "WATCH is a transaction primitive; embedded backend has no MULTI",
104 )),
105 Self::Remote(c) => {
106 let mut args = Vec::with_capacity(keys.len() + 1);
107 args.push(b"WATCH".to_vec());
108 args.extend(keys.iter().map(|k| k.to_vec()));
109 match c.request(&args)? {
110 Reply::Simple(s) if s == b"OK" => Ok(()),
111 Reply::Error(e) => Err(io::Error::other(string(e))),
112 other => Err(unexpected(other)),
113 }
114 }
115 }
116 }
117
118 /// `UNWATCH` — drop every WATCH set on this connection without
119 /// running a transaction. Remote-only.
120 pub fn unwatch(&mut self) -> io::Result<()> {
121 match self {
122 Self::Embedded(_) => Err(io::Error::new(
123 io::ErrorKind::Unsupported,
124 "UNWATCH is a transaction primitive; embedded backend has no MULTI",
125 )),
126 Self::Remote(c) => match c.request(&[b"UNWATCH".to_vec()])? {
127 Reply::Simple(s) if s == b"OK" => Ok(()),
128 Reply::Error(e) => Err(io::Error::other(string(e))),
129 other => Err(unexpected(other)),
130 },
131 }
132 }
133}
134
135impl<'a> Transaction<'a> {
136 /// Queue one command — verb + args as raw byte slices. The server
137 /// replies `+QUEUED` synchronously; errors propagate as `io::Error`.
138 pub fn queue(&mut self, parts: &[&[u8]]) -> io::Result<()> {
139 if parts.is_empty() {
140 return Err(io::Error::new(
141 io::ErrorKind::InvalidInput,
142 "Transaction::queue needs at least a verb",
143 ));
144 }
145 let argv: Vec<Vec<u8>> = parts.iter().map(|p| p.to_vec()).collect();
146 self.queue_argv(argv)
147 }
148
149 /// `EXEC` — send EXEC, return the per-queued-command reply array.
150 /// Consumes the transaction handle.
151 ///
152 /// When a `WATCH` violation aborts the transaction the server
153 /// returns Nil; this method collapses that into an empty `Vec`
154 /// for backward compatibility with v1.4.x. For new code, prefer
155 /// [`exec_watched`](Self::exec_watched), which distinguishes
156 /// "aborted by WATCH" (returns `None`) from "successful empty
157 /// transaction" (returns `Some(vec![])`).
158 pub fn exec(mut self) -> io::Result<Vec<Reply>> {
159 self.live = false;
160 match self.client.request(&[b"EXEC".to_vec()])? {
161 Reply::Array(items) => Ok(items),
162 Reply::Nil => Ok(Vec::new()),
163 Reply::Error(e) => Err(io::Error::other(string(e))),
164 other => Err(unexpected(other)),
165 }
166 }
167
168 /// Like [`exec`](Self::exec) but returns `None` when a `WATCH`
169 /// violation aborts the transaction (RESP Nil reply to EXEC).
170 /// Use this when you've called [`Connection::watch`] and need to
171 /// distinguish an abort from a successfully-empty queue.
172 pub fn exec_watched(mut self) -> io::Result<Option<Vec<Reply>>> {
173 self.live = false;
174 match self.client.request(&[b"EXEC".to_vec()])? {
175 Reply::Array(items) => Ok(Some(items)),
176 Reply::Nil => Ok(None),
177 Reply::Error(e) => Err(io::Error::other(string(e))),
178 other => Err(unexpected(other)),
179 }
180 }
181
182 /// `DISCARD` — abandon the queued commands. Consumes the handle.
183 pub fn discard(mut self) -> io::Result<()> {
184 self.live = false;
185 match self.client.request(&[b"DISCARD".to_vec()])? {
186 Reply::Simple(s) if s == b"OK" => Ok(()),
187 Reply::Error(e) => Err(io::Error::other(string(e))),
188 other => Err(unexpected(other)),
189 }
190 }
191}
192
193// ─────────────────────────────────────────────────────────────────────────
194// Typed builders (v1.5.0). Each mirrors the same-named Connection method's
195// argument shape; on EXEC the matching index in the returned Vec carries
196// the typed payload (raw `Reply` — typed decode is a v1.6.0 candidate).
197//
198// All builders return `&mut Self` so they can chain:
199// txn.set(k, v)?.incr(c)?.del(&[k2])?;
200// ─────────────────────────────────────────────────────────────────────────
201
202impl<'a> Transaction<'a> {
203 /// Queue `SET key value`.
204 pub fn set(&mut self, key: &[u8], value: &[u8]) -> io::Result<&mut Self> {
205 self.queue_argv(vec3(b"SET", key, value))?;
206 Ok(self)
207 }
208
209 /// Queue `GET key`.
210 pub fn get(&mut self, key: &[u8]) -> io::Result<&mut Self> {
211 self.queue_argv(vec2(b"GET", key))?;
212 Ok(self)
213 }
214
215 /// Queue `DEL key [key ...]`.
216 pub fn del(&mut self, keys: &[&[u8]]) -> io::Result<&mut Self> {
217 if keys.is_empty() {
218 return Err(io::Error::new(
219 io::ErrorKind::InvalidInput,
220 "Transaction::del needs at least one key",
221 ));
222 }
223 let mut args = Vec::with_capacity(keys.len() + 1);
224 args.push(b"DEL".to_vec());
225 args.extend(keys.iter().map(|k| k.to_vec()));
226 self.queue_argv(args)?;
227 Ok(self)
228 }
229
230 /// Queue `EXISTS key [key ...]`.
231 pub fn exists(&mut self, keys: &[&[u8]]) -> io::Result<&mut Self> {
232 if keys.is_empty() {
233 return Err(io::Error::new(
234 io::ErrorKind::InvalidInput,
235 "Transaction::exists needs at least one key",
236 ));
237 }
238 let mut args = Vec::with_capacity(keys.len() + 1);
239 args.push(b"EXISTS".to_vec());
240 args.extend(keys.iter().map(|k| k.to_vec()));
241 self.queue_argv(args)?;
242 Ok(self)
243 }
244
245 /// Queue `INCR key`.
246 pub fn incr(&mut self, key: &[u8]) -> io::Result<&mut Self> {
247 self.queue_argv(vec2(b"INCR", key))?;
248 Ok(self)
249 }
250
251 /// Queue `INCRBY key delta`.
252 pub fn incr_by(&mut self, key: &[u8], delta: i64) -> io::Result<&mut Self> {
253 let args = vec![
254 b"INCRBY".to_vec(),
255 key.to_vec(),
256 delta.to_string().into_bytes(),
257 ];
258 self.queue_argv(args)?;
259 Ok(self)
260 }
261
262 /// Queue `MGET key [key ...]`.
263 pub fn mget(&mut self, keys: &[&[u8]]) -> io::Result<&mut Self> {
264 if keys.is_empty() {
265 return Err(io::Error::new(
266 io::ErrorKind::InvalidInput,
267 "Transaction::mget needs at least one key",
268 ));
269 }
270 let mut args = Vec::with_capacity(keys.len() + 1);
271 args.push(b"MGET".to_vec());
272 args.extend(keys.iter().map(|k| k.to_vec()));
273 self.queue_argv(args)?;
274 Ok(self)
275 }
276
277 /// Queue `MSET key value [key value ...]`.
278 pub fn mset(&mut self, pairs: &[(&[u8], &[u8])]) -> io::Result<&mut Self> {
279 if pairs.is_empty() {
280 return Err(io::Error::new(
281 io::ErrorKind::InvalidInput,
282 "Transaction::mset needs at least one (key, value) pair",
283 ));
284 }
285 let mut args = Vec::with_capacity(pairs.len() * 2 + 1);
286 args.push(b"MSET".to_vec());
287 for (k, v) in pairs {
288 args.push(k.to_vec());
289 args.push(v.to_vec());
290 }
291 self.queue_argv(args)?;
292 Ok(self)
293 }
294
295 /// Send one already-materialised argv and parse the `+QUEUED` ack.
296 /// Shared back-end for `queue` + every typed builder.
297 fn queue_argv(&mut self, argv: Vec<Vec<u8>>) -> io::Result<()> {
298 match self.client.request(&argv)? {
299 Reply::Simple(s) if s == b"QUEUED" => Ok(()),
300 Reply::Error(e) => Err(io::Error::other(string(e))),
301 other => Err(unexpected(other)),
302 }
303 }
304}
305
306impl Drop for Transaction<'_> {
307 fn drop(&mut self) {
308 // Implicit DISCARD if the caller dropped the handle without
309 // exec/exec_watched/discard. Best-effort: ignore any error
310 // since we're in Drop.
311 if self.live {
312 let _ = self.client.request(&[b"DISCARD".to_vec()]);
313 }
314 }
315}