thruster_server/
server.rs

1use std::net::ToSocketAddrs;
2use std::error::Error;
3use std::sync::Arc;
4use futures::SinkExt;
5
6use tokio;
7use tokio::net::{TcpStream, TcpListener};
8use tokio_util::codec::Framed;
9use tokio::stream::StreamExt;
10
11use thruster_app::app::App;
12use thruster_core::context::Context;
13use thruster_core::http::Http;
14use thruster_core::response::Response;
15use thruster_core::request::Request;
16
17// use std::thread;
18// use num_cpus;
19// use net2::TcpBuilder;
20// #[cfg(not(windows))]
21// use net2::unix::UnixTcpBuilderExt;
22
23use crate::thruster_server::ThrusterServer;
24
25pub struct Server<T: 'static + Context<Response = Response> + Send> {
26  app: App<Request, T>
27}
28
29// impl<T: 'static + Context<Response = Response> + Send> Server<T> {
30//   ///
31//   /// Starts the app with the default tokio runtime execution model
32//   ///
33//   pub fn start_work_stealing_optimized(self, host: &str, port: u16) {
34//     self.start(host, port);
35//   }
36
37//   ///
38//   /// Starts the app with a thread pool optimized for small requests and quick timeouts. This
39//   /// is done internally by spawning a separate thread for each reactor core. This is valuable
40//   /// if all server endpoints are similar in their load, as work is divided evenly among threads.
41//   /// As seanmonstar points out though, this is a very specific use case and might not be useful
//   /// for everyday workloads.
43//   ///
44//   /// See the discussion here for more information:
45//   ///
46//   /// https://users.rust-lang.org/t/getting-tokio-to-match-actix-web-performance/18659/7
47//   ///
48//   pub fn start_small_load_optimized(mut self, host: &str, port: u16) {
49//     let addr = (host, port).to_socket_addrs().unwrap().next().unwrap();
50//     let mut threads = Vec::new();
51//     self.app._route_parser.optimize();
52//     let arc_app = Arc::new(self.app);
53
54//     for _ in 0..num_cpus::get() {
55//       let arc_app = arc_app.clone();
56//       threads.push(thread::spawn(move || {
57//         let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
58
59//         let server = async || {
60//           let listener = {
61//             let builder = TcpBuilder::new_v4().unwrap();
62//             #[cfg(not(windows))]
63//             builder.reuse_address(true).unwrap();
64//             #[cfg(not(windows))]
65//             builder.reuse_port(true).unwrap();
66//             builder.bind(addr).unwrap();
67//             builder.listen(2048).unwrap()
68//           };
69//           let listener = TcpListener::from_std(listener, &tokio::reactor::Handle::default()).unwrap();
70
71//           listener.incoming().for_each(move |socket| {
72//             process(Arc::clone(&arc_app), socket);
73//             Ok(())
74//           })
75//           .map_err(|err| eprintln!("accept error = {:?}", err))
76//         };
77
78//         runtime.spawn(server);
79//         runtime.run().unwrap();
80//       }));
81//     }
82
83//     println!("Server running on {}", addr);
84
85//     for thread in threads {
86//       thread.join().unwrap();
87//     }
88
89//     fn process<T: Context<Response = Response> + Send>(app: Arc<App<Request, T>>, socket: TcpStream) {
90//       let framed = Framed::new(socket, Http);
91//       let (tx, rx) = framed.split();
92
93//       let task = tx.send_all(rx.and_then(move |request: Request| {
94//             let matched = app.resolve_from_method_and_path(request.method(), request.path());
95//             app.resolve(request, matched)
96//           }));
97
98//       // Spawn the task that handles the connection.
99//       tokio::spawn(task);
100//     }
101//   }
102// }
103
104impl<T: Context<Response = Response> + Send> ThrusterServer for Server<T> {
105  type Context = T;
106  type Response = Response;
107  type Request = Request;
108
109  fn new(app: App<Self::Request, T>) -> Self {
110    Server {
111      app
112    }
113  }
114
115  ///
116  /// Alias for start_work_stealing_optimized
117  ///
118  fn start(mut self, host: &str, port: u16) {
119    let mut rt = tokio::runtime::Runtime::new().unwrap();
120    rt.block_on(async {
121      let addr = (host, port).to_socket_addrs().unwrap().next().unwrap();
122
123      self.app._route_parser.optimize();
124
125      let mut listener = TcpListener::bind(&addr).await.unwrap();
126      let mut incoming = listener.incoming();
127      let arc_app = Arc::new(self.app);
128
129      while let Some(Ok(stream)) = incoming.next().await {
130          let cloned = arc_app.clone();
131          tokio::spawn(async move {
132              if let Err(e) = process(cloned, stream).await {
133                  println!("failed to process connection; error = {}", e);
134              }
135          });
136      }
137    });
138
139    async fn process<T: Context<Response = Response> + Send>(app: Arc<App<Request, T>>, socket: TcpStream) -> Result<(), Box<dyn Error>> {
140      let mut framed = Framed::new(socket, Http);
141
142      while let Some(request) = framed.next().await {
143        match request {
144            Ok(request) => {
145              let matched = app.resolve_from_method_and_path(request.method(), request.path());
146              let response = app.resolve(request, matched).await?;
147              framed.send(response).await?;
148            }
149            Err(e) => return Err(e.into()),
150        }
151      }
152
153      Ok(())
154    }
155  }
156}