ckb_network/protocols/
mod.rs

1pub(crate) mod disconnect_message;
2pub(crate) mod discovery;
3pub(crate) mod feeler;
4pub(crate) mod identify;
5pub(crate) mod ping;
6pub(crate) mod support_protocols;
7
8#[cfg(test)]
9mod tests;
10
11use ckb_logger::{debug, trace};
12use futures::{Future, FutureExt};
13use p2p::{
14    async_trait,
15    builder::MetaBuilder,
16    bytes::Bytes,
17    context::{ProtocolContext, ProtocolContextMutRef},
18    service::{ProtocolHandle, ProtocolMeta, ServiceAsyncControl, ServiceControl, TargetSession},
19    traits::ServiceProtocol,
20    ProtocolId, SessionId,
21};
22use std::{
23    pin::Pin,
24    sync::Arc,
25    task::{Context, Poll},
26    time::Duration,
27};
28use tokio_util::codec::length_delimited;
29
30/// Alias session id
31pub type PeerIndex = SessionId;
32/// Boxed future task
33pub type BoxedFutureTask = Pin<Box<dyn Future<Output = ()> + 'static + Send>>;
34
35use crate::{
36    compress::{compress, decompress},
37    network::{async_disconnect_with_message, disconnect_with_message},
38    Behaviour, Error, NetworkState, Peer, ProtocolVersion, SupportProtocols,
39};
40
41/// Abstract protocol context
42#[async_trait]
43pub trait CKBProtocolContext: Send {
44    /// Get ckb2023 flag
45    fn ckb2023(&self) -> bool;
46    /// Set notify to tentacle
47    // Interact with underlying p2p service
48    async fn set_notify(&self, interval: Duration, token: u64) -> Result<(), Error>;
49    /// Remove notify
50    async fn remove_notify(&self, token: u64) -> Result<(), Error>;
51    /// Send message through quick queue
52    async fn async_quick_send_message(
53        &self,
54        proto_id: ProtocolId,
55        peer_index: PeerIndex,
56        data: Bytes,
57    ) -> Result<(), Error>;
58    /// Send message through quick queue
59    async fn async_quick_send_message_to(
60        &self,
61        peer_index: PeerIndex,
62        data: Bytes,
63    ) -> Result<(), Error>;
64    /// Filter broadcast message through quick queue
65    async fn async_quick_filter_broadcast(
66        &self,
67        target: TargetSession,
68        data: Bytes,
69    ) -> Result<(), Error>;
70    /// spawn a future task, if `blocking` is true we use tokio_threadpool::blocking to handle the task.
71    async fn async_future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error>;
72    /// Send message
73    async fn async_send_message(
74        &self,
75        proto_id: ProtocolId,
76        peer_index: PeerIndex,
77        data: Bytes,
78    ) -> Result<(), Error>;
79    /// Send message
80    async fn async_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
81    /// Filter broadcast message
82    async fn async_filter_broadcast(&self, target: TargetSession, data: Bytes)
83        -> Result<(), Error>;
84    /// Disconnect session
85    async fn async_disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error>;
86    /// Send message through quick queue
87    fn quick_send_message(
88        &self,
89        proto_id: ProtocolId,
90        peer_index: PeerIndex,
91        data: Bytes,
92    ) -> Result<(), Error>;
93    /// Send message through quick queue
94    fn quick_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
95    /// Filter broadcast message through quick queue
96    fn quick_filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error>;
97    /// spawn a future task, if `blocking` is true we use tokio_threadpool::blocking to handle the task.
98    fn future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error>;
99    /// Send message
100    fn send_message(
101        &self,
102        proto_id: ProtocolId,
103        peer_index: PeerIndex,
104        data: Bytes,
105    ) -> Result<(), Error>;
106    /// Send message
107    fn send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
108    /// Filter broadcast message
109    fn filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error>;
110    /// Disconnect session
111    fn disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error>;
112    // Interact with NetworkState
113    /// Get peer info
114    fn get_peer(&self, peer_index: PeerIndex) -> Option<Peer>;
115    /// Modify peer info
116    fn with_peer_mut(&self, peer_index: PeerIndex, f: Box<dyn FnOnce(&mut Peer)>);
117    /// Get all session id
118    fn connected_peers(&self) -> Vec<PeerIndex>;
119    /// Report peer behavior
120    fn report_peer(&self, peer_index: PeerIndex, behaviour: Behaviour);
121    /// Ban peer
122    fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String);
123    /// current protocol id
124    fn protocol_id(&self) -> ProtocolId;
125    /// Raw tentacle controller
126    fn p2p_control(&self) -> Option<&ServiceControl> {
127        None
128    }
129}
130
131/// type alias of dyn ckb protocol context
132pub type BoxedCKBProtocolContext = Arc<dyn CKBProtocolContext + Sync>;
133
134/// Abstract protocol handle base on tentacle service handle
135#[async_trait]
136pub trait CKBProtocolHandler: Sync + Send {
137    /// Init action on service run
138    async fn init(&mut self, nc: BoxedCKBProtocolContext);
139    /// Called when opening protocol
140    async fn connected(
141        &mut self,
142        _nc: BoxedCKBProtocolContext,
143        _peer_index: PeerIndex,
144        _version: &str,
145    ) {
146    }
147    /// Called when closing protocol
148    async fn disconnected(&mut self, _nc: BoxedCKBProtocolContext, _peer_index: PeerIndex) {}
149    /// Called when the corresponding protocol message is received
150    async fn received(
151        &mut self,
152        _nc: BoxedCKBProtocolContext,
153        _peer_index: PeerIndex,
154        _data: Bytes,
155    ) {
156    }
157    /// Called when the Service receives the notify task
158    async fn notify(&mut self, _nc: BoxedCKBProtocolContext, _token: u64) {}
159    /// Behave like `Stream::poll`
160    async fn poll(&mut self, _nc: BoxedCKBProtocolContext) -> Option<()> {
161        None
162    }
163}
164
165/// Help to build protocol meta
166pub struct CKBProtocol {
167    id: ProtocolId,
168    // for example: b"/ckb/"
169    protocol_name: String,
170    // supported version, used to check protocol version
171    supported_versions: Vec<ProtocolVersion>,
172    max_frame_length: usize,
173    handler: Box<dyn CKBProtocolHandler>,
174    network_state: Arc<NetworkState>,
175}
176
177impl CKBProtocol {
178    /// New with support protocol
179    // a helper constructor to build `CKBProtocol` with `SupportProtocols` enum
180    pub fn new_with_support_protocol(
181        support_protocol: support_protocols::SupportProtocols,
182        handler: Box<dyn CKBProtocolHandler>,
183        network_state: Arc<NetworkState>,
184    ) -> Self {
185        CKBProtocol {
186            id: support_protocol.protocol_id(),
187            max_frame_length: support_protocol.max_frame_length(),
188            protocol_name: support_protocol.name(),
189            supported_versions: support_protocol.support_versions(),
190            network_state,
191            handler,
192        }
193    }
194
195    /// New with all config
196    pub fn new(
197        protocol_name: String,
198        id: ProtocolId,
199        versions: &[ProtocolVersion],
200        max_frame_length: usize,
201        handler: Box<dyn CKBProtocolHandler>,
202        network_state: Arc<NetworkState>,
203    ) -> Self {
204        CKBProtocol {
205            id,
206            max_frame_length,
207            network_state,
208            handler,
209            protocol_name: format!("/ckb/{protocol_name}"),
210            supported_versions: {
211                let mut versions: Vec<_> = versions.to_vec();
212                versions.sort_by(|a, b| b.cmp(a));
213                versions.to_vec()
214            },
215        }
216    }
217
218    /// Protocol id
219    pub fn id(&self) -> ProtocolId {
220        self.id
221    }
222
223    /// Protocol name
224    pub fn protocol_name(&self) -> String {
225        self.protocol_name.clone()
226    }
227
228    /// Whether support this version
229    pub fn match_version(&self, version: ProtocolVersion) -> bool {
230        self.supported_versions.contains(&version)
231    }
232
233    /// Build to tentacle protocol meta
234    pub fn build(self) -> ProtocolMeta {
235        let protocol_name = self.protocol_name();
236        let max_frame_length = self.max_frame_length;
237        let supported_versions = self
238            .supported_versions
239            .iter()
240            .map(ToString::to_string)
241            .collect::<Vec<_>>();
242        MetaBuilder::default()
243            .id(self.id)
244            .name(move |_| protocol_name.clone())
245            .codec(move || {
246                Box::new(
247                    length_delimited::Builder::new()
248                        .max_frame_length(max_frame_length)
249                        .new_codec(),
250                )
251            })
252            .support_versions(supported_versions)
253            .service_handle(move || {
254                ProtocolHandle::Callback(Box::new(CKBHandler {
255                    proto_id: self.id,
256                    network_state: Arc::clone(&self.network_state),
257                    handler: self.handler,
258                }))
259            })
260            .before_send(compress)
261            .before_receive(|| Some(Box::new(decompress)))
262            .build()
263    }
264}
265
266struct CKBHandler {
267    proto_id: ProtocolId,
268    network_state: Arc<NetworkState>,
269    handler: Box<dyn CKBProtocolHandler>,
270}
271
272// Just proxy to inner handler, this struct exists for convenient unit test.
273#[async_trait]
274impl ServiceProtocol for CKBHandler {
275    async fn init(&mut self, context: &mut ProtocolContext) {
276        let nc = DefaultCKBProtocolContext {
277            proto_id: self.proto_id,
278            network_state: Arc::clone(&self.network_state),
279            p2p_control: context.control().to_owned().into(),
280            async_p2p_control: context.control().to_owned(),
281        };
282        self.handler.init(Arc::new(nc)).await;
283    }
284
285    async fn connected(&mut self, context: ProtocolContextMutRef<'_>, version: &str) {
286        // This judgment will be removed in the first release after 2023 hardfork
287        if self
288            .network_state
289            .ckb2023
290            .load(std::sync::atomic::Ordering::SeqCst)
291            && version != crate::protocols::support_protocols::LASTEST_VERSION
292            && context.proto_id != SupportProtocols::RelayV2.protocol_id()
293        {
294            debug!(
295                "The version of session {}, protocol {} is {}, not 3. It will be disconnected.",
296                context.session.id, context.proto_id, version
297            );
298            let id = context.session.id;
299            let _ignore = context.disconnect(id).await;
300            return;
301        }
302        self.network_state.with_peer_registry_mut(|reg| {
303            if let Some(peer) = reg.get_peer_mut(context.session.id) {
304                peer.protocols.insert(self.proto_id, version.to_owned());
305            }
306        });
307
308        if !self.network_state.is_active() {
309            return;
310        }
311
312        let nc = DefaultCKBProtocolContext {
313            proto_id: self.proto_id,
314            network_state: Arc::clone(&self.network_state),
315            p2p_control: context.control().to_owned().into(),
316            async_p2p_control: context.control().to_owned(),
317        };
318        let peer_index = context.session.id;
319
320        self.handler
321            .connected(Arc::new(nc), peer_index, version)
322            .await;
323    }
324
325    async fn disconnected(&mut self, context: ProtocolContextMutRef<'_>) {
326        self.network_state.with_peer_registry_mut(|reg| {
327            if let Some(peer) = reg.get_peer_mut(context.session.id) {
328                peer.protocols.remove(&self.proto_id);
329            }
330        });
331
332        if !self.network_state.is_active() {
333            return;
334        }
335
336        let nc = DefaultCKBProtocolContext {
337            proto_id: self.proto_id,
338            network_state: Arc::clone(&self.network_state),
339            p2p_control: context.control().to_owned().into(),
340            async_p2p_control: context.control().to_owned(),
341        };
342        let peer_index = context.session.id;
343        self.handler.disconnected(Arc::new(nc), peer_index).await;
344    }
345
346    async fn received(&mut self, context: ProtocolContextMutRef<'_>, data: Bytes) {
347        if !self.network_state.is_active() {
348            return;
349        }
350
351        trace!(
352            "[received message]: {}, {}, length={}",
353            self.proto_id,
354            context.session.id,
355            data.len()
356        );
357        let nc = DefaultCKBProtocolContext {
358            proto_id: self.proto_id,
359            network_state: Arc::clone(&self.network_state),
360            p2p_control: context.control().to_owned().into(),
361            async_p2p_control: context.control().to_owned(),
362        };
363        let peer_index = context.session.id;
364        self.handler.received(Arc::new(nc), peer_index, data).await;
365    }
366
367    async fn notify(&mut self, context: &mut ProtocolContext, token: u64) {
368        if !self.network_state.is_active() {
369            return;
370        }
371        let nc = DefaultCKBProtocolContext {
372            proto_id: self.proto_id,
373            network_state: Arc::clone(&self.network_state),
374            p2p_control: context.control().to_owned().into(),
375            async_p2p_control: context.control().to_owned(),
376        };
377        self.handler.notify(Arc::new(nc), token).await;
378    }
379
380    async fn poll(&mut self, context: &mut ProtocolContext) -> Option<()> {
381        let nc = DefaultCKBProtocolContext {
382            proto_id: self.proto_id,
383            network_state: Arc::clone(&self.network_state),
384            p2p_control: context.control().to_owned().into(),
385            async_p2p_control: context.control().to_owned(),
386        };
387        self.handler.poll(Arc::new(nc)).await
388    }
389}
390
391struct DefaultCKBProtocolContext {
392    proto_id: ProtocolId,
393    network_state: Arc<NetworkState>,
394    p2p_control: ServiceControl,
395    async_p2p_control: ServiceAsyncControl,
396}
397
398#[async_trait]
399impl CKBProtocolContext for DefaultCKBProtocolContext {
400    fn ckb2023(&self) -> bool {
401        self.network_state
402            .ckb2023
403            .load(std::sync::atomic::Ordering::SeqCst)
404    }
405    async fn set_notify(&self, interval: Duration, token: u64) -> Result<(), Error> {
406        self.async_p2p_control
407            .set_service_notify(self.proto_id, interval, token)
408            .await?;
409        Ok(())
410    }
411    async fn remove_notify(&self, token: u64) -> Result<(), Error> {
412        self.async_p2p_control
413            .remove_service_notify(self.proto_id, token)
414            .await?;
415        Ok(())
416    }
417    async fn async_quick_send_message(
418        &self,
419        proto_id: ProtocolId,
420        peer_index: PeerIndex,
421        data: Bytes,
422    ) -> Result<(), Error> {
423        trace!(
424            "[send message]: {}, to={}, length={}",
425            proto_id,
426            peer_index,
427            data.len()
428        );
429        self.async_p2p_control
430            .quick_send_message_to(peer_index, proto_id, data)
431            .await?;
432        Ok(())
433    }
434    async fn async_quick_send_message_to(
435        &self,
436        peer_index: PeerIndex,
437        data: Bytes,
438    ) -> Result<(), Error> {
439        trace!(
440            "[send message to]: {}, to={}, length={}",
441            self.proto_id,
442            peer_index,
443            data.len()
444        );
445        self.async_p2p_control
446            .quick_send_message_to(peer_index, self.proto_id, data)
447            .await?;
448        Ok(())
449    }
450    async fn async_quick_filter_broadcast(
451        &self,
452        target: TargetSession,
453        data: Bytes,
454    ) -> Result<(), Error> {
455        self.async_p2p_control
456            .quick_filter_broadcast(target, self.proto_id, data)
457            .await?;
458        Ok(())
459    }
460    async fn async_future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error> {
461        let task = if blocking {
462            Box::pin(BlockingFutureTask::new(task))
463        } else {
464            task
465        };
466        self.async_p2p_control.future_task(task).await?;
467        Ok(())
468    }
469    async fn async_send_message(
470        &self,
471        proto_id: ProtocolId,
472        peer_index: PeerIndex,
473        data: Bytes,
474    ) -> Result<(), Error> {
475        trace!(
476            "[send message]: {}, to={}, length={}",
477            proto_id,
478            peer_index,
479            data.len()
480        );
481        self.async_p2p_control
482            .send_message_to(peer_index, proto_id, data)
483            .await?;
484        Ok(())
485    }
486    async fn async_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
487        trace!(
488            "[send message to]: {}, to={}, length={}",
489            self.proto_id,
490            peer_index,
491            data.len()
492        );
493        self.async_p2p_control
494            .send_message_to(peer_index, self.proto_id, data)
495            .await?;
496        Ok(())
497    }
498    async fn async_filter_broadcast(
499        &self,
500        target: TargetSession,
501        data: Bytes,
502    ) -> Result<(), Error> {
503        self.async_p2p_control
504            .filter_broadcast(target, self.proto_id, data)
505            .await?;
506        Ok(())
507    }
508    async fn async_disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error> {
509        debug!("Disconnect peer: {}, message: {}", peer_index, message);
510        async_disconnect_with_message(&self.async_p2p_control, peer_index, message).await?;
511        Ok(())
512    }
513    fn quick_send_message(
514        &self,
515        proto_id: ProtocolId,
516        peer_index: PeerIndex,
517        data: Bytes,
518    ) -> Result<(), Error> {
519        trace!(
520            "[send message]: {}, to={}, length={}",
521            proto_id,
522            peer_index,
523            data.len()
524        );
525        self.p2p_control
526            .quick_send_message_to(peer_index, proto_id, data)?;
527        Ok(())
528    }
529    fn quick_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
530        trace!(
531            "[send message to]: {}, to={}, length={}",
532            self.proto_id,
533            peer_index,
534            data.len()
535        );
536        self.p2p_control
537            .quick_send_message_to(peer_index, self.proto_id, data)?;
538        Ok(())
539    }
540    fn quick_filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error> {
541        self.p2p_control
542            .quick_filter_broadcast(target, self.proto_id, data)?;
543        Ok(())
544    }
545    fn future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error> {
546        let task = if blocking {
547            Box::pin(BlockingFutureTask::new(task))
548        } else {
549            task
550        };
551        self.p2p_control.future_task(task)?;
552        Ok(())
553    }
554    fn send_message(
555        &self,
556        proto_id: ProtocolId,
557        peer_index: PeerIndex,
558        data: Bytes,
559    ) -> Result<(), Error> {
560        trace!(
561            "[send message]: {}, to={}, length={}",
562            proto_id,
563            peer_index,
564            data.len()
565        );
566        self.p2p_control
567            .send_message_to(peer_index, proto_id, data)?;
568        Ok(())
569    }
570    fn send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
571        trace!(
572            "[send message to]: {}, to={}, length={}",
573            self.proto_id,
574            peer_index,
575            data.len()
576        );
577        self.p2p_control
578            .send_message_to(peer_index, self.proto_id, data)?;
579        Ok(())
580    }
581    fn filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error> {
582        self.p2p_control
583            .filter_broadcast(target, self.proto_id, data)?;
584        Ok(())
585    }
586    fn disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error> {
587        debug!("Disconnect peer: {}, message: {}", peer_index, message);
588        disconnect_with_message(&self.p2p_control, peer_index, message)?;
589        Ok(())
590    }
591
592    fn get_peer(&self, peer_index: PeerIndex) -> Option<Peer> {
593        self.network_state
594            .with_peer_registry(|reg| reg.get_peer(peer_index).cloned())
595    }
596    fn with_peer_mut(&self, peer_index: PeerIndex, f: Box<dyn FnOnce(&mut Peer)>) {
597        self.network_state.with_peer_registry_mut(|reg| {
598            reg.get_peer_mut(peer_index).map(f);
599        })
600    }
601
602    fn connected_peers(&self) -> Vec<PeerIndex> {
603        self.network_state.with_peer_registry(|reg| {
604            reg.peers()
605                .iter()
606                .filter_map(|(peer_index, peer)| {
607                    if peer.protocols.contains_key(&self.proto_id) {
608                        Some(peer_index)
609                    } else {
610                        None
611                    }
612                })
613                .cloned()
614                .collect()
615        })
616    }
617    fn report_peer(&self, peer_index: PeerIndex, behaviour: Behaviour) {
618        self.network_state
619            .report_session(&self.p2p_control, peer_index, behaviour);
620    }
621    fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String) {
622        self.network_state
623            .ban_session(&self.p2p_control, peer_index, duration, reason);
624    }
625
626    fn protocol_id(&self) -> ProtocolId {
627        self.proto_id
628    }
629
630    fn p2p_control(&self) -> Option<&ServiceControl> {
631        Some(&self.p2p_control)
632    }
633}
634
635pub(crate) struct BlockingFutureTask {
636    task: BoxedFutureTask,
637}
638
639impl BlockingFutureTask {
640    pub(crate) fn new(task: BoxedFutureTask) -> BlockingFutureTask {
641        BlockingFutureTask { task }
642    }
643}
644
645impl Future for BlockingFutureTask {
646    type Output = ();
647
648    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
649        p2p::runtime::block_in_place(|| self.task.poll_unpin(cx))
650    }
651}