mod models;
mod session;
use std::time::{Duration, Instant};
use actix::{
Actor, AsyncContext, Context, Handler, Message as ActixMessage, MessageResult, Recipient,
};
use fern::colors::{Color, ColoredLevelConfig};
use hashbrown::HashMap;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use log::{info, warn};
use nanoid::nanoid;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::sync::Arc;
use crate::{
errors::AddWorldError,
world::{Registry, World, WorldConfig},
ChunkStatus, Mesher, MessageQueue, Stats,
};
pub use models::*;
pub use session::*;
#[derive(Serialize, Deserialize)]
pub struct OnJoinRequest {
world: String,
username: String,
}
#[derive(Serialize, Deserialize)]
struct OnActionRequest {
action: String,
data: Value,
}
type ServerInfoHandle = fn(&Server) -> Value;
fn default_info_handle(server: &Server) -> Value {
let mut info = HashMap::new();
info.insert(
"lost_sessions".to_owned(),
json!(server.lost_sessions.len()),
);
let mut connections = HashMap::new();
for (id, (_, world)) in server.connections.iter() {
connections.insert(id.to_owned(), json!(world));
}
info.insert("connections".to_owned(), json!(connections));
let mut transports = vec![];
for (id, _) in server.transport_sessions.iter() {
transports.push(id.to_owned());
}
info.insert("transports".to_owned(), json!(transports));
let mut worlds = HashMap::new();
for (name, world) in server.worlds.iter() {
let mut world_info = HashMap::new();
{
let clients = world.clients();
world_info.insert(
"clients".to_owned(),
json!(clients
.values()
.map(|client| json!({
"id": client.id.to_owned(),
"username": client.username.to_owned(),
}))
.collect::<Vec<_>>()),
);
}
{
let config = world.config();
world_info.insert("config".to_owned(), json!(*config));
}
{
let stats = world.read_resource::<Stats>();
let mut stats_info = HashMap::new();
stats_info.insert("tick".to_owned(), json!(stats.tick));
stats_info.insert("delta".to_owned(), json!(stats.delta));
world_info.insert("stats".to_owned(), json!(stats_info));
}
{
let chunks = world.chunks();
let pipeline = world.pipeline();
let mesher = world.read_resource::<Mesher>();
let mut generating: i32 = 0;
let mut meshing: i32 = 0;
let mut ready: i32 = 0;
for chunk in chunks.map.values() {
match chunk.status {
ChunkStatus::Generating(_) => generating += 1,
ChunkStatus::Meshing => meshing += 1,
ChunkStatus::Ready => ready += 1,
}
}
world_info.insert(
"chunks".to_owned(),
json!({
"count": chunks.map.len(),
"generating": generating,
"meshing": meshing,
"ready": ready,
"pipeline_chunks": pipeline.chunks,
"pipeline_queue": pipeline.queue,
"mesher_chunks": mesher.map,
"mesher_queue": mesher.queue,
"active_voxels": chunks.active_voxels.len()
}),
);
}
{
let pipeline = world.pipeline();
let pipeline_info = json!({
"count": json!(pipeline.chunks.len()),
"stages": json!(
pipeline
.stages
.iter()
.map(|stage| json!(stage.name()))
.collect::<Vec<_>>()
)
});
world_info.insert("pipeline".to_owned(), pipeline_info);
}
worlds.insert(name.to_owned(), json!(world_info));
}
info.insert("worlds".to_owned(), json!(worlds));
serde_json::to_value(info).unwrap()
}
pub struct Server {
pub port: u16,
pub addr: String,
pub started: bool,
pub serve: String,
pub debug: bool,
pub interval: u64,
pub secret: Option<String>,
pub worlds: HashMap<String, World>,
pub registry: Registry,
pub lost_sessions: HashMap<String, Recipient<EncodedMessage>>,
pub transport_sessions: HashMap<String, Recipient<EncodedMessage>>,
pub connections: HashMap<String, (Recipient<EncodedMessage>, String)>,
info_handle: ServerInfoHandle,
action_handles: HashMap<String, Arc<dyn Fn(Value, &mut Server)>>,
}
impl Server {
pub fn new() -> ServerBuilder {
ServerBuilder::new()
}
pub fn add_world(&mut self, mut world: World) -> Result<&mut World, AddWorldError> {
let name = world.name.clone();
let saving = world.config().saving;
let save_dir = world.config().save_dir.clone();
world.ecs_mut().insert(self.registry.clone());
if self.worlds.insert(name.to_owned(), world).is_some() {
return Err(AddWorldError);
};
info!(
"🌎 World created: {} ({})",
name,
if saving {
format!("on-disk @ {}", save_dir)
} else {
"in-memory".to_owned()
}
);
Ok(self.worlds.get_mut(&name).unwrap())
}
pub fn create_world(
&mut self,
name: &str,
config: &WorldConfig,
) -> Result<&mut World, AddWorldError> {
let mut world = World::new(name, config);
world.ecs_mut().insert(self.registry.clone());
self.add_world(world)
}
pub fn get_world(&self, world_name: &str) -> Option<&World> {
self.worlds.get(world_name)
}
pub fn get_world_mut(&mut self, world_name: &str) -> Option<&mut World> {
self.worlds.get_mut(world_name)
}
pub fn get_info(&mut self) -> Value {
(self.info_handle)(self)
}
pub(crate) fn on_request(&mut self, id: &str, data: Message) -> Option<String> {
if data.r#type == MessageType::Join as i32 {
let json: OnJoinRequest = serde_json::from_str(&data.json)
.expect("`on_join` error. Could not read JSON string.");
if !self.lost_sessions.contains_key(id) {
return Some(format!(
"Client at {} is already in world: {}",
id, json.world
));
}
if let Some(world) = self.worlds.get_mut(&json.world) {
if let Some(addr) = self.lost_sessions.remove(id) {
world.add_client(id, &json.username, &addr);
self.connections.insert(id.to_owned(), (addr, json.world));
return None;
}
return Some("Something went wrong with joining. Maybe you called .join twice on the client?".to_owned());
}
return Some(format!(
"ID {} is attempting to connect to a non-existent world!",
id
));
} else if data.r#type == MessageType::Leave as i32 {
if let Some(world) = self.worlds.get_mut(&data.text) {
let (addr, _) = self.connections.remove(id).unwrap();
self.lost_sessions.insert(id.to_owned(), addr);
world.remove_client(id);
}
return None;
} else if data.r#type == MessageType::Action as i32 {
self.on_action(id, &data);
return None;
} else if data.r#type == MessageType::Transport as i32
|| self.transport_sessions.contains_key(id)
{
if !self.transport_sessions.contains_key(id) {
return Some(
"Someone who isn't a transport server is attempting to transport.".to_owned(),
);
}
if let Some(world) = self.get_world_mut(&data.text) {
world.on_request(id, data);
return None;
} else {
return Some(
"Transport message did not have a world. Use the 'text' field.".to_owned(),
);
}
}
let connection = self.connections.get(id);
if connection.is_none() {
return Some("You are not connected to a world!".to_owned());
}
let (_, world_name) = connection.unwrap().to_owned();
if let Some(world) = self.get_world_mut(&world_name) {
world.on_request(id, data);
}
None
}
pub fn prepare(&mut self) {
for world in self.worlds.values_mut() {
world.prepare();
}
self.preload();
}
pub(crate) fn preload(&mut self) {
let m = MultiProgress::new();
let sty = ProgressStyle::with_template(
"[{elapsed_precise}] [{bar:40.cyan/blue}] {msg} {spinner:.green} {percent:>7}%",
)
.unwrap()
.progress_chars("#>-");
let mut bars = vec![];
for world in self.worlds.values_mut() {
if !world.config().preload {
bars.push(None);
continue;
}
world.preload();
let bar = m.insert_from_back(0, ProgressBar::new(100));
bar.set_message(world.name.clone());
bar.set_style(sty.clone());
bar.set_position(0);
bars.push(Some(bar));
}
let start = Instant::now();
loop {
let mut done = true;
for (i, world) in self.worlds.values_mut().enumerate() {
if bars[i].is_none() || !world.config().preload {
continue;
}
let bar = bars[i].as_mut().unwrap();
if !world.preloading || world.preload_progress >= 1.0 {
bar.finish_and_clear();
continue;
}
world.tick();
let at = (world.preload_progress * 100.0) as u64;
done = false;
bar.set_position(at);
}
if done {
m.clear().unwrap();
break;
}
}
let preload_len = self
.worlds
.values()
.filter(|world| world.config().preload)
.collect::<Vec<&World>>()
.len();
info!(
"✅ Total of {} world{} preloaded in {}s",
preload_len,
if preload_len == 1 { "" } else { "s" },
(Instant::now() - start).as_millis() as f64 / 1000.0
);
}
pub(crate) fn tick(&mut self) {
for world in self.worlds.values_mut() {
world.tick();
}
}
fn setup_logger() {
fern::Dispatch::new()
.format(|out, message, record| {
let colors = ColoredLevelConfig::new().info(Color::Green);
out.finish(format_args!(
"{} [{}] [{}]: {}",
chrono::Local::now().format("[%H:%M:%S]"),
colors.color(record.level()),
record.target(),
message
))
})
.level(log::LevelFilter::Debug)
.level_for("tungstenite", log::LevelFilter::Info)
.chain(std::io::stdout())
.apply()
.expect("Fern did not run successfully");
}
pub fn set_action_handle<F: Fn(Value, &mut Server) + 'static>(
&mut self,
action: &str,
handle: F,
) {
self.action_handles
.insert(action.to_lowercase(), Arc::new(handle));
}
fn on_action(&mut self, _: &str, data: &Message) {
let json: OnActionRequest = serde_json::from_str(&data.json)
.expect("`on_action` error. Could not read JSON string.");
let action = json.action.to_lowercase();
info!("{:?}", &self.action_handles.keys());
info!("{:?}", &action);
if !self.action_handles.contains_key(&action) {
warn!("`Action` type messages received, but no action handler set.");
return;
}
let handle = self.action_handles.get(&action).unwrap().to_owned();
handle(json.data, self);
}
}
#[derive(ActixMessage)]
#[rtype(result = "String")]
pub struct Connect {
pub id: Option<String>,
pub is_transport: bool,
pub addr: Recipient<EncodedMessage>,
}
#[derive(ActixMessage, Clone)]
#[rtype(result = "()")]
pub struct EncodedMessage(pub Vec<u8>);
#[derive(ActixMessage)]
#[rtype(result = "()")]
pub struct Disconnect {
pub id: String,
}
#[derive(ActixMessage)]
#[rtype(result = "Value")]
pub struct Info;
#[derive(ActixMessage)]
#[rtype(result = "f32")]
pub struct Time(pub String);
#[derive(ActixMessage)]
#[rtype(result = "Option<String>")]
pub struct ClientMessage {
pub id: String,
pub data: Message,
}
impl Actor for Server {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
ctx.run_interval(Duration::from_millis(self.interval), |act, _| {
act.tick();
});
}
}
impl Handler<Connect> for Server {
type Result = MessageResult<Connect>;
fn handle(&mut self, msg: Connect, _: &mut Context<Self>) -> Self::Result {
let id = if msg.id.is_none() {
nanoid!()
} else {
msg.id.unwrap()
};
if msg.is_transport {
self.worlds
.values_mut()
.for_each(|world| world.add_transport(&id, &msg.addr));
self.transport_sessions.insert(id.to_owned(), msg.addr);
return MessageResult(id);
}
if self.lost_sessions.contains_key(&id) {
return MessageResult(nanoid!());
}
self.lost_sessions.insert(id.to_owned(), msg.addr);
MessageResult(id)
}
}
impl Handler<Disconnect> for Server {
type Result = ();
fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
if let Some((_, world_name)) = self.connections.remove(&msg.id) {
if let Some(world) = self.worlds.get_mut(&world_name) {
world.remove_client(&msg.id);
}
}
if let Some(_) = self.transport_sessions.remove(&msg.id) {
self.worlds.values_mut().for_each(|world| {
world.remove_transport(&msg.id);
});
info!("A transport server connection has ended.")
}
self.lost_sessions.remove(&msg.id);
}
}
impl Handler<Info> for Server {
type Result = MessageResult<Info>;
fn handle(&mut self, _: Info, _: &mut Context<Self>) -> Self::Result {
MessageResult(self.get_info())
}
}
impl Handler<Time> for Server {
type Result = MessageResult<Time>;
fn handle(&mut self, Time(world_name): Time, _: &mut Context<Self>) -> Self::Result {
let world = self.worlds.get(&world_name);
if world.is_none() {
return MessageResult(0.0);
}
let world = world.unwrap();
MessageResult(world.read_resource::<Stats>().time)
}
}
impl Handler<ClientMessage> for Server {
type Result = Option<String>;
fn handle(&mut self, msg: ClientMessage, _: &mut Context<Self>) -> Self::Result {
self.on_request(&msg.id, msg.data)
}
}
const DEFAULT_DEBUG: bool = true;
const DEFAULT_PORT: u16 = 4000;
const DEFAULT_ADDR: &str = "0.0.0.0";
const DEFAULT_SERVE: &str = "";
const DEFAULT_INTERVAL: u64 = 8;
pub struct ServerBuilder {
port: u16,
debug: bool,
addr: String,
serve: String,
interval: u64,
secret: Option<String>,
registry: Option<Registry>,
}
impl ServerBuilder {
pub fn new() -> Self {
Self {
debug: DEFAULT_DEBUG,
port: DEFAULT_PORT,
addr: DEFAULT_ADDR.to_owned(),
serve: DEFAULT_SERVE.to_owned(),
interval: DEFAULT_INTERVAL,
secret: None,
registry: None,
}
}
pub fn port(mut self, port: u16) -> Self {
self.port = port;
self
}
pub fn addr(mut self, addr: &str) -> Self {
self.addr = addr.to_owned();
self
}
pub fn debug(mut self, debug: bool) -> Self {
self.debug = debug;
self
}
pub fn serve(mut self, serve: &str) -> Self {
self.serve = serve.to_owned();
self
}
pub fn interval(mut self, interval: u64) -> Self {
self.interval = interval;
self
}
pub fn secret(mut self, secret: &str) -> Self {
self.secret = Some(secret.to_owned());
self
}
pub fn registry(mut self, registry: &Registry) -> Self {
self.registry = Some(registry.to_owned());
self
}
pub fn build(self) -> Server {
let mut registry = self.registry.unwrap_or(Registry::new());
registry.generate();
if self.debug {
Server::setup_logger();
}
Server {
port: self.port,
addr: self.addr,
serve: self.serve,
debug: self.debug,
interval: self.interval,
secret: self.secret,
registry,
started: false,
connections: HashMap::default(),
lost_sessions: HashMap::default(),
transport_sessions: HashMap::default(),
worlds: HashMap::default(),
info_handle: default_info_handle,
action_handles: HashMap::default(),
}
}
}