wire_framework/task/mod.rs
1//! Tasks define the "runnable" concept of the service, e.g. a unit of work that can be executed by the service.
2
3use std::{
4 fmt::{self, Formatter},
5 sync::Arc,
6};
7
8use tokio::sync::Barrier;
9
10pub use self::types::{TaskId, TaskKind};
11use crate::service::StopReceiver;
12
13mod types;
14
15/// A task implementation.
16/// Task defines the "runnable" concept of the service, e.g. a unit of work that can be executed by the service.
17///
18/// Based on the task kind, the implemenation will be treated differently by the service.
19///
20/// ## Task kinds
21///
22/// There may be different kinds of tasks:
23///
24/// ### `Task`
25///
26/// A regular task. Returning from this task will cause the service to stop. [`Task::kind`] has a default
27/// implementation that returns `TaskKind::Task`.
28///
29/// Typically, the implementation of [`Task::run`] will be some form of loop that runs until either an
30/// irrecoverable error happens (then task should return an error), or stop signal is received (then task should
31/// return `Ok(())`).
32///
33/// ### `OneshotTask`
34///
35/// A task that can exit when completed without causing the service to terminate.
36/// In case of `OneshotTask`s, the service will only exit when all the `OneshotTask`s have exited and there are
37/// no more tasks running.
38///
39/// ### `Precondition`
40///
41/// A "barrier" task that is supposed to check invariants before the main tasks are started.
42/// An example of a precondition task could be a task that checks if the database has all the required data.
43/// Precondition tasks are often paired with some other kind of task that will make sure that the precondition
44/// can be satisfied. This is required for a distributed service setup, where the precondition task will be
45/// present on all the nodes, while a task that satisfies the precondition will be present only on one node.
46///
47/// ### `UnconstrainedTask`
48///
49/// A task that can run without waiting for preconditions.
50/// Tasks of this kind are expected to check all the invariants they rely on themselves.
51/// Usually, this kind of task is used either for tasks that must start as early as possible (e.g. healthcheck server),
52/// or for tasks that cannot rely on preconditions.
53///
54/// ### `UnconstrainedOneshotTask`
55///
56/// A task that can run without waiting for preconditions and can exit without stopping the service.
57/// Usually such tasks may be used for satisfying a precondition, for example, they can perform the database
58/// setup.
59#[async_trait::async_trait]
60pub trait Task: 'static + Send {
61 /// Returns the kind of the task.
62 /// The returned values is expected to be static, and it will be used by the service
63 /// to determine how to handle the task.
64 fn kind(&self) -> TaskKind {
65 TaskKind::Task
66 }
67
68 /// Unique name of the task.
69 fn id(&self) -> TaskId;
70
71 /// Runs the task.
72 async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> eyre::Result<()>;
73}
74
75impl dyn Task {
76 /// An internal helper method that guards running the task with a tokio Barrier.
77 /// Used to make sure that the task is not started until all the preconditions are met.
78 pub(super) async fn run_internal(
79 self: Box<Self>,
80 stop_receiver: StopReceiver,
81 preconditions_barrier: Arc<Barrier>,
82 ) -> eyre::Result<()> {
83 match self.kind() {
84 TaskKind::Task | TaskKind::OneshotTask => {
85 self.run_with_barrier(stop_receiver, preconditions_barrier)
86 .await
87 }
88 TaskKind::UnconstrainedTask | TaskKind::UnconstrainedOneshotTask => {
89 self.run(stop_receiver).await
90 }
91 TaskKind::Precondition => {
92 self.check_precondition(stop_receiver, preconditions_barrier)
93 .await
94 }
95 }
96 }
97
98 async fn run_with_barrier(
99 self: Box<Self>,
100 mut stop_receiver: StopReceiver,
101 preconditions_barrier: Arc<Barrier>,
102 ) -> eyre::Result<()> {
103 // Wait either for barrier to be lifted or for the stop signal to be received.
104 tokio::select! {
105 _ = preconditions_barrier.wait() => {
106 self.run(stop_receiver).await
107 }
108 _ = stop_receiver.0.changed() => {
109 Ok(())
110 }
111 }
112 }
113
114 async fn check_precondition(
115 self: Box<Self>,
116 mut stop_receiver: StopReceiver,
117 preconditions_barrier: Arc<Barrier>,
118 ) -> eyre::Result<()> {
119 self.run(stop_receiver.clone()).await?;
120 tokio::select! {
121 _ = preconditions_barrier.wait() => {
122 Ok(())
123 }
124 _ = stop_receiver.0.changed() => {
125 Ok(())
126 }
127 }
128 }
129}
130
131impl fmt::Debug for dyn Task {
132 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
133 f.debug_struct("Task")
134 .field("kind", &self.kind())
135 .field("name", &self.id())
136 .finish()
137 }
138}