polkadot_overseer/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! # Overseer
18//!
19//! `overseer` implements the Overseer architecture described in the
20//! [implementers' guide][overseer-page].
21//! For the motivations behind implementing the overseer itself you should
22//! check out that guide, documentation in this crate will be mostly discussing
23//! technical stuff.
24//!
25//! An `Overseer` is something that allows spawning/stopping and overseeing
26//! asynchronous tasks as well as establishing a well-defined and easy to use
27//! protocol that the tasks can use to communicate with each other. It is desired
28//! that this protocol is the only way tasks communicate with each other, however
29//! at this moment there are no foolproof guards against other ways of communication.
30//!
31//! The `Overseer` is instantiated with a pre-defined set of `Subsystems` that
32//! share the same behavior from `Overseer`'s point of view.
33//!
34//! ```text
35//!                              +-----------------------------+
36//!                              |         Overseer            |
37//!                              +-----------------------------+
38//!
39//!             ................|  Overseer "holds" these and uses |..............
40//!             .                  them to (re)start things                      .
41//!             .                                                                .
42//!             .  +-------------------+                +---------------------+  .
43//!             .  |   Subsystem1      |                |   Subsystem2        |  .
44//!             .  +-------------------+                +---------------------+  .
45//!             .           |                                       |            .
46//!             ..................................................................
47//!                         |                                       |
48//!                       start()                                 start()
49//!                         V                                       V
50//!             ..................| Overseer "runs" these |.......................
51//!             .  +--------------------+               +---------------------+  .
52//!             .  | SubsystemInstance1 |               | SubsystemInstance2  |  .
53//!             .  +--------------------+               +---------------------+  .
54//!             ..................................................................
55//! ```
56//!
57//! [overseer-page]: https://paritytech.github.io/polkadot-sdk/book/node/overseer.html
58
59// #![deny(unused_results)]
60// unused dependencies can not work for test and examples at the same time
61// yielding false positives
62#![warn(missing_docs)]
63// TODO https://github.com/paritytech/polkadot-sdk/issues/5793
64#![allow(dead_code, irrefutable_let_patterns)]
65
66use std::{
67	collections::{hash_map, HashMap},
68	fmt::{self, Debug},
69	pin::Pin,
70	sync::Arc,
71	time::Duration,
72};
73
74use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
75
76use polkadot_primitives::{Block, BlockNumber, Hash};
77use sc_client_api::{BlockImportNotification, BlockchainEvents, FinalityNotification};
78
79use self::messages::{BitfieldSigningMessage, PvfCheckerMessage};
80use polkadot_node_subsystem_types::messages::{
81	ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage,
82	AvailabilityDistributionMessage, AvailabilityRecoveryMessage, AvailabilityStoreMessage,
83	BitfieldDistributionMessage, CandidateBackingMessage, CandidateValidationMessage,
84	ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage,
85	DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage,
86	NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProspectiveParachainsMessage,
87	ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage,
88};
89
90pub use polkadot_node_subsystem_types::{
91	errors::{SubsystemError, SubsystemResult},
92	ActivatedLeaf, ActiveLeavesUpdate, ChainApiBackend, OverseerSignal, RuntimeApiSubsystemClient,
93	UnpinHandle,
94};
95
96pub mod metrics;
97pub use self::metrics::Metrics as OverseerMetrics;
98
99/// A dummy subsystem, mostly useful for placeholders and tests.
100pub mod dummy;
101pub use self::dummy::DummySubsystem;
102
103pub use polkadot_node_metrics::{
104	metrics::{prometheus, Metrics as MetricsTrait},
105	Metronome,
106};
107
108pub use orchestra as gen;
109pub use orchestra::{
110	contextbounds, orchestra, subsystem, FromOrchestra, HighPriority, MapSubsystem, MessagePacket,
111	NormalPriority, OrchestraError as OverseerError, Priority, PriorityLevel, SignalsReceived,
112	Spawner, Subsystem, SubsystemContext, SubsystemIncomingMessages, SubsystemInstance,
113	SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra,
114	TrySendError,
115};
116
117#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
118mod memory_stats;
119#[cfg(test)]
120mod tests;
121
122use sp_core::traits::SpawnNamed;
123
/// Glue to connect `trait orchestra::Spawner` and `SpawnNamed` from `substrate`.
///
/// A thin newtype around a substrate spawner `S`. The wrapped value is public so callers can
/// construct the glue directly and reach the inner spawner.
#[derive(Clone)]
pub struct SpawnGlue<S>(pub S);

impl<S> AsRef<S> for SpawnGlue<S> {
	/// Borrow the wrapped spawner.
	fn as_ref(&self) -> &S {
		&self.0
	}
}
138
139impl<S: SpawnNamed + Clone + Send + Sync> crate::gen::Spawner for SpawnGlue<S> {
140	fn spawn_blocking(
141		&self,
142		name: &'static str,
143		group: Option<&'static str>,
144		future: futures::future::BoxFuture<'static, ()>,
145	) {
146		SpawnNamed::spawn_blocking(&self.0, name, group, future)
147	}
148	fn spawn(
149		&self,
150		name: &'static str,
151		group: Option<&'static str>,
152		future: futures::future::BoxFuture<'static, ()>,
153	) {
154		SpawnNamed::spawn(&self.0, name, group, future)
155	}
156}
157
/// Whether a header supports parachain consensus or not.
///
/// The overseer consults this when a block that is not already a known leaf is imported;
/// if it returns `false`, the block is not announced to subsystems as an activated leaf.
#[async_trait::async_trait]
pub trait HeadSupportsParachains {
	/// Return true if the given header supports parachain consensus. Otherwise, false.
	async fn head_supports_parachains(&self, head: &Hash) -> bool;
}
164
165#[async_trait::async_trait]
166impl<Client> HeadSupportsParachains for Arc<Client>
167where
168	Client: RuntimeApiSubsystemClient + Sync + Send,
169{
170	async fn head_supports_parachains(&self, head: &Hash) -> bool {
171		// Check that the `ParachainHost` runtime api is at least with version 1 present on chain.
172		self.api_version_parachain_host(*head).await.ok().flatten().unwrap_or(0) >= 1
173	}
174}
175
/// A handle used to communicate with the [`Overseer`].
///
/// Wraps the generated [`OverseerHandle`]. Its async methods each forward an [`Event`] to the
/// overseer and only log (never return) a failure to send.
///
/// [`Overseer`]: struct.Overseer.html
#[derive(Clone)]
pub struct Handle(OverseerHandle);
181
182impl Handle {
183	/// Create a new [`Handle`].
184	pub fn new(raw: OverseerHandle) -> Self {
185		Self(raw)
186	}
187
188	/// Inform the `Overseer` that that some block was imported.
189	pub async fn block_imported(&mut self, block: BlockInfo) {
190		self.send_and_log_error(Event::BlockImported(block)).await
191	}
192
193	/// Send some message to one of the `Subsystem`s.
194	pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
195		self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin }).await
196	}
197
198	/// Send a message not providing an origin.
199	#[inline(always)]
200	pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
201		self.send_msg(msg, "").await
202	}
203
204	/// Inform the `Overseer` that some block was finalized.
205	pub async fn block_finalized(&mut self, block: BlockInfo) {
206		self.send_and_log_error(Event::BlockFinalized(block)).await
207	}
208
209	/// Wait for a block with the given hash to be in the active-leaves set.
210	///
211	/// The response channel responds if the hash was activated and is closed if the hash was
212	/// deactivated. Note that due the fact the overseer doesn't store the whole active-leaves set,
213	/// only deltas, the response channel may never return if the hash was deactivated before this
214	/// call. In this case, it's the caller's responsibility to ensure a timeout is set.
215	pub async fn wait_for_activation(
216		&mut self,
217		hash: Hash,
218		response_channel: oneshot::Sender<SubsystemResult<()>>,
219	) {
220		self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
221			hash,
222			response_channel,
223		}))
224		.await;
225	}
226
227	/// Tell `Overseer` to shutdown.
228	pub async fn stop(&mut self) {
229		self.send_and_log_error(Event::Stop).await;
230	}
231
232	/// Most basic operation, to stop a server.
233	async fn send_and_log_error(&mut self, event: Event) {
234		if self.0.send(event).await.is_err() {
235			gum::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
236		}
237	}
238}
239
/// Information about a particular block that has been imported or finalized,
/// as passed to the `Overseer`.
///
/// This structure exists solely for the purposes of decoupling
/// `Overseer` code from the client code and the necessity to call
/// `HeaderBackend::block_number_from_id()`.
#[derive(Debug, Clone)]
pub struct BlockInfo {
	/// Hash of the block.
	pub hash: Hash,
	/// Hash of the parent block.
	pub parent_hash: Hash,
	/// Block's number.
	pub number: BlockNumber,
	/// A handle to unpin the block on drop.
	pub unpin_handle: UnpinHandle,
}
257
258impl From<BlockImportNotification<Block>> for BlockInfo {
259	fn from(n: BlockImportNotification<Block>) -> Self {
260		let hash = n.hash;
261		let parent_hash = n.header.parent_hash;
262		let number = n.header.number;
263		let unpin_handle = n.into_unpin_handle();
264
265		BlockInfo { hash, parent_hash, number, unpin_handle }
266	}
267}
268
269impl From<FinalityNotification<Block>> for BlockInfo {
270	fn from(n: FinalityNotification<Block>) -> Self {
271		let hash = n.hash;
272		let parent_hash = n.header.parent_hash;
273		let number = n.header.number;
274		let unpin_handle = n.into_unpin_handle();
275
276		BlockInfo { hash, parent_hash, number, unpin_handle }
277	}
278}
279
/// An event from outside the overseer scope, such
/// as the substrate framework or user interaction.
#[derive(Debug)]
pub enum Event {
	/// A new block was imported.
	///
	/// This event is not sent if the block was already known
	/// and we reorged to it e.g. due to a reversion.
	///
	/// Also, these events are not sent during a major sync.
	BlockImported(BlockInfo),
	/// A block was finalized, e.g. by the node's consensus/finality mechanism.
	BlockFinalized(BlockInfo),
	/// A message to be routed to a subsystem.
	MsgToSubsystem {
		/// The actual message.
		msg: AllMessages,
		/// The originating subsystem name (empty when sent anonymously).
		origin: &'static str,
	},
	/// A request from the outer world.
	ExternalRequest(ExternalRequest),
	/// Stop the overseer, e.g. in response to a UNIX signal.
	Stop,
}
305
/// Some request from the outer world.
#[derive(Debug)]
pub enum ExternalRequest {
	/// Wait for the activation of a particular hash
	/// and be notified by means of the return channel.
	///
	/// If the hash is already an active leaf, the overseer answers `Ok(())` immediately.
	/// Otherwise the sender is queued until the hash is activated. A queued sender is dropped
	/// (closing the channel) if the hash is deactivated first.
	WaitForActivation {
		/// The relay parent for which activation to wait for.
		hash: Hash,
		/// Response channel to await on.
		response_channel: oneshot::Sender<SubsystemResult<()>>,
	},
}
318
319/// Glues together the [`Overseer`] and `BlockchainEvents` by forwarding
320/// import and finality notifications into the [`OverseerHandle`].
321pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut handle: Handle) {
322	let mut finality = client.finality_notification_stream();
323	let mut imports = client.import_notification_stream();
324
325	loop {
326		select! {
327			f = finality.next() => {
328				match f {
329					Some(block) => {
330						handle.block_finalized(block.into()).await;
331					}
332					None => break,
333				}
334			},
335			i = imports.next() => {
336				match i {
337					Some(block) => {
338						handle.block_imported(block.into()).await;
339					}
340					None => break,
341				}
342			},
343			complete => break,
344		}
345	}
346}
347
/// The overseer: runs a fixed set of [`Subsystem`]s and routes messages between them.
///
/// Building an overseer (see the example below, which finishes with the builder's `build()`)
/// returns the overseer along with an [`OverseerHandle`] which can
/// be used to send messages from external parts of the codebase.
///
/// The [`OverseerHandle`] returned alongside the overseer is connected to
/// that [`Overseer`].
///
/// ```text
///                  +------------------------------------+
///                  |            Overseer                |
///                  +------------------------------------+
///                    /            |             |      \
///      ................. subsystems...................................
///      . +-----------+    +-----------+   +----------+   +---------+ .
///      . |           |    |           |   |          |   |         | .
///      . +-----------+    +-----------+   +----------+   +---------+ .
///      ...............................................................
///                              |
///                        probably `spawn`
///                            a `job`
///                              |
///                              V
///                         +-----------+
///                         |           |
///                         +-----------+
/// ```
///
/// [`Subsystem`]: trait.Subsystem.html
///
/// # Example
///
/// The subsystems may be any type as long as they implement an expected interface.
/// Here, we create a mock validation subsystem and a few dummy ones and start the `Overseer` with
/// them. For the sake of simplicity the termination of the example is done with a timeout.
/// ```
/// # use std::time::Duration;
/// # use futures::{executor, pin_mut, select, FutureExt};
/// # use futures_timer::Delay;
/// # use polkadot_primitives::Hash;
/// # use polkadot_overseer::{
/// # 	self as overseer,
/// #   OverseerSignal,
/// # 	SubsystemSender as _,
/// # 	AllMessages,
/// # 	HeadSupportsParachains,
/// # 	Overseer,
/// # 	SubsystemError,
/// # 	gen::{
/// # 		SubsystemContext,
/// # 		FromOrchestra,
/// # 		SpawnedSubsystem,
/// # 	},
/// # };
/// # use polkadot_node_subsystem_types::messages::{
/// # 	CandidateValidationMessage, CandidateBackingMessage,
/// # 	NetworkBridgeTxMessage,
/// # };
///
/// struct ValidationSubsystem;
///
/// impl<Ctx> overseer::Subsystem<Ctx, SubsystemError> for ValidationSubsystem
/// where
///     Ctx: overseer::SubsystemContext<
/// 				Message=CandidateValidationMessage,
/// 				AllMessages=AllMessages,
/// 				Signal=OverseerSignal,
/// 				Error=SubsystemError,
/// 			>,
/// {
///     fn start(
///         self,
///         mut ctx: Ctx,
///     ) -> SpawnedSubsystem<SubsystemError> {
///         SpawnedSubsystem {
///             name: "validation-subsystem",
///             future: Box::pin(async move {
///                 loop {
///                     Delay::new(Duration::from_secs(1)).await;
///                 }
///             }),
///         }
///     }
/// }
///
/// # fn main() { executor::block_on(async move {
///
/// struct AlwaysSupportsParachains;
///
/// #[async_trait::async_trait]
/// impl HeadSupportsParachains for AlwaysSupportsParachains {
///      async fn head_supports_parachains(&self, _head: &Hash) -> bool { true }
/// }
///
/// let spawner = sp_core::testing::TaskExecutor::new();
/// let (overseer, _handle) = dummy_overseer_builder(spawner, AlwaysSupportsParachains, None)
/// 		.unwrap()
/// 		.replace_candidate_validation(|_| ValidationSubsystem)
/// 		.build()
/// 		.unwrap();
///
/// let timer = Delay::new(Duration::from_millis(50)).fuse();
///
/// let overseer_fut = overseer.run().fuse();
/// pin_mut!(timer);
/// pin_mut!(overseer_fut);
///
/// select! {
///     _ = overseer_fut => (),
///     _ = timer => (),
/// }
/// #
/// # 	});
/// # }
/// ```
#[orchestra(
	gen=AllMessages,
	event=Event,
	signal=OverseerSignal,
	error=SubsystemError,
	message_capacity=2048,
)]
pub struct Overseer<SupportsParachains> {
	// Each `#[subsystem(...)]` field names the message type the subsystem consumes (when it has
	// one) and, under `sends: [...]`, the message types it is allowed to emit. `blocking`,
	// `message_capacity` and `can_receive_priority_messages` are orchestra options, presumably
	// selecting blocking spawn, a per-subsystem channel capacity and priority-channel support.
	#[subsystem(CandidateValidationMessage, sends: [
		ChainApiMessage,
		RuntimeApiMessage,
	])]
	candidate_validation: CandidateValidation,

	#[subsystem(sends: [
		CandidateValidationMessage,
		RuntimeApiMessage,
	])]
	pvf_checker: PvfChecker,

	#[subsystem(CandidateBackingMessage, sends: [
		CandidateValidationMessage,
		CollatorProtocolMessage,
		ChainApiMessage,
		AvailabilityDistributionMessage,
		AvailabilityStoreMessage,
		StatementDistributionMessage,
		ProvisionerMessage,
		RuntimeApiMessage,
		ProspectiveParachainsMessage,
	])]
	candidate_backing: CandidateBacking,

	#[subsystem(StatementDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		CandidateBackingMessage,
		RuntimeApiMessage,
		ProspectiveParachainsMessage,
		ChainApiMessage,
	], can_receive_priority_messages)]
	statement_distribution: StatementDistribution,

	#[subsystem(AvailabilityDistributionMessage, sends: [
		AvailabilityStoreMessage,
		ChainApiMessage,
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
	])]
	availability_distribution: AvailabilityDistribution,

	#[subsystem(AvailabilityRecoveryMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		AvailabilityStoreMessage,
	])]
	availability_recovery: AvailabilityRecovery,

	#[subsystem(blocking, sends: [
		AvailabilityStoreMessage,
		RuntimeApiMessage,
		BitfieldDistributionMessage,
	])]
	bitfield_signing: BitfieldSigning,

	#[subsystem(blocking, message_capacity: 8192, BitfieldDistributionMessage, sends: [
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
		ProvisionerMessage,
	], can_receive_priority_messages)]
	bitfield_distribution: BitfieldDistribution,

	#[subsystem(ProvisionerMessage, sends: [
		RuntimeApiMessage,
		CandidateBackingMessage,
		DisputeCoordinatorMessage,
		ProspectiveParachainsMessage,
	])]
	provisioner: Provisioner,

	#[subsystem(blocking, RuntimeApiMessage, sends: [])]
	runtime_api: RuntimeApi,

	#[subsystem(blocking, AvailabilityStoreMessage, sends: [
		ChainApiMessage,
		RuntimeApiMessage,
	])]
	availability_store: AvailabilityStore,

	#[subsystem(blocking, NetworkBridgeRxMessage, sends: [
		BitfieldDistributionMessage,
		StatementDistributionMessage,
		ApprovalDistributionMessage,
		ApprovalVotingParallelMessage,
		GossipSupportMessage,
		DisputeDistributionMessage,
		CollationGenerationMessage,
		CollatorProtocolMessage,
	])]
	network_bridge_rx: NetworkBridgeRx,

	#[subsystem(blocking, NetworkBridgeTxMessage, sends: [])]
	network_bridge_tx: NetworkBridgeTx,

	#[subsystem(blocking, ChainApiMessage, sends: [])]
	chain_api: ChainApi,

	#[subsystem(CollationGenerationMessage, sends: [
		RuntimeApiMessage,
		CollatorProtocolMessage,
	])]
	collation_generation: CollationGeneration,

	#[subsystem(CollatorProtocolMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		CandidateBackingMessage,
		ChainApiMessage,
		ProspectiveParachainsMessage,
	])]
	collator_protocol: CollatorProtocol,

	#[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		ApprovalVotingMessage,
		RuntimeApiMessage,
	], can_receive_priority_messages)]
	approval_distribution: ApprovalDistribution,

	#[subsystem(blocking, ApprovalVotingMessage, sends: [
		ApprovalDistributionMessage,
		AvailabilityRecoveryMessage,
		CandidateValidationMessage,
		ChainApiMessage,
		ChainSelectionMessage,
		DisputeCoordinatorMessage,
		RuntimeApiMessage,
	])]
	approval_voting: ApprovalVoting,
	#[subsystem(blocking, message_capacity: 64000, ApprovalVotingParallelMessage, sends: [
		AvailabilityRecoveryMessage,
		CandidateValidationMessage,
		ChainApiMessage,
		ChainSelectionMessage,
		DisputeCoordinatorMessage,
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
		ApprovalVotingMessage,
		ApprovalDistributionMessage,
		ApprovalVotingParallelMessage,
	])]
	approval_voting_parallel: ApprovalVotingParallel,
	#[subsystem(GossipSupportMessage, sends: [
		NetworkBridgeTxMessage,
		NetworkBridgeRxMessage, // TODO <https://github.com/paritytech/polkadot/issues/5626>
		RuntimeApiMessage,
		ChainSelectionMessage,
		ChainApiMessage,
	], can_receive_priority_messages)]
	gossip_support: GossipSupport,

	#[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
		DisputeDistributionMessage,
		CandidateValidationMessage,
		ApprovalVotingMessage,
		AvailabilityStoreMessage,
		AvailabilityRecoveryMessage,
		ChainSelectionMessage,
		ApprovalVotingParallelMessage,
	])]
	dispute_coordinator: DisputeCoordinator,

	#[subsystem(DisputeDistributionMessage, sends: [
		RuntimeApiMessage,
		DisputeCoordinatorMessage,
		NetworkBridgeTxMessage,
	])]
	dispute_distribution: DisputeDistribution,

	#[subsystem(blocking, ChainSelectionMessage, sends: [ChainApiMessage])]
	chain_selection: ChainSelection,

	#[subsystem(ProspectiveParachainsMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
	])]
	prospective_parachains: ProspectiveParachains,

	/// External listeners waiting for a hash to be in the active-leaves set.
	pub activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,

	/// The set of the "active leaves", mapping each leaf hash to its block number.
	pub active_leaves: HashMap<Hash, BlockNumber>,

	/// An implementation for checking whether a header supports parachain consensus.
	pub supports_parachains: SupportsParachains,

	/// Various Prometheus metrics.
	pub metrics: OverseerMetrics,
}
664
665/// Spawn the metrics metronome task.
666pub fn spawn_metronome_metrics<S, SupportsParachains>(
667	overseer: &mut Overseer<S, SupportsParachains>,
668	metronome_metrics: OverseerMetrics,
669) -> Result<(), SubsystemError>
670where
671	S: Spawner,
672	SupportsParachains: HeadSupportsParachains,
673{
674	struct ExtractNameAndMeters;
675
676	impl<'a, T: 'a> MapSubsystem<&'a OrchestratedSubsystem<T>> for ExtractNameAndMeters {
677		type Output = Option<(&'static str, SubsystemMeters)>;
678
679		fn map_subsystem(&self, subsystem: &'a OrchestratedSubsystem<T>) -> Self::Output {
680			subsystem
681				.instance
682				.as_ref()
683				.map(|instance| (instance.name, instance.meters.clone()))
684		}
685	}
686	let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);
687
688	#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
689	let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> =
690		match memory_stats::MemoryAllocationTracker::new() {
691			Ok(memory_stats) =>
692				Box::new(move |metrics: &OverseerMetrics| match memory_stats.snapshot() {
693					Ok(memory_stats_snapshot) => {
694						gum::trace!(
695							target: LOG_TARGET,
696							"memory_stats: {:?}",
697							&memory_stats_snapshot
698						);
699						metrics.memory_stats_snapshot(memory_stats_snapshot);
700					},
701					Err(e) =>
702						gum::debug!(target: LOG_TARGET, "Failed to obtain memory stats: {:?}", e),
703				}),
704			Err(_) => {
705				gum::debug!(
706					target: LOG_TARGET,
707					"Memory allocation tracking is not supported by the allocator.",
708				);
709
710				Box::new(|_| {})
711			},
712		};
713
714	#[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
715	let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> = Box::new(|_| {});
716
717	let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
718		collect_memory_stats(&metronome_metrics);
719
720		// We combine the amount of messages from subsystems to the overseer
721		// as well as the amount of messages from external sources to the overseer
722		// into one `to_overseer` value.
723		metronome_metrics.channel_metrics_snapshot(
724			subsystem_meters
725				.iter()
726				.cloned()
727				.flatten()
728				.map(|(name, ref meters)| (name, meters.read())),
729		);
730
731		futures::future::ready(())
732	});
733	overseer
734		.spawner()
735		.spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));
736
737	Ok(())
738}
739
740impl<S, SupportsParachains> Overseer<S, SupportsParachains>
741where
742	SupportsParachains: HeadSupportsParachains,
743	S: Spawner,
744{
745	/// Stop the `Overseer`.
746	async fn stop(mut self) {
747		let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await;
748	}
749
750	/// Run the `Overseer`.
751	///
752	/// Logging any errors.
753	pub async fn run(self) {
754		if let Err(err) = self.run_inner().await {
755			gum::error!(target: LOG_TARGET, ?err, "Overseer exited with error");
756		}
757	}
758
759	async fn run_inner(mut self) -> SubsystemResult<()> {
760		let metrics = self.metrics.clone();
761		spawn_metronome_metrics(&mut self, metrics)?;
762
763		loop {
764			select! {
765				msg = self.events_rx.select_next_some() => {
766					match msg {
767						Event::MsgToSubsystem { msg, origin } => {
768							self.route_message(msg.into(), origin).await?;
769							self.metrics.on_message_relayed();
770						}
771						Event::Stop => {
772							self.stop().await;
773							return Ok(());
774						}
775						Event::BlockImported(block) => {
776							self.block_imported(block).await?;
777						}
778						Event::BlockFinalized(block) => {
779							self.block_finalized(block).await?;
780						}
781						Event::ExternalRequest(request) => {
782							self.handle_external_request(request);
783						}
784					}
785				},
786				msg = self.to_orchestra_rx.select_next_some() => {
787					match msg {
788						ToOrchestra::SpawnJob { name, subsystem, s } => {
789							self.spawn_job(name, subsystem, s);
790						}
791						ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
792							self.spawn_blocking_job(name, subsystem, s);
793						}
794					}
795				},
796				res = self.running_subsystems.select_next_some() => {
797					gum::error!(
798						target: LOG_TARGET,
799						subsystem = ?res,
800						"subsystem finished unexpectedly",
801					);
802					self.stop().await;
803					return res;
804				},
805			}
806		}
807	}
808
809	async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
810		match self.active_leaves.entry(block.hash) {
811			hash_map::Entry::Vacant(entry) => entry.insert(block.number),
812			hash_map::Entry::Occupied(entry) => {
813				debug_assert_eq!(*entry.get(), block.number);
814				return Ok(())
815			},
816		};
817
818		let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)).await {
819			Some(_) => ActiveLeavesUpdate::start_work(ActivatedLeaf {
820				hash: block.hash,
821				number: block.number,
822				unpin_handle: block.unpin_handle,
823			}),
824			None => ActiveLeavesUpdate::default(),
825		};
826
827		if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
828			debug_assert_eq!(block.number.saturating_sub(1), number);
829			update.deactivated.push(block.parent_hash);
830			self.on_head_deactivated(&block.parent_hash);
831		}
832
833		self.clean_up_external_listeners();
834
835		if !update.is_empty() {
836			self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
837		}
838		Ok(())
839	}
840
841	async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
842		let mut update = ActiveLeavesUpdate::default();
843
844		self.active_leaves.retain(|h, n| {
845			// prune all orphaned leaves, but don't prune
846			// the finalized block if it is itself a leaf.
847			if *n <= block.number && *h != block.hash {
848				update.deactivated.push(*h);
849				false
850			} else {
851				true
852			}
853		});
854
855		for deactivated in &update.deactivated {
856			self.on_head_deactivated(deactivated)
857		}
858
859		self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number))
860			.await?;
861
862		// If there are no leaves being deactivated, we don't need to send an update.
863		//
864		// Our peers will be informed about our finalized block the next time we
865		// activating/deactivating some leaf.
866		if !update.is_empty() {
867			self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
868		}
869
870		Ok(())
871	}
872
873	/// Handles a header activation. If the header's state doesn't support the parachains API,
874	/// this returns `None`.
875	async fn on_head_activated(&mut self, hash: &Hash, _parent_hash: Option<Hash>) -> Option<()> {
876		if !self.supports_parachains.head_supports_parachains(hash).await {
877			return None
878		}
879
880		self.metrics.on_head_activated();
881		if let Some(listeners) = self.activation_external_listeners.remove(hash) {
882			gum::trace!(
883				target: LOG_TARGET,
884				relay_parent = ?hash,
885				"Leaf got activated, notifying external listeners"
886			);
887			for listener in listeners {
888				// it's fine if the listener is no longer interested
889				let _ = listener.send(Ok(()));
890			}
891		}
892
893		Some(())
894	}
895
896	fn on_head_deactivated(&mut self, hash: &Hash) {
897		self.metrics.on_head_deactivated();
898		self.activation_external_listeners.remove(hash);
899	}
900
901	fn clean_up_external_listeners(&mut self) {
902		self.activation_external_listeners.retain(|_, v| {
903			// remove dead listeners
904			v.retain(|c| !c.is_canceled());
905			!v.is_empty()
906		})
907	}
908
909	fn handle_external_request(&mut self, request: ExternalRequest) {
910		match request {
911			ExternalRequest::WaitForActivation { hash, response_channel } => {
912				if self.active_leaves.get(&hash).is_some() {
913					gum::trace!(
914						target: LOG_TARGET,
915						relay_parent = ?hash,
916						"Leaf was already ready - answering `WaitForActivation`"
917					);
918					// it's fine if the listener is no longer interested
919					let _ = response_channel.send(Ok(()));
920				} else {
921					gum::trace!(
922						target: LOG_TARGET,
923						relay_parent = ?hash,
924						"Leaf not yet ready - queuing `WaitForActivation` sender"
925					);
926					self.activation_external_listeners
927						.entry(hash)
928						.or_default()
929						.push(response_channel);
930				}
931			},
932		}
933	}
934
935	fn spawn_job(
936		&mut self,
937		task_name: &'static str,
938		subsystem_name: Option<&'static str>,
939		j: BoxFuture<'static, ()>,
940	) {
941		self.spawner.spawn(task_name, subsystem_name, j);
942	}
943
944	fn spawn_blocking_job(
945		&mut self,
946		task_name: &'static str,
947		subsystem_name: Option<&'static str>,
948		j: BoxFuture<'static, ()>,
949	) {
950		self.spawner.spawn_blocking(task_name, subsystem_name, j);
951	}
952}