ckb_network/protocols/
mod.rs

1pub(crate) mod disconnect_message;
2pub(crate) mod discovery;
3pub(crate) mod feeler;
4pub(crate) mod identify;
5pub(crate) mod ping;
6pub(crate) mod support_protocols;
7
8#[cfg(not(target_family = "wasm"))]
9pub(crate) mod hole_punching;
10
11#[cfg(test)]
12mod tests;
13
14use ckb_logger::{debug, trace};
15use futures::{Future, FutureExt};
16use p2p::{
17    ProtocolId, SessionId, async_trait,
18    builder::MetaBuilder,
19    bytes::Bytes,
20    context::{ProtocolContext, ProtocolContextMutRef},
21    service::{ProtocolHandle, ProtocolMeta, ServiceAsyncControl, ServiceControl, TargetSession},
22    traits::ServiceProtocol,
23};
24use std::{
25    pin::Pin,
26    sync::Arc,
27    task::{Context, Poll},
28    time::Duration,
29};
30use tokio_util::codec::length_delimited;
31
32/// Alias session id
33pub type PeerIndex = SessionId;
34/// Boxed future task
35pub type BoxedFutureTask = Pin<Box<dyn Future<Output = ()> + 'static + Send>>;
36
37use crate::{
38    Behaviour, Error, NetworkState, Peer, ProtocolVersion,
39    compress::LengthDelimitedCodecWithCompress,
40    network::{async_disconnect_with_message, disconnect_with_message},
41};
42
43#[cfg(not(target_family = "wasm"))]
44use crate::SupportProtocols;
45
46/// Abstract protocol context
47#[async_trait]
48pub trait CKBProtocolContext: Send {
49    /// Set notify to tentacle
50    // Interact with underlying p2p service
51    async fn set_notify(&self, interval: Duration, token: u64) -> Result<(), Error>;
52    /// Remove notify
53    async fn remove_notify(&self, token: u64) -> Result<(), Error>;
54    /// Send message through quick queue
55    async fn async_quick_send_message(
56        &self,
57        proto_id: ProtocolId,
58        peer_index: PeerIndex,
59        data: Bytes,
60    ) -> Result<(), Error>;
61    /// Send message through quick queue
62    async fn async_quick_send_message_to(
63        &self,
64        peer_index: PeerIndex,
65        data: Bytes,
66    ) -> Result<(), Error>;
67    /// Filter broadcast message through quick queue
68    async fn async_quick_filter_broadcast(
69        &self,
70        target: TargetSession,
71        data: Bytes,
72    ) -> Result<(), Error>;
73    /// spawn a future task, if `blocking` is true we use tokio_threadpool::blocking to handle the task.
74    async fn async_future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error>;
75    /// Send message
76    async fn async_send_message(
77        &self,
78        proto_id: ProtocolId,
79        peer_index: PeerIndex,
80        data: Bytes,
81    ) -> Result<(), Error>;
82    /// Send message
83    async fn async_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
84    /// Filter broadcast message
85    async fn async_filter_broadcast(&self, target: TargetSession, data: Bytes)
86    -> Result<(), Error>;
87    /// Filter broadcast message through queue and specify protocol id
88    async fn async_filter_broadcast_with_proto(
89        &self,
90        proto_id: ProtocolId,
91        target: TargetSession,
92        data: Bytes,
93    ) -> Result<(), Error>;
94    /// Filter broadcast message through quick queue and specify protocol id
95    async fn async_quick_filter_broadcast_with_proto(
96        &self,
97        proto_id: ProtocolId,
98        target: TargetSession,
99        data: Bytes,
100    ) -> Result<(), Error>;
101    /// Disconnect session
102    async fn async_disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error>;
103    /// Send message through quick queue
104    fn quick_send_message(
105        &self,
106        proto_id: ProtocolId,
107        peer_index: PeerIndex,
108        data: Bytes,
109    ) -> Result<(), Error>;
110    /// Send message through quick queue
111    fn quick_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
112    /// Filter broadcast message through quick queue
113    fn quick_filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error>;
114    /// Filter broadcast message through quick queue and specify protocol id
115    fn quick_filter_broadcast_with_proto(
116        &self,
117        proto_id: ProtocolId,
118        target: TargetSession,
119        data: Bytes,
120    ) -> Result<(), Error>;
121    /// spawn a future task, if `blocking` is true we use tokio_threadpool::blocking to handle the task.
122    fn future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error>;
123    /// Send message
124    fn send_message(
125        &self,
126        proto_id: ProtocolId,
127        peer_index: PeerIndex,
128        data: Bytes,
129    ) -> Result<(), Error>;
130    /// Send message
131    fn send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
132    /// Filter broadcast message
133    fn filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error>;
134    /// Disconnect session
135    fn disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error>;
136    // Interact with NetworkState
137    /// Get peer info
138    fn get_peer(&self, peer_index: PeerIndex) -> Option<Peer>;
139    /// Modify peer info
140    fn with_peer_mut(&self, peer_index: PeerIndex, f: Box<dyn FnOnce(&mut Peer)>);
141    /// Get all session id
142    fn connected_peers(&self) -> Vec<PeerIndex>;
143    /// Get all full_relay (exclude block-relay-only) id
144    fn full_relay_connected_peers(&self) -> Vec<PeerIndex>;
145    /// Report peer behavior
146    fn report_peer(&self, peer_index: PeerIndex, behaviour: Behaviour);
147    /// Ban peer
148    fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String);
149    /// current protocol id
150    fn protocol_id(&self) -> ProtocolId;
151    /// Raw tentacle controller
152    fn p2p_control(&self) -> Option<&ServiceControl> {
153        None
154    }
155}
156
157/// type alias of dyn ckb protocol context
158pub type BoxedCKBProtocolContext = Arc<dyn CKBProtocolContext + Sync>;
159
160/// Abstract protocol handle base on tentacle service handle
161#[async_trait]
162pub trait CKBProtocolHandler: Sync + Send {
163    /// Init action on service run
164    async fn init(&mut self, nc: BoxedCKBProtocolContext);
165    /// Called when opening protocol
166    async fn connected(
167        &mut self,
168        _nc: BoxedCKBProtocolContext,
169        _peer_index: PeerIndex,
170        _version: &str,
171    ) {
172    }
173    /// Called when closing protocol
174    async fn disconnected(&mut self, _nc: BoxedCKBProtocolContext, _peer_index: PeerIndex) {}
175    /// Called when the corresponding protocol message is received
176    async fn received(
177        &mut self,
178        _nc: BoxedCKBProtocolContext,
179        _peer_index: PeerIndex,
180        _data: Bytes,
181    ) {
182    }
183    /// Called when the Service receives the notify task
184    async fn notify(&mut self, _nc: BoxedCKBProtocolContext, _token: u64) {}
185    /// Behave like `Stream::poll`
186    async fn poll(&mut self, _nc: BoxedCKBProtocolContext) -> Option<()> {
187        None
188    }
189}
190
191/// Help to build protocol meta
192pub struct CKBProtocol {
193    id: ProtocolId,
194    // for example: b"/ckb/"
195    protocol_name: String,
196    // supported version, used to check protocol version
197    supported_versions: Vec<ProtocolVersion>,
198    max_frame_length: usize,
199    handler: Box<dyn CKBProtocolHandler>,
200    network_state: Arc<NetworkState>,
201    compress: bool,
202}
203
204impl CKBProtocol {
205    /// New with support protocol
206    // a helper constructor to build `CKBProtocol` with `SupportProtocols` enum
207    pub fn new_with_support_protocol(
208        support_protocol: support_protocols::SupportProtocols,
209        handler: Box<dyn CKBProtocolHandler>,
210        network_state: Arc<NetworkState>,
211    ) -> Self {
212        CKBProtocol {
213            id: support_protocol.protocol_id(),
214            max_frame_length: support_protocol.max_frame_length(),
215            protocol_name: support_protocol.name(),
216            supported_versions: support_protocol.support_versions(),
217            network_state,
218            handler,
219            compress: true,
220        }
221    }
222
223    /// New with all config
224    pub fn new(
225        protocol_name: String,
226        id: ProtocolId,
227        versions: &[ProtocolVersion],
228        max_frame_length: usize,
229        handler: Box<dyn CKBProtocolHandler>,
230        network_state: Arc<NetworkState>,
231    ) -> Self {
232        CKBProtocol {
233            id,
234            max_frame_length,
235            network_state,
236            handler,
237            protocol_name: format!("/ckb/{protocol_name}"),
238            supported_versions: {
239                let mut versions: Vec<_> = versions.to_vec();
240                versions.sort_by(|a, b| b.cmp(a));
241                versions.to_vec()
242            },
243            compress: true,
244        }
245    }
246
247    /// Enable or disable compression, default is enabled
248    pub fn compress(mut self, enable: bool) -> Self {
249        self.compress = enable;
250        self
251    }
252
253    /// Protocol id
254    pub fn id(&self) -> ProtocolId {
255        self.id
256    }
257
258    /// Protocol name
259    pub fn protocol_name(&self) -> String {
260        self.protocol_name.clone()
261    }
262
263    /// Whether support this version
264    pub fn match_version(&self, version: ProtocolVersion) -> bool {
265        self.supported_versions.contains(&version)
266    }
267
268    /// Build to tentacle protocol meta
269    pub fn build(self) -> ProtocolMeta {
270        let protocol_name = self.protocol_name();
271        let max_frame_length = self.max_frame_length;
272        let supported_versions = self
273            .supported_versions
274            .iter()
275            .map(ToString::to_string)
276            .collect::<Vec<_>>();
277        MetaBuilder::default()
278            .id(self.id)
279            .name(move |_| protocol_name.clone())
280            .codec(move || {
281                Box::new(LengthDelimitedCodecWithCompress::new(
282                    self.compress,
283                    length_delimited::Builder::new()
284                        .max_frame_length(max_frame_length)
285                        .new_codec(),
286                    self.id,
287                ))
288            })
289            .support_versions(supported_versions)
290            .service_handle(move || {
291                ProtocolHandle::Callback(Box::new(CKBHandler {
292                    proto_id: self.id,
293                    network_state: Arc::clone(&self.network_state),
294                    handler: self.handler,
295                }))
296            })
297            .build()
298    }
299}
300
301struct CKBHandler {
302    proto_id: ProtocolId,
303    network_state: Arc<NetworkState>,
304    handler: Box<dyn CKBProtocolHandler>,
305}
306
307// Just proxy to inner handler, this struct exists for convenient unit test.
308#[async_trait]
309impl ServiceProtocol for CKBHandler {
310    async fn init(&mut self, context: &mut ProtocolContext) {
311        let nc = DefaultCKBProtocolContext {
312            proto_id: self.proto_id,
313            network_state: Arc::clone(&self.network_state),
314            p2p_control: context.control().to_owned().into(),
315            async_p2p_control: context.control().to_owned(),
316        };
317        self.handler.init(Arc::new(nc)).await;
318    }
319
320    async fn connected(&mut self, context: ProtocolContextMutRef<'_>, version: &str) {
321        self.network_state.with_peer_registry_mut(|reg| {
322            if let Some(peer) = reg.get_peer_mut(context.session.id) {
323                peer.protocols.insert(self.proto_id, version.to_owned());
324            }
325        });
326
327        if !self.network_state.is_active() {
328            return;
329        }
330
331        let nc = DefaultCKBProtocolContext {
332            proto_id: self.proto_id,
333            network_state: Arc::clone(&self.network_state),
334            p2p_control: context.control().to_owned().into(),
335            async_p2p_control: context.control().to_owned(),
336        };
337        let peer_index = context.session.id;
338
339        self.handler
340            .connected(Arc::new(nc), peer_index, version)
341            .await;
342    }
343
344    async fn disconnected(&mut self, context: ProtocolContextMutRef<'_>) {
345        self.network_state.with_peer_registry_mut(|reg| {
346            if let Some(peer) = reg.get_peer_mut(context.session.id) {
347                peer.protocols.remove(&self.proto_id);
348            }
349        });
350
351        if !self.network_state.is_active() {
352            return;
353        }
354
355        let nc = DefaultCKBProtocolContext {
356            proto_id: self.proto_id,
357            network_state: Arc::clone(&self.network_state),
358            p2p_control: context.control().to_owned().into(),
359            async_p2p_control: context.control().to_owned(),
360        };
361        let peer_index = context.session.id;
362        self.handler.disconnected(Arc::new(nc), peer_index).await;
363    }
364
365    async fn received(&mut self, context: ProtocolContextMutRef<'_>, data: Bytes) {
366        if !self.network_state.is_active() {
367            return;
368        }
369
370        trace!(
371            "[received message]: {}, {}, length={}",
372            self.proto_id,
373            context.session.id,
374            data.len()
375        );
376        let nc = DefaultCKBProtocolContext {
377            proto_id: self.proto_id,
378            network_state: Arc::clone(&self.network_state),
379            p2p_control: context.control().to_owned().into(),
380            async_p2p_control: context.control().to_owned(),
381        };
382        let peer_index = context.session.id;
383        self.handler.received(Arc::new(nc), peer_index, data).await;
384    }
385
386    async fn notify(&mut self, context: &mut ProtocolContext, token: u64) {
387        if !self.network_state.is_active() {
388            return;
389        }
390        let nc = DefaultCKBProtocolContext {
391            proto_id: self.proto_id,
392            network_state: Arc::clone(&self.network_state),
393            p2p_control: context.control().to_owned().into(),
394            async_p2p_control: context.control().to_owned(),
395        };
396        self.handler.notify(Arc::new(nc), token).await;
397    }
398
399    async fn poll(&mut self, context: &mut ProtocolContext) -> Option<()> {
400        let nc = DefaultCKBProtocolContext {
401            proto_id: self.proto_id,
402            network_state: Arc::clone(&self.network_state),
403            p2p_control: context.control().to_owned().into(),
404            async_p2p_control: context.control().to_owned(),
405        };
406        self.handler.poll(Arc::new(nc)).await
407    }
408}
409
410struct DefaultCKBProtocolContext {
411    proto_id: ProtocolId,
412    network_state: Arc<NetworkState>,
413    p2p_control: ServiceControl,
414    async_p2p_control: ServiceAsyncControl,
415}
416
417#[async_trait]
418impl CKBProtocolContext for DefaultCKBProtocolContext {
419    async fn set_notify(&self, interval: Duration, token: u64) -> Result<(), Error> {
420        self.async_p2p_control
421            .set_service_notify(self.proto_id, interval, token)
422            .await?;
423        Ok(())
424    }
425    async fn remove_notify(&self, token: u64) -> Result<(), Error> {
426        self.async_p2p_control
427            .remove_service_notify(self.proto_id, token)
428            .await?;
429        Ok(())
430    }
431    async fn async_quick_send_message(
432        &self,
433        proto_id: ProtocolId,
434        peer_index: PeerIndex,
435        data: Bytes,
436    ) -> Result<(), Error> {
437        trace!(
438            "[send message]: {}, to={}, length={}",
439            proto_id,
440            peer_index,
441            data.len()
442        );
443        self.async_p2p_control
444            .quick_send_message_to(peer_index, proto_id, data)
445            .await?;
446        Ok(())
447    }
448    async fn async_quick_send_message_to(
449        &self,
450        peer_index: PeerIndex,
451        data: Bytes,
452    ) -> Result<(), Error> {
453        trace!(
454            "[send message to]: {}, to={}, length={}",
455            self.proto_id,
456            peer_index,
457            data.len()
458        );
459        self.async_p2p_control
460            .quick_send_message_to(peer_index, self.proto_id, data)
461            .await?;
462        Ok(())
463    }
464    async fn async_quick_filter_broadcast(
465        &self,
466        target: TargetSession,
467        data: Bytes,
468    ) -> Result<(), Error> {
469        self.async_p2p_control
470            .quick_filter_broadcast(target, self.proto_id, data)
471            .await?;
472        Ok(())
473    }
474    async fn async_future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error> {
475        let task = if blocking {
476            Box::pin(BlockingFutureTask::new(task))
477        } else {
478            task
479        };
480        self.async_p2p_control.future_task(task).await?;
481        Ok(())
482    }
483    async fn async_send_message(
484        &self,
485        proto_id: ProtocolId,
486        peer_index: PeerIndex,
487        data: Bytes,
488    ) -> Result<(), Error> {
489        trace!(
490            "[send message]: {}, to={}, length={}",
491            proto_id,
492            peer_index,
493            data.len()
494        );
495        self.async_p2p_control
496            .send_message_to(peer_index, proto_id, data)
497            .await?;
498        Ok(())
499    }
500    async fn async_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
501        trace!(
502            "[send message to]: {}, to={}, length={}",
503            self.proto_id,
504            peer_index,
505            data.len()
506        );
507        self.async_p2p_control
508            .send_message_to(peer_index, self.proto_id, data)
509            .await?;
510        Ok(())
511    }
512    async fn async_filter_broadcast(
513        &self,
514        target: TargetSession,
515        data: Bytes,
516    ) -> Result<(), Error> {
517        self.async_p2p_control
518            .filter_broadcast(target, self.proto_id, data)
519            .await?;
520        Ok(())
521    }
522    async fn async_filter_broadcast_with_proto(
523        &self,
524        proto_id: ProtocolId,
525        target: TargetSession,
526        data: Bytes,
527    ) -> Result<(), Error> {
528        self.async_p2p_control
529            .filter_broadcast(target, proto_id, data)
530            .await?;
531        Ok(())
532    }
533    async fn async_quick_filter_broadcast_with_proto(
534        &self,
535        proto_id: ProtocolId,
536        target: TargetSession,
537        data: Bytes,
538    ) -> Result<(), Error> {
539        self.async_p2p_control
540            .quick_filter_broadcast(target, proto_id, data)
541            .await?;
542        Ok(())
543    }
544    async fn async_disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error> {
545        debug!("Disconnect peer: {}, message: {}", peer_index, message);
546        async_disconnect_with_message(&self.async_p2p_control, peer_index, message).await?;
547        Ok(())
548    }
549    fn quick_send_message(
550        &self,
551        proto_id: ProtocolId,
552        peer_index: PeerIndex,
553        data: Bytes,
554    ) -> Result<(), Error> {
555        trace!(
556            "[send message]: {}, to={}, length={}",
557            proto_id,
558            peer_index,
559            data.len()
560        );
561        self.p2p_control
562            .quick_send_message_to(peer_index, proto_id, data)?;
563        Ok(())
564    }
565    fn quick_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
566        trace!(
567            "[send message to]: {}, to={}, length={}",
568            self.proto_id,
569            peer_index,
570            data.len()
571        );
572        self.p2p_control
573            .quick_send_message_to(peer_index, self.proto_id, data)?;
574        Ok(())
575    }
576    fn quick_filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error> {
577        self.p2p_control
578            .quick_filter_broadcast(target, self.proto_id, data)?;
579        Ok(())
580    }
581    fn quick_filter_broadcast_with_proto(
582        &self,
583        proto_id: ProtocolId,
584        target: TargetSession,
585        data: Bytes,
586    ) -> Result<(), Error> {
587        self.p2p_control
588            .quick_filter_broadcast(target, proto_id, data)?;
589        Ok(())
590    }
591    fn future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error> {
592        let task = if blocking {
593            Box::pin(BlockingFutureTask::new(task))
594        } else {
595            task
596        };
597        self.p2p_control.future_task(task)?;
598        Ok(())
599    }
600    fn send_message(
601        &self,
602        proto_id: ProtocolId,
603        peer_index: PeerIndex,
604        data: Bytes,
605    ) -> Result<(), Error> {
606        trace!(
607            "[send message]: {}, to={}, length={}",
608            proto_id,
609            peer_index,
610            data.len()
611        );
612        self.p2p_control
613            .send_message_to(peer_index, proto_id, data)?;
614        Ok(())
615    }
616    fn send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
617        trace!(
618            "[send message to]: {}, to={}, length={}",
619            self.proto_id,
620            peer_index,
621            data.len()
622        );
623        self.p2p_control
624            .send_message_to(peer_index, self.proto_id, data)?;
625        Ok(())
626    }
627    fn filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error> {
628        self.p2p_control
629            .filter_broadcast(target, self.proto_id, data)?;
630        Ok(())
631    }
632    fn disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error> {
633        debug!("Disconnect peer: {}, message: {}", peer_index, message);
634        disconnect_with_message(&self.p2p_control, peer_index, message)?;
635        Ok(())
636    }
637
638    fn get_peer(&self, peer_index: PeerIndex) -> Option<Peer> {
639        self.network_state
640            .with_peer_registry(|reg| reg.get_peer(peer_index).cloned())
641    }
642    fn with_peer_mut(&self, peer_index: PeerIndex, f: Box<dyn FnOnce(&mut Peer)>) {
643        self.network_state.with_peer_registry_mut(|reg| {
644            reg.get_peer_mut(peer_index).map(f);
645        })
646    }
647
648    fn connected_peers(&self) -> Vec<PeerIndex> {
649        self.network_state.with_peer_registry(|reg| {
650            reg.peers()
651                .iter()
652                .filter_map(|(peer_index, peer)| {
653                    if peer.protocols.contains_key(&self.proto_id) {
654                        Some(peer_index)
655                    } else {
656                        None
657                    }
658                })
659                .cloned()
660                .collect()
661        })
662    }
663
664    fn full_relay_connected_peers(&self) -> Vec<PeerIndex> {
665        self.network_state.with_peer_registry(|reg| {
666            reg.peers()
667                .iter()
668                .filter_map(|(peer_index, peer)| {
669                    if peer.protocols.contains_key(&self.proto_id) && !peer.is_block_relay_only() {
670                        Some(peer_index)
671                    } else {
672                        None
673                    }
674                })
675                .cloned()
676                .collect()
677        })
678    }
679
680    fn report_peer(&self, peer_index: PeerIndex, behaviour: Behaviour) {
681        self.network_state
682            .report_session(&self.p2p_control, peer_index, behaviour);
683    }
684    fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String) {
685        self.network_state
686            .ban_session(&self.p2p_control, peer_index, duration, reason);
687    }
688
689    fn protocol_id(&self) -> ProtocolId {
690        self.proto_id
691    }
692
693    fn p2p_control(&self) -> Option<&ServiceControl> {
694        Some(&self.p2p_control)
695    }
696}
697
698pub(crate) struct BlockingFutureTask {
699    task: BoxedFutureTask,
700}
701
702impl BlockingFutureTask {
703    pub(crate) fn new(task: BoxedFutureTask) -> BlockingFutureTask {
704        BlockingFutureTask { task }
705    }
706}
707
708impl Future for BlockingFutureTask {
709    type Output = ();
710
711    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
712        p2p::runtime::block_in_place(|| self.task.poll_unpin(cx))
713    }
714}