1pub mod handler;
2
3pub use handler::Handler;
4
5use std::{
6 collections::{HashSet, VecDeque},
7 task::{Context, Poll},
8};
9
10use smallvec::SmallVec;
11use volans_core::{PeerId, Multiaddr};
12use volans_swarm::{
13 BehaviorEvent, ConnectionDenied, ConnectionId, ListenerEvent, NetworkBehavior,
14 NetworkIncomingBehavior, THandlerAction, THandlerEvent,
15 error::{ConnectionError, ListenError},
16};
17
18use crate::{Codec, Config, InboundFailure, RequestId, Responder};
19
20pub struct Behavior<TCodec>
21where
22 TCodec: Codec + Clone + Send + 'static,
23{
24 protocols: SmallVec<[TCodec::Protocol; 2]>,
25 codec: TCodec,
26 config: Config,
27 pending_event: VecDeque<Event<TCodec::Request, TCodec::Response>>,
28 pending_response: HashSet<RequestId>,
29}
30
31impl<TCodec> Behavior<TCodec>
32where
33 TCodec: Codec + Clone + Send + 'static,
34{
35 pub fn with_codec<P>(codec: TCodec, protocols: P, config: Config) -> Self
36 where
37 P: IntoIterator<Item = TCodec::Protocol>,
38 {
39 let protocols: SmallVec<[TCodec::Protocol; 2]> = protocols.into_iter().collect();
40
41 Self {
42 codec,
43 config,
44 protocols,
45 pending_event: VecDeque::new(),
46 pending_response: HashSet::new(),
47 }
48 }
49
50 fn remove_pending_response(&mut self, request_id: RequestId) -> bool {
51 self.pending_response.remove(&request_id)
52 }
53}
54
55impl<TCodec> NetworkBehavior for Behavior<TCodec>
56where
57 TCodec: Codec + Clone + Send + 'static,
58{
59 type Event = Event<TCodec::Request, TCodec::Response>;
60 type ConnectionHandler = handler::Handler<TCodec>;
61
62 fn on_connection_handler_event(
63 &mut self,
64 id: ConnectionId,
65 peer_id: PeerId,
66 event: THandlerEvent<Self>,
67 ) {
68 match event {
69 handler::Event::Request {
70 request_id,
71 request,
72 sender,
73 } => {
74 let responder = Responder { tx: sender };
75 self.pending_response.insert(request_id);
76 self.pending_event.push_back(Event::Request {
77 peer_id,
78 connection_id: id,
79 request_id,
80 request,
81 responder,
82 });
83 }
84 handler::Event::Discard(request_id) => {
85 let removed = self.remove_pending_response(request_id);
86 debug_assert!(removed, "Response for unknown request: {request_id}");
87 self.pending_event.push_back(Event::Failure {
88 peer_id,
89 connection_id: id,
90 request_id,
91 cause: InboundFailure::Discard,
92 });
93 }
94 handler::Event::Response(request_id) => {
95 let removed = self.remove_pending_response(request_id);
96 debug_assert!(removed, "Response for unknown request: {request_id}");
97 self.pending_event.push_back(Event::ResponseSent {
98 peer_id,
99 connection_id: id,
100 request_id,
101 });
102 }
103 handler::Event::Error { request_id, error } => {
104 let removed = self.remove_pending_response(request_id);
105 debug_assert!(removed, "Response for unknown request: {request_id}");
106 self.pending_event.push_back(Event::Failure {
107 peer_id,
108 connection_id: id,
109 request_id,
110 cause: error.into(),
111 });
112 }
113 handler::Event::Timeout(request_id) => {
114 let removed = self.remove_pending_response(request_id);
115 debug_assert!(removed, "Response for unknown request: {request_id}");
116 self.pending_event.push_back(Event::Failure {
117 peer_id,
118 connection_id: id,
119 request_id,
120 cause: InboundFailure::Timeout,
121 });
122 }
123 }
124 }
125
126 fn poll(
127 &mut self,
128 _cx: &mut Context<'_>,
129 ) -> Poll<BehaviorEvent<Self::Event, THandlerAction<Self>>> {
130 if let Some(event) = self.pending_event.pop_front() {
131 return Poll::Ready(BehaviorEvent::Behavior(event));
132 }
133 Poll::Pending
134 }
135}
136
137#[derive(Debug)]
138pub enum Event<TRequest, TResponse> {
139 Request {
140 peer_id: PeerId,
141 connection_id: ConnectionId,
142 request_id: RequestId,
143 request: TRequest,
144 responder: Responder<TResponse>,
145 },
146 Failure {
147 peer_id: PeerId,
148 connection_id: ConnectionId,
149 request_id: RequestId,
150 cause: InboundFailure,
151 },
152 ResponseSent {
153 peer_id: PeerId,
154 connection_id: ConnectionId,
155 request_id: RequestId,
156 },
157}
158
159impl<TCodec> NetworkIncomingBehavior for Behavior<TCodec>
160where
161 TCodec: Codec + Clone + Send + 'static,
162{
163 fn handle_established_connection(
165 &mut self,
166 _id: ConnectionId,
167 _peer_id: PeerId,
168 _local_addr: &Multiaddr,
169 _remote_addr: &Multiaddr,
170 ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
171 let handler = handler::Handler::new(
172 self.codec.clone(),
173 self.protocols.clone(),
174 self.config.request_timeout,
175 );
176 Ok(handler)
177 }
178
179 fn on_connection_established(
181 &mut self,
182 _id: ConnectionId,
183 _peer_id: PeerId,
184 _local_addr: &Multiaddr,
185 _remote_addr: &Multiaddr,
186 ) {
187 }
188
189 fn on_connection_closed(
190 &mut self,
191 _id: ConnectionId,
192 _peer_id: PeerId,
193 _local_addr: &Multiaddr,
194 _remote_addr: &Multiaddr,
195 _reason: Option<&ConnectionError>,
196 ) {
197 }
198
199 fn on_listen_failure(
201 &mut self,
202 _id: ConnectionId,
203 _peer_id: Option<PeerId>,
204 _local_addr: &Multiaddr,
205 _remote_addr: &Multiaddr,
206 _error: &ListenError,
207 ) {
208 }
209
210 fn on_listener_event(&mut self, _event: ListenerEvent<'_>) {}
212}