mod connection;
mod handshake;
pub use connection::ConnectionCommand;
use std::net::SocketAddr;
use {
bytes::Bytes,
crossbeam_channel::Sender,
futures::StreamExt,
quinn::{ClientConfigBuilder, Connection, Endpoint, EndpointDriver, Incoming},
tokio::sync::{
mpsc::{Receiver as TokioReceiver, Sender as TokioSender},
oneshot::Sender as OneshotSender,
},
};
use crate::{worker::connection::handle_connecting, Certificate, PeerId};
#[derive(Debug)]
pub enum WorkerCommand {
Connect {
server_addr: SocketAddr,
server_name: String,
certificate: Certificate,
assigned_peer_id: PeerId,
},
Stop,
}
#[derive(Debug)]
pub enum WorkerEvent {
ConnectionFailed {
peer_id: PeerId,
},
ConnectionStarted {
connection: Connection,
connection_sender: TokioSender<ConnectionCommand>,
server_peer_id: Option<PeerId>,
peer_id_sender: OneshotSender<PeerId>,
},
ConnectionRequested {
peer_id: PeerId,
socket_addr: SocketAddr,
confirm_sender: OneshotSender<bool>,
},
Disconnected {
peer_id: PeerId,
reason: Option<String>,
},
ReceivedDatagram {
peer_id: PeerId,
bytes: Bytes,
},
ReceivedMessage {
peer_id: PeerId,
bytes: Bytes,
},
Stopped,
}
pub async fn driver_worker(driver: EndpointDriver, sender: Sender<WorkerEvent>) {
let result = driver.await;
sender.send(WorkerEvent::Stopped).unwrap();
if let Err(error) = result {
panic!("IO Error: {}", error);
}
}
pub async fn network_worker(
endpoint: Endpoint,
incoming: Incoming,
sender: Sender<WorkerEvent>,
mut receiver: TokioReceiver<WorkerCommand>,
protocol_checksum: u32,
is_listening: bool,
) {
if is_listening {
tokio::spawn(listen_incoming(incoming, sender.clone(), protocol_checksum));
}
while let Some(command) = receiver.recv().await {
match command {
WorkerCommand::Connect {
server_addr,
server_name,
certificate,
assigned_peer_id,
} => {
let mut config_builder = ClientConfigBuilder::default();
config_builder
.add_certificate_authority(certificate)
.unwrap();
let client_config = config_builder.build();
let connecting = endpoint
.connect_with(client_config, &server_addr, &server_name)
.unwrap();
tokio::spawn(handle_connecting(
connecting,
sender.clone(),
Some(assigned_peer_id),
protocol_checksum,
));
}
WorkerCommand::Stop => {
endpoint.close(0u32.into(), b"Endpoint was dropped");
return;
}
}
}
}
async fn listen_incoming(
mut incoming: Incoming,
sender: Sender<WorkerEvent>,
protocol_checksum: u32,
) {
while let Some(connecting) = incoming.next().await {
tokio::spawn(handle_connecting(
connecting,
sender.clone(),
None,
protocol_checksum,
));
}
}