wire_framework/task/
mod.rs

//! Tasks define the "runnable" concept of the service, i.e. a unit of work that can be executed by the service.
2
3use std::{
4    fmt::{self, Formatter},
5    sync::Arc,
6};
7
8use tokio::sync::Barrier;
9
10pub use self::types::{TaskId, TaskKind};
11use crate::service::StopReceiver;
12
13mod types;
14
15/// A task implementation.
16/// Task defines the "runnable" concept of the service, e.g. a unit of work that can be executed by the service.
17///
18/// Based on the task kind, the implemenation will be treated differently by the service.
19///
20/// ## Task kinds
21///
22/// There may be different kinds of tasks:
23///
24/// ### `Task`
25///
26/// A regular task. Returning from this task will cause the service to stop. [`Task::kind`] has a default
27/// implementation that returns `TaskKind::Task`.
28///
29/// Typically, the implementation of [`Task::run`] will be some form of loop that runs until either an
30/// irrecoverable error happens (then task should return an error), or stop signal is received (then task should
31/// return `Ok(())`).
32///
33/// ### `OneshotTask`
34///
35/// A task that can exit when completed without causing the service to terminate.
36/// In case of `OneshotTask`s, the service will only exit when all the `OneshotTask`s have exited and there are
37/// no more tasks running.
38///
39/// ### `Precondition`
40///
41/// A "barrier" task that is supposed to check invariants before the main tasks are started.
42/// An example of a precondition task could be a task that checks if the database has all the required data.
43/// Precondition tasks are often paired with some other kind of task that will make sure that the precondition
44/// can be satisfied. This is required for a distributed service setup, where the precondition task will be
45/// present on all the nodes, while a task that satisfies the precondition will be present only on one node.
46///
47/// ### `UnconstrainedTask`
48///
49/// A task that can run without waiting for preconditions.
50/// Tasks of this kind are expected to check all the invariants they rely on themselves.
51/// Usually, this kind of task is used either for tasks that must start as early as possible (e.g. healthcheck server),
52/// or for tasks that cannot rely on preconditions.
53///
54/// ### `UnconstrainedOneshotTask`
55///
56/// A task that can run without waiting for preconditions and can exit without stopping the service.
57/// Usually such tasks may be used for satisfying a precondition, for example, they can perform the database
58/// setup.
59#[async_trait::async_trait]
60pub trait Task: 'static + Send {
61    /// Returns the kind of the task.
62    /// The returned values is expected to be static, and it will be used by the service
63    /// to determine how to handle the task.
64    fn kind(&self) -> TaskKind {
65        TaskKind::Task
66    }
67
68    /// Unique name of the task.
69    fn id(&self) -> TaskId;
70
71    /// Runs the task.
72    async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> eyre::Result<()>;
73}
74
75impl dyn Task {
76    /// An internal helper method that guards running the task with a tokio Barrier.
77    /// Used to make sure that the task is not started until all the preconditions are met.
78    pub(super) async fn run_internal(
79        self: Box<Self>,
80        stop_receiver: StopReceiver,
81        preconditions_barrier: Arc<Barrier>,
82    ) -> eyre::Result<()> {
83        match self.kind() {
84            TaskKind::Task | TaskKind::OneshotTask => {
85                self.run_with_barrier(stop_receiver, preconditions_barrier)
86                    .await
87            }
88            TaskKind::UnconstrainedTask | TaskKind::UnconstrainedOneshotTask => {
89                self.run(stop_receiver).await
90            }
91            TaskKind::Precondition => {
92                self.check_precondition(stop_receiver, preconditions_barrier)
93                    .await
94            }
95        }
96    }
97
98    async fn run_with_barrier(
99        self: Box<Self>,
100        mut stop_receiver: StopReceiver,
101        preconditions_barrier: Arc<Barrier>,
102    ) -> eyre::Result<()> {
103        // Wait either for barrier to be lifted or for the stop signal to be received.
104        tokio::select! {
105            _ = preconditions_barrier.wait() => {
106                self.run(stop_receiver).await
107            }
108            _ = stop_receiver.0.changed() => {
109                Ok(())
110            }
111        }
112    }
113
114    async fn check_precondition(
115        self: Box<Self>,
116        mut stop_receiver: StopReceiver,
117        preconditions_barrier: Arc<Barrier>,
118    ) -> eyre::Result<()> {
119        self.run(stop_receiver.clone()).await?;
120        tokio::select! {
121            _ = preconditions_barrier.wait() => {
122                Ok(())
123            }
124            _ = stop_receiver.0.changed() => {
125                Ok(())
126            }
127        }
128    }
129}
130
131impl fmt::Debug for dyn Task {
132    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
133        f.debug_struct("Task")
134            .field("kind", &self.kind())
135            .field("name", &self.id())
136            .finish()
137    }
138}