tentacle/
builder.rs

1use std::{io, sync::Arc, time::Duration};
2
3use nohash_hasher::IntMap;
4use tokio_util::codec::LengthDelimitedCodec;
5
6#[cfg(feature = "tls")]
7use crate::service::config::TlsConfig;
8use crate::{
9    protocol_select::SelectFn,
10    secio::KeyProvider,
11    service::{
12        config::{HandshakeType, Meta, ServiceConfig},
13        ProtocolHandle, ProtocolMeta, Service, TcpSocket,
14    },
15    traits::{Codec, ProtocolSpawn, ServiceHandle, ServiceProtocol, SessionProtocol},
16    yamux::Config,
17    ProtocolId,
18};
19
/// Builder for Service
///
/// Collects the protocol table, the handshake type, the `forever` flag and the
/// service configuration, all of which are handed to `Service::new` by `build`.
pub struct ServiceBuilder<K> {
    /// Registered protocols, keyed by protocol id (filled by `insert_protocol`).
    inner: IntMap<ProtocolId, ProtocolMeta>,
    /// Handshake layer selection; `HandshakeType::Noop` unless overridden.
    handshake_type: HandshakeType<K>,
    /// Whether the service should keep running when it has no tasks; `false` by default.
    forever: bool,
    /// Service-level configuration adjusted by the setter methods.
    config: ServiceConfig,
}
27
28impl<K> Default for ServiceBuilder<K> {
29    fn default() -> Self {
30        Self {
31            handshake_type: HandshakeType::Noop,
32            inner: IntMap::default(),
33            forever: false,
34            config: ServiceConfig::default(),
35        }
36    }
37}
38
39impl<K> ServiceBuilder<K>
40where
41    K: KeyProvider,
42{
43    /// New a default empty builder
44    pub fn new() -> Self {
45        Default::default()
46    }
47
48    /// Combine the configuration of this builder with service handle to create a Service.
49    pub fn build<H>(self, handle: H) -> Service<H, K>
50    where
51        H: ServiceHandle + Unpin + 'static,
52    {
53        Service::new(
54            self.inner,
55            handle,
56            self.handshake_type,
57            self.forever,
58            self.config,
59        )
60    }
61
62    /// Insert a custom protocol
63    pub fn insert_protocol(mut self, protocol: ProtocolMeta) -> Self {
64        self.inner.insert(protocol.id(), protocol);
65        self
66    }
67
68    /// Handshake encryption layer protocol selection
69    ///
70    /// If you do not need encrypted communication, you do not need to call this method
71    pub fn handshake_type(mut self, handshake_type: HandshakeType<K>) -> Self {
72        self.handshake_type = handshake_type;
73        self
74    }
75
76    /// When the service has no tasks, it will be turned off by default.
77    /// If you do not want to close service, set it to true.
78    pub fn forever(mut self, forever: bool) -> Self {
79        self.forever = forever;
80        self
81    }
82
83    /// Timeout for handshake and connect
84    ///
85    /// Default 10 second
86    pub fn timeout(mut self, timeout: Duration) -> Self {
87        self.config.timeout = timeout;
88        self
89    }
90
91    /// Yamux config for service
92    ///
93    /// Panic when max_frame_length < yamux_max_window_size
94    pub fn yamux_config(mut self, config: Config) -> Self {
95        assert!(self.config.max_frame_length as u32 >= config.max_stream_window_size);
96        self.config.session_config.yamux_config = config;
97        self
98    }
99
100    /// Secio max frame length
101    ///
102    /// Panic when max_frame_length < yamux_max_window_size
103    pub fn max_frame_length(mut self, size: usize) -> Self {
104        assert!(
105            size as u32
106                >= self
107                    .config
108                    .session_config
109                    .yamux_config
110                    .max_stream_window_size
111        );
112        self.config.max_frame_length = size;
113        self
114    }
115
116    /// Tentacle use lots of bound channel, default channel size is 128
117    pub fn set_channel_size(mut self, size: usize) -> Self {
118        self.config.session_config.channel_size = size;
119        self
120    }
121
122    /// Set send buffer size, default is 24Mb
123    pub fn set_send_buffer_size(mut self, size: usize) -> Self {
124        self.config.session_config.send_buffer_size = size;
125        self
126    }
127
128    /// Set receive buffer size, default is 24Mb
129    pub fn set_recv_buffer_size(mut self, size: usize) -> Self {
130        self.config.session_config.recv_buffer_size = size;
131        self
132    }
133
134    /// If session is close by remote, did you want to keep unreceived message as more as possible
135    /// default is false
136    pub fn keep_buffer(mut self, keep: bool) -> Self {
137        self.config.keep_buffer = keep;
138        self
139    }
140
141    /// Whether to allow tentative registration upnp, default is disable(false)
142    ///
143    /// upnp: https://en.wikipedia.org/wiki/Universal_Plug_and_Play
144    ///
145    /// Upnp is a simple solution to nat penetration, which requires routing support for registration mapping.
146    ///
147    /// The function provided here is that if the external ip of the query route is a public network,
148    /// then an attempt is made to register the local listener port into the mapping so that it can
149    /// receive the access request of the external network, and if the external ip of the route is not the public network,
150    /// Then do nothing
151    #[cfg(all(not(target_family = "wasm"), feature = "upnp"))]
152    #[cfg_attr(docsrs, doc(cfg(feature = "upnp")))]
153    pub fn upnp(mut self, enable: bool) -> Self {
154        self.config.upnp = enable;
155        self
156    }
157
158    /// The limit of max open connection(file descriptors)
159    /// If not limited, service will try to serve as many connections as possible until it exhausts system resources(os error),
160    /// and then close the listener, no longer accepting new connection requests, and the established connections remain working
161    ///
162    /// Default is 65535
163    pub fn max_connection_number(mut self, number: usize) -> Self {
164        self.config.max_connection_number = number;
165        self
166    }
167
168    /// Users can make their own custom configuration for all tcp socket at the bottom of Tentacle according to their own needs,
169    /// for example, use reuse port to try to build nat penetration
170    ///
171    /// In this way, any actively connected outbound connection is potentially connectable. Through this setting,
172    /// the device after NAT can have the opportunity to be connected to the public network.
173    ///
174    /// TCP Hole Punching: http://bford.info/pub/net/p2pnat/
175    /// STUN: https://tools.ietf.org/html/rfc5389
176    ///
177    /// for example, set all tcp bind to `127.0.0.1:1080`, set keepalive:
178    ///
179    /// ```ignore
180    ///  use socket2;
181    ///  use tentacle::{service::TcpSocket, builder::ServiceBuilder};
182    ///  #[cfg(unix)]
183    ///  use std::os::unix::io::{FromRawFd, IntoRawFd};
184    ///  #[cfg(windows)]
185    ///  use std::os::windows::io::{FromRawSocket, IntoRawSocket};
186    ///  use std::net::SocketAddr;
187    ///
188    ///  let mut server = ServiceBuilder::new();
189    ///  server.tcp_config(|socket: TcpSocket| {
190    ///      let socket = unsafe {
191    ///         #[cfg(unix)]
192    ///         let socket = socket2::Socket::from_raw_fd(socket.into_raw_fd());
193    ///         #[cfg(windows)]
194    ///         let socket = socket2::Socket::from_raw_socket(socket.into_raw_socket());
195    ///         socket
196    ///      };
197    ///      #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
198    ///      socket.set_reuse_port(true)?;
199    ///
200    ///      socket.set_reuse_address(true)?;
201    ///      socket.bind(&"127.0.0.1:1080".parse::<SocketAddr>().unwrap().into())?;
202    ///      socket.set_keepalive(true)?;
203    ///      let socket = unsafe {
204    ///         #[cfg(unix)]
205    ///         let socket = TcpSocket::from_raw_fd(socket.into_raw_fd());
206    ///         #[cfg(windows)]
207    ///         let socket = TcpSocket::from_raw_socket(socket.into_raw_socket());
208    ///         socket
209    ///      };
210    ///      Ok(socket)
211    /// });
212    /// ```
213    ///
214    /// ## Note
215    ///
216    /// User use `listen(2)` or `connect(2)` on this closure will cause abnormal behavior
217    #[cfg(not(target_family = "wasm"))]
218    pub fn tcp_config<F>(mut self, f: F) -> Self
219    where
220        F: Fn(TcpSocket) -> Result<TcpSocket, std::io::Error> + Send + Sync + 'static,
221    {
222        self.config.tcp_config.tcp = Arc::new(f);
223        self
224    }
225
226    /// The same as tcp config, but use on ws transport
227    #[cfg(feature = "ws")]
228    #[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
229    pub fn tcp_config_on_ws<F>(mut self, f: F) -> Self
230    where
231        F: Fn(TcpSocket) -> Result<TcpSocket, std::io::Error> + Send + Sync + 'static,
232    {
233        self.config.tcp_config.ws = Arc::new(f);
234        self
235    }
236
237    /// Clear all protocols
238    pub fn clear(&mut self) {
239        self.inner.clear();
240    }
241
242    /// set rustls ServerConfig, default is NoClientAuth
243    #[cfg(feature = "tls")]
244    #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
245    pub fn tls_config(mut self, config: TlsConfig) -> Self {
246        self.config.tls_config = Some(config);
247        self
248    }
249
250    /// The same as tcp config, but use on tls transport
251    #[cfg(feature = "tls")]
252    #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
253    pub fn tcp_config_on_tls<F>(mut self, f: F) -> Self
254    where
255        F: Fn(TcpSocket) -> Result<TcpSocket, std::io::Error> + Send + Sync + 'static,
256    {
257        self.config.tcp_config.tls = Arc::new(f);
258        self
259    }
260}
261
/// Boxed closure that produces a protocol name from its `ProtocolId`.
pub(crate) type NameFn = Box<dyn Fn(ProtocolId) -> String + Send + Sync>;
/// Boxed factory that returns a boxed `Codec` each time it is called.
pub(crate) type CodecFn = Box<dyn Fn() -> Box<dyn Codec + Send + 'static> + Send + Sync>;
/// Boxed `FnMut` factory that returns a `ProtocolHandle` wrapping a boxed `SessionProtocol`.
pub(crate) type SessionHandleFn =
    Box<dyn FnMut() -> ProtocolHandle<Box<dyn SessionProtocol + Send + 'static + Unpin>> + Send>;
/// Boxed closure that optionally yields a custom version-selection function.
pub(crate) type SelectVersionFn = Box<dyn Fn() -> Option<SelectFn<String>> + Send + Sync + 'static>;
/// Boxed closure that optionally yields a `BeforeReceive` hook.
pub(crate) type BeforeReceiveFn = Box<dyn Fn() -> Option<BeforeReceive> + Send + Sync + 'static>;
/// Hook that turns a received `BytesMut` into `Bytes`, or fails with an `io::Error`.
pub(crate) type BeforeReceive =
    Box<dyn Fn(bytes::BytesMut) -> Result<bytes::Bytes, io::Error> + Send + 'static>;
270
/// Builder for protocol meta
///
/// Every field has a default (see the `Default` impl) and can be overridden
/// through the setter of the same name before calling `build`.
pub struct MetaBuilder {
    /// Protocol id; defaults to `ProtocolId::new(0)`.
    id: ProtocolId,
    /// Closure producing the protocol name from the id; defaults to `"/p2p/<id>"`.
    name: NameFn,
    /// Versions this protocol supports; defaults to `["0.0.1"]`.
    support_versions: Vec<String>,
    /// Codec factory; defaults to `LengthDelimitedCodec`.
    codec: CodecFn,
    /// Service-level protocol handle; defaults to `ProtocolHandle::None`.
    service_handle: ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static + Unpin>>,
    /// Factory for session-level protocol handles; the default returns `ProtocolHandle::None`.
    session_handle: SessionHandleFn,
    /// Optional custom version-selection rule; the default returns `None`.
    select_version: SelectVersionFn,
    /// Optional hook applied to messages before they are sent; `None` by default.
    before_send: Option<Box<dyn Fn(bytes::Bytes) -> bytes::Bytes + Send + 'static>>,
    /// Factory for the hook applied to messages before the user receives them;
    /// the default returns `None`.
    before_receive: BeforeReceiveFn,
    /// Optional spawn implementation for the protocol read part; `build` asserts
    /// that no protocol handle is set when this is `Some`.
    spawn: Option<Box<dyn ProtocolSpawn + Send + Sync + 'static>>,
}
284
285impl MetaBuilder {
286    /// New a default builder
287    pub fn new() -> Self {
288        Default::default()
289    }
290
291    /// Define protocol id
292    ///
293    /// It is just an internal index of the system that
294    /// identifies the open/close and message transfer for the specified protocol.
295    pub fn id(mut self, id: ProtocolId) -> Self {
296        self.id = id;
297        self
298    }
299
300    /// Define protocol name, default is "/p2p/protocol_id"
301    ///
302    /// Used to interact with the remote service to determine whether the protocol is supported.
303    ///
304    /// If not found, the protocol connection(not session just sub stream) will be closed,
305    /// and return a `ProtocolSelectError` event.
306    pub fn name<T: Fn(ProtocolId) -> String + 'static + Send + Sync>(mut self, name: T) -> Self {
307        self.name = Box::new(name);
308        self
309    }
310
311    /// Define protocol support versions, default is `vec!["0.0.1".to_owned()]`
312    ///
313    /// Used to interact with the remote service to confirm that both parties
314    /// open the same version of the protocol.
315    ///
316    /// If not found, the protocol connection(not session just sub stream) will be closed,
317    /// and return a `ProtocolSelectError` event.
318    pub fn support_versions(mut self, versions: Vec<String>) -> Self {
319        self.support_versions = versions;
320        self
321    }
322
323    /// Define protocol codec, default is LengthDelimitedCodec
324    pub fn codec<T: Fn() -> Box<dyn Codec + Send + 'static> + 'static + Send + Sync>(
325        mut self,
326        codec: T,
327    ) -> Self {
328        self.codec = Box::new(codec);
329        self
330    }
331
332    /// Define protocol service handle, default is neither
333    pub fn service_handle<
334        T: FnOnce() -> ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static + Unpin>>,
335    >(
336        mut self,
337        service_handle: T,
338    ) -> Self {
339        self.service_handle = service_handle();
340        self
341    }
342
343    /// Define protocol session handle, default is neither
344    pub fn session_handle<
345        T: FnMut() -> ProtocolHandle<Box<dyn SessionProtocol + Send + 'static + Unpin>>
346            + Send
347            + 'static,
348    >(
349        mut self,
350        session_handle: T,
351    ) -> Self {
352        self.session_handle = Box::new(session_handle);
353        self
354    }
355
356    /// Define the spawn process of the protocol read part
357    ///
358    /// Mutually exclusive with protocol handle
359    #[cfg(feature = "unstable")]
360    #[cfg_attr(docsrs, doc(cfg(feature = "unstable")))]
361    pub fn protocol_spawn<T: ProtocolSpawn + Send + Sync + 'static>(mut self, spawn: T) -> Self {
362        self.spawn = Some(Box::new(spawn));
363        self
364    }
365
366    /// Protocol version selection rule, default is [select_version](../protocol_select/fn.select_version.html)
367    pub fn select_version<T>(mut self, f: T) -> Self
368    where
369        T: Fn() -> Option<SelectFn<String>> + Send + Sync + 'static,
370    {
371        self.select_version = Box::new(f);
372        self
373    }
374
375    /// Unified processing of messages before they are sent
376    pub fn before_send<T>(mut self, f: T) -> Self
377    where
378        T: Fn(bytes::Bytes) -> bytes::Bytes + 'static + Send,
379    {
380        self.before_send = Some(Box::new(f));
381        self
382    }
383
384    /// Unified processing of messages before user received
385    pub fn before_receive<T>(mut self, f: T) -> Self
386    where
387        T: Fn() -> Option<BeforeReceive> + Send + Sync + 'static,
388    {
389        self.before_receive = Box::new(f);
390        self
391    }
392
393    /// Combine the configuration of this builder to create a ProtocolMeta
394    pub fn build(mut self) -> ProtocolMeta {
395        if self.spawn.is_some() {
396            assert!(self.service_handle.is_none());
397            assert!((self.session_handle)().is_none());
398        }
399        let meta = Meta {
400            id: self.id,
401            name: self.name,
402            support_versions: self.support_versions,
403            codec: self.codec,
404            select_version: self.select_version,
405            before_receive: self.before_receive,
406            spawn: self.spawn,
407        };
408        ProtocolMeta {
409            inner: Arc::new(meta),
410            service_handle: self.service_handle,
411            session_handle: self.session_handle,
412            before_send: self.before_send,
413        }
414    }
415}
416
417impl Default for MetaBuilder {
418    fn default() -> Self {
419        MetaBuilder {
420            id: ProtocolId::new(0),
421            name: Box::new(|id| format!("/p2p/{}", id.value())),
422            support_versions: vec!["0.0.1".to_owned()],
423            codec: Box::new(|| Box::new(LengthDelimitedCodec::new())),
424            service_handle: ProtocolHandle::None,
425            session_handle: Box::new(|| ProtocolHandle::None),
426            select_version: Box::new(|| None),
427            before_send: None,
428            before_receive: Box::new(|| None),
429            spawn: None,
430        }
431    }
432}