use crate::EndpointBuilder;
use crate::{connection::ConnectionIncoming, endpoint_builder::SERVER_NAME};
use super::{connection::Connection, error::ConnectionError};
use std::net::SocketAddr;
use tokio::sync::mpsc::{self, error::TryRecvError, Receiver};
use tracing::{error, trace, warn};
#[derive(Debug)]
pub struct IncomingConnections(pub(crate) Receiver<(Connection, ConnectionIncoming)>);
impl IncomingConnections {
pub async fn next(&mut self) -> Option<(Connection, ConnectionIncoming)> {
self.0.recv().await
}
pub fn try_recv(&mut self) -> Result<(Connection, ConnectionIncoming), TryRecvError> {
self.0.try_recv()
}
}
#[derive(Clone)]
pub struct Endpoint {
pub(crate) inner: quinn::Endpoint,
pub(crate) local_addr: SocketAddr,
}
impl std::fmt::Debug for Endpoint {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Endpoint")
.field("local_addr", &self.local_addr)
.field("quinn_endpoint", &"<endpoint omitted>")
.finish()
}
}
impl Endpoint {
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}
pub async fn connect_to(
&self,
node_addr: &SocketAddr,
) -> Result<(Connection, ConnectionIncoming), ConnectionError> {
self.new_connection(node_addr).await
}
pub async fn connect_to_any(
&self,
peer_addrs: &[SocketAddr],
) -> Option<(Connection, ConnectionIncoming)> {
trace!("Connecting to any of {:?}", peer_addrs);
if peer_addrs.is_empty() {
return None;
}
let tasks = peer_addrs
.iter()
.map(|addr| Box::pin(self.new_connection(addr)));
match futures::future::select_ok(tasks).await {
Ok((connection, _)) => Some(connection),
Err(error) => {
error!("Failed to bootstrap to the network, last error: {}", error);
None
}
}
}
pub fn close(&self) {
trace!("Closing endpoint");
self.inner.close(0_u32.into(), b"Endpoint closed")
}
async fn new_connection(
&self,
node_addr: &SocketAddr,
) -> Result<(Connection, ConnectionIncoming), ConnectionError> {
trace!("Attempting to connect to {:?}", node_addr);
let connecting = match self.inner.connect(*node_addr, SERVER_NAME) {
Ok(conn) => Ok(conn),
Err(error) => {
warn!(
"Connection attempt to {node_addr:?} failed due to {:?}",
error
);
Err(ConnectionError::from(error))
}
}?;
let new_conn = match connecting.await {
Ok(new_conn) => {
let connection = Connection::new(new_conn);
trace!(
"Successfully connected to peer {node_addr}, conn_id={}",
connection.0.id()
);
Ok(connection)
}
Err(error) => Err(ConnectionError::from(error)),
}?;
Ok(new_conn)
}
pub fn builder() -> EndpointBuilder {
EndpointBuilder::default()
}
}
pub(super) fn listen_for_incoming_connections(
quinn_endpoint: quinn::Endpoint,
connection_tx: mpsc::Sender<(Connection, ConnectionIncoming)>,
) {
let _handle = tokio::spawn(async move {
while let Some(quinn_conn) = quinn_endpoint.accept().await {
let conn_sender = connection_tx.clone();
let _handle = tokio::spawn(async move {
match quinn_conn.await {
Ok(connection) => {
let connection = Connection::new(connection);
let conn_id = connection.0.id();
trace!("Incoming new connection conn_id={conn_id}");
if conn_sender.send(connection).await.is_err() {
warn!("Dropping incoming connection conn_id={conn_id}, because receiver was dropped");
}
}
Err(err) => {
warn!("An incoming connection failed because of: {:?}", err);
}
}
});
}
trace!(
"quinn::Endpoint::accept() returned None. There will be no more incoming connections"
);
});
}