celerix_store/server/
router.rs1use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
2use std::sync::Arc;
3use tokio::net::{TcpListener, TcpStream};
4use crate::{CelerixStore, Result};
5use log::{info, error};
6use tokio::sync::Semaphore;
7
8pub struct Router {
13 store: Arc<dyn CelerixStore>,
14 semaphore: Arc<Semaphore>,
15}
16
17impl Router {
18 pub fn new(store: Arc<dyn CelerixStore>) -> Self {
20 Self {
21 store,
22 semaphore: Arc::new(Semaphore::new(100)),
23 }
24 }
25
26 pub async fn listen(&self, port: &str) -> Result<()> {
30 let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
31 info!("Celerix Store listening on port {}", port);
32
33 loop {
34 let (socket, _) = listener.accept().await?;
35 let store = self.store.clone();
36 let sem = self.semaphore.clone();
37
38 tokio::spawn(async move {
39 let _permit = match sem.try_acquire() {
40 Ok(p) => p,
41 Err(_) => {
42 error!("Server busy: too many concurrent connections. Rejecting...");
43 let mut socket = socket;
45 let _ = socket.shutdown().await;
46 return;
47 }
48 };
49
50 if let Err(e) = handle_connection(socket, store).await {
51 error!("Connection error: {}", e);
52 }
53 });
54 }
55 }
56}
57
58pub async fn handle_connection(mut socket: TcpStream, store: Arc<dyn CelerixStore>) -> Result<()> {
63 let (reader, mut writer) = socket.split();
64 let mut reader = BufReader::new(reader);
65 let mut line = String::new();
66
67 loop {
68 line.clear();
69 let bytes_read = reader.read_line(&mut line).await?;
70 if bytes_read == 0 {
71 break;
72 }
73
74 let parts: Vec<&str> = line.trim().split_whitespace().collect();
75 if parts.is_empty() {
76 continue;
77 }
78
79 let command = parts[0].to_uppercase();
80 let response = match command.as_str() {
81 "GET" => {
82 if parts.len() < 4 {
83 "ERR missing arguments".to_string()
84 } else {
85 match store.get(parts[1], parts[2], parts[3]).await {
86 Ok(val) => format!("OK {}", serde_json::to_string(&val)?),
87 Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
88 }
89 }
90 }
91 "SET" => {
92 if parts.len() < 5 {
93 "ERR missing arguments".to_string()
94 } else {
95 let val_str = parts[4..].join(" ");
96 match serde_json::from_str(&val_str) {
97 Ok(val) => match store.set(parts[1], parts[2], parts[3], val).await {
98 Ok(_) => "OK".to_string(),
99 Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
100 },
101 Err(_) => "ERR invalid json value".to_string(),
102 }
103 }
104 }
105 "DEL" => {
106 if parts.len() < 4 {
107 "ERR missing arguments".to_string()
108 } else {
109 match store.delete(parts[1], parts[2], parts[3]).await {
110 Ok(_) => "OK".to_string(),
111 Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
112 }
113 }
114 }
115 "LIST_PERSONAS" => {
116 match store.get_personas().await {
117 Ok(list) => format!("OK {}", serde_json::to_string(&list)?),
118 Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
119 }
120 }
121 "LIST_APPS" => {
122 if parts.len() < 2 {
123 "ERR missing arguments".to_string()
124 } else {
125 match store.get_apps(parts[1]).await {
126 Ok(list) => format!("OK {}", serde_json::to_string(&list)?),
127 Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
128 }
129 }
130 }
131 "DUMP" => {
132 if parts.len() < 3 {
133 "ERR missing arguments".to_string()
134 } else {
135 match store.get_app_store(parts[1], parts[2]).await {
136 Ok(data) => format!("OK {}", serde_json::to_string(&data)?),
137 Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
138 }
139 }
140 }
141 "DUMP_APP" => {
142 if parts.len() < 2 {
143 "ERR missing arguments".to_string()
144 } else {
145 match store.dump_app(parts[1]).await {
146 Ok(data) => format!("OK {}", serde_json::to_string(&data)?),
147 Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
148 }
149 }
150 }
151 "GET_GLOBAL" => {
152 if parts.len() < 3 {
153 "ERR missing arguments".to_string()
154 } else {
155 match store.get_global(parts[1], parts[2]).await {
156 Ok((val, persona)) => {
157 let out = serde_json::json!({
158 "persona": persona,
159 "value": val
160 });
161 format!("OK {}", serde_json::to_string(&out)?)
162 },
163 Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
164 }
165 }
166 }
167 "MOVE" => {
168 if parts.len() < 5 {
169 "ERR missing arguments".to_string()
170 } else {
171 match store.move_key(parts[1], parts[2], parts[3], parts[4]).await {
172 Ok(_) => "OK".to_string(),
173 Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
174 }
175 }
176 }
177 "PING" => "PONG".to_string(),
178 "QUIT" => break,
179 _ => "ERR unknown command".to_string(),
180 };
181
182 writer.write_all(format!("{}\n", response).as_bytes()).await?;
183 }
184 Ok(())
185}