use std::sync::{Arc, Weak};
use crate::{
internal::{
messages::{MessageId, MessagePartId},
MessageChannel,
},
packets::{SerializedPacket, SerializedPacketList},
};
use super::*;
pub mod server {
use crate::internal::node::ReceivedBytesProcessResult;
use super::*;
pub async fn create_receiving_bytes_handler(
node: Weak<NodeInternal<ClientNode>>,
server: Weak<ConnectedServer>,
receiving_bytes_receiver: async_channel::Receiver<Vec<u8>>,
) {
'l1: while let Ok(bytes) = receiving_bytes_receiver.recv().await {
if let Some(node) = NodeInternal::try_upgrade(&node) {
if let Some(server) = server.upgrade() {
match NodeType::handle_received_bytes(&node, &server, bytes).await {
ReceivedBytesProcessResult::InvalidProtocolCommunication => {
#[cfg(feature = "store_unexpected")]
{
let _ = node
.store_unexpected_errors
.error_sender
.try_send(UnexpectedError::InvalidProtocolCommunication);
}
}
ReceivedBytesProcessResult::AuthMessage(_) => {
let _ = node
.node_type
.reason_to_disconnect_sender
.try_send(ServerDisconnectReason::InvalidProtocolCommunication);
break 'l1;
}
ReceivedBytesProcessResult::RejectionJustification(message) => {
let _ = node
.socket
.send(&vec![MessageChannel::REJECTION_CONFIRM])
.await;
let _ = node
.node_type
.reason_to_disconnect_sender
.try_send(ServerDisconnectReason::DisconnectRequest(message));
break 'l1;
}
ReceivedBytesProcessResult::MessagePartConfirm
| ReceivedBytesProcessResult::MessagePartSent => (),
}
} else {
break 'l1;
}
} else {
break 'l1;
}
}
}
pub async fn create_packets_to_send_handler(
node: Weak<NodeInternal<ClientNode>>,
server: Weak<ConnectedServer>,
packets_to_send_receiver: async_channel::Receiver<Option<SerializedPacket>>,
mut next_message_id: MessagePartId,
) {
let mut packets_to_send: Vec<SerializedPacket> = Vec::new();
'l1: while let Ok(serialized_packet) = packets_to_send_receiver.recv().await {
if let Some(serialized_packet) = serialized_packet {
packets_to_send.push(serialized_packet);
} else {
if let Some(node) = NodeInternal::try_upgrade(&node) {
if let Some(server) = server.upgrade() {
let mut messaging = server.messaging.lock().await;
let packets_to_send = std::mem::replace(&mut packets_to_send, Vec::new());
let serialized_packet_list =
SerializedPacketList::try_non_empty(packets_to_send).unwrap();
NodeType::push_completed_message_tick(
&node,
&server,
&mut messaging,
&server.shared_socket_bytes_send_sender,
next_message_id,
serialized_packet_list,
);
next_message_id = next_message_id.wrapping_add(1);
} else {
break 'l1;
}
} else {
break 'l1;
}
}
}
}
pub async fn create_message_part_confirmation_handler(
node: Weak<NodeInternal<ClientNode>>,
message_part_confirmation_receiver: async_channel::Receiver<(
MessageId,
Option<MessagePartId>,
)>,
) {
'l1: while let Ok((message_id, part_id)) = message_part_confirmation_receiver.recv().await {
if let Some(node) = NodeInternal::try_upgrade(&node) {
let message_id_bytes = message_id.to_be_bytes();
let bytes = {
if let Some(part_id) = part_id {
let part_id_bytes = part_id.to_be_bytes();
vec![
MessageChannel::MESSAGE_PART_CONFIRM,
message_id_bytes[0],
message_id_bytes[1],
part_id_bytes[0],
part_id_bytes[1],
]
} else {
vec![
MessageChannel::MESSAGE_PART_CONFIRM,
message_id_bytes[0],
message_id_bytes[1],
]
}
};
if let Err(e) = node.socket.send(&bytes).await {
let _ = node
.node_type
.reason_to_disconnect_sender
.try_send(ServerDisconnectReason::ByteSendError(e));
break 'l1;
}
} else {
break 'l1;
}
}
}
pub async fn create_shared_socket_bytes_send_handler(
node: Weak<NodeInternal<ClientNode>>,
shared_socket_bytes_send_receiver: async_channel::Receiver<Arc<Vec<u8>>>,
) {
'l1: while let Ok(bytes) = shared_socket_bytes_send_receiver.recv().await {
if let Some(node) = NodeInternal::try_upgrade(&node) {
if let Err(e) = node.socket.send(&bytes).await {
let _ = node
.node_type
.reason_to_disconnect_sender
.try_send(ServerDisconnectReason::ByteSendError(e));
break 'l1;
}
} else {
break 'l1;
}
}
}
}
pub mod client {
#[cfg(feature = "store_unexpected")]
use super::*;
#[cfg(feature = "store_unexpected")]
pub async fn create_store_unexpected_error_list_handler(
node: Weak<NodeInternal<ClientNode>>,
create_list_signal_receiver: async_channel::Receiver<()>,
) {
'l1: while let Ok(_) = create_list_signal_receiver.recv().await {
if let Some(node) = NodeInternal::try_upgrade(&node) {
let _ = node
.store_unexpected_errors
.error_list_sender
.try_send(NodeType::store_unexpected_error_list_pick(&node).await);
} else {
break 'l1;
}
}
}
}