interlink/link.rs
1//! # Links
2//!
3//! Links are used to communicate with services. You can obtain a link by starting a service
4//! or using the [`ServiceContext::link`] function on the service context.
5//!
6//! You can also convert a link into a [`MessageLink`] which is a link based on a type of
7//! message rather than a type of service. This can be done using the [`Link::message_link`]
8//! function.
9
10use std::{error::Error, fmt::Display};
11
12use crate::{
13 envelope::{Envelope, ExecutorEnvelope, FutureEnvelope, ServiceMessage, StopEnvelope},
14 msg::{BoxFuture, Handler, Message},
15 service::{Service, ServiceContext},
16};
17use tokio::sync::{mpsc, oneshot};
18
19/// Links are used to send and receive messages from services
20/// you will receive a link when you start a service or through
21/// the `link()` fn on a service context
22///
23/// Links are cheaply clonable and can be passed between threads
24pub struct Link<S>(pub(crate) mpsc::UnboundedSender<ServiceMessage<S>>);
25
26/// Alternative type to a link rather than representing a
27/// service type this represents a link to a service that
28/// accepts a specific message type
29///
30/// These are cheaply clonable
31pub struct MessageLink<M: Message>(Box<dyn MessageLinkTx<M>>);
32
33/// Sender trait implemented by types that can be used to
34/// send messages of a speicifc type implements a cloning
35/// impl aswell
36trait MessageLinkTx<M: Message> {
37 /// Sends a message using the underlying channel as an
38 /// envelope with the provided `tx` value for handling
39 /// responses
40 fn tx(&self, msg: M, tx: Option<oneshot::Sender<M::Response>>) -> LinkResult<()>;
41
42 /// Boxed cloning implementation which produces a cloned
43 /// value without exposing a sized type
44 fn boxed_clone(&self) -> Box<dyn MessageLinkTx<M>>;
45}
46
47impl<S, M> MessageLinkTx<M> for mpsc::UnboundedSender<ServiceMessage<S>>
48where
49 S: Service + Handler<M>,
50 M: Message,
51{
52 fn tx(&self, msg: M, tx: Option<oneshot::Sender<M::Response>>) -> LinkResult<()> {
53 self.send(Envelope::new(msg, tx))
54 .map_err(|_| LinkError::Send)
55 }
56
57 fn boxed_clone(&self) -> Box<dyn MessageLinkTx<M>> {
58 Box::new(self.clone())
59 }
60}
61
62impl<M> MessageLink<M>
63where
64 M: Message,
65{
66 pub async fn send(&self, msg: M) -> LinkResult<M::Response> {
67 let (tx, rx) = oneshot::channel();
68 self.0.tx(msg, Some(tx))?;
69 rx.await.map_err(|_| LinkError::Recv)
70 }
71
72 pub fn do_send(&self, msg: M) -> LinkResult<()> {
73 self.0.tx(msg, None)
74 }
75}
76
77/// Clone implementation to clone inner sender for the link
78impl<S> Clone for Link<S> {
79 fn clone(&self) -> Self {
80 Self(self.0.clone())
81 }
82}
83/// Clone implementation to clone inner sender for the recipient
84impl<M: Message> Clone for MessageLink<M> {
85 fn clone(&self) -> Self {
86 Self(self.0.boxed_clone())
87 }
88}
89
90/// Errors that can occur while working with a link
91#[derive(Debug)]
92pub enum LinkError {
93 /// Failed to send message to service
94 Send,
95 /// Failed to receive response back from service
96 Recv,
97}
98
99impl Display for LinkError {
100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 match self {
102 LinkError::Send => f.write_str("Failed to send to link"),
103 LinkError::Recv => f.write_str("Failed to receive from link"),
104 }
105 }
106}
107
108impl Error for LinkError {}
109
110/// Result type for results where the error is a [`LinkError`]
111pub type LinkResult<T> = Result<T, LinkError>;
112
113impl<S> Link<S>
114where
115 S: Service,
116{
117 /// Checks whether the underlying sender is closed
118 pub fn is_closed(&self) -> bool {
119 self.0.is_closed()
120 }
121
122 /// Creates a message link type from this link type this allows you
123 /// to have links to multiple different services that accept a
124 /// specific message type
125 pub fn message_link<M>(&self) -> MessageLink<M>
126 where
127 M: Message,
128 S: Handler<M>,
129 {
130 MessageLink(self.0.boxed_clone())
131 }
132
133 /// Internal wrapper for sending service messages and handling
134 /// the error responses
135 pub(crate) fn tx(&self, value: ServiceMessage<S>) -> LinkResult<()> {
136 match self.0.send(value) {
137 Ok(_) => Ok(()),
138 Err(_) => Err(LinkError::Send),
139 }
140 }
141
142 /// Tells the service to complete and wait on the action which
143 /// produce a future depending on the service and context. While
144 /// the action is being awaited messages will not be accepted. The
145 /// result of the action will be returned.
146 ///
147 /// Mutable access to the service and the service context are
148 /// provided to the closure
149 ///
150 ///
151 /// ```
152 /// use interlink::prelude::*;
153 /// use std::time::Duration;
154 /// use tokio::time::sleep;
155 ///
156 /// #[derive(Service)]
157 /// struct MyService;
158 ///
159 /// #[tokio::test]
160 /// async fn test() {
161 /// let link: Link<MyService> = MyService {}.start();
162 /// let value = link.wait(|service, ctx| Box::pin(async move {
163 /// println!("Service waiting on processing loop");
164 /// sleep(Duration::from_millis(1000)).await;
165 /// println!("Action executed on service");
166 /// 12
167 /// }))
168 /// .await
169 /// .unwrap();
170 ///
171 /// assert_eq!(value, 12);
172 /// }
173 ///
174 /// ```
175 pub async fn wait<F, R>(&self, action: F) -> LinkResult<R>
176 where
177 for<'a> F:
178 FnOnce(&'a mut S, &'a mut ServiceContext<S>) -> BoxFuture<'a, R> + Send + 'static,
179 R: Send + 'static,
180 {
181 let (tx, rx) = oneshot::channel();
182 self.tx(FutureEnvelope::new(Box::new(action), Some(tx)))?;
183 rx.await.map_err(|_| LinkError::Recv)
184 }
185
186 /// Tells the service to complete and wait on the action which
187 /// produce a future depending on the service and context. While
188 /// the action is being awaited messages will not be accepted.
189 ///
190 /// Mutable access to the service and the service context are
191 /// provided to the closure
192 ///
193 /// ```
194 /// use interlink::prelude::*;
195 /// use std::time::Duration;
196 /// use tokio::time::sleep;
197 ///
198 /// #[derive(Service)]
199 /// struct MyService;
200 ///
201 /// #[tokio::test]
202 /// async fn test() {
203 /// let link: Link<MyService> = MyService {}.start();
204 /// link.do_wait(|service, ctx| Box::pin(async move {
205 /// println!("Service waiting on processing loop");
206 /// sleep(Duration::from_millis(1000)).await;
207 /// println!("Action executed on service");
208 /// }))
209 /// .unwrap();
210 /// }
211 ///
212 /// ```
213 pub fn do_wait<F, R>(&self, action: F) -> LinkResult<()>
214 where
215 for<'a> F:
216 FnOnce(&'a mut S, &'a mut ServiceContext<S>) -> BoxFuture<'a, R> + Send + 'static,
217 R: Send + 'static,
218 {
219 self.tx(FutureEnvelope::new(Box::new(action), None))
220 }
221
222 /// Sends a message to the service. The service must implement a
223 /// Handler for the message. Will return the response value from
224 /// the handler once awaited
225 ///
226 /// ```
227 ///
228 /// use interlink::prelude::*;
229 ///
230 /// #[derive(Service)]
231 /// struct Test;
232 ///
233 /// #[derive(Message)]
234 /// #[msg(rtype = "String")]
235 /// struct MyMessage {
236 /// value: String,
237 /// }
238 ///
239 /// impl Handler<MyMessage> for Test {
240 /// type Response = Mr<MyMessage>;
241 ///
242 /// fn handle(&mut self, msg: MyMessage, ctx: &mut ServiceContext<Self>) -> Self::Response {
243 /// Mr(msg.value)
244 /// }
245 /// }
246 ///
247 /// #[tokio::test]
248 /// async fn test() {
249 /// let link = Test {}.start();
250 /// let resp = link.send(MyMessage {
251 /// value: "Test123".to_string()
252 /// })
253 /// .await
254 /// .unwrap();
255 ///
256 /// assert_eq!(&resp, "Test123")
257 /// }
258 /// ```
259 pub async fn send<M>(&self, msg: M) -> LinkResult<M::Response>
260 where
261 M: Message,
262 S: Handler<M>,
263 {
264 let (tx, rx) = oneshot::channel();
265 self.tx(Envelope::new(msg, Some(tx)))?;
266 rx.await.map_err(|_| LinkError::Recv)
267 }
268
269 /// Sends a message to the service. The service must implement a
270 /// Handler for the message. Will not wait for a response from
271 /// the service
272 ///
273 /// ```
274 ///
275 /// use interlink::prelude::*;
276 ///
277 /// #[derive(Service)]
278 /// struct Test;
279 ///
280 /// #[derive(Message)]
281 /// struct MyMessage {
282 /// value: String,
283 /// }
284 ///
285 /// impl Handler<MyMessage> for Test {
286 /// type Response = ();
287 ///
288 /// fn handle(&mut self, msg: MyMessage, ctx: &mut ServiceContext<Self>) {
289 /// assert_eq!(&msg.value, "Test123");
290 /// }
291 /// }
292 ///
293 /// #[tokio::test]
294 /// async fn test() {
295 /// let link = Test {}.start();
296 /// link.do_send(MyMessage {
297 /// value: "Test123".to_string()
298 /// })
299 /// .unwrap();
300 /// }
301 /// ```
302 pub fn do_send<M>(&self, msg: M) -> LinkResult<()>
303 where
304 M: Message,
305 S: Handler<M>,
306 {
307 self.tx(Envelope::new(msg, None))
308 }
309
310 /// Executes the provided action on the service and service context
311 /// awaiting the promise from this function will result in the return
312 /// value of the closure. The provided closure is given mutable access
313 /// to the service and context
314 ///
315 /// ```
316 /// use interlink::prelude::*;
317 ///
318 /// #[derive(Service)]
319 /// struct Test {
320 /// value: String
321 /// }
322 ///
323 /// #[tokio::test]
324 /// async fn test() {
325 /// let link = Test { value: "Test".to_string() }.start();
326 ///
327 /// let value = link.exec(|service: &mut Test, _ctx| {
328 /// service.value.push('A');
329 ///
330 /// service.value.clone()
331 /// })
332 /// .await
333 /// .expect("Failed to execute action on service");
334 ///
335 /// assert_eq!(value, "TestA");
336 /// }
337 ///
338 /// ```
339 pub async fn exec<F, R>(&self, action: F) -> LinkResult<R>
340 where
341 F: FnOnce(&mut S, &mut ServiceContext<S>) -> R + Send + 'static,
342 R: Send + 'static,
343 {
344 let (tx, rx) = oneshot::channel();
345
346 self.tx(ExecutorEnvelope::new(action, Some(tx)))?;
347
348 rx.await.map_err(|_| LinkError::Recv)
349 }
350
351 /// Executes the provided action on the service and service context
352 /// ignoring the result of the action. The provided closure is given
353 /// mutable access to the service and context
354 ///
355 /// ```
356 /// use interlink::prelude::*;
357 ///
358 /// #[derive(Service)]
359 /// struct Test {
360 /// value: String
361 /// }
362 ///
363 /// #[tokio::test]
364 /// async fn test() {
365 /// let link = Test { value: "Test".to_string() }.start();
366 ///
367 /// link.do_exec(|service: &mut Test, _ctx| {
368 /// println!("Value: {}", service.value);
369 ///
370 /// service.value.push('A');
371 ///
372 /// println!("Value: {}", service.value);
373 /// })
374 /// .expect("Failed to execute action on service");
375 /// }
376 ///
377 /// ```
378 pub fn do_exec<F, R>(&self, action: F) -> LinkResult<()>
379 where
380 F: FnOnce(&mut S, &mut ServiceContext<S>) -> R + Send + 'static,
381 R: Send + 'static,
382 {
383 self.tx(ExecutorEnvelope::new(action, None))
384 }
385
386 /// Tells the associated service to stop processing messages. After
387 /// this message is recieved no more messages will be processed.
388 pub fn stop(&self) {
389 // Send the stop message to the service
390 let _ = self.tx(Box::new(StopEnvelope));
391 }
392}