allsource_core/infrastructure/resp/
server.rs1use crate::{domain::entities::Event, store::EventStore};
7use std::sync::Arc;
8use tokio::{
9 io::BufReader,
10 net::{TcpListener, TcpStream},
11};
12
13use super::{
14 commands,
15 protocol::{self, RespValue},
16};
17
18pub struct RespServer {
20 store: Arc<EventStore>,
21}
22
23impl RespServer {
24 pub fn new(store: Arc<EventStore>) -> Self {
25 Self { store }
26 }
27
28 pub async fn serve(self: Arc<Self>, port: u16) -> anyhow::Result<()> {
32 let addr = format!("0.0.0.0:{port}");
33 let listener = TcpListener::bind(&addr).await?;
34
35 tracing::info!("RESP3 server listening on {}", addr);
36
37 loop {
38 match listener.accept().await {
39 Ok((stream, peer_addr)) => {
40 tracing::debug!("RESP3 client connected from {}", peer_addr);
41 let server = Arc::clone(&self);
42 tokio::spawn(async move {
43 if let Err(e) = server.handle_connection(stream).await {
44 tracing::debug!("RESP3 client {} disconnected: {}", peer_addr, e);
45 }
46 });
47 }
48 Err(e) => {
49 tracing::error!("RESP3 accept error: {}", e);
50 }
51 }
52 }
53 }
54
55 async fn handle_connection(&self, stream: TcpStream) -> anyhow::Result<()> {
56 let (reader, mut writer) = stream.into_split();
57 let mut reader = BufReader::new(reader);
58
59 loop {
60 let value = match protocol::parse_value(&mut reader).await {
61 Ok(Some(v)) => v,
62 Ok(None) => return Ok(()), Err(e) => {
64 let _ = protocol::write_value(
65 &mut writer,
66 &RespValue::err(format!("protocol error: {e}")),
67 )
68 .await;
69 return Err(e.into());
70 }
71 };
72
73 let args = match value {
75 RespValue::Array(items) => items,
76 other => {
77 if let Some(s) = other.as_str() {
79 s.split_whitespace().map(RespValue::bulk_string).collect()
80 } else {
81 protocol::write_value(
82 &mut writer,
83 &RespValue::err("expected array or inline command"),
84 )
85 .await?;
86 continue;
87 }
88 }
89 };
90
91 if let Some(cmd) = args.first().and_then(|v| v.as_str())
93 && cmd.eq_ignore_ascii_case("QUIT")
94 {
95 protocol::write_value(&mut writer, &RespValue::ok()).await?;
96 return Ok(());
97 }
98
99 let (response, subscription) = commands::execute(&args, &self.store);
100
101 protocol::write_value(&mut writer, &response).await?;
103
104 if let Some(mut rx) = subscription {
106 self.run_subscription(&mut writer, &mut rx).await?;
107 return Ok(()); }
109 }
110 }
111
112 async fn run_subscription(
115 &self,
116 writer: &mut (impl tokio::io::AsyncWrite + Unpin),
117 rx: &mut tokio::sync::broadcast::Receiver<Arc<Event>>,
118 ) -> anyhow::Result<()> {
119 loop {
120 match rx.recv().await {
121 Ok(event) => {
122 let channel = format!("events:{}", event.event_type_str());
125 let payload =
126 serde_json::to_string(event.as_ref()).unwrap_or_else(|_| "{}".to_string());
127
128 let msg = RespValue::Array(vec![
129 RespValue::bulk_string("message"),
130 RespValue::bulk_string(&channel),
131 RespValue::bulk_string(&payload),
132 ]);
133
134 if let Err(e) = protocol::write_value(writer, &msg).await {
135 tracing::debug!("RESP3 subscription write error: {}", e);
136 return Ok(());
137 }
138 }
139 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
140 tracing::warn!("RESP3 subscriber lagged by {} messages", n);
141 }
143 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
144 return Ok(());
145 }
146 }
147 }
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        net::TcpStream,
    };

    /// Starts a server on an OS-assigned loopback port and returns that port
    /// together with the backing store.
    async fn start_test_server() -> (u16, Arc<EventStore>) {
        let store = Arc::new(EventStore::new());
        let server = Arc::new(RespServer::new(Arc::clone(&store)));

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let port = listener.local_addr().unwrap().port();

        tokio::spawn(async move {
            // Accept until the listener reports an error.
            loop {
                let Ok((stream, _)) = listener.accept().await else {
                    break;
                };
                let handler = Arc::clone(&server);
                tokio::spawn(async move {
                    let _ = handler.handle_connection(stream).await;
                });
            }
        });

        (port, store)
    }

    /// Opens a client connection to the test server listening on `port`.
    async fn connect(port: u16) -> TcpStream {
        TcpStream::connect(format!("127.0.0.1:{port}"))
            .await
            .unwrap()
    }

    /// Sends `parts` as an array of bulk strings and returns whatever the
    /// server wrote back, lossily decoded as UTF-8. A read timeout or read
    /// error yields an empty string.
    async fn send_command(stream: &mut TcpStream, parts: &[&str]) -> String {
        let items = parts.iter().map(|p| RespValue::bulk_string(p)).collect();
        let frame = RespValue::Array(items).encode();
        stream.write_all(&frame).await.unwrap();
        stream.flush().await.unwrap();

        // Give the server a moment to produce its reply before reading.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let mut buf = vec![0u8; 4096];
        let pending = tokio::time::timeout(Duration::from_millis(200), stream.read(&mut buf));
        let len = match pending.await {
            Ok(Ok(n)) => n,
            _ => 0,
        };
        String::from_utf8_lossy(&buf[..len]).to_string()
    }

    #[tokio::test]
    async fn test_server_ping() {
        let (port, _store) = start_test_server().await;
        let mut stream = connect(port).await;

        let resp = send_command(&mut stream, &["PING"]).await;
        assert!(resp.contains("PONG"), "got: {resp}");
    }

    #[tokio::test]
    async fn test_server_xadd_xrange() {
        let (port, _store) = start_test_server().await;
        let mut stream = connect(port).await;

        let xadd = [
            "XADD",
            "default",
            "*",
            "event_type",
            "user.created",
            "entity_id",
            "user-1",
        ];
        let resp = send_command(&mut stream, &xadd).await;
        assert!(
            resp.contains("-0"),
            "stream ID should end in -0, got: {resp}"
        );

        let xrange = ["XRANGE", "default", "-", "+"];
        let resp = send_command(&mut stream, &xrange).await;
        assert!(resp.contains("user.created"), "got: {resp}");
    }

    #[tokio::test]
    async fn test_server_quit() {
        let (port, _store) = start_test_server().await;
        let mut stream = connect(port).await;

        let resp = send_command(&mut stream, &["QUIT"]).await;
        assert!(resp.contains("OK"), "got: {resp}");
    }
}