ump_server/task.rs
1//! ump server running in an async task.
2//!
3//! # Example
4//! ```
5//! # tokio_test::block_on(async {
6//! use std::ops::ControlFlow;
7//! use ump_server::{
8//! async_trait,
9//! task::{Handler, spawn},
10//! ReplyContext, WeakClient
11//! };
12//!
13//! enum Request {
14//! Add(usize, usize)
15//! }
16//! enum Reply {
17//! Sum(usize)
18//! }
19//! #[derive(Debug)]
20//! enum MyError { }
21//!
22//! struct MyHandler {
23//! wclnt: WeakClient<Request, Reply, MyError>
24//! };
25//! #[async_trait]
26//! impl Handler<Request, Reply, MyError, ()> for MyHandler {
27//! async fn proc_req(
28//! &mut self,
29//! msg: Request,
30//! rctx: ReplyContext<Reply, MyError>
31//! ) -> ControlFlow<(), ()> {
32//! match msg {
33//! Request::Add(a, b) => {
34//! rctx.reply(Reply::Sum(a + b));
35//! ControlFlow::Continue(())
36//! }
37//! }
38//! }
39//! }
40//!
41//! let (clnt, jh) = spawn(|clnt| {
42//! // Store a weak client in the handler so it doesn't keep the dispatch
43//! // loop alive when the Client returned to the application is dropped.
44//! Ok(MyHandler {
45//! wclnt: clnt.weak()
46//! })
47//! }).unwrap();
48//!
49//! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else {
50//! panic!("Unexpected reply");
51//! };
52//! assert_eq!(sum, 10);
53//!
54//! // Dropping the only client will terminate the dispatch loop
55//! drop(clnt);
56//!
57//! let _ = jh.await;
58//! # });
59//! ```
60
61use std::ops::ControlFlow;
62
63use tokio::task::{self, JoinHandle};
64
65use async_trait::async_trait;
66
67use super::{channel, Client, ReplyContext};
68
/// Message processing trait for an async handler.
///
/// An implementor receives requests of type `S` and answers them through a
/// [`ReplyContext`] carrying either a reply `R` or an error `E`.  `RV` is the
/// type of the value a request handler can hand back when it asks the
/// dispatch loop to stop.
#[async_trait]
pub trait Handler<S, R, E, RV> {
  /// Optional initialization callback.
  ///
  /// This is called on the dispatcher task before the main message dispatch
  /// loop is entered.
  ///
  /// `weak_client` is a weak reference to the client end-point of the
  /// channel that the dispatcher serves.
  ///
  /// The default implementation does nothing.
  // The default body intentionally ignores `weak_client`.
  #[allow(unused_variables)]
  fn init(&mut self, weak_client: ump::WeakClient<S, R, E>) {}

  /// Message processing callback.
  ///
  /// `msg` is the request received from a client and `rctx` is the context
  /// used to send the reply (or an error) back to that client.
  ///
  /// The callback must return `ControlFlow::Continue(())` to keep the
  /// dispatcher loop going.  Returning `ControlFlow::Break(RV)` will cause
  /// the dispatcher loop to terminate, and the `RV` value is passed on to
  /// [`Handler::term()`] as `Some(RV)`.
  async fn proc_req(
    &mut self,
    msg: S,
    rctx: ReplyContext<R, E>
  ) -> ControlFlow<RV, ()>;

  /// Optional termination callback.
  ///
  /// This is called on the dispatcher task just after the main message
  /// processing loop has been terminated.
  ///
  /// The `rv` argument is set to the return value returned from the
  /// dispatcher loop.  It will be `Some(RV)` if a request handler returned
  /// `ControlFlow::Break(RV)`.  It will be `None` if the dispatch loop
  /// terminated because the queue is empty and all of the linked clients
  /// have been dropped.
  ///
  /// The value returned from this callback is returned from the dispatcher
  /// task when it is joined.
  ///
  /// The default implementation simply returns the `rv` parameter.
  fn term(&mut self, rv: Option<RV>) -> Option<RV> {
    rv
  }
}
109
110/// Run a task which will process incoming messages from an ump server
111/// end-point.
112///
113/// See top module's documentation for an overview of the [dispatch
114/// loop](crate#dispatch-loop).
115///
116/// # Errors
117/// An application-defined error `E` is returned if the dispatch loop is
118/// terminated by a handler returning `ControlFlow::Break(E)`.
119#[allow(clippy::type_complexity)]
120pub fn spawn<S, R, E, RV, F>(
121 hbldr: impl FnOnce(&Client<S, R, E>) -> Result<F, E>
122) -> Result<(Client<S, R, E>, JoinHandle<Option<RV>>), E>
123where
124 S: 'static + Send,
125 R: 'static + Send,
126 E: 'static + Send,
127 RV: 'static + Send,
128 F: Handler<S, R, E, RV> + Send + 'static
129{
130 let (server, client) = channel();
131
132 let mut handler = hbldr(&client)?;
133
134 #[cfg(feature = "watchdog")]
135 let wdog = crate::wdog::run();
136
137 let weak_client = client.weak();
138 let jh = task::spawn(async move {
139 handler.init(weak_client);
140 let ret = loop {
141 let (msg, rctx) = match server.async_wait().await {
142 Ok(d) => d,
143 Err(_) => break None
144 };
145
146 #[cfg(feature = "watchdog")]
147 wdog.begin_process();
148
149 let res = handler.proc_req(msg, rctx).await;
150
151 #[cfg(feature = "watchdog")]
152 wdog.end_process();
153
154 match res {
155 ControlFlow::Continue(()) => {}
156 ControlFlow::Break(rv) => break Some(rv)
157 }
158 };
159
160 #[cfg(feature = "watchdog")]
161 let _ = wdog.kill();
162
163 handler.term(ret)
164 });
165
166 Ok((client, jh))
167}
168
169// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :