ump_ng_server/task.rs
//! ump-ng dispatch server running on a task.
2//!
3//! # Example
4//! ```
5//! # tokio_test::block_on(async {
6//! use std::ops::ControlFlow;
7//! use ump_ng_server::{
8//! async_trait,
9//! task::{Handler, spawn},
10//! ump_ng::{ReplyContext, WeakClient}
11//! };
12//!
13//! enum Post {
14//! ShoutIntoVoid
15//! }
16//! enum Request {
17//! Add(usize, usize)
18//! }
19//! enum Reply {
20//! Sum(usize)
21//! }
22//! #[derive(Debug)]
23//! enum MyError { }
24//!
25//! struct MyHandler {
26//! wclnt: WeakClient<Post, Request, Reply, MyError>
27//! }
28//! #[async_trait]
29//! impl Handler<Post, Request, Reply, MyError, ()> for MyHandler {
30//! async fn post(&mut self, msg: Post) -> ControlFlow<(), ()> {
31//! match msg {
32//! Post::ShoutIntoVoid => {
33//! // No reply .. but keep on trudging on
34//! ControlFlow::Continue(())
35//! }
36//! }
37//! }
38//! async fn req(
39//! &mut self,
40//! msg: Request,
41//! rctx: ReplyContext<Reply, MyError>
42//! ) -> ControlFlow<(), ()> {
43//! match msg {
44//! Request::Add(a, b) => {
45//! rctx.reply(Reply::Sum(a+b)).unwrap();
46//! ControlFlow::Continue(())
47//! }
48//! }
49//! }
50//! }
51//!
52//! let (clnt, jh) = spawn(|clnt| {
53//! // Store a weak client in the handler so it doesn't keep the dispatch
54//! // loop alive when the Client returned to the application is dropped.
55//! Ok(MyHandler {
56//! wclnt: clnt.weak()
57//! })
58//! }).unwrap();
59//!
60//! clnt.post(Post::ShoutIntoVoid).unwrap();
61//!
62//! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else {
63//! panic!("Unexpected reply");
64//! };
65//! assert_eq!(sum, 10);
66//!
67//! // drop client to force dispatch loop to terminate
68//! drop(clnt);
69//!
70//! jh.await;
71//! # });
72//! ```
73
74use std::ops::ControlFlow;
75
76use tokio::task::{self, JoinHandle};
77
78use async_trait::async_trait;
79
80use super::{channel, Client, MsgType, ReplyContext};
81
82/// Message processing trait for an async handler.
83#[async_trait]
84pub trait Handler<P, S, R, E, RV> {
85 /// Optional initialization callback.
86 ///
87 /// This is called on the dispatcher task before the main message
88 /// processing loop is entered.
89 #[allow(unused_variables)]
90 fn init(&mut self, weak_client: ump_ng::WeakClient<P, S, R, E>) {}
91
92 /// Put message processing callback.
93 ///
94 /// The callback must return `ControlFlow::Continue(())` to keep the
95 /// dispatcher loop going. Returning `ControlFlow::Break(RV)` will cause the
96 /// dispatcher loop to abort and returns the value in `RV` from the task.
97 async fn post(&mut self, msg: P) -> ControlFlow<RV, ()>;
98
99 /// Request message processing callback.
100 ///
101 /// The callback must return `ControlFlow::Continue(())` to keep the
102 /// dispatcher loop going. Returning `ControlFlow::Break(RV)` will cause the
103 /// dispatcher loop to abort and returns the value in `RV` from the task.
104 async fn req(
105 &mut self,
106 msg: S,
107 rctx: ReplyContext<R, E>
108 ) -> ControlFlow<RV, ()>;
109
110 /// Optional termination callback.
111 ///
112 /// This is called on the dispatcher task just after the main message
113 /// processing loop has been terminbated.
114 fn term(&mut self, rv: Option<RV>) -> Option<RV> {
115 rv
116 }
117}
118
119/// Launch a task that will process incoming messages from an ump-ng server
120/// end-point.
121///
122/// See top module's documentation for an overview of the [dispatch
123/// loop](crate#dispatch-loop).
124///
125/// # Errors
126/// An application-defined error `E` is returned if the dispatch loop is
127/// terminated by a handler returning `ControlFlow::Break(E)`.
128#[allow(clippy::type_complexity)]
129pub fn spawn<P, S, R, E, RV, F>(
130 hbldr: impl FnOnce(&Client<P, S, R, E>) -> Result<F, E>
131) -> Result<(Client<P, S, R, E>, JoinHandle<Option<RV>>), E>
132where
133 P: 'static + Send,
134 S: 'static + Send,
135 R: 'static + Send,
136 E: 'static + Send,
137 RV: 'static + Send,
138 F: Handler<P, S, R, E, RV> + Send + 'static
139{
140 let (server, client) = channel();
141
142 let mut handler = hbldr(&client)?;
143
144 #[cfg(feature = "watchdog")]
145 let wdog = crate::wdog::run();
146
147 let weak_client = client.weak();
148 let jh = task::spawn(async move {
149 handler.init(weak_client);
150 let ret = loop {
151 match server.async_wait().await {
152 Ok(msg) => {
153 #[cfg(feature = "watchdog")]
154 wdog.begin_process();
155
156 let res = match msg {
157 MsgType::Post(m) => handler.post(m).await,
158 MsgType::Request(m, rctx) => handler.req(m, rctx).await
159 };
160
161 #[cfg(feature = "watchdog")]
162 wdog.end_process();
163
164 match res {
165 ControlFlow::Continue(()) => {}
166 ControlFlow::Break(rv) => break Some(rv)
167 }
168 }
169 Err(_) => break None
170 }
171 };
172
173 #[cfg(feature = "watchdog")]
174 let _ = wdog.kill();
175
176 handler.term(ret)
177 });
178
179 Ok((client, jh))
180}
181
182// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :