use crate::{
communicate,
connection::{BootstrapGroupRef, Connection, FromPeer, QConn, ToPeer},
context::ctx_mut,
event::Event,
peer::Peer,
QuicP2pError,
};
use futures::future::{self, TryFutureExt};
use futures::stream::StreamExt;
use log::{debug, info, trace};
pub fn listen(incoming_connections: quinn::Incoming) {
let leaf = incoming_connections.for_each(move |connecting| {
let leaf = connecting
.map_err(|e| debug!("New connection errored out: {}", e))
.map_ok(handle_new_conn);
let _ = tokio::spawn(leaf);
future::ready(())
});
let _ = tokio::spawn(leaf);
}
enum Action {
HandleDuplicate(QConn),
HandleAlreadyBootstrapped,
Continue(Option<BootstrapGroupRef>),
}
fn handle_new_conn(
quinn::NewConnection {
connection,
uni_streams,
bi_streams,
..
}: quinn::NewConnection,
) {
let q_conn = QConn::from(connection);
let peer_addr = q_conn.remote_address();
trace!("Incoming connection from peer: {}", peer_addr);
let state = ctx_mut(|c| {
let event_tx = c.event_tx.clone();
let conn = c
.connections
.entry(peer_addr)
.or_insert_with(|| Connection::new(peer_addr, event_tx, None));
if conn.from_peer.is_no_connection() {
conn.from_peer = FromPeer::Established {
q_conn,
pending_reads: Default::default(),
};
if let ToPeer::Established { .. } = conn.to_peer {
if let Some(mut bootstrap_group_ref) = conn.bootstrap_group_ref.take() {
if bootstrap_group_ref.is_bootstrap_successful_yet() {
return Action::HandleAlreadyBootstrapped;
}
if let Err(e) =
bootstrap_group_ref.send(Event::BootstrappedTo { node: peer_addr })
{
info!("ERROR in informing user about a new peer: {:?} - {}", e, e);
}
return Action::Continue(Some(bootstrap_group_ref));
} else if let Err(e) = c.event_tx.send(Event::ConnectedTo {
peer: Peer::Node(peer_addr),
}) {
info!("ERROR in informing user about a new peer: {:?} - {}", e, e);
};
};
Action::Continue(None)
} else {
Action::HandleDuplicate(q_conn)
}
});
match state {
Action::HandleDuplicate(_q_conn) => {
debug!("Not allowing duplicate connection from peer: {}", peer_addr);
}
Action::HandleAlreadyBootstrapped => crate::utils::handle_communication_err(
peer_addr,
&QuicP2pError::ConnectionCancelled,
"Connection already bootstrapped",
None,
),
Action::Continue(bootstrap_group) => {
if let Some(bootstrap_group_ref) = bootstrap_group {
bootstrap_group_ref.terminate_group(true);
}
communicate::read_from_peer(peer_addr, uni_streams, bi_streams);
}
}
}