polkadot_collator_protocol/
lib.rs1#![deny(missing_docs)]
21#![deny(unused_crate_dependencies)]
22#![recursion_limit = "256"]
23
24use std::{
25 collections::HashSet,
26 time::{Duration, Instant},
27};
28
29use futures::{
30 stream::{FusedStream, StreamExt},
31 FutureExt, TryFutureExt,
32};
33
34use polkadot_node_subsystem_util::reputation::ReputationAggregator;
35use sp_keystore::KeystorePtr;
36
37use polkadot_node_network_protocol::{
38 request_response::{v2 as protocol_v2, IncomingRequestReceiver},
39 PeerId, UnifiedReputationChange as Rep,
40};
41use polkadot_primitives::CollatorPair;
42
43use polkadot_node_subsystem::{errors::SubsystemError, overseer, DummySubsystem, SpawnedSubsystem};
44
45mod collator_side;
46mod validator_side;
47#[cfg(feature = "experimental-collator-protocol")]
48mod validator_side_experimental;
49
50const LOG_TARGET: &'static str = "parachain::collator-protocol";
51const LOG_TARGET_STATS: &'static str = "parachain::collator-protocol::stats";
52
53#[derive(Debug, Clone, Copy)]
55pub struct CollatorEvictionPolicy {
56 pub inactive_collator: Duration,
58 pub undeclared: Duration,
60}
61
62impl Default for CollatorEvictionPolicy {
63 fn default() -> Self {
64 CollatorEvictionPolicy {
65 inactive_collator: Duration::from_secs(24),
66 undeclared: Duration::from_secs(1),
67 }
68 }
69}
70
71pub enum ProtocolSide {
73 Validator {
75 keystore: KeystorePtr,
77 eviction_policy: CollatorEvictionPolicy,
79 metrics: validator_side::Metrics,
81 invulnerables: HashSet<PeerId>,
83 collator_protocol_hold_off: Option<Duration>,
85 },
86 #[cfg(feature = "experimental-collator-protocol")]
88 ValidatorExperimental {
89 keystore: KeystorePtr,
91 metrics: validator_side_experimental::Metrics,
93 },
94 Collator {
96 peer_id: PeerId,
98 collator_pair: CollatorPair,
100 request_receiver_v2: IncomingRequestReceiver<protocol_v2::CollationFetchingRequest>,
102 metrics: collator_side::Metrics,
104 },
105 None,
107}
108
109pub struct CollatorProtocolSubsystem {
111 protocol_side: ProtocolSide,
112}
113
114#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
115impl CollatorProtocolSubsystem {
116 pub fn new(protocol_side: ProtocolSide) -> Self {
121 Self { protocol_side }
122 }
123}
124
125#[overseer::subsystem(CollatorProtocol, error=SubsystemError, prefix=self::overseer)]
126impl<Context> CollatorProtocolSubsystem {
127 fn start(self, ctx: Context) -> SpawnedSubsystem {
128 let future = match self.protocol_side {
129 ProtocolSide::Validator {
130 keystore,
131 eviction_policy,
132 metrics,
133 invulnerables,
134 collator_protocol_hold_off,
135 } => {
136 gum::trace!(
137 target: LOG_TARGET,
138 ?invulnerables,
139 ?collator_protocol_hold_off,
140 "AH collator protocol params",
141 );
142 validator_side::run(
143 ctx,
144 keystore,
145 eviction_policy,
146 metrics,
147 invulnerables,
148 collator_protocol_hold_off,
149 )
150 .map_err(|e| SubsystemError::with_origin("collator-protocol", e))
151 .boxed()
152 },
153 #[cfg(feature = "experimental-collator-protocol")]
154 ProtocolSide::ValidatorExperimental { keystore, metrics } =>
155 validator_side_experimental::run(ctx, keystore, metrics)
156 .map_err(|e| SubsystemError::with_origin("collator-protocol", e))
157 .boxed(),
158 ProtocolSide::Collator { peer_id, collator_pair, request_receiver_v2, metrics } =>
159 collator_side::run(ctx, peer_id, collator_pair, request_receiver_v2, metrics)
160 .map_err(|e| SubsystemError::with_origin("collator-protocol", e))
161 .boxed(),
162 ProtocolSide::None => return DummySubsystem.start(ctx),
163 };
164
165 SpawnedSubsystem { name: "collator-protocol-subsystem", future }
166 }
167}
168
169async fn modify_reputation(
171 reputation: &mut ReputationAggregator,
172 sender: &mut impl overseer::CollatorProtocolSenderTrait,
173 peer: PeerId,
174 rep: Rep,
175) {
176 gum::trace!(
177 target: LOG_TARGET,
178 rep = ?rep,
179 peer_id = %peer,
180 "reputation change for peer",
181 );
182
183 reputation.modify(sender, peer, rep).await;
184}
185
186async fn wait_until_next_tick(last_poll: Instant, period: Duration) -> Instant {
188 let now = Instant::now();
189 let next_poll = last_poll + period;
190
191 if next_poll > now {
192 futures_timer::Delay::new(next_poll - now).await
193 }
194
195 Instant::now()
196}
197
198fn tick_stream(period: Duration) -> impl FusedStream<Item = ()> {
200 futures::stream::unfold(Instant::now(), move |next_check| async move {
201 Some(((), wait_until_next_tick(next_check, period).await))
202 })
203 .fuse()
204}