1pub(crate) mod disconnect_message;
2pub(crate) mod discovery;
3pub(crate) mod feeler;
4pub(crate) mod identify;
5pub(crate) mod ping;
6pub(crate) mod support_protocols;
7
8#[cfg(test)]
9mod tests;
10
11use ckb_logger::{debug, trace};
12use futures::{Future, FutureExt};
13use p2p::{
14 ProtocolId, SessionId, async_trait,
15 builder::MetaBuilder,
16 bytes::Bytes,
17 context::{ProtocolContext, ProtocolContextMutRef},
18 service::{ProtocolHandle, ProtocolMeta, ServiceAsyncControl, ServiceControl, TargetSession},
19 traits::ServiceProtocol,
20};
21use std::{
22 pin::Pin,
23 sync::Arc,
24 task::{Context, Poll},
25 time::Duration,
26};
27use tokio_util::codec::length_delimited;
28
/// Index used throughout this module to identify a peer; an alias of the p2p [`SessionId`].
pub type PeerIndex = SessionId;
/// A pinned, boxed, `Send + 'static` future that resolves to `()`.
pub type BoxedFutureTask = Pin<Box<dyn Future<Output = ()> + 'static + Send>>;
33
34use crate::{
35 Behaviour, Error, NetworkState, Peer, ProtocolVersion, SupportProtocols,
36 compress::{compress, decompress},
37 network::{async_disconnect_with_message, disconnect_with_message},
38};
39
/// Network-facing context handed to a [`CKBProtocolHandler`] on every callback.
///
/// It lets a protocol send or broadcast messages, schedule notifies and
/// background tasks, and inspect, report, ban or disconnect peers.
///
/// Most operations come in two flavours: an `async_*` method and a
/// synchronous counterpart. Methods whose name ends in `_to` (and the
/// broadcast methods) use the protocol id of the context itself, while
/// `*_send_message` take the target protocol id explicitly.
#[async_trait]
pub trait CKBProtocolContext: Send {
    /// Returns the current value of the `ckb2023` flag.
    fn ckb2023(&self) -> bool;
    /// Schedules a notify with the given `interval` and `token` for this
    /// context's protocol.
    // NOTE(review): presumably the token is later delivered through
    // `CKBProtocolHandler::notify` — confirm against the p2p service.
    async fn set_notify(&self, interval: Duration, token: u64) -> Result<(), Error>;
    /// Removes the notify previously scheduled with `token` for this
    /// context's protocol.
    async fn remove_notify(&self, token: u64) -> Result<(), Error>;
    /// Async "quick" send of `data` to `peer_index` on protocol `proto_id`.
    async fn async_quick_send_message(
        &self,
        proto_id: ProtocolId,
        peer_index: PeerIndex,
        data: Bytes,
    ) -> Result<(), Error>;
    /// Async "quick" send of `data` to `peer_index` on this context's own
    /// protocol.
    async fn async_quick_send_message_to(
        &self,
        peer_index: PeerIndex,
        data: Bytes,
    ) -> Result<(), Error>;
    /// Async "quick" broadcast of `data` to `target` on this context's own
    /// protocol.
    async fn async_quick_filter_broadcast(
        &self,
        target: TargetSession,
        data: Bytes,
    ) -> Result<(), Error>;
    /// Async: hands `task` over to the p2p service to be run.
    ///
    /// `blocking` marks the task as one that may block; the default context
    /// in this file then wraps it in `BlockingFutureTask` first.
    async fn async_future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error>;
    /// Async send of `data` to `peer_index` on protocol `proto_id`.
    async fn async_send_message(
        &self,
        proto_id: ProtocolId,
        peer_index: PeerIndex,
        data: Bytes,
    ) -> Result<(), Error>;
    /// Async send of `data` to `peer_index` on this context's own protocol.
    async fn async_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
    /// Async broadcast of `data` to `target` on this context's own protocol.
    async fn async_filter_broadcast(&self, target: TargetSession, data: Bytes)
    -> Result<(), Error>;
    /// Async: disconnects `peer_index`, passing `message` along with the
    /// disconnect request.
    async fn async_disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error>;
    /// "Quick" send of `data` to `peer_index` on protocol `proto_id`.
    fn quick_send_message(
        &self,
        proto_id: ProtocolId,
        peer_index: PeerIndex,
        data: Bytes,
    ) -> Result<(), Error>;
    /// "Quick" send of `data` to `peer_index` on this context's own protocol.
    fn quick_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
    /// "Quick" broadcast of `data` to `target` on this context's own protocol.
    fn quick_filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error>;
    /// Hands `task` over to the p2p service to be run.
    ///
    /// `blocking` marks the task as one that may block; the default context
    /// in this file then wraps it in `BlockingFutureTask` first.
    fn future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error>;
    /// Sends `data` to `peer_index` on protocol `proto_id`.
    fn send_message(
        &self,
        proto_id: ProtocolId,
        peer_index: PeerIndex,
        data: Bytes,
    ) -> Result<(), Error>;
    /// Sends `data` to `peer_index` on this context's own protocol.
    fn send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
    /// Broadcasts `data` to `target` on this context's own protocol.
    fn filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error>;
    /// Disconnects `peer_index`, passing `message` along with the disconnect
    /// request.
    fn disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error>;
    /// Returns a copy of the peer registered under `peer_index`, if any.
    fn get_peer(&self, peer_index: PeerIndex) -> Option<Peer>;
    /// Runs `f` with mutable access to the peer registered under
    /// `peer_index`; the default context does nothing when no such peer exists.
    fn with_peer_mut(&self, peer_index: PeerIndex, f: Box<dyn FnOnce(&mut Peer)>);
    /// Lists the peers relevant to this context; the default context returns
    /// those that currently have its protocol recorded as open.
    fn connected_peers(&self) -> Vec<PeerIndex>;
    /// Reports `behaviour` observed from `peer_index`.
    fn report_peer(&self, peer_index: PeerIndex, behaviour: Behaviour);
    /// Bans `peer_index` for `duration`, recording `reason`.
    fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String);
    /// Returns the id of the protocol this context belongs to.
    fn protocol_id(&self) -> ProtocolId;
    /// Returns the underlying p2p [`ServiceControl`], if the context has one.
    ///
    /// The provided implementation returns `None`.
    fn p2p_control(&self) -> Option<&ServiceControl> {
        None
    }
}
129
/// Shared, thread-safe handle to a [`CKBProtocolContext`] trait object.
pub type BoxedCKBProtocolContext = Arc<dyn CKBProtocolContext + Sync>;
132
/// Callbacks implemented by a CKB protocol.
///
/// In this file a boxed handler is wrapped by `CKBHandler`, which forwards
/// the p2p `ServiceProtocol` events to these methods together with a freshly
/// built [`BoxedCKBProtocolContext`]. Only [`init`](Self::init) is required;
/// every other callback has an empty default.
#[async_trait]
pub trait CKBProtocolHandler: Sync + Send {
    /// Called when the protocol is initialized.
    async fn init(&mut self, nc: BoxedCKBProtocolContext);
    /// Called when this protocol has been opened with `_peer_index`, using
    /// the negotiated `_version`.
    async fn connected(
        &mut self,
        _nc: BoxedCKBProtocolContext,
        _peer_index: PeerIndex,
        _version: &str,
    ) {
    }
    /// Called when this protocol has been closed with `_peer_index`.
    async fn disconnected(&mut self, _nc: BoxedCKBProtocolContext, _peer_index: PeerIndex) {}
    /// Called with the payload `_data` received from `_peer_index`.
    async fn received(
        &mut self,
        _nc: BoxedCKBProtocolContext,
        _peer_index: PeerIndex,
        _data: Bytes,
    ) {
    }
    /// Called when a notify carrying `_token` fires.
    async fn notify(&mut self, _nc: BoxedCKBProtocolContext, _token: u64) {}
    /// Poll hook; `CKBHandler` returns this value unchanged from its own
    /// `ServiceProtocol::poll`. The default returns `None`.
    async fn poll(&mut self, _nc: BoxedCKBProtocolContext) -> Option<()> {
        None
    }
}
163
/// Description of a single CKB protocol: its identity, supported versions,
/// framing limit and handler. [`CKBProtocol::build`] turns it into a p2p
/// `ProtocolMeta`.
pub struct CKBProtocol {
    /// Protocol id registered with the p2p service.
    id: ProtocolId,
    /// Protocol name reported to the p2p service.
    protocol_name: String,
    /// Versions this protocol supports.
    supported_versions: Vec<ProtocolVersion>,
    /// Maximum frame length configured on the length-delimited codec.
    max_frame_length: usize,
    /// User-supplied handler that receives the protocol callbacks.
    handler: Box<dyn CKBProtocolHandler>,
    /// Shared network state handed on to the handler wrapper.
    network_state: Arc<NetworkState>,
}
175
176impl CKBProtocol {
177 pub fn new_with_support_protocol(
180 support_protocol: support_protocols::SupportProtocols,
181 handler: Box<dyn CKBProtocolHandler>,
182 network_state: Arc<NetworkState>,
183 ) -> Self {
184 CKBProtocol {
185 id: support_protocol.protocol_id(),
186 max_frame_length: support_protocol.max_frame_length(),
187 protocol_name: support_protocol.name(),
188 supported_versions: support_protocol.support_versions(),
189 network_state,
190 handler,
191 }
192 }
193
194 pub fn new(
196 protocol_name: String,
197 id: ProtocolId,
198 versions: &[ProtocolVersion],
199 max_frame_length: usize,
200 handler: Box<dyn CKBProtocolHandler>,
201 network_state: Arc<NetworkState>,
202 ) -> Self {
203 CKBProtocol {
204 id,
205 max_frame_length,
206 network_state,
207 handler,
208 protocol_name: format!("/ckb/{protocol_name}"),
209 supported_versions: {
210 let mut versions: Vec<_> = versions.to_vec();
211 versions.sort_by(|a, b| b.cmp(a));
212 versions.to_vec()
213 },
214 }
215 }
216
217 pub fn id(&self) -> ProtocolId {
219 self.id
220 }
221
222 pub fn protocol_name(&self) -> String {
224 self.protocol_name.clone()
225 }
226
227 pub fn match_version(&self, version: ProtocolVersion) -> bool {
229 self.supported_versions.contains(&version)
230 }
231
232 pub fn build(self) -> ProtocolMeta {
234 let protocol_name = self.protocol_name();
235 let max_frame_length = self.max_frame_length;
236 let supported_versions = self
237 .supported_versions
238 .iter()
239 .map(ToString::to_string)
240 .collect::<Vec<_>>();
241 MetaBuilder::default()
242 .id(self.id)
243 .name(move |_| protocol_name.clone())
244 .codec(move || {
245 Box::new(
246 length_delimited::Builder::new()
247 .max_frame_length(max_frame_length)
248 .new_codec(),
249 )
250 })
251 .support_versions(supported_versions)
252 .service_handle(move || {
253 ProtocolHandle::Callback(Box::new(CKBHandler {
254 proto_id: self.id,
255 network_state: Arc::clone(&self.network_state),
256 handler: self.handler,
257 }))
258 })
259 .before_send(compress)
260 .before_receive(|| Some(Box::new(decompress)))
261 .build()
262 }
263}
264
/// Adapter that implements the p2p `ServiceProtocol` trait on top of a boxed
/// [`CKBProtocolHandler`].
struct CKBHandler {
    /// Id of the protocol this handler serves.
    proto_id: ProtocolId,
    /// Shared network state (peer registry, activity and `ckb2023` flags).
    network_state: Arc<NetworkState>,
    /// The wrapped user handler that receives the forwarded callbacks.
    handler: Box<dyn CKBProtocolHandler>,
}
270
271#[async_trait]
273impl ServiceProtocol for CKBHandler {
274 async fn init(&mut self, context: &mut ProtocolContext) {
275 let nc = DefaultCKBProtocolContext {
276 proto_id: self.proto_id,
277 network_state: Arc::clone(&self.network_state),
278 p2p_control: context.control().to_owned().into(),
279 async_p2p_control: context.control().to_owned(),
280 };
281 self.handler.init(Arc::new(nc)).await;
282 }
283
284 async fn connected(&mut self, context: ProtocolContextMutRef<'_>, version: &str) {
285 if self
287 .network_state
288 .ckb2023
289 .load(std::sync::atomic::Ordering::SeqCst)
290 && version != crate::protocols::support_protocols::LASTEST_VERSION
291 && context.proto_id != SupportProtocols::RelayV2.protocol_id()
292 {
293 debug!(
294 "The version of session {}, protocol {} is {}, not 3. It will be disconnected.",
295 context.session.id, context.proto_id, version
296 );
297 let id = context.session.id;
298 let _ignore = context.disconnect(id).await;
299 return;
300 }
301 self.network_state.with_peer_registry_mut(|reg| {
302 if let Some(peer) = reg.get_peer_mut(context.session.id) {
303 peer.protocols.insert(self.proto_id, version.to_owned());
304 }
305 });
306
307 if !self.network_state.is_active() {
308 return;
309 }
310
311 let nc = DefaultCKBProtocolContext {
312 proto_id: self.proto_id,
313 network_state: Arc::clone(&self.network_state),
314 p2p_control: context.control().to_owned().into(),
315 async_p2p_control: context.control().to_owned(),
316 };
317 let peer_index = context.session.id;
318
319 self.handler
320 .connected(Arc::new(nc), peer_index, version)
321 .await;
322 }
323
324 async fn disconnected(&mut self, context: ProtocolContextMutRef<'_>) {
325 self.network_state.with_peer_registry_mut(|reg| {
326 if let Some(peer) = reg.get_peer_mut(context.session.id) {
327 peer.protocols.remove(&self.proto_id);
328 }
329 });
330
331 if !self.network_state.is_active() {
332 return;
333 }
334
335 let nc = DefaultCKBProtocolContext {
336 proto_id: self.proto_id,
337 network_state: Arc::clone(&self.network_state),
338 p2p_control: context.control().to_owned().into(),
339 async_p2p_control: context.control().to_owned(),
340 };
341 let peer_index = context.session.id;
342 self.handler.disconnected(Arc::new(nc), peer_index).await;
343 }
344
345 async fn received(&mut self, context: ProtocolContextMutRef<'_>, data: Bytes) {
346 if !self.network_state.is_active() {
347 return;
348 }
349
350 trace!(
351 "[received message]: {}, {}, length={}",
352 self.proto_id,
353 context.session.id,
354 data.len()
355 );
356 let nc = DefaultCKBProtocolContext {
357 proto_id: self.proto_id,
358 network_state: Arc::clone(&self.network_state),
359 p2p_control: context.control().to_owned().into(),
360 async_p2p_control: context.control().to_owned(),
361 };
362 let peer_index = context.session.id;
363 self.handler.received(Arc::new(nc), peer_index, data).await;
364 }
365
366 async fn notify(&mut self, context: &mut ProtocolContext, token: u64) {
367 if !self.network_state.is_active() {
368 return;
369 }
370 let nc = DefaultCKBProtocolContext {
371 proto_id: self.proto_id,
372 network_state: Arc::clone(&self.network_state),
373 p2p_control: context.control().to_owned().into(),
374 async_p2p_control: context.control().to_owned(),
375 };
376 self.handler.notify(Arc::new(nc), token).await;
377 }
378
379 async fn poll(&mut self, context: &mut ProtocolContext) -> Option<()> {
380 let nc = DefaultCKBProtocolContext {
381 proto_id: self.proto_id,
382 network_state: Arc::clone(&self.network_state),
383 p2p_control: context.control().to_owned().into(),
384 async_p2p_control: context.control().to_owned(),
385 };
386 self.handler.poll(Arc::new(nc)).await
387 }
388}
389
/// The [`CKBProtocolContext`] implementation used by `CKBHandler`, backed by
/// the p2p service controls and the shared network state.
struct DefaultCKBProtocolContext {
    /// Id of the protocol this context belongs to.
    proto_id: ProtocolId,
    /// Shared network state (peer registry, `ckb2023` flag, report/ban logic).
    network_state: Arc<NetworkState>,
    /// Control used by the synchronous methods.
    p2p_control: ServiceControl,
    /// Control used by the `async` methods.
    async_p2p_control: ServiceAsyncControl,
}
396
397#[async_trait]
398impl CKBProtocolContext for DefaultCKBProtocolContext {
399 fn ckb2023(&self) -> bool {
400 self.network_state
401 .ckb2023
402 .load(std::sync::atomic::Ordering::SeqCst)
403 }
404 async fn set_notify(&self, interval: Duration, token: u64) -> Result<(), Error> {
405 self.async_p2p_control
406 .set_service_notify(self.proto_id, interval, token)
407 .await?;
408 Ok(())
409 }
410 async fn remove_notify(&self, token: u64) -> Result<(), Error> {
411 self.async_p2p_control
412 .remove_service_notify(self.proto_id, token)
413 .await?;
414 Ok(())
415 }
416 async fn async_quick_send_message(
417 &self,
418 proto_id: ProtocolId,
419 peer_index: PeerIndex,
420 data: Bytes,
421 ) -> Result<(), Error> {
422 trace!(
423 "[send message]: {}, to={}, length={}",
424 proto_id,
425 peer_index,
426 data.len()
427 );
428 self.async_p2p_control
429 .quick_send_message_to(peer_index, proto_id, data)
430 .await?;
431 Ok(())
432 }
433 async fn async_quick_send_message_to(
434 &self,
435 peer_index: PeerIndex,
436 data: Bytes,
437 ) -> Result<(), Error> {
438 trace!(
439 "[send message to]: {}, to={}, length={}",
440 self.proto_id,
441 peer_index,
442 data.len()
443 );
444 self.async_p2p_control
445 .quick_send_message_to(peer_index, self.proto_id, data)
446 .await?;
447 Ok(())
448 }
449 async fn async_quick_filter_broadcast(
450 &self,
451 target: TargetSession,
452 data: Bytes,
453 ) -> Result<(), Error> {
454 self.async_p2p_control
455 .quick_filter_broadcast(target, self.proto_id, data)
456 .await?;
457 Ok(())
458 }
459 async fn async_future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error> {
460 let task = if blocking {
461 Box::pin(BlockingFutureTask::new(task))
462 } else {
463 task
464 };
465 self.async_p2p_control.future_task(task).await?;
466 Ok(())
467 }
468 async fn async_send_message(
469 &self,
470 proto_id: ProtocolId,
471 peer_index: PeerIndex,
472 data: Bytes,
473 ) -> Result<(), Error> {
474 trace!(
475 "[send message]: {}, to={}, length={}",
476 proto_id,
477 peer_index,
478 data.len()
479 );
480 self.async_p2p_control
481 .send_message_to(peer_index, proto_id, data)
482 .await?;
483 Ok(())
484 }
485 async fn async_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
486 trace!(
487 "[send message to]: {}, to={}, length={}",
488 self.proto_id,
489 peer_index,
490 data.len()
491 );
492 self.async_p2p_control
493 .send_message_to(peer_index, self.proto_id, data)
494 .await?;
495 Ok(())
496 }
497 async fn async_filter_broadcast(
498 &self,
499 target: TargetSession,
500 data: Bytes,
501 ) -> Result<(), Error> {
502 self.async_p2p_control
503 .filter_broadcast(target, self.proto_id, data)
504 .await?;
505 Ok(())
506 }
507 async fn async_disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error> {
508 debug!("Disconnect peer: {}, message: {}", peer_index, message);
509 async_disconnect_with_message(&self.async_p2p_control, peer_index, message).await?;
510 Ok(())
511 }
512 fn quick_send_message(
513 &self,
514 proto_id: ProtocolId,
515 peer_index: PeerIndex,
516 data: Bytes,
517 ) -> Result<(), Error> {
518 trace!(
519 "[send message]: {}, to={}, length={}",
520 proto_id,
521 peer_index,
522 data.len()
523 );
524 self.p2p_control
525 .quick_send_message_to(peer_index, proto_id, data)?;
526 Ok(())
527 }
528 fn quick_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
529 trace!(
530 "[send message to]: {}, to={}, length={}",
531 self.proto_id,
532 peer_index,
533 data.len()
534 );
535 self.p2p_control
536 .quick_send_message_to(peer_index, self.proto_id, data)?;
537 Ok(())
538 }
539 fn quick_filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error> {
540 self.p2p_control
541 .quick_filter_broadcast(target, self.proto_id, data)?;
542 Ok(())
543 }
544 fn future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error> {
545 let task = if blocking {
546 Box::pin(BlockingFutureTask::new(task))
547 } else {
548 task
549 };
550 self.p2p_control.future_task(task)?;
551 Ok(())
552 }
553 fn send_message(
554 &self,
555 proto_id: ProtocolId,
556 peer_index: PeerIndex,
557 data: Bytes,
558 ) -> Result<(), Error> {
559 trace!(
560 "[send message]: {}, to={}, length={}",
561 proto_id,
562 peer_index,
563 data.len()
564 );
565 self.p2p_control
566 .send_message_to(peer_index, proto_id, data)?;
567 Ok(())
568 }
569 fn send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
570 trace!(
571 "[send message to]: {}, to={}, length={}",
572 self.proto_id,
573 peer_index,
574 data.len()
575 );
576 self.p2p_control
577 .send_message_to(peer_index, self.proto_id, data)?;
578 Ok(())
579 }
580 fn filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error> {
581 self.p2p_control
582 .filter_broadcast(target, self.proto_id, data)?;
583 Ok(())
584 }
585 fn disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error> {
586 debug!("Disconnect peer: {}, message: {}", peer_index, message);
587 disconnect_with_message(&self.p2p_control, peer_index, message)?;
588 Ok(())
589 }
590
591 fn get_peer(&self, peer_index: PeerIndex) -> Option<Peer> {
592 self.network_state
593 .with_peer_registry(|reg| reg.get_peer(peer_index).cloned())
594 }
595 fn with_peer_mut(&self, peer_index: PeerIndex, f: Box<dyn FnOnce(&mut Peer)>) {
596 self.network_state.with_peer_registry_mut(|reg| {
597 reg.get_peer_mut(peer_index).map(f);
598 })
599 }
600
601 fn connected_peers(&self) -> Vec<PeerIndex> {
602 self.network_state.with_peer_registry(|reg| {
603 reg.peers()
604 .iter()
605 .filter_map(|(peer_index, peer)| {
606 if peer.protocols.contains_key(&self.proto_id) {
607 Some(peer_index)
608 } else {
609 None
610 }
611 })
612 .cloned()
613 .collect()
614 })
615 }
616 fn report_peer(&self, peer_index: PeerIndex, behaviour: Behaviour) {
617 self.network_state
618 .report_session(&self.p2p_control, peer_index, behaviour);
619 }
620 fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String) {
621 self.network_state
622 .ban_session(&self.p2p_control, peer_index, duration, reason);
623 }
624
625 fn protocol_id(&self) -> ProtocolId {
626 self.proto_id
627 }
628
629 fn p2p_control(&self) -> Option<&ServiceControl> {
630 Some(&self.p2p_control)
631 }
632}
633
/// Future wrapper that polls its inner task from within
/// `p2p::runtime::block_in_place`.
pub(crate) struct BlockingFutureTask {
    /// The wrapped task.
    task: BoxedFutureTask,
}
637
638impl BlockingFutureTask {
639 pub(crate) fn new(task: BoxedFutureTask) -> BlockingFutureTask {
640 BlockingFutureTask { task }
641 }
642}
643
644impl Future for BlockingFutureTask {
645 type Output = ();
646
647 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
648 p2p::runtime::block_in_place(|| self.task.poll_unpin(cx))
649 }
650}