polkadot_overseer/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! # Overseer
18//!
19//! `overseer` implements the Overseer architecture described in the
20//! [implementers' guide][overseer-page].
//! For the motivations behind implementing the overseer itself, you should
//! check out that guide; documentation in this crate mostly discusses
//! technical details.
24//!
25//! An `Overseer` is something that allows spawning/stopping and overseeing
26//! asynchronous tasks as well as establishing a well-defined and easy to use
//! protocol that the tasks can use to communicate with each other. Ideally, this
//! protocol is the only way tasks communicate with each other; however, at this
//! moment there are no foolproof guards against other ways of communication.
30//!
31//! The `Overseer` is instantiated with a pre-defined set of `Subsystems` that
32//! share the same behavior from `Overseer`'s point of view.
33//!
34//! ```text
35//!                              +-----------------------------+
36//!                              |         Overseer            |
37//!                              +-----------------------------+
38//!
39//!             ................|  Overseer "holds" these and uses |..............
40//!             .                  them to (re)start things                      .
41//!             .                                                                .
42//!             .  +-------------------+                +---------------------+  .
43//!             .  |   Subsystem1      |                |   Subsystem2        |  .
44//!             .  +-------------------+                +---------------------+  .
45//!             .           |                                       |            .
46//!             ..................................................................
47//!                         |                                       |
48//!                       start()                                 start()
49//!                         V                                       V
50//!             ..................| Overseer "runs" these |.......................
51//!             .  +--------------------+               +---------------------+  .
52//!             .  | SubsystemInstance1 |               | SubsystemInstance2  |  .
53//!             .  +--------------------+               +---------------------+  .
54//!             ..................................................................
55//! ```
56//!
57//! [overseer-page]: https://paritytech.github.io/polkadot-sdk/book/node/overseer.html
58
59// #![deny(unused_results)]
60// unused dependencies can not work for test and examples at the same time
61// yielding false positives
62#![warn(missing_docs)]
63// TODO https://github.com/paritytech/polkadot-sdk/issues/5793
64#![allow(dead_code, irrefutable_let_patterns)]
65
66use std::{
67	collections::{hash_map, HashMap},
68	fmt::{self, Debug},
69	pin::Pin,
70	sync::Arc,
71	time::Duration,
72};
73
74use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
75
76use polkadot_primitives::{Block, BlockNumber, Hash};
77use sc_client_api::{BlockImportNotification, BlockchainEvents, FinalityNotification};
78
79use self::messages::{BitfieldSigningMessage, PvfCheckerMessage};
80use polkadot_node_subsystem_types::messages::{
81	ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage,
82	AvailabilityDistributionMessage, AvailabilityRecoveryMessage, AvailabilityStoreMessage,
83	BitfieldDistributionMessage, CandidateBackingMessage, CandidateValidationMessage,
84	ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage,
85	DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage,
86	NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProspectiveParachainsMessage,
87	ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage,
88};
89
90pub use polkadot_node_subsystem_types::{
91	errors::{SubsystemError, SubsystemResult},
92	ActivatedLeaf, ActiveLeavesUpdate, ChainApiBackend, OverseerSignal, RuntimeApiSubsystemClient,
93	UnpinHandle,
94};
95
96pub mod metrics;
97pub use self::metrics::Metrics as OverseerMetrics;
98
99/// A dummy subsystem, mostly useful for placeholders and tests.
100pub mod dummy;
101pub use self::dummy::DummySubsystem;
102
103pub use polkadot_node_metrics::{
104	metrics::{prometheus, Metrics as MetricsTrait},
105	Metronome,
106};
107
108pub use orchestra as gen;
109pub use orchestra::{
110	contextbounds, orchestra, subsystem, FromOrchestra, HighPriority, MapSubsystem, MessagePacket,
111	NormalPriority, OrchestraError as OverseerError, Priority, PriorityLevel, SignalsReceived,
112	Spawner, Subsystem, SubsystemContext, SubsystemIncomingMessages, SubsystemInstance,
113	SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra,
114	TrySendError,
115};
116
117#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
118mod memory_stats;
119#[cfg(test)]
120mod tests;
121
122use sp_core::traits::SpawnNamed;
123
/// Adapter that makes a `substrate` [`SpawnNamed`] implementation usable where
/// `orchestra`'s `Spawner` trait is required.
///
/// The wrapped spawner is public as field `0` and can also be borrowed through [`AsRef`].
#[derive(Clone)]
pub struct SpawnGlue<S>(pub S);

impl<S> AsRef<S> for SpawnGlue<S> {
	fn as_ref(&self) -> &S {
		let Self(inner) = self;
		inner
	}
}
138
139impl<S: SpawnNamed + Clone + Send + Sync> crate::gen::Spawner for SpawnGlue<S> {
140	fn spawn_blocking(
141		&self,
142		name: &'static str,
143		group: Option<&'static str>,
144		future: futures::future::BoxFuture<'static, ()>,
145	) {
146		SpawnNamed::spawn_blocking(&self.0, name, group, future)
147	}
148	fn spawn(
149		&self,
150		name: &'static str,
151		group: Option<&'static str>,
152		future: futures::future::BoxFuture<'static, ()>,
153	) {
154		SpawnNamed::spawn(&self.0, name, group, future)
155	}
156}
157
158/// Whether a header supports parachain consensus or not.
159#[async_trait::async_trait]
160pub trait HeadSupportsParachains {
161	/// Return true if the given header supports parachain consensus. Otherwise, false.
162	async fn head_supports_parachains(&self, head: &Hash) -> bool;
163}
164
165#[async_trait::async_trait]
166impl<Client> HeadSupportsParachains for Arc<Client>
167where
168	Client: RuntimeApiSubsystemClient + Sync + Send,
169{
170	async fn head_supports_parachains(&self, head: &Hash) -> bool {
171		// Check that the `ParachainHost` runtime api is at least with version 1 present on chain.
172		self.api_version_parachain_host(*head).await.ok().flatten().unwrap_or(0) >= 1
173	}
174}
175
176/// A handle used to communicate with the [`Overseer`].
177///
178/// [`Overseer`]: struct.Overseer.html
179#[derive(Clone)]
180pub struct Handle(OverseerHandle);
181
182impl Handle {
183	/// Create a new [`Handle`].
184	pub fn new(raw: OverseerHandle) -> Self {
185		Self(raw)
186	}
187
188	/// Inform the `Overseer` that that some block was imported.
189	pub async fn block_imported(&mut self, block: BlockInfo) {
190		self.send_and_log_error(Event::BlockImported(block)).await
191	}
192
193	/// Send some message with normal priority to one of the `Subsystem`s.
194	pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
195		self.send_msg_with_priority(msg, origin, PriorityLevel::Normal).await
196	}
197
198	/// Send some message with the specified priority to one of the `Subsystem`s.
199	pub async fn send_msg_with_priority(
200		&mut self,
201		msg: impl Into<AllMessages>,
202		origin: &'static str,
203		priority: PriorityLevel,
204	) {
205		self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin, priority })
206			.await
207	}
208
209	/// Send a message not providing an origin.
210	#[inline(always)]
211	pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
212		self.send_msg(msg, "").await
213	}
214
215	/// Inform the `Overseer` that some block was finalized.
216	pub async fn block_finalized(&mut self, block: BlockInfo) {
217		self.send_and_log_error(Event::BlockFinalized(block)).await
218	}
219
220	/// Wait for a block with the given hash to be in the active-leaves set.
221	///
222	/// The response channel responds if the hash was activated and is closed if the hash was
223	/// deactivated. Note that due the fact the overseer doesn't store the whole active-leaves set,
224	/// only deltas, the response channel may never return if the hash was deactivated before this
225	/// call. In this case, it's the caller's responsibility to ensure a timeout is set.
226	pub async fn wait_for_activation(
227		&mut self,
228		hash: Hash,
229		response_channel: oneshot::Sender<SubsystemResult<()>>,
230	) {
231		self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
232			hash,
233			response_channel,
234		}))
235		.await;
236	}
237
238	/// Tell `Overseer` to shutdown.
239	pub async fn stop(&mut self) {
240		self.send_and_log_error(Event::Stop).await;
241	}
242
243	/// Most basic operation, to stop a server.
244	async fn send_and_log_error(&mut self, event: Event) {
245		if self.0.send(event).await.is_err() {
246			gum::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
247		}
248	}
249}
250
251/// An event telling the `Overseer` on the particular block
252/// that has been imported or finalized.
253///
254/// This structure exists solely for the purposes of decoupling
255/// `Overseer` code from the client code and the necessity to call
256/// `HeaderBackend::block_number_from_id()`.
257#[derive(Debug, Clone)]
258pub struct BlockInfo {
259	/// Hash of the block.
260	pub hash: Hash,
261	/// Hash of the parent block.
262	pub parent_hash: Hash,
263	/// Block's number.
264	pub number: BlockNumber,
265	/// A handle to unpin the block on drop.
266	pub unpin_handle: UnpinHandle,
267}
268
269impl From<BlockImportNotification<Block>> for BlockInfo {
270	fn from(n: BlockImportNotification<Block>) -> Self {
271		let hash = n.hash;
272		let parent_hash = n.header.parent_hash;
273		let number = n.header.number;
274		let unpin_handle = n.into_unpin_handle();
275
276		BlockInfo { hash, parent_hash, number, unpin_handle }
277	}
278}
279
280impl From<FinalityNotification<Block>> for BlockInfo {
281	fn from(n: FinalityNotification<Block>) -> Self {
282		let hash = n.hash;
283		let parent_hash = n.header.parent_hash;
284		let number = n.header.number;
285		let unpin_handle = n.into_unpin_handle();
286
287		BlockInfo { hash, parent_hash, number, unpin_handle }
288	}
289}
290
291/// An event from outside the overseer scope, such
292/// as the substrate framework or user interaction.
293#[derive(Debug)]
294pub enum Event {
295	/// A new block was imported.
296	///
297	/// This event is not sent if the block was already known
298	/// and we reorged to it e.g. due to a reversion.
299	///
300	/// Also, these events are not sent during a major sync.
301	BlockImported(BlockInfo),
302	/// A block was finalized with i.e. babe or another consensus algorithm.
303	BlockFinalized(BlockInfo),
304	/// Message as sent to a subsystem.
305	MsgToSubsystem {
306		/// The actual message.
307		msg: AllMessages,
308		/// The originating subsystem name.
309		origin: &'static str,
310		/// The priority of the message.
311		priority: PriorityLevel,
312	},
313	/// A request from the outer world.
314	ExternalRequest(ExternalRequest),
315	/// Stop the overseer on i.e. a UNIX signal.
316	Stop,
317}
318
319/// Some request from outer world.
320#[derive(Debug)]
321pub enum ExternalRequest {
322	/// Wait for the activation of a particular hash
323	/// and be notified by means of the return channel.
324	WaitForActivation {
325		/// The relay parent for which activation to wait for.
326		hash: Hash,
327		/// Response channel to await on.
328		response_channel: oneshot::Sender<SubsystemResult<()>>,
329	},
330}
331
332/// Glues together the [`Overseer`] and `BlockchainEvents` by forwarding
333/// import and finality notifications into the [`OverseerHandle`].
334pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut handle: Handle) {
335	let mut finality = client.finality_notification_stream();
336	let mut imports = client.import_notification_stream();
337
338	loop {
339		select! {
340			f = finality.next() => {
341				match f {
342					Some(block) => {
343						handle.block_finalized(block.into()).await;
344					}
345					None => break,
346				}
347			},
348			i = imports.next() => {
349				match i {
350					Some(block) => {
351						handle.block_imported(block.into()).await;
352					}
353					None => break,
354				}
355			},
356			complete => break,
357		}
358	}
359}
360
/// Create a new instance of the [`Overseer`] with a fixed set of [`Subsystem`]s.
///
/// This returns the overseer along with an [`OverseerHandle`] which can
/// be used to send messages from external parts of the codebase.
///
/// The [`OverseerHandle`] returned from this function is connected to
/// the returned [`Overseer`].
///
/// ```text
///                  +------------------------------------+
///                  |            Overseer                |
///                  +------------------------------------+
///                    /            |             |      \
///      ................. subsystems...................................
///      . +-----------+    +-----------+   +----------+   +---------+ .
///      . |           |    |           |   |          |   |         | .
///      . +-----------+    +-----------+   +----------+   +---------+ .
///      ...............................................................
///                              |
///                        probably `spawn`
///                            a `job`
///                              |
///                              V
///                         +-----------+
///                         |           |
///                         +-----------+
/// ```
///
/// [`Subsystem`]: crate::Subsystem
///
/// # Example
///
/// The [`Subsystem`]s may be any type as long as they implement the expected interface.
/// Here, we create a mock validation subsystem and a few dummy ones and start the `Overseer` with
/// them. For the sake of simplicity the termination of the example is done with a timeout.
/// ```
/// # use std::time::Duration;
/// # use futures::{executor, pin_mut, select, FutureExt};
/// # use futures_timer::Delay;
/// # use polkadot_primitives::Hash;
/// # use polkadot_overseer::{
/// # 	self as overseer,
/// #   OverseerSignal,
/// # 	SubsystemSender as _,
/// # 	AllMessages,
/// # 	HeadSupportsParachains,
/// # 	Overseer,
/// # 	SubsystemError,
/// # 	gen::{
/// # 		SubsystemContext,
/// # 		FromOrchestra,
/// # 		SpawnedSubsystem,
/// # 	},
/// # };
/// # use polkadot_node_subsystem_types::messages::{
/// # 	CandidateValidationMessage, CandidateBackingMessage,
/// # 	NetworkBridgeTxMessage,
/// # };
///
/// struct ValidationSubsystem;
///
/// impl<Ctx> overseer::Subsystem<Ctx, SubsystemError> for ValidationSubsystem
/// where
///     Ctx: overseer::SubsystemContext<
/// 				Message=CandidateValidationMessage,
/// 				AllMessages=AllMessages,
/// 				Signal=OverseerSignal,
/// 				Error=SubsystemError,
/// 			>,
/// {
///     fn start(
///         self,
///         mut ctx: Ctx,
///     ) -> SpawnedSubsystem<SubsystemError> {
///         SpawnedSubsystem {
///             name: "validation-subsystem",
///             future: Box::pin(async move {
///                 loop {
///                     Delay::new(Duration::from_secs(1)).await;
///                 }
///             }),
///         }
///     }
/// }
///
/// # fn main() { executor::block_on(async move {
///
/// struct AlwaysSupportsParachains;
///
/// #[async_trait::async_trait]
/// impl HeadSupportsParachains for AlwaysSupportsParachains {
///      async fn head_supports_parachains(&self, _head: &Hash) -> bool { true }
/// }
///
/// let spawner = sp_core::testing::TaskExecutor::new();
/// let (overseer, _handle) = dummy_overseer_builder(spawner, AlwaysSupportsParachains, None)
/// 		.unwrap()
/// 		.replace_candidate_validation(|_| ValidationSubsystem)
/// 		.build()
/// 		.unwrap();
///
/// let timer = Delay::new(Duration::from_millis(50)).fuse();
///
/// let overseer_fut = overseer.run().fuse();
/// pin_mut!(timer);
/// pin_mut!(overseer_fut);
///
/// select! {
///     _ = overseer_fut => (),
///     _ = timer => (),
/// }
/// #
/// # 	});
/// # }
/// ```
#[orchestra(
	gen=AllMessages,
	event=Event,
	signal=OverseerSignal,
	error=SubsystemError,
	message_capacity=2048,
)]
pub struct Overseer<SupportsParachains> {
	#[subsystem(CandidateValidationMessage, sends: [
		ChainApiMessage,
		RuntimeApiMessage,
	])]
	candidate_validation: CandidateValidation,

	#[subsystem(sends: [
		CandidateValidationMessage,
		RuntimeApiMessage,
	])]
	pvf_checker: PvfChecker,

	#[subsystem(CandidateBackingMessage, sends: [
		CandidateValidationMessage,
		CollatorProtocolMessage,
		ChainApiMessage,
		AvailabilityDistributionMessage,
		AvailabilityStoreMessage,
		StatementDistributionMessage,
		ProvisionerMessage,
		RuntimeApiMessage,
		ProspectiveParachainsMessage,
	])]
	candidate_backing: CandidateBacking,

	#[subsystem(StatementDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		CandidateBackingMessage,
		RuntimeApiMessage,
		ProspectiveParachainsMessage,
		ChainApiMessage,
	], can_receive_priority_messages)]
	statement_distribution: StatementDistribution,

	#[subsystem(AvailabilityDistributionMessage, sends: [
		AvailabilityStoreMessage,
		ChainApiMessage,
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
	])]
	availability_distribution: AvailabilityDistribution,

	#[subsystem(AvailabilityRecoveryMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		AvailabilityStoreMessage,
	])]
	availability_recovery: AvailabilityRecovery,

	#[subsystem(blocking, sends: [
		AvailabilityStoreMessage,
		RuntimeApiMessage,
		BitfieldDistributionMessage,
	])]
	bitfield_signing: BitfieldSigning,

	#[subsystem(blocking, message_capacity: 8192, BitfieldDistributionMessage, sends: [
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
		ProvisionerMessage,
	], can_receive_priority_messages)]
	bitfield_distribution: BitfieldDistribution,

	#[subsystem(ProvisionerMessage, sends: [
		RuntimeApiMessage,
		CandidateBackingMessage,
		DisputeCoordinatorMessage,
		ProspectiveParachainsMessage,
	])]
	provisioner: Provisioner,

	#[subsystem(blocking, RuntimeApiMessage, sends: [])]
	runtime_api: RuntimeApi,

	#[subsystem(blocking, AvailabilityStoreMessage, sends: [
		ChainApiMessage,
		RuntimeApiMessage,
	])]
	availability_store: AvailabilityStore,

	#[subsystem(blocking, NetworkBridgeRxMessage, sends: [
		BitfieldDistributionMessage,
		StatementDistributionMessage,
		ApprovalDistributionMessage,
		ApprovalVotingParallelMessage,
		GossipSupportMessage,
		DisputeDistributionMessage,
		CollationGenerationMessage,
		CollatorProtocolMessage,
	])]
	network_bridge_rx: NetworkBridgeRx,

	#[subsystem(blocking, NetworkBridgeTxMessage, sends: [])]
	network_bridge_tx: NetworkBridgeTx,

	#[subsystem(blocking, ChainApiMessage, sends: [])]
	chain_api: ChainApi,

	#[subsystem(CollationGenerationMessage, sends: [
		RuntimeApiMessage,
		CollatorProtocolMessage,
	])]
	collation_generation: CollationGeneration,

	#[subsystem(CollatorProtocolMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		CandidateBackingMessage,
		ChainApiMessage,
		ProspectiveParachainsMessage,
	])]
	collator_protocol: CollatorProtocol,

	#[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		ApprovalVotingMessage,
		RuntimeApiMessage,
	], can_receive_priority_messages)]
	approval_distribution: ApprovalDistribution,

	#[subsystem(blocking, ApprovalVotingMessage, sends: [
		ApprovalDistributionMessage,
		AvailabilityRecoveryMessage,
		CandidateValidationMessage,
		ChainApiMessage,
		ChainSelectionMessage,
		DisputeCoordinatorMessage,
		RuntimeApiMessage,
	])]
	approval_voting: ApprovalVoting,
	#[subsystem(blocking, message_capacity: 64000, ApprovalVotingParallelMessage, sends: [
		AvailabilityRecoveryMessage,
		CandidateValidationMessage,
		ChainApiMessage,
		ChainSelectionMessage,
		DisputeCoordinatorMessage,
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
		ApprovalVotingMessage,
		ApprovalDistributionMessage,
		ApprovalVotingParallelMessage,
	], can_receive_priority_messages)]
	approval_voting_parallel: ApprovalVotingParallel,
	#[subsystem(GossipSupportMessage, sends: [
		NetworkBridgeTxMessage,
		NetworkBridgeRxMessage, // TODO <https://github.com/paritytech/polkadot/issues/5626>
		RuntimeApiMessage,
		ChainSelectionMessage,
		ChainApiMessage,
	], can_receive_priority_messages)]
	gossip_support: GossipSupport,

	#[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
		DisputeDistributionMessage,
		CandidateValidationMessage,
		ApprovalVotingMessage,
		AvailabilityStoreMessage,
		AvailabilityRecoveryMessage,
		ChainSelectionMessage,
		ApprovalVotingParallelMessage,
	], can_receive_priority_messages)]
	dispute_coordinator: DisputeCoordinator,

	#[subsystem(DisputeDistributionMessage, sends: [
		RuntimeApiMessage,
		DisputeCoordinatorMessage,
		NetworkBridgeTxMessage,
	])]
	dispute_distribution: DisputeDistribution,

	#[subsystem(blocking, ChainSelectionMessage, sends: [ChainApiMessage])]
	chain_selection: ChainSelection,

	#[subsystem(ProspectiveParachainsMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
	])]
	prospective_parachains: ProspectiveParachains,

	/// External listeners waiting for a hash to be in the active-leaves set, keyed by that hash.
	pub activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,

	/// The set of the "active leaves", mapped to their block numbers.
	pub active_leaves: HashMap<Hash, BlockNumber>,

	/// An implementation for checking whether a header supports parachain consensus.
	pub supports_parachains: SupportsParachains,

	/// Various Prometheus metrics.
	pub metrics: OverseerMetrics,
}
677
678/// Spawn the metrics metronome task.
679pub fn spawn_metronome_metrics<S, SupportsParachains>(
680	overseer: &mut Overseer<S, SupportsParachains>,
681	metronome_metrics: OverseerMetrics,
682) -> Result<(), SubsystemError>
683where
684	S: Spawner,
685	SupportsParachains: HeadSupportsParachains,
686{
687	struct ExtractNameAndMeters;
688
689	impl<'a, T: 'a> MapSubsystem<&'a OrchestratedSubsystem<T>> for ExtractNameAndMeters {
690		type Output = Option<(&'static str, SubsystemMeters)>;
691
692		fn map_subsystem(&self, subsystem: &'a OrchestratedSubsystem<T>) -> Self::Output {
693			subsystem
694				.instance
695				.as_ref()
696				.map(|instance| (instance.name, instance.meters.clone()))
697		}
698	}
699	let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);
700
701	#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
702	let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> =
703		match memory_stats::MemoryAllocationTracker::new() {
704			Ok(memory_stats) =>
705				Box::new(move |metrics: &OverseerMetrics| match memory_stats.snapshot() {
706					Ok(memory_stats_snapshot) => {
707						gum::trace!(
708							target: LOG_TARGET,
709							"memory_stats: {:?}",
710							&memory_stats_snapshot
711						);
712						metrics.memory_stats_snapshot(memory_stats_snapshot);
713					},
714					Err(e) =>
715						gum::debug!(target: LOG_TARGET, "Failed to obtain memory stats: {:?}", e),
716				}),
717			Err(_) => {
718				gum::debug!(
719					target: LOG_TARGET,
720					"Memory allocation tracking is not supported by the allocator.",
721				);
722
723				Box::new(|_| {})
724			},
725		};
726
727	#[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
728	let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> = Box::new(|_| {});
729
730	let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
731		collect_memory_stats(&metronome_metrics);
732
733		// We combine the amount of messages from subsystems to the overseer
734		// as well as the amount of messages from external sources to the overseer
735		// into one `to_overseer` value.
736		metronome_metrics.channel_metrics_snapshot(
737			subsystem_meters
738				.iter()
739				.cloned()
740				.flatten()
741				.map(|(name, ref meters)| (name, meters.read())),
742		);
743
744		futures::future::ready(())
745	});
746	overseer
747		.spawner()
748		.spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));
749
750	Ok(())
751}
752
753impl<S, SupportsParachains> Overseer<S, SupportsParachains>
754where
755	SupportsParachains: HeadSupportsParachains,
756	S: Spawner,
757{
758	/// Stop the `Overseer`.
759	async fn stop(mut self) {
760		let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await;
761	}
762
763	/// Run the `Overseer`.
764	///
765	/// Logging any errors.
766	pub async fn run(self) {
767		if let Err(err) = self.run_inner().await {
768			gum::error!(target: LOG_TARGET, ?err, "Overseer exited with error");
769		}
770	}
771
772	async fn run_inner(mut self) -> SubsystemResult<()> {
773		let metrics = self.metrics.clone();
774		spawn_metronome_metrics(&mut self, metrics)?;
775
776		loop {
777			select! {
778				msg = self.events_rx.select_next_some() => {
779					match msg {
780						Event::MsgToSubsystem { msg, origin, priority } => {
781							match priority {
782								PriorityLevel::Normal => {
783									self.route_message(msg.into(), origin).await?;
784								},
785								PriorityLevel::High => {
786									self.route_message_with_priority::<HighPriority>(msg.into(), origin).await?;
787								},
788							}
789							self.metrics.on_message_relayed();
790						}
791						Event::Stop => {
792							self.stop().await;
793							return Ok(());
794						}
795						Event::BlockImported(block) => {
796							self.block_imported(block).await?;
797						}
798						Event::BlockFinalized(block) => {
799							self.block_finalized(block).await?;
800						}
801						Event::ExternalRequest(request) => {
802							self.handle_external_request(request);
803						}
804					}
805				},
806				msg = self.to_orchestra_rx.select_next_some() => {
807					match msg {
808						ToOrchestra::SpawnJob { name, subsystem, s } => {
809							self.spawn_job(name, subsystem, s);
810						}
811						ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
812							self.spawn_blocking_job(name, subsystem, s);
813						}
814					}
815				},
816				res = self.running_subsystems.select_next_some() => {
817					gum::error!(
818						target: LOG_TARGET,
819						subsystem = ?res,
820						"subsystem finished unexpectedly",
821					);
822					self.stop().await;
823					return res;
824				},
825			}
826		}
827	}
828
829	async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
830		match self.active_leaves.entry(block.hash) {
831			hash_map::Entry::Vacant(entry) => entry.insert(block.number),
832			hash_map::Entry::Occupied(entry) => {
833				debug_assert_eq!(*entry.get(), block.number);
834				return Ok(())
835			},
836		};
837
838		let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)).await {
839			Some(_) => ActiveLeavesUpdate::start_work(ActivatedLeaf {
840				hash: block.hash,
841				number: block.number,
842				unpin_handle: block.unpin_handle,
843			}),
844			None => ActiveLeavesUpdate::default(),
845		};
846
847		if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
848			debug_assert_eq!(block.number.saturating_sub(1), number);
849			update.deactivated.push(block.parent_hash);
850			self.on_head_deactivated(&block.parent_hash);
851		}
852
853		self.clean_up_external_listeners();
854
855		if !update.is_empty() {
856			self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
857		}
858		Ok(())
859	}
860
861	async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
862		let mut update = ActiveLeavesUpdate::default();
863
864		self.active_leaves.retain(|h, n| {
865			// prune all orphaned leaves, but don't prune
866			// the finalized block if it is itself a leaf.
867			if *n <= block.number && *h != block.hash {
868				update.deactivated.push(*h);
869				false
870			} else {
871				true
872			}
873		});
874
875		for deactivated in &update.deactivated {
876			self.on_head_deactivated(deactivated)
877		}
878
879		self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number))
880			.await?;
881
882		// If there are no leaves being deactivated, we don't need to send an update.
883		//
884		// Our peers will be informed about our finalized block the next time we
885		// activating/deactivating some leaf.
886		if !update.is_empty() {
887			self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
888		}
889
890		Ok(())
891	}
892
893	/// Handles a header activation. If the header's state doesn't support the parachains API,
894	/// this returns `None`.
895	async fn on_head_activated(&mut self, hash: &Hash, _parent_hash: Option<Hash>) -> Option<()> {
896		if !self.supports_parachains.head_supports_parachains(hash).await {
897			return None
898		}
899
900		self.metrics.on_head_activated();
901		if let Some(listeners) = self.activation_external_listeners.remove(hash) {
902			gum::trace!(
903				target: LOG_TARGET,
904				relay_parent = ?hash,
905				"Leaf got activated, notifying external listeners"
906			);
907			for listener in listeners {
908				// it's fine if the listener is no longer interested
909				let _ = listener.send(Ok(()));
910			}
911		}
912
913		Some(())
914	}
915
916	fn on_head_deactivated(&mut self, hash: &Hash) {
917		self.metrics.on_head_deactivated();
918		self.activation_external_listeners.remove(hash);
919	}
920
921	fn clean_up_external_listeners(&mut self) {
922		self.activation_external_listeners.retain(|_, v| {
923			// remove dead listeners
924			v.retain(|c| !c.is_canceled());
925			!v.is_empty()
926		})
927	}
928
929	fn handle_external_request(&mut self, request: ExternalRequest) {
930		match request {
931			ExternalRequest::WaitForActivation { hash, response_channel } => {
932				if self.active_leaves.get(&hash).is_some() {
933					gum::trace!(
934						target: LOG_TARGET,
935						relay_parent = ?hash,
936						"Leaf was already ready - answering `WaitForActivation`"
937					);
938					// it's fine if the listener is no longer interested
939					let _ = response_channel.send(Ok(()));
940				} else {
941					gum::trace!(
942						target: LOG_TARGET,
943						relay_parent = ?hash,
944						"Leaf not yet ready - queuing `WaitForActivation` sender"
945					);
946					self.activation_external_listeners
947						.entry(hash)
948						.or_default()
949						.push(response_channel);
950				}
951			},
952		}
953	}
954
955	fn spawn_job(
956		&mut self,
957		task_name: &'static str,
958		subsystem_name: Option<&'static str>,
959		j: BoxFuture<'static, ()>,
960	) {
961		self.spawner.spawn(task_name, subsystem_name, j);
962	}
963
964	fn spawn_blocking_job(
965		&mut self,
966		task_name: &'static str,
967		subsystem_name: Option<&'static str>,
968		j: BoxFuture<'static, ()>,
969	) {
970		self.spawner.spawn_blocking(task_name, subsystem_name, j);
971	}
972}