commonware_consensus/marshal/resolver/
p2p.rs1use crate::{
4 marshal::ingress::handler::{self, Handler},
5 Block,
6};
7use commonware_cryptography::PublicKey;
8use commonware_p2p::{Blocker, Provider, Receiver, Sender};
9use commonware_resolver::p2p;
10use commonware_runtime::{Clock, Metrics, Spawner};
11use commonware_utils::channel::mpsc;
12use rand::Rng;
13use std::time::Duration;
14
15pub struct Config<P: PublicKey, C: Provider<PublicKey = P>, B: Blocker<PublicKey = P>> {
17 pub public_key: P,
19
20 pub provider: C,
22
23 pub blocker: B,
25
26 pub mailbox_size: usize,
28
29 pub initial: Duration,
31
32 pub timeout: Duration,
34
35 pub fetch_retry_timeout: Duration,
37
38 pub priority_requests: bool,
40
41 pub priority_responses: bool,
43}
44
45pub fn init<E, C, Bl, B, S, R, P>(
47 ctx: &E,
48 config: Config<P, C, Bl>,
49 backfill: (S, R),
50) -> (
51 mpsc::Receiver<handler::Message<B>>,
52 p2p::Mailbox<handler::Request<B>, P>,
53)
54where
55 E: Rng + Spawner + Clock + Metrics,
56 C: Provider<PublicKey = P>,
57 Bl: Blocker<PublicKey = P>,
58 B: Block,
59 S: Sender<PublicKey = P>,
60 R: Receiver<PublicKey = P>,
61 P: PublicKey,
62{
63 let (handler, receiver) = mpsc::channel(config.mailbox_size);
64 let handler = Handler::new(handler);
65 let (resolver_engine, resolver) = p2p::Engine::new(
66 ctx.with_label("resolver"),
67 p2p::Config {
68 provider: config.provider,
69 blocker: config.blocker,
70 consumer: handler.clone(),
71 producer: handler,
72 mailbox_size: config.mailbox_size,
73 me: Some(config.public_key),
74 initial: config.initial,
75 timeout: config.timeout,
76 fetch_retry_timeout: config.fetch_retry_timeout,
77 priority_requests: config.priority_requests,
78 priority_responses: config.priority_responses,
79 },
80 );
81 resolver_engine.start(backfill);
82 (receiver, resolver)
83}