use std::{
io::{Error, ErrorKind, Result},
num::NonZeroUsize,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant, SystemTime},
};
use async_trait::async_trait;
use futures::{
channel::mpsc::{channel, Receiver, Sender},
SinkExt, StreamExt, TryStreamExt,
};
use futures_map::FuturesUnorderedMap;
use rasi::{task::spawn_ok, timer::sleep};
use xstack::{
events,
multiaddr::{Multiaddr, Protocol},
transport_syscall::DriverListener,
AutoNAT, EventSource, P2pConn, ProtocolListener, ProtocolStream, Switch, XStackRpc,
PROTOCOL_IPFS_PING,
};
use xstack_tls::{create_ssl_acceptor, TlsConn};
use crate::{
CircuitV2Rpc, PROTOCOL_CIRCUIT_RELAY_HOP, PROTOCOL_CIRCUIT_RELAY_STOP, PROTOCOL_DCUTR,
};
/// Transport listener that yields connections relayed through a circuit.
///
/// Wraps the receiving half of the channel created in
/// `CircuitStopServerBuilder::run_stop_accept`; the stop-protocol handler
/// pushes every upgraded [`P2pConn`] into the sending half.
struct CircuitStopListener(Receiver<P2pConn>);

#[async_trait]
impl DriverListener for CircuitStopListener {
    /// Waits for the next relayed connection.
    ///
    /// # Errors
    ///
    /// Returns [`ErrorKind::BrokenPipe`] when the underlying channel stream
    /// ends, i.e. `next()` yields `None`.
    async fn accept(&mut self) -> Result<P2pConn> {
        // Build the error lazily: `Error::new` boxes its payload, which would
        // otherwise be allocated and thrown away on every successful accept.
        self.0
            .next()
            .await
            .ok_or_else(|| Error::new(ErrorKind::BrokenPipe, "CircuitStopListener broken."))
    }

    /// Returns a multiaddr made up solely of [`Protocol::P2pCircuit`].
    fn local_addr(&self) -> Result<Multiaddr> {
        Ok(Multiaddr::empty().with(Protocol::P2pCircuit))
    }
}
/// Configuration and shared state for the circuit-relay stop server and its
/// hop reservation client.
///
/// The builder is cloned into every background task spawned by `start`, so
/// the `Arc` counters below are shared between all of those clones.
#[derive(Clone)]
pub struct CircuitStopServerBuilder {
/// Number of relay reservations currently held by the reservation client;
/// the same counter is handed to `CircuitStopServer` by `start`.
reservations: Arc<AtomicUsize>,
/// Counter passed to `TlsConn::accept` for every accepted relayed stream.
/// NOTE(review): presumably tracks live connections — confirm in `xstack_tls`.
activities: Arc<AtomicUsize>,
/// Capacity given to the channel that queues accepted relayed connections.
incoming_buffer: usize,
/// How many reservation client loops are kept running concurrently.
channel_limits: usize,
/// Pause between two keep-alive pings on a held reservation.
ping_duration: Duration,
/// Switch used for binding, dialing and NAT status queries.
switch: Switch,
}
impl CircuitStopServerBuilder {
pub fn incoming_buffer(mut self, value: usize) -> Self {
self.incoming_buffer = value;
self
}
pub fn channel_limits(mut self, value: NonZeroUsize) -> Self {
self.channel_limits = value.into();
self
}
pub fn start(self) -> CircuitStopServer {
spawn_ok(self.clone().run_stop_accept());
spawn_ok(self.clone().run_hop_client());
CircuitStopServer {
reservations: self.reservations,
}
}
/// Drives the hop reservation client for as long as the switch is alive.
///
/// Repeats forever: wait until the switch reports `AutoNAT::NAT`
/// (re-checking after every network event), then run the reservation
/// client until it finishes. Returns once the network event source ends.
async fn run_hop_client(self) {
    // NOTE(review): 100 is presumably the event buffer length — confirm
    // against `EventSource::bind_with`.
    let bound = NonZeroUsize::new(100).unwrap();
    let mut network_events =
        EventSource::<events::Network>::bind_with(&self.switch, bound).await;

    loop {
        // Phase 1: block until the node is known to be behind NAT.
        loop {
            let status = self.switch.nat().await;
            log::trace!("hop client check network: {:?}", status);

            if status == AutoNAT::NAT {
                break;
            }

            match network_events.next().await {
                Some(_) => continue,
                None => {
                    log::trace!("switch closed.");
                    return;
                }
            }
        }

        // Phase 2: hold reservations until the client gives up.
        log::trace!("start circuit reservation client.");
        self.run_reservation_client().await;
    }
}
/// Registers the circuit transport listener and serves the stop protocol.
///
/// Creates the channel connecting the stop-protocol handler to
/// `CircuitStopListener`, binds that listener as a transport on the switch,
/// binds the `PROTOCOL_CIRCUIT_RELAY_STOP` protocol listener, and then runs
/// the incoming loop. Either bind failure is logged and ends the task.
async fn run_stop_accept(self) {
    let (sender, receiver) = channel(self.incoming_buffer);

    match self
        .switch
        .transport_bind_with(CircuitStopListener(receiver).into())
        .await
    {
        Ok(_) => {}
        Err(err) => {
            log::error!("bind 'CircuitStopListener' with error, {}", err);
            return;
        }
    }

    let bind_result = self.switch.bind([PROTOCOL_CIRCUIT_RELAY_STOP]).await;
    let listener = match bind_result {
        Err(err) => {
            log::error!(
                "Start protocol listener '{}' with error: {}",
                PROTOCOL_CIRCUIT_RELAY_STOP,
                err
            );
            return;
        }
        Ok(listener) => listener,
    };

    let activities = self.activities.clone();
    let max_packet_size = self.switch.max_packet_size;

    self.protocol_incoming_loop(sender, listener, activities, max_packet_size)
        .await;
}
/// Runs up to `channel_limits` reservation loops concurrently.
///
/// Whenever one loop finishes and the switch still reports `AutoNAT::NAT`,
/// the pool is topped back up. Once NAT is no longer reported, the
/// remaining loops are only drained; the function returns when none is left.
async fn run_reservation_client(&self) {
    let mut clients = FuturesUnorderedMap::new();
    // Monotonic key so every inserted future gets a distinct id.
    let mut next_id = 0;

    for _ in 0..self.channel_limits {
        clients.insert(next_id, self.clone().reservation_client_loop());
        next_id += 1;
    }

    while clients.next().await.is_some() {
        if self.switch.nat().await != AutoNAT::NAT {
            // No longer behind NAT: let the remaining loops run out.
            continue;
        }

        while clients.len() < self.channel_limits {
            clients.insert(next_id, self.clone().reservation_client_loop());
            next_id += 1;
        }
    }
}
async fn reservation_client_loop(self) {
if let Err(err) = self.reservation_client_loop_prv().await {
log::error!("reservation_client_loop, stopped with error: {}", err);
}
}
async fn reservation_client_loop_prv(self) -> Result<()> {
let peers = self
.switch
.choose_peers(PROTOCOL_CIRCUIT_RELAY_HOP, 1)
.await?;
if peers.is_empty() {
log::trace!("hop client, sleep...");
sleep(Duration::from_secs(10)).await;
return Ok(());
}
let peer_id = peers[0];
let (mut stream, _) = self
.switch
.connect(&peer_id, [PROTOCOL_CIRCUIT_RELAY_HOP])
.await?;
let reservation =
CircuitV2Rpc::circuit_v2_hop_reserve(&mut stream, self.switch.max_packet_size).await?;
let peer_addr = stream.peer_addr().clone();
log::trace!(
"reserve from {}, expire={:?}, {}",
peer_addr,
reservation.expire,
reservation
.limit
.map(|limit| limit.to_string())
.unwrap_or("".to_owned())
);
self.reservations.fetch_add(1, Ordering::Relaxed);
let mut stream = match self.switch.connect(&peer_id, [PROTOCOL_IPFS_PING]).await {
Ok((stream, _)) => stream,
Err(err) => {
log::trace!("reservation from {}, ping timeout/open failed", peer_addr);
self.reservations.fetch_sub(1, Ordering::Relaxed);
return Err(err.into());
}
};
let nat_addrs = reservation
.addrs
.iter()
.map(|addr| addr.clone().with(Protocol::P2pCircuit))
.collect::<Vec<_>>();
self.switch.set_nat_addrs(nat_addrs.clone()).await;
while SystemTime::now() < reservation.expire {
let now = Instant::now();
if let Err(err) = XStackRpc::xstack_ping(&mut stream).await {
self.reservations.fetch_sub(1, Ordering::Relaxed);
self.switch.remove_nat_addrs(&nat_addrs).await;
log::trace!("reservation from {}, ping timeout/open failed", peer_addr);
return Err(err.into());
}
log::trace!(
"reservation from {}, ping succ, time={:?}",
peer_addr,
now.elapsed()
);
if self.switch.nat().await != AutoNAT::NAT {
self.reservations.fetch_sub(1, Ordering::Relaxed);
return Ok(());
}
sleep(self.ping_duration).await;
}
self.switch.remove_nat_addrs(&nat_addrs).await;
log::trace!("reservation from {}, timeout", peer_addr);
Ok(())
}
async fn protocol_incoming_loop(
&self,
sender: Sender<P2pConn>,
listener: ProtocolListener,
activities: Arc<AtomicUsize>,
max_packet_size: usize,
) {
if let Err(err) =
Self::circuit_stop_server_loop_prv(self, sender, listener, activities, max_packet_size)
.await
{
log::error!("circuit_stop_server_loop, stopped with error {}", err)
} else {
log::info!("circuit_stop_server_loop, stopped.");
}
}
/// Accepts incoming `PROTOCOL_CIRCUIT_RELAY_STOP` streams and handles each
/// one in its own detached task.
///
/// Returns `Ok(())` when the incoming stream ends.
///
/// # Errors
///
/// Propagates the first error yielded by the incoming stream. Failures in
/// individual handlers are only logged.
async fn circuit_stop_server_loop_prv(
    &self,
    sender: Sender<P2pConn>,
    listener: ProtocolListener,
    activities: Arc<AtomicUsize>,
    max_packet_size: usize,
) -> Result<()> {
    let mut incoming = listener.into_incoming();

    loop {
        let (stream, _) = match incoming.try_next().await? {
            Some(accepted) => accepted,
            None => return Ok(()),
        };

        log::trace!(
            "[{}] new incoming stream from {}",
            PROTOCOL_CIRCUIT_RELAY_STOP,
            stream.peer_addr()
        );

        // Each handler task owns its own handles to the shared state.
        let handler = self.clone();
        let conn_sender = sender.clone();
        let conn_activities = activities.clone();

        spawn_ok(async move {
            // Captured up front because `stream` is moved into the handler.
            let peer_id = stream.public_key().to_peer_id();

            let outcome = handler
                .handle_stop_incoming(stream, conn_sender, conn_activities, max_packet_size)
                .await;

            if let Err(err) = outcome {
                log::error!(
                    "handle incoming({}) stop stream with error: {}",
                    peer_id,
                    err
                );
            }
        });
    }
}
/// Turns one incoming stop-protocol stream into a relayed connection.
///
/// Completes the stop handshake (`circuit_v2_stop_connect_accept`), runs a
/// TLS accept over the stream, and queues the resulting [`P2pConn`] on
/// `sender` — the channel whose receiving half is wrapped by
/// `CircuitStopListener`.
///
/// # Errors
///
/// Propagates handshake, acceptor-creation and TLS errors. Returns
/// [`ErrorKind::BrokenPipe`] when the connection cannot be queued.
async fn handle_stop_incoming(
    self,
    mut stream: ProtocolStream,
    mut sender: Sender<P2pConn>,
    activities: Arc<AtomicUsize>,
    max_packet_size: usize,
) -> Result<()> {
    CircuitV2Rpc::circuit_v2_stop_connect_accept(&mut stream, max_packet_size).await?;

    // Cloned before `stream` is moved into the TLS accept.
    let local_addr = stream.local_addr().clone();
    let peer_addr = stream.peer_addr().clone();

    let ssl_acceptor = create_ssl_acceptor(&self.switch).await?;

    let conn: P2pConn =
        TlsConn::accept(stream, local_addr, peer_addr, &ssl_acceptor, activities)
            .await?
            .into();

    // The error used to carry an empty message, which left the caller's log
    // line without any explanation.
    sender.send(conn).await.map_err(|err| {
        Error::new(
            ErrorKind::BrokenPipe,
            format!("CircuitStopListener incoming channel closed: {}", err),
        )
    })
}
/// Attempts to upgrade a relayed connection and logs the error, if any,
/// together with the peer address.
///
/// Not called from anywhere in this file yet, hence the lint allowance.
#[allow(unused)]
async fn try_upgrade(self, mut conn: P2pConn) {
    match self.try_upgrade_prv(&mut conn).await {
        Ok(()) => {}
        Err(err) => {
            log::error!(
                "try upgrade circuit conn failed, from={}, err={}",
                conn.peer_addr(),
                err
            );
        }
    }
}
/// Opens a `PROTOCOL_DCUTR` stream on `conn` as the first step of an upgrade.
///
/// The upgrade itself is not implemented yet: after a successful connect the
/// function hits `todo!()`.
///
/// # Errors
///
/// Returns the connect error instead of panicking, so that `try_upgrade`
/// can log it.
///
/// # Panics
///
/// Panics via `todo!()` once the stream has been opened.
async fn try_upgrade_prv(self, conn: &mut P2pConn) -> Result<()> {
    // A failed dial is an ordinary runtime condition, not a bug: propagate it.
    let (_stream, _) = conn.connect([PROTOCOL_DCUTR]).await?;

    todo!()
}
}
/// Handle returned by `CircuitStopServerBuilder::start`.
///
/// It only exposes the reservation counter; the background tasks spawned by
/// `start` hold their own clones of the builder state.
pub struct CircuitStopServer {
/// Number of relay reservations currently held, shared with the
/// reservation client tasks.
reservations: Arc<AtomicUsize>,
}
impl CircuitStopServer {
#[cfg(feature = "global_register")]
#[cfg_attr(docsrs, doc(cfg(feature = "global_register")))]
pub fn new() -> CircuitStopServerBuilder {
use xstack::global_switch;
Self::bind_with(global_switch())
}
pub fn bind_with(switch: &Switch) -> CircuitStopServerBuilder {
CircuitStopServerBuilder {
reservations: Default::default(),
switch: switch.clone(),
incoming_buffer: 100,
activities: Default::default(),
channel_limits: 2,
ping_duration: Duration::from_secs(5),
}
}
pub fn reservations(&self) -> usize {
self.reservations.load(Ordering::Relaxed)
}
}