pezkuwi_overseer/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd. and Dijital Kurdistan Tech Institute
2// This file is part of Pezkuwi.
3
4// Pezkuwi is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Pezkuwi is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Pezkuwi.  If not, see <http://www.gnu.org/licenses/>.
16
17//! # Overseer
18//!
19//! `overseer` implements the Overseer architecture described in the
20//! [implementers' guide][overseer-page].
//! For the motivations behind implementing the overseer itself, check out
//! that guide; the documentation in this crate mostly discusses technical
//! details.
24//!
25//! An `Overseer` is something that allows spawning/stopping and overseeing
26//! asynchronous tasks as well as establishing a well-defined and easy to use
27//! protocol that the tasks can use to communicate with each other. It is desired
28//! that this protocol is the only way tasks communicate with each other, however
29//! at this moment there are no foolproof guards against other ways of communication.
30//!
31//! The `Overseer` is instantiated with a pre-defined set of `Subsystems` that
32//! share the same behavior from `Overseer`'s point of view.
33//!
34//! ```text
35//!                              +-----------------------------+
36//!                              |         Overseer            |
37//!                              +-----------------------------+
38//!
39//!             ................|  Overseer "holds" these and uses |..............
40//!             .                  them to (re)start things                      .
41//!             .                                                                .
42//!             .  +-------------------+                +---------------------+  .
43//!             .  |   Subsystem1      |                |   Subsystem2        |  .
44//!             .  +-------------------+                +---------------------+  .
45//!             .           |                                       |            .
46//!             ..................................................................
47//!                         |                                       |
48//!                       start()                                 start()
49//!                         V                                       V
50//!             ..................| Overseer "runs" these |.......................
51//!             .  +--------------------+               +---------------------+  .
52//!             .  | SubsystemInstance1 |               | SubsystemInstance2  |  .
53//!             .  +--------------------+               +---------------------+  .
54//!             ..................................................................
55//! ```
56//!
57//! [overseer-page]: https://docs.pezkuwichain.io/sdk/book/node/overseer.html
58
59// #![deny(unused_results)]
60// unused dependencies can not work for test and examples at the same time
61// yielding false positives
62#![warn(missing_docs)]
63// TODO https://github.com/pezkuwichain/pezkuwi-sdk/issues/144
64#![allow(dead_code, irrefutable_let_patterns)]
65
66use std::{
67	collections::{hash_map, HashMap},
68	fmt::{self, Debug},
69	pin::Pin,
70	sync::Arc,
71	time::Duration,
72};
73
74use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
75
76use pezkuwi_primitives::{Block, BlockNumber, Hash};
77use pezsc_client_api::{BlockImportNotification, BlockchainEvents, FinalityNotification};
78
79use self::messages::{BitfieldSigningMessage, PvfCheckerMessage};
80use pezkuwi_node_subsystem_types::messages::{
81	ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage,
82	AvailabilityDistributionMessage, AvailabilityRecoveryMessage, AvailabilityStoreMessage,
83	BitfieldDistributionMessage, CandidateBackingMessage, CandidateValidationMessage,
84	ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage,
85	DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage,
86	NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProspectiveTeyrchainsMessage,
87	ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage,
88};
89
90pub use pezkuwi_node_subsystem_types::{
91	errors::{SubsystemError, SubsystemResult},
92	ActivatedLeaf, ActiveLeavesUpdate, ChainApiBackend, OverseerSignal, RuntimeApiSubsystemClient,
93	UnpinHandle,
94};
95
96pub mod metrics;
97pub use self::metrics::Metrics as OverseerMetrics;
98
99/// A dummy subsystem, mostly useful for placeholders and tests.
100pub mod dummy;
101pub use self::dummy::DummySubsystem;
102
103pub use pezkuwi_node_metrics::{
104	metrics::{prometheus, Metrics as MetricsTrait},
105	Metronome,
106};
107
108pub use orchestra as gen;
109pub use orchestra::{
110	contextbounds, orchestra, subsystem, FromOrchestra, HighPriority, MapSubsystem, MessagePacket,
111	NormalPriority, OrchestraError as OverseerError, Priority, PriorityLevel, SignalsReceived,
112	Spawner, Subsystem, SubsystemContext, SubsystemIncomingMessages, SubsystemInstance,
113	SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra,
114	TrySendError,
115};
116
117#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
118mod memory_stats;
119#[cfg(test)]
120mod tests;
121
122use pezsp_core::traits::SpawnNamed;
123
/// Adapter that lets a `bizinikiwi` `SpawnNamed` implementation be used wherever an
/// `orchestra` `Spawner` is expected.
///
/// The wrapped spawner is publicly reachable as field `.0`.
pub struct SpawnGlue<S>(pub S);

/// Borrow the wrapped spawner.
impl<S> AsRef<S> for SpawnGlue<S> {
	fn as_ref(&self) -> &S {
		let Self(inner) = self;
		inner
	}
}

/// Cloning the glue clones the wrapped spawner; only available when `S: Clone`.
impl<S> Clone for SpawnGlue<S>
where
	S: Clone,
{
	fn clone(&self) -> Self {
		let Self(inner) = self;
		SpawnGlue(S::clone(inner))
	}
}
138
139impl<S: SpawnNamed + Clone + Send + Sync> crate::gen::Spawner for SpawnGlue<S> {
140	fn spawn_blocking(
141		&self,
142		name: &'static str,
143		group: Option<&'static str>,
144		future: futures::future::BoxFuture<'static, ()>,
145	) {
146		SpawnNamed::spawn_blocking(&self.0, name, group, future)
147	}
148	fn spawn(
149		&self,
150		name: &'static str,
151		group: Option<&'static str>,
152		future: futures::future::BoxFuture<'static, ()>,
153	) {
154		SpawnNamed::spawn(&self.0, name, group, future)
155	}
156}
157
158/// Whether a header supports teyrchain consensus or not.
159#[async_trait::async_trait]
160pub trait HeadSupportsTeyrchains {
161	/// Return true if the given header supports teyrchain consensus. Otherwise, false.
162	async fn head_supports_teyrchains(&self, head: &Hash) -> bool;
163}
164
165#[async_trait::async_trait]
166impl<Client> HeadSupportsTeyrchains for Arc<Client>
167where
168	Client: RuntimeApiSubsystemClient + Sync + Send,
169{
170	async fn head_supports_teyrchains(&self, head: &Hash) -> bool {
171		// Check that the `TeyrchainHost` runtime api is at least with version 1 present on chain.
172		self.api_version_teyrchain_host(*head).await.ok().flatten().unwrap_or(0) >= 1
173	}
174}
175
176/// A handle used to communicate with the [`Overseer`].
177///
178/// [`Overseer`]: struct.Overseer.html
179#[derive(Clone)]
180pub struct Handle(OverseerHandle);
181
182impl Handle {
183	/// Create a new [`Handle`].
184	pub fn new(raw: OverseerHandle) -> Self {
185		Self(raw)
186	}
187
188	/// Inform the `Overseer` that that some block was imported.
189	pub async fn block_imported(&mut self, block: BlockInfo) {
190		self.send_and_log_error(Event::BlockImported(block)).await
191	}
192
193	/// Send some message with normal priority to one of the `Subsystem`s.
194	pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
195		self.send_msg_with_priority(msg, origin, PriorityLevel::Normal).await
196	}
197
198	/// Send some message with the specified priority to one of the `Subsystem`s.
199	pub async fn send_msg_with_priority(
200		&mut self,
201		msg: impl Into<AllMessages>,
202		origin: &'static str,
203		priority: PriorityLevel,
204	) {
205		self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin, priority })
206			.await
207	}
208
209	/// Send a message not providing an origin.
210	#[inline(always)]
211	pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
212		self.send_msg(msg, "").await
213	}
214
215	/// Inform the `Overseer` that some block was finalized.
216	pub async fn block_finalized(&mut self, block: BlockInfo) {
217		self.send_and_log_error(Event::BlockFinalized(block)).await
218	}
219
220	/// Wait for a block with the given hash to be in the active-leaves set.
221	///
222	/// The response channel responds if the hash was activated and is closed if the hash was
223	/// deactivated. Note that due the fact the overseer doesn't store the whole active-leaves set,
224	/// only deltas, the response channel may never return if the hash was deactivated before this
225	/// call. In this case, it's the caller's responsibility to ensure a timeout is set.
226	pub async fn wait_for_activation(
227		&mut self,
228		hash: Hash,
229		response_channel: oneshot::Sender<SubsystemResult<()>>,
230	) {
231		self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
232			hash,
233			response_channel,
234		}))
235		.await;
236	}
237
238	/// Tell `Overseer` to shutdown.
239	pub async fn stop(&mut self) {
240		self.send_and_log_error(Event::Stop).await;
241	}
242
243	/// Most basic operation, to stop a server.
244	async fn send_and_log_error(&mut self, event: Event) {
245		if self.0.send(event).await.is_err() {
246			gum::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
247		}
248	}
249}
250
251/// An event telling the `Overseer` on the particular block
252/// that has been imported or finalized.
253///
254/// This structure exists solely for the purposes of decoupling
255/// `Overseer` code from the client code and the necessity to call
256/// `HeaderBackend::block_number_from_id()`.
257#[derive(Debug, Clone)]
258pub struct BlockInfo {
259	/// Hash of the block.
260	pub hash: Hash,
261	/// Hash of the parent block.
262	pub parent_hash: Hash,
263	/// Block's number.
264	pub number: BlockNumber,
265	/// A handle to unpin the block on drop.
266	pub unpin_handle: UnpinHandle,
267}
268
269impl From<BlockImportNotification<Block>> for BlockInfo {
270	fn from(n: BlockImportNotification<Block>) -> Self {
271		let hash = n.hash;
272		let parent_hash = n.header.parent_hash;
273		let number = n.header.number;
274		let unpin_handle = n.into_unpin_handle();
275
276		BlockInfo { hash, parent_hash, number, unpin_handle }
277	}
278}
279
280impl From<FinalityNotification<Block>> for BlockInfo {
281	fn from(n: FinalityNotification<Block>) -> Self {
282		let hash = n.hash;
283		let parent_hash = n.header.parent_hash;
284		let number = n.header.number;
285		let unpin_handle = n.into_unpin_handle();
286
287		BlockInfo { hash, parent_hash, number, unpin_handle }
288	}
289}
290
291/// An event from outside the overseer scope, such
292/// as the bizinikiwi framework or user interaction.
293#[derive(Debug)]
294pub enum Event {
295	/// A new block was imported.
296	///
297	/// This event is not sent if the block was already known
298	/// and we reorged to it e.g. due to a reversion.
299	///
300	/// Also, these events are not sent during a major sync.
301	BlockImported(BlockInfo),
302	/// A block was finalized with i.e. babe or another consensus algorithm.
303	BlockFinalized(BlockInfo),
304	/// Message as sent to a subsystem.
305	MsgToSubsystem {
306		/// The actual message.
307		msg: AllMessages,
308		/// The originating subsystem name.
309		origin: &'static str,
310		/// The priority of the message.
311		priority: PriorityLevel,
312	},
313	/// A request from the outer world.
314	ExternalRequest(ExternalRequest),
315	/// Stop the overseer on i.e. a UNIX signal.
316	Stop,
317}
318
319/// Some request from outer world.
320#[derive(Debug)]
321pub enum ExternalRequest {
322	/// Wait for the activation of a particular hash
323	/// and be notified by means of the return channel.
324	WaitForActivation {
325		/// The relay parent for which activation to wait for.
326		hash: Hash,
327		/// Response channel to await on.
328		response_channel: oneshot::Sender<SubsystemResult<()>>,
329	},
330}
331
332/// Glues together the [`Overseer`] and `BlockchainEvents` by forwarding
333/// import and finality notifications into the [`OverseerHandle`].
334pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut handle: Handle) {
335	let mut finality = client.finality_notification_stream();
336	let mut imports = client.import_notification_stream();
337
338	loop {
339		select! {
340			f = finality.next() => {
341				match f {
342					Some(block) => {
343						handle.block_finalized(block.into()).await;
344					}
345					None => break,
346				}
347			},
348			i = imports.next() => {
349				match i {
350					Some(block) => {
351						handle.block_imported(block.into()).await;
352					}
353					None => break,
354				}
355			},
356			complete => break,
357		}
358	}
359}
360
/// The [`Overseer`], owning a fixed set of [`Subsystem`]s.
///
/// Building an overseer (see the example below) returns it along with an [`OverseerHandle`]
/// which can be used to send messages from external parts of the codebase.
///
/// The [`OverseerHandle`] returned alongside it is connected to
/// the returned [`Overseer`].
///
/// ```text
///                  +------------------------------------+
///                  |            Overseer                |
///                  +------------------------------------+
///                    /            |             |      \
///      ................. subsystems...................................
///      . +-----------+    +-----------+   +----------+   +---------+ .
///      . |           |    |           |   |          |   |         | .
///      . +-----------+    +-----------+   +----------+   +---------+ .
///      ...............................................................
///                              |
///                        probably `spawn`
///                            a `job`
///                              |
///                              V
///                         +-----------+
///                         |           |
///                         +-----------+
/// ```
///
/// [`Subsystem`]: trait.Subsystem.html
///
/// # Example
///
/// Subsystems may be of any type as long as they implement the [`Subsystem`] trait.
/// Here, we create a mock validation subsystem and a few dummy ones and start the `Overseer` with
/// them. For the sake of simplicity the termination of the example is done with a timeout.
/// ```
/// # use std::time::Duration;
/// # use futures::{executor, pin_mut, select, FutureExt};
/// # use futures_timer::Delay;
/// # use pezkuwi_primitives::Hash;
/// # use pezkuwi_overseer::{
/// # 	self as overseer,
/// #   OverseerSignal,
/// # 	SubsystemSender as _,
/// # 	AllMessages,
/// # 	HeadSupportsTeyrchains,
/// # 	Overseer,
/// # 	SubsystemError,
/// # 	gen::{
/// # 		SubsystemContext,
/// # 		FromOrchestra,
/// # 		SpawnedSubsystem,
/// # 	},
/// # };
/// # use pezkuwi_node_subsystem_types::messages::{
/// # 	CandidateValidationMessage, CandidateBackingMessage,
/// # 	NetworkBridgeTxMessage,
/// # };
///
/// struct ValidationSubsystem;
///
/// impl<Ctx> overseer::Subsystem<Ctx, SubsystemError> for ValidationSubsystem
/// where
///     Ctx: overseer::SubsystemContext<
/// 				Message=CandidateValidationMessage,
/// 				AllMessages=AllMessages,
/// 				Signal=OverseerSignal,
/// 				Error=SubsystemError,
/// 			>,
/// {
///     fn start(
///         self,
///         mut ctx: Ctx,
///     ) -> SpawnedSubsystem<SubsystemError> {
///         SpawnedSubsystem {
///             name: "validation-subsystem",
///             future: Box::pin(async move {
///                 loop {
///                     Delay::new(Duration::from_secs(1)).await;
///                 }
///             }),
///         }
///     }
/// }
///
/// # fn main() { executor::block_on(async move {
///
/// struct AlwaysSupportsTeyrchains;
///
/// #[async_trait::async_trait]
/// impl HeadSupportsTeyrchains for AlwaysSupportsTeyrchains {
///      async fn head_supports_teyrchains(&self, _head: &Hash) -> bool { true }
/// }
///
/// let spawner = pezsp_core::testing::TaskExecutor::new();
/// let (overseer, _handle) = dummy_overseer_builder(spawner, AlwaysSupportsTeyrchains, None)
/// 		.unwrap()
/// 		.replace_candidate_validation(|_| ValidationSubsystem)
/// 		.build()
/// 		.unwrap();
///
/// let timer = Delay::new(Duration::from_millis(50)).fuse();
///
/// let overseer_fut = overseer.run().fuse();
/// pin_mut!(timer);
/// pin_mut!(overseer_fut);
///
/// select! {
///     _ = overseer_fut => (),
///     _ = timer => (),
/// }
/// #
/// # 	});
/// # }
/// ```
#[orchestra(
	gen=AllMessages,
	event=Event,
	signal=OverseerSignal,
	error=SubsystemError,
	message_capacity=2048,
)]
pub struct Overseer<SupportsTeyrchains> {
	// Each `#[subsystem(...)]` field declares one subsystem. The optional leading message
	// type is the message that subsystem consumes, and `sends: [...]` lists the message
	// types it may send. Flags such as `blocking`, `message_capacity: N` and
	// `can_receive_priority_messages` tune how the `orchestra` macro sets it up.
	// NOTE(review): see the `orchestra` crate docs for the exact meaning of each flag.
	// Presumably `blocking` spawns the subsystem via `spawn_blocking`, and
	// `message_capacity` overrides the default of 2048 declared above.
	#[subsystem(CandidateValidationMessage, sends: [
		ChainApiMessage,
		RuntimeApiMessage,
	])]
	candidate_validation: CandidateValidation,

	// No consumed message type is declared for this subsystem; it only lists what it sends.
	#[subsystem(sends: [
		CandidateValidationMessage,
		RuntimeApiMessage,
	])]
	pvf_checker: PvfChecker,

	#[subsystem(CandidateBackingMessage, sends: [
		CandidateValidationMessage,
		CollatorProtocolMessage,
		ChainApiMessage,
		AvailabilityDistributionMessage,
		AvailabilityStoreMessage,
		StatementDistributionMessage,
		ProvisionerMessage,
		RuntimeApiMessage,
		ProspectiveTeyrchainsMessage,
	])]
	candidate_backing: CandidateBacking,

	#[subsystem(StatementDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		CandidateBackingMessage,
		RuntimeApiMessage,
		ProspectiveTeyrchainsMessage,
		ChainApiMessage,
	], can_receive_priority_messages)]
	statement_distribution: StatementDistribution,

	#[subsystem(AvailabilityDistributionMessage, sends: [
		AvailabilityStoreMessage,
		ChainApiMessage,
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
	])]
	availability_distribution: AvailabilityDistribution,

	#[subsystem(AvailabilityRecoveryMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		AvailabilityStoreMessage,
	])]
	availability_recovery: AvailabilityRecovery,

	// No consumed message type is declared for this subsystem; it only lists what it sends.
	#[subsystem(blocking, sends: [
		AvailabilityStoreMessage,
		RuntimeApiMessage,
		BitfieldDistributionMessage,
	])]
	bitfield_signing: BitfieldSigning,

	#[subsystem(blocking, message_capacity: 8192, BitfieldDistributionMessage, sends: [
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
		ProvisionerMessage,
	], can_receive_priority_messages)]
	bitfield_distribution: BitfieldDistribution,

	#[subsystem(ProvisionerMessage, sends: [
		RuntimeApiMessage,
		CandidateBackingMessage,
		DisputeCoordinatorMessage,
		ProspectiveTeyrchainsMessage,
		ChainApiMessage,
	])]
	provisioner: Provisioner,

	#[subsystem(blocking, RuntimeApiMessage, sends: [])]
	runtime_api: RuntimeApi,

	#[subsystem(blocking, AvailabilityStoreMessage, sends: [
		ChainApiMessage,
		RuntimeApiMessage,
	])]
	availability_store: AvailabilityStore,

	#[subsystem(blocking, NetworkBridgeRxMessage, sends: [
		BitfieldDistributionMessage,
		StatementDistributionMessage,
		ApprovalVotingParallelMessage,
		GossipSupportMessage,
		DisputeDistributionMessage,
		CollationGenerationMessage,
		CollatorProtocolMessage,
	])]
	network_bridge_rx: NetworkBridgeRx,

	#[subsystem(blocking, NetworkBridgeTxMessage, sends: [])]
	network_bridge_tx: NetworkBridgeTx,

	#[subsystem(blocking, ChainApiMessage, sends: [])]
	chain_api: ChainApi,

	#[subsystem(CollationGenerationMessage, sends: [
		RuntimeApiMessage,
		CollatorProtocolMessage,
	])]
	collation_generation: CollationGeneration,

	#[subsystem(CollatorProtocolMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		CandidateBackingMessage,
		ChainApiMessage,
		ProspectiveTeyrchainsMessage,
	])]
	collator_protocol: CollatorProtocol,

	#[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		ApprovalVotingMessage,
		RuntimeApiMessage,
	], can_receive_priority_messages)]
	approval_distribution: ApprovalDistribution,

	#[subsystem(blocking, ApprovalVotingMessage, sends: [
		ApprovalDistributionMessage,
		AvailabilityRecoveryMessage,
		CandidateValidationMessage,
		ChainApiMessage,
		ChainSelectionMessage,
		DisputeCoordinatorMessage,
		RuntimeApiMessage,
	])]
	approval_voting: ApprovalVoting,
	#[subsystem(blocking, message_capacity: 64000, ApprovalVotingParallelMessage, sends: [
		AvailabilityRecoveryMessage,
		CandidateValidationMessage,
		ChainApiMessage,
		ChainSelectionMessage,
		DisputeCoordinatorMessage,
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
		ApprovalVotingParallelMessage,
	], can_receive_priority_messages)]
	approval_voting_parallel: ApprovalVotingParallel,
	#[subsystem(GossipSupportMessage, sends: [
		NetworkBridgeTxMessage,
		NetworkBridgeRxMessage, // TODO <https://github.com/pezkuwichain/pezkuwi-sdk/issues/303>
		RuntimeApiMessage,
		ChainSelectionMessage,
		ChainApiMessage,
	], can_receive_priority_messages)]
	gossip_support: GossipSupport,

	#[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
		DisputeDistributionMessage,
		CandidateValidationMessage,
		AvailabilityStoreMessage,
		AvailabilityRecoveryMessage,
		ChainSelectionMessage,
		ApprovalVotingParallelMessage,
	], can_receive_priority_messages)]
	dispute_coordinator: DisputeCoordinator,

	#[subsystem(DisputeDistributionMessage, sends: [
		RuntimeApiMessage,
		DisputeCoordinatorMessage,
		NetworkBridgeTxMessage,
	])]
	dispute_distribution: DisputeDistribution,

	#[subsystem(blocking, ChainSelectionMessage, sends: [ChainApiMessage])]
	chain_selection: ChainSelection,

	#[subsystem(ProspectiveTeyrchainsMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
	])]
	prospective_teyrchains: ProspectiveTeyrchains,

	// The remaining plain fields (no `#[subsystem]`) are overseer-owned state that the
	// `impl Overseer` methods read and update.

	/// External listeners waiting for a hash to be in the active-leaves set, keyed by that hash.
	pub activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,

	/// The current "active leaves", mapped to their block numbers.
	pub active_leaves: HashMap<Hash, BlockNumber>,

	/// An implementation for checking whether a header supports teyrchain consensus.
	pub supports_teyrchains: SupportsTeyrchains,

	/// Various Prometheus metrics.
	pub metrics: OverseerMetrics,
}
674
675/// Spawn the metrics metronome task.
676pub fn spawn_metronome_metrics<S, SupportsTeyrchains>(
677	overseer: &mut Overseer<S, SupportsTeyrchains>,
678	metronome_metrics: OverseerMetrics,
679) -> Result<(), SubsystemError>
680where
681	S: Spawner,
682	SupportsTeyrchains: HeadSupportsTeyrchains,
683{
684	struct ExtractNameAndMeters;
685
686	impl<'a, T: 'a> MapSubsystem<&'a OrchestratedSubsystem<T>> for ExtractNameAndMeters {
687		type Output = Option<(&'static str, SubsystemMeters)>;
688
689		fn map_subsystem(&self, subsystem: &'a OrchestratedSubsystem<T>) -> Self::Output {
690			subsystem
691				.instance
692				.as_ref()
693				.map(|instance| (instance.name, instance.meters.clone()))
694		}
695	}
696	let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);
697
698	#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
699	let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> =
700		match memory_stats::MemoryAllocationTracker::new() {
701			Ok(memory_stats) => {
702				Box::new(move |metrics: &OverseerMetrics| match memory_stats.snapshot() {
703					Ok(memory_stats_snapshot) => {
704						gum::trace!(
705							target: LOG_TARGET,
706							"memory_stats: {:?}",
707							&memory_stats_snapshot
708						);
709						metrics.memory_stats_snapshot(memory_stats_snapshot);
710					},
711					Err(e) => {
712						gum::debug!(target: LOG_TARGET, "Failed to obtain memory stats: {:?}", e)
713					},
714				})
715			},
716			Err(_) => {
717				gum::debug!(
718					target: LOG_TARGET,
719					"Memory allocation tracking is not supported by the allocator.",
720				);
721
722				Box::new(|_| {})
723			},
724		};
725
726	#[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
727	let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> = Box::new(|_| {});
728
729	let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
730		collect_memory_stats(&metronome_metrics);
731
732		// We combine the amount of messages from subsystems to the overseer
733		// as well as the amount of messages from external sources to the overseer
734		// into one `to_overseer` value.
735		metronome_metrics.channel_metrics_snapshot(
736			subsystem_meters
737				.iter()
738				.cloned()
739				.flatten()
740				.map(|(name, ref meters)| (name, meters.read())),
741		);
742
743		futures::future::ready(())
744	});
745	overseer
746		.spawner()
747		.spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));
748
749	Ok(())
750}
751
752impl<S, SupportsTeyrchains> Overseer<S, SupportsTeyrchains>
753where
754	SupportsTeyrchains: HeadSupportsTeyrchains,
755	S: Spawner,
756{
757	/// Stop the `Overseer`.
758	async fn stop(mut self) {
759		let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await;
760	}
761
762	/// Run the `Overseer`.
763	///
764	/// Logging any errors.
765	pub async fn run(self) {
766		if let Err(err) = self.run_inner().await {
767			gum::error!(target: LOG_TARGET, ?err, "Overseer exited with error");
768		}
769	}
770
771	async fn run_inner(mut self) -> SubsystemResult<()> {
772		let metrics = self.metrics.clone();
773		spawn_metronome_metrics(&mut self, metrics)?;
774
775		loop {
776			select! {
777				msg = self.events_rx.select_next_some() => {
778					match msg {
779						Event::MsgToSubsystem { msg, origin, priority } => {
780							match priority {
781								PriorityLevel::Normal => {
782									self.route_message(msg.into(), origin).await?;
783								},
784								PriorityLevel::High => {
785									self.route_message_with_priority::<HighPriority>(msg.into(), origin).await?;
786								},
787							}
788							self.metrics.on_message_relayed();
789						}
790						Event::Stop => {
791							self.stop().await;
792							return Ok(());
793						}
794						Event::BlockImported(block) => {
795							self.block_imported(block).await?;
796						}
797						Event::BlockFinalized(block) => {
798							self.block_finalized(block).await?;
799						}
800						Event::ExternalRequest(request) => {
801							self.handle_external_request(request);
802						}
803					}
804				},
805				msg = self.to_orchestra_rx.select_next_some() => {
806					match msg {
807						ToOrchestra::SpawnJob { name, subsystem, s } => {
808							self.spawn_job(name, subsystem, s);
809						}
810						ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
811							self.spawn_blocking_job(name, subsystem, s);
812						}
813					}
814				},
815				res = self.running_subsystems.select_next_some() => {
816					gum::error!(
817						target: LOG_TARGET,
818						subsystem = ?res,
819						"subsystem finished unexpectedly",
820					);
821					self.stop().await;
822					return res;
823				},
824			}
825		}
826	}
827
828	async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
829		match self.active_leaves.entry(block.hash) {
830			hash_map::Entry::Vacant(entry) => entry.insert(block.number),
831			hash_map::Entry::Occupied(entry) => {
832				debug_assert_eq!(*entry.get(), block.number);
833				return Ok(());
834			},
835		};
836
837		let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)).await {
838			Some(_) => ActiveLeavesUpdate::start_work(ActivatedLeaf {
839				hash: block.hash,
840				number: block.number,
841				unpin_handle: block.unpin_handle,
842			}),
843			None => ActiveLeavesUpdate::default(),
844		};
845
846		if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
847			debug_assert_eq!(block.number.saturating_sub(1), number);
848			update.deactivated.push(block.parent_hash);
849			self.on_head_deactivated(&block.parent_hash);
850		}
851
852		self.clean_up_external_listeners();
853
854		if !update.is_empty() {
855			self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
856		}
857		Ok(())
858	}
859
860	async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
861		let mut update = ActiveLeavesUpdate::default();
862
863		self.active_leaves.retain(|h, n| {
864			// prune all orphaned leaves, but don't prune
865			// the finalized block if it is itself a leaf.
866			if *n <= block.number && *h != block.hash {
867				update.deactivated.push(*h);
868				false
869			} else {
870				true
871			}
872		});
873
874		for deactivated in &update.deactivated {
875			self.on_head_deactivated(deactivated)
876		}
877
878		self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number))
879			.await?;
880
881		// If there are no leaves being deactivated, we don't need to send an update.
882		//
883		// Our peers will be informed about our finalized block the next time we
884		// activating/deactivating some leaf.
885		if !update.is_empty() {
886			self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
887		}
888
889		Ok(())
890	}
891
892	/// Handles a header activation. If the header's state doesn't support the teyrchains API,
893	/// this returns `None`.
894	async fn on_head_activated(&mut self, hash: &Hash, _parent_hash: Option<Hash>) -> Option<()> {
895		if !self.supports_teyrchains.head_supports_teyrchains(hash).await {
896			return None;
897		}
898
899		self.metrics.on_head_activated();
900		if let Some(listeners) = self.activation_external_listeners.remove(hash) {
901			gum::trace!(
902				target: LOG_TARGET,
903				relay_parent = ?hash,
904				"Leaf got activated, notifying external listeners"
905			);
906			for listener in listeners {
907				// it's fine if the listener is no longer interested
908				let _ = listener.send(Ok(()));
909			}
910		}
911
912		Some(())
913	}
914
915	fn on_head_deactivated(&mut self, hash: &Hash) {
916		self.metrics.on_head_deactivated();
917		self.activation_external_listeners.remove(hash);
918	}
919
920	fn clean_up_external_listeners(&mut self) {
921		self.activation_external_listeners.retain(|_, v| {
922			// remove dead listeners
923			v.retain(|c| !c.is_canceled());
924			!v.is_empty()
925		})
926	}
927
928	fn handle_external_request(&mut self, request: ExternalRequest) {
929		match request {
930			ExternalRequest::WaitForActivation { hash, response_channel } => {
931				if self.active_leaves.get(&hash).is_some() {
932					gum::trace!(
933						target: LOG_TARGET,
934						relay_parent = ?hash,
935						"Leaf was already ready - answering `WaitForActivation`"
936					);
937					// it's fine if the listener is no longer interested
938					let _ = response_channel.send(Ok(()));
939				} else {
940					gum::trace!(
941						target: LOG_TARGET,
942						relay_parent = ?hash,
943						"Leaf not yet ready - queuing `WaitForActivation` sender"
944					);
945					self.activation_external_listeners
946						.entry(hash)
947						.or_default()
948						.push(response_channel);
949				}
950			},
951		}
952	}
953
954	fn spawn_job(
955		&mut self,
956		task_name: &'static str,
957		subsystem_name: Option<&'static str>,
958		j: BoxFuture<'static, ()>,
959	) {
960		self.spawner.spawn(task_name, subsystem_name, j);
961	}
962
963	fn spawn_blocking_job(
964		&mut self,
965		task_name: &'static str,
966		subsystem_name: Option<&'static str>,
967		j: BoxFuture<'static, ()>,
968	) {
969		self.spawner.spawn_blocking(task_name, subsystem_name, j);
970	}
971}