Skip to main content

lifeline/
service.rs

1use crate::{
2    spawn::{spawn_task, task_name},
3    Bus, Lifeline,
4};
5use log::{debug, error};
6use std::future::Future;
7use std::{any::TypeId, fmt::Debug};
8
/// Takes channels from the [Bus](./trait.Bus.html), and spawns a tree of tasks.  Returns one or more [Lifeline](./struct.Lifeline.html) values.
/// When the [Lifeline](./struct.Lifeline.html) is dropped, the task tree is immediately cancelled.
///
/// The shape of the returned value is chosen by each implementation, through the `Lifeline` associated type:
///
/// - Simple implementations can return the [Lifeline](./struct.Lifeline.html) value, a handle returned by [Task::task](./trait.Task.html#method.task).
/// - Implementations which have fallible spawns can return `anyhow::Result<Lifeline>`.
/// - Implementations which spawn multiple tasks can store lifelines for each task in self, and return `anyhow::Result<Self>`.
///
/// ## Example
/// ```
/// use lifeline::prelude::*;
/// use tokio::sync::mpsc;
///
/// lifeline_bus!(pub struct ExampleBus);
///
/// #[derive(Debug, Clone)]
/// struct ExampleMessage {}
///
/// impl Message<ExampleBus> for ExampleMessage {
///     type Channel = mpsc::Sender<Self>;
/// }
///
/// struct ExampleService {
///     _run: Lifeline
/// }
///
/// impl Service for ExampleService {
///     type Bus = ExampleBus;
///     type Lifeline = anyhow::Result<Self>;
///
///     fn spawn(bus: &ExampleBus) -> anyhow::Result<Self> {
///         let mut rx = bus.rx::<ExampleMessage>()?;
///
///         let _run = Self::task("run", async move {
///             while let Some(msg) = rx.recv().await {
///                 log::info!("got message: {:?}", msg);
///             }
///         });
///
///         Ok(Self { _run })
///     }
/// }
///
/// async fn run() {
///     let bus = ExampleBus::default();
///     let _service = ExampleService::spawn(&bus);
/// }
/// ```
pub trait Service: Task {
    /// The bus, which must be provided to spawn the task
    type Bus: Bus;

    /// The service lifeline.  When dropped, all spawned tasks are immediately cancelled.
    ///
    /// This type carries no trait bounds, so an implementation may use a bare
    /// [Lifeline](./struct.Lifeline.html), a `Result` wrapping one, or `Self`.
    type Lifeline;

    /// Spawns the service with all sub-tasks, and returns a lifeline value.  When the lifeline is dropped, all spawned tasks are immediately cancelled.
    ///
    /// Implementations should synchronously take channels from the bus, and then use them asynchronously.  This makes errors occur as early and predictably as possible.
    ///
    /// The bus is only borrowed; the caller keeps ownership and can spawn further services from it.
    fn spawn(bus: &Self::Bus) -> Self::Lifeline;
}
68
69/// Constructs the bus, spawns the service, and returns both.
70pub trait DefaultService: Service {
71    fn spawn_default() -> (Self::Bus, Self::Lifeline);
72}
73
74impl<T> DefaultService for T
75where
76    T: Service,
77{
78    fn spawn_default() -> (Self::Bus, Self::Lifeline) {
79        let bus = Self::Bus::default();
80        let lifeline = Self::spawn(&bus);
81
82        (bus, lifeline)
83    }
84}
85
/// Carries messages between **two** bus instances. A variant of the [Service](./trait.Service.html).
///
/// Bus types form a tree, with a 'root application' bus, and multiple busses focused on particular domains.  This structure provides isolation,
/// and predictable failures when [Services](./trait.Service.html) spawn.
/// ```text
/// - MainBus
///   | ListenerBus
///   |  | ConnectionBus
///   | DomainSpecificBus
///   |  | ...
/// ```
///
/// This trait can be implemented to carry messages between the root and the leaf of the tree.
/// It is implemented on one bus type (`self`), and is generic over the other bus type (`FromBus`).
///
/// Implementing `CarryFrom<FromBus>` also makes [CarryInto](./trait.CarryInto.html) available on `FromBus`,
/// through a blanket implementation.
///
/// ## Example
/// ```
/// use lifeline::prelude::*;
/// use tokio::sync::mpsc;
/// lifeline_bus!(pub struct MainBus);
/// lifeline_bus!(pub struct LeafBus);
///
/// #[derive(Debug, Clone)]
/// struct LeafShutdown {}
///
/// #[derive(Debug, Clone)]
/// struct MainShutdown {}
///
/// impl Message<LeafBus> for LeafShutdown {
///     type Channel = mpsc::Sender<Self>;
/// }
///
/// impl Message<MainBus> for MainShutdown {
///     type Channel = mpsc::Sender<Self>;
/// }
///
/// pub struct LeafMainCarrier {
///    _forward_shutdown: Lifeline
/// }
///
/// impl CarryFrom<MainBus> for LeafBus {
///     type Lifeline = anyhow::Result<LeafMainCarrier>;
///     fn carry_from(&self, from: &MainBus) -> Self::Lifeline {
///         let mut rx = self.rx::<LeafShutdown>()?;
///         let mut tx = from.tx::<MainShutdown>()?;
///
///         let _forward_shutdown = Self::try_task("forward_shutdown", async move {
///             if let Some(msg) = rx.recv().await {
///                 tx.send(MainShutdown{}).await?;
///             }
///
///             Ok(())
///         });
///
///         Ok(LeafMainCarrier { _forward_shutdown })
///     }
/// }
/// ```
pub trait CarryFrom<FromBus: Bus>: Bus + Task + Sized {
    /// The carrier lifeline.  When dropped, all spawned tasks are immediately cancelled.
    ///
    /// This type carries no trait bounds; the example above uses an `anyhow::Result` around a carrier struct.
    type Lifeline;

    /// Spawns the carrier service, returning the lifeline value.
    ///
    /// Both busses are only borrowed: `self` is the bus this trait is implemented on, and `from` is the other bus.
    fn carry_from(&self, from: &FromBus) -> Self::Lifeline;
}
150
151/// The receprocial of the [CarryFrom](./trait.CarryFrom.html) trait.  Implemented for all types on which [CarryFrom](./trait.CarryFrom.html) is implemented.
152pub trait CarryInto<IntoBus: Bus>: Bus + Task + Sized {
153    /// The carrier lifeline.  When dropped, all spawned tasks are immediately cancelled.
154    type Lifeline;
155
156    /// Spawns the carrier service, returning the lifeline value.
157    fn carry_into(&self, into: &IntoBus) -> Self::Lifeline;
158}
159
160impl<F, I> CarryInto<I> for F
161where
162    I: CarryFrom<F>,
163    F: Bus,
164    I: Bus,
165{
166    type Lifeline = <I as CarryFrom<F>>::Lifeline;
167
168    fn carry_into(&self, into: &I) -> Self::Lifeline {
169        into.carry_from(self)
170    }
171}
172
173/// Constructs two bus types, and spawns the carrier between them.
174/// Returns both busses, and the carrier's lifeline.
175pub trait DefaultCarrier<FromBus: Bus>: CarryFrom<FromBus> {
176    fn carry_default() -> (Self, FromBus, Self::Lifeline) {
177        let into = Self::default();
178        let from = FromBus::default();
179        let lifeline = into.carry_from(&from);
180
181        (into, from, lifeline)
182    }
183}
184
185/// Provides the [Self::task](./trait.Task.html#method.task) and [Self::try_task](./trait.Task.html#method.try_task) associated methods for all types.
186///
187/// Lifeline supports the following task executors (using feature flags), and will use the first enabled flag:
188/// - `tokio-executor`
189/// - `async-std-executor`
190///
191/// Fallible tasks can be invoked with [Self::try_task](./trait.Task.html#method.try_task).  Lifeline will log OK/ERR status when the task finishes.
192///
193/// # Example
194/// ```
195/// use lifeline::prelude::*;
196/// use tokio::sync::mpsc;
197///
198/// lifeline_bus!(pub struct ExampleBus);
199///
200/// #[derive(Debug, Clone)]
201/// struct ExampleMessage {}
202///
203/// impl Message<ExampleBus> for ExampleMessage {
204///     type Channel = mpsc::Sender<Self>;
205/// }    
206///
207/// struct ExampleService {
208///     _run: Lifeline   
209/// }
210///
211/// impl Service for ExampleService {
212///     type Bus = ExampleBus;
213///     type Lifeline = anyhow::Result<Self>;
214///
215///     fn spawn(bus: &ExampleBus) -> anyhow::Result<Self> {
216///         let mut rx = bus.rx::<ExampleMessage>()?;
217///
218///         let _run = Self::task("run", async move {
219///             while let Some(msg) = rx.recv().await {
220///                 log::info!("got message: {:?}", msg);
221///             }
222///         });
223///
224///         Ok(Self { _run })
225///     }
226/// }
227/// ```
228pub trait Task {
229    /// Spawns an infallible task using the provided executor, wrapping it in a [Lifeline](./struct.Lifeline.html) handle.
230    /// The task will run until it finishes, or until the [Lifeline](./struct.Lifeline.html) is droped.
231    fn task<Out>(name: &str, fut: impl Future<Output = Out> + Send + 'static) -> Lifeline
232    where
233        Out: Debug + Send + 'static,
234        Self: Sized,
235    {
236        let service_name = task_name::<Self>(name);
237        spawn_task(service_name, fut)
238    }
239
240    /// Spawns an fallible task using the provided executor, wrapping it in a [Lifeline](./struct.Lifeline.html) handle.
241    /// The task will run until it finishes, or until the [Lifeline](./struct.Lifeline.html) is droped.
242    ///
243    /// If the task finishes, lifeline will log an 'OK' or 'ERR' message with the return value.
244    fn try_task<Out>(
245        name: &str,
246        fut: impl Future<Output = anyhow::Result<Out>> + Send + 'static,
247    ) -> Lifeline
248    where
249        Out: Debug + 'static,
250        Self: Sized,
251    {
252        let service_name = task_name::<Self>(name);
253        spawn_task(service_name.clone(), async move {
254            match fut.await {
255                Ok(val) => {
256                    if TypeId::of::<Out>() != TypeId::of::<()>() {
257                        debug!("OK {}: {:?}", service_name, val);
258                    } else {
259                        debug!("OK {}", service_name);
260                    }
261                }
262                Err(err) => {
263                    error!("ERR: {}: {}", service_name, err);
264                }
265            }
266        })
267    }
268}
269
270impl<T> Task for T {}