lifeline/service.rs
1use crate::{
2 spawn::{spawn_task, task_name},
3 Bus, Lifeline,
4};
5use log::{debug, error};
6use std::future::Future;
7use std::{any::TypeId, fmt::Debug};
8
9/// Takes channels from the [Bus](./trait.Bus.html), and spawns a tree of tasks. Returns one or more [Lifeline](./struct.Lifeline.html) values.
10/// When the [Lifeline](./struct.Lifeline.html) is dropped, the task tree is immediately cancelled.
11///
12/// - Simple implementations can return the [Lifeline](./struct.Lifeline.html) value, a handle returned by [Task::task](./trait.Task.html#method.task).
13/// - Implementations which have fallible spawns can return `anyhow::Result<Lifeline>`.
14/// - Implementations which spawn multiple tasks can store lifelines for each task in self, and return `anyhow::Result<Self>`.
15///
16/// ## Example
17/// ```
18/// use lifeline::prelude::*;
19/// use tokio::sync::mpsc;
20///
21/// lifeline_bus!(pub struct ExampleBus);
22///
23/// #[derive(Debug, Clone)]
24/// struct ExampleMessage {}
25///
26/// impl Message<ExampleBus> for ExampleMessage {
27/// type Channel = mpsc::Sender<Self>;
28/// }
29///
30/// struct ExampleService {
31/// _run: Lifeline
32/// }
33///
34/// impl Service for ExampleService {
35/// type Bus = ExampleBus;
36/// type Lifeline = anyhow::Result<Self>;
37///
38/// fn spawn(bus: &ExampleBus) -> anyhow::Result<Self> {
39/// let mut rx = bus.rx::<ExampleMessage>()?;
40///
41/// let _run = Self::task("run", async move {
42/// while let Some(msg) = rx.recv().await {
43/// log::info!("got message: {:?}", msg);
44/// }
45/// });
46///
47/// Ok(Self { _run })
48/// }
49/// }
50///
51/// async fn run() {
52/// let bus = ExampleBus::default();
53/// let _service = ExampleService::spawn(&bus);
54/// }
55/// ```
56pub trait Service: Task {
57 /// The bus, which must be provided to spawn the task
58 type Bus: Bus;
59
60 /// The service lifeline. When dropped, all spawned tasks are immediately cancelled.
61 type Lifeline;
62
63 /// Spawns the service with all sub-tasks, and returns a lifeline value. When the lifeline is dropped, all spawned tasks are immediately cancelled.
64 ///
65 /// Implementations should synchronously take channels from the bus, and then use them asynchronously. This makes errors occur as early and predictably as possible.
66 fn spawn(bus: &Self::Bus) -> Self::Lifeline;
67}
68
69/// Constructs the bus, spawns the service, and returns both.
70pub trait DefaultService: Service {
71 fn spawn_default() -> (Self::Bus, Self::Lifeline);
72}
73
74impl<T> DefaultService for T
75where
76 T: Service,
77{
78 fn spawn_default() -> (Self::Bus, Self::Lifeline) {
79 let bus = Self::Bus::default();
80 let lifeline = Self::spawn(&bus);
81
82 (bus, lifeline)
83 }
84}
85
86/// Carries messages between **two** bus instances. A variant of the [Service](./trait.Service.html).
87///
88/// Bus types form a tree, with a 'root application' bus, and multiple busses focused on particular domains. This structure provides isolation,
89/// and predictable failures when [Services](./trait.Service.html) spawn.
90/// ```text
91/// - MainBus
92/// | ListenerBus
93/// | | ConnectionBus
94/// | DomainSpecificBus
95/// | | ...
96/// ```
97///
98/// This trait can be implemented to carry messages between the root and the leaf of the tree.
99///
100/// ## Example
101/// ```
102/// use lifeline::prelude::*;
103/// use tokio::sync::mpsc;
104/// lifeline_bus!(pub struct MainBus);
105/// lifeline_bus!(pub struct LeafBus);
106///
107/// #[derive(Debug, Clone)]
108/// struct LeafShutdown {}
109///
110/// #[derive(Debug, Clone)]
111/// struct MainShutdown {}
112///
113/// impl Message<LeafBus> for LeafShutdown {
114/// type Channel = mpsc::Sender<Self>;
115/// }
116///
117/// impl Message<MainBus> for MainShutdown {
118/// type Channel = mpsc::Sender<Self>;
119/// }
120///
121/// pub struct LeafMainCarrier {
122/// _forward_shutdown: Lifeline
123/// }
124///
125/// impl CarryFrom<MainBus> for LeafBus {
126/// type Lifeline = anyhow::Result<LeafMainCarrier>;
127/// fn carry_from(&self, from: &MainBus) -> Self::Lifeline {
128/// let mut rx = self.rx::<LeafShutdown>()?;
129/// let mut tx = from.tx::<MainShutdown>()?;
130///
131/// let _forward_shutdown = Self::try_task("forward_shutdown", async move {
132/// if let Some(msg) = rx.recv().await {
133/// tx.send(MainShutdown{}).await?;
134/// }
135///
136/// Ok(())
137/// });
138///
139/// Ok(LeafMainCarrier { _forward_shutdown })
140/// }
141/// }
142/// ```
143pub trait CarryFrom<FromBus: Bus>: Bus + Task + Sized {
144 /// The carrier lifeline. When dropped, all spawned tasks are immediately cancelled.
145 type Lifeline;
146
147 /// Spawns the carrier service, returning the lifeline value.
148 fn carry_from(&self, from: &FromBus) -> Self::Lifeline;
149}
150
151/// The receprocial of the [CarryFrom](./trait.CarryFrom.html) trait. Implemented for all types on which [CarryFrom](./trait.CarryFrom.html) is implemented.
152pub trait CarryInto<IntoBus: Bus>: Bus + Task + Sized {
153 /// The carrier lifeline. When dropped, all spawned tasks are immediately cancelled.
154 type Lifeline;
155
156 /// Spawns the carrier service, returning the lifeline value.
157 fn carry_into(&self, into: &IntoBus) -> Self::Lifeline;
158}
159
160impl<F, I> CarryInto<I> for F
161where
162 I: CarryFrom<F>,
163 F: Bus,
164 I: Bus,
165{
166 type Lifeline = <I as CarryFrom<F>>::Lifeline;
167
168 fn carry_into(&self, into: &I) -> Self::Lifeline {
169 into.carry_from(self)
170 }
171}
172
173/// Constructs two bus types, and spawns the carrier between them.
174/// Returns both busses, and the carrier's lifeline.
175pub trait DefaultCarrier<FromBus: Bus>: CarryFrom<FromBus> {
176 fn carry_default() -> (Self, FromBus, Self::Lifeline) {
177 let into = Self::default();
178 let from = FromBus::default();
179 let lifeline = into.carry_from(&from);
180
181 (into, from, lifeline)
182 }
183}
184
185/// Provides the [Self::task](./trait.Task.html#method.task) and [Self::try_task](./trait.Task.html#method.try_task) associated methods for all types.
186///
187/// Lifeline supports the following task executors (using feature flags), and will use the first enabled flag:
188/// - `tokio-executor`
189/// - `async-std-executor`
190///
191/// Fallible tasks can be invoked with [Self::try_task](./trait.Task.html#method.try_task). Lifeline will log OK/ERR status when the task finishes.
192///
193/// # Example
194/// ```
195/// use lifeline::prelude::*;
196/// use tokio::sync::mpsc;
197///
198/// lifeline_bus!(pub struct ExampleBus);
199///
200/// #[derive(Debug, Clone)]
201/// struct ExampleMessage {}
202///
203/// impl Message<ExampleBus> for ExampleMessage {
204/// type Channel = mpsc::Sender<Self>;
205/// }
206///
207/// struct ExampleService {
208/// _run: Lifeline
209/// }
210///
211/// impl Service for ExampleService {
212/// type Bus = ExampleBus;
213/// type Lifeline = anyhow::Result<Self>;
214///
215/// fn spawn(bus: &ExampleBus) -> anyhow::Result<Self> {
216/// let mut rx = bus.rx::<ExampleMessage>()?;
217///
218/// let _run = Self::task("run", async move {
219/// while let Some(msg) = rx.recv().await {
220/// log::info!("got message: {:?}", msg);
221/// }
222/// });
223///
224/// Ok(Self { _run })
225/// }
226/// }
227/// ```
228pub trait Task {
229 /// Spawns an infallible task using the provided executor, wrapping it in a [Lifeline](./struct.Lifeline.html) handle.
230 /// The task will run until it finishes, or until the [Lifeline](./struct.Lifeline.html) is droped.
231 fn task<Out>(name: &str, fut: impl Future<Output = Out> + Send + 'static) -> Lifeline
232 where
233 Out: Debug + Send + 'static,
234 Self: Sized,
235 {
236 let service_name = task_name::<Self>(name);
237 spawn_task(service_name, fut)
238 }
239
240 /// Spawns an fallible task using the provided executor, wrapping it in a [Lifeline](./struct.Lifeline.html) handle.
241 /// The task will run until it finishes, or until the [Lifeline](./struct.Lifeline.html) is droped.
242 ///
243 /// If the task finishes, lifeline will log an 'OK' or 'ERR' message with the return value.
244 fn try_task<Out>(
245 name: &str,
246 fut: impl Future<Output = anyhow::Result<Out>> + Send + 'static,
247 ) -> Lifeline
248 where
249 Out: Debug + 'static,
250 Self: Sized,
251 {
252 let service_name = task_name::<Self>(name);
253 spawn_task(service_name.clone(), async move {
254 match fut.await {
255 Ok(val) => {
256 if TypeId::of::<Out>() != TypeId::of::<()>() {
257 debug!("OK {}: {:?}", service_name, val);
258 } else {
259 debug!("OK {}", service_name);
260 }
261 }
262 Err(err) => {
263 error!("ERR: {}: {}", service_name, err);
264 }
265 }
266 })
267 }
268}
269
270impl<T> Task for T {}