dynomite/net/dispatcher.rs
1//! Cluster-side dispatch hook.
2//!
3//! Routing decisions (whether to send a request to the local
4//! datastore, fan it out across racks, or relay it to a remote DC)
5//! land in Stage 10's cluster module. Stage 9 only owns the
6//! per-connection FSMs and exposes a seam, [`Dispatcher`], that
7//! Stage 10 plugs into.
8//!
9//! [`Dispatcher`] is the seam between the two stages. The Stage 9
10//! client / dnode-client FSMs hand each fully parsed [`Msg`] to a
11//! `Dispatcher` and inspect [`DispatchOutcome`] to decide whether
12//! the response can be returned synchronously or whether they
13//! should wait for a downstream response. Stage 10 will provide a
14//! cluster-aware implementation; tests in this stage exercise the
15//! seam with [`NoopDispatcher`].
16//!
17//! [`Msg`]: crate::msg::Msg
18
19use std::sync::Arc;
20
21use tokio::sync::mpsc;
22
23use crate::msg::Msg;
24
25/// Outcome of dispatching a parsed message.
26#[derive(Debug)]
27pub enum DispatchOutcome {
28 /// The dispatcher took ownership of the request and will deliver
29 /// the response asynchronously (over the connection's response
30 /// channel installed by the FSM).
31 Pending,
32 /// The dispatcher wants the FSM to reply with the supplied
33 /// message immediately. Used for control plane / synthetic
34 /// responses (e.g. swallowed `QUIT` commands).
35 Inline(Msg),
36 /// The dispatcher rejected the request with an error response
37 /// the FSM should return to the client immediately.
38 Error(Msg),
39 /// The request must be dropped; no response will be sent. Used
40 /// for swallowed / quit messages.
41 Drop,
42}
43
44/// Cluster-side dispatch hook implemented by Stage 10 and by tests.
45///
46/// The dispatcher is invoked from a tokio task; implementations may
47/// do async work but should avoid blocking. The trait uses
48/// `&self` so the dispatcher can be shared across many connections.
49pub trait Dispatcher: Send + Sync {
50 /// Hand a parsed request to the dispatcher.
51 ///
52 /// `responder` is a per-connection channel the dispatcher uses
53 /// to deliver responses (or errors) back to the FSM that owns
54 /// the originating client connection.
55 fn dispatch(&self, req: Msg, responder: ServerSink) -> DispatchOutcome;
56}
57
58/// Channel the dispatcher uses to send responses back to a client
59/// FSM. The FSM owns the receiving half.
60pub type ServerSink = mpsc::Sender<OutboundEnvelope>;
61
62/// Envelope wrapping a dispatcher response and the request id it
63/// corresponds to.
64///
65/// `span` carries the originating request span back to the
66/// client-side FSM so the response writeback nests under the
67/// originating client span. The default is
68/// [`tracing::Span::none`].
69///
70/// `source_peer_idx` identifies the peer this response came from
71/// when the dispatcher fanned the request to multiple replicas.
72/// `None` is used for synthetic / inline / single-target paths
73/// where the source is unambiguous.
74#[derive(Debug)]
75pub struct OutboundEnvelope {
76 /// Request id the response is for.
77 pub req_id: crate::core::types::MsgId,
78 /// The response message.
79 pub rsp: Msg,
80 /// Originating request span for cross-task propagation.
81 pub span: tracing::Span,
82 /// Index of the peer this reply was produced by, when known.
83 /// Set by the per-target server / dnode-server drivers and
84 /// consumed by the per-request reply coalescer.
85 pub source_peer_idx: Option<u32>,
86}
87
/// A [`Dispatcher`] that discards every request it is given and
/// never produces a response.
///
/// Handy as a stand-in for tests that only care about framing.
#[derive(Debug, Default, Clone)]
pub struct NoopDispatcher;
93
94impl Dispatcher for NoopDispatcher {
95 fn dispatch(&self, _req: Msg, _responder: ServerSink) -> DispatchOutcome {
96 DispatchOutcome::Drop
97 }
98}
99
100impl<T: Dispatcher + ?Sized> Dispatcher for Arc<T> {
101 fn dispatch(&self, req: Msg, responder: ServerSink) -> DispatchOutcome {
102 (**self).dispatch(req, responder)
103 }
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use crate::msg::MsgType;

    /// `NoopDispatcher` must report `Drop` for any request.
    #[test]
    fn noop_returns_drop() {
        // The receiver is kept alive only so the channel stays open
        // for the duration of the call; nothing is read from it.
        let (tx, _rx) = mpsc::channel(1);
        let outcome = NoopDispatcher.dispatch(Msg::new(1, MsgType::ReqRedisGet, true), tx);
        // `matches!` only yields a bool; without `assert!` the result
        // is discarded and the test could never fail.
        assert!(
            matches!(outcome, DispatchOutcome::Drop),
            "expected DispatchOutcome::Drop, got {outcome:?}"
        );
    }
}
117}