1#![warn(missing_docs)]
63#![allow(dead_code)] use std::{
66 collections::{hash_map, HashMap},
67 fmt::{self, Debug},
68 pin::Pin,
69 sync::Arc,
70 time::Duration,
71};
72
73use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
74
75use polkadot_primitives::{Block, BlockNumber, Hash};
76use sc_client_api::{BlockImportNotification, BlockchainEvents, FinalityNotification};
77
78use self::messages::{BitfieldSigningMessage, PvfCheckerMessage};
79use polkadot_node_subsystem_types::messages::{
80 ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage,
81 AvailabilityDistributionMessage, AvailabilityRecoveryMessage, AvailabilityStoreMessage,
82 BitfieldDistributionMessage, CandidateBackingMessage, CandidateValidationMessage,
83 ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage,
84 DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage,
85 NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProspectiveParachainsMessage,
86 ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage,
87};
88
89pub use polkadot_node_subsystem_types::{
90 errors::{SubsystemError, SubsystemResult},
91 ActivatedLeaf, ActiveLeavesUpdate, ChainApiBackend, OverseerSignal, RuntimeApiSubsystemClient,
92 UnpinHandle,
93};
94
95pub mod metrics;
96pub use self::metrics::Metrics as OverseerMetrics;
97
98pub mod dummy;
100pub use self::dummy::DummySubsystem;
101
102pub use polkadot_node_metrics::{
103 metrics::{prometheus, Metrics as MetricsTrait},
104 Metronome,
105};
106
107pub use orchestra as gen;
108pub use orchestra::{
109 contextbounds, orchestra, subsystem, FromOrchestra, HighPriority, MapSubsystem, MessagePacket,
110 NormalPriority, OrchestraError as OverseerError, Priority, PriorityLevel, SignalsReceived,
111 Spawner, Subsystem, SubsystemContext, SubsystemIncomingMessages, SubsystemInstance,
112 SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra,
113 TrySendError,
114};
115
116#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
117mod memory_stats;
118#[cfg(test)]
119mod tests;
120
121use sp_core::traits::SpawnNamed;
122
/// Newtype wrapper around a spawner `S`.
///
/// In this file the wrapper implements the orchestra `Spawner` trait for any
/// `S: SpawnNamed + Clone + Send + Sync` by delegating to the wrapped value.
pub struct SpawnGlue<S>(pub S);
125
126impl<S> AsRef<S> for SpawnGlue<S> {
127 fn as_ref(&self) -> &S {
128 &self.0
129 }
130}
131
132impl<S: Clone> Clone for SpawnGlue<S> {
133 fn clone(&self) -> Self {
134 Self(self.0.clone())
135 }
136}
137
138impl<S: SpawnNamed + Clone + Send + Sync> crate::gen::Spawner for SpawnGlue<S> {
139 fn spawn_blocking(
140 &self,
141 name: &'static str,
142 group: Option<&'static str>,
143 future: futures::future::BoxFuture<'static, ()>,
144 ) {
145 SpawnNamed::spawn_blocking(&self.0, name, group, future)
146 }
147 fn spawn(
148 &self,
149 name: &'static str,
150 group: Option<&'static str>,
151 future: futures::future::BoxFuture<'static, ()>,
152 ) {
153 SpawnNamed::spawn(&self.0, name, group, future)
154 }
155}
156
/// Asynchronous check of whether a given chain head supports parachains.
///
/// The overseer consults this before treating a newly imported head as activated.
#[async_trait::async_trait]
pub trait HeadSupportsParachains {
    /// Returns `true` if the head identified by `head` supports parachains.
    async fn head_supports_parachains(&self, head: &Hash) -> bool;
}
163
164#[async_trait::async_trait]
165impl<Client> HeadSupportsParachains for Arc<Client>
166where
167 Client: RuntimeApiSubsystemClient + Sync + Send,
168{
169 async fn head_supports_parachains(&self, head: &Hash) -> bool {
170 self.api_version_parachain_host(*head).await.ok().flatten().unwrap_or(0) >= 1
172 }
173}
174
/// Cloneable handle used to send [`Event`]s to the overseer.
///
/// Thin wrapper around the raw `OverseerHandle`; send failures are logged
/// rather than returned (see `send_and_log_error`).
#[derive(Clone)]
pub struct Handle(OverseerHandle);
180
181impl Handle {
182 pub fn new(raw: OverseerHandle) -> Self {
184 Self(raw)
185 }
186
187 pub async fn block_imported(&mut self, block: BlockInfo) {
189 self.send_and_log_error(Event::BlockImported(block)).await
190 }
191
192 pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
194 self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin }).await
195 }
196
197 #[inline(always)]
199 pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
200 self.send_msg(msg, "").await
201 }
202
203 pub async fn block_finalized(&mut self, block: BlockInfo) {
205 self.send_and_log_error(Event::BlockFinalized(block)).await
206 }
207
208 pub async fn wait_for_activation(
215 &mut self,
216 hash: Hash,
217 response_channel: oneshot::Sender<SubsystemResult<()>>,
218 ) {
219 self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
220 hash,
221 response_channel,
222 }))
223 .await;
224 }
225
226 pub async fn stop(&mut self) {
228 self.send_and_log_error(Event::Stop).await;
229 }
230
231 async fn send_and_log_error(&mut self, event: Event) {
233 if self.0.send(event).await.is_err() {
234 gum::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
235 }
236 }
237}
238
/// Summary of a block, as passed to the overseer on import and finality events.
///
/// Built from `BlockImportNotification` and `FinalityNotification` via the
/// `From` impls in this file.
#[derive(Debug, Clone)]
pub struct BlockInfo {
    /// Hash of the block.
    pub hash: Hash,
    /// Hash of the block's parent, taken from the block header.
    pub parent_hash: Hash,
    /// Number of the block, taken from the block header.
    pub number: BlockNumber,
    /// Handle obtained from the notification via `into_unpin_handle()`.
    // NOTE(review): presumably keeps the block pinned in the backend until dropped —
    // confirm against the `UnpinHandle` documentation.
    pub unpin_handle: UnpinHandle,
}
256
257impl From<BlockImportNotification<Block>> for BlockInfo {
258 fn from(n: BlockImportNotification<Block>) -> Self {
259 let hash = n.hash;
260 let parent_hash = n.header.parent_hash;
261 let number = n.header.number;
262 let unpin_handle = n.into_unpin_handle();
263
264 BlockInfo { hash, parent_hash, number, unpin_handle }
265 }
266}
267
268impl From<FinalityNotification<Block>> for BlockInfo {
269 fn from(n: FinalityNotification<Block>) -> Self {
270 let hash = n.hash;
271 let parent_hash = n.header.parent_hash;
272 let number = n.header.number;
273 let unpin_handle = n.into_unpin_handle();
274
275 BlockInfo { hash, parent_hash, number, unpin_handle }
276 }
277}
278
/// Events the overseer's main loop receives from the outside (via [`Handle`]).
#[derive(Debug)]
pub enum Event {
    /// A block was imported.
    BlockImported(BlockInfo),
    /// A block was finalized.
    BlockFinalized(BlockInfo),
    /// A message that the overseer should route to a subsystem.
    MsgToSubsystem {
        /// The message to route.
        msg: AllMessages,
        /// Caller-supplied origin tag, passed along to message routing.
        origin: &'static str,
    },
    /// A request from outside the overseer that it answers itself.
    ExternalRequest(ExternalRequest),
    /// Stop the overseer; its run loop returns after handling this.
    Stop,
}
304
/// Requests made to the overseer from outside of it.
#[derive(Debug)]
pub enum ExternalRequest {
    /// Ask to be notified once the leaf with the given hash is activated.
    WaitForActivation {
        /// Hash of the leaf to wait for.
        hash: Hash,
        /// Channel on which the overseer sends `Ok(())` once the leaf is active.
        response_channel: oneshot::Sender<SubsystemResult<()>>,
    },
}
317
318pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut handle: Handle) {
321 let mut finality = client.finality_notification_stream();
322 let mut imports = client.import_notification_stream();
323
324 loop {
325 select! {
326 f = finality.next() => {
327 match f {
328 Some(block) => {
329 handle.block_finalized(block.into()).await;
330 }
331 None => break,
332 }
333 },
334 i = imports.next() => {
335 match i {
336 Some(block) => {
337 handle.block_imported(block.into()).await;
338 }
339 None => break,
340 }
341 },
342 complete => break,
343 }
344 }
345}
346
/// The overseer: the struct from which the `orchestra` macro generates the
/// subsystem wiring.
///
/// Each `#[subsystem(...)]` field declares one subsystem. Where present, the
/// leading message type is the one addressed to that subsystem, and `sends: [...]`
/// lists the message types it is allowed to send. The remaining options
/// (`blocking`, `message_capacity: N`, `can_receive_priority_messages`) are
/// orchestra settings; see the orchestra documentation for their exact semantics.
///
/// The trailing plain fields hold the overseer's own state.
#[orchestra(
    gen=AllMessages,
    event=Event,
    signal=OverseerSignal,
    error=SubsystemError,
    message_capacity=2048,
)]
pub struct Overseer<SupportsParachains> {
    #[subsystem(CandidateValidationMessage, sends: [
        ChainApiMessage,
        RuntimeApiMessage,
    ])]
    candidate_validation: CandidateValidation,

    // No explicit incoming message type is named here; this file imports a
    // `PvfCheckerMessage` from `self::messages`.
    #[subsystem(sends: [
        CandidateValidationMessage,
        RuntimeApiMessage,
    ])]
    pvf_checker: PvfChecker,

    #[subsystem(CandidateBackingMessage, sends: [
        CandidateValidationMessage,
        CollatorProtocolMessage,
        ChainApiMessage,
        AvailabilityDistributionMessage,
        AvailabilityStoreMessage,
        StatementDistributionMessage,
        ProvisionerMessage,
        RuntimeApiMessage,
        ProspectiveParachainsMessage,
    ])]
    candidate_backing: CandidateBacking,

    #[subsystem(StatementDistributionMessage, sends: [
        NetworkBridgeTxMessage,
        CandidateBackingMessage,
        RuntimeApiMessage,
        ProspectiveParachainsMessage,
        ChainApiMessage,
    ], can_receive_priority_messages)]
    statement_distribution: StatementDistribution,

    #[subsystem(AvailabilityDistributionMessage, sends: [
        AvailabilityStoreMessage,
        ChainApiMessage,
        RuntimeApiMessage,
        NetworkBridgeTxMessage,
    ])]
    availability_distribution: AvailabilityDistribution,

    #[subsystem(AvailabilityRecoveryMessage, sends: [
        NetworkBridgeTxMessage,
        RuntimeApiMessage,
        AvailabilityStoreMessage,
    ])]
    availability_recovery: AvailabilityRecovery,

    // No explicit incoming message type is named here; this file imports a
    // `BitfieldSigningMessage` from `self::messages`.
    #[subsystem(blocking, sends: [
        AvailabilityStoreMessage,
        RuntimeApiMessage,
        BitfieldDistributionMessage,
    ])]
    bitfield_signing: BitfieldSigning,

    #[subsystem(blocking, message_capacity: 8192, BitfieldDistributionMessage, sends: [
        RuntimeApiMessage,
        NetworkBridgeTxMessage,
        ProvisionerMessage,
    ], can_receive_priority_messages)]
    bitfield_distribution: BitfieldDistribution,

    #[subsystem(ProvisionerMessage, sends: [
        RuntimeApiMessage,
        CandidateBackingMessage,
        ChainApiMessage,
        DisputeCoordinatorMessage,
        ProspectiveParachainsMessage,
    ])]
    provisioner: Provisioner,

    #[subsystem(blocking, RuntimeApiMessage, sends: [])]
    runtime_api: RuntimeApi,

    #[subsystem(blocking, AvailabilityStoreMessage, sends: [
        ChainApiMessage,
        RuntimeApiMessage,
    ])]
    availability_store: AvailabilityStore,

    #[subsystem(blocking, NetworkBridgeRxMessage, sends: [
        BitfieldDistributionMessage,
        StatementDistributionMessage,
        ApprovalDistributionMessage,
        ApprovalVotingParallelMessage,
        GossipSupportMessage,
        DisputeDistributionMessage,
        CollationGenerationMessage,
        CollatorProtocolMessage,
    ])]
    network_bridge_rx: NetworkBridgeRx,

    #[subsystem(blocking, NetworkBridgeTxMessage, sends: [])]
    network_bridge_tx: NetworkBridgeTx,

    #[subsystem(blocking, ChainApiMessage, sends: [])]
    chain_api: ChainApi,

    #[subsystem(CollationGenerationMessage, sends: [
        RuntimeApiMessage,
        CollatorProtocolMessage,
    ])]
    collation_generation: CollationGeneration,

    #[subsystem(CollatorProtocolMessage, sends: [
        NetworkBridgeTxMessage,
        RuntimeApiMessage,
        CandidateBackingMessage,
        ChainApiMessage,
        ProspectiveParachainsMessage,
    ])]
    collator_protocol: CollatorProtocol,

    #[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
        NetworkBridgeTxMessage,
        ApprovalVotingMessage,
        RuntimeApiMessage,
    ], can_receive_priority_messages)]
    approval_distribution: ApprovalDistribution,

    #[subsystem(blocking, ApprovalVotingMessage, sends: [
        ApprovalDistributionMessage,
        AvailabilityRecoveryMessage,
        CandidateValidationMessage,
        ChainApiMessage,
        ChainSelectionMessage,
        DisputeCoordinatorMessage,
        RuntimeApiMessage,
    ])]
    approval_voting: ApprovalVoting,
    // Note: this subsystem lists its own message type among the ones it sends.
    #[subsystem(blocking, message_capacity: 64000, ApprovalVotingParallelMessage, sends: [
        AvailabilityRecoveryMessage,
        CandidateValidationMessage,
        ChainApiMessage,
        ChainSelectionMessage,
        DisputeCoordinatorMessage,
        RuntimeApiMessage,
        NetworkBridgeTxMessage,
        ApprovalVotingMessage,
        ApprovalDistributionMessage,
        ApprovalVotingParallelMessage,
    ])]
    approval_voting_parallel: ApprovalVotingParallel,
    #[subsystem(GossipSupportMessage, sends: [
        NetworkBridgeTxMessage,
        NetworkBridgeRxMessage, RuntimeApiMessage,
        ChainSelectionMessage,
    ], can_receive_priority_messages)]
    gossip_support: GossipSupport,

    #[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [
        RuntimeApiMessage,
        ChainApiMessage,
        DisputeDistributionMessage,
        CandidateValidationMessage,
        ApprovalVotingMessage,
        AvailabilityStoreMessage,
        AvailabilityRecoveryMessage,
        ChainSelectionMessage,
        ApprovalVotingParallelMessage,
    ])]
    dispute_coordinator: DisputeCoordinator,

    #[subsystem(DisputeDistributionMessage, sends: [
        RuntimeApiMessage,
        DisputeCoordinatorMessage,
        NetworkBridgeTxMessage,
    ])]
    dispute_distribution: DisputeDistribution,

    #[subsystem(blocking, ChainSelectionMessage, sends: [ChainApiMessage])]
    chain_selection: ChainSelection,

    #[subsystem(ProspectiveParachainsMessage, sends: [
        RuntimeApiMessage,
        ChainApiMessage,
    ])]
    prospective_parachains: ProspectiveParachains,

    /// External listeners waiting for a leaf to be activated, keyed by leaf hash.
    pub activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,

    /// The currently active leaves, mapping each leaf hash to its block number.
    pub active_leaves: HashMap<Hash, BlockNumber>,

    /// Used to check whether a newly imported head supports parachains.
    pub supports_parachains: SupportsParachains,

    /// Metrics recorded by the overseer.
    pub metrics: OverseerMetrics,
}
663
664pub fn spawn_metronome_metrics<S, SupportsParachains>(
666 overseer: &mut Overseer<S, SupportsParachains>,
667 metronome_metrics: OverseerMetrics,
668) -> Result<(), SubsystemError>
669where
670 S: Spawner,
671 SupportsParachains: HeadSupportsParachains,
672{
673 struct ExtractNameAndMeters;
674
675 impl<'a, T: 'a> MapSubsystem<&'a OrchestratedSubsystem<T>> for ExtractNameAndMeters {
676 type Output = Option<(&'static str, SubsystemMeters)>;
677
678 fn map_subsystem(&self, subsystem: &'a OrchestratedSubsystem<T>) -> Self::Output {
679 subsystem
680 .instance
681 .as_ref()
682 .map(|instance| (instance.name, instance.meters.clone()))
683 }
684 }
685 let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);
686
687 #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
688 let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> =
689 match memory_stats::MemoryAllocationTracker::new() {
690 Ok(memory_stats) =>
691 Box::new(move |metrics: &OverseerMetrics| match memory_stats.snapshot() {
692 Ok(memory_stats_snapshot) => {
693 gum::trace!(
694 target: LOG_TARGET,
695 "memory_stats: {:?}",
696 &memory_stats_snapshot
697 );
698 metrics.memory_stats_snapshot(memory_stats_snapshot);
699 },
700 Err(e) =>
701 gum::debug!(target: LOG_TARGET, "Failed to obtain memory stats: {:?}", e),
702 }),
703 Err(_) => {
704 gum::debug!(
705 target: LOG_TARGET,
706 "Memory allocation tracking is not supported by the allocator.",
707 );
708
709 Box::new(|_| {})
710 },
711 };
712
713 #[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
714 let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> = Box::new(|_| {});
715
716 let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
717 collect_memory_stats(&metronome_metrics);
718
719 metronome_metrics.channel_metrics_snapshot(
723 subsystem_meters
724 .iter()
725 .cloned()
726 .flatten()
727 .map(|(name, ref meters)| (name, meters.read())),
728 );
729
730 futures::future::ready(())
731 });
732 overseer
733 .spawner()
734 .spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));
735
736 Ok(())
737}
738
739impl<S, SupportsParachains> Overseer<S, SupportsParachains>
740where
741 SupportsParachains: HeadSupportsParachains,
742 S: Spawner,
743{
744 async fn stop(mut self) {
746 let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await;
747 }
748
749 pub async fn run(self) {
753 if let Err(err) = self.run_inner().await {
754 gum::error!(target: LOG_TARGET, ?err, "Overseer exited with error");
755 }
756 }
757
758 async fn run_inner(mut self) -> SubsystemResult<()> {
759 let metrics = self.metrics.clone();
760 spawn_metronome_metrics(&mut self, metrics)?;
761
762 loop {
763 select! {
764 msg = self.events_rx.select_next_some() => {
765 match msg {
766 Event::MsgToSubsystem { msg, origin } => {
767 self.route_message(msg.into(), origin).await?;
768 self.metrics.on_message_relayed();
769 }
770 Event::Stop => {
771 self.stop().await;
772 return Ok(());
773 }
774 Event::BlockImported(block) => {
775 self.block_imported(block).await?;
776 }
777 Event::BlockFinalized(block) => {
778 self.block_finalized(block).await?;
779 }
780 Event::ExternalRequest(request) => {
781 self.handle_external_request(request);
782 }
783 }
784 },
785 msg = self.to_orchestra_rx.select_next_some() => {
786 match msg {
787 ToOrchestra::SpawnJob { name, subsystem, s } => {
788 self.spawn_job(name, subsystem, s);
789 }
790 ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
791 self.spawn_blocking_job(name, subsystem, s);
792 }
793 }
794 },
795 res = self.running_subsystems.select_next_some() => {
796 gum::error!(
797 target: LOG_TARGET,
798 subsystem = ?res,
799 "subsystem finished unexpectedly",
800 );
801 self.stop().await;
802 return res;
803 },
804 }
805 }
806 }
807
808 async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
809 match self.active_leaves.entry(block.hash) {
810 hash_map::Entry::Vacant(entry) => entry.insert(block.number),
811 hash_map::Entry::Occupied(entry) => {
812 debug_assert_eq!(*entry.get(), block.number);
813 return Ok(())
814 },
815 };
816
817 let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)).await {
818 Some(_) => ActiveLeavesUpdate::start_work(ActivatedLeaf {
819 hash: block.hash,
820 number: block.number,
821 unpin_handle: block.unpin_handle,
822 }),
823 None => ActiveLeavesUpdate::default(),
824 };
825
826 if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
827 debug_assert_eq!(block.number.saturating_sub(1), number);
828 update.deactivated.push(block.parent_hash);
829 self.on_head_deactivated(&block.parent_hash);
830 }
831
832 self.clean_up_external_listeners();
833
834 if !update.is_empty() {
835 self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
836 }
837 Ok(())
838 }
839
840 async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
841 let mut update = ActiveLeavesUpdate::default();
842
843 self.active_leaves.retain(|h, n| {
844 if *n <= block.number && *h != block.hash {
847 update.deactivated.push(*h);
848 false
849 } else {
850 true
851 }
852 });
853
854 for deactivated in &update.deactivated {
855 self.on_head_deactivated(deactivated)
856 }
857
858 self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number))
859 .await?;
860
861 if !update.is_empty() {
866 self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
867 }
868
869 Ok(())
870 }
871
872 async fn on_head_activated(&mut self, hash: &Hash, _parent_hash: Option<Hash>) -> Option<()> {
875 if !self.supports_parachains.head_supports_parachains(hash).await {
876 return None
877 }
878
879 self.metrics.on_head_activated();
880 if let Some(listeners) = self.activation_external_listeners.remove(hash) {
881 gum::trace!(
882 target: LOG_TARGET,
883 relay_parent = ?hash,
884 "Leaf got activated, notifying external listeners"
885 );
886 for listener in listeners {
887 let _ = listener.send(Ok(()));
889 }
890 }
891
892 Some(())
893 }
894
895 fn on_head_deactivated(&mut self, hash: &Hash) {
896 self.metrics.on_head_deactivated();
897 self.activation_external_listeners.remove(hash);
898 }
899
900 fn clean_up_external_listeners(&mut self) {
901 self.activation_external_listeners.retain(|_, v| {
902 v.retain(|c| !c.is_canceled());
904 !v.is_empty()
905 })
906 }
907
908 fn handle_external_request(&mut self, request: ExternalRequest) {
909 match request {
910 ExternalRequest::WaitForActivation { hash, response_channel } => {
911 if self.active_leaves.get(&hash).is_some() {
912 gum::trace!(
913 target: LOG_TARGET,
914 relay_parent = ?hash,
915 "Leaf was already ready - answering `WaitForActivation`"
916 );
917 let _ = response_channel.send(Ok(()));
919 } else {
920 gum::trace!(
921 target: LOG_TARGET,
922 relay_parent = ?hash,
923 "Leaf not yet ready - queuing `WaitForActivation` sender"
924 );
925 self.activation_external_listeners
926 .entry(hash)
927 .or_default()
928 .push(response_channel);
929 }
930 },
931 }
932 }
933
934 fn spawn_job(
935 &mut self,
936 task_name: &'static str,
937 subsystem_name: Option<&'static str>,
938 j: BoxFuture<'static, ()>,
939 ) {
940 self.spawner.spawn(task_name, subsystem_name, j);
941 }
942
943 fn spawn_blocking_job(
944 &mut self,
945 task_name: &'static str,
946 subsystem_name: Option<&'static str>,
947 j: BoxFuture<'static, ()>,
948 ) {
949 self.spawner.spawn_blocking(task_name, subsystem_name, j);
950 }
951}