use std::{
collections::{BTreeMap, HashMap, HashSet},
fmt, io,
net::{IpAddr, SocketAddr},
sync::{Arc, RwLock},
time::{Duration, Instant},
};
#[cfg(any(feature = "auth_tcp", feature = "auth_tls"))]
use chacha20poly1305::{aead::KeyInit, ChaChaPoly1305, Key};
use dashmap::{DashMap, DashSet};
use crate::{
internal::{
messages::{DeserializedMessage, MessagePartMap, UDP_BUFFER_SIZE},
node::{
ActiveAwaitableHandler, ActiveCancelableHandler, ActiveDisposableHandler, NodeInternal,
NodeState, NodeType, PartnerMessaging,
},
rt::{try_lock, AsyncRwLock, Mutex, TaskHandle, TaskRunner, UdpSocket},
utils::{DurationMonitor, RttCalculator},
JustifiedRejectionContext, MessageChannel,
},
packets::{
Packet, PacketRegistry, SerializedPacket, SerializedPacketList, ServerTickEndPacket,
},
LimitedMessage, MessagingProperties, ReadHandlerProperties, MESSAGE_CHANNEL_SIZE,
};
#[cfg(feature = "store_unexpected")]
use crate::internal::node::StoreUnexpectedErrors;
pub use dashmap::{iter::Iter as DashIter, mapref::one::Ref as DashRef};
use crate::internal::auth::InnerAuth;
#[cfg(any(feature = "auth_tcp", feature = "auth_tls"))]
use crate::internal::auth::InnerAuthTcpBased;
pub use crate::internal::node::Partner as ConnectedClient;
pub use auth::*;
mod auth;
mod init;
#[derive(Debug)]
pub enum ReadClientBytesResult {
DoneDisconnectConfirm,
IgnoredClientHandle,
ClientReceivedBytes,
DonePendingAuth,
PublicKeySend,
RecentClientDisconnectConfirm,
InsufficientBytesLen,
PendingDisconnectConfirm,
AddrInAuth,
AuthInsufficientBytesLen,
ClientMaxTickByteLenOverflow,
InvalidPendingAuth,
PendingPendingAuth,
InvalidPublicKeySend(u16),
AlreadyConnected,
PendingAuthFull,
}
impl ReadClientBytesResult {
pub fn is_unexpected(&self) -> bool {
match self {
ReadClientBytesResult::DoneDisconnectConfirm => false,
ReadClientBytesResult::IgnoredClientHandle => false,
ReadClientBytesResult::ClientReceivedBytes => false,
ReadClientBytesResult::DonePendingAuth => false,
ReadClientBytesResult::PublicKeySend => false,
ReadClientBytesResult::RecentClientDisconnectConfirm => false,
ReadClientBytesResult::InsufficientBytesLen => true,
ReadClientBytesResult::PendingDisconnectConfirm => true,
ReadClientBytesResult::AddrInAuth => true,
ReadClientBytesResult::AuthInsufficientBytesLen => true,
ReadClientBytesResult::ClientMaxTickByteLenOverflow => true,
ReadClientBytesResult::InvalidPendingAuth => true,
ReadClientBytesResult::PendingPendingAuth => true,
ReadClientBytesResult::InvalidPublicKeySend(_) => true,
ReadClientBytesResult::AlreadyConnected => true,
ReadClientBytesResult::PendingAuthFull => true,
}
}
}
#[derive(Debug)]
pub enum ClientDisconnectReason {
PendingMessageConfirmationTimeout,
MessageReceiveTimeout,
WriteUnlockTimeout,
ByteSendError(io::Error),
ManualDisconnect,
DisconnectRequest(DeserializedMessage),
}
pub struct GracefullyDisconnection {
pub timeout: Duration,
pub message: LimitedMessage,
}
#[derive(Debug)]
pub enum ServerDisconnectClientState {
Confirmed,
ConfirmationTimeout,
SendIoError(io::Error),
ReceiveIoError(io::Error),
}
#[derive(Debug)]
pub enum ServerDisconnectState {
Confirmations(HashMap<SocketAddr, ServerDisconnectClientState>),
WithoutReason,
}
pub struct ServerProperties {
pub pending_auth_packet_loss_interpretation: Duration,
pub max_pending_auth: usize,
pub invalid_message_punishment: Option<Duration>,
}
impl Default for ServerProperties {
fn default() -> Self {
Self {
pending_auth_packet_loss_interpretation: Duration::from_secs(3),
max_pending_auth: usize::MAX,
invalid_message_punishment: Some(Duration::from_secs(5)),
}
}
}
pub struct BindResult {
pub server: Server,
}
#[derive(Debug)]
pub enum BindError {
MissingEssentialPackets,
SocketBindError(io::Error),
AuthenticatorConnectIoError(io::Error),
}
impl fmt::Display for BindError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
BindError::MissingEssentialPackets => write!(
f,
"Packet registry has not registered the essential packets."
),
BindError::SocketBindError(e) => write!(f, "Failed to bind UDP socket: {}", e),
BindError::AuthenticatorConnectIoError(ref err) => {
write!(f, "Authenticator connect IO error: {}", err)
}
}
}
}
impl std::error::Error for BindError {}
#[derive(Debug)]
pub enum BadAuthenticateUsageError {
AlreadyConnected,
NotMarkedToAuthenticate,
}
impl fmt::Display for BadAuthenticateUsageError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
BadAuthenticateUsageError::AlreadyConnected => write!(f, "Addr is already connected."),
BadAuthenticateUsageError::NotMarkedToAuthenticate => write!(
f,
"Addr was not marked in the last tick to be possibly authenticated."
),
}
}
}
impl std::error::Error for BadAuthenticateUsageError {}
#[derive(Debug, PartialEq, Eq)]
pub enum ServerTickState {
TickStartPending,
TickEndPending,
}
#[cfg(feature = "store_unexpected")]
#[derive(Debug)]
pub enum UnexpectedError {
OfReadAddrBytes(SocketAddr, ReadClientBytesResult),
#[cfg(any(feature = "auth_tcp", feature = "auth_tls"))]
OfTcpBasedHandlerAccept(SocketAddr, ReadClientBytesResult),
#[cfg(any(feature = "auth_tcp", feature = "auth_tls"))]
OfTcpBasedHandlerAcceptIoError(SocketAddr, io::Error),
InvalidProtocolCommunication(SocketAddr),
}
pub struct AuthEntry {
addr: SocketAddr,
addr_to_auth: AddrToAuth,
}
impl PartialEq for AuthEntry {
fn eq(&self, other: &Self) -> bool {
self.addr == other.addr
}
}
impl Eq for AuthEntry {}
impl std::hash::Hash for AuthEntry {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.addr.hash(state);
}
}
impl AuthEntry {
pub fn addr(&self) -> &SocketAddr {
&self.addr
}
}
pub struct ServerTickResult {
pub received_messages: HashMap<SocketAddr, Vec<DeserializedMessage>>,
pub to_auth: HashMap<AuthEntry, DeserializedMessage>,
pub disconnected: HashMap<SocketAddr, ClientDisconnectReason>,
#[cfg(feature = "store_unexpected")]
pub unexpected_errors: Vec<UnexpectedError>,
}
struct ServerNode {
clients_to_auth_sender: async_channel::Sender<(SocketAddr, (AddrToAuth, DeserializedMessage))>,
clients_to_disconnect_sender: async_channel::Sender<(
SocketAddr,
(ClientDisconnectReason, Option<JustifiedRejectionContext>),
)>,
rejections_to_confirm_signal_sender: async_channel::Sender<()>,
pending_rejection_confirm_resend_sender: async_channel::Sender<SocketAddr>,
clients_to_auth_receiver:
async_channel::Receiver<(SocketAddr, (AddrToAuth, DeserializedMessage))>,
clients_to_disconnect_receiver: async_channel::Receiver<(
SocketAddr,
(ClientDisconnectReason, Option<JustifiedRejectionContext>),
)>,
authenticator_mode: AuthenticatorModeInternal,
tick_state: RwLock<ServerTickState>,
server_properties: Arc<ServerProperties>,
connected_clients: DashMap<SocketAddr, Arc<ConnectedClient>>,
ignored_ips: DashSet<IpAddr>,
temporary_ignored_ips: DashMap<IpAddr, Instant>,
assigned_addrs_in_auth: RwLock<HashSet<SocketAddr>>,
recently_disconnected: DashMap<SocketAddr, Instant>,
pending_rejection_confirm: DashMap<SocketAddr, (JustifiedRejectionContext, Option<Instant>)>,
rejections_to_confirm: DashSet<SocketAddr>,
}
impl ServerNode {
fn ignore_ip(&self, ip: IpAddr) {
self.temporary_ignored_ips.remove(&ip);
self.ignored_ips.insert(ip);
}
fn ignore_ip_temporary(&self, ip: IpAddr, until_to: Instant) {
self.ignored_ips.insert(ip);
self.temporary_ignored_ips.insert(ip, until_to);
}
fn remove_ignore_ip(&self, ip: &IpAddr) {
self.ignored_ips.remove(ip);
self.temporary_ignored_ips.remove(ip);
}
async fn read_next_bytes(
node: &NodeInternal<ServerNode>,
tuple: (SocketAddr, Vec<u8>),
) -> ReadClientBytesResult {
let node_type = &node.node_type;
let (addr, bytes) = tuple;
if bytes.len() < MESSAGE_CHANNEL_SIZE {
return ReadClientBytesResult::InsufficientBytesLen;
}
if node_type.pending_rejection_confirm.contains_key(&addr) {
if bytes[0] == MessageChannel::REJECTION_CONFIRM {
node_type.pending_rejection_confirm.remove(&addr);
return ReadClientBytesResult::DoneDisconnectConfirm;
} else {
return ReadClientBytesResult::PendingDisconnectConfirm;
}
}
if bytes[0] == MessageChannel::REJECTION_JUSTIFICATION {
if node_type.recently_disconnected.contains_key(&addr) {
node_type.rejections_to_confirm.insert(addr);
return ReadClientBytesResult::RecentClientDisconnectConfirm;
}
}
match &node_type.authenticator_mode {
AuthenticatorModeInternal::NoCryptography(auth_mode) => {
NoCryptographyAuth::read_next_bytes(&node, addr, bytes, auth_mode).await
}
#[cfg(feature = "auth_tcp")]
AuthenticatorModeInternal::RequireTcp(auth_mode) => {
auth_mode.read_next_bytes(&node, addr, bytes).await
}
#[cfg(feature = "auth_tls")]
AuthenticatorModeInternal::RequireTls(auth_mode) => {
auth_mode.read_next_bytes(&node, addr, bytes).await
}
}
}
}
impl NodeType for ServerNode {
type Skt = (SocketAddr, Vec<u8>);
#[cfg(feature = "store_unexpected")]
type UnEr = UnexpectedError;
async fn pre_read_next_bytes(socket: &Arc<UdpSocket>) -> io::Result<Self::Skt> {
let mut buf = [0u8; UDP_BUFFER_SIZE];
let (len, addr) = socket.recv_from(&mut buf).await?;
Ok((addr, buf[..len].to_vec()))
}
async fn consume_read_bytes_result(node: &Arc<NodeInternal<Self>>, result: Self::Skt) {
#[cfg(feature = "store_unexpected")]
let addr = result.0.clone();
let _read_result = Self::read_next_bytes(&node, result).await;
#[cfg(feature = "store_unexpected")]
if _read_result.is_unexpected() {
let _ = node
.store_unexpected_errors
.error_sender
.send(UnexpectedError::OfReadAddrBytes(addr, _read_result))
.await;
}
}
fn on_inactivated(node: &Arc<NodeInternal<Self>>) -> TaskHandle<()> {
let node_clone = Arc::clone(&node);
node.task_runner.spawn(async move {
for dash_entry in node_clone.node_type.connected_clients.iter() {
NodeInternal::on_partner_disposed(&node_clone, &dash_entry.value()).await;
}
})
}
}
pub struct Server {
internal: Arc<NodeInternal<ServerNode>>,
}
impl Server {
pub fn bind(
addr: SocketAddr,
packet_registry: Arc<PacketRegistry>,
messaging_properties: Arc<MessagingProperties>,
read_handler_properties: Arc<ReadHandlerProperties>,
server_properties: Arc<ServerProperties>,
authenticator_mode: AuthenticatorMode,
#[cfg(any(feature = "rt_tokio", feature = "rt_async_executor"))]
runtime: crate::internal::rt::Runtime,
) -> TaskHandle<Result<BindResult, BindError>> {
#[cfg(any(feature = "rt_tokio", feature = "rt_async_executor"))]
let task_runner = Arc::new(TaskRunner { runtime });
#[cfg(not(any(feature = "rt_tokio", feature = "rt_async_executor")))]
let task_runner = Arc::new(TaskRunner {});
let task_runner_exit = Arc::clone(&task_runner);
let bind_result_body = async move {
if !packet_registry.check_essential() {
return Err(BindError::MissingEssentialPackets);
}
let socket = match UdpSocket::bind(addr).await {
Ok(socket) => Arc::new(socket),
Err(e) => return Err(BindError::SocketBindError(e)),
};
let (clients_to_auth_sender, clients_to_auth_receiver) = async_channel::unbounded();
let (clients_to_disconnect_sender, clients_to_disconnect_receiver) =
async_channel::unbounded();
let (
pending_rejection_confirm_resend_sender,
pending_rejection_confirm_resend_receiver,
) = async_channel::unbounded();
let (rejections_to_confirm_signal_sender, rejections_to_confirm_signal_receiver) =
async_channel::unbounded();
let (awaitable_tasks_sender, awaitable_tasks_receiver) = async_channel::unbounded();
#[cfg(feature = "store_unexpected")]
let (store_unexpected_errors, store_unexpected_errors_create_list_signal_receiver) =
StoreUnexpectedErrors::new();
let mut authenticator_mode_build = authenticator_mode.build();
let server = Arc::new(NodeInternal {
disposable_handlers_keeper: Mutex::new(Vec::new()),
cancelable_handlers_keeper: Mutex::new(Vec::new()),
awaitable_tasks_sender,
socket,
#[cfg(feature = "store_unexpected")]
store_unexpected_errors,
packet_registry,
messaging_properties,
read_handler_properties,
task_runner,
state: AsyncRwLock::new(NodeState::Active),
node_type: ServerNode {
clients_to_auth_sender,
clients_to_disconnect_sender,
rejections_to_confirm_signal_sender,
pending_rejection_confirm_resend_sender,
clients_to_auth_receiver,
clients_to_disconnect_receiver,
authenticator_mode: authenticator_mode_build.take_authenticator_mode_internal(),
tick_state: RwLock::new(ServerTickState::TickStartPending),
server_properties,
connected_clients: DashMap::new(),
ignored_ips: DashSet::new(),
temporary_ignored_ips: DashMap::new(),
assigned_addrs_in_auth: RwLock::new(HashSet::new()),
recently_disconnected: DashMap::new(),
pending_rejection_confirm: DashMap::new(),
rejections_to_confirm: DashSet::new(),
},
});
let mut disposable_handlers_keeper = server.disposable_handlers_keeper.lock().await;
let mut cancelable_handlers_keeper = server.cancelable_handlers_keeper.lock().await;
let authenticator_handler_task = match authenticator_mode_build.apply(&server).await {
Ok(authenticator_task) => authenticator_task,
Err(e) => {
return Err(BindError::AuthenticatorConnectIoError(e));
}
};
disposable_handlers_keeper.push(ActiveDisposableHandler {
task: authenticator_handler_task,
});
let server_downgraded = Arc::downgrade(&server);
disposable_handlers_keeper.push(ActiveDisposableHandler {
task: server.task_runner.spawn(
init::server::create_pending_rejection_confirm_resend_handler(
server_downgraded,
pending_rejection_confirm_resend_receiver,
),
),
});
let server_downgraded = Arc::downgrade(&server);
disposable_handlers_keeper.push(ActiveDisposableHandler {
task: server
.task_runner
.spawn(init::server::create_rejections_to_confirm_handler(
server_downgraded,
rejections_to_confirm_signal_receiver,
)),
});
#[cfg(feature = "store_unexpected")]
{
let server_downgraded = Arc::downgrade(&server);
disposable_handlers_keeper.push(ActiveDisposableHandler {
task: server.task_runner.spawn(
init::server::create_store_unexpected_error_list_handler(
server_downgraded,
store_unexpected_errors_create_list_signal_receiver,
),
),
});
}
{
for _ in 0..server.read_handler_properties.target_tasks_size {
let (cancel_sender, cancel_receiver) = async_channel::bounded(1);
cancelable_handlers_keeper.push(ActiveCancelableHandler {
cancel_sender,
task: server.task_runner.spawn(NodeType::create_read_handler(
Arc::downgrade(&server),
Arc::clone(&server.socket),
cancel_receiver,
)),
});
}
}
{
let (cancel_sender, cancel_receiver) = async_channel::bounded(1);
cancelable_handlers_keeper.push(ActiveCancelableHandler {
cancel_sender,
task: server
.task_runner
.spawn(ActiveAwaitableHandler::create_holder(
awaitable_tasks_receiver,
cancel_receiver,
)),
});
}
drop(disposable_handlers_keeper);
drop(cancelable_handlers_keeper);
Ok(BindResult {
server: Server { internal: server },
})
};
task_runner_exit.spawn(bind_result_body)
}
pub fn packet_registry(&self) -> &PacketRegistry {
&self.internal.packet_registry
}
pub fn messaging_properties(&self) -> &MessagingProperties {
&self.internal.messaging_properties
}
pub fn read_handler_properties(&self) -> &ReadHandlerProperties {
&self.internal.read_handler_properties
}
pub fn server_properties(&self) -> &ServerProperties {
&self.internal.node_type.server_properties
}
pub fn local_addr(&self) -> SocketAddr {
self.internal.socket.local_addr().unwrap()
}
pub fn try_tick_start(&self) -> Result<ServerTickResult, ()> {
let internal = &self.internal;
let node_type = &internal.node_type;
{
let mut tick_state = node_type.tick_state.write().unwrap();
if *tick_state != ServerTickState::TickStartPending {
return Err(());
} else {
*tick_state = ServerTickState::TickEndPending;
}
}
let now = Instant::now();
let mut assigned_addrs_in_auth = node_type.assigned_addrs_in_auth.write().unwrap();
let dispatched_assigned_addrs_in_auth = std::mem::take(&mut *assigned_addrs_in_auth);
match &node_type.authenticator_mode {
AuthenticatorModeInternal::NoCryptography(auth_mode) => {
auth_mode.tick_start(internal, now, dispatched_assigned_addrs_in_auth);
}
#[cfg(feature = "auth_tcp")]
AuthenticatorModeInternal::RequireTcp(auth_mode) => {
auth_mode.tick_start(internal, now, dispatched_assigned_addrs_in_auth);
}
#[cfg(feature = "auth_tls")]
AuthenticatorModeInternal::RequireTls(auth_mode) => {
auth_mode.tick_start(internal, now, dispatched_assigned_addrs_in_auth);
}
}
#[cfg(feature = "store_unexpected")]
let unexpected_errors = match internal
.store_unexpected_errors
.error_list_receiver
.try_recv()
{
Ok(list) => list,
Err(_) => Vec::new(),
};
node_type.recently_disconnected.retain(|_, received_time| {
now - *received_time < internal.messaging_properties.timeout_interpretation
});
node_type.temporary_ignored_ips.retain(|addr, until_to| {
if now < *until_to {
true
} else {
node_type.ignored_ips.remove(addr);
false
}
});
let mut received_messages: HashMap<SocketAddr, Vec<DeserializedMessage>> = HashMap::new();
let mut to_auth: HashMap<AuthEntry, DeserializedMessage> = HashMap::new();
let mut disconnected: HashMap<SocketAddr, ClientDisconnectReason> = HashMap::new();
let mut addrs_to_disconnect: HashMap<
SocketAddr,
(ClientDisconnectReason, Option<JustifiedRejectionContext>),
> = HashMap::new();
while let Ok((addr, (addr_to_auth, message))) =
node_type.clients_to_auth_receiver.try_recv()
{
to_auth.insert(AuthEntry { addr, addr_to_auth }, message);
}
while let Ok((addr, reason)) = node_type.clients_to_disconnect_receiver.try_recv() {
if !addrs_to_disconnect.contains_key(&addr) {
addrs_to_disconnect.insert(addr, reason);
}
}
'l1: for client in node_type.connected_clients.iter() {
if addrs_to_disconnect.contains_key(client.key()) {
continue 'l1;
}
if let Some(mut messaging) = try_lock(&client.messaging) {
*client.last_messaging_write.write().unwrap() = now;
*client.average_latency.write().unwrap() =
messaging.latency_monitor.average_value();
let average_packet_loss_rtt = messaging.average_packet_loss_rtt;
let mut messages_to_resend: Vec<Arc<Vec<u8>>> = Vec::new();
for (sent_instant, pending_part_id_map) in
messaging.pending_confirmation.values_mut()
{
if now - *sent_instant > internal.messaging_properties.timeout_interpretation {
addrs_to_disconnect.insert(
client.key().clone(),
(
ClientDisconnectReason::PendingMessageConfirmationTimeout,
None,
),
);
continue 'l1;
}
for sent_part in pending_part_id_map.values_mut() {
if now - sent_part.last_sent_time > average_packet_loss_rtt {
sent_part.last_sent_time = now;
messages_to_resend.push(Arc::clone(&sent_part.finished_bytes));
}
}
}
for finished_bytes in messages_to_resend {
client
.shared_socket_bytes_send_sender
.try_send(finished_bytes)
.unwrap();
}
if !messaging.received_messages.is_empty() {
let messages = std::mem::replace(&mut messaging.received_messages, Vec::new());
messaging.tick_bytes_len = 0;
received_messages.insert(client.key().clone(), messages);
} else if now - messaging.last_received_message_instant
>= internal.messaging_properties.timeout_interpretation
{
addrs_to_disconnect.insert(
client.key().clone(),
(ClientDisconnectReason::MessageReceiveTimeout, None),
);
continue 'l1;
}
} else if now - *client.last_messaging_write.read().unwrap()
>= internal.messaging_properties.timeout_interpretation
{
addrs_to_disconnect.insert(
client.key().clone(),
(ClientDisconnectReason::WriteUnlockTimeout, None),
);
continue 'l1;
}
}
node_type
.pending_rejection_confirm
.retain(|_, (context, _)| {
now - context.rejection_instant
< internal
.messaging_properties
.disconnect_reason_resend_cancel
});
for tuple in node_type.pending_rejection_confirm.iter() {
let (_, last_sent_time) = tuple.value();
if let Some(last_sent_time) = last_sent_time {
if now - *last_sent_time
< internal.messaging_properties.disconnect_reason_resend_delay
{
continue;
}
}
node_type
.pending_rejection_confirm_resend_sender
.try_send(tuple.key().clone())
.unwrap();
}
for (addr, (reason, context)) in addrs_to_disconnect {
let connected_client = node_type.connected_clients.remove(&addr).unwrap().1;
let internal_clone = Arc::clone(&internal);
let _ = internal
.awaitable_tasks_sender
.try_send(ActiveAwaitableHandler {
task: internal.task_runner.spawn(async move {
NodeInternal::on_partner_disposed(&internal_clone, &connected_client).await
}),
});
if let Some(context) = context {
node_type
.pending_rejection_confirm
.insert(addr, (context, None));
}
disconnected.insert(addr, reason);
}
for auth_entry in to_auth.keys() {
assigned_addrs_in_auth.insert(*auth_entry.addr());
}
match &node_type.authenticator_mode {
AuthenticatorModeInternal::NoCryptography(auth_mode) => {
auth_mode.call_tick_start_signal();
}
#[cfg(feature = "auth_tcp")]
AuthenticatorModeInternal::RequireTcp(auth_mode) => {
auth_mode.call_tick_start_signal();
}
#[cfg(feature = "auth_tls")]
AuthenticatorModeInternal::RequireTls(auth_mode) => {
auth_mode.call_tick_start_signal();
}
}
#[cfg(feature = "store_unexpected")]
internal
.store_unexpected_errors
.create_list_signal_sender
.try_send(())
.unwrap();
node_type
.rejections_to_confirm_signal_sender
.try_send(())
.unwrap();
Ok(ServerTickResult {
received_messages,
to_auth,
disconnected,
#[cfg(feature = "store_unexpected")]
unexpected_errors,
})
}
#[cfg(not(feature = "no_panics"))]
pub fn tick_start(&self) -> ServerTickResult {
self.try_tick_start().expect("Invalid server tick state.")
}
pub fn try_tick_end(&self) -> Result<(), ()> {
let internal = &self.internal;
let node_type = &internal.node_type;
{
let mut tick_state = node_type.tick_state.write().unwrap();
if *tick_state != ServerTickState::TickEndPending {
return Err(());
} else {
*tick_state = ServerTickState::TickStartPending;
}
}
let tick_packet_serialized = internal
.packet_registry
.try_serialize(&ServerTickEndPacket)
.unwrap();
for client in node_type.connected_clients.iter() {
self.send_packet_serialized(&client, tick_packet_serialized.clone());
client.packets_to_send_sender.try_send(None).unwrap();
}
Ok(())
}
#[cfg(not(feature = "no_panics"))]
pub fn tick_end(&self) {
self.try_tick_end().expect("Invalid server tick state.")
}
pub fn try_authenticate(
&self,
auth_entry: AuthEntry,
initial_message: SerializedPacketList,
) -> Result<(), BadAuthenticateUsageError> {
let internal = &self.internal;
let node_type = &internal.node_type;
let addr = auth_entry.addr;
let addr_to_auth = auth_entry.addr_to_auth;
if node_type.connected_clients.contains_key(&addr) {
Err(BadAuthenticateUsageError::AlreadyConnected)
} else if !node_type
.assigned_addrs_in_auth
.write()
.unwrap()
.remove(&addr)
{
Err(BadAuthenticateUsageError::NotMarkedToAuthenticate)
} else {
match &node_type.authenticator_mode {
AuthenticatorModeInternal::NoCryptography(auth_mode) => {
auth_mode.remove_from_auth(&addr)
}
#[cfg(feature = "auth_tcp")]
AuthenticatorModeInternal::RequireTcp(auth_mode) => {
auth_mode.remove_from_auth(&addr)
}
#[cfg(feature = "auth_tls")]
AuthenticatorModeInternal::RequireTls(auth_mode) => {
auth_mode.remove_from_auth(&addr)
}
}
.expect("Addr was not marked in the last tick to be possibly authenticated.");
let (receiving_bytes_sender, receiving_bytes_receiver) = async_channel::unbounded();
let (packets_to_send_sender, packets_to_send_receiver) = async_channel::unbounded();
let (message_part_confirmation_sender, message_part_confirmation_receiver) =
async_channel::unbounded();
let (shared_socket_bytes_send_sender, shared_socket_bytes_send_receiver) =
async_channel::unbounded();
let now = Instant::now();
let initial_next_message_part_id =
internal.messaging_properties.initial_next_message_part_id + 1;
let messaging = PartnerMessaging {
pending_confirmation: BTreeMap::new(),
incoming_messages: MessagePartMap::new(initial_next_message_part_id),
tick_bytes_len: 0,
last_received_message_instant: now,
received_messages: Vec::new(),
packet_loss_rtt_calculator: RttCalculator::new(
internal.messaging_properties.initial_latency,
),
average_packet_loss_rtt: internal.messaging_properties.initial_latency,
latency_monitor: DurationMonitor::try_filled_with(
internal.messaging_properties.initial_latency,
16,
)
.unwrap(),
};
let client = Arc::new(ConnectedClient {
disposable_handlers_keeper: Mutex::new(Vec::new()),
receiving_bytes_sender,
packets_to_send_sender,
message_part_confirmation_sender,
shared_socket_bytes_send_sender,
addr,
inner_auth: addr_to_auth.inner_auth,
messaging: Mutex::new(messaging),
last_messaging_write: RwLock::new(now),
average_latency: RwLock::new(internal.messaging_properties.initial_latency),
incoming_messages_total_size: RwLock::new(0),
});
NodeType::push_completed_message_tick(
&internal,
&client,
&mut client.messaging.try_lock().unwrap(),
&client.shared_socket_bytes_send_sender,
initial_next_message_part_id - 1,
initial_message,
);
let mut disposable_handlers_keeper =
try_lock(&client.disposable_handlers_keeper).unwrap();
let server_downgraded = Arc::downgrade(&internal);
let client_downgraded = Arc::downgrade(&client);
disposable_handlers_keeper.push(ActiveDisposableHandler {
task: internal
.task_runner
.spawn(init::client::create_receiving_bytes_handler(
server_downgraded,
addr,
client_downgraded,
receiving_bytes_receiver,
)),
});
let server_downgraded = Arc::downgrade(&internal);
let client_downgraded = Arc::downgrade(&client);
disposable_handlers_keeper.push(ActiveDisposableHandler {
task: internal
.task_runner
.spawn(init::client::create_packets_to_send_handler(
server_downgraded,
client_downgraded,
packets_to_send_receiver,
initial_next_message_part_id,
)),
});
let server_downgraded = Arc::downgrade(&internal);
disposable_handlers_keeper.push(ActiveDisposableHandler {
task: internal.task_runner.spawn(
init::client::create_message_part_confirmation_handler(
server_downgraded,
addr,
message_part_confirmation_receiver,
),
),
});
let server_downgraded = Arc::downgrade(&internal);
disposable_handlers_keeper.push(ActiveDisposableHandler {
task: internal.task_runner.spawn(
init::client::create_shared_socket_bytes_send_handler(
server_downgraded,
addr,
shared_socket_bytes_send_receiver,
),
),
});
drop(disposable_handlers_keeper);
node_type.connected_clients.insert(addr, client);
Ok(())
}
}
#[cfg(not(feature = "no_panics"))]
pub fn authenticate(&self, auth_entry: AuthEntry, initial_message: SerializedPacketList) {
self.try_authenticate(auth_entry, initial_message).unwrap()
}
pub fn try_refuse(
&self,
auth_entry: AuthEntry,
message: LimitedMessage,
) -> Result<(), BadAuthenticateUsageError> {
let internal = &self.internal;
let node_type = &internal.node_type;
let addr = auth_entry.addr;
let addr_to_auth = auth_entry.addr_to_auth;
if node_type.connected_clients.contains_key(&addr) {
Err(BadAuthenticateUsageError::AlreadyConnected)
} else if !node_type
.assigned_addrs_in_auth
.write()
.unwrap()
.remove(&addr)
{
Err(BadAuthenticateUsageError::NotMarkedToAuthenticate)
} else {
node_type.pending_rejection_confirm.insert(
addr,
(
addr_to_auth
.inner_auth
.rejection_of(Instant::now(), message),
None,
),
);
Ok(())
}
}
#[cfg(not(feature = "no_panics"))]
pub fn refuse(&self, auth_entry: AuthEntry, message: LimitedMessage) {
self.try_refuse(auth_entry, message).unwrap()
}
pub fn disconnect_from(&self, client: &ConnectedClient, message: Option<LimitedMessage>) {
let internal = &self.internal;
let node_type = &internal.node_type;
let context = {
if let Some(message) = message {
Some(client.inner_auth.rejection_of(Instant::now(), message))
} else {
None
}
};
node_type
.clients_to_disconnect_sender
.try_send((
client.addr,
(ClientDisconnectReason::ManualDisconnect, context),
))
.unwrap();
}
pub fn ignore_ip(&self, ip: IpAddr) {
let node_type = &self.internal.node_type;
node_type.ignore_ip(ip);
}
pub fn ignore_ip_temporary(&self, ip: IpAddr, until_to: Instant) {
let node_type = &self.internal.node_type;
node_type.ignore_ip_temporary(ip, until_to);
}
pub fn remove_ignore_ip(&self, ip: &IpAddr) {
let node_type = &self.internal.node_type;
node_type.remove_ignore_ip(ip);
}
pub fn get_connected_client(
&self,
addr: &SocketAddr,
) -> Option<DashRef<SocketAddr, Arc<ConnectedClient>>> {
let node_type = &self.internal.node_type;
node_type.connected_clients.get(addr)
}
pub fn connected_clients_size(&self) -> usize {
let node_type = &self.internal.node_type;
node_type.connected_clients.len()
}
pub fn connected_clients_iter(&self) -> DashIter<SocketAddr, Arc<ConnectedClient>> {
let node_type = &self.internal.node_type;
node_type.connected_clients.iter()
}
pub fn try_send_packet<P: Packet>(
&self,
client: &ConnectedClient,
packet: &P,
) -> Result<(), io::Error> {
let internal = &self.internal;
let serialized = internal.packet_registry.try_serialize(packet)?;
self.send_packet_serialized(client, serialized);
Ok(())
}
#[cfg(not(feature = "no_panics"))]
pub fn send_packet<P: Packet>(&self, client: &ConnectedClient, packet: &P) {
self.try_send_packet(client, packet)
.expect("Failed to send packet.");
}
pub fn send_packet_serialized(
&self,
client: &ConnectedClient,
packet_serialized: SerializedPacket,
) {
client
.packets_to_send_sender
.try_send(Some(packet_serialized))
.unwrap();
}
pub fn disconnect(
self,
disconnection: Option<GracefullyDisconnection>,
) -> TaskHandle<ServerDisconnectState> {
let tasks_keeper_exit = Arc::clone(&self.internal.task_runner);
tasks_keeper_exit.spawn(async move {
NodeInternal::set_state_inactive(&self.internal).await;
if let Some(disconnection) = disconnection {
let mut confirmations = HashMap::<SocketAddr, ServerDisconnectClientState>::new();
let mut confirmations_pending =
HashMap::<SocketAddr, (Duration, Instant, JustifiedRejectionContext)>::new();
let timeout_interpretation = disconnection.timeout;
{
let now = Instant::now();
for connected_client in self.connected_clients_iter() {
let addr = connected_client.key().clone();
let packet_loss_timeout = connected_client
.messaging
.lock()
.await
.average_packet_loss_rtt
.min(timeout_interpretation);
confirmations_pending.insert(
addr,
(
packet_loss_timeout,
now,
connected_client.inner_auth.rejection_of(
Instant::now(),
LimitedMessage::clone(&disconnection.message),
),
),
);
}
}
let socket = Arc::clone(&self.internal.socket);
let rejection_confirm_bytes = &vec![MessageChannel::REJECTION_CONFIRM];
while !confirmations_pending.is_empty() {
let now = Instant::now();
let mut min_try_read_time = Duration::MAX;
let mut addrs_confirmed =
HashMap::<SocketAddr, ServerDisconnectClientState>::new();
for (addr, (packet_loss_timeout, last_sent_time, rejection_context)) in
confirmations_pending.iter_mut()
{
if now - rejection_context.rejection_instant > timeout_interpretation {
addrs_confirmed
.insert(*addr, ServerDisconnectClientState::ConfirmationTimeout);
continue;
}
let last_sent_time_copy = *last_sent_time;
let packet_loss_timeout_copy = *packet_loss_timeout;
let time_diff = now - last_sent_time_copy;
if now == last_sent_time_copy || time_diff >= packet_loss_timeout_copy {
*last_sent_time = now;
if let Err(e) = socket
.send_to(&rejection_context.finished_bytes, addr)
.await
{
addrs_confirmed
.insert(*addr, ServerDisconnectClientState::SendIoError(e));
} else {
min_try_read_time = Duration::ZERO;
}
} else {
let remaining_to_resend = packet_loss_timeout_copy - time_diff;
if remaining_to_resend < min_try_read_time {
min_try_read_time = remaining_to_resend;
}
}
}
for (addr, state) in addrs_confirmed {
confirmations_pending.remove(&addr);
confirmations.insert(addr, state);
}
if confirmations_pending.is_empty() {
break;
}
if min_try_read_time != Duration::ZERO {
let pre_read_next_bytes_result =
ServerNode::pre_read_next_bytes_timeout(&socket, min_try_read_time)
.await;
match pre_read_next_bytes_result {
Ok((addr, result)) => {
if &result == rejection_confirm_bytes {
if let Some(_) = confirmations_pending.remove(&addr) {
confirmations
.insert(addr, ServerDisconnectClientState::Confirmed);
}
}
}
Err(e) if e.kind() == io::ErrorKind::TimedOut => {}
Err(e) => {
for (addr, _) in confirmations_pending {
confirmations.insert(
addr,
ServerDisconnectClientState::ReceiveIoError(
io::Error::new(
io::ErrorKind::Other,
format!(
"Error trying read data from udp socket: {}",
e
),
),
),
);
}
break;
}
}
}
}
ServerDisconnectState::Confirmations(confirmations)
} else {
ServerDisconnectState::WithoutReason
}
})
}
}
impl Drop for Server {
fn drop(&mut self) {
NodeInternal::on_holder_drop(&self.internal);
}
}