celerix_store/server/
router.rs

1use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
2use std::sync::Arc;
3use tokio::net::{TcpListener, TcpStream};
4use crate::{CelerixStore, Result};
5use log::{info, error};
6use tokio::sync::Semaphore;
7
8/// A TCP router that dispatches incoming commands to a [`CelerixStore`].
9/// 
10/// `Router` handles concurrent TCP connections (up to 100 by default) and 
11/// implements the Celerix Store network protocol.
12pub struct Router {
13    store: Arc<dyn CelerixStore>,
14    semaphore: Arc<Semaphore>,
15}
16
17impl Router {
18    /// Creates a new `Router` for the given store.
19    pub fn new(store: Arc<dyn CelerixStore>) -> Self {
20        Self { 
21            store,
22            semaphore: Arc::new(Semaphore::new(100)),
23        }
24    }
25
26    /// Starts the TCP server and listens for incoming connections on the specified port.
27    /// 
28    /// This method runs indefinitely until the process is terminated.
29    pub async fn listen(&self, port: &str) -> Result<()> {
30        let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
31        info!("Celerix Store listening on port {}", port);
32
33        loop {
34            let (socket, _) = listener.accept().await?;
35            let store = self.store.clone();
36            let sem = self.semaphore.clone();
37
38            tokio::spawn(async move {
39                let _permit = match sem.try_acquire() {
40                    Ok(p) => p,
41                    Err(_) => {
42                        error!("Server busy: too many concurrent connections. Rejecting...");
43                        // Ensure it's closed
44                        let mut socket = socket;
45                        let _ = socket.shutdown().await;
46                        return;
47                    }
48                };
49                
50                if let Err(e) = handle_connection(socket, store).await {
51                    error!("Connection error: {}", e);
52                }
53            });
54        }
55    }
56}
57
58/// Handles a single TCP connection by reading commands and writing responses.
59/// 
60/// This function is used by both the [`Router`] and the integration tests to
61/// process the Celerix Store protocol.
62pub async fn handle_connection(mut socket: TcpStream, store: Arc<dyn CelerixStore>) -> Result<()> {
63    let (reader, mut writer) = socket.split();
64    let mut reader = BufReader::new(reader);
65    let mut line = String::new();
66
67    loop {
68        line.clear();
69        let bytes_read = reader.read_line(&mut line).await?;
70        if bytes_read == 0 {
71            break;
72        }
73
74        let parts: Vec<&str> = line.trim().split_whitespace().collect();
75        if parts.is_empty() {
76            continue;
77        }
78
79        let command = parts[0].to_uppercase();
80        let response = match command.as_str() {
81            "GET" => {
82                if parts.len() < 4 {
83                    "ERR missing arguments".to_string()
84                } else {
85                    match store.get(parts[1], parts[2], parts[3]).await {
86                        Ok(val) => format!("OK {}", serde_json::to_string(&val)?),
87                        Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
88                    }
89                }
90            }
91            "SET" => {
92                if parts.len() < 5 {
93                    "ERR missing arguments".to_string()
94                } else {
95                    let val_str = parts[4..].join(" ");
96                    match serde_json::from_str(&val_str) {
97                        Ok(val) => match store.set(parts[1], parts[2], parts[3], val).await {
98                            Ok(_) => "OK".to_string(),
99                            Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
100                        },
101                        Err(_) => "ERR invalid json value".to_string(),
102                    }
103                }
104            }
105            "DEL" => {
106                if parts.len() < 4 {
107                    "ERR missing arguments".to_string()
108                } else {
109                    match store.delete(parts[1], parts[2], parts[3]).await {
110                        Ok(_) => "OK".to_string(),
111                        Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
112                    }
113                }
114            }
115            "LIST_PERSONAS" => {
116                match store.get_personas().await {
117                    Ok(list) => format!("OK {}", serde_json::to_string(&list)?),
118                    Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
119                }
120            }
121            "LIST_APPS" => {
122                if parts.len() < 2 {
123                    "ERR missing arguments".to_string()
124                } else {
125                    match store.get_apps(parts[1]).await {
126                        Ok(list) => format!("OK {}", serde_json::to_string(&list)?),
127                        Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
128                    }
129                }
130            }
131            "DUMP" => {
132                if parts.len() < 3 {
133                    "ERR missing arguments".to_string()
134                } else {
135                    match store.get_app_store(parts[1], parts[2]).await {
136                        Ok(data) => format!("OK {}", serde_json::to_string(&data)?),
137                        Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
138                    }
139                }
140            }
141            "DUMP_APP" => {
142                if parts.len() < 2 {
143                    "ERR missing arguments".to_string()
144                } else {
145                    match store.dump_app(parts[1]).await {
146                        Ok(data) => format!("OK {}", serde_json::to_string(&data)?),
147                        Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
148                    }
149                }
150            }
151            "GET_GLOBAL" => {
152                if parts.len() < 3 {
153                    "ERR missing arguments".to_string()
154                } else {
155                    match store.get_global(parts[1], parts[2]).await {
156                        Ok((val, persona)) => {
157                            let out = serde_json::json!({
158                                "persona": persona,
159                                "value": val
160                            });
161                            format!("OK {}", serde_json::to_string(&out)?)
162                        },
163                        Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
164                    }
165                }
166            }
167            "MOVE" => {
168                if parts.len() < 5 {
169                    "ERR missing arguments".to_string()
170                } else {
171                    match store.move_key(parts[1], parts[2], parts[3], parts[4]).await {
172                        Ok(_) => "OK".to_string(),
173                        Err(e) => format!("ERR {}", e.to_string().to_lowercase()),
174                    }
175                }
176            }
177            "PING" => "PONG".to_string(),
178            "QUIT" => break,
179            _ => "ERR unknown command".to_string(),
180        };
181
182        writer.write_all(format!("{}\n", response).as_bytes()).await?;
183    }
184    Ok(())
185}