Skip to main content

allsource_core/infrastructure/resp/
server.rs

1//! RESP3 TCP server.
2//!
3//! Accepts Redis-protocol connections and dispatches commands to the EventStore.
4//! Runs alongside the HTTP API server on a separate port (default 6380).
5
6use crate::store::EventStore;
7use std::sync::Arc;
8use tokio::{
9    io::BufReader,
10    net::{TcpListener, TcpStream},
11};
12
13use super::{
14    commands,
15    protocol::{self, RespValue},
16};
17
18/// RESP3 server that bridges Redis wire protocol to EventStore operations.
19pub struct RespServer {
20    store: Arc<EventStore>,
21}
22
23impl RespServer {
24    pub fn new(store: Arc<EventStore>) -> Self {
25        Self { store }
26    }
27
28    /// Start accepting connections on the given port.
29    ///
30    /// This function runs forever (until the process shuts down).
31    pub async fn serve(self: Arc<Self>, port: u16) -> anyhow::Result<()> {
32        let addr = format!("0.0.0.0:{port}");
33        let listener = TcpListener::bind(&addr).await?;
34
35        tracing::info!("RESP3 server listening on {}", addr);
36
37        loop {
38            match listener.accept().await {
39                Ok((stream, peer_addr)) => {
40                    tracing::debug!("RESP3 client connected from {}", peer_addr);
41                    let server = Arc::clone(&self);
42                    tokio::spawn(async move {
43                        if let Err(e) = server.handle_connection(stream).await {
44                            tracing::debug!("RESP3 client {} disconnected: {}", peer_addr, e);
45                        }
46                    });
47                }
48                Err(e) => {
49                    tracing::error!("RESP3 accept error: {}", e);
50                }
51            }
52        }
53    }
54
55    async fn handle_connection(&self, stream: TcpStream) -> anyhow::Result<()> {
56        let (reader, mut writer) = stream.into_split();
57        let mut reader = BufReader::new(reader);
58
59        loop {
60            let value = match protocol::parse_value(&mut reader).await {
61                Ok(Some(v)) => v,
62                Ok(None) => return Ok(()), // EOF
63                Err(e) => {
64                    let _ = protocol::write_value(
65                        &mut writer,
66                        &RespValue::err(format!("protocol error: {e}")),
67                    )
68                    .await;
69                    return Err(e.into());
70                }
71            };
72
73            // Commands come as arrays of bulk strings
74            let args = match value {
75                RespValue::Array(items) => items,
76                other => {
77                    // Inline command support: simple string treated as single-arg command
78                    if let Some(s) = other.as_str() {
79                        s.split_whitespace().map(RespValue::bulk_string).collect()
80                    } else {
81                        protocol::write_value(
82                            &mut writer,
83                            &RespValue::err("expected array or inline command"),
84                        )
85                        .await?;
86                        continue;
87                    }
88                }
89            };
90
91            // Check for QUIT
92            if let Some(cmd) = args.first().and_then(|v| v.as_str())
93                && cmd.eq_ignore_ascii_case("QUIT")
94            {
95                protocol::write_value(&mut writer, &RespValue::ok()).await?;
96                return Ok(());
97            }
98
99            let (response, subscription) = commands::execute(&args, &self.store);
100
101            // Write the response
102            protocol::write_value(&mut writer, &response).await?;
103
104            // If we got a subscription, enter pub/sub mode
105            if let Some(sub_info) = subscription {
106                self.run_subscription(&mut writer, sub_info).await?;
107                return Ok(()); // subscription mode exits on error/disconnect
108            }
109        }
110    }
111
112    /// Run in subscription mode: forward broadcast events to the client
113    /// as Redis pub/sub messages until the connection drops.
114    /// Events are filtered server-side by prefix patterns from SUBSCRIBE args.
115    async fn run_subscription(
116        &self,
117        writer: &mut (impl tokio::io::AsyncWrite + Unpin),
118        mut sub_info: commands::SubscriptionInfo,
119    ) -> anyhow::Result<()> {
120        use crate::application::services::consumer::ConsumerRegistry;
121
122        loop {
123            match sub_info.rx.recv().await {
124                Ok(event) => {
125                    // Apply server-side prefix filters
126                    if !sub_info.filters.is_empty()
127                        && !ConsumerRegistry::matches_filters(
128                            event.event_type_str(),
129                            &sub_info.filters,
130                        )
131                    {
132                        continue;
133                    }
134
135                    // Format as Redis pub/sub message:
136                    // *3\r\n$7\r\nmessage\r\n$<channel_len>\r\n<channel>\r\n$<payload_len>\r\n<payload>\r\n
137                    let channel = format!("events:{}", event.event_type_str());
138                    let payload =
139                        serde_json::to_string(event.as_ref()).unwrap_or_else(|_| "{}".to_string());
140
141                    let msg = RespValue::Array(vec![
142                        RespValue::bulk_string("message"),
143                        RespValue::bulk_string(&channel),
144                        RespValue::bulk_string(&payload),
145                    ]);
146
147                    if let Err(e) = protocol::write_value(writer, &msg).await {
148                        tracing::debug!("RESP3 subscription write error: {}", e);
149                        return Ok(());
150                    }
151                }
152                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
153                    tracing::warn!("RESP3 subscriber lagged by {} messages", n);
154                    // Continue receiving — we just lost some messages
155                }
156                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
157                    return Ok(());
158                }
159            }
160        }
161    }
162}
163
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        net::TcpStream,
    };

    /// Spin up a server on an OS-assigned loopback port.
    ///
    /// Returns the chosen port together with the store backing the server.
    async fn start_test_server() -> (u16, Arc<EventStore>) {
        let store = Arc::new(EventStore::new());
        let server = Arc::new(RespServer::new(Arc::clone(&store)));

        // Port 0 asks the OS for any free port.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let port = listener.local_addr().unwrap().port();

        tokio::spawn(async move {
            // Accept until the listener fails, one task per connection.
            loop {
                let Ok((stream, _)) = listener.accept().await else {
                    break;
                };
                let handler = Arc::clone(&server);
                tokio::spawn(async move {
                    let _ = handler.handle_connection(stream).await;
                });
            }
        });

        (port, store)
    }

    /// Open a client connection to the test server on `port`.
    async fn connect(port: u16) -> TcpStream {
        TcpStream::connect(format!("127.0.0.1:{port}"))
            .await
            .unwrap()
    }

    /// Send `parts` as a RESP array of bulk strings and return whatever the
    /// server wrote back, decoded lossily as UTF-8.
    ///
    /// An empty string is returned when nothing arrives within the timeout or
    /// the read fails.
    async fn send_command(stream: &mut TcpStream, parts: &[&str]) -> String {
        // Encode the command as an array of bulk strings.
        let mut items = Vec::with_capacity(parts.len());
        for part in parts {
            items.push(RespValue::bulk_string(part));
        }
        let request = RespValue::Array(items).encode();
        stream.write_all(&request).await.unwrap();
        stream.flush().await.unwrap();

        // Give the server a moment, then take whatever a single read yields.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        let mut buf = vec![0u8; 4096];
        let pending = stream.read(&mut buf);
        let len = match tokio::time::timeout(std::time::Duration::from_millis(200), pending).await
        {
            Ok(Ok(len)) => len,
            Ok(Err(_)) | Err(_) => 0,
        };
        String::from_utf8_lossy(&buf[..len]).to_string()
    }

    #[tokio::test]
    async fn test_server_ping() {
        let (port, _store) = start_test_server().await;
        let mut stream = connect(port).await;

        let resp = send_command(&mut stream, &["PING"]).await;
        assert!(resp.contains("PONG"), "got: {resp}");
    }

    #[tokio::test]
    async fn test_server_xadd_xrange() {
        let (port, _store) = start_test_server().await;
        let mut stream = connect(port).await;

        // Append one event to the stream.
        let xadd = [
            "XADD",
            "default",
            "*",
            "event_type",
            "user.created",
            "entity_id",
            "user-1",
        ];
        let resp = send_command(&mut stream, &xadd).await;
        assert!(
            resp.contains("-0"),
            "stream ID should end in -0, got: {resp}"
        );

        // Read the full range back and expect the event type to show up.
        let resp = send_command(&mut stream, &["XRANGE", "default", "-", "+"]).await;
        assert!(resp.contains("user.created"), "got: {resp}");
    }

    #[tokio::test]
    async fn test_server_quit() {
        let (port, _store) = start_test_server().await;
        let mut stream = connect(port).await;

        let resp = send_command(&mut stream, &["QUIT"]).await;
        assert!(resp.contains("OK"), "got: {resp}");
    }
}