Skip to main content

ump_ng_server/
task.rs

1//! ump-ng dispatch server running on a task.
2//!
3//! # Example
4//! ```
5//! # tokio_test::block_on(async {
6//! use std::ops::ControlFlow;
7//! use ump_ng_server::{
8//!   async_trait,
9//!   task::{Handler, spawn},
10//!   ump_ng::{ReplyContext, WeakClient}
11//! };
12//!
13//! enum Post {
14//!   ShoutIntoVoid
15//! }
16//! enum Request {
17//!   Add(usize, usize)
18//! }
19//! enum Reply {
20//!   Sum(usize)
21//! }
22//! #[derive(Debug)]
23//! enum MyError { }
24//!
25//! struct MyHandler {
26//!   wclnt: WeakClient<Post, Request, Reply, MyError>
27//! }
28//! #[async_trait]
29//! impl Handler<Post, Request, Reply, MyError, ()> for MyHandler {
30//!   async fn post(&mut self, msg: Post) -> ControlFlow<(), ()> {
31//!     match msg {
32//!       Post::ShoutIntoVoid => {
33//!         // No reply .. but keep on trudging on
34//!         ControlFlow::Continue(())
35//!       }
36//!     }
37//!   }
38//!   async fn req(
39//!     &mut self,
40//!     msg: Request,
41//!     rctx: ReplyContext<Reply, MyError>
42//!   ) -> ControlFlow<(), ()> {
43//!     match msg {
44//!       Request::Add(a, b) => {
45//!         rctx.reply(Reply::Sum(a+b)).unwrap();
46//!         ControlFlow::Continue(())
47//!       }
48//!     }
49//!   }
50//! }
51//!
52//! let (clnt, jh) = spawn(|clnt| {
53//!   // Store a weak client in the handler so it doesn't keep the dispatch
54//!   // loop alive when the Client returned to the application is dropped.
55//!   Ok(MyHandler {
56//!     wclnt: clnt.weak()
57//!   })
58//! }).unwrap();
59//!
60//! clnt.post(Post::ShoutIntoVoid).unwrap();
61//!
62//! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else {
63//!   panic!("Unexpected reply");
64//! };
65//! assert_eq!(sum, 10);
66//!
67//! // drop client to force dispatch loop to terminate
68//! drop(clnt);
69//!
70//! jh.await;
71//! # });
72//! ```
73
74use std::ops::ControlFlow;
75
76use tokio::task::{self, JoinHandle};
77
78use async_trait::async_trait;
79
80use super::{Client, MsgType, ReplyContext, channel};
81
82/// Message processing trait for an async handler.
83///
84/// See top module documentation for more information about [application
85/// message handlers](crate#application-message-handlers).
86#[async_trait]
87pub trait Handler<P, S, R, E, RV> {
88  /// Optional initialization callback.
89  ///
90  /// This is called on the dispatcher task before the main message
91  /// processing loop is entered.
92  #[allow(unused_variables)]
93  fn init(&mut self, weak_client: ump_ng::WeakClient<P, S, R, E>) {}
94
95  /// Put message processing callback.
96  ///
97  /// The callback must return `ControlFlow::Continue(())` to keep the
98  /// dispatcher loop going.  Returning `ControlFlow::Break(RV)` will cause the
99  /// dispatcher loop to abort and returns the value in `RV` from the task.
100  async fn post(&mut self, msg: P) -> ControlFlow<RV, ()>;
101
102  /// Request message processing callback.
103  ///
104  /// The callback must return `ControlFlow::Continue(())` to keep the
105  /// dispatcher loop going.  Returning `ControlFlow::Break(RV)` will cause the
106  /// dispatcher loop to abort and returns the value in `RV` from the task.
107  async fn req(
108    &mut self,
109    msg: S,
110    rctx: ReplyContext<R, E>
111  ) -> ControlFlow<RV, ()>;
112
113  /// Optional termination callback.
114  ///
115  /// This is called on the dispatcher task just after the main message
116  /// processing loop has been terminbated.
117  fn term(&mut self, rv: Option<RV>) -> Option<RV> {
118    rv
119  }
120}
121
122/// Launch a task that will process incoming messages from an ump-ng server
123/// end-point.
124///
125/// `hbldr` is a closure that is expected to return the handler object that
126/// will be responsible for processing received messages.
127///
128/// See top module's documentation for an overview of the [dispatch
129/// loop](crate#dispatch-loop) and what the [generic
130/// parameters](crate#generics) are for.
131///
132/// # Errors
133/// An application-defined error `E` is returned if the dispatch loop is
134/// terminated by a handler returning `ControlFlow::Break(E)`.
135#[allow(clippy::type_complexity)]
136pub fn spawn<P, S, R, E, RV, F>(
137  hbldr: impl FnOnce(&Client<P, S, R, E>) -> Result<F, E>
138) -> Result<(Client<P, S, R, E>, JoinHandle<Option<RV>>), E>
139where
140  P: 'static + Send,
141  S: 'static + Send,
142  R: 'static + Send,
143  E: 'static + Send,
144  RV: 'static + Send,
145  F: Handler<P, S, R, E, RV> + Send + 'static
146{
147  let (server, client) = channel();
148
149  let mut handler = hbldr(&client)?;
150
151  #[cfg(feature = "watchdog")]
152  let wdog = crate::wdog::run();
153
154  let weak_client = client.weak();
155  let jh = task::spawn(async move {
156    handler.init(weak_client);
157    let ret = loop {
158      match server.async_wait().await {
159        Ok(msg) => {
160          #[cfg(feature = "watchdog")]
161          wdog.begin_process();
162
163          let res = match msg {
164            MsgType::Post(m) => handler.post(m).await,
165            MsgType::Request(m, rctx) => handler.req(m, rctx).await
166          };
167
168          #[cfg(feature = "watchdog")]
169          wdog.end_process();
170
171          match res {
172            ControlFlow::Continue(()) => {}
173            ControlFlow::Break(rv) => break Some(rv)
174          }
175        }
176        Err(_) => break None
177      }
178    };
179
180    #[cfg(feature = "watchdog")]
181    let _ = wdog.kill();
182
183    handler.term(ret)
184  });
185
186  Ok((client, jh))
187}
188
189// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :