1#![warn(missing_docs)]
63#![allow(dead_code, irrefutable_let_patterns)]
65
66use std::{
67 collections::{hash_map, HashMap},
68 fmt::{self, Debug},
69 pin::Pin,
70 sync::Arc,
71 time::Duration,
72};
73
74use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
75
76use polkadot_primitives::{Block, BlockNumber, Hash};
77use sc_client_api::{BlockImportNotification, BlockchainEvents, FinalityNotification};
78
79use self::messages::{BitfieldSigningMessage, PvfCheckerMessage};
80use polkadot_node_subsystem_types::messages::{
81 ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage,
82 AvailabilityDistributionMessage, AvailabilityRecoveryMessage, AvailabilityStoreMessage,
83 BitfieldDistributionMessage, CandidateBackingMessage, CandidateValidationMessage,
84 ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage,
85 DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage,
86 NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProspectiveParachainsMessage,
87 ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage,
88};
89
90pub use polkadot_node_subsystem_types::{
91 errors::{SubsystemError, SubsystemResult},
92 ActivatedLeaf, ActiveLeavesUpdate, ChainApiBackend, OverseerSignal, RuntimeApiSubsystemClient,
93 UnpinHandle,
94};
95
96pub mod metrics;
97pub use self::metrics::Metrics as OverseerMetrics;
98
99pub mod dummy;
101pub use self::dummy::DummySubsystem;
102
103pub use polkadot_node_metrics::{
104 metrics::{prometheus, Metrics as MetricsTrait},
105 Metronome,
106};
107
108pub use orchestra as gen;
109pub use orchestra::{
110 contextbounds, orchestra, subsystem, FromOrchestra, HighPriority, MapSubsystem, MessagePacket,
111 NormalPriority, OrchestraError as OverseerError, Priority, PriorityLevel, SignalsReceived,
112 Spawner, Subsystem, SubsystemContext, SubsystemIncomingMessages, SubsystemInstance,
113 SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra,
114 TrySendError,
115};
116
117#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
118mod memory_stats;
119#[cfg(test)]
120mod tests;
121
122use sp_core::traits::SpawnNamed;
123
/// Newtype wrapper around a spawner `S`.
///
/// Elsewhere in this file it is given an orchestra `Spawner` implementation for any
/// `S: SpawnNamed`, so it acts as the glue between the two spawning traits.
#[derive(Clone)]
pub struct SpawnGlue<S>(pub S);

impl<S> AsRef<S> for SpawnGlue<S> {
    /// Borrow the wrapped spawner.
    fn as_ref(&self) -> &S {
        let SpawnGlue(inner) = self;
        inner
    }
}
138
139impl<S: SpawnNamed + Clone + Send + Sync> crate::gen::Spawner for SpawnGlue<S> {
140 fn spawn_blocking(
141 &self,
142 name: &'static str,
143 group: Option<&'static str>,
144 future: futures::future::BoxFuture<'static, ()>,
145 ) {
146 SpawnNamed::spawn_blocking(&self.0, name, group, future)
147 }
148 fn spawn(
149 &self,
150 name: &'static str,
151 group: Option<&'static str>,
152 future: futures::future::BoxFuture<'static, ()>,
153 ) {
154 SpawnNamed::spawn(&self.0, name, group, future)
155 }
156}
157
/// Answers whether a given chain head supports parachains.
///
/// The overseer consults this before treating a newly imported block as an activated leaf.
#[async_trait::async_trait]
pub trait HeadSupportsParachains {
    /// Return `true` if the block identified by `head` supports parachains, `false` otherwise.
    async fn head_supports_parachains(&self, head: &Hash) -> bool;
}
164
165#[async_trait::async_trait]
166impl<Client> HeadSupportsParachains for Arc<Client>
167where
168 Client: RuntimeApiSubsystemClient + Sync + Send,
169{
170 async fn head_supports_parachains(&self, head: &Hash) -> bool {
171 self.api_version_parachain_host(*head).await.ok().flatten().unwrap_or(0) >= 1
173 }
174}
175
/// A cloneable handle used to communicate with the overseer from the outside.
///
/// Thin wrapper around the raw `OverseerHandle`; every method turns its arguments into an
/// [`Event`] and sends it through the wrapped handle.
#[derive(Clone)]
pub struct Handle(OverseerHandle);
181
182impl Handle {
183 pub fn new(raw: OverseerHandle) -> Self {
185 Self(raw)
186 }
187
188 pub async fn block_imported(&mut self, block: BlockInfo) {
190 self.send_and_log_error(Event::BlockImported(block)).await
191 }
192
193 pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
195 self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin }).await
196 }
197
198 #[inline(always)]
200 pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
201 self.send_msg(msg, "").await
202 }
203
204 pub async fn block_finalized(&mut self, block: BlockInfo) {
206 self.send_and_log_error(Event::BlockFinalized(block)).await
207 }
208
209 pub async fn wait_for_activation(
216 &mut self,
217 hash: Hash,
218 response_channel: oneshot::Sender<SubsystemResult<()>>,
219 ) {
220 self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
221 hash,
222 response_channel,
223 }))
224 .await;
225 }
226
227 pub async fn stop(&mut self) {
229 self.send_and_log_error(Event::Stop).await;
230 }
231
232 async fn send_and_log_error(&mut self, event: Event) {
234 if self.0.send(event).await.is_err() {
235 gum::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
236 }
237 }
238}
239
/// Description of a block that has been imported or finalized, as handed to the overseer.
///
/// Built from a `BlockImportNotification` or a `FinalityNotification` via the `From`
/// implementations in this file, so the overseer does not depend on the client's notification
/// types directly.
#[derive(Debug, Clone)]
pub struct BlockInfo {
    /// Hash of the block.
    pub hash: Hash,
    /// Hash of the block's parent.
    pub parent_hash: Hash,
    /// Number of the block.
    pub number: BlockNumber,
    /// Unpin handle taken from the originating notification (`into_unpin_handle()`).
    pub unpin_handle: UnpinHandle,
}
257
258impl From<BlockImportNotification<Block>> for BlockInfo {
259 fn from(n: BlockImportNotification<Block>) -> Self {
260 let hash = n.hash;
261 let parent_hash = n.header.parent_hash;
262 let number = n.header.number;
263 let unpin_handle = n.into_unpin_handle();
264
265 BlockInfo { hash, parent_hash, number, unpin_handle }
266 }
267}
268
269impl From<FinalityNotification<Block>> for BlockInfo {
270 fn from(n: FinalityNotification<Block>) -> Self {
271 let hash = n.hash;
272 let parent_hash = n.header.parent_hash;
273 let number = n.header.number;
274 let unpin_handle = n.into_unpin_handle();
275
276 BlockInfo { hash, parent_hash, number, unpin_handle }
277 }
278}
279
/// An event delivered to the overseer from outside of it, via a [`Handle`].
#[derive(Debug)]
pub enum Event {
    /// A new block was imported.
    BlockImported(BlockInfo),
    /// A block was finalized.
    BlockFinalized(BlockInfo),
    /// A message that should be routed to a subsystem.
    MsgToSubsystem {
        /// The message itself.
        msg: AllMessages,
        /// Label identifying where the message came from.
        origin: &'static str,
    },
    /// A request to the overseer itself rather than to a subsystem.
    ExternalRequest(ExternalRequest),
    /// Stop the overseer.
    Stop,
}
305
/// Requests that outside code can make of the overseer.
#[derive(Debug)]
pub enum ExternalRequest {
    /// Wait until the block with the given hash is an active leaf.
    WaitForActivation {
        /// Hash of the block to wait for.
        hash: Hash,
        /// Channel on which `Ok(())` is sent once the hash is active.
        response_channel: oneshot::Sender<SubsystemResult<()>>,
    },
}
318
319pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut handle: Handle) {
322 let mut finality = client.finality_notification_stream();
323 let mut imports = client.import_notification_stream();
324
325 loop {
326 select! {
327 f = finality.next() => {
328 match f {
329 Some(block) => {
330 handle.block_finalized(block.into()).await;
331 }
332 None => break,
333 }
334 },
335 i = imports.next() => {
336 match i {
337 Some(block) => {
338 handle.block_imported(block.into()).await;
339 }
340 None => break,
341 }
342 },
343 complete => break,
344 }
345 }
346}
347
/// The overseer: owns all subsystems and routes messages and signals between them.
///
/// The type is expanded by the `#[orchestra(..)]` attribute macro. The arguments below select
/// the generated all-messages enum (`AllMessages`), the external event type ([`Event`]), the
/// signal type ([`OverseerSignal`]), the error type ([`SubsystemError`]) and a default message
/// capacity. Elsewhere in this file the expanded type is used as
/// `Overseer<S, SupportsParachains>`, i.e. with an additional spawner parameter.
///
/// Each `#[subsystem(..)]` field declares one subsystem slot. As used here, the attribute
/// names the message type the subsystem consumes (where given) and, under `sends:`, the
/// message types it is allowed to emit; `blocking`, `message_capacity:` and
/// `can_receive_priority_messages` are per-subsystem options. See the orchestra crate
/// documentation for the exact attribute grammar.
///
/// The trailing fields without a `#[subsystem]` attribute are plain state carried by the
/// overseer.
#[orchestra(
    gen=AllMessages,
    event=Event,
    signal=OverseerSignal,
    error=SubsystemError,
    message_capacity=2048,
)]
pub struct Overseer<SupportsParachains> {
    #[subsystem(CandidateValidationMessage, sends: [
        ChainApiMessage,
        RuntimeApiMessage,
    ])]
    candidate_validation: CandidateValidation,

    // No consumed message type is named here; `PvfCheckerMessage` is imported from
    // `self::messages` at the top of the file.
    #[subsystem(sends: [
        CandidateValidationMessage,
        RuntimeApiMessage,
    ])]
    pvf_checker: PvfChecker,

    #[subsystem(CandidateBackingMessage, sends: [
        CandidateValidationMessage,
        CollatorProtocolMessage,
        ChainApiMessage,
        AvailabilityDistributionMessage,
        AvailabilityStoreMessage,
        StatementDistributionMessage,
        ProvisionerMessage,
        RuntimeApiMessage,
        ProspectiveParachainsMessage,
    ])]
    candidate_backing: CandidateBacking,

    #[subsystem(StatementDistributionMessage, sends: [
        NetworkBridgeTxMessage,
        CandidateBackingMessage,
        RuntimeApiMessage,
        ProspectiveParachainsMessage,
        ChainApiMessage,
    ], can_receive_priority_messages)]
    statement_distribution: StatementDistribution,

    #[subsystem(AvailabilityDistributionMessage, sends: [
        AvailabilityStoreMessage,
        ChainApiMessage,
        RuntimeApiMessage,
        NetworkBridgeTxMessage,
    ])]
    availability_distribution: AvailabilityDistribution,

    #[subsystem(AvailabilityRecoveryMessage, sends: [
        NetworkBridgeTxMessage,
        RuntimeApiMessage,
        AvailabilityStoreMessage,
    ])]
    availability_recovery: AvailabilityRecovery,

    // No consumed message type is named here; `BitfieldSigningMessage` is imported from
    // `self::messages` at the top of the file.
    #[subsystem(blocking, sends: [
        AvailabilityStoreMessage,
        RuntimeApiMessage,
        BitfieldDistributionMessage,
    ])]
    bitfield_signing: BitfieldSigning,

    #[subsystem(blocking, message_capacity: 8192, BitfieldDistributionMessage, sends: [
        RuntimeApiMessage,
        NetworkBridgeTxMessage,
        ProvisionerMessage,
    ], can_receive_priority_messages)]
    bitfield_distribution: BitfieldDistribution,

    #[subsystem(ProvisionerMessage, sends: [
        RuntimeApiMessage,
        CandidateBackingMessage,
        DisputeCoordinatorMessage,
        ProspectiveParachainsMessage,
    ])]
    provisioner: Provisioner,

    #[subsystem(blocking, RuntimeApiMessage, sends: [])]
    runtime_api: RuntimeApi,

    #[subsystem(blocking, AvailabilityStoreMessage, sends: [
        ChainApiMessage,
        RuntimeApiMessage,
    ])]
    availability_store: AvailabilityStore,

    #[subsystem(blocking, NetworkBridgeRxMessage, sends: [
        BitfieldDistributionMessage,
        StatementDistributionMessage,
        ApprovalDistributionMessage,
        ApprovalVotingParallelMessage,
        GossipSupportMessage,
        DisputeDistributionMessage,
        CollationGenerationMessage,
        CollatorProtocolMessage,
    ])]
    network_bridge_rx: NetworkBridgeRx,

    #[subsystem(blocking, NetworkBridgeTxMessage, sends: [])]
    network_bridge_tx: NetworkBridgeTx,

    #[subsystem(blocking, ChainApiMessage, sends: [])]
    chain_api: ChainApi,

    #[subsystem(CollationGenerationMessage, sends: [
        RuntimeApiMessage,
        CollatorProtocolMessage,
    ])]
    collation_generation: CollationGeneration,

    #[subsystem(CollatorProtocolMessage, sends: [
        NetworkBridgeTxMessage,
        RuntimeApiMessage,
        CandidateBackingMessage,
        ChainApiMessage,
        ProspectiveParachainsMessage,
    ])]
    collator_protocol: CollatorProtocol,

    #[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
        NetworkBridgeTxMessage,
        ApprovalVotingMessage,
        RuntimeApiMessage,
    ], can_receive_priority_messages)]
    approval_distribution: ApprovalDistribution,

    #[subsystem(blocking, ApprovalVotingMessage, sends: [
        ApprovalDistributionMessage,
        AvailabilityRecoveryMessage,
        CandidateValidationMessage,
        ChainApiMessage,
        ChainSelectionMessage,
        DisputeCoordinatorMessage,
        RuntimeApiMessage,
    ])]
    approval_voting: ApprovalVoting,
    #[subsystem(blocking, message_capacity: 64000, ApprovalVotingParallelMessage, sends: [
        AvailabilityRecoveryMessage,
        CandidateValidationMessage,
        ChainApiMessage,
        ChainSelectionMessage,
        DisputeCoordinatorMessage,
        RuntimeApiMessage,
        NetworkBridgeTxMessage,
        ApprovalVotingMessage,
        ApprovalDistributionMessage,
        ApprovalVotingParallelMessage,
    ])]
    approval_voting_parallel: ApprovalVotingParallel,
    #[subsystem(GossipSupportMessage, sends: [
        NetworkBridgeTxMessage,
        NetworkBridgeRxMessage, RuntimeApiMessage,
        ChainSelectionMessage,
        ChainApiMessage,
    ], can_receive_priority_messages)]
    gossip_support: GossipSupport,

    #[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [
        RuntimeApiMessage,
        ChainApiMessage,
        DisputeDistributionMessage,
        CandidateValidationMessage,
        ApprovalVotingMessage,
        AvailabilityStoreMessage,
        AvailabilityRecoveryMessage,
        ChainSelectionMessage,
        ApprovalVotingParallelMessage,
    ])]
    dispute_coordinator: DisputeCoordinator,

    #[subsystem(DisputeDistributionMessage, sends: [
        RuntimeApiMessage,
        DisputeCoordinatorMessage,
        NetworkBridgeTxMessage,
    ])]
    dispute_distribution: DisputeDistribution,

    #[subsystem(blocking, ChainSelectionMessage, sends: [ChainApiMessage])]
    chain_selection: ChainSelection,

    #[subsystem(ProspectiveParachainsMessage, sends: [
        RuntimeApiMessage,
        ChainApiMessage,
    ])]
    prospective_parachains: ProspectiveParachains,

    /// External listeners waiting for a given hash to become an active leaf, keyed by hash.
    pub activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,

    /// The current set of active leaves, mapping each leaf hash to its block number.
    pub active_leaves: HashMap<Hash, BlockNumber>,

    /// Used to decide whether a newly imported head supports parachains.
    pub supports_parachains: SupportsParachains,

    /// Metrics recorded by the overseer.
    pub metrics: OverseerMetrics,
}
664
665pub fn spawn_metronome_metrics<S, SupportsParachains>(
667 overseer: &mut Overseer<S, SupportsParachains>,
668 metronome_metrics: OverseerMetrics,
669) -> Result<(), SubsystemError>
670where
671 S: Spawner,
672 SupportsParachains: HeadSupportsParachains,
673{
674 struct ExtractNameAndMeters;
675
676 impl<'a, T: 'a> MapSubsystem<&'a OrchestratedSubsystem<T>> for ExtractNameAndMeters {
677 type Output = Option<(&'static str, SubsystemMeters)>;
678
679 fn map_subsystem(&self, subsystem: &'a OrchestratedSubsystem<T>) -> Self::Output {
680 subsystem
681 .instance
682 .as_ref()
683 .map(|instance| (instance.name, instance.meters.clone()))
684 }
685 }
686 let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);
687
688 #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
689 let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> =
690 match memory_stats::MemoryAllocationTracker::new() {
691 Ok(memory_stats) =>
692 Box::new(move |metrics: &OverseerMetrics| match memory_stats.snapshot() {
693 Ok(memory_stats_snapshot) => {
694 gum::trace!(
695 target: LOG_TARGET,
696 "memory_stats: {:?}",
697 &memory_stats_snapshot
698 );
699 metrics.memory_stats_snapshot(memory_stats_snapshot);
700 },
701 Err(e) =>
702 gum::debug!(target: LOG_TARGET, "Failed to obtain memory stats: {:?}", e),
703 }),
704 Err(_) => {
705 gum::debug!(
706 target: LOG_TARGET,
707 "Memory allocation tracking is not supported by the allocator.",
708 );
709
710 Box::new(|_| {})
711 },
712 };
713
714 #[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
715 let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> = Box::new(|_| {});
716
717 let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
718 collect_memory_stats(&metronome_metrics);
719
720 metronome_metrics.channel_metrics_snapshot(
724 subsystem_meters
725 .iter()
726 .cloned()
727 .flatten()
728 .map(|(name, ref meters)| (name, meters.read())),
729 );
730
731 futures::future::ready(())
732 });
733 overseer
734 .spawner()
735 .spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));
736
737 Ok(())
738}
739
740impl<S, SupportsParachains> Overseer<S, SupportsParachains>
741where
742 SupportsParachains: HeadSupportsParachains,
743 S: Spawner,
744{
745 async fn stop(mut self) {
747 let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await;
748 }
749
750 pub async fn run(self) {
754 if let Err(err) = self.run_inner().await {
755 gum::error!(target: LOG_TARGET, ?err, "Overseer exited with error");
756 }
757 }
758
759 async fn run_inner(mut self) -> SubsystemResult<()> {
760 let metrics = self.metrics.clone();
761 spawn_metronome_metrics(&mut self, metrics)?;
762
763 loop {
764 select! {
765 msg = self.events_rx.select_next_some() => {
766 match msg {
767 Event::MsgToSubsystem { msg, origin } => {
768 self.route_message(msg.into(), origin).await?;
769 self.metrics.on_message_relayed();
770 }
771 Event::Stop => {
772 self.stop().await;
773 return Ok(());
774 }
775 Event::BlockImported(block) => {
776 self.block_imported(block).await?;
777 }
778 Event::BlockFinalized(block) => {
779 self.block_finalized(block).await?;
780 }
781 Event::ExternalRequest(request) => {
782 self.handle_external_request(request);
783 }
784 }
785 },
786 msg = self.to_orchestra_rx.select_next_some() => {
787 match msg {
788 ToOrchestra::SpawnJob { name, subsystem, s } => {
789 self.spawn_job(name, subsystem, s);
790 }
791 ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
792 self.spawn_blocking_job(name, subsystem, s);
793 }
794 }
795 },
796 res = self.running_subsystems.select_next_some() => {
797 gum::error!(
798 target: LOG_TARGET,
799 subsystem = ?res,
800 "subsystem finished unexpectedly",
801 );
802 self.stop().await;
803 return res;
804 },
805 }
806 }
807 }
808
809 async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
810 match self.active_leaves.entry(block.hash) {
811 hash_map::Entry::Vacant(entry) => entry.insert(block.number),
812 hash_map::Entry::Occupied(entry) => {
813 debug_assert_eq!(*entry.get(), block.number);
814 return Ok(())
815 },
816 };
817
818 let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)).await {
819 Some(_) => ActiveLeavesUpdate::start_work(ActivatedLeaf {
820 hash: block.hash,
821 number: block.number,
822 unpin_handle: block.unpin_handle,
823 }),
824 None => ActiveLeavesUpdate::default(),
825 };
826
827 if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
828 debug_assert_eq!(block.number.saturating_sub(1), number);
829 update.deactivated.push(block.parent_hash);
830 self.on_head_deactivated(&block.parent_hash);
831 }
832
833 self.clean_up_external_listeners();
834
835 if !update.is_empty() {
836 self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
837 }
838 Ok(())
839 }
840
841 async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
842 let mut update = ActiveLeavesUpdate::default();
843
844 self.active_leaves.retain(|h, n| {
845 if *n <= block.number && *h != block.hash {
848 update.deactivated.push(*h);
849 false
850 } else {
851 true
852 }
853 });
854
855 for deactivated in &update.deactivated {
856 self.on_head_deactivated(deactivated)
857 }
858
859 self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number))
860 .await?;
861
862 if !update.is_empty() {
867 self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
868 }
869
870 Ok(())
871 }
872
873 async fn on_head_activated(&mut self, hash: &Hash, _parent_hash: Option<Hash>) -> Option<()> {
876 if !self.supports_parachains.head_supports_parachains(hash).await {
877 return None
878 }
879
880 self.metrics.on_head_activated();
881 if let Some(listeners) = self.activation_external_listeners.remove(hash) {
882 gum::trace!(
883 target: LOG_TARGET,
884 relay_parent = ?hash,
885 "Leaf got activated, notifying external listeners"
886 );
887 for listener in listeners {
888 let _ = listener.send(Ok(()));
890 }
891 }
892
893 Some(())
894 }
895
896 fn on_head_deactivated(&mut self, hash: &Hash) {
897 self.metrics.on_head_deactivated();
898 self.activation_external_listeners.remove(hash);
899 }
900
901 fn clean_up_external_listeners(&mut self) {
902 self.activation_external_listeners.retain(|_, v| {
903 v.retain(|c| !c.is_canceled());
905 !v.is_empty()
906 })
907 }
908
909 fn handle_external_request(&mut self, request: ExternalRequest) {
910 match request {
911 ExternalRequest::WaitForActivation { hash, response_channel } => {
912 if self.active_leaves.get(&hash).is_some() {
913 gum::trace!(
914 target: LOG_TARGET,
915 relay_parent = ?hash,
916 "Leaf was already ready - answering `WaitForActivation`"
917 );
918 let _ = response_channel.send(Ok(()));
920 } else {
921 gum::trace!(
922 target: LOG_TARGET,
923 relay_parent = ?hash,
924 "Leaf not yet ready - queuing `WaitForActivation` sender"
925 );
926 self.activation_external_listeners
927 .entry(hash)
928 .or_default()
929 .push(response_channel);
930 }
931 },
932 }
933 }
934
935 fn spawn_job(
936 &mut self,
937 task_name: &'static str,
938 subsystem_name: Option<&'static str>,
939 j: BoxFuture<'static, ()>,
940 ) {
941 self.spawner.spawn(task_name, subsystem_name, j);
942 }
943
944 fn spawn_blocking_job(
945 &mut self,
946 task_name: &'static str,
947 subsystem_name: Option<&'static str>,
948 j: BoxFuture<'static, ()>,
949 ) {
950 self.spawner.spawn_blocking(task_name, subsystem_name, j);
951 }
952}