allsource_core/infrastructure/resp/
server.rs1use crate::store::EventStore;
7use std::sync::Arc;
8use tokio::{
9 io::BufReader,
10 net::{TcpListener, TcpStream},
11};
12
13use super::{
14 commands,
15 protocol::{self, RespValue},
16};
17
18pub struct RespServer {
20 store: Arc<EventStore>,
21}
22
23impl RespServer {
24 pub fn new(store: Arc<EventStore>) -> Self {
25 Self { store }
26 }
27
28 pub async fn serve(self: Arc<Self>, port: u16) -> anyhow::Result<()> {
32 let addr = format!("0.0.0.0:{port}");
33 let listener = TcpListener::bind(&addr).await?;
34
35 tracing::info!("RESP3 server listening on {}", addr);
36
37 loop {
38 match listener.accept().await {
39 Ok((stream, peer_addr)) => {
40 tracing::debug!("RESP3 client connected from {}", peer_addr);
41 let server = Arc::clone(&self);
42 tokio::spawn(async move {
43 if let Err(e) = server.handle_connection(stream).await {
44 tracing::debug!("RESP3 client {} disconnected: {}", peer_addr, e);
45 }
46 });
47 }
48 Err(e) => {
49 tracing::error!("RESP3 accept error: {}", e);
50 }
51 }
52 }
53 }
54
55 async fn handle_connection(&self, stream: TcpStream) -> anyhow::Result<()> {
56 let (reader, mut writer) = stream.into_split();
57 let mut reader = BufReader::new(reader);
58
59 loop {
60 let value = match protocol::parse_value(&mut reader).await {
61 Ok(Some(v)) => v,
62 Ok(None) => return Ok(()), Err(e) => {
64 let _ = protocol::write_value(
65 &mut writer,
66 &RespValue::err(format!("protocol error: {e}")),
67 )
68 .await;
69 return Err(e.into());
70 }
71 };
72
73 let args = match value {
75 RespValue::Array(items) => items,
76 other => {
77 if let Some(s) = other.as_str() {
79 s.split_whitespace().map(RespValue::bulk_string).collect()
80 } else {
81 protocol::write_value(
82 &mut writer,
83 &RespValue::err("expected array or inline command"),
84 )
85 .await?;
86 continue;
87 }
88 }
89 };
90
91 if let Some(cmd) = args.first().and_then(|v| v.as_str())
93 && cmd.eq_ignore_ascii_case("QUIT")
94 {
95 protocol::write_value(&mut writer, &RespValue::ok()).await?;
96 return Ok(());
97 }
98
99 let (response, subscription) = commands::execute(&args, &self.store);
100
101 protocol::write_value(&mut writer, &response).await?;
103
104 if let Some(sub_info) = subscription {
106 self.run_subscription(&mut writer, sub_info).await?;
107 return Ok(()); }
109 }
110 }
111
112 async fn run_subscription(
116 &self,
117 writer: &mut (impl tokio::io::AsyncWrite + Unpin),
118 mut sub_info: commands::SubscriptionInfo,
119 ) -> anyhow::Result<()> {
120 use crate::application::services::consumer::ConsumerRegistry;
121
122 loop {
123 match sub_info.rx.recv().await {
124 Ok(event) => {
125 if !sub_info.filters.is_empty()
127 && !ConsumerRegistry::matches_filters(
128 event.event_type_str(),
129 &sub_info.filters,
130 )
131 {
132 continue;
133 }
134
135 let channel = format!("events:{}", event.event_type_str());
138 let payload =
139 serde_json::to_string(event.as_ref()).unwrap_or_else(|_| "{}".to_string());
140
141 let msg = RespValue::Array(vec![
142 RespValue::bulk_string("message"),
143 RespValue::bulk_string(&channel),
144 RespValue::bulk_string(&payload),
145 ]);
146
147 if let Err(e) = protocol::write_value(writer, &msg).await {
148 tracing::debug!("RESP3 subscription write error: {}", e);
149 return Ok(());
150 }
151 }
152 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
153 tracing::warn!("RESP3 subscriber lagged by {} messages", n);
154 }
156 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
157 return Ok(());
158 }
159 }
160 }
161 }
162}
163
164#[cfg(test)]
165mod tests {
166 use super::*;
167 use tokio::{
168 io::{AsyncReadExt, AsyncWriteExt},
169 net::TcpStream,
170 };
171
172 async fn start_test_server() -> (u16, Arc<EventStore>) {
174 let store = Arc::new(EventStore::new());
175 let server = Arc::new(RespServer::new(Arc::clone(&store)));
176
177 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
179 let port = listener.local_addr().unwrap().port();
180
181 let server_clone = Arc::clone(&server);
182 tokio::spawn(async move {
183 while let Ok((stream, _)) = listener.accept().await {
184 let s = Arc::clone(&server_clone);
185 tokio::spawn(async move {
186 let _ = s.handle_connection(stream).await;
187 });
188 }
189 });
190
191 (port, store)
192 }
193
194 async fn send_command(stream: &mut TcpStream, parts: &[&str]) -> String {
195 let cmd = RespValue::Array(parts.iter().map(|s| RespValue::bulk_string(s)).collect());
197 stream.write_all(&cmd.encode()).await.unwrap();
198 stream.flush().await.unwrap();
199
200 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
202 let mut buf = vec![0u8; 4096];
203 let n = tokio::time::timeout(std::time::Duration::from_millis(200), stream.read(&mut buf))
204 .await
205 .unwrap_or(Ok(0))
206 .unwrap_or(0);
207 String::from_utf8_lossy(&buf[..n]).to_string()
208 }
209
210 #[tokio::test]
211 async fn test_server_ping() {
212 let (port, _store) = start_test_server().await;
213 let mut stream = TcpStream::connect(format!("127.0.0.1:{port}"))
214 .await
215 .unwrap();
216
217 let resp = send_command(&mut stream, &["PING"]).await;
218 assert!(resp.contains("PONG"), "got: {resp}");
219 }
220
221 #[tokio::test]
222 async fn test_server_xadd_xrange() {
223 let (port, _store) = start_test_server().await;
224 let mut stream = TcpStream::connect(format!("127.0.0.1:{port}"))
225 .await
226 .unwrap();
227
228 let resp = send_command(
230 &mut stream,
231 &[
232 "XADD",
233 "default",
234 "*",
235 "event_type",
236 "user.created",
237 "entity_id",
238 "user-1",
239 ],
240 )
241 .await;
242 assert!(
243 resp.contains("-0"),
244 "stream ID should end in -0, got: {resp}"
245 );
246
247 let resp = send_command(&mut stream, &["XRANGE", "default", "-", "+"]).await;
249 assert!(resp.contains("user.created"), "got: {resp}");
250 }
251
252 #[tokio::test]
253 async fn test_server_quit() {
254 let (port, _store) = start_test_server().await;
255 let mut stream = TcpStream::connect(format!("127.0.0.1:{port}"))
256 .await
257 .unwrap();
258
259 let resp = send_command(&mut stream, &["QUIT"]).await;
260 assert!(resp.contains("OK"), "got: {resp}");
261 }
262}