interlink/msg.rs
1//! # Messages
2//!
3//! Messages are used to communicate with services. Messages are handled by [`Handler`]'s on services
4//! and must respond with a specific message type. The default derived message response type is the unit type.
5//! When handling a value you must choose a response type for how you intent to create the response value.
6//! See the different types below:
7//!
8//! ## Response Types
9//!
10//! - () Unit response type. This type responds with a empty value allowing you to return nothing from a handler
11//! - [`Mr`] Message response type. This is for when you are synchronously responding to a message.
12//! - [`Fr`] Future response type. This is for responding with a value that is created by awaiting a future. The future is spawned into a new tokio task
13//! - [`Sfr`] Service future response type. This is for when the response depends on awaiting a future that requires a mutable borrow over the service and/or the service context
14//!
15//! ## Messages
16//!
17//! Things that can be sent to services as messages must implement the [`Message`] trait. This trait can also be
18//! derived using the following derive macro.
19//!
20//! ```
21//! use interlink::prelude::*;
22//!
23//! #[derive(Message)]
24//! struct MyMessage {
25//! value: String,
26//! }
27//! ```
28//!
29//! Without specifying the response type in the above message it will default to the () unit response type. To specify the
30//! response type you can use the syntax below
31//!
32//! ```
33//! use interlink::prelude::*;
34//!
35//! #[derive(Message)]
36//! #[msg(rtype = "String")]
37//! struct MyMessage {
38//! value: String,
39//! }
40//! ```
41//!
42//! The rtype portion specifies the type of the response value.
43//!
44use std::{future::ready, pin::Pin};
45use crate::{
46 envelope::{BoxedFutureEnvelope, FutureProducer},
47 service::{Service, ServiceContext},
48};
49use std::future::Future;
50use tokio::sync::oneshot;
51
52/// Type alias for a future that is pinned and boxed with a specific return type (T) and lifetime ('a)
53pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
54
55/// Message type implemented by structures that can be passed
56/// around as messages through envelopes.
57///
58/// This trait can be derived using its derive macro
59/// ```
60/// use interlink::prelude::*;
61///
62/// #[derive(Message)]
63/// struct MyMessage {
64/// value: String,
65/// }
66/// ```
67///
68/// Without specifying the response type in the above message it will default to the () unit response type. To specify the
69/// response type you can use the syntax below
70///
71/// ```
72/// use interlink::prelude::*;
73///
74/// #[derive(Message)]
75/// #[msg(rtype = "String")]
76/// struct MyMessage {
77/// value: String,
78/// }
79/// ```
80pub trait Message: Send + 'static {
81 /// The type of the response that handlers will produce
82 /// when handling this message
83 type Response: Send + 'static;
84}
85
86/// # Message Response
87///
88/// Response type from a handler which directly sends a response
89/// for a specific message
90///
91/// ```
92/// use interlink::prelude::*;
93///
94/// #[derive(Service)]
95/// struct Test { value: String };
96///
97/// #[derive(Message)]
98/// #[msg(rtype="String")]
99/// struct TestMessage {
100/// value: String,
101/// }
102///
103/// impl Handler<TestMessage> for Test {
104/// type Response = Mr<TestMessage>;
105///
106/// fn handle(&mut self, msg: TestMessage, ctx: &mut ServiceContext<Self>) -> Self::Response {
107/// self.value = msg.value;
108///
109/// Mr("Response".to_string())
110/// }
111/// }
112///
113/// #[tokio::test]
114/// async fn test() {
115/// let service = Test { value: "Default".to_string() };
116/// let link = service.start();
117///
118/// let res: String = link
119/// .send(TestMessage {
120/// value: "Example".to_string()
121/// })
122/// .await
123/// .unwrap();
124///
125/// assert_eq!(&res, "Response")
126///
127/// }
128/// ```
129pub struct Mr<M: Message>(pub M::Response);
130
131impl<S, M> ResponseHandler<S, M> for Mr<M>
132where
133 S: Service,
134 M: Message,
135{
136 fn respond(
137 self,
138 _service: &mut S,
139 _ctx: &mut ServiceContext<S>,
140 tx: Option<oneshot::Sender<M::Response>>,
141 ) {
142 if let Some(tx) = tx {
143 let _ = tx.send(self.0);
144 }
145 }
146}
147
148/// Void response handler for sending an empty unit
149/// response automatically after executing
150impl<S, M> ResponseHandler<S, M> for ()
151where
152 S: Service,
153 M: Message<Response = ()>,
154{
155 fn respond(
156 self,
157 _service: &mut S,
158 _ctx: &mut ServiceContext<S>,
159 tx: Option<oneshot::Sender<<M as Message>::Response>>,
160 ) {
161 if let Some(tx) = tx {
162 let _ = tx.send(());
163 }
164 }
165}
166
167/// Response handler for optional handler types to handle
168/// not sending any response
169impl<S, M, R> ResponseHandler<S, M> for Option<R>
170where
171 R: ResponseHandler<S, M>,
172 S: Service,
173 M: Message,
174{
175 fn respond(
176 self,
177 service: &mut S,
178 ctx: &mut ServiceContext<S>,
179 tx: Option<oneshot::Sender<<M as Message>::Response>>,
180 ) {
181 if let Some(value) = self {
182 value.respond(service, ctx, tx);
183 }
184 }
185}
186
187/// Response handler for result response types where the
188/// error half of the result can be handled by a service
189/// error handler
190impl<S, M, R, E> ResponseHandler<S, M> for Result<R, E>
191where
192 R: ResponseHandler<S, M>,
193 S: Service + ErrorHandler<E>,
194 M: Message,
195 E: Send + 'static,
196{
197 fn respond(
198 self,
199 service: &mut S,
200 ctx: &mut ServiceContext<S>,
201 tx: Option<oneshot::Sender<<M as Message>::Response>>,
202 ) {
203 match self {
204 Ok(value) => {
205 value.respond(service, ctx, tx);
206 }
207 Err(err) => {
208 service.handle(err, ctx);
209 }
210 }
211 }
212}
213
214/// # Future Response
215///
216/// Response type from a handler containing a future that
217/// is to be spawned into a another task where the response
218/// will then be sent to the sender. This should be used
219/// when the response is computed in a future that can run
220/// independently from the service.
221///
222///
223/// ```
224/// use interlink::prelude::*;
225/// use std::time::Duration;
226/// use tokio::time::sleep;
227///
228/// #[derive(Service)]
229/// struct Test { value: String };
230///
231/// #[derive(Message)]
232/// #[msg(rtype = "String")]
233/// struct TestMessage {
234/// value: String,
235/// }
236///
237/// impl Handler<TestMessage> for Test {
238/// type Response = Fr<TestMessage>;
239///
240/// fn handle(&mut self, msg: TestMessage, ctx: &mut ServiceContext<Self>) -> Self::Response {
241/// // Additional logic can be run here before the future
242/// // response is created
243///
244/// Fr::new(Box::pin(async move {
245/// // Some future that must be polled in another task
246/// sleep(Duration::from_millis(1000)).await;
247///
248/// // You can return the response type of the message here
249/// "Response".to_string()
250/// }))
251/// }
252/// }
253///
254/// #[tokio::test]
255/// async fn test() {
256/// let service = Test { value: "Default".to_string() };
257/// let link = service.start();
258///
259/// let res: String = link
260/// .send(TestMessage {
261/// value: "Example".to_string()
262/// })
263/// .await
264/// .unwrap();
265///
266/// assert_eq!(&res, "Response")
267///
268/// }
269/// ```
270pub struct Fr<M: Message> {
271 /// The underlying future to await for a response
272 future: BoxFuture<'static, M::Response>,
273}
274
275impl<M> Fr<M>
276where
277 M: Message,
278{
279 /// Creates a new future response from the provided boxed future.
280 pub fn new(future: BoxFuture<'static, M::Response>) -> Fr<M> {
281 Fr { future }
282 }
283
284 /// Creates a Fr wrapping the provided future creating a boxed
285 /// future from the provided future. Don't use this if you've
286 /// already boxed the future
287 pub fn new_box<F>(future: F) -> Fr<M>
288 where
289 F: Future<Output = M::Response> + Send + 'static,
290 {
291 Fr {
292 future: Box::pin(future),
293 }
294 }
295
296 /// Creates a new future response for a ready future containing a value
297 /// that is already ready.
298 pub fn ready(value: M::Response) -> Fr<M> {
299 Fr {
300 future: Box::pin(ready(value)),
301 }
302 }
303}
304
305impl<S, M> ResponseHandler<S, M> for Fr<M>
306where
307 S: Service,
308 M: Message,
309{
310 fn respond(
311 self,
312 _service: &mut S,
313 _ctx: &mut ServiceContext<S>,
314 tx: Option<oneshot::Sender<M::Response>>,
315 ) {
316 tokio::spawn(async move {
317 let res = self.future.await;
318 if let Some(tx) = tx {
319 let _ = tx.send(res);
320 }
321 });
322 }
323}
324
325/// # Service Future Response
326///
327/// Response type from a handler where a future must be
328/// awaited on the processing loop of the service. While
329/// the result of this future is being processed no other
330/// messages will be handled.
331///
332/// This provides a mutable borrow of the service and the service
333/// context to the future that is being awaited.
334///
335/// ```
336/// use interlink::prelude::*;
337/// use std::time::Duration;
338/// use tokio::time::sleep;
339///
340/// #[derive(Service)]
341/// struct Test { value: String };
342///
343/// #[derive(Message)]
344/// #[msg(rtype = "String")]
345/// struct TestMessage {
346/// value: String,
347/// }
348///
349/// impl Handler<TestMessage> for Test {
350/// type Response = Sfr<Self, TestMessage>;
351///
352/// fn handle(&mut self, msg: TestMessage, ctx: &mut ServiceContext<Self>) -> Self::Response {
353/// // Additional logic can be run here before the future
354/// // response is created
355///
356/// Sfr::new(move |service: &mut Test, ctx| {
357/// Box::pin(async move {
358/// // Some future that must be polled on the service loop
359/// sleep(Duration::from_millis(1000)).await;
360///
361/// // Make use of the mutable access to service
362/// service.value = msg.value.clone();
363///
364/// // You can return the response type of the message here
365/// "Response".to_string()
366/// })
367/// })
368/// }
369/// }
370///
371/// #[tokio::test]
372/// async fn test() {
373/// let service = Test { value: "Default".to_string() };
374/// let link = service.start();
375///
376/// let res: String = link
377/// .send(TestMessage {
378/// value: "Example".to_string()
379/// })
380/// .await
381/// .unwrap();
382///
383/// assert_eq!(&res, "Response")
384///
385/// }
386/// ```
387pub struct Sfr<S, M: Message> {
388 /// The producer that will produce the function that must be awaited
389 producer: Box<dyn FutureProducer<S, Response = M::Response>>,
390}
391
392impl<S, M> Sfr<S, M>
393where
394 S: Service,
395 M: Message,
396{
397 /// Creates a new service future response. Takes a fn which
398 /// accepts mutable access to the service and its context
399 /// and returns a boxed future with the same lifetime as the
400 /// borrow
401 ///
402 /// `producer` The producer fn
403 pub fn new<P>(producer: P) -> Sfr<S, M>
404 where
405 for<'a> P: FnOnce(&'a mut S, &'a mut ServiceContext<S>) -> BoxFuture<'a, M::Response>
406 + Send
407 + 'static,
408 {
409 Sfr {
410 producer: Box::new(producer),
411 }
412 }
413}
414
415/// The response handler for service future responses passes on
416/// the producer in an envelope to be handled by the context
417impl<S, M> ResponseHandler<S, M> for Sfr<S, M>
418where
419 S: Service,
420 M: Message,
421{
422 fn respond(
423 self,
424 _service: &mut S,
425 ctx: &mut ServiceContext<S>,
426 tx: Option<oneshot::Sender<M::Response>>,
427 ) {
428 let _ = ctx
429 .shared_link()
430 .tx(BoxedFutureEnvelope::new(self.producer, tx));
431 }
432}
433
434/// Handler implementation for handling what happens
435/// with a response value
436pub trait ResponseHandler<S: Service, M: Message>: Send + 'static {
437 fn respond(
438 self,
439 service: &mut S,
440 ctx: &mut ServiceContext<S>,
441 tx: Option<oneshot::Sender<M::Response>>,
442 );
443}
444
445/// Handler implementation for allowing a service to handle a specific
446/// message type
447pub trait Handler<M: Message>: Service {
448 /// The response type this handler will use
449 type Response: ResponseHandler<Self, M>;
450
451 /// Handler for processing the message using the current service
452 /// context and message. Will respond with the specified response type
453 ///
454 /// `self` The service handling the message
455 /// `msg` The message that is being handled
456 /// `ctx` Mutable borrow of the service context
457 fn handle(&mut self, msg: M, ctx: &mut ServiceContext<Self>) -> Self::Response;
458}
459
460/// Handler for accepting streams of messages for a service
461/// from streams attached to the service see `attach_stream`
462/// on ServiceContext
463pub trait StreamHandler<M: Send>: Service {
464 /// Handler for handling messages received from a stream
465 ///
466 /// `self` The service handling the message
467 /// `msg` The message received
468 /// `ctx` Mutable borrow of the service context
469 fn handle(&mut self, msg: M, ctx: &mut ServiceContext<Self>);
470}
471
472/// Handler for accepting streams of messages for a service
473/// from streams attached to the service
474pub trait ErrorHandler<M: Send>: Service {
475
476 /// Handler for handling errors that occur in associated services
477 /// in cases such as errors while writing messages to a connected sink.
478 /// Responds with an [`ErrorAction`] which determines how the service
479 /// should react to the error
480 ///
481 /// `self` The service handling the error
482 /// `err` The error that was encountered
483 /// `ctx` Mutable borrow of the service context
484 fn handle(&mut self, err: M, ctx: &mut ServiceContext<Self>) -> ErrorAction;
485}
486
487/// Actions that can be taken after handling an error.
488pub enum ErrorAction {
489 /// Continue processing
490 Continue,
491 /// Stop processing
492 Stop,
493}