Skip to main content

commonware_consensus/marshal/resolver/
p2p.rs

1//! P2P resolver initialization and config.
2
3use crate::{
4    marshal::ingress::handler::{self, Handler},
5    Block,
6};
7use commonware_cryptography::PublicKey;
8use commonware_p2p::{Blocker, Provider, Receiver, Sender};
9use commonware_resolver::p2p;
10use commonware_runtime::{Clock, Metrics, Spawner};
11use commonware_utils::channel::mpsc;
12use rand::Rng;
13use std::time::Duration;
14
15/// Configuration for the P2P [Resolver](commonware_resolver::Resolver).
16pub struct Config<P: PublicKey, C: Provider<PublicKey = P>, B: Blocker<PublicKey = P>> {
17    /// The public key to identify this node.
18    pub public_key: P,
19
20    /// The provider of peers that can be consulted for fetching data.
21    pub provider: C,
22
23    /// The blocker that will be used to block peers that send invalid responses.
24    pub blocker: B,
25
26    /// The size of the request mailbox backlog.
27    pub mailbox_size: usize,
28
29    /// Initial expected performance for new participants.
30    pub initial: Duration,
31
32    /// Timeout for requests.
33    pub timeout: Duration,
34
35    /// Retry timeout for the fetcher.
36    pub fetch_retry_timeout: Duration,
37
38    /// Whether requests are sent with priority over other network messages
39    pub priority_requests: bool,
40
41    /// Whether responses are sent with priority over other network messages
42    pub priority_responses: bool,
43}
44
45/// Initialize a P2P resolver.
46pub fn init<E, C, Bl, B, S, R, P>(
47    ctx: &E,
48    config: Config<P, C, Bl>,
49    backfill: (S, R),
50) -> (
51    mpsc::Receiver<handler::Message<B>>,
52    p2p::Mailbox<handler::Request<B>, P>,
53)
54where
55    E: Rng + Spawner + Clock + Metrics,
56    C: Provider<PublicKey = P>,
57    Bl: Blocker<PublicKey = P>,
58    B: Block,
59    S: Sender<PublicKey = P>,
60    R: Receiver<PublicKey = P>,
61    P: PublicKey,
62{
63    let (handler, receiver) = mpsc::channel(config.mailbox_size);
64    let handler = Handler::new(handler);
65    let (resolver_engine, resolver) = p2p::Engine::new(
66        ctx.with_label("resolver"),
67        p2p::Config {
68            provider: config.provider,
69            blocker: config.blocker,
70            consumer: handler.clone(),
71            producer: handler,
72            mailbox_size: config.mailbox_size,
73            me: Some(config.public_key),
74            initial: config.initial,
75            timeout: config.timeout,
76            fetch_retry_timeout: config.fetch_retry_timeout,
77            priority_requests: config.priority_requests,
78            priority_responses: config.priority_responses,
79        },
80    );
81    resolver_engine.start(backfill);
82    (receiver, resolver)
83}