1use itertools::Itertools;
24use metrics::{Meters, MetricsWatcher};
25use polkadot_node_core_approval_voting::{Config, RealAssignmentCriteria};
26use polkadot_node_metrics::metered::{
27 self, channel, unbounded, MeteredReceiver, MeteredSender, UnboundedMeteredReceiver,
28 UnboundedMeteredSender,
29};
30
31use polkadot_node_primitives::{
32 approval::time::{Clock, SystemClock},
33 DISPUTE_WINDOW,
34};
35use polkadot_node_subsystem::{
36 messages::{ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage},
37 overseer, FromOrchestra, SpawnedSubsystem, SubsystemError, SubsystemResult,
38};
39use polkadot_node_subsystem_util::{
40 self,
41 database::Database,
42 runtime::{Config as RuntimeInfoConfig, RuntimeInfo},
43};
44use polkadot_overseer::{OverseerSignal, Priority, SubsystemSender, TimeoutExt};
45use polkadot_primitives::{CandidateIndex, Hash, ValidatorIndex, ValidatorSignature};
46use rand::SeedableRng;
47
48use sc_keystore::LocalKeystore;
49use sp_consensus::SyncOracle;
50
51use futures::{channel::oneshot, prelude::*, StreamExt};
52pub use metrics::Metrics;
53use polkadot_node_core_approval_voting::{
54 approval_db::common::Config as DatabaseConfig, ApprovalVotingWorkProvider,
55};
56use std::{
57 collections::{HashMap, HashSet},
58 fmt::Debug,
59 sync::Arc,
60 time::Duration,
61};
62use stream::{select_with_strategy, PollNext, SelectWithStrategy};
63pub mod metrics;
64
65#[cfg(test)]
66mod tests;
67
/// Log target used by this subsystem.
pub(crate) const LOG_TARGET: &str = "parachain::approval-voting-parallel";
/// Upper bound on waiting for all approval-distribution workers to answer a
/// `GetApprovalSignatures` fan-out before giving up.
const WAIT_FOR_SIGS_GATHER_TIMEOUT: Duration = Duration::from_millis(2000);

/// Number of approval-distribution worker tasks spawned by this subsystem.
pub const APPROVAL_DISTRIBUTION_WORKER_COUNT: usize = 4;

/// Default capacity of each worker's bounded channel; the total (64000) is
/// split evenly so overall capacity stays constant if the worker count changes.
pub const DEFAULT_WORKERS_CHANNEL_SIZE: usize = 64000 / APPROVAL_DISTRIBUTION_WORKER_COUNT;

/// Polling strategy for `select_with_strategy`: always prefer the right-hand
/// stream (the unbounded channel carrying signals/high-priority messages).
fn prio_right<'a>(_val: &'a mut ()) -> PollNext {
	PollNext::Right
}
83
/// The approval-voting-parallel subsystem: fronts one approval-voting worker
/// plus a pool of approval-distribution workers behind a single overseer
/// subsystem, routing messages between them.
pub struct ApprovalVotingParallelSubsystem {
	/// Keystore handed to the approval-voting worker.
	keystore: Arc<LocalKeystore>,
	/// Database column configuration for the approval data.
	db_config: DatabaseConfig,
	/// Slot duration, forwarded to each approval-distribution worker instance.
	slot_duration_millis: u64,
	/// The parachains database shared with the approval-voting worker.
	db: Arc<dyn Database>,
	/// Oracle used by the approval-voting worker to know whether major sync is ongoing.
	sync_oracle: Box<dyn SyncOracle + Send>,
	metrics: Metrics,
	/// Spawner used to launch the worker tasks.
	spawner: Arc<dyn overseer::gen::Spawner + 'static>,
	/// Clock abstraction so tests can inject a controlled time source.
	clock: Arc<dyn Clock + Send + Sync>,
	/// If set, overrides `DEFAULT_WORKERS_CHANNEL_SIZE` for the worker channels.
	overseer_message_channel_capacity_override: Option<usize>,
}
99
impl ApprovalVotingParallelSubsystem {
	/// Create a new `ApprovalVotingParallelSubsystem` using the real system clock.
	pub fn with_config(
		config: Config,
		db: Arc<dyn Database>,
		keystore: Arc<LocalKeystore>,
		sync_oracle: Box<dyn SyncOracle + Send>,
		metrics: Metrics,
		spawner: impl overseer::gen::Spawner + 'static + Clone,
		overseer_message_channel_capacity_override: Option<usize>,
	) -> Self {
		ApprovalVotingParallelSubsystem::with_config_and_clock(
			config,
			db,
			keystore,
			sync_oracle,
			metrics,
			Arc::new(SystemClock {}),
			spawner,
			overseer_message_channel_capacity_override,
		)
	}

	/// Create a new `ApprovalVotingParallelSubsystem` with an explicitly supplied
	/// clock, allowing callers (e.g. tests) to control time.
	pub fn with_config_and_clock(
		config: Config,
		db: Arc<dyn Database>,
		keystore: Arc<LocalKeystore>,
		sync_oracle: Box<dyn SyncOracle + Send>,
		metrics: Metrics,
		clock: Arc<dyn Clock + Send + Sync>,
		spawner: impl overseer::gen::Spawner + 'static,
		overseer_message_channel_capacity_override: Option<usize>,
	) -> Self {
		ApprovalVotingParallelSubsystem {
			keystore,
			slot_duration_millis: config.slot_duration_millis,
			db,
			db_config: DatabaseConfig { col_approval_data: config.col_approval_data },
			sync_oracle,
			metrics,
			spawner: Arc::new(spawner),
			clock,
			overseer_message_channel_capacity_override,
		}
	}

	/// Capacity of the bounded channel towards each worker: the override if one
	/// was provided, otherwise `DEFAULT_WORKERS_CHANNEL_SIZE`.
	fn workers_channel_size(&self) -> usize {
		self.overseer_message_channel_capacity_override
			.unwrap_or(DEFAULT_WORKERS_CHANNEL_SIZE)
	}
}
153
#[overseer::subsystem(ApprovalVotingParallel, error = SubsystemError, prefix = self::overseer)]
impl<Context: Send> ApprovalVotingParallelSubsystem {
	/// Entry point invoked by the overseer: hands the whole subsystem to `run`,
	/// which spawns the workers and drives the routing loop.
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let future = run::<Context>(ctx, self)
			.map_err(|e| SubsystemError::with_origin("approval-voting-parallel", e))
			.boxed();

		SpawnedSubsystem { name: "approval-voting-parallel-subsystem", future }
	}
}
164
165#[overseer::contextbounds(ApprovalVotingParallel, prefix = self::overseer)]
170async fn start_workers<Context>(
171 ctx: &mut Context,
172 subsystem: ApprovalVotingParallelSubsystem,
173 metrics_watcher: &mut MetricsWatcher,
174) -> SubsystemResult<(ToWorker<ApprovalVotingMessage>, Vec<ToWorker<ApprovalDistributionMessage>>)>
175where
176{
177 gum::info!(target: LOG_TARGET, "Starting approval distribution workers");
178
179 let (to_approval_voting_worker, approval_voting_work_provider) = build_worker_handles(
181 "approval-voting-parallel-db".into(),
182 subsystem.workers_channel_size(),
183 metrics_watcher,
184 prio_right,
185 );
186 let mut to_approval_distribution_workers = Vec::new();
187 let slot_duration_millis = subsystem.slot_duration_millis;
188
189 for i in 0..APPROVAL_DISTRIBUTION_WORKER_COUNT {
190 let mut network_sender = ctx.sender().clone();
191 let mut runtime_api_sender = ctx.sender().clone();
192 let mut approval_distribution_to_approval_voting = to_approval_voting_worker.clone();
193
194 let approval_distr_instance =
195 polkadot_approval_distribution::ApprovalDistribution::new_with_clock(
196 subsystem.metrics.approval_distribution_metrics(),
197 subsystem.slot_duration_millis,
198 subsystem.clock.clone(),
199 Arc::new(RealAssignmentCriteria {}),
200 );
201 let task_name = format!("approval-voting-parallel-{}", i);
202 let (to_approval_distribution_worker, mut approval_distribution_work_provider) =
203 build_worker_handles(
204 task_name.clone(),
205 subsystem.workers_channel_size(),
206 metrics_watcher,
207 prio_right,
208 );
209
210 metrics_watcher.watch(task_name.clone(), to_approval_distribution_worker.meter());
211
212 subsystem.spawner.spawn_blocking(
213 task_name.leak(),
214 Some("approval-voting-parallel"),
215 Box::pin(async move {
216 let mut state =
217 polkadot_approval_distribution::State::with_config(slot_duration_millis);
218 let mut rng = rand::rngs::StdRng::from_entropy();
219 let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig {
220 keystore: None,
221 session_cache_lru_size: DISPUTE_WINDOW.get(),
222 });
223
224 loop {
225 let message = match approval_distribution_work_provider.next().await {
226 Some(message) => message,
227 None => {
228 gum::info!(
229 target: LOG_TARGET,
230 "Approval distribution stream finished, most likely shutting down",
231 );
232 break;
233 },
234 };
235 if approval_distr_instance
236 .handle_from_orchestra(
237 message,
238 &mut approval_distribution_to_approval_voting,
239 &mut network_sender,
240 &mut runtime_api_sender,
241 &mut state,
242 &mut rng,
243 &mut session_info_provider,
244 )
245 .await
246 {
247 gum::info!(
248 target: LOG_TARGET,
249 "Approval distribution worker {}, exiting because of shutdown", i
250 );
251 };
252 }
253 }),
254 );
255 to_approval_distribution_workers.push(to_approval_distribution_worker);
256 }
257
258 gum::info!(target: LOG_TARGET, "Starting approval voting workers");
259
260 let sender = ctx.sender().clone();
261 let to_approval_distribution = ApprovalVotingToApprovalDistribution(sender.clone());
262 polkadot_node_core_approval_voting::start_approval_worker(
263 approval_voting_work_provider,
264 sender.clone(),
265 to_approval_distribution,
266 polkadot_node_core_approval_voting::Config {
267 slot_duration_millis: subsystem.slot_duration_millis,
268 col_approval_data: subsystem.db_config.col_approval_data,
269 },
270 subsystem.db.clone(),
271 subsystem.keystore.clone(),
272 subsystem.sync_oracle,
273 subsystem.metrics.approval_voting_metrics(),
274 subsystem.spawner.clone(),
275 "approval-voting-parallel-db",
276 "approval-voting-parallel",
277 subsystem.clock.clone(),
278 )
279 .await?;
280
281 Ok((to_approval_voting_worker, to_approval_distribution_workers))
282}
283
/// Subsystem main future: starts all workers, then runs the routing loop until
/// shutdown or an unrecoverable error.
#[overseer::contextbounds(ApprovalVotingParallel, prefix = self::overseer)]
async fn run<Context>(
	mut ctx: Context,
	subsystem: ApprovalVotingParallelSubsystem,
) -> SubsystemResult<()> {
	let mut metrics_watcher = MetricsWatcher::new(subsystem.metrics.clone());
	gum::info!(
		target: LOG_TARGET,
		"Starting workers"
	);

	let (to_approval_voting_worker, to_approval_distribution_workers) =
		start_workers(&mut ctx, subsystem, &mut metrics_watcher).await?;

	gum::info!(
		target: LOG_TARGET,
		"Starting main subsystem loop"
	);

	run_main_loop(ctx, to_approval_voting_worker, to_approval_distribution_workers, metrics_watcher)
		.await
}
307
/// The routing loop: receives messages/signals from the overseer and forwards
/// them to the right worker(s).
///
/// Signals are broadcast to all workers; per-validator messages are routed
/// deterministically via `assigned_worker_for_validator`; block-scoped messages
/// are broadcast; approval-voting queries are forwarded to the voting worker.
#[overseer::contextbounds(ApprovalVotingParallel, prefix = self::overseer)]
async fn run_main_loop<Context>(
	mut ctx: Context,
	mut to_approval_voting_worker: ToWorker<ApprovalVotingMessage>,
	mut to_approval_distribution_workers: Vec<ToWorker<ApprovalDistributionMessage>>,
	metrics_watcher: MetricsWatcher,
) -> SubsystemResult<()> {
	loop {
		futures::select! {
			next_msg = ctx.recv().fuse() => {
				let next_msg = match next_msg {
					Ok(msg) => msg,
					Err(err) => {
						gum::info!(target: LOG_TARGET, ?err, "Approval voting parallel subsystem received an error");
						return Err(err);
					}
				};

				match next_msg {
					FromOrchestra::Signal(msg) => {
						// Channel-size metrics are sampled on every new leaf.
						if matches!(msg, OverseerSignal::ActiveLeaves(_)) {
							metrics_watcher.collect_metrics();
						}

						// Signals go to every worker, on their unbounded channels.
						for worker in to_approval_distribution_workers.iter_mut() {
							worker
								.send_signal(msg.clone()).await?;
						}

						to_approval_voting_worker.send_signal(msg.clone()).await?;
						// Only exit after Conclude has been propagated to all workers.
						if matches!(msg, OverseerSignal::Conclude) {
							break;
						}
					},
					FromOrchestra::Communication { msg } => match msg {
						// Queries answered by the approval-voting worker; sent
						// high-priority so they bypass the bounded queue.
						ApprovalVotingParallelMessage::ApprovedAncestor(_, _,_) |
						ApprovalVotingParallelMessage::GetApprovalSignaturesForCandidate(_, _) => {
							to_approval_voting_worker.send_message_with_priority::<overseer::HighPriority>(
								msg.try_into().expect(
									"Message is one of ApprovedAncestor, GetApprovalSignaturesForCandidate
									and that can be safely converted to ApprovalVotingMessage; qed"
								)
							).await;
						},
						// Every worker tracks all blocks, so new-block info is broadcast.
						ApprovalVotingParallelMessage::NewBlocks(msg) => {
							for worker in to_approval_distribution_workers.iter_mut() {
								worker
									.send_message(
										ApprovalDistributionMessage::NewBlocks(msg.clone()),
									)
									.await;
							}
						},
						// Per-validator traffic is pinned to a single worker so all
						// state for one validator lives in one place.
						ApprovalVotingParallelMessage::DistributeAssignment(assignment, claimed) => {
							let worker = assigned_worker_for_validator(assignment.validator, &mut to_approval_distribution_workers);
							worker
								.send_message(
									ApprovalDistributionMessage::DistributeAssignment(assignment, claimed)
								)
								.await;

						},
						ApprovalVotingParallelMessage::DistributeApproval(vote) => {
							let worker = assigned_worker_for_validator(vote.validator, &mut to_approval_distribution_workers);
							worker
								.send_message(
									ApprovalDistributionMessage::DistributeApproval(vote)
								).await;

						},
						ApprovalVotingParallelMessage::NetworkBridgeUpdate(msg) => {
							if let polkadot_node_subsystem::messages::NetworkBridgeEvent::PeerMessage(
								peer_id,
								msg,
							) = msg
							{
								// Peer messages may bundle votes from several validators:
								// split them so each piece reaches its assigned worker.
								let (all_msgs_from_same_validator, messages_split_by_validator) = validator_index_for_msg(msg);

								for (validator_index, msg) in all_msgs_from_same_validator.into_iter().chain(messages_split_by_validator.into_iter().flatten()) {
									let worker = assigned_worker_for_validator(validator_index, &mut to_approval_distribution_workers);

									worker
										.send_message(
											ApprovalDistributionMessage::NetworkBridgeUpdate(
												polkadot_node_subsystem::messages::NetworkBridgeEvent::PeerMessage(
													peer_id, msg,
												),
											),
										).await;
								}
							} else {
								// Topology/peer-view events are relevant to all workers;
								// high priority so they are not delayed behind votes.
								for worker in to_approval_distribution_workers.iter_mut() {
									worker
										.send_message_with_priority::<overseer::HighPriority>(
											ApprovalDistributionMessage::NetworkBridgeUpdate(msg.clone()),
										).await;
								}
							}
						},
						ApprovalVotingParallelMessage::GetApprovalSignatures(indices, tx) => {
							handle_get_approval_signatures(&mut ctx, &mut to_approval_distribution_workers, indices, tx).await;
						},
						ApprovalVotingParallelMessage::ApprovalCheckingLagUpdate(lag) => {
							for worker in to_approval_distribution_workers.iter_mut() {
								worker
									.send_message(
										ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag)
									).await;
							}
						},
					},
				};

			},
		};
	}
	Ok(())
}
433
/// Fans a `GetApprovalSignatures` request out to every approval-distribution
/// worker and merges the replies on a spawned task, so the main loop never
/// blocks waiting on the workers (which could deadlock).
#[overseer::contextbounds(ApprovalVotingParallel, prefix = self::overseer)]
async fn handle_get_approval_signatures<Context>(
	ctx: &mut Context,
	to_approval_distribution_workers: &mut Vec<ToWorker<ApprovalDistributionMessage>>,
	requested_candidates: HashSet<(Hash, CandidateIndex)>,
	result_channel: oneshot::Sender<
		HashMap<ValidatorIndex, (Hash, Vec<CandidateIndex>, ValidatorSignature)>,
	>,
) {
	let mut sigs = HashMap::new();
	let mut signatures_channels = Vec::new();
	// Ask each worker for its share of the signatures; sent unbounded so the
	// request cannot be stuck behind a full bounded queue.
	for worker in to_approval_distribution_workers.iter_mut() {
		let (tx, rx) = oneshot::channel();
		worker.send_unbounded_message(ApprovalDistributionMessage::GetApprovalSignatures(
			requested_candidates.clone(),
			tx,
		));
		signatures_channels.push(rx);
	}

	let gather_signatures = async move {
		// Bound the wait: if a worker never answers, we give up rather than
		// hang (and drop `result_channel`, which signals failure to the caller).
		let Some(results) = futures::future::join_all(signatures_channels)
			.timeout(WAIT_FOR_SIGS_GATHER_TIMEOUT)
			.await
		else {
			gum::warn!(
				target: LOG_TARGET,
				"Waiting for approval signatures timed out - dead lock?"
			);
			return;
		};

		for result in results {
			let worker_sigs = match result {
				Ok(sigs) => sigs,
				Err(_) => {
					// A worker dropped its reply channel; collect what we can
					// from the others.
					gum::error!(
						target: LOG_TARGET,
						"Getting approval signatures failed, oneshot got closed"
					);
					continue;
				},
			};
			sigs.extend(worker_sigs);
		}

		if let Err(_) = result_channel.send(sigs) {
			gum::debug!(
				target: LOG_TARGET,
				"Sending back approval signatures failed, oneshot got closed"
			);
		}
	};

	if let Err(err) = ctx.spawn("approval-voting-gather-signatures", Box::pin(gather_signatures)) {
		gum::warn!(target: LOG_TARGET, "Failed to spawn gather signatures task: {:?}", err);
	}
}
494
495fn assigned_worker_for_validator(
497 validator: ValidatorIndex,
498 to_approval_distribution_workers: &mut Vec<ToWorker<ApprovalDistributionMessage>>,
499) -> &mut ToWorker<ApprovalDistributionMessage> {
500 let worker_index = validator.0 as usize % to_approval_distribution_workers.len();
501 to_approval_distribution_workers
502 .get_mut(worker_index)
503 .expect("Worker index is obtained modulo len; qed")
504}
505
/// Determines how a peer protocol message should be routed by validator index.
///
/// Returns a pair where exactly one side is `Some`:
/// - left: the whole message with the single validator index all its items share;
/// - right: the message split into one single-item message per validator, when
///   the bundled assignments/approvals come from different validators.
fn validator_index_for_msg(
	msg: polkadot_node_network_protocol::ApprovalDistributionMessage,
) -> (
	Option<(ValidatorIndex, polkadot_node_network_protocol::ApprovalDistributionMessage)>,
	Option<Vec<(ValidatorIndex, polkadot_node_network_protocol::ApprovalDistributionMessage)>>,
) {
	match msg {
		polkadot_node_network_protocol::ValidationProtocols::V3(ref message) => match message {
			polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Assignments(msgs) => {
				// `all_equal_value` succeeds only if every assignment in the batch
				// comes from the same validator; then the batch can be forwarded whole.
				if let Ok(validator) = msgs.iter().map(|(msg, _)| msg.validator).all_equal_value() {
					(Some((validator, msg)), None)
				} else {
					// Mixed validators: rebuild one single-assignment message per entry
					// so each can be routed to its validator's worker.
					let split = msgs
						.iter()
						.map(|(msg, claimed_candidates)| {
							(
								msg.validator,
								polkadot_node_network_protocol::ValidationProtocols::V3(
									polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Assignments(
										vec![(msg.clone(), claimed_candidates.clone())]
									),
								),
							)
						})
						.collect_vec();
					(None, Some(split))
				}
			},
			polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Approvals(msgs) => {
				// Same logic as assignments, applied to approval votes.
				if let Ok(validator) = msgs.iter().map(|msg| msg.validator).all_equal_value() {
					(Some((validator, msg)), None)
				} else {
					let split = msgs
						.iter()
						.map(|vote| {
							(
								vote.validator,
								polkadot_node_network_protocol::ValidationProtocols::V3(
									polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Approvals(
										vec![vote.clone()]
									),
								),
							)
						})
						.collect_vec();
					(None, Some(split))
				}
			},
		},
	}
}
565
/// The stream a worker consumes: its bounded receiver (left) and unbounded
/// receiver (right), merged with a `select_with_strategy` polling policy.
type WorkProvider<M, Clos, State> = WorkProviderImpl<
	SelectWithStrategy<
		MeteredReceiver<FromOrchestra<M>>,
		UnboundedMeteredReceiver<FromOrchestra<M>>,
		Clos,
		State,
	>,
>;

/// Newtype wrapper exposing the merged receiver stream as a work provider.
pub struct WorkProviderImpl<T>(T);
580
impl<T, M> Stream for WorkProviderImpl<T>
where
	T: Stream<Item = FromOrchestra<M>> + Unpin + Send,
{
	type Item = FromOrchestra<M>;

	// Pure delegation: polling is forwarded to the wrapped stream.
	fn poll_next(
		mut self: std::pin::Pin<&mut Self>,
		cx: &mut std::task::Context<'_>,
	) -> std::task::Poll<Option<Self::Item>> {
		self.0.poll_next_unpin(cx)
	}
}
594
#[async_trait::async_trait]
impl<T> ApprovalVotingWorkProvider for WorkProviderImpl<T>
where
	T: Stream<Item = FromOrchestra<ApprovalVotingMessage>> + Unpin + Send,
{
	/// Awaits the next message for the approval-voting worker; errs once the
	/// underlying stream is exhausted (both channels closed).
	async fn recv(&mut self) -> SubsystemResult<FromOrchestra<ApprovalVotingMessage>> {
		self.0.next().await.ok_or(SubsystemError::Context(
			"ApprovalVotingWorkProviderImpl: Channel closed".to_string(),
		))
	}
}
606
impl<M, Clos, State> WorkProvider<M, Clos, State>
where
	M: Send + Sync + 'static,
	Clos: FnMut(&mut State) -> PollNext,
	State: Default,
{
	/// Builds a work provider from a worker's receiving halves, merging the
	/// bounded (`rx.0`) and unbounded (`rx.1`) receivers with the given
	/// polling-priority strategy.
	fn from_rx_worker(rx: RxWorker<M>, prio: Clos) -> Self {
		let prioritised = select_with_strategy(rx.0, rx.1, prio);
		WorkProviderImpl(prioritised)
	}
}
619
/// Sending half of a worker's channel pair: a bounded sender for regular
/// messages and an unbounded sender for signals and high-priority messages.
pub struct ToWorker<T: Send + Sync + 'static>(
	MeteredSender<FromOrchestra<T>>,
	UnboundedMeteredSender<FromOrchestra<T>>,
);
629
630impl<T: Send + Sync + 'static> Clone for ToWorker<T> {
631 fn clone(&self) -> Self {
632 Self(self.0.clone(), self.1.clone())
633 }
634}
635
impl<T: Send + Sync + 'static> ToWorker<T> {
	/// Forwards an overseer signal on the unbounded channel, so signals are
	/// never stuck behind a full bounded message queue.
	async fn send_signal(&mut self, signal: OverseerSignal) -> Result<(), SubsystemError> {
		self.1
			.unbounded_send(FromOrchestra::Signal(signal))
			.map_err(|err| SubsystemError::QueueError(err.into_send_error()))
	}

	/// Meters for both channels, used for queue-size metrics.
	fn meter(&self) -> Meters {
		Meters::new(self.0.meter(), self.1.meter())
	}
}
647
/// `SubsystemSender` implementation so a `ToWorker` handle can stand in for an
/// overseer sender: normal-priority traffic goes over the bounded channel,
/// high-priority/unbounded traffic over the unbounded one.
///
/// Send failures are logged rather than propagated, matching overseer sender
/// semantics — they only happen while shutting down.
impl<T: Send + Sync + 'static + Debug> overseer::SubsystemSender<T> for ToWorker<T> {
	// Manually expanded async-trait signature: returns a boxed future.
	fn send_message<'life0, 'async_trait>(
		&'life0 mut self,
		msg: T,
	) -> ::core::pin::Pin<
		Box<dyn ::core::future::Future<Output = ()> + ::core::marker::Send + 'async_trait>,
	>
	where
		'life0: 'async_trait,
		Self: 'async_trait,
	{
		async {
			if let Err(err) =
				self.0.send(polkadot_overseer::FromOrchestra::Communication { msg }).await
			{
				gum::error!(
					target: LOG_TARGET,
					"Failed to send message to approval voting worker: {:?}, subsystem is probably shutting down.",
					err
				);
			}
		}
		.boxed()
	}

	/// Non-blocking send on the bounded channel; on failure, unwraps the message
	/// back out of the `FromOrchestra` envelope for the caller.
	fn try_send_message(&mut self, msg: T) -> Result<(), metered::TrySendError<T>> {
		self.0
			.try_send(polkadot_overseer::FromOrchestra::Communication { msg })
			.map_err(|result| {
				let is_full = result.is_full();
				let msg = match result.into_inner() {
					// We only ever enqueue `Communication` here, so `Signal`
					// cannot come back out.
					polkadot_overseer::FromOrchestra::Signal(_) => {
						panic!("Cannot happen variant is never built")
					},
					polkadot_overseer::FromOrchestra::Communication { msg } => msg,
				};
				if is_full {
					metered::TrySendError::Full(msg)
				} else {
					metered::TrySendError::Closed(msg)
				}
			})
	}

	/// Sends a batch by awaiting `send_message` for each item in order.
	fn send_messages<'life0, 'async_trait, I>(
		&'life0 mut self,
		msgs: I,
	) -> ::core::pin::Pin<
		Box<dyn ::core::future::Future<Output = ()> + ::core::marker::Send + 'async_trait>,
	>
	where
		I: IntoIterator<Item = T> + Send,
		I::IntoIter: Send,
		I: 'async_trait,
		'life0: 'async_trait,
		Self: 'async_trait,
	{
		async {
			for msg in msgs {
				self.send_message(msg).await;
			}
		}
		.boxed()
	}

	/// Sends on the unbounded channel; never blocks, failure is logged only.
	fn send_unbounded_message(&mut self, msg: T) {
		if let Err(err) =
			self.1.unbounded_send(polkadot_overseer::FromOrchestra::Communication { msg })
		{
			gum::error!(
				target: LOG_TARGET,
				"Failed to send unbounded message to approval voting worker: {:?}, subsystem is probably shutting down.",
				err
			);
		}
	}

	/// High-priority messages are mapped onto the unbounded channel; normal
	/// priority uses the bounded channel.
	fn send_message_with_priority<'life0, 'async_trait, P>(
		&'life0 mut self,
		msg: T,
	) -> ::core::pin::Pin<
		Box<dyn ::core::future::Future<Output = ()> + ::core::marker::Send + 'async_trait>,
	>
	where
		P: 'async_trait + Priority,
		'life0: 'async_trait,
		Self: 'async_trait,
	{
		match P::priority() {
			polkadot_overseer::PriorityLevel::Normal => self.send_message(msg),
			polkadot_overseer::PriorityLevel::High => {
				async { self.send_unbounded_message(msg) }.boxed()
			},
		}
	}

	/// Non-blocking variant: high priority always succeeds (unbounded send).
	fn try_send_message_with_priority<P: Priority>(
		&mut self,
		msg: T,
	) -> Result<(), metered::TrySendError<T>> {
		match P::priority() {
			polkadot_overseer::PriorityLevel::Normal => self.try_send_message(msg),
			polkadot_overseer::PriorityLevel::High => Ok(self.send_unbounded_message(msg)),
		}
	}
}
754
/// Receiving half of a worker's channel pair (bounded, then unbounded);
/// consumed by `WorkProvider::from_rx_worker`.
pub struct RxWorker<T: Send + Sync + 'static>(
	MeteredReceiver<FromOrchestra<T>>,
	UnboundedMeteredReceiver<FromOrchestra<T>>,
);
760
761fn build_channels<T: Send + Sync + 'static>(
764 channel_name: String,
765 channel_size: usize,
766 metrics_watcher: &mut MetricsWatcher,
767) -> (ToWorker<T>, RxWorker<T>) {
768 let (tx_work, rx_work) = channel::<FromOrchestra<T>>(channel_size);
769 let (tx_work_unbounded, rx_work_unbounded) = unbounded::<FromOrchestra<T>>();
770 let to_worker = ToWorker(tx_work, tx_work_unbounded);
771
772 metrics_watcher.watch(channel_name, to_worker.meter());
773
774 (to_worker, RxWorker(rx_work, rx_work_unbounded))
775}
776
/// Builds the full plumbing for one worker: the `ToWorker` sender handle and
/// the prioritised `WorkProvider` stream the worker loop consumes.
fn build_worker_handles<M, Clos, State>(
	channel_name: String,
	channel_size: usize,
	metrics_watcher: &mut MetricsWatcher,
	prio_right: Clos,
) -> (ToWorker<M>, WorkProvider<M, Clos, State>)
where
	M: Send + Sync + 'static,
	Clos: FnMut(&mut State) -> PollNext,
	State: Default,
{
	let (to_worker, rx_worker) = build_channels(channel_name, channel_size, metrics_watcher);
	(to_worker, WorkProviderImpl::from_rx_worker(rx_worker, prio_right))
}
795
/// Adapter that lets the approval-voting worker send `ApprovalDistributionMessage`s
/// through an overseer sender by converting them into
/// `ApprovalVotingParallelMessage`s.
#[derive(Clone)]
pub struct ApprovalVotingToApprovalDistribution<S: SubsystemSender<ApprovalVotingParallelMessage>>(
	S,
);
802
/// Forwarding `SubsystemSender` impl: every send converts the distribution
/// message into the parallel-subsystem message (`msg.into()`) and delegates to
/// the wrapped sender; errors convert the message back (`try_into`) before
/// being handed to the caller.
impl<S: SubsystemSender<ApprovalVotingParallelMessage>>
	overseer::SubsystemSender<ApprovalDistributionMessage>
	for ApprovalVotingToApprovalDistribution<S>
{
	#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
	fn send_message<'life0, 'async_trait>(
		&'life0 mut self,
		msg: ApprovalDistributionMessage,
	) -> ::core::pin::Pin<
		Box<dyn ::core::future::Future<Output = ()> + ::core::marker::Send + 'async_trait>,
	>
	where
		'life0: 'async_trait,
		Self: 'async_trait,
	{
		self.0.send_message(msg.into())
	}

	fn try_send_message(
		&mut self,
		msg: ApprovalDistributionMessage,
	) -> Result<(), metered::TrySendError<ApprovalDistributionMessage>> {
		self.0.try_send_message(msg.into()).map_err(|err| match err {
			// The message we just converted with `into` must convert back, so
			// `try_into` cannot fail here.
			metered::TrySendError::Closed(msg) => {
				metered::TrySendError::Closed(msg.try_into().unwrap())
			},
			metered::TrySendError::Full(msg) => {
				metered::TrySendError::Full(msg.try_into().unwrap())
			},
		})
	}

	#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
	fn send_messages<'life0, 'async_trait, I>(
		&'life0 mut self,
		msgs: I,
	) -> ::core::pin::Pin<
		Box<dyn ::core::future::Future<Output = ()> + ::core::marker::Send + 'async_trait>,
	>
	where
		I: IntoIterator<Item = ApprovalDistributionMessage> + Send,
		I::IntoIter: Send,
		I: 'async_trait,
		'life0: 'async_trait,
		Self: 'async_trait,
	{
		self.0.send_messages(msgs.into_iter().map(|msg| msg.into()))
	}

	fn send_unbounded_message(&mut self, msg: ApprovalDistributionMessage) {
		self.0.send_unbounded_message(msg.into())
	}

	fn send_message_with_priority<'life0, 'async_trait, P>(
		&'life0 mut self,
		msg: ApprovalDistributionMessage,
	) -> ::core::pin::Pin<
		Box<dyn ::core::future::Future<Output = ()> + ::core::marker::Send + 'async_trait>,
	>
	where
		P: 'async_trait + Priority,
		'life0: 'async_trait,
		Self: 'async_trait,
	{
		self.0.send_message_with_priority::<P>(msg.into())
	}

	fn try_send_message_with_priority<P: Priority>(
		&mut self,
		msg: ApprovalDistributionMessage,
	) -> Result<(), metered::TrySendError<ApprovalDistributionMessage>> {
		self.0.try_send_message_with_priority::<P>(msg.into()).map_err(|err| match err {
			metered::TrySendError::Closed(msg) => {
				metered::TrySendError::Closed(msg.try_into().unwrap())
			},
			metered::TrySendError::Full(msg) => {
				metered::TrySendError::Full(msg.try_into().unwrap())
			},
		})
	}
}
885}