Skip to main content

dynomite/net/
server.rs

1//! SERVER-role connection driver.
2//!
3//! The SERVER role holds an outbound connection to the backend
4//! datastore (Redis or memcache). The driver pulls requests off
5//! the connection's in-queue, encodes them onto the wire, and
6//! parses response bytes back into [`Msg`]s that it dispatches to
7//! the originating client.
8//!
9//! Stage 9 ships a minimal, transport-agnostic driver suitable
10//! for the loopback echo integration test. Stage 10 wires the
11//! driver to the cluster's [`Dispatcher`] so client / server
12//! connections form a complete request-response pipeline.
13//!
14//! [`Dispatcher`]: crate::net::Dispatcher
15//! [`Msg`]: crate::msg::Msg
16
17use tokio::io::{AsyncReadExt, AsyncWriteExt};
18use tokio::sync::mpsc;
19use tracing::Instrument as _;
20
21use crate::conf::DataStore;
22use crate::core::types::MsgId;
23use crate::io::reactor::ConnRole;
24use crate::msg::{Msg, MsgParseResult, MsgType};
25use crate::net::conn::Conn;
26use crate::net::dispatcher::OutboundEnvelope;
27use crate::net::NetError;
28use crate::proto::dnode::DmsgType;
29
30/// Outbound server-side connection driver.
31///
32/// The struct owns the transport-bound [`Conn`] plus the receiving
33/// half of the request channel that feeds it.
34pub struct ServerConn {
35    conn: Conn,
36    requests: mpsc::Receiver<OutboundRequest>,
37    data_store: DataStore,
38    pending_responses: std::collections::VecDeque<(MsgId, tracing::Span, Option<u32>)>,
39}
40
41/// Envelope sent into the server driver.
42///
43/// The driver writes `bytes` to the transport then awaits a
44/// response, which it forwards as an [`OutboundEnvelope`] on
45/// `responder` along with `req_id`.
46///
47/// `span` carries the originating client request's
48/// [`tracing::Span`] across the mpsc channel boundary so the
49/// receiving task's work nests under the originating client
50/// span when distributed tracing is enabled. The default value
51/// is [`tracing::Span::none`], which has no overhead when no
52/// subscriber is installed.
53///
54/// `ty` selects the dnode message-type header emitted on the
55/// peer plane. Data-plane callers leave it at
56/// [`DmsgType::Req`]; the gossip task uses
57/// [`DmsgType::GossipSyn`] / [`DmsgType::GossipShutdown`] for
58/// fire-and-forget control frames whose `responder` is never
59/// signalled.
60#[derive(Debug)]
61pub struct OutboundRequest {
62    /// Wire bytes already encoded by the dispatcher.
63    pub bytes: Vec<u8>,
64    /// Request id for response tagging.
65    pub req_id: MsgId,
66    /// Channel the driver pushes the parsed response onto.
67    pub responder: mpsc::Sender<OutboundEnvelope>,
68    /// Originating client request span; entered by the receiver
69    /// to nest backend / peer work under the request tree.
70    pub span: tracing::Span,
71    /// dnode message-type header emitted by the peer driver.
72    /// Defaults to [`DmsgType::Req`] for data-plane requests.
73    pub ty: DmsgType,
74    /// Index of the target peer the dispatcher is forwarding the
75    /// request to. The local backend driver and dnode-peer driver
76    /// stamp this onto the [`OutboundEnvelope`] they produce so
77    /// the reply coalescer can identify the responding replica.
78    /// `None` is used for single-target paths where the responder
79    /// already implies the source.
80    pub target_peer_idx: Option<u32>,
81}
82
83impl ServerConn {
84    /// Wrap an outbound [`Conn`] with the given request-channel
85    /// receiver and data-store flavor.
86    ///
87    /// # Examples
88    ///
89    /// ```no_run
90    /// use dynomite::conf::DataStore;
91    /// use dynomite::io::reactor::{ConnRole, TcpTransport};
92    /// use dynomite::net::{Conn, ServerConn};
93    /// use tokio::sync::mpsc;
94    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
95    /// let s = tokio::net::TcpStream::connect("127.0.0.1:6379").await.unwrap();
96    /// let conn = Conn::new(Box::new(TcpTransport::new(s, ConnRole::Server)), ConnRole::Server);
97    /// let (_tx, rx) = mpsc::channel(8);
98    /// let _ = ServerConn::new(conn, rx, DataStore::Redis);
99    /// # });
100    /// ```
101    #[must_use]
102    pub fn new(
103        conn: Conn,
104        requests: mpsc::Receiver<OutboundRequest>,
105        data_store: DataStore,
106    ) -> Self {
107        debug_assert!(matches!(
108            conn.role(),
109            ConnRole::Server | ConnRole::DnodePeerServer
110        ));
111        Self {
112            conn,
113            requests,
114            data_store,
115            pending_responses: std::collections::VecDeque::new(),
116        }
117    }
118
119    /// Borrow the underlying connection.
120    #[must_use]
121    pub fn conn(&self) -> &Conn {
122        &self.conn
123    }
124
125    /// Mutably borrow the underlying connection.
126    pub fn conn_mut(&mut self) -> &mut Conn {
127        &mut self.conn
128    }
129
130    /// Drive the server FSM until either the request channel is
131    /// closed or the transport hits EOF / error.
132    ///
133    /// # Errors
134    /// Surfaces transport- and protocol-level errors.
135    pub async fn run(mut self) -> Result<(), NetError> {
136        let mut read_buf = vec![0u8; 4096];
137        let mut accumulated = Vec::<u8>::new();
138        let mut pending_responder: Option<mpsc::Sender<OutboundEnvelope>> = None;
139
140        loop {
141            if self.conn.is_eof() && self.pending_responses.is_empty() {
142                self.conn.set_done();
143                return Ok(());
144            }
145
146            tokio::select! {
147                res = self.requests.recv() => {
148                    let Some(out_req) = res else {
149                        // Channel closed; drain pending responses and exit.
150                        if self.pending_responses.is_empty() {
151                            self.conn.set_done();
152                            return Ok(());
153                        }
154                        continue;
155                    };
156                    let send_span = tracing::info_span!(
157                        parent: &out_req.span,
158                        "backend.send",
159                        req_id = out_req.req_id,
160                        bytes = out_req.bytes.len(),
161                    );
162                    let req_span = out_req.span.clone();
163                    let req_bytes = out_req.bytes;
164                    let transport = self.conn.transport_mut().ok_or(NetError::Closed)?;
165                    let write_res = async { transport.write_all(&req_bytes).await }
166                        .instrument(send_span)
167                        .await;
168                    write_res?;
169                    self.conn.record_send(req_bytes.len());
170                    self.pending_responses
171                        .push_back((out_req.req_id, req_span, out_req.target_peer_idx));
172                    pending_responder = Some(out_req.responder);
173                }
174                read_res = async {
175                    if let Some(t) = self.conn.transport_mut() {
176                        t.read(&mut read_buf).await
177                    } else {
178                        Ok(0)
179                    }
180                } => {
181                    let n = read_res?;
182                    if n == 0 {
183                        self.conn.set_eof();
184                        continue;
185                    }
186                    self.conn.record_recv(n);
187                    accumulated.extend_from_slice(&read_buf[..n]);
188                    self.drive_response_parser(&mut accumulated, &mut pending_responder).await?;
189                }
190            }
191        }
192    }
193
194    async fn drive_response_parser(
195        &mut self,
196        accumulated: &mut Vec<u8>,
197        responder: &mut Option<mpsc::Sender<OutboundEnvelope>>,
198    ) -> Result<(), NetError> {
199        use crate::proto::memcache::memcache_parse_rsp;
200        use crate::proto::redis::redis_parse_rsp;
201
202        while !accumulated.is_empty() {
203            let head_span = self
204                .pending_responses
205                .front()
206                .map_or_else(tracing::Span::current, |(_, s, _)| s.clone());
207            let id = self.pending_responses.front().map_or(0, |(i, _, _)| *i);
208            let mut msg = Msg::new(id, MsgType::Unknown, false);
209            let result = match self.data_store {
210                DataStore::Redis | DataStore::Noxu => redis_parse_rsp(&mut msg, accumulated),
211                DataStore::Memcache => memcache_parse_rsp(&mut msg, accumulated),
212            };
213            match result {
214                MsgParseResult::Ok => {
215                    let consumed = msg.parser_pos();
216                    if consumed == 0 {
217                        return Err(NetError::Parse("server parser stalled".into()));
218                    }
219                    let bytes = accumulated[..consumed].to_vec();
220                    accumulated.drain(0..consumed);
221                    let (req_id, req_span, source_peer_idx) = self
222                        .pending_responses
223                        .pop_front()
224                        .unwrap_or((0, head_span, None));
225                    let parse_span = tracing::info_span!(
226                        parent: &req_span,
227                        "backend.parse",
228                        req_id,
229                        bytes = consumed,
230                    );
231                    let env = parse_span.in_scope(|| {
232                        let mut rsp = msg;
233                        let pool = self.conn.mbuf_pool().clone();
234                        let mut buf = pool.get();
235                        buf.recv(&bytes);
236                        rsp.mbufs_mut().push_back(buf);
237                        rsp.recompute_mlen();
238                        OutboundEnvelope {
239                            req_id,
240                            rsp,
241                            span: req_span,
242                            source_peer_idx,
243                        }
244                    });
245                    if let Some(sender) = responder.as_ref() {
246                        let _ = sender.send(env).await;
247                    }
248                }
249                MsgParseResult::Again => return Ok(()),
250                MsgParseResult::Repair | MsgParseResult::Noop | MsgParseResult::Fragment => {
251                    let consumed = msg.parser_pos();
252                    if consumed > 0 {
253                        accumulated.drain(0..consumed);
254                    } else {
255                        return Ok(());
256                    }
257                }
258                MsgParseResult::Error | MsgParseResult::OomError | MsgParseResult::DynoConfig => {
259                    return Err(NetError::Parse(format!("{result:?}")));
260                }
261            }
262        }
263        Ok(())
264    }
265}
266
267#[cfg(test)]
268mod tests {
269    use super::*;
270    use crate::io::reactor::TcpTransport;
271    use tokio::net::{TcpListener, TcpStream};
272
273    #[tokio::test]
274    async fn build_server_conn() {
275        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
276        let addr = listener.local_addr().unwrap();
277        let _accept = tokio::spawn(async move {
278            let (s, _) = listener.accept().await.unwrap();
279            drop(s);
280        });
281        let s = TcpStream::connect(addr).await.unwrap();
282        let conn = Conn::new(
283            Box::new(TcpTransport::new(s, ConnRole::Server)),
284            ConnRole::Server,
285        );
286        let (_tx, rx) = mpsc::channel(1);
287        let server = ServerConn::new(conn, rx, DataStore::Redis);
288        assert_eq!(server.conn().role(), ConnRole::Server);
289    }
290}