ump_server/task.rs
1//! ump server running in an async task.
2//!
3//! # Example
4//! ```
5//! # tokio_test::block_on(async {
6//! use std::ops::ControlFlow;
7//! use ump_server::{
8//! async_trait,
9//! task::{Handler, spawn},
10//! ReplyContext, WeakClient
11//! };
12//!
13//! enum Request {
14//! Add(usize, usize)
15//! }
16//! enum Reply {
17//! Sum(usize)
18//! }
19//! #[derive(Debug)]
20//! enum MyError { }
21//!
22//! struct MyHandler {
23//! wclnt: WeakClient<Request, Reply, MyError>
24//! };
25//! #[async_trait]
26//! impl Handler<Request, Reply, MyError, ()> for MyHandler {
27//! async fn proc_req(
28//! &mut self,
29//! msg: Request,
30//! rctx: ReplyContext<Reply, MyError>
31//! ) -> ControlFlow<(), ()> {
32//! match msg {
33//! Request::Add(a, b) => {
34//! rctx.reply(Reply::Sum(a + b));
35//! ControlFlow::Continue(())
36//! }
37//! }
38//! }
39//! }
40//!
41//! let (clnt, jh) = spawn(|clnt| {
42//! // Store a weak client in the handler so it doesn't keep the dispatch
43//! // loop alive when the Client returned to the application is dropped.
44//! Ok(MyHandler {
45//! wclnt: clnt.weak()
46//! })
47//! }).unwrap();
48//!
49//! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else {
50//! panic!("Unexpected reply");
51//! };
52//! assert_eq!(sum, 10);
53//!
54//! // Dropping the only client will terminate the dispatch loop
55//! drop(clnt);
56//!
57//! let _ = jh.await;
58//! # });
59//! ```
60
61use std::ops::ControlFlow;
62
63use tokio::task::{self, JoinHandle};
64
65use async_trait::async_trait;
66
67use super::{Client, ReplyContext, channel};
68
/// Message processing trait for an async handler.
///
/// An implementor receives every request that arrives on the server
/// end-point through [`Handler::proc_req()`], and may optionally hook into
/// the start and end of the dispatch loop through [`Handler::init()`] and
/// [`Handler::term()`].
///
/// The generic parameters are: `S` for the request ("send") type, `R` for the
/// reply type, `E` for the application-defined error type and `RV` for the
/// value the dispatch loop yields when a handler breaks out of it.
///
/// See top module documentation for more information about [application
/// message handlers](crate#application-message-handlers).
#[async_trait]
pub trait Handler<S, R, E, RV> {
  /// Optional initialization callback.
  ///
  /// This is called on the dispatcher task before the main message dispatch
  /// loop is entered.
  ///
  /// `weak_client` is a weak reference to the client end-point paired with
  /// the server this handler is serving.
  ///
  /// The default implementation does nothing.
  #[allow(unused_variables)]
  fn init(&mut self, weak_client: ump::WeakClient<S, R, E>) {}

  /// Message processing callback.
  ///
  /// `msg` is the request received from a client and `rctx` is the context
  /// used to send the reply back to that client.
  ///
  /// The callback must return `ControlFlow::Continue(())` to keep the
  /// dispatcher loop going.  Returning `ControlFlow::Break(rv)` causes the
  /// dispatcher loop to terminate; the `rv` value is passed, wrapped in
  /// `Some()`, to [`Handler::term()`], whose return value is what the task
  /// yields when it is joined.
  async fn proc_req(
    &mut self,
    msg: S,
    rctx: ReplyContext<R, E>
  ) -> ControlFlow<RV, ()>;

  /// Optional termination callback.
  ///
  /// This is called on the dispatcher task just after the main message
  /// processing loop has been terminated.
  ///
  /// The `rv` argument is set to the return value returned from the dispatcher
  /// loop.  It will be set to `Some()` value if a request handler returned
  /// `ControlFlow::Break(RV)`.  It will be set to `None` if the dispatch loop
  /// terminated because the queue is empty and all of the linked clients have
  /// been dropped.
  ///
  /// The value returned from this callback is returned from the dispatcher
  /// task when it is joined.
  ///
  /// The default implementation simply returns the `rv` parameter.
  fn term(&mut self, rv: Option<RV>) -> Option<RV> {
    rv
  }
}
112
113/// Run a task which will process incoming messages from an ump server
114/// end-point.
115///
116/// `hbldr` is a closure that should spawn a [`Handler`].
117///
118/// See top module's documentation for an overview of the [dispatch
119/// loop](crate#dispatch-loop) and what the [generic
120/// parameters](crate#generics) are for.
121///
122/// # Errors
123/// An application-defined error `E` is returned if the dispatch loop is
124/// terminated by a handler returning `ControlFlow::Break(E)`.
125#[allow(clippy::type_complexity)]
126pub fn spawn<S, R, E, RV, F>(
127 hbldr: impl FnOnce(&Client<S, R, E>) -> Result<F, E>
128) -> Result<(Client<S, R, E>, JoinHandle<Option<RV>>), E>
129where
130 S: 'static + Send,
131 R: 'static + Send,
132 E: 'static + Send,
133 RV: 'static + Send,
134 F: Handler<S, R, E, RV> + Send + 'static
135{
136 let (server, client) = channel();
137
138 let mut handler = hbldr(&client)?;
139
140 #[cfg(feature = "watchdog")]
141 let wdog = crate::wdog::run();
142
143 let weak_client = client.weak();
144 let jh = task::spawn(async move {
145 handler.init(weak_client);
146 let ret = loop {
147 let Ok((msg, rctx)) = server.async_wait().await else {
148 break None;
149 };
150
151 #[cfg(feature = "watchdog")]
152 wdog.begin_process();
153
154 let res = handler.proc_req(msg, rctx).await;
155
156 #[cfg(feature = "watchdog")]
157 wdog.end_process();
158
159 match res {
160 ControlFlow::Continue(()) => {}
161 ControlFlow::Break(rv) => break Some(rv)
162 }
163 };
164
165 #[cfg(feature = "watchdog")]
166 let _ = wdog.kill();
167
168 handler.term(ret)
169 });
170
171 Ok((client, jh))
172}
173
174// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :