polkadot_overseer/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! # Overseer
18//!
19//! `overseer` implements the Overseer architecture described in the
20//! [implementers' guide][overseer-page].
//! For the motivation behind implementing the overseer itself, see that guide;
//! the documentation in this crate mostly covers technical details.
24//!
//! An `Overseer` spawns, stops, and oversees asynchronous tasks, and establishes a
//! well-defined, easy-to-use protocol that the tasks use to communicate with each
//! other. Ideally this protocol is the only way tasks communicate with each other;
//! however, at the moment there are no foolproof guards against other ways of
//! communication.
30//!
31//! The `Overseer` is instantiated with a pre-defined set of `Subsystems` that
32//! share the same behavior from `Overseer`'s point of view.
33//!
34//! ```text
35//!                              +-----------------------------+
36//!                              |         Overseer            |
37//!                              +-----------------------------+
38//!
39//!             ................|  Overseer "holds" these and uses |..............
40//!             .                  them to (re)start things                      .
41//!             .                                                                .
42//!             .  +-------------------+                +---------------------+  .
43//!             .  |   Subsystem1      |                |   Subsystem2        |  .
44//!             .  +-------------------+                +---------------------+  .
45//!             .           |                                       |            .
46//!             ..................................................................
47//!                         |                                       |
48//!                       start()                                 start()
49//!                         V                                       V
50//!             ..................| Overseer "runs" these |.......................
51//!             .  +--------------------+               +---------------------+  .
52//!             .  | SubsystemInstance1 |               | SubsystemInstance2  |  .
53//!             .  +--------------------+               +---------------------+  .
54//!             ..................................................................
55//! ```
56//!
57//! [overseer-page]: https://paritytech.github.io/polkadot-sdk/book/node/overseer.html
58
59// #![deny(unused_results)]
60// unused dependencies can not work for test and examples at the same time
61// yielding false positives
62#![warn(missing_docs)]
63#![allow(dead_code)] // TODO https://github.com/paritytech/polkadot-sdk/issues/5793
64
65use std::{
66	collections::{hash_map, HashMap},
67	fmt::{self, Debug},
68	pin::Pin,
69	sync::Arc,
70	time::Duration,
71};
72
73use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
74
75use polkadot_primitives::{Block, BlockNumber, Hash};
76use sc_client_api::{BlockImportNotification, BlockchainEvents, FinalityNotification};
77
78use self::messages::{BitfieldSigningMessage, PvfCheckerMessage};
79use polkadot_node_subsystem_types::messages::{
80	ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage,
81	AvailabilityDistributionMessage, AvailabilityRecoveryMessage, AvailabilityStoreMessage,
82	BitfieldDistributionMessage, CandidateBackingMessage, CandidateValidationMessage,
83	ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage,
84	DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage,
85	NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProspectiveParachainsMessage,
86	ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage,
87};
88
89pub use polkadot_node_subsystem_types::{
90	errors::{SubsystemError, SubsystemResult},
91	ActivatedLeaf, ActiveLeavesUpdate, ChainApiBackend, OverseerSignal, RuntimeApiSubsystemClient,
92	UnpinHandle,
93};
94
95pub mod metrics;
96pub use self::metrics::Metrics as OverseerMetrics;
97
98/// A dummy subsystem, mostly useful for placeholders and tests.
99pub mod dummy;
100pub use self::dummy::DummySubsystem;
101
102pub use polkadot_node_metrics::{
103	metrics::{prometheus, Metrics as MetricsTrait},
104	Metronome,
105};
106
107pub use orchestra as gen;
108pub use orchestra::{
109	contextbounds, orchestra, subsystem, FromOrchestra, HighPriority, MapSubsystem, MessagePacket,
110	NormalPriority, OrchestraError as OverseerError, Priority, PriorityLevel, SignalsReceived,
111	Spawner, Subsystem, SubsystemContext, SubsystemIncomingMessages, SubsystemInstance,
112	SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra,
113	TrySendError,
114};
115
116#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
117mod memory_stats;
118#[cfg(test)]
119mod tests;
120
121use sp_core::traits::SpawnNamed;
122
/// Adapter that lets a substrate [`SpawnNamed`] implementation be used where an
/// `orchestra` `Spawner` is expected.
///
/// The wrapped spawner is public, so it can also be reached directly through `.0`.
#[derive(Clone)]
pub struct SpawnGlue<S>(pub S);

impl<S> AsRef<S> for SpawnGlue<S> {
	/// Borrow the wrapped spawner.
	fn as_ref(&self) -> &S {
		let SpawnGlue(inner) = self;
		inner
	}
}
137
138impl<S: SpawnNamed + Clone + Send + Sync> crate::gen::Spawner for SpawnGlue<S> {
139	fn spawn_blocking(
140		&self,
141		name: &'static str,
142		group: Option<&'static str>,
143		future: futures::future::BoxFuture<'static, ()>,
144	) {
145		SpawnNamed::spawn_blocking(&self.0, name, group, future)
146	}
147	fn spawn(
148		&self,
149		name: &'static str,
150		group: Option<&'static str>,
151		future: futures::future::BoxFuture<'static, ()>,
152	) {
153		SpawnNamed::spawn(&self.0, name, group, future)
154	}
155}
156
157/// Whether a header supports parachain consensus or not.
158#[async_trait::async_trait]
159pub trait HeadSupportsParachains {
160	/// Return true if the given header supports parachain consensus. Otherwise, false.
161	async fn head_supports_parachains(&self, head: &Hash) -> bool;
162}
163
164#[async_trait::async_trait]
165impl<Client> HeadSupportsParachains for Arc<Client>
166where
167	Client: RuntimeApiSubsystemClient + Sync + Send,
168{
169	async fn head_supports_parachains(&self, head: &Hash) -> bool {
170		// Check that the `ParachainHost` runtime api is at least with version 1 present on chain.
171		self.api_version_parachain_host(*head).await.ok().flatten().unwrap_or(0) >= 1
172	}
173}
174
175/// A handle used to communicate with the [`Overseer`].
176///
177/// [`Overseer`]: struct.Overseer.html
178#[derive(Clone)]
179pub struct Handle(OverseerHandle);
180
181impl Handle {
182	/// Create a new [`Handle`].
183	pub fn new(raw: OverseerHandle) -> Self {
184		Self(raw)
185	}
186
187	/// Inform the `Overseer` that that some block was imported.
188	pub async fn block_imported(&mut self, block: BlockInfo) {
189		self.send_and_log_error(Event::BlockImported(block)).await
190	}
191
192	/// Send some message to one of the `Subsystem`s.
193	pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
194		self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin }).await
195	}
196
197	/// Send a message not providing an origin.
198	#[inline(always)]
199	pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
200		self.send_msg(msg, "").await
201	}
202
203	/// Inform the `Overseer` that some block was finalized.
204	pub async fn block_finalized(&mut self, block: BlockInfo) {
205		self.send_and_log_error(Event::BlockFinalized(block)).await
206	}
207
208	/// Wait for a block with the given hash to be in the active-leaves set.
209	///
210	/// The response channel responds if the hash was activated and is closed if the hash was
211	/// deactivated. Note that due the fact the overseer doesn't store the whole active-leaves set,
212	/// only deltas, the response channel may never return if the hash was deactivated before this
213	/// call. In this case, it's the caller's responsibility to ensure a timeout is set.
214	pub async fn wait_for_activation(
215		&mut self,
216		hash: Hash,
217		response_channel: oneshot::Sender<SubsystemResult<()>>,
218	) {
219		self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
220			hash,
221			response_channel,
222		}))
223		.await;
224	}
225
226	/// Tell `Overseer` to shutdown.
227	pub async fn stop(&mut self) {
228		self.send_and_log_error(Event::Stop).await;
229	}
230
231	/// Most basic operation, to stop a server.
232	async fn send_and_log_error(&mut self, event: Event) {
233		if self.0.send(event).await.is_err() {
234			gum::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
235		}
236	}
237}
238
239/// An event telling the `Overseer` on the particular block
240/// that has been imported or finalized.
241///
242/// This structure exists solely for the purposes of decoupling
243/// `Overseer` code from the client code and the necessity to call
244/// `HeaderBackend::block_number_from_id()`.
245#[derive(Debug, Clone)]
246pub struct BlockInfo {
247	/// Hash of the block.
248	pub hash: Hash,
249	/// Hash of the parent block.
250	pub parent_hash: Hash,
251	/// Block's number.
252	pub number: BlockNumber,
253	/// A handle to unpin the block on drop.
254	pub unpin_handle: UnpinHandle,
255}
256
257impl From<BlockImportNotification<Block>> for BlockInfo {
258	fn from(n: BlockImportNotification<Block>) -> Self {
259		let hash = n.hash;
260		let parent_hash = n.header.parent_hash;
261		let number = n.header.number;
262		let unpin_handle = n.into_unpin_handle();
263
264		BlockInfo { hash, parent_hash, number, unpin_handle }
265	}
266}
267
268impl From<FinalityNotification<Block>> for BlockInfo {
269	fn from(n: FinalityNotification<Block>) -> Self {
270		let hash = n.hash;
271		let parent_hash = n.header.parent_hash;
272		let number = n.header.number;
273		let unpin_handle = n.into_unpin_handle();
274
275		BlockInfo { hash, parent_hash, number, unpin_handle }
276	}
277}
278
279/// An event from outside the overseer scope, such
280/// as the substrate framework or user interaction.
281#[derive(Debug)]
282pub enum Event {
283	/// A new block was imported.
284	///
285	/// This event is not sent if the block was already known
286	/// and we reorged to it e.g. due to a reversion.
287	///
288	/// Also, these events are not sent during a major sync.
289	BlockImported(BlockInfo),
290	/// A block was finalized with i.e. babe or another consensus algorithm.
291	BlockFinalized(BlockInfo),
292	/// Message as sent to a subsystem.
293	MsgToSubsystem {
294		/// The actual message.
295		msg: AllMessages,
296		/// The originating subsystem name.
297		origin: &'static str,
298	},
299	/// A request from the outer world.
300	ExternalRequest(ExternalRequest),
301	/// Stop the overseer on i.e. a UNIX signal.
302	Stop,
303}
304
305/// Some request from outer world.
306#[derive(Debug)]
307pub enum ExternalRequest {
308	/// Wait for the activation of a particular hash
309	/// and be notified by means of the return channel.
310	WaitForActivation {
311		/// The relay parent for which activation to wait for.
312		hash: Hash,
313		/// Response channel to await on.
314		response_channel: oneshot::Sender<SubsystemResult<()>>,
315	},
316}
317
318/// Glues together the [`Overseer`] and `BlockchainEvents` by forwarding
319/// import and finality notifications into the [`OverseerHandle`].
320pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut handle: Handle) {
321	let mut finality = client.finality_notification_stream();
322	let mut imports = client.import_notification_stream();
323
324	loop {
325		select! {
326			f = finality.next() => {
327				match f {
328					Some(block) => {
329						handle.block_finalized(block.into()).await;
330					}
331					None => break,
332				}
333			},
334			i = imports.next() => {
335				match i {
336					Some(block) => {
337						handle.block_imported(block.into()).await;
338					}
339					None => break,
340				}
341			},
342			complete => break,
343		}
344	}
345}
346
/// The `Overseer`: a fixed set of [`Subsystem`]s plus the state needed to route messages and
/// signals between them.
///
/// A builder for this struct is generated by the `#[orchestra]` attribute (see the example
/// below). Building the overseer returns it along with an [`OverseerHandle`] which can
/// be used to send messages from external parts of the codebase.
///
/// The [`OverseerHandle`] returned by the builder is connected to the returned [`Overseer`].
///
/// ```text
///                  +------------------------------------+
///                  |            Overseer                |
///                  +------------------------------------+
///                    /            |             |      \
///      ................. subsystems...................................
///      . +-----------+    +-----------+   +----------+   +---------+ .
///      . |           |    |           |   |          |   |         | .
///      . +-----------+    +-----------+   +----------+   +---------+ .
///      ...............................................................
///                              |
///                        probably `spawn`
///                            a `job`
///                              |
///                              V
///                         +-----------+
///                         |           |
///                         +-----------+
/// ```
///
/// # Example
///
/// The subsystems may be of any type, as long as they implement the expected interface.
/// Here, we create a mock validation subsystem, keep dummy subsystems for everything else,
/// and start the `Overseer` with them. For simplicity, the example is terminated with a
/// timeout.
/// ```
/// # use std::time::Duration;
/// # use futures::{executor, pin_mut, select, FutureExt};
/// # use futures_timer::Delay;
/// # use polkadot_primitives::Hash;
/// # use polkadot_overseer::{
/// # 	self as overseer,
/// #   OverseerSignal,
/// # 	SubsystemSender as _,
/// # 	AllMessages,
/// # 	HeadSupportsParachains,
/// # 	Overseer,
/// # 	SubsystemError,
/// # 	gen::{
/// # 		SubsystemContext,
/// # 		FromOrchestra,
/// # 		SpawnedSubsystem,
/// # 	},
/// # };
/// # use polkadot_node_subsystem_types::messages::{
/// # 	CandidateValidationMessage, CandidateBackingMessage,
/// # 	NetworkBridgeTxMessage,
/// # };
///
/// struct ValidationSubsystem;
///
/// impl<Ctx> overseer::Subsystem<Ctx, SubsystemError> for ValidationSubsystem
/// where
///     Ctx: overseer::SubsystemContext<
/// 				Message=CandidateValidationMessage,
/// 				AllMessages=AllMessages,
/// 				Signal=OverseerSignal,
/// 				Error=SubsystemError,
/// 			>,
/// {
///     fn start(
///         self,
///         mut ctx: Ctx,
///     ) -> SpawnedSubsystem<SubsystemError> {
///         SpawnedSubsystem {
///             name: "validation-subsystem",
///             future: Box::pin(async move {
///                 loop {
///                     Delay::new(Duration::from_secs(1)).await;
///                 }
///             }),
///         }
///     }
/// }
///
/// # fn main() { executor::block_on(async move {
///
/// struct AlwaysSupportsParachains;
///
/// #[async_trait::async_trait]
/// impl HeadSupportsParachains for AlwaysSupportsParachains {
///      async fn head_supports_parachains(&self, _head: &Hash) -> bool { true }
/// }
///
/// let spawner = sp_core::testing::TaskExecutor::new();
/// let (overseer, _handle) = dummy_overseer_builder(spawner, AlwaysSupportsParachains, None)
/// 		.unwrap()
/// 		.replace_candidate_validation(|_| ValidationSubsystem)
/// 		.build()
/// 		.unwrap();
///
/// let timer = Delay::new(Duration::from_millis(50)).fuse();
///
/// let overseer_fut = overseer.run().fuse();
/// pin_mut!(timer);
/// pin_mut!(overseer_fut);
///
/// select! {
///     _ = overseer_fut => (),
///     _ = timer => (),
/// }
/// #
/// # 	});
/// # }
/// ```
// The arguments below are consumed by the `orchestra` proc macro. NOTE(review): presumably
// `message_capacity=2048` is the default per-subsystem channel capacity, overridden by the
// per-field `message_capacity` of some subsystems below — confirm against orchestra docs.
#[orchestra(
	gen=AllMessages,
	event=Event,
	signal=OverseerSignal,
	error=SubsystemError,
	message_capacity=2048,
)]
pub struct Overseer<SupportsParachains> {
	// Each `#[subsystem(...)]` field is one subsystem. As interpreted by `orchestra`
	// (presumably — confirm against its documentation): the leading message type is what the
	// subsystem receives (`pvf_checker` and `bitfield_signing` declare none), `sends: [...]`
	// lists the message types it may send, `blocking` asks for it to be spawned as a blocking
	// task, and `can_receive_priority_messages` enables priority delivery to it.
	#[subsystem(CandidateValidationMessage, sends: [
		ChainApiMessage,
		RuntimeApiMessage,
	])]
	candidate_validation: CandidateValidation,

	#[subsystem(sends: [
		CandidateValidationMessage,
		RuntimeApiMessage,
	])]
	pvf_checker: PvfChecker,

	#[subsystem(CandidateBackingMessage, sends: [
		CandidateValidationMessage,
		CollatorProtocolMessage,
		ChainApiMessage,
		AvailabilityDistributionMessage,
		AvailabilityStoreMessage,
		StatementDistributionMessage,
		ProvisionerMessage,
		RuntimeApiMessage,
		ProspectiveParachainsMessage,
	])]
	candidate_backing: CandidateBacking,

	#[subsystem(StatementDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		CandidateBackingMessage,
		RuntimeApiMessage,
		ProspectiveParachainsMessage,
		ChainApiMessage,
	], can_receive_priority_messages)]
	statement_distribution: StatementDistribution,

	#[subsystem(AvailabilityDistributionMessage, sends: [
		AvailabilityStoreMessage,
		ChainApiMessage,
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
	])]
	availability_distribution: AvailabilityDistribution,

	#[subsystem(AvailabilityRecoveryMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		AvailabilityStoreMessage,
	])]
	availability_recovery: AvailabilityRecovery,

	#[subsystem(blocking, sends: [
		AvailabilityStoreMessage,
		RuntimeApiMessage,
		BitfieldDistributionMessage,
	])]
	bitfield_signing: BitfieldSigning,

	#[subsystem(blocking, message_capacity: 8192, BitfieldDistributionMessage, sends: [
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
		ProvisionerMessage,
	], can_receive_priority_messages)]
	bitfield_distribution: BitfieldDistribution,

	#[subsystem(ProvisionerMessage, sends: [
		RuntimeApiMessage,
		CandidateBackingMessage,
		ChainApiMessage,
		DisputeCoordinatorMessage,
		ProspectiveParachainsMessage,
	])]
	provisioner: Provisioner,

	#[subsystem(blocking, RuntimeApiMessage, sends: [])]
	runtime_api: RuntimeApi,

	#[subsystem(blocking, AvailabilityStoreMessage, sends: [
		ChainApiMessage,
		RuntimeApiMessage,
	])]
	availability_store: AvailabilityStore,

	#[subsystem(blocking, NetworkBridgeRxMessage, sends: [
		BitfieldDistributionMessage,
		StatementDistributionMessage,
		ApprovalDistributionMessage,
		ApprovalVotingParallelMessage,
		GossipSupportMessage,
		DisputeDistributionMessage,
		CollationGenerationMessage,
		CollatorProtocolMessage,
	])]
	network_bridge_rx: NetworkBridgeRx,

	#[subsystem(blocking, NetworkBridgeTxMessage, sends: [])]
	network_bridge_tx: NetworkBridgeTx,

	#[subsystem(blocking, ChainApiMessage, sends: [])]
	chain_api: ChainApi,

	#[subsystem(CollationGenerationMessage, sends: [
		RuntimeApiMessage,
		CollatorProtocolMessage,
	])]
	collation_generation: CollationGeneration,

	#[subsystem(CollatorProtocolMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		CandidateBackingMessage,
		ChainApiMessage,
		ProspectiveParachainsMessage,
	])]
	collator_protocol: CollatorProtocol,

	#[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		ApprovalVotingMessage,
		RuntimeApiMessage,
	], can_receive_priority_messages)]
	approval_distribution: ApprovalDistribution,

	#[subsystem(blocking, ApprovalVotingMessage, sends: [
		ApprovalDistributionMessage,
		AvailabilityRecoveryMessage,
		CandidateValidationMessage,
		ChainApiMessage,
		ChainSelectionMessage,
		DisputeCoordinatorMessage,
		RuntimeApiMessage,
	])]
	approval_voting: ApprovalVoting,
	#[subsystem(blocking, message_capacity: 64000, ApprovalVotingParallelMessage, sends: [
		AvailabilityRecoveryMessage,
		CandidateValidationMessage,
		ChainApiMessage,
		ChainSelectionMessage,
		DisputeCoordinatorMessage,
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
		ApprovalVotingMessage,
		ApprovalDistributionMessage,
		ApprovalVotingParallelMessage,
	])]
	approval_voting_parallel: ApprovalVotingParallel,
	#[subsystem(GossipSupportMessage, sends: [
		NetworkBridgeTxMessage,
		NetworkBridgeRxMessage, // TODO <https://github.com/paritytech/polkadot/issues/5626>
		RuntimeApiMessage,
		ChainSelectionMessage,
	], can_receive_priority_messages)]
	gossip_support: GossipSupport,

	#[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
		DisputeDistributionMessage,
		CandidateValidationMessage,
		ApprovalVotingMessage,
		AvailabilityStoreMessage,
		AvailabilityRecoveryMessage,
		ChainSelectionMessage,
		ApprovalVotingParallelMessage,
	])]
	dispute_coordinator: DisputeCoordinator,

	#[subsystem(DisputeDistributionMessage, sends: [
		RuntimeApiMessage,
		DisputeCoordinatorMessage,
		NetworkBridgeTxMessage,
	])]
	dispute_distribution: DisputeDistribution,

	#[subsystem(blocking, ChainSelectionMessage, sends: [ChainApiMessage])]
	chain_selection: ChainSelection,

	#[subsystem(ProspectiveParachainsMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
	])]
	prospective_parachains: ProspectiveParachains,

	// Non-subsystem state owned by the overseer's main loop.

	/// External listeners waiting for a hash to be in the active-leaves set.
	pub activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,

	/// The set of the "active leaves", mapped to their block numbers.
	pub active_leaves: HashMap<Hash, BlockNumber>,

	/// An implementation for checking whether a header supports parachain consensus.
	pub supports_parachains: SupportsParachains,

	/// Various Prometheus metrics.
	pub metrics: OverseerMetrics,
}
663
664/// Spawn the metrics metronome task.
665pub fn spawn_metronome_metrics<S, SupportsParachains>(
666	overseer: &mut Overseer<S, SupportsParachains>,
667	metronome_metrics: OverseerMetrics,
668) -> Result<(), SubsystemError>
669where
670	S: Spawner,
671	SupportsParachains: HeadSupportsParachains,
672{
673	struct ExtractNameAndMeters;
674
675	impl<'a, T: 'a> MapSubsystem<&'a OrchestratedSubsystem<T>> for ExtractNameAndMeters {
676		type Output = Option<(&'static str, SubsystemMeters)>;
677
678		fn map_subsystem(&self, subsystem: &'a OrchestratedSubsystem<T>) -> Self::Output {
679			subsystem
680				.instance
681				.as_ref()
682				.map(|instance| (instance.name, instance.meters.clone()))
683		}
684	}
685	let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);
686
687	#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
688	let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> =
689		match memory_stats::MemoryAllocationTracker::new() {
690			Ok(memory_stats) =>
691				Box::new(move |metrics: &OverseerMetrics| match memory_stats.snapshot() {
692					Ok(memory_stats_snapshot) => {
693						gum::trace!(
694							target: LOG_TARGET,
695							"memory_stats: {:?}",
696							&memory_stats_snapshot
697						);
698						metrics.memory_stats_snapshot(memory_stats_snapshot);
699					},
700					Err(e) =>
701						gum::debug!(target: LOG_TARGET, "Failed to obtain memory stats: {:?}", e),
702				}),
703			Err(_) => {
704				gum::debug!(
705					target: LOG_TARGET,
706					"Memory allocation tracking is not supported by the allocator.",
707				);
708
709				Box::new(|_| {})
710			},
711		};
712
713	#[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
714	let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> = Box::new(|_| {});
715
716	let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
717		collect_memory_stats(&metronome_metrics);
718
719		// We combine the amount of messages from subsystems to the overseer
720		// as well as the amount of messages from external sources to the overseer
721		// into one `to_overseer` value.
722		metronome_metrics.channel_metrics_snapshot(
723			subsystem_meters
724				.iter()
725				.cloned()
726				.flatten()
727				.map(|(name, ref meters)| (name, meters.read())),
728		);
729
730		futures::future::ready(())
731	});
732	overseer
733		.spawner()
734		.spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));
735
736	Ok(())
737}
738
739impl<S, SupportsParachains> Overseer<S, SupportsParachains>
740where
741	SupportsParachains: HeadSupportsParachains,
742	S: Spawner,
743{
744	/// Stop the `Overseer`.
745	async fn stop(mut self) {
746		let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await;
747	}
748
749	/// Run the `Overseer`.
750	///
751	/// Logging any errors.
752	pub async fn run(self) {
753		if let Err(err) = self.run_inner().await {
754			gum::error!(target: LOG_TARGET, ?err, "Overseer exited with error");
755		}
756	}
757
758	async fn run_inner(mut self) -> SubsystemResult<()> {
759		let metrics = self.metrics.clone();
760		spawn_metronome_metrics(&mut self, metrics)?;
761
762		loop {
763			select! {
764				msg = self.events_rx.select_next_some() => {
765					match msg {
766						Event::MsgToSubsystem { msg, origin } => {
767							self.route_message(msg.into(), origin).await?;
768							self.metrics.on_message_relayed();
769						}
770						Event::Stop => {
771							self.stop().await;
772							return Ok(());
773						}
774						Event::BlockImported(block) => {
775							self.block_imported(block).await?;
776						}
777						Event::BlockFinalized(block) => {
778							self.block_finalized(block).await?;
779						}
780						Event::ExternalRequest(request) => {
781							self.handle_external_request(request);
782						}
783					}
784				},
785				msg = self.to_orchestra_rx.select_next_some() => {
786					match msg {
787						ToOrchestra::SpawnJob { name, subsystem, s } => {
788							self.spawn_job(name, subsystem, s);
789						}
790						ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
791							self.spawn_blocking_job(name, subsystem, s);
792						}
793					}
794				},
795				res = self.running_subsystems.select_next_some() => {
796					gum::error!(
797						target: LOG_TARGET,
798						subsystem = ?res,
799						"subsystem finished unexpectedly",
800					);
801					self.stop().await;
802					return res;
803				},
804			}
805		}
806	}
807
808	async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
809		match self.active_leaves.entry(block.hash) {
810			hash_map::Entry::Vacant(entry) => entry.insert(block.number),
811			hash_map::Entry::Occupied(entry) => {
812				debug_assert_eq!(*entry.get(), block.number);
813				return Ok(())
814			},
815		};
816
817		let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)).await {
818			Some(_) => ActiveLeavesUpdate::start_work(ActivatedLeaf {
819				hash: block.hash,
820				number: block.number,
821				unpin_handle: block.unpin_handle,
822			}),
823			None => ActiveLeavesUpdate::default(),
824		};
825
826		if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
827			debug_assert_eq!(block.number.saturating_sub(1), number);
828			update.deactivated.push(block.parent_hash);
829			self.on_head_deactivated(&block.parent_hash);
830		}
831
832		self.clean_up_external_listeners();
833
834		if !update.is_empty() {
835			self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
836		}
837		Ok(())
838	}
839
840	async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
841		let mut update = ActiveLeavesUpdate::default();
842
843		self.active_leaves.retain(|h, n| {
844			// prune all orphaned leaves, but don't prune
845			// the finalized block if it is itself a leaf.
846			if *n <= block.number && *h != block.hash {
847				update.deactivated.push(*h);
848				false
849			} else {
850				true
851			}
852		});
853
854		for deactivated in &update.deactivated {
855			self.on_head_deactivated(deactivated)
856		}
857
858		self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number))
859			.await?;
860
861		// If there are no leaves being deactivated, we don't need to send an update.
862		//
863		// Our peers will be informed about our finalized block the next time we
864		// activating/deactivating some leaf.
865		if !update.is_empty() {
866			self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
867		}
868
869		Ok(())
870	}
871
872	/// Handles a header activation. If the header's state doesn't support the parachains API,
873	/// this returns `None`.
874	async fn on_head_activated(&mut self, hash: &Hash, _parent_hash: Option<Hash>) -> Option<()> {
875		if !self.supports_parachains.head_supports_parachains(hash).await {
876			return None
877		}
878
879		self.metrics.on_head_activated();
880		if let Some(listeners) = self.activation_external_listeners.remove(hash) {
881			gum::trace!(
882				target: LOG_TARGET,
883				relay_parent = ?hash,
884				"Leaf got activated, notifying external listeners"
885			);
886			for listener in listeners {
887				// it's fine if the listener is no longer interested
888				let _ = listener.send(Ok(()));
889			}
890		}
891
892		Some(())
893	}
894
895	fn on_head_deactivated(&mut self, hash: &Hash) {
896		self.metrics.on_head_deactivated();
897		self.activation_external_listeners.remove(hash);
898	}
899
900	fn clean_up_external_listeners(&mut self) {
901		self.activation_external_listeners.retain(|_, v| {
902			// remove dead listeners
903			v.retain(|c| !c.is_canceled());
904			!v.is_empty()
905		})
906	}
907
908	fn handle_external_request(&mut self, request: ExternalRequest) {
909		match request {
910			ExternalRequest::WaitForActivation { hash, response_channel } => {
911				if self.active_leaves.get(&hash).is_some() {
912					gum::trace!(
913						target: LOG_TARGET,
914						relay_parent = ?hash,
915						"Leaf was already ready - answering `WaitForActivation`"
916					);
917					// it's fine if the listener is no longer interested
918					let _ = response_channel.send(Ok(()));
919				} else {
920					gum::trace!(
921						target: LOG_TARGET,
922						relay_parent = ?hash,
923						"Leaf not yet ready - queuing `WaitForActivation` sender"
924					);
925					self.activation_external_listeners
926						.entry(hash)
927						.or_default()
928						.push(response_channel);
929				}
930			},
931		}
932	}
933
934	fn spawn_job(
935		&mut self,
936		task_name: &'static str,
937		subsystem_name: Option<&'static str>,
938		j: BoxFuture<'static, ()>,
939	) {
940		self.spawner.spawn(task_name, subsystem_name, j);
941	}
942
943	fn spawn_blocking_job(
944		&mut self,
945		task_name: &'static str,
946		subsystem_name: Option<&'static str>,
947		j: BoxFuture<'static, ()>,
948	) {
949		self.spawner.spawn_blocking(task_name, subsystem_name, j);
950	}
951}