Skip to main content

soil_service/task_manager/
mod.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Substrate service tasks management module.
8
9use crate::{config::TaskType, Error};
10use exit_future::Signal;
11use futures::{
12	future::{pending, select, try_join_all, BoxFuture, Either},
13	Future, FutureExt, StreamExt,
14};
15use parking_lot::Mutex;
16use soil_prometheus::{
17	exponential_buckets, register, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError,
18	Registry, U64,
19};
20use soil_client::utils::mpsc::{
21	tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender,
22};
23use std::{
24	collections::{hash_map::Entry, HashMap},
25	panic,
26	pin::Pin,
27	result::Result,
28	sync::Arc,
29};
30use tokio::runtime::Handle;
31use tracing_futures::Instrument;
32
33mod prometheus_future;
34#[cfg(test)]
35mod tests;
36
/// Default task group name.
///
/// This is the group a task ends up in when it is spawned with [`GroupName::Default`].
pub const DEFAULT_GROUP_NAME: &str = "default";
39
/// The name of a group a task belongs to.
///
/// This name is passed alongside the task name to the Prometheus metrics and can be used
/// to group tasks.
///
/// The type only wraps a `&'static str`, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupName {
	/// Sets the group name to `default`.
	Default,
	/// Use the specifically given name as group name.
	Specific(&'static str),
}
50
51impl From<Option<&'static str>> for GroupName {
52	fn from(name: Option<&'static str>) -> Self {
53		match name {
54			Some(name) => Self::Specific(name),
55			None => Self::Default,
56		}
57	}
58}
59
60impl From<&'static str> for GroupName {
61	fn from(name: &'static str) -> Self {
62		Self::Specific(name)
63	}
64}
65
/// A handle for spawning tasks in the service.
#[derive(Clone)]
pub struct SpawnTaskHandle {
	/// Exit future; every spawned task is raced against it so that it stops once it resolves.
	on_exit: exit_future::Exit,
	/// Tokio runtime handle the tasks are spawned on.
	tokio_handle: Handle,
	/// Prometheus metrics for the spawned tasks, `None` when no metrics are reported.
	metrics: Option<Metrics>,
	/// Registry in which every spawned task is recorded for as long as it runs.
	task_registry: TaskRegistry,
}
74
75impl SpawnTaskHandle {
76	/// Spawns the given task with the given name and a group name.
77	/// If group is not specified `DEFAULT_GROUP_NAME` will be used.
78	///
79	/// Note that the `name` is a `&'static str`. The reason for this choice is that
80	/// statistics about this task are getting reported to the Prometheus endpoint (if enabled), and
81	/// that therefore the set of possible task names must be bounded.
82	///
83	/// In other words, it would be a bad idea for someone to do for example
84	/// `spawn(format!("{:?}", some_public_key))`.
85	pub fn spawn(
86		&self,
87		name: &'static str,
88		group: impl Into<GroupName>,
89		task: impl Future<Output = ()> + Send + 'static,
90	) {
91		self.spawn_inner(name, group, task, TaskType::Async)
92	}
93
94	/// Spawns the blocking task with the given name. See also `spawn`.
95	pub fn spawn_blocking(
96		&self,
97		name: &'static str,
98		group: impl Into<GroupName>,
99		task: impl Future<Output = ()> + Send + 'static,
100	) {
101		self.spawn_inner(name, group, task, TaskType::Blocking)
102	}
103
104	/// Helper function that implements the spawning logic. See `spawn` and `spawn_blocking`.
105	fn spawn_inner(
106		&self,
107		name: &'static str,
108		group: impl Into<GroupName>,
109		task: impl Future<Output = ()> + Send + 'static,
110		task_type: TaskType,
111	) {
112		let on_exit = self.on_exit.clone();
113		let metrics = self.metrics.clone();
114		let registry = self.task_registry.clone();
115
116		let group = match group.into() {
117			GroupName::Specific(var) => var,
118			// If no group is specified use default.
119			GroupName::Default => DEFAULT_GROUP_NAME,
120		};
121
122		let task_type_label = match task_type {
123			TaskType::Blocking => "blocking",
124			TaskType::Async => "async",
125		};
126
127		// Note that we increase the started counter here and not within the future. This way,
128		// we could properly visualize on Prometheus situations where the spawning doesn't work.
129		if let Some(metrics) = &self.metrics {
130			metrics.tasks_spawned.with_label_values(&[name, group, task_type_label]).inc();
131			// We do a dummy increase in order for the task to show up in metrics.
132			metrics
133				.tasks_ended
134				.with_label_values(&[name, "finished", group, task_type_label])
135				.inc_by(0);
136		}
137
138		let future = async move {
139			// Register the task and keep the "token" alive until the task is ended. Then this
140			// "token" will unregister this task.
141			let _registry_token = registry.register_task(name, group);
142
143			if let Some(metrics) = metrics {
144				// Add some wrappers around `task`.
145				let task = {
146					let poll_duration =
147						metrics.poll_duration.with_label_values(&[name, group, task_type_label]);
148					let poll_start =
149						metrics.poll_start.with_label_values(&[name, group, task_type_label]);
150					let inner =
151						prometheus_future::with_poll_durations(poll_duration, poll_start, task);
152					// The logic of `AssertUnwindSafe` here is ok considering that we throw
153					// away the `Future` after it has panicked.
154					panic::AssertUnwindSafe(inner).catch_unwind()
155				};
156				futures::pin_mut!(task);
157
158				match select(on_exit, task).await {
159					Either::Right((Err(payload), _)) => {
160						metrics
161							.tasks_ended
162							.with_label_values(&[name, "panic", group, task_type_label])
163							.inc();
164						panic::resume_unwind(payload)
165					},
166					Either::Right((Ok(()), _)) => {
167						metrics
168							.tasks_ended
169							.with_label_values(&[name, "finished", group, task_type_label])
170							.inc();
171					},
172					Either::Left(((), _)) => {
173						// The `on_exit` has triggered.
174						metrics
175							.tasks_ended
176							.with_label_values(&[name, "interrupted", group, task_type_label])
177							.inc();
178					},
179				}
180			} else {
181				futures::pin_mut!(task);
182				let _ = select(on_exit, task).await;
183			}
184		}
185		.in_current_span();
186
187		match task_type {
188			TaskType::Async => {
189				self.tokio_handle.spawn(future);
190			},
191			TaskType::Blocking => {
192				let handle = self.tokio_handle.clone();
193				self.tokio_handle.spawn_blocking(move || {
194					handle.block_on(future);
195				});
196			},
197		}
198	}
199}
200
201impl subsoil::core::traits::SpawnNamed for SpawnTaskHandle {
202	fn spawn_blocking(
203		&self,
204		name: &'static str,
205		group: Option<&'static str>,
206		future: BoxFuture<'static, ()>,
207	) {
208		self.spawn_inner(name, group, future, TaskType::Blocking)
209	}
210
211	fn spawn(
212		&self,
213		name: &'static str,
214		group: Option<&'static str>,
215		future: BoxFuture<'static, ()>,
216	) {
217		self.spawn_inner(name, group, future, TaskType::Async)
218	}
219}
220
/// A wrapper over `SpawnTaskHandle` that will notify a receiver whenever any
/// task spawned through it fails. The service should be on the receiver side
/// and will shut itself down whenever it receives any message, i.e. an
/// essential task has failed.
#[derive(Clone)]
pub struct SpawnEssentialTaskHandle {
	/// Sender side of the channel that is closed as soon as an essential task ends.
	essential_failed_tx: TracingUnboundedSender<()>,
	/// The handle that actually spawns the wrapped tasks.
	inner: SpawnTaskHandle,
}
230
231impl SpawnEssentialTaskHandle {
232	/// Creates a new `SpawnEssentialTaskHandle`.
233	pub fn new(
234		essential_failed_tx: TracingUnboundedSender<()>,
235		spawn_task_handle: SpawnTaskHandle,
236	) -> SpawnEssentialTaskHandle {
237		SpawnEssentialTaskHandle { essential_failed_tx, inner: spawn_task_handle }
238	}
239
240	/// Spawns the given task with the given name.
241	///
242	/// See also [`SpawnTaskHandle::spawn`].
243	pub fn spawn(
244		&self,
245		name: &'static str,
246		group: impl Into<GroupName>,
247		task: impl Future<Output = ()> + Send + 'static,
248	) {
249		self.spawn_inner(name, group, task, TaskType::Async)
250	}
251
252	/// Spawns the blocking task with the given name.
253	///
254	/// See also [`SpawnTaskHandle::spawn_blocking`].
255	pub fn spawn_blocking(
256		&self,
257		name: &'static str,
258		group: impl Into<GroupName>,
259		task: impl Future<Output = ()> + Send + 'static,
260	) {
261		self.spawn_inner(name, group, task, TaskType::Blocking)
262	}
263
264	fn spawn_inner(
265		&self,
266		name: &'static str,
267		group: impl Into<GroupName>,
268		task: impl Future<Output = ()> + Send + 'static,
269		task_type: TaskType,
270	) {
271		let essential_failed = self.essential_failed_tx.clone();
272		let essential_task = std::panic::AssertUnwindSafe(task).catch_unwind().map(move |_| {
273			log::error!("Essential task `{}` failed. Shutting down service.", name);
274			let _ = essential_failed.close();
275		});
276
277		let _ = self.inner.spawn_inner(name, group, essential_task, task_type);
278	}
279}
280
281impl subsoil::core::traits::SpawnEssentialNamed for SpawnEssentialTaskHandle {
282	fn spawn_essential_blocking(
283		&self,
284		name: &'static str,
285		group: Option<&'static str>,
286		future: BoxFuture<'static, ()>,
287	) {
288		self.spawn_blocking(name, group, future);
289	}
290
291	fn spawn_essential(
292		&self,
293		name: &'static str,
294		group: Option<&'static str>,
295		future: BoxFuture<'static, ()>,
296	) {
297		self.spawn(name, group, future);
298	}
299}
300
/// Helper struct to manage background/async tasks in Service.
pub struct TaskManager {
	/// A future that resolves when the service has exited, this is useful to
	/// make sure any internally spawned futures stop when the service does.
	on_exit: exit_future::Exit,
	/// A signal that makes the exit future above resolve, fired on drop.
	_signal: Signal,
	/// Tokio runtime handle that is used to spawn futures.
	tokio_handle: Handle,
	/// Prometheus metrics used to report the polling times, if a registry was provided.
	metrics: Option<Metrics>,
	/// Send a signal when a spawned essential task has concluded. The next time
	/// the service future is polled it should complete with an error.
	essential_failed_tx: TracingUnboundedSender<()>,
	/// A receiver for spawned essential-tasks concluding.
	essential_failed_rx: TracingUnboundedReceiver<()>,
	/// Things to keep alive until the task manager is dropped.
	keep_alive: Box<dyn std::any::Any + Send>,
	/// A list of other `TaskManager`s to terminate and gracefully shut down when the parent
	/// terminates and gracefully shuts down. Also ends the parent `future()` if a child's
	/// essential task fails.
	children: Vec<TaskManager>,
	/// The registry of all running tasks.
	task_registry: TaskRegistry,
}
326
327impl TaskManager {
328	/// If a Prometheus registry is passed, it will be used to report statistics about the
329	/// service tasks.
330	pub fn new(
331		tokio_handle: Handle,
332		prometheus_registry: Option<&Registry>,
333	) -> Result<Self, PrometheusError> {
334		let (signal, on_exit) = exit_future::signal();
335
336		// A side-channel for essential tasks to communicate shutdown.
337		let (essential_failed_tx, essential_failed_rx) =
338			tracing_unbounded("mpsc_essential_tasks", 100);
339
340		let metrics = prometheus_registry.map(Metrics::register).transpose()?;
341
342		Ok(Self {
343			on_exit,
344			_signal: signal,
345			tokio_handle,
346			metrics,
347			essential_failed_tx,
348			essential_failed_rx,
349			keep_alive: Box::new(()),
350			children: Vec::new(),
351			task_registry: Default::default(),
352		})
353	}
354
355	/// Get a handle for spawning tasks.
356	pub fn spawn_handle(&self) -> SpawnTaskHandle {
357		SpawnTaskHandle {
358			on_exit: self.on_exit.clone(),
359			tokio_handle: self.tokio_handle.clone(),
360			metrics: self.metrics.clone(),
361			task_registry: self.task_registry.clone(),
362		}
363	}
364
365	/// Get a handle for spawning essential tasks.
366	pub fn spawn_essential_handle(&self) -> SpawnEssentialTaskHandle {
367		SpawnEssentialTaskHandle::new(self.essential_failed_tx.clone(), self.spawn_handle())
368	}
369
370	/// Return a future that will end with success if the signal to terminate was sent
371	/// (`self.terminate()`) or with an error if an essential task fails.
372	///
373	/// # Warning
374	///
375	/// This function will not wait until the end of the remaining task.
376	pub fn future<'a>(
377		&'a mut self,
378	) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>> {
379		Box::pin(async move {
380			let mut t1 = self.essential_failed_rx.next().fuse();
381			let mut t2 = self.on_exit.clone().fuse();
382			let mut t3 = try_join_all(
383				self.children
384					.iter_mut()
385					.map(|x| x.future())
386					// Never end this future if there is no error because if there is no children,
387					// it must not stop
388					.chain(std::iter::once(pending().boxed())),
389			)
390			.fuse();
391
392			futures::select! {
393				_ = t1 => Err(Error::Other("Essential task failed.".into())),
394				_ = t2 => Ok(()),
395				res = t3 => Err(res.map(|_| ()).expect_err("this future never ends; qed")),
396			}
397		})
398	}
399
400	/// Set what the task manager should keep alive, can be called multiple times.
401	pub fn keep_alive<T: 'static + Send>(&mut self, to_keep_alive: T) {
402		// allows this fn to safely called multiple times.
403		use std::mem;
404		let old = mem::replace(&mut self.keep_alive, Box::new(()));
405		self.keep_alive = Box::new((to_keep_alive, old));
406	}
407
408	/// Register another TaskManager to terminate and gracefully shutdown when the parent
409	/// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential
410	/// task fails. (But don't end the parent if a child's normal task fails.)
411	pub fn add_child(&mut self, child: TaskManager) {
412		self.children.push(child);
413	}
414
415	/// Consume `self` and return the [`TaskRegistry`].
416	///
417	/// This [`TaskRegistry`] can be used to check for still running tasks after this task manager
418	/// was dropped.
419	pub fn into_task_registry(self) -> TaskRegistry {
420		self.task_registry
421	}
422}
423
/// Prometheus metrics that are reported for spawned tasks.
#[derive(Clone)]
struct Metrics {
	/// Duration in seconds of each invocation of `Future::poll`.
	poll_duration: HistogramVec,
	/// Number of times an invocation of `Future::poll` was started.
	poll_start: CounterVec<U64>,
	/// Number of tasks that have been spawned.
	tasks_spawned: CounterVec<U64>,
	/// Number of tasks that have ended, labelled with the reason they ended.
	tasks_ended: CounterVec<U64>,
}
432
433impl Metrics {
434	fn register(registry: &Registry) -> Result<Self, PrometheusError> {
435		Ok(Self {
436			poll_duration: register(HistogramVec::new(
437				HistogramOpts {
438					common_opts: Opts::new(
439						"substrate_tasks_polling_duration",
440						"Duration in seconds of each invocation of Future::poll"
441					),
442					buckets: exponential_buckets(0.001, 4.0, 9)
443						.expect("function parameters are constant and always valid; qed"),
444				},
445				&["task_name", "task_group", "kind"]
446			)?, registry)?,
447			poll_start: register(CounterVec::new(
448				Opts::new(
449					"substrate_tasks_polling_started_total",
450					"Total number of times we started invoking Future::poll"
451				),
452				&["task_name", "task_group", "kind"]
453			)?, registry)?,
454			tasks_spawned: register(CounterVec::new(
455				Opts::new(
456					"substrate_tasks_spawned_total",
457					"Total number of tasks that have been spawned on the Service"
458				),
459				&["task_name", "task_group", "kind"]
460			)?, registry)?,
461			tasks_ended: register(CounterVec::new(
462				Opts::new(
463					"substrate_tasks_ended_total",
464					"Total number of tasks for which Future::poll has returned Ready(()) or panicked"
465				),
466				&["task_name", "reason", "task_group", "kind"]
467			)?, registry)?,
468		})
469	}
470}
471
/// Ensures that a [`Task`] is unregistered when this object is dropped.
struct UnregisterOnDrop {
	/// The task whose running count is decremented on drop.
	task: Task,
	/// The registry the task was registered in.
	registry: TaskRegistry,
}
477
478impl Drop for UnregisterOnDrop {
479	fn drop(&mut self) {
480		let mut tasks = self.registry.tasks.lock();
481
482		if let Entry::Occupied(mut entry) = (*tasks).entry(self.task.clone()) {
483			*entry.get_mut() -= 1;
484
485			if *entry.get() == 0 {
486				entry.remove();
487			}
488		}
489	}
490}
491
492/// Represents a running async task in the [`TaskManager`].
493///
494/// As a task is identified by a name and a group, it is totally valid that there exists multiple
495/// tasks with the same name and group.
496#[derive(Clone, Hash, Eq, PartialEq)]
497pub struct Task {
498	/// The name of the task.
499	pub name: &'static str,
500	/// The group this task is associated to.
501	pub group: &'static str,
502}
503
504impl Task {
505	/// Returns if the `group` is the [`DEFAULT_GROUP_NAME`].
506	pub fn is_default_group(&self) -> bool {
507		self.group == DEFAULT_GROUP_NAME
508	}
509}
510
/// Keeps track of all running [`Task`]s in [`TaskManager`].
#[derive(Clone, Default)]
pub struct TaskRegistry {
	/// Number of currently running tasks per [`Task`] identifier; the map is shared between
	/// all clones of this registry.
	tasks: Arc<Mutex<HashMap<Task, usize>>>,
}
516
517impl TaskRegistry {
518	/// Register a task with the given `name` and `group`.
519	///
520	/// Returns [`UnregisterOnDrop`] that ensures that the task is unregistered when this value is
521	/// dropped.
522	fn register_task(&self, name: &'static str, group: &'static str) -> UnregisterOnDrop {
523		let task = Task { name, group };
524
525		{
526			let mut tasks = self.tasks.lock();
527
528			*(*tasks).entry(task.clone()).or_default() += 1;
529		}
530
531		UnregisterOnDrop { task, registry: self.clone() }
532	}
533
534	/// Returns the running tasks.
535	///
536	/// As a task is only identified by its `name` and `group`, there can be duplicate tasks. The
537	/// number per task represents the concurrently running tasks with the same identifier.
538	pub fn running_tasks(&self) -> HashMap<Task, usize> {
539		(*self.tasks.lock()).clone()
540	}
541}