Skip to main content

allsource_core/infrastructure/resp/
server.rs

1//! RESP3 TCP server.
2//!
3//! Accepts Redis-protocol connections and dispatches commands to the EventStore.
4//! Runs alongside the HTTP API server on a separate port (default 6380).
5
6use crate::{domain::entities::Event, store::EventStore};
7use std::sync::Arc;
8use tokio::{
9    io::BufReader,
10    net::{TcpListener, TcpStream},
11};
12
13use super::{
14    commands,
15    protocol::{self, RespValue},
16};
17
18/// RESP3 server that bridges Redis wire protocol to EventStore operations.
19pub struct RespServer {
20    store: Arc<EventStore>,
21}
22
23impl RespServer {
24    pub fn new(store: Arc<EventStore>) -> Self {
25        Self { store }
26    }
27
28    /// Start accepting connections on the given port.
29    ///
30    /// This function runs forever (until the process shuts down).
31    pub async fn serve(self: Arc<Self>, port: u16) -> anyhow::Result<()> {
32        let addr = format!("0.0.0.0:{port}");
33        let listener = TcpListener::bind(&addr).await?;
34
35        tracing::info!("RESP3 server listening on {}", addr);
36
37        loop {
38            match listener.accept().await {
39                Ok((stream, peer_addr)) => {
40                    tracing::debug!("RESP3 client connected from {}", peer_addr);
41                    let server = Arc::clone(&self);
42                    tokio::spawn(async move {
43                        if let Err(e) = server.handle_connection(stream).await {
44                            tracing::debug!("RESP3 client {} disconnected: {}", peer_addr, e);
45                        }
46                    });
47                }
48                Err(e) => {
49                    tracing::error!("RESP3 accept error: {}", e);
50                }
51            }
52        }
53    }
54
55    async fn handle_connection(&self, stream: TcpStream) -> anyhow::Result<()> {
56        let (reader, mut writer) = stream.into_split();
57        let mut reader = BufReader::new(reader);
58
59        loop {
60            let value = match protocol::parse_value(&mut reader).await {
61                Ok(Some(v)) => v,
62                Ok(None) => return Ok(()), // EOF
63                Err(e) => {
64                    let _ = protocol::write_value(
65                        &mut writer,
66                        &RespValue::err(format!("protocol error: {e}")),
67                    )
68                    .await;
69                    return Err(e.into());
70                }
71            };
72
73            // Commands come as arrays of bulk strings
74            let args = match value {
75                RespValue::Array(items) => items,
76                other => {
77                    // Inline command support: simple string treated as single-arg command
78                    if let Some(s) = other.as_str() {
79                        s.split_whitespace().map(RespValue::bulk_string).collect()
80                    } else {
81                        protocol::write_value(
82                            &mut writer,
83                            &RespValue::err("expected array or inline command"),
84                        )
85                        .await?;
86                        continue;
87                    }
88                }
89            };
90
91            // Check for QUIT
92            if let Some(cmd) = args.first().and_then(|v| v.as_str())
93                && cmd.eq_ignore_ascii_case("QUIT")
94            {
95                protocol::write_value(&mut writer, &RespValue::ok()).await?;
96                return Ok(());
97            }
98
99            let (response, subscription) = commands::execute(&args, &self.store);
100
101            // Write the response
102            protocol::write_value(&mut writer, &response).await?;
103
104            // If we got a subscription, enter pub/sub mode
105            if let Some(mut rx) = subscription {
106                self.run_subscription(&mut writer, &mut rx).await?;
107                return Ok(()); // subscription mode exits on error/disconnect
108            }
109        }
110    }
111
112    /// Run in subscription mode: forward broadcast events to the client
113    /// as Redis pub/sub messages until the connection drops.
114    async fn run_subscription(
115        &self,
116        writer: &mut (impl tokio::io::AsyncWrite + Unpin),
117        rx: &mut tokio::sync::broadcast::Receiver<Arc<Event>>,
118    ) -> anyhow::Result<()> {
119        loop {
120            match rx.recv().await {
121                Ok(event) => {
122                    // Format as Redis pub/sub message:
123                    // *3\r\n$7\r\nmessage\r\n$<channel_len>\r\n<channel>\r\n$<payload_len>\r\n<payload>\r\n
124                    let channel = format!("events:{}", event.event_type_str());
125                    let payload =
126                        serde_json::to_string(event.as_ref()).unwrap_or_else(|_| "{}".to_string());
127
128                    let msg = RespValue::Array(vec![
129                        RespValue::bulk_string("message"),
130                        RespValue::bulk_string(&channel),
131                        RespValue::bulk_string(&payload),
132                    ]);
133
134                    if let Err(e) = protocol::write_value(writer, &msg).await {
135                        tracing::debug!("RESP3 subscription write error: {}", e);
136                        return Ok(());
137                    }
138                }
139                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
140                    tracing::warn!("RESP3 subscriber lagged by {} messages", n);
141                    // Continue receiving — we just lost some messages
142                }
143                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
144                    return Ok(());
145                }
146            }
147        }
148    }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        net::TcpStream,
    };

    /// Upper bound on how long to wait for the first byte of a response.
    const FIRST_BYTE_TIMEOUT: Duration = Duration::from_secs(2);

    /// Once data has arrived, the response is considered complete after the
    /// socket has been silent for this long.
    const QUIET_PERIOD: Duration = Duration::from_millis(50);

    /// Helper: start a server on a random local port.
    ///
    /// Returns the port together with the store backing the server so tests
    /// can keep the store alive or inspect it.
    async fn start_test_server() -> (u16, Arc<EventStore>) {
        let store = Arc::new(EventStore::new());
        let server = Arc::new(RespServer::new(Arc::clone(&store)));

        // Bind to port 0 to get a random available port
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let port = listener.local_addr().unwrap().port();

        let server_clone = Arc::clone(&server);
        tokio::spawn(async move {
            while let Ok((stream, _)) = listener.accept().await {
                let s = Arc::clone(&server_clone);
                tokio::spawn(async move {
                    let _ = s.handle_connection(stream).await;
                });
            }
        });

        (port, store)
    }

    /// Send `parts` as a RESP array of bulk strings and return the raw reply.
    ///
    /// The reply is drained until the socket goes quiet (or the peer closes
    /// the connection), so responses that arrive in several segments or that
    /// exceed a single read buffer are returned in full.
    async fn send_command(stream: &mut TcpStream, parts: &[&str]) -> String {
        // Build RESP array
        let cmd = RespValue::Array(parts.iter().map(|s| RespValue::bulk_string(s)).collect());
        stream.write_all(&cmd.encode()).await.unwrap();
        stream.flush().await.unwrap();

        let mut response = Vec::new();
        let mut chunk = [0u8; 4096];
        let mut wait = FIRST_BYTE_TIMEOUT;
        loop {
            match tokio::time::timeout(wait, stream.read(&mut chunk)).await {
                Ok(Ok(n)) if n > 0 => {
                    response.extend_from_slice(&chunk[..n]);
                    // After the first bytes, only wait briefly for the rest.
                    wait = QUIET_PERIOD;
                }
                // Timeout, EOF or read error: whatever was collected is the reply.
                _ => break,
            }
        }
        String::from_utf8_lossy(&response).into_owned()
    }

    #[tokio::test]
    async fn test_server_ping() {
        let (port, _store) = start_test_server().await;
        let mut stream = TcpStream::connect(format!("127.0.0.1:{port}"))
            .await
            .unwrap();

        let resp = send_command(&mut stream, &["PING"]).await;
        assert!(resp.contains("PONG"), "got: {resp}");
    }

    #[tokio::test]
    async fn test_server_xadd_xrange() {
        let (port, _store) = start_test_server().await;
        let mut stream = TcpStream::connect(format!("127.0.0.1:{port}"))
            .await
            .unwrap();

        // XADD
        let resp = send_command(
            &mut stream,
            &[
                "XADD",
                "default",
                "*",
                "event_type",
                "user.created",
                "entity_id",
                "user-1",
            ],
        )
        .await;
        assert!(
            resp.contains("-0"),
            "stream ID should end in -0, got: {resp}"
        );

        // XRANGE
        let resp = send_command(&mut stream, &["XRANGE", "default", "-", "+"]).await;
        assert!(resp.contains("user.created"), "got: {resp}");
    }

    #[tokio::test]
    async fn test_server_quit() {
        let (port, _store) = start_test_server().await;
        let mut stream = TcpStream::connect(format!("127.0.0.1:{port}"))
            .await
            .unwrap();

        let resp = send_command(&mut stream, &["QUIT"]).await;
        assert!(resp.contains("OK"), "got: {resp}");

        // After acknowledging QUIT the server closes the connection, which the
        // client observes as end-of-stream.
        let mut probe = [0u8; 1];
        let closed = tokio::time::timeout(Duration::from_secs(1), stream.read(&mut probe)).await;
        assert!(
            matches!(closed, Ok(Ok(0))),
            "server should close the connection after QUIT"
        );
    }
}