volans-bridge 0.2.0

Bridge protocol for volans
Documentation
use std::{
    collections::{HashMap, VecDeque},
    convert::Infallible,
    task::{Context, Poll},
};

use either::Either;
use futures::{StreamExt, channel::mpsc, ready};
use volans_core::{Multiaddr, PeerId};
use volans_swarm::{
    BehaviorEvent, ConnectionDenied, ConnectionId, DialOpts, NetworkBehavior,
    NetworkOutgoingBehavior, THandlerAction, THandlerEvent,
    behavior::NotifyHandler,
    error::{ConnectionError, DialError},
    handler::DummyHandler,
};

use crate::relay::CircuitRequest;

use super::handler;

/// Behavior used by the relay server to connect to a backend.
///
/// Circuit requests arrive over a channel, are turned into outgoing dials,
/// and are handed to the connection handler once the dial succeeds.
pub struct Behavior {
    // Source of incoming circuit requests; drained in `poll_dial`.
    request_receiver: mpsc::UnboundedReceiver<CircuitRequest>,
    // Requests whose dial is in flight, keyed by the connection id of that dial.
    dial_requests: HashMap<ConnectionId, CircuitRequest>,
    // Events queued for the swarm; returned one at a time from `poll`.
    pending_events: VecDeque<BehaviorEvent<Infallible, THandlerAction<Self>>>,
}

impl Behavior {
    /// Creates a behavior that reads circuit requests from `request_receiver`.
    ///
    /// The behavior starts with no dials in flight and no queued events.
    pub fn new(request_receiver: mpsc::UnboundedReceiver<CircuitRequest>) -> Self {
        let dial_requests = HashMap::new();
        let pending_events = VecDeque::new();
        Self {
            request_receiver,
            dial_requests,
            pending_events,
        }
    }
}

impl NetworkBehavior for Behavior {
    /// Either a `DummyHandler` or a `handler::Handler`, chosen per connection
    /// when the connection is established.
    type ConnectionHandler = Either<DummyHandler, handler::Handler>;
    /// This behavior emits no events of its own (`Infallible` has no values).
    type Event = Infallible;

    /// Called when a connection handler reports an event.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!`: this behavior does not handle any
    /// handler event.
    fn on_connection_handler_event(
        &mut self,
        _id: ConnectionId,
        _peer_id: PeerId,
        event: THandlerEvent<Self>,
    ) {
        unimplemented!("Unexpected event: {:?}", event);
    }

    /// Returns the next queued event, or `Poll::Pending` when the queue is empty.
    ///
    /// `_cx` is not used, so no wake-up is registered here.
    /// NOTE(review): presumably the swarm polls again after invoking the
    /// callbacks that enqueue events — confirm against the swarm driver.
    fn poll(
        &mut self,
        _cx: &mut Context<'_>,
    ) -> Poll<BehaviorEvent<Self::Event, THandlerAction<Self>>> {
        // At most one event is returned per call, so no loop is needed.
        match self.pending_events.pop_front() {
            Some(event) => Poll::Ready(event),
            None => Poll::Pending,
        }
    }
}

impl NetworkOutgoingBehavior for Behavior {
    /// Picks the connection handler for an established outgoing connection.
    ///
    /// A connection whose id is still tracked in `dial_requests` gets a
    /// `handler::Handler`; any other connection gets a `DummyHandler`.
    /// This implementation always returns `Ok`.
    fn handle_established_connection(
        &mut self,
        id: ConnectionId,
        _peer_id: PeerId,
        _addr: &Multiaddr,
    ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
        if !self.dial_requests.contains_key(&id) {
            // Not one of the dials tracked here: return the dummy handler.
            return Ok(Either::Left(DummyHandler));
        }
        // A request is waiting on this dial: return the real handler.
        tracing::debug!("处理拨号成功,返回对应的处理器: {:?}", id);
        Ok(Either::Right(handler::Handler::new()))
    }

    /// Hands the pending request to the handler of the new connection.
    ///
    /// If `id` belongs to a tracked dial, the request is removed from
    /// `dial_requests` and queued as a handler action addressed to that one
    /// connection. Connections that are not tracked are ignored.
    fn on_connection_established(&mut self, id: ConnectionId, peer_id: PeerId, _addr: &Multiaddr) {
        // Only connections that were queued by `poll_dial` carry a request.
        let Some(request) = self.dial_requests.remove(&id) else {
            return;
        };
        // The dial succeeded: enqueue the request for the connection handler.
        tracing::debug!("处理拨号成功,写入连接操作: {:?}", request);
        let action = BehaviorEvent::HandlerAction {
            peer_id,
            handler: NotifyHandler::One(id),
            action: Either::Right(request),
        };
        self.pending_events.push_back(action);
    }

    /// Drops any request still tracked for a connection that has closed,
    /// logging a warning with the request's destination peer.
    fn on_connection_closed(
        &mut self,
        id: ConnectionId,
        _peer_id: PeerId,
        _addr: &Multiaddr,
        _reason: Option<&ConnectionError>,
    ) {
        let Some(request) = self.dial_requests.remove(&id) else {
            return;
        };
        // The connection went away while the request was still tracked.
        tracing::warn!(" 连接关闭: {:?}", request.dst_peer_id);
    }

    /// Drops the request associated with a failed dial, logging an error with
    /// the request's destination peer.
    fn on_dial_failure(
        &mut self,
        id: ConnectionId,
        _peer_id: Option<PeerId>,
        _addr: Option<&Multiaddr>,
        _error: &DialError,
    ) {
        let Some(request) = self.dial_requests.remove(&id) else {
            return;
        };
        // The dial failed; the request is discarded.
        tracing::error!(" 处理拨号失败: {:?}", request.dst_peer_id);
    }

    /// Turns the next circuit request from the channel into a dial.
    ///
    /// Returns `Poll::Pending` when no request is available (via `ready!`)
    /// and also when the channel has yielded `None`. Otherwise the request
    /// is stored under the dial's connection id and the dial options are
    /// returned.
    fn poll_dial(&mut self, cx: &mut Context<'_>) -> Poll<DialOpts> {
        match ready!(self.request_receiver.poll_next_unpin(cx)) {
            Some(request) => {
                let opts = DialOpts::new(None, Some(request.dst_peer_id));
                tracing::debug!("Relay Dialing ....{:?}", opts);
                // Associate the dial's connection id with the request.
                self.dial_requests.insert(opts.connection_id(), request);
                Poll::Ready(opts)
            }
            None => Poll::Pending,
        }
    }
}