1pub(crate) mod disconnect_message;
2pub(crate) mod discovery;
3pub(crate) mod feeler;
4pub(crate) mod identify;
5pub(crate) mod ping;
6pub(crate) mod support_protocols;
7
8#[cfg(test)]
9mod tests;
10
11use ckb_logger::{debug, trace};
12use futures::{Future, FutureExt};
13use p2p::{
14 async_trait,
15 builder::MetaBuilder,
16 bytes::Bytes,
17 context::{ProtocolContext, ProtocolContextMutRef},
18 service::{ProtocolHandle, ProtocolMeta, ServiceAsyncControl, ServiceControl, TargetSession},
19 traits::ServiceProtocol,
20 ProtocolId, SessionId,
21};
22use std::{
23 pin::Pin,
24 sync::Arc,
25 task::{Context, Poll},
26 time::Duration,
27};
28use tokio_util::codec::length_delimited;
29
30pub type PeerIndex = SessionId;
32pub type BoxedFutureTask = Pin<Box<dyn Future<Output = ()> + 'static + Send>>;
34
35use crate::{
36 compress::{compress, decompress},
37 network::{async_disconnect_with_message, disconnect_with_message},
38 Behaviour, Error, NetworkState, Peer, ProtocolVersion, SupportProtocols,
39};
40
41#[async_trait]
43pub trait CKBProtocolContext: Send {
44 fn ckb2023(&self) -> bool;
46 async fn set_notify(&self, interval: Duration, token: u64) -> Result<(), Error>;
49 async fn remove_notify(&self, token: u64) -> Result<(), Error>;
51 async fn async_quick_send_message(
53 &self,
54 proto_id: ProtocolId,
55 peer_index: PeerIndex,
56 data: Bytes,
57 ) -> Result<(), Error>;
58 async fn async_quick_send_message_to(
60 &self,
61 peer_index: PeerIndex,
62 data: Bytes,
63 ) -> Result<(), Error>;
64 async fn async_quick_filter_broadcast(
66 &self,
67 target: TargetSession,
68 data: Bytes,
69 ) -> Result<(), Error>;
70 async fn async_future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error>;
72 async fn async_send_message(
74 &self,
75 proto_id: ProtocolId,
76 peer_index: PeerIndex,
77 data: Bytes,
78 ) -> Result<(), Error>;
79 async fn async_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
81 async fn async_filter_broadcast(&self, target: TargetSession, data: Bytes)
83 -> Result<(), Error>;
84 async fn async_disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error>;
86 fn quick_send_message(
88 &self,
89 proto_id: ProtocolId,
90 peer_index: PeerIndex,
91 data: Bytes,
92 ) -> Result<(), Error>;
93 fn quick_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
95 fn quick_filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error>;
97 fn future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error>;
99 fn send_message(
101 &self,
102 proto_id: ProtocolId,
103 peer_index: PeerIndex,
104 data: Bytes,
105 ) -> Result<(), Error>;
106 fn send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error>;
108 fn filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error>;
110 fn disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error>;
112 fn get_peer(&self, peer_index: PeerIndex) -> Option<Peer>;
115 fn with_peer_mut(&self, peer_index: PeerIndex, f: Box<dyn FnOnce(&mut Peer)>);
117 fn connected_peers(&self) -> Vec<PeerIndex>;
119 fn report_peer(&self, peer_index: PeerIndex, behaviour: Behaviour);
121 fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String);
123 fn protocol_id(&self) -> ProtocolId;
125 fn p2p_control(&self) -> Option<&ServiceControl> {
127 None
128 }
129}
130
131pub type BoxedCKBProtocolContext = Arc<dyn CKBProtocolContext + Sync>;
133
134#[async_trait]
136pub trait CKBProtocolHandler: Sync + Send {
137 async fn init(&mut self, nc: BoxedCKBProtocolContext);
139 async fn connected(
141 &mut self,
142 _nc: BoxedCKBProtocolContext,
143 _peer_index: PeerIndex,
144 _version: &str,
145 ) {
146 }
147 async fn disconnected(&mut self, _nc: BoxedCKBProtocolContext, _peer_index: PeerIndex) {}
149 async fn received(
151 &mut self,
152 _nc: BoxedCKBProtocolContext,
153 _peer_index: PeerIndex,
154 _data: Bytes,
155 ) {
156 }
157 async fn notify(&mut self, _nc: BoxedCKBProtocolContext, _token: u64) {}
159 async fn poll(&mut self, _nc: BoxedCKBProtocolContext) -> Option<()> {
161 None
162 }
163}
164
165pub struct CKBProtocol {
167 id: ProtocolId,
168 protocol_name: String,
170 supported_versions: Vec<ProtocolVersion>,
172 max_frame_length: usize,
173 handler: Box<dyn CKBProtocolHandler>,
174 network_state: Arc<NetworkState>,
175}
176
177impl CKBProtocol {
178 pub fn new_with_support_protocol(
181 support_protocol: support_protocols::SupportProtocols,
182 handler: Box<dyn CKBProtocolHandler>,
183 network_state: Arc<NetworkState>,
184 ) -> Self {
185 CKBProtocol {
186 id: support_protocol.protocol_id(),
187 max_frame_length: support_protocol.max_frame_length(),
188 protocol_name: support_protocol.name(),
189 supported_versions: support_protocol.support_versions(),
190 network_state,
191 handler,
192 }
193 }
194
195 pub fn new(
197 protocol_name: String,
198 id: ProtocolId,
199 versions: &[ProtocolVersion],
200 max_frame_length: usize,
201 handler: Box<dyn CKBProtocolHandler>,
202 network_state: Arc<NetworkState>,
203 ) -> Self {
204 CKBProtocol {
205 id,
206 max_frame_length,
207 network_state,
208 handler,
209 protocol_name: format!("/ckb/{protocol_name}"),
210 supported_versions: {
211 let mut versions: Vec<_> = versions.to_vec();
212 versions.sort_by(|a, b| b.cmp(a));
213 versions.to_vec()
214 },
215 }
216 }
217
218 pub fn id(&self) -> ProtocolId {
220 self.id
221 }
222
223 pub fn protocol_name(&self) -> String {
225 self.protocol_name.clone()
226 }
227
228 pub fn match_version(&self, version: ProtocolVersion) -> bool {
230 self.supported_versions.contains(&version)
231 }
232
233 pub fn build(self) -> ProtocolMeta {
235 let protocol_name = self.protocol_name();
236 let max_frame_length = self.max_frame_length;
237 let supported_versions = self
238 .supported_versions
239 .iter()
240 .map(ToString::to_string)
241 .collect::<Vec<_>>();
242 MetaBuilder::default()
243 .id(self.id)
244 .name(move |_| protocol_name.clone())
245 .codec(move || {
246 Box::new(
247 length_delimited::Builder::new()
248 .max_frame_length(max_frame_length)
249 .new_codec(),
250 )
251 })
252 .support_versions(supported_versions)
253 .service_handle(move || {
254 ProtocolHandle::Callback(Box::new(CKBHandler {
255 proto_id: self.id,
256 network_state: Arc::clone(&self.network_state),
257 handler: self.handler,
258 }))
259 })
260 .before_send(compress)
261 .before_receive(|| Some(Box::new(decompress)))
262 .build()
263 }
264}
265
266struct CKBHandler {
267 proto_id: ProtocolId,
268 network_state: Arc<NetworkState>,
269 handler: Box<dyn CKBProtocolHandler>,
270}
271
272#[async_trait]
274impl ServiceProtocol for CKBHandler {
275 async fn init(&mut self, context: &mut ProtocolContext) {
276 let nc = DefaultCKBProtocolContext {
277 proto_id: self.proto_id,
278 network_state: Arc::clone(&self.network_state),
279 p2p_control: context.control().to_owned().into(),
280 async_p2p_control: context.control().to_owned(),
281 };
282 self.handler.init(Arc::new(nc)).await;
283 }
284
285 async fn connected(&mut self, context: ProtocolContextMutRef<'_>, version: &str) {
286 if self
288 .network_state
289 .ckb2023
290 .load(std::sync::atomic::Ordering::SeqCst)
291 && version != crate::protocols::support_protocols::LASTEST_VERSION
292 && context.proto_id != SupportProtocols::RelayV2.protocol_id()
293 {
294 debug!(
295 "The version of session {}, protocol {} is {}, not 3. It will be disconnected.",
296 context.session.id, context.proto_id, version
297 );
298 let id = context.session.id;
299 let _ignore = context.disconnect(id).await;
300 return;
301 }
302 self.network_state.with_peer_registry_mut(|reg| {
303 if let Some(peer) = reg.get_peer_mut(context.session.id) {
304 peer.protocols.insert(self.proto_id, version.to_owned());
305 }
306 });
307
308 if !self.network_state.is_active() {
309 return;
310 }
311
312 let nc = DefaultCKBProtocolContext {
313 proto_id: self.proto_id,
314 network_state: Arc::clone(&self.network_state),
315 p2p_control: context.control().to_owned().into(),
316 async_p2p_control: context.control().to_owned(),
317 };
318 let peer_index = context.session.id;
319
320 self.handler
321 .connected(Arc::new(nc), peer_index, version)
322 .await;
323 }
324
325 async fn disconnected(&mut self, context: ProtocolContextMutRef<'_>) {
326 self.network_state.with_peer_registry_mut(|reg| {
327 if let Some(peer) = reg.get_peer_mut(context.session.id) {
328 peer.protocols.remove(&self.proto_id);
329 }
330 });
331
332 if !self.network_state.is_active() {
333 return;
334 }
335
336 let nc = DefaultCKBProtocolContext {
337 proto_id: self.proto_id,
338 network_state: Arc::clone(&self.network_state),
339 p2p_control: context.control().to_owned().into(),
340 async_p2p_control: context.control().to_owned(),
341 };
342 let peer_index = context.session.id;
343 self.handler.disconnected(Arc::new(nc), peer_index).await;
344 }
345
346 async fn received(&mut self, context: ProtocolContextMutRef<'_>, data: Bytes) {
347 if !self.network_state.is_active() {
348 return;
349 }
350
351 trace!(
352 "[received message]: {}, {}, length={}",
353 self.proto_id,
354 context.session.id,
355 data.len()
356 );
357 let nc = DefaultCKBProtocolContext {
358 proto_id: self.proto_id,
359 network_state: Arc::clone(&self.network_state),
360 p2p_control: context.control().to_owned().into(),
361 async_p2p_control: context.control().to_owned(),
362 };
363 let peer_index = context.session.id;
364 self.handler.received(Arc::new(nc), peer_index, data).await;
365 }
366
367 async fn notify(&mut self, context: &mut ProtocolContext, token: u64) {
368 if !self.network_state.is_active() {
369 return;
370 }
371 let nc = DefaultCKBProtocolContext {
372 proto_id: self.proto_id,
373 network_state: Arc::clone(&self.network_state),
374 p2p_control: context.control().to_owned().into(),
375 async_p2p_control: context.control().to_owned(),
376 };
377 self.handler.notify(Arc::new(nc), token).await;
378 }
379
380 async fn poll(&mut self, context: &mut ProtocolContext) -> Option<()> {
381 let nc = DefaultCKBProtocolContext {
382 proto_id: self.proto_id,
383 network_state: Arc::clone(&self.network_state),
384 p2p_control: context.control().to_owned().into(),
385 async_p2p_control: context.control().to_owned(),
386 };
387 self.handler.poll(Arc::new(nc)).await
388 }
389}
390
391struct DefaultCKBProtocolContext {
392 proto_id: ProtocolId,
393 network_state: Arc<NetworkState>,
394 p2p_control: ServiceControl,
395 async_p2p_control: ServiceAsyncControl,
396}
397
398#[async_trait]
399impl CKBProtocolContext for DefaultCKBProtocolContext {
400 fn ckb2023(&self) -> bool {
401 self.network_state
402 .ckb2023
403 .load(std::sync::atomic::Ordering::SeqCst)
404 }
405 async fn set_notify(&self, interval: Duration, token: u64) -> Result<(), Error> {
406 self.async_p2p_control
407 .set_service_notify(self.proto_id, interval, token)
408 .await?;
409 Ok(())
410 }
411 async fn remove_notify(&self, token: u64) -> Result<(), Error> {
412 self.async_p2p_control
413 .remove_service_notify(self.proto_id, token)
414 .await?;
415 Ok(())
416 }
417 async fn async_quick_send_message(
418 &self,
419 proto_id: ProtocolId,
420 peer_index: PeerIndex,
421 data: Bytes,
422 ) -> Result<(), Error> {
423 trace!(
424 "[send message]: {}, to={}, length={}",
425 proto_id,
426 peer_index,
427 data.len()
428 );
429 self.async_p2p_control
430 .quick_send_message_to(peer_index, proto_id, data)
431 .await?;
432 Ok(())
433 }
434 async fn async_quick_send_message_to(
435 &self,
436 peer_index: PeerIndex,
437 data: Bytes,
438 ) -> Result<(), Error> {
439 trace!(
440 "[send message to]: {}, to={}, length={}",
441 self.proto_id,
442 peer_index,
443 data.len()
444 );
445 self.async_p2p_control
446 .quick_send_message_to(peer_index, self.proto_id, data)
447 .await?;
448 Ok(())
449 }
450 async fn async_quick_filter_broadcast(
451 &self,
452 target: TargetSession,
453 data: Bytes,
454 ) -> Result<(), Error> {
455 self.async_p2p_control
456 .quick_filter_broadcast(target, self.proto_id, data)
457 .await?;
458 Ok(())
459 }
460 async fn async_future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error> {
461 let task = if blocking {
462 Box::pin(BlockingFutureTask::new(task))
463 } else {
464 task
465 };
466 self.async_p2p_control.future_task(task).await?;
467 Ok(())
468 }
469 async fn async_send_message(
470 &self,
471 proto_id: ProtocolId,
472 peer_index: PeerIndex,
473 data: Bytes,
474 ) -> Result<(), Error> {
475 trace!(
476 "[send message]: {}, to={}, length={}",
477 proto_id,
478 peer_index,
479 data.len()
480 );
481 self.async_p2p_control
482 .send_message_to(peer_index, proto_id, data)
483 .await?;
484 Ok(())
485 }
486 async fn async_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
487 trace!(
488 "[send message to]: {}, to={}, length={}",
489 self.proto_id,
490 peer_index,
491 data.len()
492 );
493 self.async_p2p_control
494 .send_message_to(peer_index, self.proto_id, data)
495 .await?;
496 Ok(())
497 }
498 async fn async_filter_broadcast(
499 &self,
500 target: TargetSession,
501 data: Bytes,
502 ) -> Result<(), Error> {
503 self.async_p2p_control
504 .filter_broadcast(target, self.proto_id, data)
505 .await?;
506 Ok(())
507 }
508 async fn async_disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error> {
509 debug!("Disconnect peer: {}, message: {}", peer_index, message);
510 async_disconnect_with_message(&self.async_p2p_control, peer_index, message).await?;
511 Ok(())
512 }
513 fn quick_send_message(
514 &self,
515 proto_id: ProtocolId,
516 peer_index: PeerIndex,
517 data: Bytes,
518 ) -> Result<(), Error> {
519 trace!(
520 "[send message]: {}, to={}, length={}",
521 proto_id,
522 peer_index,
523 data.len()
524 );
525 self.p2p_control
526 .quick_send_message_to(peer_index, proto_id, data)?;
527 Ok(())
528 }
529 fn quick_send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
530 trace!(
531 "[send message to]: {}, to={}, length={}",
532 self.proto_id,
533 peer_index,
534 data.len()
535 );
536 self.p2p_control
537 .quick_send_message_to(peer_index, self.proto_id, data)?;
538 Ok(())
539 }
540 fn quick_filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error> {
541 self.p2p_control
542 .quick_filter_broadcast(target, self.proto_id, data)?;
543 Ok(())
544 }
545 fn future_task(&self, task: BoxedFutureTask, blocking: bool) -> Result<(), Error> {
546 let task = if blocking {
547 Box::pin(BlockingFutureTask::new(task))
548 } else {
549 task
550 };
551 self.p2p_control.future_task(task)?;
552 Ok(())
553 }
554 fn send_message(
555 &self,
556 proto_id: ProtocolId,
557 peer_index: PeerIndex,
558 data: Bytes,
559 ) -> Result<(), Error> {
560 trace!(
561 "[send message]: {}, to={}, length={}",
562 proto_id,
563 peer_index,
564 data.len()
565 );
566 self.p2p_control
567 .send_message_to(peer_index, proto_id, data)?;
568 Ok(())
569 }
570 fn send_message_to(&self, peer_index: PeerIndex, data: Bytes) -> Result<(), Error> {
571 trace!(
572 "[send message to]: {}, to={}, length={}",
573 self.proto_id,
574 peer_index,
575 data.len()
576 );
577 self.p2p_control
578 .send_message_to(peer_index, self.proto_id, data)?;
579 Ok(())
580 }
581 fn filter_broadcast(&self, target: TargetSession, data: Bytes) -> Result<(), Error> {
582 self.p2p_control
583 .filter_broadcast(target, self.proto_id, data)?;
584 Ok(())
585 }
586 fn disconnect(&self, peer_index: PeerIndex, message: &str) -> Result<(), Error> {
587 debug!("Disconnect peer: {}, message: {}", peer_index, message);
588 disconnect_with_message(&self.p2p_control, peer_index, message)?;
589 Ok(())
590 }
591
592 fn get_peer(&self, peer_index: PeerIndex) -> Option<Peer> {
593 self.network_state
594 .with_peer_registry(|reg| reg.get_peer(peer_index).cloned())
595 }
596 fn with_peer_mut(&self, peer_index: PeerIndex, f: Box<dyn FnOnce(&mut Peer)>) {
597 self.network_state.with_peer_registry_mut(|reg| {
598 reg.get_peer_mut(peer_index).map(f);
599 })
600 }
601
602 fn connected_peers(&self) -> Vec<PeerIndex> {
603 self.network_state.with_peer_registry(|reg| {
604 reg.peers()
605 .iter()
606 .filter_map(|(peer_index, peer)| {
607 if peer.protocols.contains_key(&self.proto_id) {
608 Some(peer_index)
609 } else {
610 None
611 }
612 })
613 .cloned()
614 .collect()
615 })
616 }
617 fn report_peer(&self, peer_index: PeerIndex, behaviour: Behaviour) {
618 self.network_state
619 .report_session(&self.p2p_control, peer_index, behaviour);
620 }
621 fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String) {
622 self.network_state
623 .ban_session(&self.p2p_control, peer_index, duration, reason);
624 }
625
626 fn protocol_id(&self) -> ProtocolId {
627 self.proto_id
628 }
629
630 fn p2p_control(&self) -> Option<&ServiceControl> {
631 Some(&self.p2p_control)
632 }
633}
634
635pub(crate) struct BlockingFutureTask {
636 task: BoxedFutureTask,
637}
638
639impl BlockingFutureTask {
640 pub(crate) fn new(task: BoxedFutureTask) -> BlockingFutureTask {
641 BlockingFutureTask { task }
642 }
643}
644
645impl Future for BlockingFutureTask {
646 type Output = ();
647
648 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
649 p2p::runtime::block_in_place(|| self.task.poll_unpin(cx))
650 }
651}