Skip to main content

chopin_core/
server.rs

1// src/server.rs
2use crate::error::ChopinError;
3use crate::router::Router;
4use crate::syscalls::{self};
5use crate::worker::Worker;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::thread;
9
10/// High-level application builder for Chopin.
11///
12/// Collects routes registered via `#[get]`/`#[post]`/… macros, optionally
13/// mounts OpenAPI documentation, and starts the multi-threaded server.
14///
15/// # Example
16///
17/// ```rust,no_run
18/// use chopin_core::{get, Context, Response, Chopin};
19///
20/// #[get("/")]
21/// fn index(_ctx: Context) -> Response {
22///     Response::text("Hello!")
23/// }
24///
25/// fn main() {
26///     Chopin::new()
27///         .mount_all_routes()
28///         .serve("0.0.0.0:8080")
29///         .unwrap();
30/// }
31/// ```
32pub struct Chopin {
33    router: Router,
34}
35
36impl Default for Chopin {
37    fn default() -> Self {
38        Self::new()
39    }
40}
41
42impl Chopin {
43    /// Create a new Chopin application with an empty router.
44    pub fn new() -> Self {
45        Self {
46            router: Router::new(),
47        }
48    }
49
50    /// Discover and register all routes annotated with `#[get]`, `#[post]`, etc.
51    pub fn mount_all_routes(mut self) -> Self {
52        for route in inventory::iter::<crate::router::RouteDef> {
53            self.router.add(route.method, route.path, route.handler);
54        }
55        self.router.finalize();
56        self
57    }
58
59    /// Enable the built-in OpenAPI documentation at `/openapi.json` and `/docs`.
60    pub fn with_openapi(mut self) -> Self {
61        self.router
62            .get("/openapi.json", crate::openapi::openapi_json_handler);
63        self.router
64            .get("/docs", crate::openapi::scalar_docs_handler);
65        self
66    }
67
68    /// Start the server, binding to `host_port` (e.g. `"0.0.0.0:8080"`).
69    pub fn serve(self, host_port: &str) -> crate::error::ChopinResult<()> {
70        let server = Server::bind(host_port);
71        server.serve(self.router)
72    }
73}
74
75/// Low-level multi-threaded server.
76///
77/// Use this when you want full control over the [`Router`] (e.g. adding
78/// middleware, merging sub-routers) instead of the macro-driven [`Chopin`]
79/// builder.
80///
81/// # Example
82///
83/// ```rust,ignore
84/// use chopin_core::{Router, Server, Context, Response};
85///
86/// fn ping(_ctx: Context) -> Response { Response::text("pong") }
87///
88/// let mut router = Router::new();
89/// router.get("/ping", ping);
90///
91/// Server::bind("0.0.0.0:8080")
92///     .workers(4)
93///     .serve(router)
94///     .unwrap();
95/// ```
96pub struct Server {
97    host_port: String,
98    workers: usize,
99}
100
101impl Server {
102    /// Bind to the given address. Defaults to one worker per logical CPU.
103    pub fn bind(host_port: &str) -> Self {
104        Self {
105            host_port: host_port.to_string(),
106            workers: num_cpus::get(),
107        }
108    }
109
110    /// Set the number of worker threads (defaults to `num_cpus::get()`).
111    pub fn workers(mut self, workers: usize) -> Self {
112        self.workers = workers;
113        self
114    }
115
116    /// Start the server with the provided router. Spawns one thread per worker,
117    /// each pinned to a CPU core, and blocks until shutdown.
118    pub fn serve(self, mut router: Router) -> crate::error::ChopinResult<()> {
119        // Sort children at every trie level for binary-search matching.
120        router.finalize();
121
122        let core_ids = core_affinity::get_core_ids().unwrap_or_default();
123        let shutdown_flag = Arc::new(AtomicBool::new(false));
124
125        let shutdown_signal = shutdown_flag.clone();
126        ctrlc::set_handler(move || {
127            shutdown_signal.store(true, Ordering::Release);
128        })
129        .map_err(|e| ChopinError::Other(format!("Failed to set Ctrl-C handler: {e}")))?;
130
131        let mut worker_metrics = Vec::with_capacity(self.workers);
132        for _ in 0..self.workers {
133            worker_metrics.push(Arc::new(crate::metrics::WorkerMetrics::new()));
134        }
135
136        let _metrics_clones = worker_metrics.clone();
137        let _shutdown_metrics = shutdown_flag.clone();
138
139        let Parts { host, port } = parse_host_port(&self.host_port)?;
140
141        let mut handles: Vec<thread::JoinHandle<()>> = Vec::with_capacity(self.workers);
142        for (i, metrics_worker) in worker_metrics.iter().enumerate().take(self.workers) {
143            let core_id = core_ids.get(i % core_ids.len()).copied();
144            let router_clone = router.clone();
145            let shutdown = shutdown_flag.clone();
146            let metrics_worker = metrics_worker.clone();
147
148            let host_clone = host.clone();
149            let port_clone = port;
150
151            let handle = thread::Builder::new()
152                .name(format!("chopin-worker-{}", i))
153                .spawn(move || {
154                    if let Some(id) = core_id {
155                        core_affinity::set_for_current(id);
156                    }
157
158                    // Create dedicated SO_REUSEPORT listener for this worker
159                    match syscalls::create_listen_socket_reuseport(&host_clone, port_clone) {
160                        Ok(listen_fd) => {
161                            let mut worker =
162                                Worker::new(i, router_clone, metrics_worker, listen_fd);
163                            if let Err(_e) = worker.run(shutdown) {
164                                // Error suppressed in production
165                            }
166                            unsafe {
167                                libc::close(listen_fd);
168                            }
169                        }
170                        Err(_e) => {
171                            // Error suppressed in production
172                        }
173                    }
174                })
175                .map_err(ChopinError::from)?;
176
177            handles.push(handle);
178        }
179
180        for handle in handles {
181            let _ = handle.join();
182        }
183
184        Ok(())
185    }
186}
187
188struct Parts {
189    host: String,
190    port: u16,
191}
192
193fn parse_host_port(hp: &str) -> crate::error::ChopinResult<Parts> {
194    // D.6: Support IPv6 bracket notation, e.g. "[::1]:8080"
195    if let Some(rest) = hp.strip_prefix('[') {
196        // IPv6 bracketed form: [host]:port
197        let bracket_end = rest.find(']').ok_or_else(|| {
198            crate::error::ChopinError::Other("Missing closing ']' in IPv6 address".to_string())
199        })?;
200        let host = rest[..bracket_end].to_string();
201        let after = &rest[bracket_end + 1..];
202        let port_str = after.strip_prefix(':').ok_or_else(|| {
203            crate::error::ChopinError::Other("Missing port after IPv6 address".to_string())
204        })?;
205        let port = port_str
206            .parse::<u16>()
207            .map_err(|_| crate::error::ChopinError::Other("Invalid port number".to_string()))?;
208        Ok(Parts { host, port })
209    } else {
210        // IPv4 / hostname: split on last colon
211        let colon = hp.rfind(':').ok_or_else(|| {
212            crate::error::ChopinError::Other("Missing port in address".to_string())
213        })?;
214        let host = hp[..colon].to_string();
215        let port = hp[colon + 1..]
216            .parse::<u16>()
217            .map_err(|_| crate::error::ChopinError::Other("Invalid port number".to_string()))?;
218        Ok(Parts { host, port })
219    }
220}
221
222#[cfg(test)]
223mod tests {
224    use super::*;
225
226    #[test]
227    fn test_parse_ipv4() {
228        let p = parse_host_port("0.0.0.0:8080").unwrap();
229        assert_eq!(p.host, "0.0.0.0");
230        assert_eq!(p.port, 8080);
231    }
232
233    #[test]
234    fn test_parse_ipv6_bracket() {
235        let p = parse_host_port("[::1]:9090").unwrap();
236        assert_eq!(p.host, "::1");
237        assert_eq!(p.port, 9090);
238    }
239
240    #[test]
241    fn test_parse_ipv6_full() {
242        let p = parse_host_port("[::]:3000").unwrap();
243        assert_eq!(p.host, "::");
244        assert_eq!(p.port, 3000);
245    }
246
247    #[test]
248    fn test_parse_localhost() {
249        let p = parse_host_port("localhost:4000").unwrap();
250        assert_eq!(p.host, "localhost");
251        assert_eq!(p.port, 4000);
252    }
253
254    #[test]
255    fn test_parse_missing_port() {
256        assert!(parse_host_port("0.0.0.0").is_err());
257    }
258}