use crate::ReusableBoxFuture;
use async_trait::async_trait;
use futures::{FutureExt, SinkExt, StreamExt};
use socket2::{Domain, Socket, Type};
use std::net::ToSocketAddrs;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio_stream::wrappers::TcpListenerStream;
use tokio_util::codec::Framed;
use crate::app::App;
use crate::core::context::Context;
use crate::core::http::Http;
use crate::core::request::Request;
use crate::core::response::Response;
use crate::server::ThrusterServer;
pub struct Server<
T: 'static + Context<Response = Response> + Clone + Send + Sync,
S: 'static + Send + Sync,
> {
app: Arc<App<Request, T, S>>,
}
impl<T: 'static + Context<Response = Response> + Clone + Send + Sync, S: 'static + Send + Sync>
Server<T, S>
{
pub fn start_work_stealing_optimized(self, host: &str, port: u16) {
self.start(host, port);
}
pub fn start_small_load_optimized(self, host: &str, port: u16) {
let addr = (host, port).to_socket_addrs().unwrap().next().unwrap();
let mut threads = Vec::new();
let arc_app = Arc::new(self.app);
for _ in 0..num_cpus::get() {
let arc_app = arc_app.clone();
threads.push(std::thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
let server = async move {
let listener = {
let socket = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap();
let address = addr.into();
socket.set_reuse_address(true).unwrap();
#[cfg(unix)]
socket.set_reuse_port(true).unwrap();
socket.bind(&address).unwrap();
socket.listen(1024).unwrap();
socket.set_nonblocking(true).unwrap();
let listener: std::net::TcpListener = socket.into();
tokio::net::TcpListener::from_std(listener).unwrap()
};
TcpListenerStream::new(listener)
.for_each(move |socket| {
process(Arc::clone(&arc_app), socket.unwrap());
async {}
})
.await;
};
runtime.block_on(server);
}));
}
for thread in threads {
thread.join().unwrap();
}
}
}
#[async_trait]
impl<T: Context<Response = Response> + Clone + Send + Sync, S: 'static + Send + Sync> ThrusterServer
for Server<T, S>
{
type Context = T;
type Response = Response;
type Request = Request;
type State = S;
fn new(mut app: App<Self::Request, T, S>) -> Self {
app = app.commit();
Server { app: Arc::new(app) }
}
fn build(self, host: &str, port: u16) -> ReusableBoxFuture<()> {
let addr = (host, port).to_socket_addrs().unwrap().next().unwrap();
let arc_app = self.app;
let listener_fut = TcpListener::bind(addr).then(move |listener| {
TcpListenerStream::new(listener.unwrap()).for_each(move |res| {
if let Ok(stream) = res {
let cloned = arc_app.clone();
tokio::spawn(process(cloned, stream));
}
async {}
})
});
ReusableBoxFuture::new(listener_fut)
}
}
struct _Error {
_message: String,
}
fn process<T: Context<Response = Response> + Clone + Send + Sync, S: 'static + Send + Sync>(
app: Arc<App<Request, T, S>>,
socket: TcpStream,
) -> ReusableBoxFuture<Result<(), _Error>> {
ReusableBoxFuture::new(async move {
let mut framed = Framed::new(socket, Http);
while let Some(request) = framed.next().await {
match request {
Ok(request) => {
let path = request.path().to_owned();
let method = &request.method().to_owned();
let matched = app.resolve_from_method_and_path(method, path);
let response = app.resolve(request, matched).await.map_err(|e| _Error {
_message: e.to_string(),
})?;
framed.send(response).await.map_err(|e| _Error {
_message: e.to_string(),
})?;
}
Err(e) => {
return Err(_Error {
_message: e.to_string(),
})
}
}
}
Ok(())
})
}