celerix_store/server/
router.rs1use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
2use std::sync::Arc;
3use tokio::net::{TcpListener, TcpStream};
4use crate::{CelerixStore, Result};
5use log::{info, error};
6use tokio::sync::Semaphore;
7
8pub struct Router {
9 store: Arc<dyn CelerixStore>,
10 semaphore: Arc<Semaphore>,
11}
12
13impl Router {
14 pub fn new(store: Arc<dyn CelerixStore>) -> Self {
15 Self {
16 store,
17 semaphore: Arc::new(Semaphore::new(100)),
18 }
19 }
20
21 pub async fn listen(&self, port: &str) -> Result<()> {
22 let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
23 info!("Celerix Store listening on port {}", port);
24
25 loop {
26 let (socket, _) = listener.accept().await?;
27 let store = self.store.clone();
28 let sem = self.semaphore.clone();
29
30 tokio::spawn(async move {
31 let _permit = match sem.try_acquire() {
32 Ok(p) => p,
33 Err(_) => {
34 error!("Server busy: too many concurrent connections. Rejecting...");
35 let mut socket = socket;
37 let _ = socket.shutdown().await;
38 return;
39 }
40 };
41
42 if let Err(e) = handle_connection(socket, store).await {
43 error!("Connection error: {}", e);
44 }
45 });
46 }
47 }
48}
49
50pub async fn handle_connection(mut socket: TcpStream, store: Arc<dyn CelerixStore>) -> Result<()> {
51 let (reader, mut writer) = socket.split();
52 let mut reader = BufReader::new(reader);
53 let mut line = String::new();
54
55 loop {
56 line.clear();
57 let bytes_read = reader.read_line(&mut line).await?;
58 if bytes_read == 0 {
59 break;
60 }
61
62 let parts: Vec<&str> = line.trim().split_whitespace().collect();
63 if parts.is_empty() {
64 continue;
65 }
66
67 let command = parts[0].to_uppercase();
68 let response = match command.as_str() {
69 "GET" => {
70 if parts.len() < 4 {
71 "ERR missing arguments".to_string()
72 } else {
73 match store.get(parts[1], parts[2], parts[3]).await {
74 Ok(val) => format!("OK {}", serde_json::to_string(&val)?),
75 Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
76 }
77 }
78 }
79 "SET" => {
80 if parts.len() < 5 {
81 "ERR missing arguments".to_string()
82 } else {
83 let val_str = parts[4..].join(" ");
84 match serde_json::from_str(&val_str) {
85 Ok(val) => match store.set(parts[1], parts[2], parts[3], val).await {
86 Ok(_) => "OK".to_string(),
87 Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
88 },
89 Err(_) => "ERR invalid json value".to_string(),
90 }
91 }
92 }
93 "DEL" => {
94 if parts.len() < 4 {
95 "ERR missing arguments".to_string()
96 } else {
97 match store.delete(parts[1], parts[2], parts[3]).await {
98 Ok(_) => "OK".to_string(),
99 Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
100 }
101 }
102 }
103 "LIST_PERSONAS" => {
104 match store.get_personas().await {
105 Ok(list) => format!("OK {}", serde_json::to_string(&list)?),
106 Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
107 }
108 }
109 "LIST_APPS" => {
110 if parts.len() < 2 {
111 "ERR missing arguments".to_string()
112 } else {
113 match store.get_apps(parts[1]).await {
114 Ok(list) => format!("OK {}", serde_json::to_string(&list)?),
115 Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
116 }
117 }
118 }
119 "DUMP" => {
120 if parts.len() < 3 {
121 "ERR missing arguments".to_string()
122 } else {
123 match store.get_app_store(parts[1], parts[2]).await {
124 Ok(data) => format!("OK {}", serde_json::to_string(&data)?),
125 Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
126 }
127 }
128 }
129 "DUMP_APP" => {
130 if parts.len() < 2 {
131 "ERR missing arguments".to_string()
132 } else {
133 match store.dump_app(parts[1]).await {
134 Ok(data) => format!("OK {}", serde_json::to_string(&data)?),
135 Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
136 }
137 }
138 }
139 "GET_GLOBAL" => {
140 if parts.len() < 3 {
141 "ERR missing arguments".to_string()
142 } else {
143 match store.get_global(parts[1], parts[2]).await {
144 Ok((val, persona)) => {
145 let out = serde_json::json!({
146 "persona": persona,
147 "value": val
148 });
149 format!("OK {}", serde_json::to_string(&out)?)
150 },
151 Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
152 }
153 }
154 }
155 "MOVE" => {
156 if parts.len() < 5 {
157 "ERR missing arguments".to_string()
158 } else {
159 match store.move_key(parts[1], parts[2], parts[3], parts[4]).await {
160 Ok(_) => "OK".to_string(),
161 Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
162 }
163 }
164 }
165 "PING" => "PONG".to_string(),
166 "QUIT" => break,
167 _ => "ERR unknown command".to_string(),
168 };
169
170 writer.write_all(format!("{}\n", response).as_bytes()).await?;
171 }
172 Ok(())
173}