1pub(crate) mod disconnect_message;
2pub(crate) mod discovery;
3pub(crate) mod feeler;
4pub(crate) mod identify;
5pub(crate) mod ping;
6pub(crate) mod support_protocols;
7
8#[cfg(not(target_family = "wasm"))]
9pub(crate) mod hole_punching;
10
11#[cfg(test)]
12mod tests;
13
14use ckb_logger::{debug, trace};
15use futures::{Future, FutureExt};
16use p2p::{
17 ProtocolId, SessionId, async_trait,
18 builder::MetaBuilder,
19 bytes::Bytes,
20 context::{ProtocolContext, ProtocolContextMutRef},
21 service::{ProtocolHandle, ProtocolMeta, ServiceAsyncControl, ServiceControl, TargetSession},
22 traits::ServiceProtocol,
23};
24use std::{
25 pin::Pin,
26 sync::Arc,
27 task::{Context, Poll},
28 time::Duration,
29};
30use tokio_util::codec::length_delimited;
31
32pub type PeerIndex = SessionId;
34pub type BoxedFutureTask = Pin<Box<dyn Future<Output = ()> + 'static + Send>>;
36
37use crate::{
38 Behaviour, Error, NetworkState, Peer, ProtocolVersion, SupportProtocols,
39 compress::{compress, decompress},
40 network::{async_disconnect_with_message, disconnect_with_message},
41};
42
43#[async_trait]
45pub trait CKBProtocolContext: Send {
46 async fn set_notify(&self, interval: Duration, token: u64) -> Result<(), Error>;
49 async fn remove_notify(&self, token: u64) -> Result<(), Error>;
51 async fn async_quick_send_message(
53 &self,
54 proto_id: ProtocolId,
55 peer_index: PeerIndex,
56 data: Bytes,
57 ) -> Result<(), Error>;
58 async fn async_quick_send_message_to(
60 &self,
61 peer_index: PeerIndex,
62 data: Bytes,
63 ) -> Result<(), Error>;
64 async fn async_quick_filter_broadcast(
66 &self,
67 target: TargetSession,
68 data: Bytes,
69 ) -> Result<(), Error>;
70 async fn async_future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error>;
72 async fn async_send_message(
74 &self,
75 proto_id: ProtocolId,
76 peer_index: PeerIndex,
77 data: Bytes,
78 ) -> Result<(), Error>;
79 async fn async_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
81 async fn async_filter_broadcast(&self, target: TargetSession, data: Bytes)
83 -> Result<(), Error>;
84 async fn async_disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error>;
86 fn quick_send_message(
88 &self,
89 proto_id: ProtocolId,
90 peer_index: PeerIndex,
91 data: Bytes,
92 ) -> Result<(), Error>;
93 fn quick_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
95 fn quick_filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error>;
97 fn future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error>;
99 fn send_message(
101 &self,
102 proto_id: ProtocolId,
103 peer_index: PeerIndex,
104 data: Bytes,
105 ) -> Result<(), Error>;
106 fn send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
108 fn filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error>;
110 fn disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error>;
112 fn get_peer(&self, peer_index: PeerIndex) -> Option<Peer>;
115 fn with_peer_mut(&self, peer_index: PeerIndex, f: Box<dyn FnOnce(&mut Peer)>);
117 fn connected_peers(&self) -> Vec<PeerIndex>;
119 fn full_relay_connected_peers(&self) -> Vec<PeerIndex>;
121 fn report_peer(&self, peer_index: PeerIndex, behaviour: Behaviour);
123 fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String);
125 fn protocol_id(&self) -> ProtocolId;
127 fn p2p_control(&self) -> Option<&ServiceControl> {
129 None
130 }
131}
132
133pub type BoxedCKBProtocolContext = Arc<dyn CKBProtocolContext + Sync>;
135
136#[async_trait]
138pub trait CKBProtocolHandler: Sync + Send {
139 async fn init(&mut self, nc: BoxedCKBProtocolContext);
141 async fn connected(
143 &mut self,
144 _nc: BoxedCKBProtocolContext,
145 _peer_index: PeerIndex,
146 _version: &str,
147 ) {
148 }
149 async fn disconnected(&mut self, _nc: BoxedCKBProtocolContext, _peer_index: PeerIndex) {}
151 async fn received(
153 &mut self,
154 _nc: BoxedCKBProtocolContext,
155 _peer_index: PeerIndex,
156 _data: Bytes,
157 ) {
158 }
159 async fn notify(&mut self, _nc: BoxedCKBProtocolContext, _token: u64) {}
161 async fn poll(&mut self, _nc: BoxedCKBProtocolContext) -> Option<()> {
163 None
164 }
165}
166
167pub struct CKBProtocol {
169 id: ProtocolId,
170 protocol_name: String,
172 supported_versions: Vec<ProtocolVersion>,
174 max_frame_length: usize,
175 handler: Box<dyn CKBProtocolHandler>,
176 network_state: Arc<NetworkState>,
177}
178
179impl CKBProtocol {
180 pub fn new_with_support_protocol(
183 support_protocol: support_protocols::SupportProtocols,
184 handler: Box<dyn CKBProtocolHandler>,
185 network_state: Arc<NetworkState>,
186 ) -> Self {
187 CKBProtocol {
188 id: support_protocol.protocol_id(),
189 max_frame_length: support_protocol.max_frame_length(),
190 protocol_name: support_protocol.name(),
191 supported_versions: support_protocol.support_versions(),
192 network_state,
193 handler,
194 }
195 }
196
197 pub fn new(
199 protocol_name: String,
200 id: ProtocolId,
201 versions: &[ProtocolVersion],
202 max_frame_length: usize,
203 handler: Box<dyn CKBProtocolHandler>,
204 network_state: Arc<NetworkState>,
205 ) -> Self {
206 CKBProtocol {
207 id,
208 max_frame_length,
209 network_state,
210 handler,
211 protocol_name: format!("/ckb/{protocol_name}"),
212 supported_versions: {
213 let mut versions: Vec<_> = versions.to_vec();
214 versions.sort_by(|a, b| b.cmp(a));
215 versions.to_vec()
216 },
217 }
218 }
219
220 pub fn id(&self) -> ProtocolId {
222 self.id
223 }
224
225 pub fn protocol_name(&self) -> String {
227 self.protocol_name.clone()
228 }
229
230 pub fn match_version(&self, version: ProtocolVersion) -> bool {
232 self.supported_versions.contains(&version)
233 }
234
235 pub fn build(self) -> ProtocolMeta {
237 let protocol_name = self.protocol_name();
238 let max_frame_length = self.max_frame_length;
239 let supported_versions = self
240 .supported_versions
241 .iter()
242 .map(ToString::to_string)
243 .collect::<Vec<_>>();
244 MetaBuilder::default()
245 .id(self.id)
246 .name(move |_| protocol_name.clone())
247 .codec(move || {
248 Box::new(
249 length_delimited::Builder::new()
250 .max_frame_length(max_frame_length)
251 .new_codec(),
252 )
253 })
254 .support_versions(supported_versions)
255 .service_handle(move || {
256 ProtocolHandle::Callback(Box::new(CKBHandler {
257 proto_id: self.id,
258 network_state: Arc::clone(&self.network_state),
259 handler: self.handler,
260 }))
261 })
262 .before_send(compress)
263 .before_receive(|| Some(Box::new(decompress)))
264 .build()
265 }
266}
267
268struct CKBHandler {
269 proto_id: ProtocolId,
270 network_state: Arc<NetworkState>,
271 handler: Box<dyn CKBProtocolHandler>,
272}
273
274#[async_trait]
276impl ServiceProtocol for CKBHandler {
277 async fn init(&mut self, context: &mut ProtocolContext) {
278 let nc = DefaultCKBProtocolContext {
279 proto_id: self.proto_id,
280 network_state: Arc::clone(&self.network_state),
281 p2p_control: context.control().to_owned().into(),
282 async_p2p_control: context.control().to_owned(),
283 };
284 self.handler.init(Arc::new(nc)).await;
285 }
286
287 async fn connected(&mut self, context: ProtocolContextMutRef<'_>, version: &str) {
288 self.network_state.with_peer_registry_mut(|reg| {
289 if let Some(peer) = reg.get_peer_mut(context.session.id) {
290 peer.protocols.insert(self.proto_id, version.to_owned());
291 }
292 });
293
294 if !self.network_state.is_active() {
295 return;
296 }
297
298 let nc = DefaultCKBProtocolContext {
299 proto_id: self.proto_id,
300 network_state: Arc::clone(&self.network_state),
301 p2p_control: context.control().to_owned().into(),
302 async_p2p_control: context.control().to_owned(),
303 };
304 let peer_index = context.session.id;
305
306 self.handler
307 .connected(Arc::new(nc), peer_index, version)
308 .await;
309 }
310
311 async fn disconnected(&mut self, context: ProtocolContextMutRef<'_>) {
312 self.network_state.with_peer_registry_mut(|reg| {
313 if let Some(peer) = reg.get_peer_mut(context.session.id) {
314 peer.protocols.remove(&self.proto_id);
315 }
316 });
317
318 if !self.network_state.is_active() {
319 return;
320 }
321
322 let nc = DefaultCKBProtocolContext {
323 proto_id: self.proto_id,
324 network_state: Arc::clone(&self.network_state),
325 p2p_control: context.control().to_owned().into(),
326 async_p2p_control: context.control().to_owned(),
327 };
328 let peer_index = context.session.id;
329 self.handler.disconnected(Arc::new(nc), peer_index).await;
330 }
331
332 async fn received(&mut self, context: ProtocolContextMutRef<'_>, data: Bytes) {
333 if !self.network_state.is_active() {
334 return;
335 }
336
337 trace!(
338 "[received message]: {}, {}, length={}",
339 self.proto_id,
340 context.session.id,
341 data.len()
342 );
343 let nc = DefaultCKBProtocolContext {
344 proto_id: self.proto_id,
345 network_state: Arc::clone(&self.network_state),
346 p2p_control: context.control().to_owned().into(),
347 async_p2p_control: context.control().to_owned(),
348 };
349 let peer_index = context.session.id;
350 self.handler.received(Arc::new(nc), peer_index, data).await;
351 }
352
353 async fn notify(&mut self, context: &mut ProtocolContext, token: u64) {
354 if !self.network_state.is_active() {
355 return;
356 }
357 let nc = DefaultCKBProtocolContext {
358 proto_id: self.proto_id,
359 network_state: Arc::clone(&self.network_state),
360 p2p_control: context.control().to_owned().into(),
361 async_p2p_control: context.control().to_owned(),
362 };
363 self.handler.notify(Arc::new(nc), token).await;
364 }
365
366 async fn poll(&mut self, context: &mut ProtocolContext) -> Option<()> {
367 let nc = DefaultCKBProtocolContext {
368 proto_id: self.proto_id,
369 network_state: Arc::clone(&self.network_state),
370 p2p_control: context.control().to_owned().into(),
371 async_p2p_control: context.control().to_owned(),
372 };
373 self.handler.poll(Arc::new(nc)).await
374 }
375}
376
377struct DefaultCKBProtocolContext {
378 proto_id: ProtocolId,
379 network_state: Arc<NetworkState>,
380 p2p_control: ServiceControl,
381 async_p2p_control: ServiceAsyncControl,
382}
383
384#[async_trait]
385impl CKBProtocolContext for DefaultCKBProtocolContext {
386 async fn set_notify(&self, interval: Duration, token: u64) -> Result<(), Error> {
387 self.async_p2p_control
388 .set_service_notify(self.proto_id, interval, token)
389 .await?;
390 Ok(())
391 }
392 async fn remove_notify(&self, token: u64) -> Result<(), Error> {
393 self.async_p2p_control
394 .remove_service_notify(self.proto_id, token)
395 .await?;
396 Ok(())
397 }
398 async fn async_quick_send_message(
399 &self,
400 proto_id: ProtocolId,
401 peer_index: PeerIndex,
402 data: Bytes,
403 ) -> Result<(), Error> {
404 trace!(
405 "[send message]: {}, to={}, length={}",
406 proto_id,
407 peer_index,
408 data.len()
409 );
410 self.async_p2p_control
411 .quick_send_message_to(peer_index, proto_id, data)
412 .await?;
413 Ok(())
414 }
415 async fn async_quick_send_message_to(
416 &self,
417 peer_index: PeerIndex,
418 data: Bytes,
419 ) -> Result<(), Error> {
420 trace!(
421 "[send message to]: {}, to={}, length={}",
422 self.proto_id,
423 peer_index,
424 data.len()
425 );
426 self.async_p2p_control
427 .quick_send_message_to(peer_index, self.proto_id, data)
428 .await?;
429 Ok(())
430 }
431 async fn async_quick_filter_broadcast(
432 &self,
433 target: TargetSession,
434 data: Bytes,
435 ) -> Result<(), Error> {
436 self.async_p2p_control
437 .quick_filter_broadcast(target, self.proto_id, data)
438 .await?;
439 Ok(())
440 }
441 async fn async_future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error> {
442 let task = if blocking {
443 Box::pin(BlockingFutureTask::new(task))
444 } else {
445 task
446 };
447 self.async_p2p_control.future_task(task).await?;
448 Ok(())
449 }
450 async fn async_send_message(
451 &self,
452 proto_id: ProtocolId,
453 peer_index: PeerIndex,
454 data: Bytes,
455 ) -> Result<(), Error> {
456 trace!(
457 "[send message]: {}, to={}, length={}",
458 proto_id,
459 peer_index,
460 data.len()
461 );
462 self.async_p2p_control
463 .send_message_to(peer_index, proto_id, data)
464 .await?;
465 Ok(())
466 }
467 async fn async_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
468 trace!(
469 "[send message to]: {}, to={}, length={}",
470 self.proto_id,
471 peer_index,
472 data.len()
473 );
474 self.async_p2p_control
475 .send_message_to(peer_index, self.proto_id, data)
476 .await?;
477 Ok(())
478 }
479 async fn async_filter_broadcast(
480 &self,
481 target: TargetSession,
482 data: Bytes,
483 ) -> Result<(), Error> {
484 self.async_p2p_control
485 .filter_broadcast(target, self.proto_id, data)
486 .await?;
487 Ok(())
488 }
489 async fn async_disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error> {
490 debug!("Disconnect peer: {}, message: {}", peer_index, message);
491 async_disconnect_with_message(&self.async_p2p_control, peer_index, message).await?;
492 Ok(())
493 }
494 fn quick_send_message(
495 &self,
496 proto_id: ProtocolId,
497 peer_index: PeerIndex,
498 data: Bytes,
499 ) -> Result<(), Error> {
500 trace!(
501 "[send message]: {}, to={}, length={}",
502 proto_id,
503 peer_index,
504 data.len()
505 );
506 self.p2p_control
507 .quick_send_message_to(peer_index, proto_id, data)?;
508 Ok(())
509 }
510 fn quick_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
511 trace!(
512 "[send message to]: {}, to={}, length={}",
513 self.proto_id,
514 peer_index,
515 data.len()
516 );
517 self.p2p_control
518 .quick_send_message_to(peer_index, self.proto_id, data)?;
519 Ok(())
520 }
521 fn quick_filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error> {
522 self.p2p_control
523 .quick_filter_broadcast(target, self.proto_id, data)?;
524 Ok(())
525 }
526 fn future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error> {
527 let task = if blocking {
528 Box::pin(BlockingFutureTask::new(task))
529 } else {
530 task
531 };
532 self.p2p_control.future_task(task)?;
533 Ok(())
534 }
535 fn send_message(
536 &self,
537 proto_id: ProtocolId,
538 peer_index: PeerIndex,
539 data: Bytes,
540 ) -> Result<(), Error> {
541 trace!(
542 "[send message]: {}, to={}, length={}",
543 proto_id,
544 peer_index,
545 data.len()
546 );
547 self.p2p_control
548 .send_message_to(peer_index, proto_id, data)?;
549 Ok(())
550 }
551 fn send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
552 trace!(
553 "[send message to]: {}, to={}, length={}",
554 self.proto_id,
555 peer_index,
556 data.len()
557 );
558 self.p2p_control
559 .send_message_to(peer_index, self.proto_id, data)?;
560 Ok(())
561 }
562 fn filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error> {
563 self.p2p_control
564 .filter_broadcast(target, self.proto_id, data)?;
565 Ok(())
566 }
567 fn disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error> {
568 debug!("Disconnect peer: {}, message: {}", peer_index, message);
569 disconnect_with_message(&self.p2p_control, peer_index, message)?;
570 Ok(())
571 }
572
573 fn get_peer(&self, peer_index: PeerIndex) -> Option<Peer> {
574 self.network_state
575 .with_peer_registry(|reg| reg.get_peer(peer_index).cloned())
576 }
577 fn with_peer_mut(&self, peer_index: PeerIndex, f: Box<dyn FnOnce(&mut Peer)>) {
578 self.network_state.with_peer_registry_mut(|reg| {
579 reg.get_peer_mut(peer_index).map(f);
580 })
581 }
582
583 fn connected_peers(&self) -> Vec<PeerIndex> {
584 self.network_state.with_peer_registry(|reg| {
585 reg.peers()
586 .iter()
587 .filter_map(|(peer_index, peer)| {
588 if peer.protocols.contains_key(&self.proto_id) {
589 Some(peer_index)
590 } else {
591 None
592 }
593 })
594 .cloned()
595 .collect()
596 })
597 }
598
599 fn full_relay_connected_peers(&self) -> Vec<PeerIndex> {
600 self.network_state.with_peer_registry(|reg| {
601 reg.peers()
602 .iter()
603 .filter_map(|(peer_index, peer)| {
604 if peer.protocols.contains_key(&self.proto_id) && !peer.is_block_relay_only() {
605 Some(peer_index)
606 } else {
607 None
608 }
609 })
610 .cloned()
611 .collect()
612 })
613 }
614
615 fn report_peer(&self, peer_index: PeerIndex, behaviour: Behaviour) {
616 self.network_state
617 .report_session(&self.p2p_control, peer_index, behaviour);
618 }
619 fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String) {
620 self.network_state
621 .ban_session(&self.p2p_control, peer_index, duration, reason);
622 }
623
624 fn protocol_id(&self) -> ProtocolId {
625 self.proto_id
626 }
627
628 fn p2p_control(&self) -> Option<&ServiceControl> {
629 Some(&self.p2p_control)
630 }
631}
632
633pub(crate) struct BlockingFutureTask {
634 task: BoxedFutureTask,
635}
636
637impl BlockingFutureTask {
638 pub(crate) fn new(task: BoxedFutureTask) -> BlockingFutureTask {
639 BlockingFutureTask { task }
640 }
641}
642
643impl Future for BlockingFutureTask {
644 type Output = ();
645
646 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
647 p2p::runtime::block_in_place(|| self.task.poll_unpin(cx))
648 }
649}