1use crate::error::ChopinError;
3use crate::router::Router;
4use crate::syscalls::{self};
5use crate::worker::Worker;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::thread;
9
10pub struct Chopin {
33 router: Router,
34}
35
36impl Default for Chopin {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42impl Chopin {
43 pub fn new() -> Self {
45 Self {
46 router: Router::new(),
47 }
48 }
49
50 pub fn mount_all_routes(mut self) -> Self {
52 for route in inventory::iter::<crate::router::RouteDef> {
53 self.router.add(route.method, route.path, route.handler);
54 }
55 self.router.finalize();
56 self
57 }
58
59 pub fn with_openapi(mut self) -> Self {
61 self.router
62 .get("/openapi.json", crate::openapi::openapi_json_handler);
63 self.router
64 .get("/docs", crate::openapi::scalar_docs_handler);
65 self
66 }
67
68 pub fn serve(self, host_port: &str) -> crate::error::ChopinResult<()> {
70 let server = Server::bind(host_port);
71 server.serve(self.router)
72 }
73}
74
/// Configuration for a multi-threaded server: the address to listen on and
/// the number of worker threads to start.
pub struct Server {
    /// Listen address in `host:port` or `[ipv6]:port` form.
    host_port: String,
    /// Number of worker threads spawned by `serve`.
    workers: usize,
}
100
101impl Server {
102 pub fn bind(host_port: &str) -> Self {
104 Self {
105 host_port: host_port.to_string(),
106 workers: num_cpus::get(),
107 }
108 }
109
110 pub fn workers(mut self, workers: usize) -> Self {
112 self.workers = workers;
113 self
114 }
115
116 pub fn serve(self, mut router: Router) -> crate::error::ChopinResult<()> {
119 router.finalize();
121
122 let core_ids = core_affinity::get_core_ids().unwrap_or_default();
123 let shutdown_flag = Arc::new(AtomicBool::new(false));
124
125 let shutdown_signal = shutdown_flag.clone();
126 ctrlc::set_handler(move || {
127 shutdown_signal.store(true, Ordering::Release);
128 })
129 .map_err(|e| ChopinError::Other(format!("Failed to set Ctrl-C handler: {e}")))?;
130
131 let mut worker_metrics = Vec::with_capacity(self.workers);
132 for _ in 0..self.workers {
133 worker_metrics.push(Arc::new(crate::metrics::WorkerMetrics::new()));
134 }
135
136 let _metrics_clones = worker_metrics.clone();
137 let _shutdown_metrics = shutdown_flag.clone();
138
139 let Parts { host, port } = parse_host_port(&self.host_port)?;
140
141 let mut handles: Vec<thread::JoinHandle<()>> = Vec::with_capacity(self.workers);
142 for (i, metrics_worker) in worker_metrics.iter().enumerate().take(self.workers) {
143 let core_id = core_ids.get(i % core_ids.len()).copied();
144 let router_clone = router.clone();
145 let shutdown = shutdown_flag.clone();
146 let metrics_worker = metrics_worker.clone();
147
148 let host_clone = host.clone();
149 let port_clone = port;
150
151 let handle = thread::Builder::new()
152 .name(format!("chopin-worker-{}", i))
153 .spawn(move || {
154 if let Some(id) = core_id {
155 core_affinity::set_for_current(id);
156 }
157
158 match syscalls::create_listen_socket_reuseport(&host_clone, port_clone) {
160 Ok(listen_fd) => {
161 let mut worker =
162 Worker::new(i, router_clone, metrics_worker, listen_fd);
163 if let Err(_e) = worker.run(shutdown) {
164 }
166 unsafe {
167 libc::close(listen_fd);
168 }
169 }
170 Err(_e) => {
171 }
173 }
174 })
175 .map_err(ChopinError::from)?;
176
177 handles.push(handle);
178 }
179
180 for handle in handles {
181 let _ = handle.join();
182 }
183
184 Ok(())
185 }
186}
187
/// Host and port produced by `parse_host_port`.
struct Parts {
    /// Host portion of the address; for the bracketed IPv6 form the brackets
    /// are not included.
    host: String,
    /// Port number.
    port: u16,
}
192
193fn parse_host_port(hp: &str) -> crate::error::ChopinResult<Parts> {
194 if let Some(rest) = hp.strip_prefix('[') {
196 let bracket_end = rest.find(']').ok_or_else(|| {
198 crate::error::ChopinError::Other("Missing closing ']' in IPv6 address".to_string())
199 })?;
200 let host = rest[..bracket_end].to_string();
201 let after = &rest[bracket_end + 1..];
202 let port_str = after.strip_prefix(':').ok_or_else(|| {
203 crate::error::ChopinError::Other("Missing port after IPv6 address".to_string())
204 })?;
205 let port = port_str
206 .parse::<u16>()
207 .map_err(|_| crate::error::ChopinError::Other("Invalid port number".to_string()))?;
208 Ok(Parts { host, port })
209 } else {
210 let colon = hp.rfind(':').ok_or_else(|| {
212 crate::error::ChopinError::Other("Missing port in address".to_string())
213 })?;
214 let host = hp[..colon].to_string();
215 let port = hp[colon + 1..]
216 .parse::<u16>()
217 .map_err(|_| crate::error::ChopinError::Other("Invalid port number".to_string()))?;
218 Ok(Parts { host, port })
219 }
220}
221
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `input`, panicking on failure, and returns `(host, port)`.
    fn parsed(input: &str) -> (String, u16) {
        let Parts { host, port } = parse_host_port(input).unwrap();
        (host, port)
    }

    #[test]
    fn test_parse_ipv4() {
        assert_eq!(parsed("0.0.0.0:8080"), ("0.0.0.0".to_string(), 8080));
    }

    #[test]
    fn test_parse_ipv6_bracket() {
        assert_eq!(parsed("[::1]:9090"), ("::1".to_string(), 9090));
    }

    #[test]
    fn test_parse_ipv6_full() {
        assert_eq!(parsed("[::]:3000"), ("::".to_string(), 3000));
    }

    #[test]
    fn test_parse_localhost() {
        assert_eq!(parsed("localhost:4000"), ("localhost".to_string(), 4000));
    }

    #[test]
    fn test_parse_missing_port() {
        // No ':' at all, so there is nothing to split the port from.
        let outcome = parse_host_port("0.0.0.0");
        assert!(outcome.is_err());
    }
}