use std::{
collections::{HashMap, VecDeque},
convert::Infallible,
task::{Context, Poll},
};
use either::Either;
use futures::{StreamExt, channel::mpsc, ready};
use volans_core::{Multiaddr, PeerId};
use volans_swarm::{
BehaviorEvent, ConnectionDenied, ConnectionId, DialOpts, NetworkBehavior,
NetworkOutgoingBehavior, THandlerAction, THandlerEvent,
behavior::NotifyHandler,
error::{ConnectionError, DialError},
handler::DummyHandler,
};
use crate::relay::CircuitRequest;
use super::handler;
pub struct Behavior {
request_receiver: mpsc::UnboundedReceiver<CircuitRequest>,
dial_requests: HashMap<ConnectionId, CircuitRequest>,
pending_events: VecDeque<BehaviorEvent<Infallible, THandlerAction<Self>>>,
}
impl Behavior {
pub fn new(request_receiver: mpsc::UnboundedReceiver<CircuitRequest>) -> Self {
Self {
request_receiver,
dial_requests: HashMap::new(),
pending_events: VecDeque::new(),
}
}
}
impl NetworkBehavior for Behavior {
type ConnectionHandler = Either<DummyHandler, handler::Handler>;
type Event = Infallible;
fn on_connection_handler_event(
&mut self,
_id: ConnectionId,
_peer_id: PeerId,
event: THandlerEvent<Self>,
) {
unimplemented!("Unexpected event: {:?}", event);
}
fn poll(
&mut self,
_cx: &mut Context<'_>,
) -> Poll<BehaviorEvent<Self::Event, THandlerAction<Self>>> {
loop {
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(event);
}
return Poll::Pending;
}
}
}
impl NetworkOutgoingBehavior for Behavior {
fn handle_established_connection(
&mut self,
id: ConnectionId,
_peer_id: PeerId,
_addr: &Multiaddr,
) -> Result<Self::ConnectionHandler, ConnectionDenied> {
if self.dial_requests.contains_key(&id) {
tracing::debug!("处理拨号成功,返回对应的处理器: {:?}", id);
Ok(Either::Right(handler::Handler::new()))
} else {
Ok(Either::Left(DummyHandler))
}
}
fn on_connection_established(&mut self, id: ConnectionId, peer_id: PeerId, _addr: &Multiaddr) {
if let Some(request) = self.dial_requests.remove(&id) {
tracing::debug!("处理拨号成功,写入连接操作: {:?}", request);
self.pending_events.push_back(BehaviorEvent::HandlerAction {
peer_id,
handler: NotifyHandler::One(id),
action: Either::Right(request),
});
}
}
fn on_connection_closed(
&mut self,
id: ConnectionId,
_peer_id: PeerId,
_addr: &Multiaddr,
_reason: Option<&ConnectionError>,
) {
if let Some(request) = self.dial_requests.remove(&id) {
tracing::warn!(" 连接关闭: {:?}", request.dst_peer_id);
}
}
fn on_dial_failure(
&mut self,
id: ConnectionId,
_peer_id: Option<PeerId>,
_addr: Option<&Multiaddr>,
_error: &DialError,
) {
if let Some(request) = self.dial_requests.remove(&id) {
tracing::error!(" 处理拨号失败: {:?}", request.dst_peer_id);
}
}
fn poll_dial(&mut self, cx: &mut Context<'_>) -> Poll<DialOpts> {
let Some(request) = ready!(self.request_receiver.poll_next_unpin(cx)) else {
return Poll::Pending;
};
let dial_opts = DialOpts::new(None, Some(request.dst_peer_id));
tracing::debug!("Relay Dialing ....{:?}", dial_opts);
self.dial_requests
.insert(dial_opts.connection_id(), request);
Poll::Ready(dial_opts)
}
}