pezkuwi_collator_protocol/
lib.rs1#![deny(missing_docs)]
21#![deny(unused_crate_dependencies)]
22#![recursion_limit = "256"]
23
24use std::{
25 collections::HashSet,
26 time::{Duration, Instant},
27};
28
29use futures::{
30 stream::{FusedStream, StreamExt},
31 FutureExt, TryFutureExt,
32};
33
34use pezkuwi_node_subsystem_util::reputation::ReputationAggregator;
35use pezsp_keystore::KeystorePtr;
36
37use pezkuwi_node_network_protocol::{
38 request_response::{v2 as protocol_v2, IncomingRequestReceiver},
39 PeerId, UnifiedReputationChange as Rep,
40};
41use pezkuwi_primitives::CollatorPair;
42
43use pezkuwi_node_subsystem::{errors::SubsystemError, overseer, DummySubsystem, SpawnedSubsystem};
44
// Collator-side implementation; its `run` entry point is used for `ProtocolSide::Collator`.
mod collator_side;
// Validator-side implementation; its `run` entry point is used for `ProtocolSide::Validator`.
mod validator_side;
// Alternative validator-side implementation, compiled only when the
// `experimental-collator-protocol` feature is enabled.
#[cfg(feature = "experimental-collator-protocol")]
mod validator_side_experimental;
49
// Log target for general collator-protocol log output.
const LOG_TARGET: &str = "teyrchain::collator-protocol";
// Separate log target so statistics output can be filtered independently.
const LOG_TARGET_STATS: &str = "teyrchain::collator-protocol::stats";
52
/// Eviction timing configuration for collator peers.
///
/// A value of this type is carried by `ProtocolSide::Validator` and handed to the validator
/// side of the protocol when the subsystem is started.
#[derive(Debug, Clone, Copy)]
pub struct CollatorEvictionPolicy {
    /// Eviction timeout applying to inactive collators. Defaults to 24 seconds.
    // NOTE(review): presumably how long a collator may stay inactive before it is evicted;
    // the exact semantics live in `validator_side` - confirm there.
    pub inactive_collator: Duration,
    /// Eviction timeout applying to undeclared peers. Defaults to 1 second.
    // NOTE(review): presumably how long a peer may stay connected without declaring itself
    // as a collator - confirm against `validator_side`.
    pub undeclared: Duration,
}

impl Default for CollatorEvictionPolicy {
    /// Returns the default policy: 24 seconds for `inactive_collator` and 1 second for
    /// `undeclared`.
    fn default() -> Self {
        Self { inactive_collator: Duration::from_secs(24), undeclared: Duration::from_secs(1) }
    }
}
70
71pub enum ProtocolSide {
73 Validator {
75 keystore: KeystorePtr,
77 eviction_policy: CollatorEvictionPolicy,
79 metrics: validator_side::Metrics,
81 invulnerables: HashSet<PeerId>,
83 collator_protocol_hold_off: Option<Duration>,
85 },
86 #[cfg(feature = "experimental-collator-protocol")]
88 ValidatorExperimental {
89 keystore: KeystorePtr,
91 metrics: validator_side_experimental::Metrics,
93 },
94 Collator {
96 peer_id: PeerId,
98 collator_pair: CollatorPair,
100 request_receiver_v2: IncomingRequestReceiver<protocol_v2::CollationFetchingRequest>,
102 metrics: collator_side::Metrics,
104 },
105 None,
107}
108
109pub struct CollatorProtocolSubsystem {
111 protocol_side: ProtocolSide,
112}
113
114#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
115impl CollatorProtocolSubsystem {
116 pub fn new(protocol_side: ProtocolSide) -> Self {
121 Self { protocol_side }
122 }
123}
124
125#[overseer::subsystem(CollatorProtocol, error=SubsystemError, prefix=self::overseer)]
126impl<Context> CollatorProtocolSubsystem {
127 fn start(self, ctx: Context) -> SpawnedSubsystem {
128 let future = match self.protocol_side {
129 ProtocolSide::Validator {
130 keystore,
131 eviction_policy,
132 metrics,
133 invulnerables,
134 collator_protocol_hold_off,
135 } => {
136 gum::trace!(
137 target: LOG_TARGET,
138 ?invulnerables,
139 ?collator_protocol_hold_off,
140 "AH collator protocol params",
141 );
142 validator_side::run(
143 ctx,
144 keystore,
145 eviction_policy,
146 metrics,
147 invulnerables,
148 collator_protocol_hold_off,
149 )
150 .map_err(|e| SubsystemError::with_origin("collator-protocol", e))
151 .boxed()
152 },
153 #[cfg(feature = "experimental-collator-protocol")]
154 ProtocolSide::ValidatorExperimental { keystore, metrics } => {
155 validator_side_experimental::run(ctx, keystore, metrics)
156 .map_err(|e| SubsystemError::with_origin("collator-protocol", e))
157 .boxed()
158 },
159 ProtocolSide::Collator { peer_id, collator_pair, request_receiver_v2, metrics } => {
160 collator_side::run(ctx, peer_id, collator_pair, request_receiver_v2, metrics)
161 .map_err(|e| SubsystemError::with_origin("collator-protocol", e))
162 .boxed()
163 },
164 ProtocolSide::None => return DummySubsystem.start(ctx),
165 };
166
167 SpawnedSubsystem { name: "collator-protocol-subsystem", future }
168 }
169}
170
171async fn modify_reputation(
173 reputation: &mut ReputationAggregator,
174 sender: &mut impl overseer::CollatorProtocolSenderTrait,
175 peer: PeerId,
176 rep: Rep,
177) {
178 gum::trace!(
179 target: LOG_TARGET,
180 rep = ?rep,
181 peer_id = %peer,
182 "reputation change for peer",
183 );
184
185 reputation.modify(sender, peer, rep).await;
186}
187
188async fn wait_until_next_tick(last_poll: Instant, period: Duration) -> Instant {
190 let now = Instant::now();
191 let next_poll = last_poll + period;
192
193 if next_poll > now {
194 futures_timer::Delay::new(next_poll - now).await
195 }
196
197 Instant::now()
198}
199
200fn tick_stream(period: Duration) -> impl FusedStream<Item = ()> {
202 futures::stream::unfold(Instant::now(), move |next_check| async move {
203 Some(((), wait_until_next_tick(next_check, period).await))
204 })
205 .fuse()
206}