ckb_network/protocols/
mod.rs

1pub(crate) mod disconnect_message;
2pub(crate) mod discovery;
3pub(crate) mod feeler;
4pub(crate) mod identify;
5pub(crate) mod ping;
6pub(crate) mod support_protocols;
7
8#[cfg(not(target_family = "wasm"))]
9pub(crate) mod hole_punching;
10
11#[cfg(test)]
12mod tests;
13
14use ckb_logger::{debug, trace};
15use futures::{Future, FutureExt};
16use p2p::{
17    ProtocolId, SessionId, async_trait,
18    builder::MetaBuilder,
19    bytes::Bytes,
20    context::{ProtocolContext, ProtocolContextMutRef},
21    service::{ProtocolHandle, ProtocolMeta, ServiceAsyncControl, ServiceControl, TargetSession},
22    traits::ServiceProtocol,
23};
24use std::{
25    pin::Pin,
26    sync::Arc,
27    task::{Context, Poll},
28    time::Duration,
29};
30use tokio_util::codec::length_delimited;
31
32/// Alias session id
33pub type PeerIndex = SessionId;
34/// Boxed future task
35pub type BoxedFutureTask = Pin<Box<dyn Future<Output = ()> + 'static + Send>>;
36
37use crate::{
38    Behaviour, Error, NetworkState, Peer, ProtocolVersion, SupportProtocols,
39    compress::{compress, decompress},
40    network::{async_disconnect_with_message, disconnect_with_message},
41};
42
43/// Abstract protocol context
44#[async_trait]
45pub trait CKBProtocolContext: Send {
46    /// Set notify to tentacle
47    // Interact with underlying p2p service
48    async fn set_notify(&self, interval: Duration, token: u64) -> Result<(), Error>;
49    /// Remove notify
50    async fn remove_notify(&self, token: u64) -> Result<(), Error>;
51    /// Send message through quick queue
52    async fn async_quick_send_message(
53        &self,
54        proto_id: ProtocolId,
55        peer_index: PeerIndex,
56        data: Bytes,
57    ) -> Result<(), Error>;
58    /// Send message through quick queue
59    async fn async_quick_send_message_to(
60        &self,
61        peer_index: PeerIndex,
62        data: Bytes,
63    ) -> Result<(), Error>;
64    /// Filter broadcast message through quick queue
65    async fn async_quick_filter_broadcast(
66        &self,
67        target: TargetSession,
68        data: Bytes,
69    ) -> Result<(), Error>;
70    /// spawn a future task, if `blocking` is true we use tokio_threadpool::blocking to handle the task.
71    async fn async_future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error>;
72    /// Send message
73    async fn async_send_message(
74        &self,
75        proto_id: ProtocolId,
76        peer_index: PeerIndex,
77        data: Bytes,
78    ) -> Result<(), Error>;
79    /// Send message
80    async fn async_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
81    /// Filter broadcast message
82    async fn async_filter_broadcast(&self, target: TargetSession, data: Bytes)
83    -> Result<(), Error>;
84    /// Disconnect session
85    async fn async_disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error>;
86    /// Send message through quick queue
87    fn quick_send_message(
88        &self,
89        proto_id: ProtocolId,
90        peer_index: PeerIndex,
91        data: Bytes,
92    ) -> Result<(), Error>;
93    /// Send message through quick queue
94    fn quick_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
95    /// Filter broadcast message through quick queue
96    fn quick_filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error>;
97    /// spawn a future task, if `blocking` is true we use tokio_threadpool::blocking to handle the task.
98    fn future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error>;
99    /// Send message
100    fn send_message(
101        &self,
102        proto_id: ProtocolId,
103        peer_index: PeerIndex,
104        data: Bytes,
105    ) -> Result<(), Error>;
106    /// Send message
107    fn send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
108    /// Filter broadcast message
109    fn filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error>;
110    /// Disconnect session
111    fn disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error>;
112    // Interact with NetworkState
113    /// Get peer info
114    fn get_peer(&self, peer_index: PeerIndex) -> Option<Peer>;
115    /// Modify peer info
116    fn with_peer_mut(&self, peer_index: PeerIndex, f: Box<dyn FnOnce(&mut Peer)>);
117    /// Get all session id
118    fn connected_peers(&self) -> Vec<PeerIndex>;
119    /// Get all full_relay (exclude block-relay-only) id
120    fn full_relay_connected_peers(&self) -> Vec<PeerIndex>;
121    /// Report peer behavior
122    fn report_peer(&self, peer_index: PeerIndex, behaviour: Behaviour);
123    /// Ban peer
124    fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String);
125    /// current protocol id
126    fn protocol_id(&self) -> ProtocolId;
127    /// Raw tentacle controller
128    fn p2p_control(&self) -> Option<&ServiceControl> {
129        None
130    }
131}
132
133/// type alias of dyn ckb protocol context
134pub type BoxedCKBProtocolContext = Arc<dyn CKBProtocolContext + Sync>;
135
136/// Abstract protocol handle base on tentacle service handle
137#[async_trait]
138pub trait CKBProtocolHandler: Sync + Send {
139    /// Init action on service run
140    async fn init(&mut self, nc: BoxedCKBProtocolContext);
141    /// Called when opening protocol
142    async fn connected(
143        &mut self,
144        _nc: BoxedCKBProtocolContext,
145        _peer_index: PeerIndex,
146        _version: &str,
147    ) {
148    }
149    /// Called when closing protocol
150    async fn disconnected(&mut self, _nc: BoxedCKBProtocolContext, _peer_index: PeerIndex) {}
151    /// Called when the corresponding protocol message is received
152    async fn received(
153        &mut self,
154        _nc: BoxedCKBProtocolContext,
155        _peer_index: PeerIndex,
156        _data: Bytes,
157    ) {
158    }
159    /// Called when the Service receives the notify task
160    async fn notify(&mut self, _nc: BoxedCKBProtocolContext, _token: u64) {}
161    /// Behave like `Stream::poll`
162    async fn poll(&mut self, _nc: BoxedCKBProtocolContext) -> Option<()> {
163        None
164    }
165}
166
167/// Help to build protocol meta
168pub struct CKBProtocol {
169    id: ProtocolId,
170    // for example: b"/ckb/"
171    protocol_name: String,
172    // supported version, used to check protocol version
173    supported_versions: Vec<ProtocolVersion>,
174    max_frame_length: usize,
175    handler: Box<dyn CKBProtocolHandler>,
176    network_state: Arc<NetworkState>,
177}
178
179impl CKBProtocol {
180    /// New with support protocol
181    // a helper constructor to build `CKBProtocol` with `SupportProtocols` enum
182    pub fn new_with_support_protocol(
183        support_protocol: support_protocols::SupportProtocols,
184        handler: Box<dyn CKBProtocolHandler>,
185        network_state: Arc<NetworkState>,
186    ) -> Self {
187        CKBProtocol {
188            id: support_protocol.protocol_id(),
189            max_frame_length: support_protocol.max_frame_length(),
190            protocol_name: support_protocol.name(),
191            supported_versions: support_protocol.support_versions(),
192            network_state,
193            handler,
194        }
195    }
196
197    /// New with all config
198    pub fn new(
199        protocol_name: String,
200        id: ProtocolId,
201        versions: &[ProtocolVersion],
202        max_frame_length: usize,
203        handler: Box<dyn CKBProtocolHandler>,
204        network_state: Arc<NetworkState>,
205    ) -> Self {
206        CKBProtocol {
207            id,
208            max_frame_length,
209            network_state,
210            handler,
211            protocol_name: format!("/ckb/{protocol_name}"),
212            supported_versions: {
213                let mut versions: Vec<_> = versions.to_vec();
214                versions.sort_by(|a, b| b.cmp(a));
215                versions.to_vec()
216            },
217        }
218    }
219
220    /// Protocol id
221    pub fn id(&self) -> ProtocolId {
222        self.id
223    }
224
225    /// Protocol name
226    pub fn protocol_name(&self) -> String {
227        self.protocol_name.clone()
228    }
229
230    /// Whether support this version
231    pub fn match_version(&self, version: ProtocolVersion) -> bool {
232        self.supported_versions.contains(&version)
233    }
234
235    /// Build to tentacle protocol meta
236    pub fn build(self) -> ProtocolMeta {
237        let protocol_name = self.protocol_name();
238        let max_frame_length = self.max_frame_length;
239        let supported_versions = self
240            .supported_versions
241            .iter()
242            .map(ToString::to_string)
243            .collect::<Vec<_>>();
244        MetaBuilder::default()
245            .id(self.id)
246            .name(move |_| protocol_name.clone())
247            .codec(move || {
248                Box::new(
249                    length_delimited::Builder::new()
250                        .max_frame_length(max_frame_length)
251                        .new_codec(),
252                )
253            })
254            .support_versions(supported_versions)
255            .service_handle(move || {
256                ProtocolHandle::Callback(Box::new(CKBHandler {
257                    proto_id: self.id,
258                    network_state: Arc::clone(&self.network_state),
259                    handler: self.handler,
260                }))
261            })
262            .before_send(compress)
263            .before_receive(|| Some(Box::new(decompress)))
264            .build()
265    }
266}
267
268struct CKBHandler {
269    proto_id: ProtocolId,
270    network_state: Arc<NetworkState>,
271    handler: Box<dyn CKBProtocolHandler>,
272}
273
274// Just proxy to inner handler, this struct exists for convenient unit test.
275#[async_trait]
276impl ServiceProtocol for CKBHandler {
277    async fn init(&mut self, context: &mut ProtocolContext) {
278        let nc = DefaultCKBProtocolContext {
279            proto_id: self.proto_id,
280            network_state: Arc::clone(&self.network_state),
281            p2p_control: context.control().to_owned().into(),
282            async_p2p_control: context.control().to_owned(),
283        };
284        self.handler.init(Arc::new(nc)).await;
285    }
286
287    async fn connected(&mut self, context: ProtocolContextMutRef<'_>, version: &str) {
288        self.network_state.with_peer_registry_mut(|reg| {
289            if let Some(peer) = reg.get_peer_mut(context.session.id) {
290                peer.protocols.insert(self.proto_id, version.to_owned());
291            }
292        });
293
294        if !self.network_state.is_active() {
295            return;
296        }
297
298        let nc = DefaultCKBProtocolContext {
299            proto_id: self.proto_id,
300            network_state: Arc::clone(&self.network_state),
301            p2p_control: context.control().to_owned().into(),
302            async_p2p_control: context.control().to_owned(),
303        };
304        let peer_index = context.session.id;
305
306        self.handler
307            .connected(Arc::new(nc), peer_index, version)
308            .await;
309    }
310
311    async fn disconnected(&mut self, context: ProtocolContextMutRef<'_>) {
312        self.network_state.with_peer_registry_mut(|reg| {
313            if let Some(peer) = reg.get_peer_mut(context.session.id) {
314                peer.protocols.remove(&self.proto_id);
315            }
316        });
317
318        if !self.network_state.is_active() {
319            return;
320        }
321
322        let nc = DefaultCKBProtocolContext {
323            proto_id: self.proto_id,
324            network_state: Arc::clone(&self.network_state),
325            p2p_control: context.control().to_owned().into(),
326            async_p2p_control: context.control().to_owned(),
327        };
328        let peer_index = context.session.id;
329        self.handler.disconnected(Arc::new(nc), peer_index).await;
330    }
331
332    async fn received(&mut self, context: ProtocolContextMutRef<'_>, data: Bytes) {
333        if !self.network_state.is_active() {
334            return;
335        }
336
337        trace!(
338            "[received message]: {}, {}, length={}",
339            self.proto_id,
340            context.session.id,
341            data.len()
342        );
343        let nc = DefaultCKBProtocolContext {
344            proto_id: self.proto_id,
345            network_state: Arc::clone(&self.network_state),
346            p2p_control: context.control().to_owned().into(),
347            async_p2p_control: context.control().to_owned(),
348        };
349        let peer_index = context.session.id;
350        self.handler.received(Arc::new(nc), peer_index, data).await;
351    }
352
353    async fn notify(&mut self, context: &mut ProtocolContext, token: u64) {
354        if !self.network_state.is_active() {
355            return;
356        }
357        let nc = DefaultCKBProtocolContext {
358            proto_id: self.proto_id,
359            network_state: Arc::clone(&self.network_state),
360            p2p_control: context.control().to_owned().into(),
361            async_p2p_control: context.control().to_owned(),
362        };
363        self.handler.notify(Arc::new(nc), token).await;
364    }
365
366    async fn poll(&mut self, context: &mut ProtocolContext) -> Option<()> {
367        let nc = DefaultCKBProtocolContext {
368            proto_id: self.proto_id,
369            network_state: Arc::clone(&self.network_state),
370            p2p_control: context.control().to_owned().into(),
371            async_p2p_control: context.control().to_owned(),
372        };
373        self.handler.poll(Arc::new(nc)).await
374    }
375}
376
377struct DefaultCKBProtocolContext {
378    proto_id: ProtocolId,
379    network_state: Arc<NetworkState>,
380    p2p_control: ServiceControl,
381    async_p2p_control: ServiceAsyncControl,
382}
383
384#[async_trait]
385impl CKBProtocolContext for DefaultCKBProtocolContext {
386    async fn set_notify(&self, interval: Duration, token: u64) -> Result<(), Error> {
387        self.async_p2p_control
388            .set_service_notify(self.proto_id, interval, token)
389            .await?;
390        Ok(())
391    }
392    async fn remove_notify(&self, token: u64) -> Result<(), Error> {
393        self.async_p2p_control
394            .remove_service_notify(self.proto_id, token)
395            .await?;
396        Ok(())
397    }
398    async fn async_quick_send_message(
399        &self,
400        proto_id: ProtocolId,
401        peer_index: PeerIndex,
402        data: Bytes,
403    ) -> Result<(), Error> {
404        trace!(
405            "[send message]: {}, to={}, length={}",
406            proto_id,
407            peer_index,
408            data.len()
409        );
410        self.async_p2p_control
411            .quick_send_message_to(peer_index, proto_id, data)
412            .await?;
413        Ok(())
414    }
415    async fn async_quick_send_message_to(
416        &self,
417        peer_index: PeerIndex,
418        data: Bytes,
419    ) -> Result<(), Error> {
420        trace!(
421            "[send message to]: {}, to={}, length={}",
422            self.proto_id,
423            peer_index,
424            data.len()
425        );
426        self.async_p2p_control
427            .quick_send_message_to(peer_index, self.proto_id, data)
428            .await?;
429        Ok(())
430    }
431    async fn async_quick_filter_broadcast(
432        &self,
433        target: TargetSession,
434        data: Bytes,
435    ) -> Result<(), Error> {
436        self.async_p2p_control
437            .quick_filter_broadcast(target, self.proto_id, data)
438            .await?;
439        Ok(())
440    }
441    async fn async_future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error> {
442        let task = if blocking {
443            Box::pin(BlockingFutureTask::new(task))
444        } else {
445            task
446        };
447        self.async_p2p_control.future_task(task).await?;
448        Ok(())
449    }
450    async fn async_send_message(
451        &self,
452        proto_id: ProtocolId,
453        peer_index: PeerIndex,
454        data: Bytes,
455    ) -> Result<(), Error> {
456        trace!(
457            "[send message]: {}, to={}, length={}",
458            proto_id,
459            peer_index,
460            data.len()
461        );
462        self.async_p2p_control
463            .send_message_to(peer_index, proto_id, data)
464            .await?;
465        Ok(())
466    }
467    async fn async_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
468        trace!(
469            "[send message to]: {}, to={}, length={}",
470            self.proto_id,
471            peer_index,
472            data.len()
473        );
474        self.async_p2p_control
475            .send_message_to(peer_index, self.proto_id, data)
476            .await?;
477        Ok(())
478    }
479    async fn async_filter_broadcast(
480        &self,
481        target: TargetSession,
482        data: Bytes,
483    ) -> Result<(), Error> {
484        self.async_p2p_control
485            .filter_broadcast(target, self.proto_id, data)
486            .await?;
487        Ok(())
488    }
489    async fn async_disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error> {
490        debug!("Disconnect peer: {}, message: {}", peer_index, message);
491        async_disconnect_with_message(&self.async_p2p_control, peer_index, message).await?;
492        Ok(())
493    }
494    fn quick_send_message(
495        &self,
496        proto_id: ProtocolId,
497        peer_index: PeerIndex,
498        data: Bytes,
499    ) -> Result<(), Error> {
500        trace!(
501            "[send message]: {}, to={}, length={}",
502            proto_id,
503            peer_index,
504            data.len()
505        );
506        self.p2p_control
507            .quick_send_message_to(peer_index, proto_id, data)?;
508        Ok(())
509    }
510    fn quick_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
511        trace!(
512            "[send message to]: {}, to={}, length={}",
513            self.proto_id,
514            peer_index,
515            data.len()
516        );
517        self.p2p_control
518            .quick_send_message_to(peer_index, self.proto_id, data)?;
519        Ok(())
520    }
521    fn quick_filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error> {
522        self.p2p_control
523            .quick_filter_broadcast(target, self.proto_id, data)?;
524        Ok(())
525    }
526    fn future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error> {
527        let task = if blocking {
528            Box::pin(BlockingFutureTask::new(task))
529        } else {
530            task
531        };
532        self.p2p_control.future_task(task)?;
533        Ok(())
534    }
535    fn send_message(
536        &self,
537        proto_id: ProtocolId,
538        peer_index: PeerIndex,
539        data: Bytes,
540    ) -> Result<(), Error> {
541        trace!(
542            "[send message]: {}, to={}, length={}",
543            proto_id,
544            peer_index,
545            data.len()
546        );
547        self.p2p_control
548            .send_message_to(peer_index, proto_id, data)?;
549        Ok(())
550    }
551    fn send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
552        trace!(
553            "[send message to]: {}, to={}, length={}",
554            self.proto_id,
555            peer_index,
556            data.len()
557        );
558        self.p2p_control
559            .send_message_to(peer_index, self.proto_id, data)?;
560        Ok(())
561    }
562    fn filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error> {
563        self.p2p_control
564            .filter_broadcast(target, self.proto_id, data)?;
565        Ok(())
566    }
567    fn disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error> {
568        debug!("Disconnect peer: {}, message: {}", peer_index, message);
569        disconnect_with_message(&self.p2p_control, peer_index, message)?;
570        Ok(())
571    }
572
573    fn get_peer(&self, peer_index: PeerIndex) -> Option<Peer> {
574        self.network_state
575            .with_peer_registry(|reg| reg.get_peer(peer_index).cloned())
576    }
577    fn with_peer_mut(&self, peer_index: PeerIndex, f: Box<dyn FnOnce(&mut Peer)>) {
578        self.network_state.with_peer_registry_mut(|reg| {
579            reg.get_peer_mut(peer_index).map(f);
580        })
581    }
582
583    fn connected_peers(&self) -> Vec<PeerIndex> {
584        self.network_state.with_peer_registry(|reg| {
585            reg.peers()
586                .iter()
587                .filter_map(|(peer_index, peer)| {
588                    if peer.protocols.contains_key(&self.proto_id) {
589                        Some(peer_index)
590                    } else {
591                        None
592                    }
593                })
594                .cloned()
595                .collect()
596        })
597    }
598
599    fn full_relay_connected_peers(&self) -> Vec<PeerIndex> {
600        self.network_state.with_peer_registry(|reg| {
601            reg.peers()
602                .iter()
603                .filter_map(|(peer_index, peer)| {
604                    if peer.protocols.contains_key(&self.proto_id) && !peer.is_block_relay_only() {
605                        Some(peer_index)
606                    } else {
607                        None
608                    }
609                })
610                .cloned()
611                .collect()
612        })
613    }
614
615    fn report_peer(&self, peer_index: PeerIndex, behaviour: Behaviour) {
616        self.network_state
617            .report_session(&self.p2p_control, peer_index, behaviour);
618    }
619    fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String) {
620        self.network_state
621            .ban_session(&self.p2p_control, peer_index, duration, reason);
622    }
623
624    fn protocol_id(&self) -> ProtocolId {
625        self.proto_id
626    }
627
628    fn p2p_control(&self) -> Option<&ServiceControl> {
629        Some(&self.p2p_control)
630    }
631}
632
633pub(crate) struct BlockingFutureTask {
634    task: BoxedFutureTask,
635}
636
637impl BlockingFutureTask {
638    pub(crate) fn new(task: BoxedFutureTask) -> BlockingFutureTask {
639        BlockingFutureTask { task }
640    }
641}
642
643impl Future for BlockingFutureTask {
644    type Output = ();
645
646    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
647        p2p::runtime::block_in_place(|| self.task.poll_unpin(cx))
648    }
649}