1pub(crate) mod disconnect_message;
2pub(crate) mod discovery;
3pub(crate) mod feeler;
4pub(crate) mod identify;
5pub(crate) mod ping;
6pub(crate) mod support_protocols;
7
8#[cfg(not(target_family = "wasm"))]
9pub(crate) mod hole_punching;
10
11#[cfg(test)]
12mod tests;
13
14use ckb_logger::{debug, trace};
15use futures::{Future, FutureExt};
16use p2p::{
17 ProtocolId, SessionId, async_trait,
18 builder::MetaBuilder,
19 bytes::Bytes,
20 context::{ProtocolContext, ProtocolContextMutRef},
21 service::{ProtocolHandle, ProtocolMeta, ServiceAsyncControl, ServiceControl, TargetSession},
22 traits::ServiceProtocol,
23};
24use std::{
25 pin::Pin,
26 sync::Arc,
27 task::{Context, Poll},
28 time::Duration,
29};
30use tokio_util::codec::length_delimited;
31
32pub type PeerIndex = SessionId;
34pub type BoxedFutureTask = Pin<Box<dyn Future<Output = ()> + 'static + Send>>;
36
37use crate::{
38 Behaviour, Error, NetworkState, Peer, ProtocolVersion,
39 compress::LengthDelimitedCodecWithCompress,
40 network::{async_disconnect_with_message, disconnect_with_message},
41};
42
43#[cfg(not(target_family = "wasm"))]
44use crate::SupportProtocols;
45
46#[async_trait]
48pub trait CKBProtocolContext: Send {
49 async fn set_notify(&self, interval: Duration, token: u64) -> Result<(), Error>;
52 async fn remove_notify(&self, token: u64) -> Result<(), Error>;
54 async fn async_quick_send_message(
56 &self,
57 proto_id: ProtocolId,
58 peer_index: PeerIndex,
59 data: Bytes,
60 ) -> Result<(), Error>;
61 async fn async_quick_send_message_to(
63 &self,
64 peer_index: PeerIndex,
65 data: Bytes,
66 ) -> Result<(), Error>;
67 async fn async_quick_filter_broadcast(
69 &self,
70 target: TargetSession,
71 data: Bytes,
72 ) -> Result<(), Error>;
73 async fn async_future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error>;
75 async fn async_send_message(
77 &self,
78 proto_id: ProtocolId,
79 peer_index: PeerIndex,
80 data: Bytes,
81 ) -> Result<(), Error>;
82 async fn async_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
84 async fn async_filter_broadcast(&self, target: TargetSession, data: Bytes)
86 -> Result<(), Error>;
87 async fn async_filter_broadcast_with_proto(
89 &self,
90 proto_id: ProtocolId,
91 target: TargetSession,
92 data: Bytes,
93 ) -> Result<(), Error>;
94 async fn async_quick_filter_broadcast_with_proto(
96 &self,
97 proto_id: ProtocolId,
98 target: TargetSession,
99 data: Bytes,
100 ) -> Result<(), Error>;
101 async fn async_disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error>;
103 fn quick_send_message(
105 &self,
106 proto_id: ProtocolId,
107 peer_index: PeerIndex,
108 data: Bytes,
109 ) -> Result<(), Error>;
110 fn quick_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
112 fn quick_filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error>;
114 fn quick_filter_broadcast_with_proto(
116 &self,
117 proto_id: ProtocolId,
118 target: TargetSession,
119 data: Bytes,
120 ) -> Result<(), Error>;
121 fn future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error>;
123 fn send_message(
125 &self,
126 proto_id: ProtocolId,
127 peer_index: PeerIndex,
128 data: Bytes,
129 ) -> Result<(), Error>;
130 fn send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
132 fn filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error>;
134 fn disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error>;
136 fn get_peer(&self, peer_index: PeerIndex) -> Option<Peer>;
139 fn with_peer_mut(&self, peer_index: PeerIndex, f: Box<dyn FnOnce(&mut Peer)>);
141 fn connected_peers(&self) -> Vec<PeerIndex>;
143 fn full_relay_connected_peers(&self) -> Vec<PeerIndex>;
145 fn report_peer(&self, peer_index: PeerIndex, behaviour: Behaviour);
147 fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String);
149 fn protocol_id(&self) -> ProtocolId;
151 fn p2p_control(&self) -> Option<&ServiceControl> {
153 None
154 }
155}
156
157pub type BoxedCKBProtocolContext = Arc<dyn CKBProtocolContext + Sync>;
159
160#[async_trait]
162pub trait CKBProtocolHandler: Sync + Send {
163 async fn init(&mut self, nc: BoxedCKBProtocolContext);
165 async fn connected(
167 &mut self,
168 _nc: BoxedCKBProtocolContext,
169 _peer_index: PeerIndex,
170 _version: &str,
171 ) {
172 }
173 async fn disconnected(&mut self, _nc: BoxedCKBProtocolContext, _peer_index: PeerIndex) {}
175 async fn received(
177 &mut self,
178 _nc: BoxedCKBProtocolContext,
179 _peer_index: PeerIndex,
180 _data: Bytes,
181 ) {
182 }
183 async fn notify(&mut self, _nc: BoxedCKBProtocolContext, _token: u64) {}
185 async fn poll(&mut self, _nc: BoxedCKBProtocolContext) -> Option<()> {
187 None
188 }
189}
190
191pub struct CKBProtocol {
193 id: ProtocolId,
194 protocol_name: String,
196 supported_versions: Vec<ProtocolVersion>,
198 max_frame_length: usize,
199 handler: Box<dyn CKBProtocolHandler>,
200 network_state: Arc<NetworkState>,
201 compress: bool,
202}
203
204impl CKBProtocol {
205 pub fn new_with_support_protocol(
208 support_protocol: support_protocols::SupportProtocols,
209 handler: Box<dyn CKBProtocolHandler>,
210 network_state: Arc<NetworkState>,
211 ) -> Self {
212 CKBProtocol {
213 id: support_protocol.protocol_id(),
214 max_frame_length: support_protocol.max_frame_length(),
215 protocol_name: support_protocol.name(),
216 supported_versions: support_protocol.support_versions(),
217 network_state,
218 handler,
219 compress: true,
220 }
221 }
222
223 pub fn new(
225 protocol_name: String,
226 id: ProtocolId,
227 versions: &[ProtocolVersion],
228 max_frame_length: usize,
229 handler: Box<dyn CKBProtocolHandler>,
230 network_state: Arc<NetworkState>,
231 ) -> Self {
232 CKBProtocol {
233 id,
234 max_frame_length,
235 network_state,
236 handler,
237 protocol_name: format!("/ckb/{protocol_name}"),
238 supported_versions: {
239 let mut versions: Vec<_> = versions.to_vec();
240 versions.sort_by(|a, b| b.cmp(a));
241 versions.to_vec()
242 },
243 compress: true,
244 }
245 }
246
247 pub fn compress(mut self, enable: bool) -> Self {
249 self.compress = enable;
250 self
251 }
252
253 pub fn id(&self) -> ProtocolId {
255 self.id
256 }
257
258 pub fn protocol_name(&self) -> String {
260 self.protocol_name.clone()
261 }
262
263 pub fn match_version(&self, version: ProtocolVersion) -> bool {
265 self.supported_versions.contains(&version)
266 }
267
268 pub fn build(self) -> ProtocolMeta {
270 let protocol_name = self.protocol_name();
271 let max_frame_length = self.max_frame_length;
272 let supported_versions = self
273 .supported_versions
274 .iter()
275 .map(ToString::to_string)
276 .collect::<Vec<_>>();
277 MetaBuilder::default()
278 .id(self.id)
279 .name(move |_| protocol_name.clone())
280 .codec(move || {
281 Box::new(LengthDelimitedCodecWithCompress::new(
282 self.compress,
283 length_delimited::Builder::new()
284 .max_frame_length(max_frame_length)
285 .new_codec(),
286 self.id,
287 ))
288 })
289 .support_versions(supported_versions)
290 .service_handle(move || {
291 ProtocolHandle::Callback(Box::new(CKBHandler {
292 proto_id: self.id,
293 network_state: Arc::clone(&self.network_state),
294 handler: self.handler,
295 }))
296 })
297 .build()
298 }
299}
300
301struct CKBHandler {
302 proto_id: ProtocolId,
303 network_state: Arc<NetworkState>,
304 handler: Box<dyn CKBProtocolHandler>,
305}
306
307#[async_trait]
309impl ServiceProtocol for CKBHandler {
310 async fn init(&mut self, context: &mut ProtocolContext) {
311 let nc = DefaultCKBProtocolContext {
312 proto_id: self.proto_id,
313 network_state: Arc::clone(&self.network_state),
314 p2p_control: context.control().to_owned().into(),
315 async_p2p_control: context.control().to_owned(),
316 };
317 self.handler.init(Arc::new(nc)).await;
318 }
319
320 async fn connected(&mut self, context: ProtocolContextMutRef<'_>, version: &str) {
321 self.network_state.with_peer_registry_mut(|reg| {
322 if let Some(peer) = reg.get_peer_mut(context.session.id) {
323 peer.protocols.insert(self.proto_id, version.to_owned());
324 }
325 });
326
327 if !self.network_state.is_active() {
328 return;
329 }
330
331 let nc = DefaultCKBProtocolContext {
332 proto_id: self.proto_id,
333 network_state: Arc::clone(&self.network_state),
334 p2p_control: context.control().to_owned().into(),
335 async_p2p_control: context.control().to_owned(),
336 };
337 let peer_index = context.session.id;
338
339 self.handler
340 .connected(Arc::new(nc), peer_index, version)
341 .await;
342 }
343
344 async fn disconnected(&mut self, context: ProtocolContextMutRef<'_>) {
345 self.network_state.with_peer_registry_mut(|reg| {
346 if let Some(peer) = reg.get_peer_mut(context.session.id) {
347 peer.protocols.remove(&self.proto_id);
348 }
349 });
350
351 if !self.network_state.is_active() {
352 return;
353 }
354
355 let nc = DefaultCKBProtocolContext {
356 proto_id: self.proto_id,
357 network_state: Arc::clone(&self.network_state),
358 p2p_control: context.control().to_owned().into(),
359 async_p2p_control: context.control().to_owned(),
360 };
361 let peer_index = context.session.id;
362 self.handler.disconnected(Arc::new(nc), peer_index).await;
363 }
364
365 async fn received(&mut self, context: ProtocolContextMutRef<'_>, data: Bytes) {
366 if !self.network_state.is_active() {
367 return;
368 }
369
370 trace!(
371 "[received message]: {}, {}, length={}",
372 self.proto_id,
373 context.session.id,
374 data.len()
375 );
376 let nc = DefaultCKBProtocolContext {
377 proto_id: self.proto_id,
378 network_state: Arc::clone(&self.network_state),
379 p2p_control: context.control().to_owned().into(),
380 async_p2p_control: context.control().to_owned(),
381 };
382 let peer_index = context.session.id;
383 self.handler.received(Arc::new(nc), peer_index, data).await;
384 }
385
386 async fn notify(&mut self, context: &mut ProtocolContext, token: u64) {
387 if !self.network_state.is_active() {
388 return;
389 }
390 let nc = DefaultCKBProtocolContext {
391 proto_id: self.proto_id,
392 network_state: Arc::clone(&self.network_state),
393 p2p_control: context.control().to_owned().into(),
394 async_p2p_control: context.control().to_owned(),
395 };
396 self.handler.notify(Arc::new(nc), token).await;
397 }
398
399 async fn poll(&mut self, context: &mut ProtocolContext) -> Option<()> {
400 let nc = DefaultCKBProtocolContext {
401 proto_id: self.proto_id,
402 network_state: Arc::clone(&self.network_state),
403 p2p_control: context.control().to_owned().into(),
404 async_p2p_control: context.control().to_owned(),
405 };
406 self.handler.poll(Arc::new(nc)).await
407 }
408}
409
410struct DefaultCKBProtocolContext {
411 proto_id: ProtocolId,
412 network_state: Arc<NetworkState>,
413 p2p_control: ServiceControl,
414 async_p2p_control: ServiceAsyncControl,
415}
416
417#[async_trait]
418impl CKBProtocolContext for DefaultCKBProtocolContext {
419 async fn set_notify(&self, interval: Duration, token: u64) -> Result<(), Error> {
420 self.async_p2p_control
421 .set_service_notify(self.proto_id, interval, token)
422 .await?;
423 Ok(())
424 }
425 async fn remove_notify(&self, token: u64) -> Result<(), Error> {
426 self.async_p2p_control
427 .remove_service_notify(self.proto_id, token)
428 .await?;
429 Ok(())
430 }
431 async fn async_quick_send_message(
432 &self,
433 proto_id: ProtocolId,
434 peer_index: PeerIndex,
435 data: Bytes,
436 ) -> Result<(), Error> {
437 trace!(
438 "[send message]: {}, to={}, length={}",
439 proto_id,
440 peer_index,
441 data.len()
442 );
443 self.async_p2p_control
444 .quick_send_message_to(peer_index, proto_id, data)
445 .await?;
446 Ok(())
447 }
448 async fn async_quick_send_message_to(
449 &self,
450 peer_index: PeerIndex,
451 data: Bytes,
452 ) -> Result<(), Error> {
453 trace!(
454 "[send message to]: {}, to={}, length={}",
455 self.proto_id,
456 peer_index,
457 data.len()
458 );
459 self.async_p2p_control
460 .quick_send_message_to(peer_index, self.proto_id, data)
461 .await?;
462 Ok(())
463 }
464 async fn async_quick_filter_broadcast(
465 &self,
466 target: TargetSession,
467 data: Bytes,
468 ) -> Result<(), Error> {
469 self.async_p2p_control
470 .quick_filter_broadcast(target, self.proto_id, data)
471 .await?;
472 Ok(())
473 }
474 async fn async_future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error> {
475 let task = if blocking {
476 Box::pin(BlockingFutureTask::new(task))
477 } else {
478 task
479 };
480 self.async_p2p_control.future_task(task).await?;
481 Ok(())
482 }
483 async fn async_send_message(
484 &self,
485 proto_id: ProtocolId,
486 peer_index: PeerIndex,
487 data: Bytes,
488 ) -> Result<(), Error> {
489 trace!(
490 "[send message]: {}, to={}, length={}",
491 proto_id,
492 peer_index,
493 data.len()
494 );
495 self.async_p2p_control
496 .send_message_to(peer_index, proto_id, data)
497 .await?;
498 Ok(())
499 }
500 async fn async_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
501 trace!(
502 "[send message to]: {}, to={}, length={}",
503 self.proto_id,
504 peer_index,
505 data.len()
506 );
507 self.async_p2p_control
508 .send_message_to(peer_index, self.proto_id, data)
509 .await?;
510 Ok(())
511 }
512 async fn async_filter_broadcast(
513 &self,
514 target: TargetSession,
515 data: Bytes,
516 ) -> Result<(), Error> {
517 self.async_p2p_control
518 .filter_broadcast(target, self.proto_id, data)
519 .await?;
520 Ok(())
521 }
522 async fn async_filter_broadcast_with_proto(
523 &self,
524 proto_id: ProtocolId,
525 target: TargetSession,
526 data: Bytes,
527 ) -> Result<(), Error> {
528 self.async_p2p_control
529 .filter_broadcast(target, proto_id, data)
530 .await?;
531 Ok(())
532 }
533 async fn async_quick_filter_broadcast_with_proto(
534 &self,
535 proto_id: ProtocolId,
536 target: TargetSession,
537 data: Bytes,
538 ) -> Result<(), Error> {
539 self.async_p2p_control
540 .quick_filter_broadcast(target, proto_id, data)
541 .await?;
542 Ok(())
543 }
544 async fn async_disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error> {
545 debug!("Disconnect peer: {}, message: {}", peer_index, message);
546 async_disconnect_with_message(&self.async_p2p_control, peer_index, message).await?;
547 Ok(())
548 }
549 fn quick_send_message(
550 &self,
551 proto_id: ProtocolId,
552 peer_index: PeerIndex,
553 data: Bytes,
554 ) -> Result<(), Error> {
555 trace!(
556 "[send message]: {}, to={}, length={}",
557 proto_id,
558 peer_index,
559 data.len()
560 );
561 self.p2p_control
562 .quick_send_message_to(peer_index, proto_id, data)?;
563 Ok(())
564 }
565 fn quick_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
566 trace!(
567 "[send message to]: {}, to={}, length={}",
568 self.proto_id,
569 peer_index,
570 data.len()
571 );
572 self.p2p_control
573 .quick_send_message_to(peer_index, self.proto_id, data)?;
574 Ok(())
575 }
576 fn quick_filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error> {
577 self.p2p_control
578 .quick_filter_broadcast(target, self.proto_id, data)?;
579 Ok(())
580 }
581 fn quick_filter_broadcast_with_proto(
582 &self,
583 proto_id: ProtocolId,
584 target: TargetSession,
585 data: Bytes,
586 ) -> Result<(), Error> {
587 self.p2p_control
588 .quick_filter_broadcast(target, proto_id, data)?;
589 Ok(())
590 }
591 fn future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error> {
592 let task = if blocking {
593 Box::pin(BlockingFutureTask::new(task))
594 } else {
595 task
596 };
597 self.p2p_control.future_task(task)?;
598 Ok(())
599 }
600 fn send_message(
601 &self,
602 proto_id: ProtocolId,
603 peer_index: PeerIndex,
604 data: Bytes,
605 ) -> Result<(), Error> {
606 trace!(
607 "[send message]: {}, to={}, length={}",
608 proto_id,
609 peer_index,
610 data.len()
611 );
612 self.p2p_control
613 .send_message_to(peer_index, proto_id, data)?;
614 Ok(())
615 }
616 fn send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
617 trace!(
618 "[send message to]: {}, to={}, length={}",
619 self.proto_id,
620 peer_index,
621 data.len()
622 );
623 self.p2p_control
624 .send_message_to(peer_index, self.proto_id, data)?;
625 Ok(())
626 }
627 fn filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error> {
628 self.p2p_control
629 .filter_broadcast(target, self.proto_id, data)?;
630 Ok(())
631 }
632 fn disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error> {
633 debug!("Disconnect peer: {}, message: {}", peer_index, message);
634 disconnect_with_message(&self.p2p_control, peer_index, message)?;
635 Ok(())
636 }
637
638 fn get_peer(&self, peer_index: PeerIndex) -> Option<Peer> {
639 self.network_state
640 .with_peer_registry(|reg| reg.get_peer(peer_index).cloned())
641 }
642 fn with_peer_mut(&self, peer_index: PeerIndex, f: Box<dyn FnOnce(&mut Peer)>) {
643 self.network_state.with_peer_registry_mut(|reg| {
644 reg.get_peer_mut(peer_index).map(f);
645 })
646 }
647
648 fn connected_peers(&self) -> Vec<PeerIndex> {
649 self.network_state.with_peer_registry(|reg| {
650 reg.peers()
651 .iter()
652 .filter_map(|(peer_index, peer)| {
653 if peer.protocols.contains_key(&self.proto_id) {
654 Some(peer_index)
655 } else {
656 None
657 }
658 })
659 .cloned()
660 .collect()
661 })
662 }
663
664 fn full_relay_connected_peers(&self) -> Vec<PeerIndex> {
665 self.network_state.with_peer_registry(|reg| {
666 reg.peers()
667 .iter()
668 .filter_map(|(peer_index, peer)| {
669 if peer.protocols.contains_key(&self.proto_id) && !peer.is_block_relay_only() {
670 Some(peer_index)
671 } else {
672 None
673 }
674 })
675 .cloned()
676 .collect()
677 })
678 }
679
680 fn report_peer(&self, peer_index: PeerIndex, behaviour: Behaviour) {
681 self.network_state
682 .report_session(&self.p2p_control, peer_index, behaviour);
683 }
684 fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String) {
685 self.network_state
686 .ban_session(&self.p2p_control, peer_index, duration, reason);
687 }
688
689 fn protocol_id(&self) -> ProtocolId {
690 self.proto_id
691 }
692
693 fn p2p_control(&self) -> Option<&ServiceControl> {
694 Some(&self.p2p_control)
695 }
696}
697
698pub(crate) struct BlockingFutureTask {
699 task: BoxedFutureTask,
700}
701
702impl BlockingFutureTask {
703 pub(crate) fn new(task: BoxedFutureTask) -> BlockingFutureTask {
704 BlockingFutureTask { task }
705 }
706}
707
708impl Future for BlockingFutureTask {
709 type Output = ();
710
711 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
712 p2p::runtime::block_in_place(|| self.task.poll_unpin(cx))
713 }
714}