celerix_store/server/
router.rs

1use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
2use std::sync::Arc;
3use tokio::net::{TcpListener, TcpStream};
4use crate::{CelerixStore, Result};
5use log::{info, error};
6use tokio::sync::Semaphore;
7
8pub struct Router {
9    store: Arc<dyn CelerixStore>,
10    semaphore: Arc<Semaphore>,
11}
12
13impl Router {
14    pub fn new(store: Arc<dyn CelerixStore>) -> Self {
15        Self { 
16            store,
17            semaphore: Arc::new(Semaphore::new(100)),
18        }
19    }
20
21    pub async fn listen(&self, port: &str) -> Result<()> {
22        let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
23        info!("Celerix Store listening on port {}", port);
24
25        loop {
26            let (socket, _) = listener.accept().await?;
27            let store = self.store.clone();
28            let sem = self.semaphore.clone();
29
30            tokio::spawn(async move {
31                let _permit = match sem.try_acquire() {
32                    Ok(p) => p,
33                    Err(_) => {
34                        error!("Server busy: too many concurrent connections. Rejecting...");
35                        // Ensure it's closed
36                        let mut socket = socket;
37                        let _ = socket.shutdown().await;
38                        return;
39                    }
40                };
41                
42                if let Err(e) = handle_connection(socket, store).await {
43                    error!("Connection error: {}", e);
44                }
45            });
46        }
47    }
48}
49
50pub async fn handle_connection(mut socket: TcpStream, store: Arc<dyn CelerixStore>) -> Result<()> {
51    let (reader, mut writer) = socket.split();
52    let mut reader = BufReader::new(reader);
53    let mut line = String::new();
54
55    loop {
56        line.clear();
57        let bytes_read = reader.read_line(&mut line).await?;
58        if bytes_read == 0 {
59            break;
60        }
61
62        let parts: Vec<&str> = line.trim().split_whitespace().collect();
63        if parts.is_empty() {
64            continue;
65        }
66
67        let command = parts[0].to_uppercase();
68        let response = match command.as_str() {
69            "GET" => {
70                if parts.len() < 4 {
71                    "ERR missing arguments".to_string()
72                } else {
73                    match store.get(parts[1], parts[2], parts[3]).await {
74                        Ok(val) => format!("OK {}", serde_json::to_string(&val)?),
75                        Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
76                    }
77                }
78            }
79            "SET" => {
80                if parts.len() < 5 {
81                    "ERR missing arguments".to_string()
82                } else {
83                    let val_str = parts[4..].join(" ");
84                    match serde_json::from_str(&val_str) {
85                        Ok(val) => match store.set(parts[1], parts[2], parts[3], val).await {
86                            Ok(_) => "OK".to_string(),
87                            Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
88                        },
89                        Err(_) => "ERR invalid json value".to_string(),
90                    }
91                }
92            }
93            "DEL" => {
94                if parts.len() < 4 {
95                    "ERR missing arguments".to_string()
96                } else {
97                    match store.delete(parts[1], parts[2], parts[3]).await {
98                        Ok(_) => "OK".to_string(),
99                        Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
100                    }
101                }
102            }
103            "LIST_PERSONAS" => {
104                match store.get_personas().await {
105                    Ok(list) => format!("OK {}", serde_json::to_string(&list)?),
106                    Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
107                }
108            }
109            "LIST_APPS" => {
110                if parts.len() < 2 {
111                    "ERR missing arguments".to_string()
112                } else {
113                    match store.get_apps(parts[1]).await {
114                        Ok(list) => format!("OK {}", serde_json::to_string(&list)?),
115                        Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
116                    }
117                }
118            }
119            "DUMP" => {
120                if parts.len() < 3 {
121                    "ERR missing arguments".to_string()
122                } else {
123                    match store.get_app_store(parts[1], parts[2]).await {
124                        Ok(data) => format!("OK {}", serde_json::to_string(&data)?),
125                        Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
126                    }
127                }
128            }
129            "DUMP_APP" => {
130                if parts.len() < 2 {
131                    "ERR missing arguments".to_string()
132                } else {
133                    match store.dump_app(parts[1]).await {
134                        Ok(data) => format!("OK {}", serde_json::to_string(&data)?),
135                        Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
136                    }
137                }
138            }
139            "GET_GLOBAL" => {
140                if parts.len() < 3 {
141                    "ERR missing arguments".to_string()
142                } else {
143                    match store.get_global(parts[1], parts[2]).await {
144                        Ok((val, persona)) => {
145                            let out = serde_json::json!({
146                                "persona": persona,
147                                "value": val
148                            });
149                            format!("OK {}", serde_json::to_string(&out)?)
150                        },
151                        Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
152                    }
153                }
154            }
155            "MOVE" => {
156                if parts.len() < 5 {
157                    "ERR missing arguments".to_string()
158                } else {
159                    match store.move_key(parts[1], parts[2], parts[3], parts[4]).await {
160                        Ok(_) => "OK".to_string(),
161                        Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
162                    }
163                }
164            }
165            "PING" => "PONG".to_string(),
166            "QUIT" => break,
167            _ => "ERR unknown command".to_string(),
168        };
169
170        writer.write_all(format!("{}\n", response).as_bytes()).await?;
171    }
172    Ok(())
173}