interlink/
link.rs

1//! # Links
2//!
3//! Links are used to communicate with services. You can obtain a link by starting a service
4//! or using the [`ServiceContext::link`] function on the service context.
5//!
6//! You can also convert a link into a [`MessageLink`] which is a link based on a type of
7//! message rather than a type of service. This can be done using the [`Link::message_link`]
8//! function.
9
10use std::{error::Error, fmt::Display};
11
12use crate::{
13    envelope::{Envelope, ExecutorEnvelope, FutureEnvelope, ServiceMessage, StopEnvelope},
14    msg::{BoxFuture, Handler, Message},
15    service::{Service, ServiceContext},
16};
17use tokio::sync::{mpsc, oneshot};
18
19/// Links are used to send and receive messages from services
20/// you will receive a link when you start a service or through
21/// the `link()` fn on a service context
22///
23/// Links are cheaply clonable and can be passed between threads
24pub struct Link<S>(pub(crate) mpsc::UnboundedSender<ServiceMessage<S>>);
25
26/// Alternative type to a link rather than representing a
27/// service type this represents a link to a service that
28/// accepts a specific message type
29///
30/// These are cheaply clonable
31pub struct MessageLink<M: Message>(Box<dyn MessageLinkTx<M>>);
32
33/// Sender trait implemented by types that can be used to
34/// send messages of a speicifc type implements a cloning
35/// impl aswell
36trait MessageLinkTx<M: Message> {
37    /// Sends a message using the underlying channel as an
38    /// envelope with the provided `tx` value for handling
39    /// responses
40    fn tx(&self, msg: M, tx: Option<oneshot::Sender<M::Response>>) -> LinkResult<()>;
41
42    /// Boxed cloning implementation which produces a cloned
43    /// value without exposing a sized type
44    fn boxed_clone(&self) -> Box<dyn MessageLinkTx<M>>;
45}
46
47impl<S, M> MessageLinkTx<M> for mpsc::UnboundedSender<ServiceMessage<S>>
48where
49    S: Service + Handler<M>,
50    M: Message,
51{
52    fn tx(&self, msg: M, tx: Option<oneshot::Sender<M::Response>>) -> LinkResult<()> {
53        self.send(Envelope::new(msg, tx))
54            .map_err(|_| LinkError::Send)
55    }
56
57    fn boxed_clone(&self) -> Box<dyn MessageLinkTx<M>> {
58        Box::new(self.clone())
59    }
60}
61
62impl<M> MessageLink<M>
63where
64    M: Message,
65{
66    pub async fn send(&self, msg: M) -> LinkResult<M::Response> {
67        let (tx, rx) = oneshot::channel();
68        self.0.tx(msg, Some(tx))?;
69        rx.await.map_err(|_| LinkError::Recv)
70    }
71
72    pub fn do_send(&self, msg: M) -> LinkResult<()> {
73        self.0.tx(msg, None)
74    }
75}
76
77/// Clone implementation to clone inner sender for the link
78impl<S> Clone for Link<S> {
79    fn clone(&self) -> Self {
80        Self(self.0.clone())
81    }
82}
83/// Clone implementation to clone inner sender for the recipient
84impl<M: Message> Clone for MessageLink<M> {
85    fn clone(&self) -> Self {
86        Self(self.0.boxed_clone())
87    }
88}
89
90/// Errors that can occur while working with a link
91#[derive(Debug)]
92pub enum LinkError {
93    /// Failed to send message to service
94    Send,
95    /// Failed to receive response back from service
96    Recv,
97}
98
99impl Display for LinkError {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        match self {
102            LinkError::Send => f.write_str("Failed to send to link"),
103            LinkError::Recv => f.write_str("Failed to receive from link"),
104        }
105    }
106}
107
108impl Error for LinkError {}
109
110/// Result type for results where the error is a [`LinkError`]
111pub type LinkResult<T> = Result<T, LinkError>;
112
113impl<S> Link<S>
114where
115    S: Service,
116{
117    /// Checks whether the underlying sender is closed
118    pub fn is_closed(&self) -> bool {
119        self.0.is_closed()
120    }
121
122    /// Creates a message link type from this link type this allows you
123    /// to have links to multiple different services that accept a
124    /// specific message type
125    pub fn message_link<M>(&self) -> MessageLink<M>
126    where
127        M: Message,
128        S: Handler<M>,
129    {
130        MessageLink(self.0.boxed_clone())
131    }
132
133    /// Internal wrapper for sending service messages and handling
134    /// the error responses
135    pub(crate) fn tx(&self, value: ServiceMessage<S>) -> LinkResult<()> {
136        match self.0.send(value) {
137            Ok(_) => Ok(()),
138            Err(_) => Err(LinkError::Send),
139        }
140    }
141
142    /// Tells the service to complete and wait on the action which
143    /// produce a future depending on the service and context. While
144    /// the action is being awaited messages will not be accepted. The
145    /// result of the action will be returned.  
146    ///
147    /// Mutable access to the service and the service context are
148    /// provided to the closure
149    ///
150    ///
151    /// ```
152    /// use interlink::prelude::*;
153    /// use std::time::Duration;
154    /// use tokio::time::sleep;
155    ///
156    /// #[derive(Service)]
157    /// struct MyService;
158    ///
159    /// #[tokio::test]
160    /// async fn test() {
161    ///     let link: Link<MyService> = MyService {}.start();
162    ///     let value = link.wait(|service, ctx| Box::pin(async move {
163    ///         println!("Service waiting on processing loop");
164    ///         sleep(Duration::from_millis(1000)).await;
165    ///         println!("Action executed on service");
166    ///         12
167    ///     }))
168    ///     .await
169    ///     .unwrap();
170    ///
171    ///     assert_eq!(value, 12);
172    /// }
173    ///
174    /// ```
175    pub async fn wait<F, R>(&self, action: F) -> LinkResult<R>
176    where
177        for<'a> F:
178            FnOnce(&'a mut S, &'a mut ServiceContext<S>) -> BoxFuture<'a, R> + Send + 'static,
179        R: Send + 'static,
180    {
181        let (tx, rx) = oneshot::channel();
182        self.tx(FutureEnvelope::new(Box::new(action), Some(tx)))?;
183        rx.await.map_err(|_| LinkError::Recv)
184    }
185
186    /// Tells the service to complete and wait on the action which
187    /// produce a future depending on the service and context. While
188    /// the action is being awaited messages will not be accepted.
189    ///
190    /// Mutable access to the service and the service context are
191    /// provided to the closure
192    ///
193    /// ```
194    /// use interlink::prelude::*;
195    /// use std::time::Duration;
196    /// use tokio::time::sleep;
197    ///
198    /// #[derive(Service)]
199    /// struct MyService;
200    ///
201    /// #[tokio::test]
202    /// async fn test() {
203    ///     let link: Link<MyService> = MyService {}.start();
204    ///     link.do_wait(|service, ctx| Box::pin(async move {
205    ///         println!("Service waiting on processing loop");
206    ///         sleep(Duration::from_millis(1000)).await;
207    ///         println!("Action executed on service");
208    ///     }))
209    ///     .unwrap();
210    /// }
211    ///
212    /// ```
213    pub fn do_wait<F, R>(&self, action: F) -> LinkResult<()>
214    where
215        for<'a> F:
216            FnOnce(&'a mut S, &'a mut ServiceContext<S>) -> BoxFuture<'a, R> + Send + 'static,
217        R: Send + 'static,
218    {
219        self.tx(FutureEnvelope::new(Box::new(action), None))
220    }
221
222    /// Sends a message to the service. The service must implement a
223    /// Handler for the message. Will return the response value from
224    /// the handler once awaited
225    ///
226    /// ```
227    ///
228    /// use interlink::prelude::*;
229    ///
230    /// #[derive(Service)]
231    /// struct Test;
232    ///
233    /// #[derive(Message)]
234    /// #[msg(rtype = "String")]
235    /// struct MyMessage {
236    ///     value: String,
237    /// }
238    ///
239    /// impl Handler<MyMessage> for Test {
240    ///     type Response = Mr<MyMessage>;
241    ///
242    ///     fn handle(&mut self, msg: MyMessage, ctx: &mut ServiceContext<Self>) -> Self::Response {
243    ///         Mr(msg.value)
244    ///     }
245    /// }
246    ///
247    /// #[tokio::test]
248    /// async fn test() {
249    ///     let link = Test {}.start();
250    ///     let resp = link.send(MyMessage {
251    ///         value: "Test123".to_string()
252    ///     })
253    ///     .await
254    ///     .unwrap();
255    ///
256    ///     assert_eq!(&resp, "Test123")
257    /// }
258    /// ```
259    pub async fn send<M>(&self, msg: M) -> LinkResult<M::Response>
260    where
261        M: Message,
262        S: Handler<M>,
263    {
264        let (tx, rx) = oneshot::channel();
265        self.tx(Envelope::new(msg, Some(tx)))?;
266        rx.await.map_err(|_| LinkError::Recv)
267    }
268
269    /// Sends a message to the service. The service must implement a
270    /// Handler for the message. Will not wait for a response from
271    /// the service
272    ///
273    /// ```
274    ///
275    /// use interlink::prelude::*;
276    ///
277    /// #[derive(Service)]
278    /// struct Test;
279    ///
280    /// #[derive(Message)]
281    /// struct MyMessage {
282    ///     value: String,
283    /// }
284    ///
285    /// impl Handler<MyMessage> for Test {
286    ///     type Response = ();
287    ///
288    ///     fn handle(&mut self, msg: MyMessage, ctx: &mut ServiceContext<Self>) {
289    ///         assert_eq!(&msg.value, "Test123");
290    ///     }
291    /// }
292    ///
293    /// #[tokio::test]
294    /// async fn test() {
295    ///     let link = Test {}.start();
296    ///     link.do_send(MyMessage {
297    ///         value: "Test123".to_string()
298    ///     })
299    ///     .unwrap();
300    /// }
301    /// ```
302    pub fn do_send<M>(&self, msg: M) -> LinkResult<()>
303    where
304        M: Message,
305        S: Handler<M>,
306    {
307        self.tx(Envelope::new(msg, None))
308    }
309
310    /// Executes the provided action on the service and service context
311    /// awaiting the promise from this function will result in the return
312    /// value of the closure. The provided closure is given mutable access
313    /// to the service and context
314    ///
315    /// ```
316    /// use interlink::prelude::*;
317    ///
318    /// #[derive(Service)]
319    /// struct Test {
320    ///     value: String
321    /// }
322    ///
323    /// #[tokio::test]
324    /// async fn test() {
325    ///     let link = Test { value: "Test".to_string() }.start();
326    ///     
327    ///     let value = link.exec(|service: &mut Test, _ctx| {
328    ///         service.value.push('A');
329    ///
330    ///         service.value.clone()
331    ///     })
332    ///     .await
333    ///     .expect("Failed to execute action on service");
334    ///
335    ///     assert_eq!(value, "TestA");
336    /// }
337    ///
338    /// ```
339    pub async fn exec<F, R>(&self, action: F) -> LinkResult<R>
340    where
341        F: FnOnce(&mut S, &mut ServiceContext<S>) -> R + Send + 'static,
342        R: Send + 'static,
343    {
344        let (tx, rx) = oneshot::channel();
345
346        self.tx(ExecutorEnvelope::new(action, Some(tx)))?;
347
348        rx.await.map_err(|_| LinkError::Recv)
349    }
350
351    /// Executes the provided action on the service and service context
352    /// ignoring the result of the action. The provided closure is given
353    /// mutable access to the service and context
354    ///
355    /// ```
356    /// use interlink::prelude::*;
357    ///
358    /// #[derive(Service)]
359    /// struct Test {
360    ///     value: String
361    /// }
362    ///
363    /// #[tokio::test]
364    /// async fn test() {
365    ///     let link = Test { value: "Test".to_string() }.start();
366    ///     
367    ///     link.do_exec(|service: &mut Test, _ctx| {
368    ///         println!("Value: {}", service.value);
369    ///
370    ///         service.value.push('A');
371    ///
372    ///         println!("Value: {}", service.value);
373    ///     })
374    ///     .expect("Failed to execute action on service");
375    /// }
376    ///
377    /// ```
378    pub fn do_exec<F, R>(&self, action: F) -> LinkResult<()>
379    where
380        F: FnOnce(&mut S, &mut ServiceContext<S>) -> R + Send + 'static,
381        R: Send + 'static,
382    {
383        self.tx(ExecutorEnvelope::new(action, None))
384    }
385
386    /// Tells the associated service to stop processing messages. After
387    /// this message is recieved no more messages will be processed.
388    pub fn stop(&self) {
389        // Send the stop message to the service
390        let _ = self.tx(Box::new(StopEnvelope));
391    }
392}