dynomite/net/server.rs
1//! SERVER-role connection driver.
2//!
3//! The SERVER role holds an outbound connection to the backend
4//! datastore (Redis or memcache). The driver pulls requests off
5//! the connection's in-queue, encodes them onto the wire, and
6//! parses response bytes back into [`Msg`]s that it dispatches to
7//! the originating client.
8//!
9//! Stage 9 ships a minimal, transport-agnostic driver suitable
10//! for the loopback echo integration test. Stage 10 wires the
11//! driver to the cluster's [`Dispatcher`] so client / server
12//! connections form a complete request-response pipeline.
13//!
14//! [`Dispatcher`]: crate::net::Dispatcher
15//! [`Msg`]: crate::msg::Msg
16
17use tokio::io::{AsyncReadExt, AsyncWriteExt};
18use tokio::sync::mpsc;
19use tracing::Instrument as _;
20
21use crate::conf::DataStore;
22use crate::core::types::MsgId;
23use crate::io::reactor::ConnRole;
24use crate::msg::{Msg, MsgParseResult, MsgType};
25use crate::net::conn::Conn;
26use crate::net::dispatcher::OutboundEnvelope;
27use crate::net::NetError;
28use crate::proto::dnode::DmsgType;
29
/// Outbound server-side connection driver.
///
/// The struct owns the transport-bound [`Conn`] plus the receiving
/// half of the request channel that feeds it.
pub struct ServerConn {
    /// Connection whose transport the driver writes requests to and
    /// reads response bytes from.
    conn: Conn,
    /// Receiving half of the channel that delivers
    /// [`OutboundRequest`]s to this driver.
    requests: mpsc::Receiver<OutboundRequest>,
    /// Selects the response parser: `Redis` and `Noxu` use the Redis
    /// parser, `Memcache` uses the memcache parser.
    data_store: DataStore,
    /// FIFO of requests that were written to the transport and are
    /// still awaiting a response: the request id, the originating
    /// request span, and the request's `target_peer_idx`. Entries are
    /// pushed after a successful write and popped once per parsed
    /// response.
    pending_responses: std::collections::VecDeque<(MsgId, tracing::Span, Option<u32>)>,
}
40
/// Envelope sent into the server driver.
///
/// The driver writes `bytes` to the transport then awaits a
/// response, which it forwards as an [`OutboundEnvelope`] on
/// `responder` along with `req_id`.
///
/// `span` carries the originating client request's
/// [`tracing::Span`] across the mpsc channel boundary so the
/// receiving task's work nests under the originating client
/// span when distributed tracing is enabled. The default value
/// is [`tracing::Span::none`], which has no overhead when no
/// subscriber is installed.
///
/// `ty` selects the dnode message-type header emitted on the
/// peer plane. Data-plane callers leave it at
/// [`DmsgType::Req`]; the gossip task uses
/// [`DmsgType::GossipSyn`] / [`DmsgType::GossipShutdown`] for
/// fire-and-forget control frames whose `responder` is never
/// signalled.
///
/// NOTE(review): this struct derives only `Debug`, so the "default"
/// values mentioned for `span` and `ty` are presumably a caller
/// convention rather than a `Default` impl; confirm against the
/// dispatcher.
#[derive(Debug)]
pub struct OutboundRequest {
    /// Wire bytes already encoded by the dispatcher.
    pub bytes: Vec<u8>,
    /// Request id for response tagging.
    pub req_id: MsgId,
    /// Channel the driver pushes the parsed response onto.
    pub responder: mpsc::Sender<OutboundEnvelope>,
    /// Originating client request span; entered by the receiver
    /// to nest backend / peer work under the request tree.
    pub span: tracing::Span,
    /// dnode message-type header emitted by the peer driver.
    /// Defaults to [`DmsgType::Req`] for data-plane requests.
    /// The backend driver in this module (`ServerConn::run`) does
    /// not read this field.
    pub ty: DmsgType,
    /// Index of the target peer the dispatcher is forwarding the
    /// request to. The local backend driver and dnode-peer driver
    /// stamp this onto the [`OutboundEnvelope`] they produce so
    /// the reply coalescer can identify the responding replica.
    /// `None` is used for single-target paths where the responder
    /// already implies the source.
    ///
    /// `ServerConn` queues the value alongside the request and
    /// copies it into the `source_peer_idx` field of the matching
    /// response envelope.
    pub target_peer_idx: Option<u32>,
}
82
83impl ServerConn {
84 /// Wrap an outbound [`Conn`] with the given request-channel
85 /// receiver and data-store flavor.
86 ///
87 /// # Examples
88 ///
89 /// ```no_run
90 /// use dynomite::conf::DataStore;
91 /// use dynomite::io::reactor::{ConnRole, TcpTransport};
92 /// use dynomite::net::{Conn, ServerConn};
93 /// use tokio::sync::mpsc;
94 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
95 /// let s = tokio::net::TcpStream::connect("127.0.0.1:6379").await.unwrap();
96 /// let conn = Conn::new(Box::new(TcpTransport::new(s, ConnRole::Server)), ConnRole::Server);
97 /// let (_tx, rx) = mpsc::channel(8);
98 /// let _ = ServerConn::new(conn, rx, DataStore::Redis);
99 /// # });
100 /// ```
101 #[must_use]
102 pub fn new(
103 conn: Conn,
104 requests: mpsc::Receiver<OutboundRequest>,
105 data_store: DataStore,
106 ) -> Self {
107 debug_assert!(matches!(
108 conn.role(),
109 ConnRole::Server | ConnRole::DnodePeerServer
110 ));
111 Self {
112 conn,
113 requests,
114 data_store,
115 pending_responses: std::collections::VecDeque::new(),
116 }
117 }
118
119 /// Borrow the underlying connection.
120 #[must_use]
121 pub fn conn(&self) -> &Conn {
122 &self.conn
123 }
124
125 /// Mutably borrow the underlying connection.
126 pub fn conn_mut(&mut self) -> &mut Conn {
127 &mut self.conn
128 }
129
130 /// Drive the server FSM until either the request channel is
131 /// closed or the transport hits EOF / error.
132 ///
133 /// # Errors
134 /// Surfaces transport- and protocol-level errors.
135 pub async fn run(mut self) -> Result<(), NetError> {
136 let mut read_buf = vec![0u8; 4096];
137 let mut accumulated = Vec::<u8>::new();
138 let mut pending_responder: Option<mpsc::Sender<OutboundEnvelope>> = None;
139
140 loop {
141 if self.conn.is_eof() && self.pending_responses.is_empty() {
142 self.conn.set_done();
143 return Ok(());
144 }
145
146 tokio::select! {
147 res = self.requests.recv() => {
148 let Some(out_req) = res else {
149 // Channel closed; drain pending responses and exit.
150 if self.pending_responses.is_empty() {
151 self.conn.set_done();
152 return Ok(());
153 }
154 continue;
155 };
156 let send_span = tracing::info_span!(
157 parent: &out_req.span,
158 "backend.send",
159 req_id = out_req.req_id,
160 bytes = out_req.bytes.len(),
161 );
162 let req_span = out_req.span.clone();
163 let req_bytes = out_req.bytes;
164 let transport = self.conn.transport_mut().ok_or(NetError::Closed)?;
165 let write_res = async { transport.write_all(&req_bytes).await }
166 .instrument(send_span)
167 .await;
168 write_res?;
169 self.conn.record_send(req_bytes.len());
170 self.pending_responses
171 .push_back((out_req.req_id, req_span, out_req.target_peer_idx));
172 pending_responder = Some(out_req.responder);
173 }
174 read_res = async {
175 if let Some(t) = self.conn.transport_mut() {
176 t.read(&mut read_buf).await
177 } else {
178 Ok(0)
179 }
180 } => {
181 let n = read_res?;
182 if n == 0 {
183 self.conn.set_eof();
184 continue;
185 }
186 self.conn.record_recv(n);
187 accumulated.extend_from_slice(&read_buf[..n]);
188 self.drive_response_parser(&mut accumulated, &mut pending_responder).await?;
189 }
190 }
191 }
192 }
193
194 async fn drive_response_parser(
195 &mut self,
196 accumulated: &mut Vec<u8>,
197 responder: &mut Option<mpsc::Sender<OutboundEnvelope>>,
198 ) -> Result<(), NetError> {
199 use crate::proto::memcache::memcache_parse_rsp;
200 use crate::proto::redis::redis_parse_rsp;
201
202 while !accumulated.is_empty() {
203 let head_span = self
204 .pending_responses
205 .front()
206 .map_or_else(tracing::Span::current, |(_, s, _)| s.clone());
207 let id = self.pending_responses.front().map_or(0, |(i, _, _)| *i);
208 let mut msg = Msg::new(id, MsgType::Unknown, false);
209 let result = match self.data_store {
210 DataStore::Redis | DataStore::Noxu => redis_parse_rsp(&mut msg, accumulated),
211 DataStore::Memcache => memcache_parse_rsp(&mut msg, accumulated),
212 };
213 match result {
214 MsgParseResult::Ok => {
215 let consumed = msg.parser_pos();
216 if consumed == 0 {
217 return Err(NetError::Parse("server parser stalled".into()));
218 }
219 let bytes = accumulated[..consumed].to_vec();
220 accumulated.drain(0..consumed);
221 let (req_id, req_span, source_peer_idx) = self
222 .pending_responses
223 .pop_front()
224 .unwrap_or((0, head_span, None));
225 let parse_span = tracing::info_span!(
226 parent: &req_span,
227 "backend.parse",
228 req_id,
229 bytes = consumed,
230 );
231 let env = parse_span.in_scope(|| {
232 let mut rsp = msg;
233 let pool = self.conn.mbuf_pool().clone();
234 let mut buf = pool.get();
235 buf.recv(&bytes);
236 rsp.mbufs_mut().push_back(buf);
237 rsp.recompute_mlen();
238 OutboundEnvelope {
239 req_id,
240 rsp,
241 span: req_span,
242 source_peer_idx,
243 }
244 });
245 if let Some(sender) = responder.as_ref() {
246 let _ = sender.send(env).await;
247 }
248 }
249 MsgParseResult::Again => return Ok(()),
250 MsgParseResult::Repair | MsgParseResult::Noop | MsgParseResult::Fragment => {
251 let consumed = msg.parser_pos();
252 if consumed > 0 {
253 accumulated.drain(0..consumed);
254 } else {
255 return Ok(());
256 }
257 }
258 MsgParseResult::Error | MsgParseResult::OomError | MsgParseResult::DynoConfig => {
259 return Err(NetError::Parse(format!("{result:?}")));
260 }
261 }
262 }
263 Ok(())
264 }
265}
266
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::reactor::TcpTransport;
    use tokio::net::{TcpListener, TcpStream};

    /// A `ServerConn` built over a loopback TCP stream exposes a
    /// connection that reports the SERVER role.
    #[tokio::test]
    async fn build_server_conn() {
        // Bind to an ephemeral port so the test never collides with a
        // real service.
        let acceptor = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let bound_addr = acceptor.local_addr().unwrap();

        // Accept the single inbound connection and drop it right away.
        let _accept = tokio::spawn(async move {
            let (peer, _) = acceptor.accept().await.unwrap();
            drop(peer);
        });

        let stream = TcpStream::connect(bound_addr).await.unwrap();
        let transport = Box::new(TcpTransport::new(stream, ConnRole::Server));
        let conn = Conn::new(transport, ConnRole::Server);
        let (_tx, rx) = mpsc::channel(1);

        let server = ServerConn::new(conn, rx, DataStore::Redis);
        assert_eq!(server.conn().role(), ConnRole::Server);
    }
}