volans_request/
server.rs

1pub mod handler;
2
3pub use handler::Handler;
4
5use std::{
6    collections::{HashSet, VecDeque},
7    task::{Context, Poll},
8};
9
10use smallvec::SmallVec;
11use volans_core::{PeerId, Multiaddr};
12use volans_swarm::{
13    BehaviorEvent, ConnectionDenied, ConnectionId, ListenerEvent, NetworkBehavior,
14    NetworkIncomingBehavior, THandlerAction, THandlerEvent,
15    error::{ConnectionError, ListenError},
16};
17
18use crate::{Codec, Config, InboundFailure, RequestId, Responder};
19
20pub struct Behavior<TCodec>
21where
22    TCodec: Codec + Clone + Send + 'static,
23{
24    protocols: SmallVec<[TCodec::Protocol; 2]>,
25    codec: TCodec,
26    config: Config,
27    pending_event: VecDeque<Event<TCodec::Request, TCodec::Response>>,
28    pending_response: HashSet<RequestId>,
29}
30
31impl<TCodec> Behavior<TCodec>
32where
33    TCodec: Codec + Clone + Send + 'static,
34{
35    pub fn with_codec<P>(codec: TCodec, protocols: P, config: Config) -> Self
36    where
37        P: IntoIterator<Item = TCodec::Protocol>,
38    {
39        let protocols: SmallVec<[TCodec::Protocol; 2]> = protocols.into_iter().collect();
40
41        Self {
42            codec,
43            config,
44            protocols,
45            pending_event: VecDeque::new(),
46            pending_response: HashSet::new(),
47        }
48    }
49
50    fn remove_pending_response(&mut self, request_id: RequestId) -> bool {
51        self.pending_response.remove(&request_id)
52    }
53}
54
55impl<TCodec> NetworkBehavior for Behavior<TCodec>
56where
57    TCodec: Codec + Clone + Send + 'static,
58{
59    type Event = Event<TCodec::Request, TCodec::Response>;
60    type ConnectionHandler = handler::Handler<TCodec>;
61
62    fn on_connection_handler_event(
63        &mut self,
64        id: ConnectionId,
65        peer_id: PeerId,
66        event: THandlerEvent<Self>,
67    ) {
68        match event {
69            handler::Event::Request {
70                request_id,
71                request,
72                sender,
73            } => {
74                let responder = Responder { tx: sender };
75                self.pending_response.insert(request_id);
76                self.pending_event.push_back(Event::Request {
77                    peer_id,
78                    connection_id: id,
79                    request_id,
80                    request,
81                    responder,
82                });
83            }
84            handler::Event::Discard(request_id) => {
85                let removed = self.remove_pending_response(request_id);
86                debug_assert!(removed, "Response for unknown request: {request_id}");
87                self.pending_event.push_back(Event::Failure {
88                    peer_id,
89                    connection_id: id,
90                    request_id,
91                    cause: InboundFailure::Discard,
92                });
93            }
94            handler::Event::Response(request_id) => {
95                let removed = self.remove_pending_response(request_id);
96                debug_assert!(removed, "Response for unknown request: {request_id}");
97                self.pending_event.push_back(Event::ResponseSent {
98                    peer_id,
99                    connection_id: id,
100                    request_id,
101                });
102            }
103            handler::Event::Error { request_id, error } => {
104                let removed = self.remove_pending_response(request_id);
105                debug_assert!(removed, "Response for unknown request: {request_id}");
106                self.pending_event.push_back(Event::Failure {
107                    peer_id,
108                    connection_id: id,
109                    request_id,
110                    cause: error.into(),
111                });
112            }
113            handler::Event::Timeout(request_id) => {
114                let removed = self.remove_pending_response(request_id);
115                debug_assert!(removed, "Response for unknown request: {request_id}");
116                self.pending_event.push_back(Event::Failure {
117                    peer_id,
118                    connection_id: id,
119                    request_id,
120                    cause: InboundFailure::Timeout,
121                });
122            }
123        }
124    }
125
126    fn poll(
127        &mut self,
128        _cx: &mut Context<'_>,
129    ) -> Poll<BehaviorEvent<Self::Event, THandlerAction<Self>>> {
130        if let Some(event) = self.pending_event.pop_front() {
131            return Poll::Ready(BehaviorEvent::Behavior(event));
132        }
133        Poll::Pending
134    }
135}
136
137#[derive(Debug)]
138pub enum Event<TRequest, TResponse> {
139    Request {
140        peer_id: PeerId,
141        connection_id: ConnectionId,
142        request_id: RequestId,
143        request: TRequest,
144        responder: Responder<TResponse>,
145    },
146    Failure {
147        peer_id: PeerId,
148        connection_id: ConnectionId,
149        request_id: RequestId,
150        cause: InboundFailure,
151    },
152    ResponseSent {
153        peer_id: PeerId,
154        connection_id: ConnectionId,
155        request_id: RequestId,
156    },
157}
158
159impl<TCodec> NetworkIncomingBehavior for Behavior<TCodec>
160where
161    TCodec: Codec + Clone + Send + 'static,
162{
163    /// 处理已建立的连接
164    fn handle_established_connection(
165        &mut self,
166        _id: ConnectionId,
167        _peer_id: PeerId,
168        _local_addr: &Multiaddr,
169        _remote_addr: &Multiaddr,
170    ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
171        let handler = handler::Handler::new(
172            self.codec.clone(),
173            self.protocols.clone(),
174            self.config.request_timeout,
175        );
176        Ok(handler)
177    }
178
179    /// 连接处理器事件处理
180    fn on_connection_established(
181        &mut self,
182        _id: ConnectionId,
183        _peer_id: PeerId,
184        _local_addr: &Multiaddr,
185        _remote_addr: &Multiaddr,
186    ) {
187    }
188
189    fn on_connection_closed(
190        &mut self,
191        _id: ConnectionId,
192        _peer_id: PeerId,
193        _local_addr: &Multiaddr,
194        _remote_addr: &Multiaddr,
195        _reason: Option<&ConnectionError>,
196    ) {
197    }
198
199    /// 监听失败事件处理
200    fn on_listen_failure(
201        &mut self,
202        _id: ConnectionId,
203        _peer_id: Option<PeerId>,
204        _local_addr: &Multiaddr,
205        _remote_addr: &Multiaddr,
206        _error: &ListenError,
207    ) {
208    }
209
210    /// 监听器事件处理
211    fn on_listener_event(&mut self, _event: ListenerEvent<'_>) {}
212}