1use std::{io, sync::Arc, time::Duration};
2
3use nohash_hasher::IntMap;
4use tokio_util::codec::LengthDelimitedCodec;
5
6#[cfg(feature = "tls")]
7use crate::service::config::TlsConfig;
8use crate::{
9 protocol_select::SelectFn,
10 secio::KeyProvider,
11 service::{
12 config::{HandshakeType, Meta, ServiceConfig},
13 ProtocolHandle, ProtocolMeta, Service, TcpSocket,
14 },
15 traits::{Codec, ProtocolSpawn, ServiceHandle, ServiceProtocol, SessionProtocol},
16 yamux::Config,
17 ProtocolId,
18};
19
/// Builder that collects everything needed to construct a [`Service`].
///
/// Each setter consumes the builder and returns it, so calls can be chained;
/// `build` finally hands the collected values to `Service::new`.
pub struct ServiceBuilder<K> {
    /// Registered protocols, keyed by their [`ProtocolId`].
    inner: IntMap<ProtocolId, ProtocolMeta>,
    /// Handshake selection passed to the service; `HandshakeType::Noop` by default.
    handshake_type: HandshakeType<K>,
    /// Flag forwarded to `Service::new`; `false` by default.
    // NOTE(review): presumably keeps the service alive when it has no tasks — confirm against `Service`.
    forever: bool,
    /// Remaining service-wide settings (timeouts, frame length, session and socket config, ...).
    config: ServiceConfig,
}
27
28impl<K> Default for ServiceBuilder<K> {
29 fn default() -> Self {
30 Self {
31 handshake_type: HandshakeType::Noop,
32 inner: IntMap::default(),
33 forever: false,
34 config: ServiceConfig::default(),
35 }
36 }
37}
38
39impl<K> ServiceBuilder<K>
40where
41 K: KeyProvider,
42{
43 pub fn new() -> Self {
45 Default::default()
46 }
47
48 pub fn build<H>(self, handle: H) -> Service<H, K>
50 where
51 H: ServiceHandle + Unpin + 'static,
52 {
53 Service::new(
54 self.inner,
55 handle,
56 self.handshake_type,
57 self.forever,
58 self.config,
59 )
60 }
61
62 pub fn insert_protocol(mut self, protocol: ProtocolMeta) -> Self {
64 self.inner.insert(protocol.id(), protocol);
65 self
66 }
67
68 pub fn handshake_type(mut self, handshake_type: HandshakeType<K>) -> Self {
72 self.handshake_type = handshake_type;
73 self
74 }
75
76 pub fn forever(mut self, forever: bool) -> Self {
79 self.forever = forever;
80 self
81 }
82
83 pub fn timeout(mut self, timeout: Duration) -> Self {
87 self.config.timeout = timeout;
88 self
89 }
90
91 pub fn yamux_config(mut self, config: Config) -> Self {
95 assert!(self.config.max_frame_length as u32 >= config.max_stream_window_size);
96 self.config.session_config.yamux_config = config;
97 self
98 }
99
100 pub fn max_frame_length(mut self, size: usize) -> Self {
104 assert!(
105 size as u32
106 >= self
107 .config
108 .session_config
109 .yamux_config
110 .max_stream_window_size
111 );
112 self.config.max_frame_length = size;
113 self
114 }
115
116 pub fn set_channel_size(mut self, size: usize) -> Self {
118 self.config.session_config.channel_size = size;
119 self
120 }
121
122 pub fn set_send_buffer_size(mut self, size: usize) -> Self {
124 self.config.session_config.send_buffer_size = size;
125 self
126 }
127
128 pub fn set_recv_buffer_size(mut self, size: usize) -> Self {
130 self.config.session_config.recv_buffer_size = size;
131 self
132 }
133
134 pub fn keep_buffer(mut self, keep: bool) -> Self {
137 self.config.keep_buffer = keep;
138 self
139 }
140
141 #[cfg(all(not(target_family = "wasm"), feature = "upnp"))]
152 #[cfg_attr(docsrs, doc(cfg(feature = "upnp")))]
153 pub fn upnp(mut self, enable: bool) -> Self {
154 self.config.upnp = enable;
155 self
156 }
157
158 pub fn max_connection_number(mut self, number: usize) -> Self {
164 self.config.max_connection_number = number;
165 self
166 }
167
168 #[cfg(not(target_family = "wasm"))]
218 pub fn tcp_config<F>(mut self, f: F) -> Self
219 where
220 F: Fn(TcpSocket) -> Result<TcpSocket, std::io::Error> + Send + Sync + 'static,
221 {
222 self.config.tcp_config.tcp = Arc::new(f);
223 self
224 }
225
226 #[cfg(feature = "ws")]
228 #[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
229 pub fn tcp_config_on_ws<F>(mut self, f: F) -> Self
230 where
231 F: Fn(TcpSocket) -> Result<TcpSocket, std::io::Error> + Send + Sync + 'static,
232 {
233 self.config.tcp_config.ws = Arc::new(f);
234 self
235 }
236
237 pub fn clear(&mut self) {
239 self.inner.clear();
240 }
241
242 #[cfg(feature = "tls")]
244 #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
245 pub fn tls_config(mut self, config: TlsConfig) -> Self {
246 self.config.tls_config = Some(config);
247 self
248 }
249
250 #[cfg(feature = "tls")]
252 #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
253 pub fn tcp_config_on_tls<F>(mut self, f: F) -> Self
254 where
255 F: Fn(TcpSocket) -> Result<TcpSocket, std::io::Error> + Send + Sync + 'static,
256 {
257 self.config.tcp_config.tls = Arc::new(f);
258 self
259 }
260}
261
/// Boxed closure that produces a protocol name from its [`ProtocolId`].
pub(crate) type NameFn = Box<dyn Fn(ProtocolId) -> String + Send + Sync>;
/// Boxed factory that returns a boxed [`Codec`] each time it is called.
pub(crate) type CodecFn = Box<dyn Fn() -> Box<dyn Codec + Send + 'static> + Send + Sync>;
/// Boxed, stateful (`FnMut`) factory that returns a session-level protocol handle
/// each time it is called.
pub(crate) type SessionHandleFn =
    Box<dyn FnMut() -> ProtocolHandle<Box<dyn SessionProtocol + Send + 'static + Unpin>> + Send>;
/// Boxed factory that optionally returns a version selection function.
pub(crate) type SelectVersionFn = Box<dyn Fn() -> Option<SelectFn<String>> + Send + Sync + 'static>;
/// Boxed factory that optionally returns a [`BeforeReceive`] hook.
pub(crate) type BeforeReceiveFn = Box<dyn Fn() -> Option<BeforeReceive> + Send + Sync + 'static>;
/// Boxed hook that turns a received `BytesMut` buffer into `Bytes`, or fails with an I/O error.
pub(crate) type BeforeReceive =
    Box<dyn Fn(bytes::BytesMut) -> Result<bytes::Bytes, io::Error> + Send + 'static>;
270
/// Builder that collects the pieces of a [`ProtocolMeta`].
///
/// Every setter consumes the builder and returns it; `build` splits the
/// collected values between the shared `Meta` and the `ProtocolMeta` itself.
pub struct MetaBuilder {
    /// Protocol id; `ProtocolId::new(0)` by default.
    id: ProtocolId,
    /// Closure deriving the protocol name from the id.
    name: NameFn,
    /// Version strings the protocol supports.
    support_versions: Vec<String>,
    /// Factory for the codec used by the protocol.
    codec: CodecFn,
    /// Service-level protocol handle; `ProtocolHandle::None` by default.
    service_handle: ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static + Unpin>>,
    /// Factory for session-level protocol handles.
    session_handle: SessionHandleFn,
    /// Factory for an optional custom version selection function.
    select_version: SelectVersionFn,
    /// Optional hook applied to outgoing bytes; moved into `ProtocolMeta` by `build`.
    before_send: Option<Box<dyn Fn(bytes::Bytes) -> bytes::Bytes + Send + 'static>>,
    /// Factory for an optional hook applied to received bytes.
    before_receive: BeforeReceiveFn,
    /// Optional protocol spawner; when set, `build` asserts that no handles are configured.
    spawn: Option<Box<dyn ProtocolSpawn + Send + Sync + 'static>>,
}
284
285impl MetaBuilder {
286 pub fn new() -> Self {
288 Default::default()
289 }
290
291 pub fn id(mut self, id: ProtocolId) -> Self {
296 self.id = id;
297 self
298 }
299
300 pub fn name<T: Fn(ProtocolId) -> String + 'static + Send + Sync>(mut self, name: T) -> Self {
307 self.name = Box::new(name);
308 self
309 }
310
311 pub fn support_versions(mut self, versions: Vec<String>) -> Self {
319 self.support_versions = versions;
320 self
321 }
322
323 pub fn codec<T: Fn() -> Box<dyn Codec + Send + 'static> + 'static + Send + Sync>(
325 mut self,
326 codec: T,
327 ) -> Self {
328 self.codec = Box::new(codec);
329 self
330 }
331
332 pub fn service_handle<
334 T: FnOnce() -> ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static + Unpin>>,
335 >(
336 mut self,
337 service_handle: T,
338 ) -> Self {
339 self.service_handle = service_handle();
340 self
341 }
342
343 pub fn session_handle<
345 T: FnMut() -> ProtocolHandle<Box<dyn SessionProtocol + Send + 'static + Unpin>>
346 + Send
347 + 'static,
348 >(
349 mut self,
350 session_handle: T,
351 ) -> Self {
352 self.session_handle = Box::new(session_handle);
353 self
354 }
355
356 #[cfg(feature = "unstable")]
360 #[cfg_attr(docsrs, doc(cfg(feature = "unstable")))]
361 pub fn protocol_spawn<T: ProtocolSpawn + Send + Sync + 'static>(mut self, spawn: T) -> Self {
362 self.spawn = Some(Box::new(spawn));
363 self
364 }
365
366 pub fn select_version<T>(mut self, f: T) -> Self
368 where
369 T: Fn() -> Option<SelectFn<String>> + Send + Sync + 'static,
370 {
371 self.select_version = Box::new(f);
372 self
373 }
374
375 pub fn before_send<T>(mut self, f: T) -> Self
377 where
378 T: Fn(bytes::Bytes) -> bytes::Bytes + 'static + Send,
379 {
380 self.before_send = Some(Box::new(f));
381 self
382 }
383
384 pub fn before_receive<T>(mut self, f: T) -> Self
386 where
387 T: Fn() -> Option<BeforeReceive> + Send + Sync + 'static,
388 {
389 self.before_receive = Box::new(f);
390 self
391 }
392
393 pub fn build(mut self) -> ProtocolMeta {
395 if self.spawn.is_some() {
396 assert!(self.service_handle.is_none());
397 assert!((self.session_handle)().is_none());
398 }
399 let meta = Meta {
400 id: self.id,
401 name: self.name,
402 support_versions: self.support_versions,
403 codec: self.codec,
404 select_version: self.select_version,
405 before_receive: self.before_receive,
406 spawn: self.spawn,
407 };
408 ProtocolMeta {
409 inner: Arc::new(meta),
410 service_handle: self.service_handle,
411 session_handle: self.session_handle,
412 before_send: self.before_send,
413 }
414 }
415}
416
417impl Default for MetaBuilder {
418 fn default() -> Self {
419 MetaBuilder {
420 id: ProtocolId::new(0),
421 name: Box::new(|id| format!("/p2p/{}", id.value())),
422 support_versions: vec!["0.0.1".to_owned()],
423 codec: Box::new(|| Box::new(LengthDelimitedCodec::new())),
424 service_handle: ProtocolHandle::None,
425 session_handle: Box::new(|| ProtocolHandle::None),
426 select_version: Box::new(|| None),
427 before_send: None,
428 before_receive: Box::new(|| None),
429 spawn: None,
430 }
431 }
432}