Skip to main content

dynomite/net/
dispatcher.rs

1//! Cluster-side dispatch hook.
2//!
3//! Routing decisions (whether to send a request to the local
4//! datastore, fan it out across racks, or relay it to a remote DC)
5//! land in Stage 10's cluster module. Stage 9 only owns the
6//! per-connection FSMs and exposes a seam, [`Dispatcher`], that
7//! Stage 10 plugs into.
8//!
9//! [`Dispatcher`] is the seam between the two stages. The Stage 9
10//! client / dnode-client FSMs hand each fully parsed [`Msg`] to a
11//! `Dispatcher` and inspect [`DispatchOutcome`] to decide whether
12//! the response can be returned synchronously or whether they
13//! should wait for a downstream response. Stage 10 will provide a
14//! cluster-aware implementation; tests in this stage exercise the
15//! seam with [`NoopDispatcher`].
16//!
17//! [`Msg`]: crate::msg::Msg
18
19use std::sync::Arc;
20
21use tokio::sync::mpsc;
22
23use crate::msg::Msg;
24
25/// Outcome of dispatching a parsed message.
26#[derive(Debug)]
27pub enum DispatchOutcome {
28    /// The dispatcher took ownership of the request and will deliver
29    /// the response asynchronously (over the connection's response
30    /// channel installed by the FSM).
31    Pending,
32    /// The dispatcher wants the FSM to reply with the supplied
33    /// message immediately. Used for control plane / synthetic
34    /// responses (e.g. swallowed `QUIT` commands).
35    Inline(Msg),
36    /// The dispatcher rejected the request with an error response
37    /// the FSM should return to the client immediately.
38    Error(Msg),
39    /// The request must be dropped; no response will be sent. Used
40    /// for swallowed / quit messages.
41    Drop,
42}
43
44/// Cluster-side dispatch hook implemented by Stage 10 and by tests.
45///
46/// The dispatcher is invoked from a tokio task; implementations may
47/// do async work but should avoid blocking. The trait uses
48/// `&self` so the dispatcher can be shared across many connections.
49pub trait Dispatcher: Send + Sync {
50    /// Hand a parsed request to the dispatcher.
51    ///
52    /// `responder` is a per-connection channel the dispatcher uses
53    /// to deliver responses (or errors) back to the FSM that owns
54    /// the originating client connection.
55    fn dispatch(&self, req: Msg, responder: ServerSink) -> DispatchOutcome;
56}
57
58/// Channel the dispatcher uses to send responses back to a client
59/// FSM. The FSM owns the receiving half.
60pub type ServerSink = mpsc::Sender<OutboundEnvelope>;
61
62/// Envelope wrapping a dispatcher response and the request id it
63/// corresponds to.
64///
65/// `span` carries the originating request span back to the
66/// client-side FSM so the response writeback nests under the
67/// originating client span. The default is
68/// [`tracing::Span::none`].
69///
70/// `source_peer_idx` identifies the peer this response came from
71/// when the dispatcher fanned the request to multiple replicas.
72/// `None` is used for synthetic / inline / single-target paths
73/// where the source is unambiguous.
74#[derive(Debug)]
75pub struct OutboundEnvelope {
76    /// Request id the response is for.
77    pub req_id: crate::core::types::MsgId,
78    /// The response message.
79    pub rsp: Msg,
80    /// Originating request span for cross-task propagation.
81    pub span: tracing::Span,
82    /// Index of the peer this reply was produced by, when known.
83    /// Set by the per-target server / dnode-server drivers and
84    /// consumed by the per-request reply coalescer.
85    pub source_peer_idx: Option<u32>,
86}
87
/// A [`Dispatcher`] that drops every request and never emits a
/// response.
///
/// Handy as a stand-in for tests that only exercise framing.
#[derive(Clone, Debug, Default)]
pub struct NoopDispatcher;
93
94impl Dispatcher for NoopDispatcher {
95    fn dispatch(&self, _req: Msg, _responder: ServerSink) -> DispatchOutcome {
96        DispatchOutcome::Drop
97    }
98}
99
100impl<T: Dispatcher + ?Sized> Dispatcher for Arc<T> {
101    fn dispatch(&self, req: Msg, responder: ServerSink) -> DispatchOutcome {
102        (**self).dispatch(req, responder)
103    }
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use crate::msg::MsgType;

    /// `NoopDispatcher` must report `DispatchOutcome::Drop` for a request.
    #[test]
    fn noop_returns_drop() {
        // `_rx` keeps the receiving half bound for the duration of the test.
        let (tx, _rx) = mpsc::channel(1);
        let outcome = NoopDispatcher.dispatch(Msg::new(1, MsgType::ReqRedisGet, true), tx);
        // `matches!` only yields a `bool`; without `assert!` the result is
        // discarded and the test can never fail.
        assert!(
            matches!(outcome, DispatchOutcome::Drop),
            "expected DispatchOutcome::Drop, got {:?}",
            outcome
        );
    }
}
117}