1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
//! Per-connection state owned by its origin shard.
use crate::message::PendingSlot;
use kevy_resp::Argv;
use kevy_sys::Socket;
use std::collections::{HashSet, VecDeque};
/// State for a single client connection, held by the shard the connection
/// originated on.
pub(crate) struct Conn {
    /// Socket belonging to this connection.
    pub(crate) sock: Socket,
    /// Inbound byte buffer.
    // NOTE(review): presumably bytes read from `sock` that still await
    // parsing — confirm against the read path.
    pub(crate) input: Vec<u8>,
    /// Outbound byte buffer.
    // NOTE(review): presumably encoded replies waiting to be written to
    // `sock` — confirm against the write path.
    pub(crate) output: Vec<u8>,
    // NOTE(review): looks like the offset into `output` that has already
    // been written — confirm against the write path.
    pub(crate) write_pos: usize,
    // NOTE(review): presumably set while the connection is waiting for
    // write readiness — confirm against the event loop.
    pub(crate) want_write: bool,

    /// Sequence number the next command will be given. Always equal to
    /// `next_emit + pending.len()`.
    pub(crate) next_seq: u64,
    /// Sequence number of `pending.front()`, i.e. of the reply that has to
    /// be emitted next.
    pub(crate) next_emit: u64,
    /// Set after QUIT, EOF, or a protocol error has been seen; the
    /// connection is closed once it has drained and flushed.
    pub(crate) closing: bool,
    /// Commands still outstanding, kept in sequence order so that the front
    /// slot corresponds to `next_emit`. This O(1) ring stands in for the
    /// per-command HashMap churn.
    pub(crate) pending: VecDeque<PendingSlot>,

    /// Pub/sub channels this connection is subscribed to.
    pub(crate) sub: HashSet<Vec<u8>>,
    /// Glob patterns registered by this connection through `PSUBSCRIBE`.
    /// Kept disjoint from `sub`: when a PUBLISH matches both, the
    /// connection receives one `message` frame and one `pmessage` frame
    /// (Redis semantics). Most connections have no pattern subscriptions,
    /// so in steady state this costs a single `HashSet::is_empty()` check
    /// per delivery candidate.
    pub(crate) psub: HashSet<Vec<u8>>,

    /// Commands queued inside a MULTI…EXEC transaction; `None` means the
    /// connection is not in MULTI.
    pub(crate) multi: Option<Vec<Argv>>,
    /// Keys under `WATCH`, each paired with the version it had on its
    /// owning shard when `WATCH` was issued. On `EXEC` these are fanned out
    /// to every relevant shard via `Op::CheckWatch`; a mismatch reported by
    /// any shard aborts the transaction (nil multi-bulk). Cleared on EXEC,
    /// DISCARD, UNWATCH, and connection close. Stays empty for connections
    /// that never call `WATCH`, which is most clients.
    pub(crate) watched: Vec<(Vec<u8>, u64)>,
}
impl Conn {
pub(crate) fn new(sock: Socket) -> Self {
Conn {
sock,
input: Vec::new(),
output: Vec::new(),
write_pos: 0,
want_write: false,
next_seq: 0,
next_emit: 0,
closing: false,
pending: VecDeque::new(),
sub: HashSet::new(),
psub: HashSet::new(),
multi: None,
watched: Vec::new(),
}
}
}