use super::{environment::EnvironmentCache, inventory::InventoryData};
use crate::{
capabilities::SendCapabilityRequest, inventory::RefreshInventoryEvent,
transport::http_handler::login_to_simulator,
};
use actix::prelude::*;
use actix_rt::time;
use log::{error, info};
use metaverse_agent::avatar::Avatar;
use metaverse_messages::{
http::capabilities::{Capability, CapabilityRequest},
packet::{
message::{UIMessage, UIResponse},
packet::Packet,
},
udp::{
chat::chat_from_viewer::ChatFromViewer,
core::{
agent_throttle::{AgentThrottle, ThrottleData},
circuit_code::CircuitCode,
complete_agent_movement::CompleteAgentMovementData,
complete_ping_check::CompletePingCheck,
logout_request::LogoutRequest,
packet_ack::PacketAck,
region_handshake::RegionHandshake,
region_handshake_reply::RegionHandshakeReply,
},
},
ui::{
errors::{
CapabilityError, CircuitCodeError, CompleteAgentMovementError, FeatureError,
MailboxSessionError, SessionError,
},
login_event::Login,
},
};
use sqlx::{Pool, Sqlite};
use std::{
collections::{HashMap, HashSet},
net::UdpSocket as SyncUdpSocket,
path::PathBuf,
sync::{Arc, Mutex},
thread::sleep,
};
use tokio::{net::UdpSocket, sync::Notify, time::Duration};
use uuid::Uuid;
/// Actor state for the mailbox: sockets, acknowledgement bookkeeping, lifecycle
/// state and the currently active [`Session`].
#[derive(Debug)]
pub struct Mailbox {
/// Local UDP port; combined with the session's `local_ip` to build the address
/// the simulator-facing socket is bound to.
pub client_socket: u16,
/// Destination address that serialized UI messages are sent to.
pub server_to_ui_socket: String,
/// SQLite connection pool for the inventory database.
pub inventory_db_connection: Pool<Sqlite>,
/// Path belonging to the inventory database.
/// NOTE(review): presumably the on-disk location of the database file — confirm.
pub inventory_db_location: PathBuf,
/// Packet ids queued by `AddToAckList`; drained into a `PacketAck` by `SendAckList`.
pub server_acks: HashSet<u32>,
/// Sequence numbers of reliable packets that were sent and are not yet acknowledged.
/// Entries are removed by `HandlePacketAck` and checked by `ResendPacket`.
pub viewer_acks: HashSet<u32>,
/// Shared lifecycle state, written by `set_state`.
pub state: Arc<Mutex<ServerState>>,
/// Signalled by `set_state` when the state becomes `Running` or `Stopped`.
pub notify: Arc<Notify>,
/// The active session; `None` until a `Session` message has been handled.
pub session: Option<Session>,
/// Counter of sent packets. NOTE(review): not updated by any code visible in this file — confirm usage.
pub sent_packet_count: u16,
/// Ping bookkeeping, updated by the `HandlePing` handler.
pub ping_info: PingInfo,
}
/// Ping bookkeeping kept on the [`Mailbox`].
#[derive(Debug)]
pub struct PingInfo {
/// Ping counter. NOTE(review): not modified by the code visible in this file — confirm who maintains it.
pub ping_number: u8,
/// Time elapsed since `last_ping`, recomputed each time a `HandlePing` message is handled.
pub ping_latency: Duration,
/// Reference instant that `ping_latency` is measured from.
pub last_ping: time::Instant,
}
/// Per-login session data. It is also an actor message: sending it to the
/// [`Mailbox`] installs it as the active session.
#[derive(Debug, Message)]
#[rtype(result = "()")]
pub struct Session {
/// Simulator address (`ip:port`) that outgoing UDP packets are sent to.
pub address: String,
/// Agent id taken from the login response.
pub agent_id: Uuid,
/// Session id taken from the login response.
pub session_id: Uuid,
/// Bound UDP socket; `None` until the mailbox has finished binding it.
pub socket: Option<Arc<UdpSocket>>,
/// Sequence number assigned to the next outgoing packet that is not a resend.
pub sequence_number: u16,
/// Local IP address the UDP socket is bound on.
pub local_ip: std::net::IpAddr,
/// Seed capability URL taken from the login response.
pub seed_capability_url: String,
/// Capability URLs keyed by capability; created empty at login.
pub capability_urls: HashMap<Capability, String>,
/// Inventory bookkeeping (root folder, library owner, init flag).
pub inventory_data: InventoryData,
/// Environment patch queue and cache.
pub environment_cache: EnvironmentCache,
/// Avatars keyed by id; created empty at login.
pub avatars: HashMap<Uuid, Avatar>,
}
/// Message asking the mailbox to answer a ping with a `CompletePingCheck` packet.
#[derive(Debug, Message)]
#[rtype(result = "()")]
pub struct HandlePing {
/// Id echoed back in the `CompletePingCheck` reply.
pub ping_id: u8,
}
/// Lifecycle state of the mailbox, stored behind `Mailbox::state`.
///
/// `Mailbox::set_state` signals `Mailbox::notify` when the state becomes
/// `Running` or `Stopped`; the other two variants are stored without signalling.
#[derive(Debug, Clone, PartialEq)]
pub enum ServerState {
Starting,
// Set when the actor has started.
Running,
Stopping,
Stopped,
}
/// Message asking the mailbox to send all queued acknowledgement ids
/// (`Mailbox::server_acks`) in a single `PacketAck` packet.
#[derive(Debug, Message)]
#[rtype(result = "()")]
pub struct SendAckList {}
/// Message queueing one packet id for acknowledgement.
#[derive(Debug, Message)]
#[rtype(result = "()")]
pub struct AddToAckList {
/// Packet id to insert into `Mailbox::server_acks`.
pub id: u32,
}
/// Message scheduled one second after a reliable packet is sent; the packet is
/// sent again if its sequence number is still unacknowledged at that point.
#[derive(Debug, Message)]
#[rtype(result = "()")]
pub struct ResendPacket {
/// The packet to resend.
pub packet: Packet,
}
/// Message asking the mailbox to send a packet to the session's simulator address.
#[derive(Debug, Message)]
#[rtype(result = "()")]
pub struct OutgoingPacket {
/// Packet to send; its sequence number is assigned by the handler unless it is a resend.
pub packet: Packet,
}
/// Message carrying a received `RegionHandshake`; the mailbox answers it with a
/// `RegionHandshakeReply`.
#[derive(Debug, Message)]
#[rtype(result = "()")]
pub struct HandleRegionHandshake {
/// The received handshake. The handler in this file does not read its contents.
pub region_handshake: RegionHandshake,
}
/// Message carrying a received `PacketAck`; every id it lists is removed from
/// `Mailbox::viewer_acks`.
#[derive(Debug, Message)]
#[rtype(result = "()")]
pub struct HandlePacketAck {
/// The received acknowledgement packet.
pub packet_ack: PacketAck,
}
/// Message carrying a response coming from the UI (login, chat, logout, agent update, ...).
#[derive(Debug, Message)]
#[rtype(result = "()")]
pub struct HandleUIResponse {
/// The UI response to act on.
pub ui_response: UIResponse,
}
/// Message asking the mailbox to forward a message to the UI over UDP.
#[derive(Debug, Message)]
#[rtype(result = "()")]
pub struct SendUIMessage {
/// The message to serialize and send to `Mailbox::server_to_ui_socket`.
pub ui_message: UIMessage,
}
impl Mailbox {
pub fn set_state(&mut self, new_state: ServerState, _ctx: &mut Context<Self>) {
let state_clone: Arc<Mutex<ServerState>> = Arc::clone(&self.state);
{
let mut state = state_clone.lock().unwrap();
*state = new_state.clone();
}
if new_state == ServerState::Running || new_state == ServerState::Stopped {
self.notify.notify_one();
}
}
}
/// Runs the mailbox as an actix actor on a plain `Context`.
impl Actor for Mailbox {
type Context = Context<Self>;
/// Called when the actor starts: logs the start and switches the shared
/// state to `Running`, which also signals `notify`.
fn started(&mut self, ctx: &mut Self::Context) {
info!("Actix Mailbox has started");
self.set_state(ServerState::Running, ctx);
}
}
impl Handler<Session> for Mailbox {
type Result = ();
fn handle(&mut self, mut msg: Session, ctx: &mut Self::Context) -> Self::Result {
if let Some(session) = self.session.as_ref() {
msg.socket = session.socket.clone();
}
self.session = Some(msg);
if let Some(session) = self.session.as_ref()
&& session.socket.is_none()
{
let addr = format!("{}:{}", session.local_ip, self.client_socket);
let addr_clone = addr.clone();
let mailbox_addr = ctx.address();
info!("session established, starting UDP processing");
let fut = async move {
match UdpSocket::bind(&addr).await {
Ok(sock) => {
info!("Successfully bound to {}", &addr);
let sock = Arc::new(sock);
tokio::spawn(Mailbox::start_udp_read(
sock.clone(),
mailbox_addr,
));
Ok(sock) }
Err(e) => {
error!("Failed to bind to {}: {}", &addr_clone, e);
Err(e)
}
}
};
ctx.spawn(fut.into_actor(self).map(|result, act, _| match result {
Ok(sock) => {
if let Some(session) = &mut act.session {
session.socket = Some(sock);
}
}
Err(_) => {
panic!("Socket binding failed");
}
}));
}
}
}
impl Handler<SendUIMessage> for Mailbox {
type Result = ();
fn handle(&mut self, msg: SendUIMessage, _: &mut Self::Context) -> Self::Result {
let client_socket = SyncUdpSocket::bind("0.0.0.0:0").unwrap();
if let Err(e) = client_socket.send_to(&msg.ui_message.to_bytes(), &self.server_to_ui_socket)
{
error!("Failed to send UI message:{:?}", e)
}
}
}
impl Handler<HandleUIResponse> for Mailbox {
    type Result = ();

    /// Dispatches a response coming from the UI.
    ///
    /// * `Login` starts the login sequence on a spawned task and returns; it
    ///   does not need an existing session.
    /// * `ChatFromViewer`, `Logout` and `AgentUpdate` are turned into outgoing
    ///   packets stamped with the session's agent and session ids.
    /// * Any other response is logged as unrecognized.
    ///
    /// Responses other than `Login` are ignored while there is no session.
    fn handle(&mut self, msg: HandleUIResponse, ctx: &mut Self::Context) -> Self::Result {
        // Handle login first and return, so it can never fall through to the
        // "unrecognized" arm below when a session already exists.
        let ui_response = match msg.ui_response {
            UIResponse::Login(login_data) => {
                let mailbox_addr = ctx.address();
                actix::spawn(async move {
                    if let Err(e) = handle_login(login_data, &mailbox_addr).await {
                        error!("{:?}", e);
                    };
                });
                return;
            }
            other => other,
        };

        let Some(session) = self.session.as_ref() else {
            return;
        };
        match ui_response {
            UIResponse::ChatFromViewer(data) => {
                ctx.address().do_send(OutgoingPacket {
                    packet: Packet::new_chat_from_viewer(ChatFromViewer {
                        session_id: session.session_id,
                        agent_id: session.agent_id,
                        channel: data.channel,
                        message: data.message,
                        message_type: data.message_type,
                    }),
                });
            }
            UIResponse::Logout(_) => {
                ctx.address().do_send(OutgoingPacket {
                    packet: Packet::new_logout_request(LogoutRequest {
                        session_id: session.session_id,
                        agent_id: session.agent_id,
                    }),
                });
            }
            UIResponse::AgentUpdate(mut data) => {
                // The ids always come from the session, not from the UI.
                data.agent_id = session.agent_id;
                data.session_id = session.session_id;
                ctx.address().do_send(OutgoingPacket {
                    packet: Packet::new_agent_update(data),
                });
            }
            data => {
                error!("Unrecognized UIMessage: {:?}", data)
            }
        }
    }
}
impl Handler<OutgoingPacket> for Mailbox {
type Result = ();
fn handle(&mut self, mut msg: OutgoingPacket, ctx: &mut Self::Context) -> Self::Result {
if let Some(session) = self.session.as_mut() {
let addr = session.address.clone();
if !msg.packet.header.resent {
msg.packet.header.sequence_number = session.sequence_number as u32;
session.sequence_number += 1;
}
let data = msg.packet.to_bytes().clone();
let socket_clone = session.socket.as_ref().unwrap().clone();
let fut = async move {
if let Err(e) = socket_clone.send_to(&data, &addr).await {
error!("Failed to send data: {}", e);
}
};
ctx.spawn(fut.into_actor(self));
if msg.packet.header.reliable {
self.viewer_acks.insert(msg.packet.header.sequence_number);
ctx.notify_later(ResendPacket { packet: msg.packet }, Duration::from_secs(1));
};
}
}
}
impl Handler<ResendPacket> for Mailbox {
    type Result = ();

    /// Re-queues the packet, marked as a resend, if its sequence number is
    /// still waiting for an acknowledgement; otherwise does nothing.
    fn handle(&mut self, msg: ResendPacket, ctx: &mut Self::Context) -> Self::Result {
        let awaiting_ack = self
            .viewer_acks
            .contains(&msg.packet.header.sequence_number);
        if !awaiting_ack {
            return;
        }
        let mut packet = msg.packet;
        // Marks the packet so the send path keeps its original sequence number.
        packet.header.resent = true;
        ctx.address().do_send(OutgoingPacket { packet });
    }
}
impl Handler<HandleRegionHandshake> for Mailbox {
type Result = ();
fn handle(&mut self, _: HandleRegionHandshake, ctx: &mut Self::Context) -> Self::Result {
ctx.address().do_send(OutgoingPacket {
packet: Packet::new_region_handshake_reply(RegionHandshakeReply {
session_id: self.session.as_ref().unwrap().session_id,
agent_id: self.session.as_ref().unwrap().agent_id,
flags: 0,
}),
});
}
}
impl Handler<HandlePing> for Mailbox {
type Result = ();
fn handle(&mut self, msg: HandlePing, ctx: &mut Self::Context) -> Self::Result {
ctx.address().do_send(OutgoingPacket {
packet: Packet::new_complete_ping_check(CompletePingCheck {
ping_id: msg.ping_id,
}),
});
self.ping_info.ping_latency = time::Instant::now() - self.ping_info.last_ping;
}
}
impl Handler<HandlePacketAck> for Mailbox {
    type Result = ();

    /// Removes every acknowledged id from `viewer_acks`, so that pending
    /// `ResendPacket` checks for those packets become no-ops.
    fn handle(&mut self, msg: HandlePacketAck, _ctx: &mut Self::Context) -> Self::Result {
        let pending = &mut self.viewer_acks;
        msg.packet_ack
            .packet_ids
            .into_iter()
            .for_each(|acked| {
                pending.remove(&acked);
            });
    }
}
impl Handler<AddToAckList> for Mailbox {
type Result = ();
/// Queues `msg.id` for acknowledgement and immediately asks the mailbox to
/// send the queued acks.
fn handle(&mut self, msg: AddToAckList, ctx: &mut Self::Context) -> Self::Result {
self.server_acks.insert(msg.id);
// The flush is a separate message to this same actor rather than a direct call.
ctx.address().do_send(SendAckList {});
}
}
impl Handler<SendAckList> for Mailbox {
type Result = ();
fn handle(&mut self, _: SendAckList, ctx: &mut Self::Context) -> Self::Result {
if let Some(ref session) = self.session {
if self.server_acks.is_empty() {
return;
}
let packet_ids: Vec<u32> = self.server_acks.drain().collect();
let addr = session.address.clone();
let packet = Packet::new_packet_ack(PacketAck { packet_ids }).to_bytes();
let sock_clone = session.socket.clone().unwrap();
let ack_wait = async move {
if let Err(e) = sock_clone.send_to(&packet, addr).await {
println!("Failed to send ack: {:?}", e)
};
};
ctx.spawn(ack_wait.into_actor(self));
}
}
}
async fn handle_login(
login_data: Login,
mailbox_addr: &actix::Addr<Mailbox>,
) -> Result<(), SessionError> {
let (login_response, local_ip) = match login_to_simulator(login_data).await {
Ok((login_response, local_ip)) => {
if let Err(e) = mailbox_addr
.send(SendUIMessage {
ui_message: UIMessage::new_login_response_event(
metaverse_messages::ui::login_response::LoginResponse {
firstname: login_response.first_name.clone(),
lastname: login_response.last_name.clone(),
},
),
})
.await
{
error!("Failed to send login response to UI {:?}", e)
};
(login_response, local_ip)
}
Err(e) => {
if let Err(e) = mailbox_addr
.send(SendUIMessage {
ui_message: UIMessage::new_session_error(e.clone().into()),
})
.await
{
error!("Failed to send session error: {:?}", e)
};
Err(e)?
}
};
if let Err(e) = mailbox_addr
.send(Session {
agent_id: login_response.agent_id,
session_id: login_response.session_id,
address: format!("{}:{}", login_response.sim_ip, login_response.sim_port),
seed_capability_url: login_response.seed_capability.unwrap(),
sequence_number: 0,
local_ip,
capability_urls: HashMap::new(),
#[cfg(feature = "environment")]
environment_cache: EnvironmentCache {
patch_queue: HashMap::new(),
patch_cache: HashMap::new(),
},
#[cfg(feature = "inventory")]
inventory_data: InventoryData {
inventory_root: login_response.inventory_root.ok_or_else(|| {
FeatureError::Inventory(
"Login response contained no inventory_root".to_string(),
)
})?,
inventory_lib_owner: login_response.inventory_lib_owner.ok_or_else(|| {
FeatureError::Inventory(
"Login response contained no inventory_lib_owner".to_string(),
)
})?,
inventory_init: false,
},
socket: None,
#[cfg(feature = "agent")]
avatars: HashMap::new(),
})
.await
{
Err(MailboxSessionError {
message: e.to_string(),
})?;
};
match CapabilityRequest::new_capability_request(vec![
Capability::ViewerAsset,
Capability::FetchInventoryDescendents2,
]) {
Ok(caps) => {
if let Err(e) = mailbox_addr
.send(SendCapabilityRequest {
capability_request: caps,
})
.await
{
Err(CapabilityError {
message: e.to_string(),
})?
}
}
Err(e) => {
error!("{:?}", e)
}
};
if let Err(e) = mailbox_addr
.send(OutgoingPacket {
packet: Packet::new_circuit_code(CircuitCode {
code: login_response.circuit_code,
session_id: login_response.session_id,
id: login_response.agent_id,
}),
})
.await
{
Err(CircuitCodeError {
message: e.to_string(),
})?;
};
sleep(Duration::from_secs(1));
if let Err(e) = mailbox_addr
.send(OutgoingPacket {
packet: Packet::new_complete_agent_movement(CompleteAgentMovementData {
circuit_code: login_response.circuit_code,
session_id: login_response.session_id,
agent_id: login_response.agent_id,
}),
})
.await
{
Err(CompleteAgentMovementError {
message: e.to_string(),
})?;
};
if let Err(e) = mailbox_addr
.send(OutgoingPacket {
packet: Packet::new_agent_throttle(AgentThrottle {
agent_id: login_response.agent_id,
session_id: login_response.session_id,
circuit_code: login_response.circuit_code,
gen_counter: 0,
throttles: ThrottleData {
..Default::default()
},
}),
})
.await
{
Err(CompleteAgentMovementError {
message: e.to_string(),
})?;
};
#[cfg(feature = "inventory")]
if let Err(e) = mailbox_addr
.send(RefreshInventoryEvent {
agent_id: login_response.agent_id,
})
.await
{
Err(CapabilityError {
message: e.to_string(),
})?
}
Ok(())
}