thruster_server/server.rs
1use std::net::ToSocketAddrs;
2use std::error::Error;
3use std::sync::Arc;
4use futures::SinkExt;
5
6use tokio;
7use tokio::net::{TcpStream, TcpListener};
8use tokio_util::codec::Framed;
9use tokio::stream::StreamExt;
10
11use thruster_app::app::App;
12use thruster_core::context::Context;
13use thruster_core::http::Http;
14use thruster_core::response::Response;
15use thruster_core::request::Request;
16
17// use std::thread;
18// use num_cpus;
19// use net2::TcpBuilder;
20// #[cfg(not(windows))]
21// use net2::unix::UnixTcpBuilderExt;
22
23use crate::thruster_server::ThrusterServer;
24
25pub struct Server<T: 'static + Context<Response = Response> + Send> {
26 app: App<Request, T>
27}
28
29// impl<T: 'static + Context<Response = Response> + Send> Server<T> {
30// ///
31// /// Starts the app with the default tokio runtime execution model
32// ///
33// pub fn start_work_stealing_optimized(self, host: &str, port: u16) {
34// self.start(host, port);
35// }
36
37// ///
38// /// Starts the app with a thread pool optimized for small requests and quick timeouts. This
39// /// is done internally by spawning a separate thread for each reactor core. This is valuable
40// /// if all server endpoints are similar in their load, as work is divided evenly among threads.
41// /// As seanmonstar points out though, this is a very specific use case and might not be useful
// /// for everyday workloads.
43// ///
44// /// See the discussion here for more information:
45// ///
46// /// https://users.rust-lang.org/t/getting-tokio-to-match-actix-web-performance/18659/7
47// ///
48// pub fn start_small_load_optimized(mut self, host: &str, port: u16) {
49// let addr = (host, port).to_socket_addrs().unwrap().next().unwrap();
50// let mut threads = Vec::new();
51// self.app._route_parser.optimize();
52// let arc_app = Arc::new(self.app);
53
54// for _ in 0..num_cpus::get() {
55// let arc_app = arc_app.clone();
56// threads.push(thread::spawn(move || {
57// let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
58
59// let server = async || {
60// let listener = {
61// let builder = TcpBuilder::new_v4().unwrap();
62// #[cfg(not(windows))]
63// builder.reuse_address(true).unwrap();
64// #[cfg(not(windows))]
65// builder.reuse_port(true).unwrap();
66// builder.bind(addr).unwrap();
67// builder.listen(2048).unwrap()
68// };
69// let listener = TcpListener::from_std(listener, &tokio::reactor::Handle::default()).unwrap();
70
71// listener.incoming().for_each(move |socket| {
72// process(Arc::clone(&arc_app), socket);
73// Ok(())
74// })
75// .map_err(|err| eprintln!("accept error = {:?}", err))
76// };
77
78// runtime.spawn(server);
79// runtime.run().unwrap();
80// }));
81// }
82
83// println!("Server running on {}", addr);
84
85// for thread in threads {
86// thread.join().unwrap();
87// }
88
89// fn process<T: Context<Response = Response> + Send>(app: Arc<App<Request, T>>, socket: TcpStream) {
90// let framed = Framed::new(socket, Http);
91// let (tx, rx) = framed.split();
92
93// let task = tx.send_all(rx.and_then(move |request: Request| {
94// let matched = app.resolve_from_method_and_path(request.method(), request.path());
95// app.resolve(request, matched)
96// }));
97
98// // Spawn the task that handles the connection.
99// tokio::spawn(task);
100// }
101// }
102// }
103
104impl<T: Context<Response = Response> + Send> ThrusterServer for Server<T> {
105 type Context = T;
106 type Response = Response;
107 type Request = Request;
108
109 fn new(app: App<Self::Request, T>) -> Self {
110 Server {
111 app
112 }
113 }
114
115 ///
116 /// Alias for start_work_stealing_optimized
117 ///
118 fn start(mut self, host: &str, port: u16) {
119 let mut rt = tokio::runtime::Runtime::new().unwrap();
120 rt.block_on(async {
121 let addr = (host, port).to_socket_addrs().unwrap().next().unwrap();
122
123 self.app._route_parser.optimize();
124
125 let mut listener = TcpListener::bind(&addr).await.unwrap();
126 let mut incoming = listener.incoming();
127 let arc_app = Arc::new(self.app);
128
129 while let Some(Ok(stream)) = incoming.next().await {
130 let cloned = arc_app.clone();
131 tokio::spawn(async move {
132 if let Err(e) = process(cloned, stream).await {
133 println!("failed to process connection; error = {}", e);
134 }
135 });
136 }
137 });
138
139 async fn process<T: Context<Response = Response> + Send>(app: Arc<App<Request, T>>, socket: TcpStream) -> Result<(), Box<dyn Error>> {
140 let mut framed = Framed::new(socket, Http);
141
142 while let Some(request) = framed.next().await {
143 match request {
144 Ok(request) => {
145 let matched = app.resolve_from_method_and_path(request.method(), request.path());
146 let response = app.resolve(request, matched).await?;
147 framed.send(response).await?;
148 }
149 Err(e) => return Err(e.into()),
150 }
151 }
152
153 Ok(())
154 }
155 }
156}