use std::net::SocketAddr;
use std::sync::Arc;
use crate::utils::gear::Socket;
#[cfg(not(feature = "bench"))]
use crate::VERSION;
use anyhow::{Error, Result};
use chrono::Local;
use colored::Colorize;
#[cfg(feature = "bench")]
use std::process;
use tokio::net::{TcpListener, TcpStream};
#[cfg(feature = "perf")]
use tokio::time::Instant;
use super::packet::{OED, OSC};
use super::router::Router;
use super::session::Session;
/// Drives one accepted connection from socket setup to the final response.
///
/// Steps, in order:
/// 1. apply socket options (TTL, `TCP_NODELAY`, zero linger, keep-alive);
/// 2. wrap the stream in a [`Session`] and run its handshake;
/// 3. look up the handler for the requested entrance and await it;
/// 4. write an `OSC` packet followed by an `OED` packet carrying the handler's
///    result (built with the session's AES key), then close the socket.
///
/// A handshake failure is logged to stderr with status `500`. Outside `bench`
/// builds it is then treated as handled and `Ok(())` is returned; `bench`
/// builds exit the process instead. Every other error is returned to the
/// caller through `?`.
///
/// With the `perf` feature the handshake, handler and response phases are
/// timed and printed. With neither `perf` nor `bench`, one access-log line is
/// printed after the handshake and one after the response has been sent.
#[inline]
async fn _handle(router: &Router, stream: TcpStream, peer: SocketAddr) -> Result<()> {
    // Timestamp layout shared by every access-log line in this function.
    const STAMP: &str = "%d/%m/%Y %H:%M:%S";

    #[cfg(feature = "perf")]
    let handshake_timer = std::time::Instant::now();

    // Socket options are applied while we still own the raw stream.
    stream.set_ttl(20)?;
    stream.set_nodelay(true)?;
    let no_linger = std::time::Duration::from_secs(0);
    stream.set_linger(Some(no_linger))?;
    socket2::SockRef::from(&stream).set_keepalive(true)?;

    let mut session = Session::new(Socket::new(stream))?;
    if let Err(failure) = session.handshake(1).await {
        let client = peer.ip().to_string();
        eprintln!(
            "{} -> [{}] \"{}\" {}",
            client.cyan(),
            Local::now().format(STAMP),
            "CONNECT - Oblivion/2.0".yellow(),
            "500".red()
        );
        eprintln!("{}", failure.to_string().bright_red());
        #[cfg(feature = "bench")]
        {
            eprintln!("Handshake failed in benchmark test unexpectedly.");
            process::exit(1);
        }
        // Already reported above, so the caller does not log it a second time.
        #[cfg(not(feature = "bench"))]
        return Ok(());
    }

    // Prints the handshake duration in microseconds.
    #[cfg(feature = "perf")]
    println!(
        "握手时长: {}μs",
        handshake_timer
            .elapsed()
            .as_micros()
            .to_string()
            .bright_magenta()
    );

    // `session` is moved into the handler below, so everything that is needed
    // afterwards has to be taken out of it first.
    #[cfg(not(any(feature = "perf", feature = "bench")))]
    let (request_line, client_ip) = (
        session.header().to_string(),
        session.get_ip().to_string(),
    );
    let aes_key = session.aes_key;

    #[cfg(not(any(feature = "perf", feature = "bench")))]
    println!(
        "{} -> [{}] \"{}\" {}",
        client_ip.cyan(),
        Local::now().format(STAMP),
        request_line.green(),
        "OK".cyan()
    );

    #[cfg(feature = "perf")]
    let handler_timer = Instant::now();
    let reply_socket = Arc::clone(&session.socket);
    let response = router.get_handler(&session.request.entrance)?(session).await?;

    // Prints the handler ("business function") duration in microseconds.
    #[cfg(feature = "perf")]
    println!(
        "业务函数时长: {}μs",
        handler_timer
            .elapsed()
            .as_micros()
            .to_string()
            .bright_magenta()
    );

    #[cfg(feature = "perf")]
    let reply_timer = Instant::now();
    OSC::from_u32(1).to_stream(&reply_socket).await?;
    OED::new(&aes_key)
        .from_bytes(response.as_bytes()?)?
        .to_stream(&reply_socket)
        .await?;
    reply_socket.close().await?;

    // Prints the duration of the response/close phase in microseconds.
    #[cfg(feature = "perf")]
    println!(
        "结束函数时长: {}μs",
        reply_timer
            .elapsed()
            .as_micros()
            .to_string()
            .bright_magenta()
    );

    #[cfg(not(any(feature = "perf", feature = "bench")))]
    println!(
        "{} <- [{}] \"{}\" {}",
        client_ip.cyan(),
        Local::now().format(STAMP),
        request_line.green(),
        "OK".cyan()
    );

    Ok(())
}
/// Task entry point for a single accepted connection.
///
/// Runs `_handle` and, rather than propagating an error it returns, reports it
/// on stderr with status `501`. With the `bench` feature such an error
/// terminates the process. With the `perf` feature the whole call is framed
/// by separator lines and its total duration is printed.
pub async fn handle(router: Arc<Router>, stream: TcpStream, peer: SocketAddr) {
    #[cfg(feature = "perf")]
    let started = Instant::now();
    #[cfg(feature = "perf")]
    println!("=================");

    match _handle(&router, stream, peer).await {
        Ok(()) => {}
        Err(failure) => {
            let client = peer.ip().to_string();
            eprintln!(
                "{} <-> [{}] \"{}\" {}",
                client.cyan(),
                Local::now().format("%d/%m/%Y %H:%M:%S"),
                "CONNECT - Oblivion/2.0".yellow(),
                "501".red()
            );
            eprintln!("{}", failure.to_string().bright_red());
            #[cfg(feature = "bench")]
            {
                eprintln!("An error occurred in handling runtime unexpectedly.");
                process::exit(1);
            }
        }
    }

    // Prints the total execution time in microseconds.
    #[cfg(feature = "perf")]
    println!(
        "总执行时长: {}μs\n=================",
        started.elapsed().as_micros().to_string().bright_magenta()
    );
}
/// A TCP server that hands every accepted connection to a shared [`Router`].
pub struct Server {
/// Host part of the `host:port` address the listener binds to.
host: String,
/// Port part of the `host:port` address the listener binds to.
port: i32,
/// Router shared, via `Arc`, with each spawned connection task.
router: Arc<Router>,
}
impl Server {
    /// Creates a server that will listen on `host:port` and dispatch accepted
    /// connections through `router`.
    ///
    /// Nothing is bound here; the listener is created by [`Server::run`].
    pub fn new(host: &str, port: i32, router: Router) -> Self {
        Self {
            host: host.to_string(),
            port,
            router: Arc::new(router),
        }
    }

    /// Binds the listener and serves connections until the process exits.
    ///
    /// Each accepted connection is handled on its own spawned task. A
    /// background task waits for Ctrl-C and exits the process with status `0`
    /// (or `1` if the signal handler itself fails).
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error when the address cannot be bound, or
    /// when `accept` fails with an error that is not specific to a single
    /// connection. Per-connection accept errors (aborted, reset or refused
    /// connections) are logged and the server keeps accepting.
    pub async fn run(&self) -> Result<()> {
        #[cfg(not(feature = "bench"))]
        println!("Performing system checks...\n");

        let address = format!("{}:{}", self.host, self.port);
        let tcp = match TcpListener::bind(&address).await {
            Ok(tcp) => tcp,
            Err(error) => {
                // Only claim the address is occupied when that is the actual
                // cause; otherwise show what really went wrong (permission
                // denied, unresolvable host, invalid port, ...).
                let message = if error.kind() == std::io::ErrorKind::AddrInUse {
                    format!(
                        "Destination address [{}] is already occupied!",
                        address.bright_magenta()
                    )
                } else {
                    format!(
                        "Failed to bind destination address [{}]: {}",
                        address.bright_magenta(),
                        error
                    )
                };
                eprintln!("{}", message.red());
                return Err(Error::from(error));
            }
        };

        // Exit the whole process as soon as Ctrl-C is received.
        tokio::spawn(async move {
            if let Err(e) = tokio::signal::ctrl_c().await {
                eprintln!("{}", e.to_string().red());
                std::process::exit(1);
            }
            std::process::exit(0);
        });

        #[cfg(not(feature = "bench"))]
        println!(
            "Oblivion version {}, using '{}'",
            VERSION.bright_yellow(),
            "ring".bright_green()
        );
        #[cfg(not(feature = "bench"))]
        println!(
            "Starting server at {}",
            format!("Oblivion://{}:{}/", self.host, self.port).bright_cyan()
        );
        // The handler installed above listens for Ctrl-C, so advertise that key.
        #[cfg(not(feature = "bench"))]
        println!("Quit the server by CTRL-C.\n");

        loop {
            match tcp.accept().await {
                Ok((stream, peer)) => {
                    tokio::spawn(handle(Arc::clone(&self.router), stream, peer));
                }
                // These errors concern one incoming connection only; the
                // listener itself is still healthy, so keep serving.
                Err(error)
                    if matches!(
                        error.kind(),
                        std::io::ErrorKind::ConnectionAborted
                            | std::io::ErrorKind::ConnectionReset
                            | std::io::ErrorKind::ConnectionRefused
                    ) =>
                {
                    eprintln!("{}", error.to_string().red());
                }
                // Anything else means the listener cannot continue. Report it
                // to the caller instead of stopping silently with `Ok(())`.
                Err(error) => {
                    eprintln!("{}", error.to_string().bright_red());
                    return Err(Error::from(error));
                }
            }
        }
    }
}