1#![warn(missing_docs)]
63#![allow(dead_code, irrefutable_let_patterns)]
65
66use std::{
67 collections::{hash_map, HashMap},
68 fmt::{self, Debug},
69 pin::Pin,
70 sync::Arc,
71 time::Duration,
72};
73
74use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
75
76use polkadot_primitives::{Block, BlockNumber, Hash};
77use sc_client_api::{BlockImportNotification, BlockchainEvents, FinalityNotification};
78
79use self::messages::{BitfieldSigningMessage, PvfCheckerMessage};
80use polkadot_node_subsystem_types::messages::{
81 ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage,
82 AvailabilityDistributionMessage, AvailabilityRecoveryMessage, AvailabilityStoreMessage,
83 BitfieldDistributionMessage, CandidateBackingMessage, CandidateValidationMessage,
84 ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage,
85 DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage,
86 NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProspectiveParachainsMessage,
87 ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage,
88};
89
90pub use polkadot_node_subsystem_types::{
91 errors::{SubsystemError, SubsystemResult},
92 ActivatedLeaf, ActiveLeavesUpdate, ChainApiBackend, OverseerSignal, RuntimeApiSubsystemClient,
93 UnpinHandle,
94};
95
96pub mod metrics;
97pub use self::metrics::Metrics as OverseerMetrics;
98
99pub mod dummy;
101pub use self::dummy::DummySubsystem;
102
103pub use polkadot_node_metrics::{
104 metrics::{prometheus, Metrics as MetricsTrait},
105 Metronome,
106};
107
108pub use orchestra as gen;
109pub use orchestra::{
110 contextbounds, orchestra, subsystem, FromOrchestra, HighPriority, MapSubsystem, MessagePacket,
111 NormalPriority, OrchestraError as OverseerError, Priority, PriorityLevel, SignalsReceived,
112 Spawner, Subsystem, SubsystemContext, SubsystemIncomingMessages, SubsystemInstance,
113 SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra,
114 TrySendError,
115};
116
117#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
118mod memory_stats;
119#[cfg(test)]
120mod tests;
121
122use sp_core::traits::SpawnNamed;
123
/// Thin newtype around a spawner `S`.
///
/// The wrapped value is reachable through the public tuple field and through [`AsRef`], and the
/// wrapper is cloneable whenever `S` is.
pub struct SpawnGlue<S>(pub S);

impl<S> AsRef<S> for SpawnGlue<S> {
	/// Borrow the wrapped value.
	fn as_ref(&self) -> &S {
		let Self(inner) = self;
		inner
	}
}

impl<S> Clone for SpawnGlue<S>
where
	S: Clone,
{
	/// Clone the wrapped value and wrap the copy again.
	fn clone(&self) -> Self {
		let Self(inner) = self;
		SpawnGlue(inner.clone())
	}
}
138
139impl<S: SpawnNamed + Clone + Send + Sync> crate::gen::Spawner for SpawnGlue<S> {
140 fn spawn_blocking(
141 &self,
142 name: &'static str,
143 group: Option<&'static str>,
144 future: futures::future::BoxFuture<'static, ()>,
145 ) {
146 SpawnNamed::spawn_blocking(&self.0, name, group, future)
147 }
148 fn spawn(
149 &self,
150 name: &'static str,
151 group: Option<&'static str>,
152 future: futures::future::BoxFuture<'static, ()>,
153 ) {
154 SpawnNamed::spawn(&self.0, name, group, future)
155 }
156}
157
/// Abstraction over the question whether a particular chain head supports parachains.
///
/// The overseer consults this when a newly imported block is about to be activated.
#[async_trait::async_trait]
pub trait HeadSupportsParachains {
	/// Returns `true` if the block identified by `head` supports parachains.
	async fn head_supports_parachains(&self, head: &Hash) -> bool;
}
164
165#[async_trait::async_trait]
166impl<Client> HeadSupportsParachains for Arc<Client>
167where
168 Client: RuntimeApiSubsystemClient + Sync + Send,
169{
170 async fn head_supports_parachains(&self, head: &Hash) -> bool {
171 self.api_version_parachain_host(*head).await.ok().flatten().unwrap_or(0) >= 1
173 }
174}
175
176#[derive(Clone)]
180pub struct Handle(OverseerHandle);
181
182impl Handle {
183 pub fn new(raw: OverseerHandle) -> Self {
185 Self(raw)
186 }
187
188 pub async fn block_imported(&mut self, block: BlockInfo) {
190 self.send_and_log_error(Event::BlockImported(block)).await
191 }
192
193 pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
195 self.send_msg_with_priority(msg, origin, PriorityLevel::Normal).await
196 }
197
198 pub async fn send_msg_with_priority(
200 &mut self,
201 msg: impl Into<AllMessages>,
202 origin: &'static str,
203 priority: PriorityLevel,
204 ) {
205 self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin, priority })
206 .await
207 }
208
209 #[inline(always)]
211 pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
212 self.send_msg(msg, "").await
213 }
214
215 pub async fn block_finalized(&mut self, block: BlockInfo) {
217 self.send_and_log_error(Event::BlockFinalized(block)).await
218 }
219
220 pub async fn wait_for_activation(
227 &mut self,
228 hash: Hash,
229 response_channel: oneshot::Sender<SubsystemResult<()>>,
230 ) {
231 self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
232 hash,
233 response_channel,
234 }))
235 .await;
236 }
237
238 pub async fn stop(&mut self) {
240 self.send_and_log_error(Event::Stop).await;
241 }
242
243 async fn send_and_log_error(&mut self, event: Event) {
245 if self.0.send(event).await.is_err() {
246 gum::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
247 }
248 }
249}
250
/// Description of a block that has been imported or finalized, as carried by
/// [`Event::BlockImported`] and [`Event::BlockFinalized`].
///
/// Built from the client's import and finality notifications through the `From` impls below.
#[derive(Debug, Clone)]
pub struct BlockInfo {
	/// Hash of the block.
	pub hash: Hash,
	/// Hash of the block's parent.
	pub parent_hash: Hash,
	/// Number of the block.
	pub number: BlockNumber,
	/// Handle taken from the originating notification via `into_unpin_handle()`.
	/// NOTE(review): presumably unpins the block once dropped; confirm against `UnpinHandle`.
	pub unpin_handle: UnpinHandle,
}
268
269impl From<BlockImportNotification<Block>> for BlockInfo {
270 fn from(n: BlockImportNotification<Block>) -> Self {
271 let hash = n.hash;
272 let parent_hash = n.header.parent_hash;
273 let number = n.header.number;
274 let unpin_handle = n.into_unpin_handle();
275
276 BlockInfo { hash, parent_hash, number, unpin_handle }
277 }
278}
279
280impl From<FinalityNotification<Block>> for BlockInfo {
281 fn from(n: FinalityNotification<Block>) -> Self {
282 let hash = n.hash;
283 let parent_hash = n.header.parent_hash;
284 let number = n.header.number;
285 let unpin_handle = n.into_unpin_handle();
286
287 BlockInfo { hash, parent_hash, number, unpin_handle }
288 }
289}
290
/// An event sent to the overseer from the outside, usually through a [`Handle`].
#[derive(Debug)]
pub enum Event {
	/// A new block was imported.
	///
	/// The overseer records it as an active leaf and drops its parent from the active leaves,
	/// if the parent was one.
	BlockImported(BlockInfo),
	/// A block was finalized.
	BlockFinalized(BlockInfo),
	/// A message that the overseer routes to a subsystem.
	MsgToSubsystem {
		/// The message to route.
		msg: AllMessages,
		/// Origin passed along when routing the message.
		origin: &'static str,
		/// Priority with which the message is routed.
		priority: PriorityLevel,
	},
	/// A request coming from outside of the overseer.
	ExternalRequest(ExternalRequest),
	/// Stop the overseer.
	Stop,
}
318
/// Requests that can be made to the overseer from the outside.
#[derive(Debug)]
pub enum ExternalRequest {
	/// Wait until the block with the given hash is an active leaf.
	///
	/// The overseer answers right away if the hash is already an active leaf; otherwise the
	/// sender is queued until the leaf gets activated.
	WaitForActivation {
		/// Hash of the block to wait for.
		hash: Hash,
		/// Channel on which the overseer sends `Ok(())` once the leaf is active.
		response_channel: oneshot::Sender<SubsystemResult<()>>,
	},
}
331
332pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut handle: Handle) {
335 let mut finality = client.finality_notification_stream();
336 let mut imports = client.import_notification_stream();
337
338 loop {
339 select! {
340 f = finality.next() => {
341 match f {
342 Some(block) => {
343 handle.block_finalized(block.into()).await;
344 }
345 None => break,
346 }
347 },
348 i = imports.next() => {
349 match i {
350 Some(block) => {
351 handle.block_imported(block.into()).await;
352 }
353 None => break,
354 }
355 },
356 complete => break,
357 }
358 }
359}
360
/// The overseer: the struct the `orchestra` macro expands into the subsystem orchestrator.
///
/// Every field carrying a `#[subsystem(..)]` attribute declares one subsystem. Where present,
/// the type named before `sends:` is the message type that subsystem receives, and the `sends`
/// list names the message types it may send to other subsystems. The plain fields at the end
/// are state owned by the overseer itself.
#[orchestra(
	gen=AllMessages,
	event=Event,
	signal=OverseerSignal,
	error=SubsystemError,
	message_capacity=2048,
)]
pub struct Overseer<SupportsParachains> {
	#[subsystem(CandidateValidationMessage, sends: [
		ChainApiMessage,
		RuntimeApiMessage,
	])]
	candidate_validation: CandidateValidation,

	// No message type is named here.
	// NOTE(review): the `self::messages::PvfCheckerMessage` import suggests the macro generates
	// one for this subsystem; confirm against the orchestra documentation.
	#[subsystem(sends: [
		CandidateValidationMessage,
		RuntimeApiMessage,
	])]
	pvf_checker: PvfChecker,

	#[subsystem(CandidateBackingMessage, sends: [
		CandidateValidationMessage,
		CollatorProtocolMessage,
		ChainApiMessage,
		AvailabilityDistributionMessage,
		AvailabilityStoreMessage,
		StatementDistributionMessage,
		ProvisionerMessage,
		RuntimeApiMessage,
		ProspectiveParachainsMessage,
	])]
	candidate_backing: CandidateBacking,

	#[subsystem(StatementDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		CandidateBackingMessage,
		RuntimeApiMessage,
		ProspectiveParachainsMessage,
		ChainApiMessage,
	], can_receive_priority_messages)]
	statement_distribution: StatementDistribution,

	#[subsystem(AvailabilityDistributionMessage, sends: [
		AvailabilityStoreMessage,
		ChainApiMessage,
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
	])]
	availability_distribution: AvailabilityDistribution,

	#[subsystem(AvailabilityRecoveryMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		AvailabilityStoreMessage,
	])]
	availability_recovery: AvailabilityRecovery,

	// No message type is named here.
	// NOTE(review): the `self::messages::BitfieldSigningMessage` import suggests the macro
	// generates one for this subsystem; confirm against the orchestra documentation.
	#[subsystem(blocking, sends: [
		AvailabilityStoreMessage,
		RuntimeApiMessage,
		BitfieldDistributionMessage,
	])]
	bitfield_signing: BitfieldSigning,

	#[subsystem(blocking, message_capacity: 8192, BitfieldDistributionMessage, sends: [
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
		ProvisionerMessage,
	], can_receive_priority_messages)]
	bitfield_distribution: BitfieldDistribution,

	#[subsystem(ProvisionerMessage, sends: [
		RuntimeApiMessage,
		CandidateBackingMessage,
		DisputeCoordinatorMessage,
		ProspectiveParachainsMessage,
	])]
	provisioner: Provisioner,

	#[subsystem(blocking, RuntimeApiMessage, sends: [])]
	runtime_api: RuntimeApi,

	#[subsystem(blocking, AvailabilityStoreMessage, sends: [
		ChainApiMessage,
		RuntimeApiMessage,
	])]
	availability_store: AvailabilityStore,

	#[subsystem(blocking, NetworkBridgeRxMessage, sends: [
		BitfieldDistributionMessage,
		StatementDistributionMessage,
		ApprovalDistributionMessage,
		ApprovalVotingParallelMessage,
		GossipSupportMessage,
		DisputeDistributionMessage,
		CollationGenerationMessage,
		CollatorProtocolMessage,
	])]
	network_bridge_rx: NetworkBridgeRx,

	#[subsystem(blocking, NetworkBridgeTxMessage, sends: [])]
	network_bridge_tx: NetworkBridgeTx,

	#[subsystem(blocking, ChainApiMessage, sends: [])]
	chain_api: ChainApi,

	#[subsystem(CollationGenerationMessage, sends: [
		RuntimeApiMessage,
		CollatorProtocolMessage,
	])]
	collation_generation: CollationGeneration,

	#[subsystem(CollatorProtocolMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		CandidateBackingMessage,
		ChainApiMessage,
		ProspectiveParachainsMessage,
	])]
	collator_protocol: CollatorProtocol,

	#[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		ApprovalVotingMessage,
		RuntimeApiMessage,
	], can_receive_priority_messages)]
	approval_distribution: ApprovalDistribution,

	#[subsystem(blocking, ApprovalVotingMessage, sends: [
		ApprovalDistributionMessage,
		AvailabilityRecoveryMessage,
		CandidateValidationMessage,
		ChainApiMessage,
		ChainSelectionMessage,
		DisputeCoordinatorMessage,
		RuntimeApiMessage,
	])]
	approval_voting: ApprovalVoting,
	#[subsystem(blocking, message_capacity: 64000, ApprovalVotingParallelMessage, sends: [
		AvailabilityRecoveryMessage,
		CandidateValidationMessage,
		ChainApiMessage,
		ChainSelectionMessage,
		DisputeCoordinatorMessage,
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
		ApprovalVotingMessage,
		ApprovalDistributionMessage,
		ApprovalVotingParallelMessage,
	], can_receive_priority_messages)]
	approval_voting_parallel: ApprovalVotingParallel,
	#[subsystem(GossipSupportMessage, sends: [
		NetworkBridgeTxMessage,
		NetworkBridgeRxMessage, RuntimeApiMessage,
		ChainSelectionMessage,
		ChainApiMessage,
	], can_receive_priority_messages)]
	gossip_support: GossipSupport,

	#[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
		DisputeDistributionMessage,
		CandidateValidationMessage,
		ApprovalVotingMessage,
		AvailabilityStoreMessage,
		AvailabilityRecoveryMessage,
		ChainSelectionMessage,
		ApprovalVotingParallelMessage,
	], can_receive_priority_messages)]
	dispute_coordinator: DisputeCoordinator,

	#[subsystem(DisputeDistributionMessage, sends: [
		RuntimeApiMessage,
		DisputeCoordinatorMessage,
		NetworkBridgeTxMessage,
	])]
	dispute_distribution: DisputeDistribution,

	#[subsystem(blocking, ChainSelectionMessage, sends: [ChainApiMessage])]
	chain_selection: ChainSelection,

	#[subsystem(ProspectiveParachainsMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
	])]
	prospective_parachains: ProspectiveParachains,

	/// External listeners waiting for a block hash to become an active leaf, keyed by that
	/// hash.
	pub activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,

	/// The set of currently active leaves, mapping block hash to block number.
	pub active_leaves: HashMap<Hash, BlockNumber>,

	/// Used to decide whether a newly imported head supports parachains.
	pub supports_parachains: SupportsParachains,

	/// Metrics of the overseer.
	pub metrics: OverseerMetrics,
}
677
678pub fn spawn_metronome_metrics<S, SupportsParachains>(
680 overseer: &mut Overseer<S, SupportsParachains>,
681 metronome_metrics: OverseerMetrics,
682) -> Result<(), SubsystemError>
683where
684 S: Spawner,
685 SupportsParachains: HeadSupportsParachains,
686{
687 struct ExtractNameAndMeters;
688
689 impl<'a, T: 'a> MapSubsystem<&'a OrchestratedSubsystem<T>> for ExtractNameAndMeters {
690 type Output = Option<(&'static str, SubsystemMeters)>;
691
692 fn map_subsystem(&self, subsystem: &'a OrchestratedSubsystem<T>) -> Self::Output {
693 subsystem
694 .instance
695 .as_ref()
696 .map(|instance| (instance.name, instance.meters.clone()))
697 }
698 }
699 let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);
700
701 #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
702 let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> =
703 match memory_stats::MemoryAllocationTracker::new() {
704 Ok(memory_stats) =>
705 Box::new(move |metrics: &OverseerMetrics| match memory_stats.snapshot() {
706 Ok(memory_stats_snapshot) => {
707 gum::trace!(
708 target: LOG_TARGET,
709 "memory_stats: {:?}",
710 &memory_stats_snapshot
711 );
712 metrics.memory_stats_snapshot(memory_stats_snapshot);
713 },
714 Err(e) =>
715 gum::debug!(target: LOG_TARGET, "Failed to obtain memory stats: {:?}", e),
716 }),
717 Err(_) => {
718 gum::debug!(
719 target: LOG_TARGET,
720 "Memory allocation tracking is not supported by the allocator.",
721 );
722
723 Box::new(|_| {})
724 },
725 };
726
727 #[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
728 let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> = Box::new(|_| {});
729
730 let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
731 collect_memory_stats(&metronome_metrics);
732
733 metronome_metrics.channel_metrics_snapshot(
737 subsystem_meters
738 .iter()
739 .cloned()
740 .flatten()
741 .map(|(name, ref meters)| (name, meters.read())),
742 );
743
744 futures::future::ready(())
745 });
746 overseer
747 .spawner()
748 .spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));
749
750 Ok(())
751}
752
753impl<S, SupportsParachains> Overseer<S, SupportsParachains>
754where
755 SupportsParachains: HeadSupportsParachains,
756 S: Spawner,
757{
758 async fn stop(mut self) {
760 let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await;
761 }
762
763 pub async fn run(self) {
767 if let Err(err) = self.run_inner().await {
768 gum::error!(target: LOG_TARGET, ?err, "Overseer exited with error");
769 }
770 }
771
772 async fn run_inner(mut self) -> SubsystemResult<()> {
773 let metrics = self.metrics.clone();
774 spawn_metronome_metrics(&mut self, metrics)?;
775
776 loop {
777 select! {
778 msg = self.events_rx.select_next_some() => {
779 match msg {
780 Event::MsgToSubsystem { msg, origin, priority } => {
781 match priority {
782 PriorityLevel::Normal => {
783 self.route_message(msg.into(), origin).await?;
784 },
785 PriorityLevel::High => {
786 self.route_message_with_priority::<HighPriority>(msg.into(), origin).await?;
787 },
788 }
789 self.metrics.on_message_relayed();
790 }
791 Event::Stop => {
792 self.stop().await;
793 return Ok(());
794 }
795 Event::BlockImported(block) => {
796 self.block_imported(block).await?;
797 }
798 Event::BlockFinalized(block) => {
799 self.block_finalized(block).await?;
800 }
801 Event::ExternalRequest(request) => {
802 self.handle_external_request(request);
803 }
804 }
805 },
806 msg = self.to_orchestra_rx.select_next_some() => {
807 match msg {
808 ToOrchestra::SpawnJob { name, subsystem, s } => {
809 self.spawn_job(name, subsystem, s);
810 }
811 ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
812 self.spawn_blocking_job(name, subsystem, s);
813 }
814 }
815 },
816 res = self.running_subsystems.select_next_some() => {
817 gum::error!(
818 target: LOG_TARGET,
819 subsystem = ?res,
820 "subsystem finished unexpectedly",
821 );
822 self.stop().await;
823 return res;
824 },
825 }
826 }
827 }
828
829 async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
830 match self.active_leaves.entry(block.hash) {
831 hash_map::Entry::Vacant(entry) => entry.insert(block.number),
832 hash_map::Entry::Occupied(entry) => {
833 debug_assert_eq!(*entry.get(), block.number);
834 return Ok(())
835 },
836 };
837
838 let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)).await {
839 Some(_) => ActiveLeavesUpdate::start_work(ActivatedLeaf {
840 hash: block.hash,
841 number: block.number,
842 unpin_handle: block.unpin_handle,
843 }),
844 None => ActiveLeavesUpdate::default(),
845 };
846
847 if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
848 debug_assert_eq!(block.number.saturating_sub(1), number);
849 update.deactivated.push(block.parent_hash);
850 self.on_head_deactivated(&block.parent_hash);
851 }
852
853 self.clean_up_external_listeners();
854
855 if !update.is_empty() {
856 self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
857 }
858 Ok(())
859 }
860
861 async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
862 let mut update = ActiveLeavesUpdate::default();
863
864 self.active_leaves.retain(|h, n| {
865 if *n <= block.number && *h != block.hash {
868 update.deactivated.push(*h);
869 false
870 } else {
871 true
872 }
873 });
874
875 for deactivated in &update.deactivated {
876 self.on_head_deactivated(deactivated)
877 }
878
879 self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number))
880 .await?;
881
882 if !update.is_empty() {
887 self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
888 }
889
890 Ok(())
891 }
892
893 async fn on_head_activated(&mut self, hash: &Hash, _parent_hash: Option<Hash>) -> Option<()> {
896 if !self.supports_parachains.head_supports_parachains(hash).await {
897 return None
898 }
899
900 self.metrics.on_head_activated();
901 if let Some(listeners) = self.activation_external_listeners.remove(hash) {
902 gum::trace!(
903 target: LOG_TARGET,
904 relay_parent = ?hash,
905 "Leaf got activated, notifying external listeners"
906 );
907 for listener in listeners {
908 let _ = listener.send(Ok(()));
910 }
911 }
912
913 Some(())
914 }
915
916 fn on_head_deactivated(&mut self, hash: &Hash) {
917 self.metrics.on_head_deactivated();
918 self.activation_external_listeners.remove(hash);
919 }
920
921 fn clean_up_external_listeners(&mut self) {
922 self.activation_external_listeners.retain(|_, v| {
923 v.retain(|c| !c.is_canceled());
925 !v.is_empty()
926 })
927 }
928
929 fn handle_external_request(&mut self, request: ExternalRequest) {
930 match request {
931 ExternalRequest::WaitForActivation { hash, response_channel } => {
932 if self.active_leaves.get(&hash).is_some() {
933 gum::trace!(
934 target: LOG_TARGET,
935 relay_parent = ?hash,
936 "Leaf was already ready - answering `WaitForActivation`"
937 );
938 let _ = response_channel.send(Ok(()));
940 } else {
941 gum::trace!(
942 target: LOG_TARGET,
943 relay_parent = ?hash,
944 "Leaf not yet ready - queuing `WaitForActivation` sender"
945 );
946 self.activation_external_listeners
947 .entry(hash)
948 .or_default()
949 .push(response_channel);
950 }
951 },
952 }
953 }
954
955 fn spawn_job(
956 &mut self,
957 task_name: &'static str,
958 subsystem_name: Option<&'static str>,
959 j: BoxFuture<'static, ()>,
960 ) {
961 self.spawner.spawn(task_name, subsystem_name, j);
962 }
963
964 fn spawn_blocking_job(
965 &mut self,
966 task_name: &'static str,
967 subsystem_name: Option<&'static str>,
968 j: BoxFuture<'static, ()>,
969 ) {
970 self.spawner.spawn_blocking(task_name, subsystem_name, j);
971 }
972}