ckb_network/protocols/
mod.rs

1pub(crate) mod disconnect_message;
2pub(crate) mod discovery;
3pub(crate) mod feeler;
4pub(crate) mod identify;
5pub(crate) mod ping;
6pub(crate) mod support_protocols;
7
8#[cfg(test)]
9mod tests;
10
11use ckb_logger::{debug, trace};
12use futures::{Future, FutureExt};
13use p2p::{
14    ProtocolId, SessionId, async_trait,
15    builder::MetaBuilder,
16    bytes::Bytes,
17    context::{ProtocolContext, ProtocolContextMutRef},
18    service::{ProtocolHandle, ProtocolMeta, ServiceAsyncControl, ServiceControl, TargetSession},
19    traits::ServiceProtocol,
20};
21use std::{
22    pin::Pin,
23    sync::Arc,
24    task::{Context, Poll},
25    time::Duration,
26};
27use tokio_util::codec::length_delimited;
28
29/// Alias session id
30pub type PeerIndex = SessionId;
31/// Boxed future task
32pub type BoxedFutureTask = Pin<Box<dyn Future<Output = ()> + 'static + Send>>;
33
34use crate::{
35    Behaviour, Error, NetworkState, Peer, ProtocolVersion, SupportProtocols,
36    compress::{compress, decompress},
37    network::{async_disconnect_with_message, disconnect_with_message},
38};
39
40/// Abstract protocol context
41#[async_trait]
42pub trait CKBProtocolContext: Send {
43    /// Get ckb2023 flag
44    fn ckb2023(&self) -> bool;
45    /// Set notify to tentacle
46    // Interact with underlying p2p service
47    async fn set_notify(&self, interval: Duration, token: u64) -> Result<(), Error>;
48    /// Remove notify
49    async fn remove_notify(&self, token: u64) -> Result<(), Error>;
50    /// Send message through quick queue
51    async fn async_quick_send_message(
52        &self,
53        proto_id: ProtocolId,
54        peer_index: PeerIndex,
55        data: Bytes,
56    ) -> Result<(), Error>;
57    /// Send message through quick queue
58    async fn async_quick_send_message_to(
59        &self,
60        peer_index: PeerIndex,
61        data: Bytes,
62    ) -> Result<(), Error>;
63    /// Filter broadcast message through quick queue
64    async fn async_quick_filter_broadcast(
65        &self,
66        target: TargetSession,
67        data: Bytes,
68    ) -> Result<(), Error>;
69    /// spawn a future task, if `blocking` is true we use tokio_threadpool::blocking to handle the task.
70    async fn async_future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error>;
71    /// Send message
72    async fn async_send_message(
73        &self,
74        proto_id: ProtocolId,
75        peer_index: PeerIndex,
76        data: Bytes,
77    ) -> Result<(), Error>;
78    /// Send message
79    async fn async_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
80    /// Filter broadcast message
81    async fn async_filter_broadcast(&self, target: TargetSession, data: Bytes)
82    -> Result<(), Error>;
83    /// Disconnect session
84    async fn async_disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error>;
85    /// Send message through quick queue
86    fn quick_send_message(
87        &self,
88        proto_id: ProtocolId,
89        peer_index: PeerIndex,
90        data: Bytes,
91    ) -> Result<(), Error>;
92    /// Send message through quick queue
93    fn quick_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
94    /// Filter broadcast message through quick queue
95    fn quick_filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error>;
96    /// spawn a future task, if `blocking` is true we use tokio_threadpool::blocking to handle the task.
97    fn future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error>;
98    /// Send message
99    fn send_message(
100        &self,
101        proto_id: ProtocolId,
102        peer_index: PeerIndex,
103        data: Bytes,
104    ) -> Result<(), Error>;
105    /// Send message
106    fn send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
107    /// Filter broadcast message
108    fn filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error>;
109    /// Disconnect session
110    fn disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error>;
111    // Interact with NetworkState
112    /// Get peer info
113    fn get_peer(&self, peer_index: PeerIndex) -> Option<Peer>;
114    /// Modify peer info
115    fn with_peer_mut(&self, peer_index: PeerIndex, f: Box<dyn FnOnce(&mut Peer)>);
116    /// Get all session id
117    fn connected_peers(&self) -> Vec<PeerIndex>;
118    /// Report peer behavior
119    fn report_peer(&self, peer_index: PeerIndex, behaviour: Behaviour);
120    /// Ban peer
121    fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String);
122    /// current protocol id
123    fn protocol_id(&self) -> ProtocolId;
124    /// Raw tentacle controller
125    fn p2p_control(&self) -> Option<&ServiceControl> {
126        None
127    }
128}
129
130/// type alias of dyn ckb protocol context
131pub type BoxedCKBProtocolContext = Arc<dyn CKBProtocolContext + Sync>;
132
133/// Abstract protocol handle base on tentacle service handle
134#[async_trait]
135pub trait CKBProtocolHandler: Sync + Send {
136    /// Init action on service run
137    async fn init(&mut self, nc: BoxedCKBProtocolContext);
138    /// Called when opening protocol
139    async fn connected(
140        &mut self,
141        _nc: BoxedCKBProtocolContext,
142        _peer_index: PeerIndex,
143        _version: &str,
144    ) {
145    }
146    /// Called when closing protocol
147    async fn disconnected(&mut self, _nc: BoxedCKBProtocolContext, _peer_index: PeerIndex) {}
148    /// Called when the corresponding protocol message is received
149    async fn received(
150        &mut self,
151        _nc: BoxedCKBProtocolContext,
152        _peer_index: PeerIndex,
153        _data: Bytes,
154    ) {
155    }
156    /// Called when the Service receives the notify task
157    async fn notify(&mut self, _nc: BoxedCKBProtocolContext, _token: u64) {}
158    /// Behave like `Stream::poll`
159    async fn poll(&mut self, _nc: BoxedCKBProtocolContext) -> Option<()> {
160        None
161    }
162}
163
164/// Help to build protocol meta
165pub struct CKBProtocol {
166    id: ProtocolId,
167    // for example: b"/ckb/"
168    protocol_name: String,
169    // supported version, used to check protocol version
170    supported_versions: Vec<ProtocolVersion>,
171    max_frame_length: usize,
172    handler: Box<dyn CKBProtocolHandler>,
173    network_state: Arc<NetworkState>,
174}
175
176impl CKBProtocol {
177    /// New with support protocol
178    // a helper constructor to build `CKBProtocol` with `SupportProtocols` enum
179    pub fn new_with_support_protocol(
180        support_protocol: support_protocols::SupportProtocols,
181        handler: Box<dyn CKBProtocolHandler>,
182        network_state: Arc<NetworkState>,
183    ) -> Self {
184        CKBProtocol {
185            id: support_protocol.protocol_id(),
186            max_frame_length: support_protocol.max_frame_length(),
187            protocol_name: support_protocol.name(),
188            supported_versions: support_protocol.support_versions(),
189            network_state,
190            handler,
191        }
192    }
193
194    /// New with all config
195    pub fn new(
196        protocol_name: String,
197        id: ProtocolId,
198        versions: &[ProtocolVersion],
199        max_frame_length: usize,
200        handler: Box<dyn CKBProtocolHandler>,
201        network_state: Arc<NetworkState>,
202    ) -> Self {
203        CKBProtocol {
204            id,
205            max_frame_length,
206            network_state,
207            handler,
208            protocol_name: format!("/ckb/{protocol_name}"),
209            supported_versions: {
210                let mut versions: Vec<_> = versions.to_vec();
211                versions.sort_by(|a, b| b.cmp(a));
212                versions.to_vec()
213            },
214        }
215    }
216
217    /// Protocol id
218    pub fn id(&self) -> ProtocolId {
219        self.id
220    }
221
222    /// Protocol name
223    pub fn protocol_name(&self) -> String {
224        self.protocol_name.clone()
225    }
226
227    /// Whether support this version
228    pub fn match_version(&self, version: ProtocolVersion) -> bool {
229        self.supported_versions.contains(&version)
230    }
231
232    /// Build to tentacle protocol meta
233    pub fn build(self) -> ProtocolMeta {
234        let protocol_name = self.protocol_name();
235        let max_frame_length = self.max_frame_length;
236        let supported_versions = self
237            .supported_versions
238            .iter()
239            .map(ToString::to_string)
240            .collect::<Vec<_>>();
241        MetaBuilder::default()
242            .id(self.id)
243            .name(move |_| protocol_name.clone())
244            .codec(move || {
245                Box::new(
246                    length_delimited::Builder::new()
247                        .max_frame_length(max_frame_length)
248                        .new_codec(),
249                )
250            })
251            .support_versions(supported_versions)
252            .service_handle(move || {
253                ProtocolHandle::Callback(Box::new(CKBHandler {
254                    proto_id: self.id,
255                    network_state: Arc::clone(&self.network_state),
256                    handler: self.handler,
257                }))
258            })
259            .before_send(compress)
260            .before_receive(|| Some(Box::new(decompress)))
261            .build()
262    }
263}
264
265struct CKBHandler {
266    proto_id: ProtocolId,
267    network_state: Arc<NetworkState>,
268    handler: Box<dyn CKBProtocolHandler>,
269}
270
271// Just proxy to inner handler, this struct exists for convenient unit test.
272#[async_trait]
273impl ServiceProtocol for CKBHandler {
274    async fn init(&mut self, context: &mut ProtocolContext) {
275        let nc = DefaultCKBProtocolContext {
276            proto_id: self.proto_id,
277            network_state: Arc::clone(&self.network_state),
278            p2p_control: context.control().to_owned().into(),
279            async_p2p_control: context.control().to_owned(),
280        };
281        self.handler.init(Arc::new(nc)).await;
282    }
283
284    async fn connected(&mut self, context: ProtocolContextMutRef<'_>, version: &str) {
285        // This judgment will be removed in the first release after 2023 hardfork
286        if self
287            .network_state
288            .ckb2023
289            .load(std::sync::atomic::Ordering::SeqCst)
290            && version != crate::protocols::support_protocols::LASTEST_VERSION
291            && context.proto_id != SupportProtocols::RelayV2.protocol_id()
292        {
293            debug!(
294                "The version of session {}, protocol {} is {}, not 3. It will be disconnected.",
295                context.session.id, context.proto_id, version
296            );
297            let id = context.session.id;
298            let _ignore = context.disconnect(id).await;
299            return;
300        }
301        self.network_state.with_peer_registry_mut(|reg| {
302            if let Some(peer) = reg.get_peer_mut(context.session.id) {
303                peer.protocols.insert(self.proto_id, version.to_owned());
304            }
305        });
306
307        if !self.network_state.is_active() {
308            return;
309        }
310
311        let nc = DefaultCKBProtocolContext {
312            proto_id: self.proto_id,
313            network_state: Arc::clone(&self.network_state),
314            p2p_control: context.control().to_owned().into(),
315            async_p2p_control: context.control().to_owned(),
316        };
317        let peer_index = context.session.id;
318
319        self.handler
320            .connected(Arc::new(nc), peer_index, version)
321            .await;
322    }
323
324    async fn disconnected(&mut self, context: ProtocolContextMutRef<'_>) {
325        self.network_state.with_peer_registry_mut(|reg| {
326            if let Some(peer) = reg.get_peer_mut(context.session.id) {
327                peer.protocols.remove(&self.proto_id);
328            }
329        });
330
331        if !self.network_state.is_active() {
332            return;
333        }
334
335        let nc = DefaultCKBProtocolContext {
336            proto_id: self.proto_id,
337            network_state: Arc::clone(&self.network_state),
338            p2p_control: context.control().to_owned().into(),
339            async_p2p_control: context.control().to_owned(),
340        };
341        let peer_index = context.session.id;
342        self.handler.disconnected(Arc::new(nc), peer_index).await;
343    }
344
345    async fn received(&mut self, context: ProtocolContextMutRef<'_>, data: Bytes) {
346        if !self.network_state.is_active() {
347            return;
348        }
349
350        trace!(
351            "[received message]: {}, {}, length={}",
352            self.proto_id,
353            context.session.id,
354            data.len()
355        );
356        let nc = DefaultCKBProtocolContext {
357            proto_id: self.proto_id,
358            network_state: Arc::clone(&self.network_state),
359            p2p_control: context.control().to_owned().into(),
360            async_p2p_control: context.control().to_owned(),
361        };
362        let peer_index = context.session.id;
363        self.handler.received(Arc::new(nc), peer_index, data).await;
364    }
365
366    async fn notify(&mut self, context: &mut ProtocolContext, token: u64) {
367        if !self.network_state.is_active() {
368            return;
369        }
370        let nc = DefaultCKBProtocolContext {
371            proto_id: self.proto_id,
372            network_state: Arc::clone(&self.network_state),
373            p2p_control: context.control().to_owned().into(),
374            async_p2p_control: context.control().to_owned(),
375        };
376        self.handler.notify(Arc::new(nc), token).await;
377    }
378
379    async fn poll(&mut self, context: &mut ProtocolContext) -> Option<()> {
380        let nc = DefaultCKBProtocolContext {
381            proto_id: self.proto_id,
382            network_state: Arc::clone(&self.network_state),
383            p2p_control: context.control().to_owned().into(),
384            async_p2p_control: context.control().to_owned(),
385        };
386        self.handler.poll(Arc::new(nc)).await
387    }
388}
389
390struct DefaultCKBProtocolContext {
391    proto_id: ProtocolId,
392    network_state: Arc<NetworkState>,
393    p2p_control: ServiceControl,
394    async_p2p_control: ServiceAsyncControl,
395}
396
397#[async_trait]
398impl CKBProtocolContext for DefaultCKBProtocolContext {
399    fn ckb2023(&self) -> bool {
400        self.network_state
401            .ckb2023
402            .load(std::sync::atomic::Ordering::SeqCst)
403    }
404    async fn set_notify(&self, interval: Duration, token: u64) -> Result<(), Error> {
405        self.async_p2p_control
406            .set_service_notify(self.proto_id, interval, token)
407            .await?;
408        Ok(())
409    }
410    async fn remove_notify(&self, token: u64) -> Result<(), Error> {
411        self.async_p2p_control
412            .remove_service_notify(self.proto_id, token)
413            .await?;
414        Ok(())
415    }
416    async fn async_quick_send_message(
417        &self,
418        proto_id: ProtocolId,
419        peer_index: PeerIndex,
420        data: Bytes,
421    ) -> Result<(), Error> {
422        trace!(
423            "[send message]: {}, to={}, length={}",
424            proto_id,
425            peer_index,
426            data.len()
427        );
428        self.async_p2p_control
429            .quick_send_message_to(peer_index, proto_id, data)
430            .await?;
431        Ok(())
432    }
433    async fn async_quick_send_message_to(
434        &self,
435        peer_index: PeerIndex,
436        data: Bytes,
437    ) -> Result<(), Error> {
438        trace!(
439            "[send message to]: {}, to={}, length={}",
440            self.proto_id,
441            peer_index,
442            data.len()
443        );
444        self.async_p2p_control
445            .quick_send_message_to(peer_index, self.proto_id, data)
446            .await?;
447        Ok(())
448    }
449    async fn async_quick_filter_broadcast(
450        &self,
451        target: TargetSession,
452        data: Bytes,
453    ) -> Result<(), Error> {
454        self.async_p2p_control
455            .quick_filter_broadcast(target, self.proto_id, data)
456            .await?;
457        Ok(())
458    }
459    async fn async_future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error> {
460        let task = if blocking {
461            Box::pin(BlockingFutureTask::new(task))
462        } else {
463            task
464        };
465        self.async_p2p_control.future_task(task).await?;
466        Ok(())
467    }
468    async fn async_send_message(
469        &self,
470        proto_id: ProtocolId,
471        peer_index: PeerIndex,
472        data: Bytes,
473    ) -> Result<(), Error> {
474        trace!(
475            "[send message]: {}, to={}, length={}",
476            proto_id,
477            peer_index,
478            data.len()
479        );
480        self.async_p2p_control
481            .send_message_to(peer_index, proto_id, data)
482            .await?;
483        Ok(())
484    }
485    async fn async_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
486        trace!(
487            "[send message to]: {}, to={}, length={}",
488            self.proto_id,
489            peer_index,
490            data.len()
491        );
492        self.async_p2p_control
493            .send_message_to(peer_index, self.proto_id, data)
494            .await?;
495        Ok(())
496    }
497    async fn async_filter_broadcast(
498        &self,
499        target: TargetSession,
500        data: Bytes,
501    ) -> Result<(), Error> {
502        self.async_p2p_control
503            .filter_broadcast(target, self.proto_id, data)
504            .await?;
505        Ok(())
506    }
507    async fn async_disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error> {
508        debug!("Disconnect peer: {}, message: {}", peer_index, message);
509        async_disconnect_with_message(&self.async_p2p_control, peer_index, message).await?;
510        Ok(())
511    }
512    fn quick_send_message(
513        &self,
514        proto_id: ProtocolId,
515        peer_index: PeerIndex,
516        data: Bytes,
517    ) -> Result<(), Error> {
518        trace!(
519            "[send message]: {}, to={}, length={}",
520            proto_id,
521            peer_index,
522            data.len()
523        );
524        self.p2p_control
525            .quick_send_message_to(peer_index, proto_id, data)?;
526        Ok(())
527    }
528    fn quick_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
529        trace!(
530            "[send message to]: {}, to={}, length={}",
531            self.proto_id,
532            peer_index,
533            data.len()
534        );
535        self.p2p_control
536            .quick_send_message_to(peer_index, self.proto_id, data)?;
537        Ok(())
538    }
539    fn quick_filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error> {
540        self.p2p_control
541            .quick_filter_broadcast(target, self.proto_id, data)?;
542        Ok(())
543    }
544    fn future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error> {
545        let task = if blocking {
546            Box::pin(BlockingFutureTask::new(task))
547        } else {
548            task
549        };
550        self.p2p_control.future_task(task)?;
551        Ok(())
552    }
553    fn send_message(
554        &self,
555        proto_id: ProtocolId,
556        peer_index: PeerIndex,
557        data: Bytes,
558    ) -> Result<(), Error> {
559        trace!(
560            "[send message]: {}, to={}, length={}",
561            proto_id,
562            peer_index,
563            data.len()
564        );
565        self.p2p_control
566            .send_message_to(peer_index, proto_id, data)?;
567        Ok(())
568    }
569    fn send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
570        trace!(
571            "[send message to]: {}, to={}, length={}",
572            self.proto_id,
573            peer_index,
574            data.len()
575        );
576        self.p2p_control
577            .send_message_to(peer_index, self.proto_id, data)?;
578        Ok(())
579    }
580    fn filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error> {
581        self.p2p_control
582            .filter_broadcast(target, self.proto_id, data)?;
583        Ok(())
584    }
585    fn disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error> {
586        debug!("Disconnect peer: {}, message: {}", peer_index, message);
587        disconnect_with_message(&self.p2p_control, peer_index, message)?;
588        Ok(())
589    }
590
591    fn get_peer(&self, peer_index: PeerIndex) -> Option<Peer> {
592        self.network_state
593            .with_peer_registry(|reg| reg.get_peer(peer_index).cloned())
594    }
595    fn with_peer_mut(&self, peer_index: PeerIndex, f: Box<dyn FnOnce(&mut Peer)>) {
596        self.network_state.with_peer_registry_mut(|reg| {
597            reg.get_peer_mut(peer_index).map(f);
598        })
599    }
600
601    fn connected_peers(&self) -> Vec<PeerIndex> {
602        self.network_state.with_peer_registry(|reg| {
603            reg.peers()
604                .iter()
605                .filter_map(|(peer_index, peer)| {
606                    if peer.protocols.contains_key(&self.proto_id) {
607                        Some(peer_index)
608                    } else {
609                        None
610                    }
611                })
612                .cloned()
613                .collect()
614        })
615    }
616    fn report_peer(&self, peer_index: PeerIndex, behaviour: Behaviour) {
617        self.network_state
618            .report_session(&self.p2p_control, peer_index, behaviour);
619    }
620    fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String) {
621        self.network_state
622            .ban_session(&self.p2p_control, peer_index, duration, reason);
623    }
624
625    fn protocol_id(&self) -> ProtocolId {
626        self.proto_id
627    }
628
629    fn p2p_control(&self) -> Option<&ServiceControl> {
630        Some(&self.p2p_control)
631    }
632}
633
634pub(crate) struct BlockingFutureTask {
635    task: BoxedFutureTask,
636}
637
638impl BlockingFutureTask {
639    pub(crate) fn new(task: BoxedFutureTask) -> BlockingFutureTask {
640        BlockingFutureTask { task }
641    }
642}
643
644impl Future for BlockingFutureTask {
645    type Output = ();
646
647    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
648        p2p::runtime::block_in_place(|| self.task.poll_unpin(cx))
649    }
650}