interlink/
msg.rs

1//! # Messages
2//!
//! Messages are used to communicate with services. Messages are handled by [`Handler`] implementations on services
//! and must respond with a specific response type. The default derived message response type is the unit type.
//! When handling a value you must choose a response type for how you intend to create the response value.
//! See the different types below:
7//!
8//! ## Response Types
9//!
//! - `()` Unit response type. This type responds with an empty value allowing you to return nothing from a handler
//! - [`Mr`] Message response type. This is for when you are synchronously responding to a message.
//! - [`Fr`] Future response type. This is for responding with a value that is created by awaiting a future. The future is spawned into a new tokio task
//! - [`Sfr`] Service future response type. This is for when the response depends on awaiting a future that requires a mutable borrow over the service and/or the service context
14//!
15//! ## Messages
16//!
17//! Things that can be sent to services as messages must implement the [`Message`] trait. This trait can also be
18//! derived using the following derive macro.
19//!
20//! ```
21//! use interlink::prelude::*;
22//!
23//! #[derive(Message)]
24//! struct MyMessage {
25//!     value: String,
26//! }
27//! ```
28//!
29//! Without specifying the response type in the above message it will default to the () unit response type. To specify the
30//! response type you can use the syntax below
31//!
32//! ```
33//! use interlink::prelude::*;
34//!
35//! #[derive(Message)]
36//! #[msg(rtype = "String")]
37//! struct MyMessage {
38//!     value: String,
39//! }
40//! ```
41//!
42//! The rtype portion specifies the type of the response value.
43//!
44use std::{future::ready, pin::Pin};
45use crate::{
46    envelope::{BoxedFutureEnvelope, FutureProducer},
47    service::{Service, ServiceContext},
48};
49use std::future::Future;
50use tokio::sync::oneshot;
51
/// Type alias for a future that is pinned and boxed with a specific
/// output type (`T`) and lifetime (`'a`). The boxed future is required
/// to be `Send`.
///
/// This is the future type accepted by [`Fr::new`] and the type that
/// producers passed to [`Sfr::new`] must return.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
54
/// Message type implemented by structures that can be passed
/// around as messages through envelopes.
///
/// Implementors must be `Send + 'static`, and so must the associated
/// [`Message::Response`] type.
///
/// This trait can be derived using its derive macro:
///
/// ```
/// use interlink::prelude::*;
///
/// #[derive(Message)]
/// struct MyMessage {
///     value: String,
/// }
/// ```
///
/// Without specifying the response type in the above message it will default to the `()` unit
/// response type. To specify the response type you can use the syntax below:
///
/// ```
/// use interlink::prelude::*;
///
/// #[derive(Message)]
/// #[msg(rtype = "String")]
/// struct MyMessage {
///     value: String,
/// }
/// ```
pub trait Message: Send + 'static {
    /// The type of the response that handlers will produce
    /// when handling this message
    type Response: Send + 'static;
}
85
86/// # Message Response
87///
88/// Response type from a handler which directly sends a response
89/// for a specific message
90///
91/// ```
92/// use interlink::prelude::*;
93///
94/// #[derive(Service)]
95/// struct Test { value: String };
96///
97/// #[derive(Message)]
98/// #[msg(rtype="String")]
99/// struct TestMessage {
100///     value: String,
101/// }
102///
103/// impl Handler<TestMessage> for Test {
104///     type Response = Mr<TestMessage>;
105///
106///     fn handle(&mut self, msg: TestMessage, ctx: &mut ServiceContext<Self>) -> Self::Response {
107///         self.value = msg.value;
108///
109///         Mr("Response".to_string())
110///     }
111/// }
112///
113/// #[tokio::test]
114/// async fn test() {
115///     let service = Test { value: "Default".to_string() };
116///     let link = service.start();
117///     
118///     let res: String = link
119///         .send(TestMessage {
120///             value: "Example".to_string()
121///         })
122///         .await
123///         .unwrap();
124///     
125///     assert_eq!(&res, "Response")    
126///
127/// }
128/// ```
129pub struct Mr<M: Message>(pub M::Response);
130
131impl<S, M> ResponseHandler<S, M> for Mr<M>
132where
133    S: Service,
134    M: Message,
135{
136    fn respond(
137        self,
138        _service: &mut S,
139        _ctx: &mut ServiceContext<S>,
140        tx: Option<oneshot::Sender<M::Response>>,
141    ) {
142        if let Some(tx) = tx {
143            let _ = tx.send(self.0);
144        }
145    }
146}
147
148/// Void response handler for sending an empty unit
149/// response automatically after executing
150impl<S, M> ResponseHandler<S, M> for ()
151where
152    S: Service,
153    M: Message<Response = ()>,
154{
155    fn respond(
156        self,
157        _service: &mut S,
158        _ctx: &mut ServiceContext<S>,
159        tx: Option<oneshot::Sender<<M as Message>::Response>>,
160    ) {
161        if let Some(tx) = tx {
162            let _ = tx.send(());
163        }
164    }
165}
166
167/// Response handler for optional handler types to handle
168/// not sending any response
169impl<S, M, R> ResponseHandler<S, M> for Option<R>
170where
171    R: ResponseHandler<S, M>,
172    S: Service,
173    M: Message,
174{
175    fn respond(
176        self,
177        service: &mut S,
178        ctx: &mut ServiceContext<S>,
179        tx: Option<oneshot::Sender<<M as Message>::Response>>,
180    ) {
181        if let Some(value) = self {
182            value.respond(service, ctx, tx);
183        }
184    }
185}
186
187/// Response handler for result response types where the
188/// error half of the result can be handled by a service
189/// error handler
190impl<S, M, R, E> ResponseHandler<S, M> for Result<R, E>
191where
192    R: ResponseHandler<S, M>,
193    S: Service + ErrorHandler<E>,
194    M: Message,
195    E: Send + 'static,
196{
197    fn respond(
198        self,
199        service: &mut S,
200        ctx: &mut ServiceContext<S>,
201        tx: Option<oneshot::Sender<<M as Message>::Response>>,
202    ) {
203        match self {
204            Ok(value) => {
205                value.respond(service, ctx, tx);
206            }
207            Err(err) => {
208                service.handle(err, ctx);
209            }
210        }
211    }
212}
213
214/// # Future Response
215///
216/// Response type from a handler containing a future that
217/// is to be spawned into a another task where the response
218/// will then be sent to the sender. This should be used
219/// when the response is computed in a future that can run
220/// independently from the service.
221///
222///
223/// ```
224/// use interlink::prelude::*;
225/// use std::time::Duration;
226/// use tokio::time::sleep;
227///
228/// #[derive(Service)]
229/// struct Test { value: String };
230///
231/// #[derive(Message)]
232/// #[msg(rtype = "String")]
233/// struct TestMessage {
234///     value: String,
235/// }
236///
237/// impl Handler<TestMessage> for Test {
238///     type Response = Fr<TestMessage>;
239///
240///     fn handle(&mut self, msg: TestMessage, ctx: &mut ServiceContext<Self>) -> Self::Response {
241///         // Additional logic can be run here before the future
242///         // response is created
243///
244///         Fr::new(Box::pin(async move {
245///             // Some future that must be polled in another task
246///             sleep(Duration::from_millis(1000)).await;
247///
248///             // You can return the response type of the message here
249///             "Response".to_string()
250///        }))
251///     }
252/// }
253///
254/// #[tokio::test]
255/// async fn test() {
256///     let service = Test { value: "Default".to_string() };
257///     let link = service.start();
258///     
259///     let res: String = link
260///         .send(TestMessage {
261///             value: "Example".to_string()
262///         })
263///         .await
264///         .unwrap();
265///     
266///     assert_eq!(&res, "Response")    
267///
268/// }
269/// ```
270pub struct Fr<M: Message> {
271    /// The underlying future to await for a response
272    future: BoxFuture<'static, M::Response>,
273}
274
275impl<M> Fr<M>
276where
277    M: Message,
278{
279    /// Creates a new future response from the provided boxed future.
280    pub fn new(future: BoxFuture<'static, M::Response>) -> Fr<M> {
281        Fr { future }
282    }
283
284    /// Creates a Fr wrapping the provided future creating a boxed
285    /// future from the provided future. Don't use this if you've
286    /// already boxed the future
287    pub fn new_box<F>(future: F) -> Fr<M>
288    where
289        F: Future<Output = M::Response> + Send + 'static,
290    {
291        Fr {
292            future: Box::pin(future),
293        }
294    }
295
296    /// Creates a new future response for a ready future containing a value
297    /// that is already ready.
298    pub fn ready(value: M::Response) -> Fr<M> {
299        Fr {
300            future: Box::pin(ready(value)),
301        }
302    }
303}
304
305impl<S, M> ResponseHandler<S, M> for Fr<M>
306where
307    S: Service,
308    M: Message,
309{
310    fn respond(
311        self,
312        _service: &mut S,
313        _ctx: &mut ServiceContext<S>,
314        tx: Option<oneshot::Sender<M::Response>>,
315    ) {
316        tokio::spawn(async move {
317            let res = self.future.await;
318            if let Some(tx) = tx {
319                let _ = tx.send(res);
320            }
321        });
322    }
323}
324
325/// # Service Future Response
326///
327/// Response type from a handler where a future must be
328/// awaited on the processing loop of the service. While
329/// the result of this future is being processed no other
330/// messages will be handled.
331///
332/// This provides a mutable borrow of the service and the service
333/// context to the future that is being awaited.
334///
335/// ```
336/// use interlink::prelude::*;
337/// use std::time::Duration;
338/// use tokio::time::sleep;
339///
340/// #[derive(Service)]
341/// struct Test { value: String };
342///
343/// #[derive(Message)]
344/// #[msg(rtype = "String")]
345/// struct TestMessage {
346///     value: String,
347/// }
348///
349/// impl Handler<TestMessage> for Test {
350///     type Response = Sfr<Self, TestMessage>;
351///
352///     fn handle(&mut self, msg: TestMessage, ctx: &mut ServiceContext<Self>) -> Self::Response {
353///         // Additional logic can be run here before the future
354///         // response is created
355///
356///         Sfr::new(move |service: &mut Test, ctx| {
357///             Box::pin(async move {
358///                 // Some future that must be polled on the service loop
359///                 sleep(Duration::from_millis(1000)).await;
360///
361///                 // Make use of the mutable access to service
362///                 service.value = msg.value.clone();
363///
364///                 // You can return the response type of the message here
365///                 "Response".to_string()
366///             })
367///         })
368///     }
369/// }
370///
371/// #[tokio::test]
372/// async fn test() {
373///     let service = Test { value: "Default".to_string() };
374///     let link = service.start();
375///     
376///     let res: String = link
377///         .send(TestMessage {
378///             value: "Example".to_string()
379///         })
380///         .await
381///         .unwrap();
382///     
383///     assert_eq!(&res, "Response")    
384///
385/// }
386/// ```
387pub struct Sfr<S, M: Message> {
388    /// The producer that will produce the function that must be awaited
389    producer: Box<dyn FutureProducer<S, Response = M::Response>>,
390}
391
392impl<S, M> Sfr<S, M>
393where
394    S: Service,
395    M: Message,
396{
397    /// Creates a new service future response. Takes a fn which
398    /// accepts mutable access to the service and its context
399    /// and returns a boxed future with the same lifetime as the
400    /// borrow
401    ///
402    /// `producer` The producer fn
403    pub fn new<P>(producer: P) -> Sfr<S, M>
404    where
405        for<'a> P: FnOnce(&'a mut S, &'a mut ServiceContext<S>) -> BoxFuture<'a, M::Response>
406            + Send
407            + 'static,
408    {
409        Sfr {
410            producer: Box::new(producer),
411        }
412    }
413}
414
415/// The response handler for service future responses passes on
416/// the producer in an envelope to be handled by the context
417impl<S, M> ResponseHandler<S, M> for Sfr<S, M>
418where
419    S: Service,
420    M: Message,
421{
422    fn respond(
423        self,
424        _service: &mut S,
425        ctx: &mut ServiceContext<S>,
426        tx: Option<oneshot::Sender<M::Response>>,
427    ) {
428        let _ = ctx
429            .shared_link()
430            .tx(BoxedFutureEnvelope::new(self.producer, tx));
431    }
432}
433
434/// Handler implementation for handling what happens
435/// with a response value
436pub trait ResponseHandler<S: Service, M: Message>: Send + 'static {
437    fn respond(
438        self,
439        service: &mut S,
440        ctx: &mut ServiceContext<S>,
441        tx: Option<oneshot::Sender<M::Response>>,
442    );
443}
444
445/// Handler implementation for allowing a service to handle a specific
446/// message type
447pub trait Handler<M: Message>: Service {
448    /// The response type this handler will use
449    type Response: ResponseHandler<Self, M>;
450
451    /// Handler for processing the message using the current service
452    /// context and message. Will respond with the specified response type
453    ///
454    /// `self` The service handling the message
455    /// `msg`  The message that is being handled
456    /// `ctx`  Mutable borrow of the service context
457    fn handle(&mut self, msg: M, ctx: &mut ServiceContext<Self>) -> Self::Response;
458}
459
460/// Handler for accepting streams of messages for a service
461/// from streams attached to the service see `attach_stream`
462/// on ServiceContext
463pub trait StreamHandler<M: Send>: Service {
464    /// Handler for handling messages received from a stream
465    ///
466    /// `self` The service handling the message
467    /// `msg`  The message received
468    /// `ctx`  Mutable borrow of the service context
469    fn handle(&mut self, msg: M, ctx: &mut ServiceContext<Self>);
470}
471
472/// Handler for accepting streams of messages for a service
473/// from streams attached to the service
474pub trait ErrorHandler<M: Send>: Service {
475
476    /// Handler for handling errors that occur in associated services
477    /// in cases such as errors while writing messages to a connected sink.
478    /// Responds with an [`ErrorAction`] which determines how the service
479    /// should react to the error
480    ///
481    /// `self` The service handling the error
482    /// `err`  The error that was encountered
483    /// `ctx`  Mutable borrow of the service context
484    fn handle(&mut self, err: M, ctx: &mut ServiceContext<Self>) -> ErrorAction;
485}
486
/// Actions that can be taken after handling an error.
///
/// Returned from [`ErrorHandler::handle`] to indicate how the service
/// should react to the error. The standard traits are derived so the
/// action can be logged, compared and copied freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorAction {
    /// Continue processing
    Continue,
    /// Stop processing
    Stop,
}