use std::sync::{Arc, Mutex};
use ack_handler::{AckBuffer, AckConsumer};
use connection::{ConnectionBuffer, ConnectionManager};
use tokio::{net::UdpSocket, spawn, sync::RwLock};
use crate::{
core::{
context::BluefinHost,
header::{BluefinHeader, BluefinSecurityFields, PacketType},
packet::BluefinPacket,
},
utils::{common::BluefinResult, get_connected_udp_socket},
worker::{conn_reader::ConnReaderHandler, reader::ReaderTxChannel},
};
pub mod ack_handler;
pub mod client;
pub mod connection;
pub mod ordered_bytes;
pub mod server;
pub(crate) const BLUEFIN_HEADER_SIZE_BYTES: usize = 20;
pub(crate) const MAX_BLUEFIN_PAYLOAD_SIZE_BYTES: usize = 1500;
pub(crate) const MAX_BLUEFIN_PACKETS_IN_UDP_DATAGRAM: usize = 10;
pub(crate) const MAX_BLUEFIN_BYTES_IN_UDP_DATAGRAM: usize = MAX_BLUEFIN_PACKETS_IN_UDP_DATAGRAM
* (BLUEFIN_HEADER_SIZE_BYTES + MAX_BLUEFIN_PAYLOAD_SIZE_BYTES);
#[derive(Clone)]
pub(crate) struct ConnectionManagedBuffers {
pub(crate) conn_buff: Arc<Mutex<ConnectionBuffer>>,
pub(crate) ack_buff: Arc<Mutex<AckBuffer>>,
}
#[inline]
fn build_and_start_tx(
num_tx_workers: u16,
socket: Arc<UdpSocket>,
conn_manager: Arc<RwLock<ConnectionManager>>,
pending_accept_ids: Arc<Mutex<Vec<u32>>>,
host_type: BluefinHost,
) {
let tx = ReaderTxChannel::new(socket, conn_manager, pending_accept_ids, host_type);
for id in 0..num_tx_workers {
let mut tx_clone = tx.clone();
tx_clone.id = id;
spawn(async move {
let _ = tx_clone.run().await;
});
}
}
#[inline]
fn build_and_start_conn_reader_tx_channels(
socket: Arc<UdpSocket>,
conn_bufs: Arc<ConnectionManagedBuffers>,
) -> BluefinResult<()> {
let handler = ConnReaderHandler::new(socket, conn_bufs);
handler.start()
}
#[inline]
fn build_and_start_ack_consumer_workers(
num_ack_consumer_workers: u8,
ack_buffer: Arc<Mutex<AckBuffer>>,
) {
let largest_recv_acked_packet_num = Arc::new(RwLock::new(0));
let ack_consumer = AckConsumer::new(Arc::clone(&ack_buffer), largest_recv_acked_packet_num);
for _ in 0..num_ack_consumer_workers {
let ack_consumer_clone = ack_consumer.clone();
spawn(async move {
ack_consumer_clone.run().await;
});
}
}
#[inline]
pub(crate) fn is_hello_packet(host_type: BluefinHost, packet: &BluefinPacket) -> bool {
let other_id = packet.header.source_connection_id;
let this_id = packet.header.destination_connection_id;
if host_type == BluefinHost::PackLeader
&& packet.header.type_field != PacketType::UnencryptedClientHello
{
return false;
}
if host_type == BluefinHost::Client
&& packet.header.type_field != PacketType::UnencryptedServerHello
{
return false;
}
if host_type == BluefinHost::Client && (other_id == 0x0 || this_id == 0x0) {
return false;
}
if host_type == BluefinHost::PackLeader && other_id == 0x0 {
return false;
}
if host_type == BluefinHost::PackLeader && this_id != 0x0 {
return false;
}
true
}
#[inline]
pub(crate) fn is_client_ack_packet(host_type: BluefinHost, packet: &BluefinPacket) -> bool {
let other_id = packet.header.source_connection_id;
let this_id = packet.header.destination_connection_id;
if host_type == BluefinHost::PackLeader
&& packet.header.type_field == PacketType::ClientAck
&& other_id != 0x0
&& this_id != 0x0
{
return true;
}
false
}
#[inline]
pub(crate) fn build_empty_encrypted_packet(
src_conn_id: u32,
dst_conn_id: u32,
packet_number: u64,
packet_type: PacketType,
) -> BluefinPacket {
let security_fields = BluefinSecurityFields::new(false, 0x0);
let mut header =
BluefinHeader::new(src_conn_id, dst_conn_id, packet_type, 0x0, security_fields);
header.with_packet_number(packet_number);
BluefinPacket::builder().header(header).build()
}