Skip to main content

ump_server/
task.rs

1//! ump server running in an async task.
2//!
3//! # Example
4//! ```
5//! # tokio_test::block_on(async {
6//! use std::ops::ControlFlow;
7//! use ump_server::{
8//!   async_trait,
9//!   task::{Handler, spawn},
10//!   ReplyContext, WeakClient
11//! };
12//!
13//! enum Request {
14//!   Add(usize, usize)
15//! }
16//! enum Reply {
17//!   Sum(usize)
18//! }
19//! #[derive(Debug)]
20//! enum MyError { }
21//!
22//! struct MyHandler {
23//!   wclnt: WeakClient<Request, Reply, MyError>
24//! };
25//! #[async_trait]
26//! impl Handler<Request, Reply, MyError, ()> for MyHandler {
27//!   async fn proc_req(
28//!     &mut self,
29//!     msg: Request,
30//!     rctx: ReplyContext<Reply, MyError>
31//!   ) -> ControlFlow<(), ()> {
32//!     match msg {
33//!       Request::Add(a, b) => {
34//!         rctx.reply(Reply::Sum(a + b));
35//!         ControlFlow::Continue(())
36//!       }
37//!     }
38//!   }
39//! }
40//!
41//! let (clnt, jh) = spawn(|clnt| {
42//!   // Store a weak client in the handler so it doesn't keep the dispatch
43//!   // loop alive when the Client returned to the application is dropped.
44//!   Ok(MyHandler {
45//!     wclnt: clnt.weak()
46//!   })
47//! }).unwrap();
48//!
49//! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else {
50//!   panic!("Unexpected reply");
51//! };
52//! assert_eq!(sum, 10);
53//!
54//! // Dropping the only client will terminate the dispatch loop
55//! drop(clnt);
56//!
57//! let _ = jh.await;
58//! # });
59//! ```
60
61use std::ops::ControlFlow;
62
63use tokio::task::{self, JoinHandle};
64
65use async_trait::async_trait;
66
67use super::{Client, ReplyContext, channel};
68
/// Message processing trait for an async handler.
///
/// An implementor is driven by the dispatcher task created by [`spawn`]:
/// [`init()`](Self::init) is called once, then
/// [`proc_req()`](Self::proc_req) is called for each incoming message, and
/// finally [`term()`](Self::term) is called once after the dispatch loop has
/// ended.
///
/// `S` is the type of the messages handed to `proc_req()`, `R` and `E` are
/// the type parameters of the [`ReplyContext`] used to answer a message, and
/// `RV` is the type of the value a handler can use to break out of the
/// dispatch loop.
///
/// See top module documentation for more information about [application
/// message handlers](crate#application-message-handlers).
#[async_trait]
pub trait Handler<S, R, E, RV> {
  /// Optional initialization callback.
  ///
  /// This is called on the dispatcher task before the main message dispatch
  /// loop is entered.  It is passed a weak reference to the client end-point
  /// of the channel being served.
  ///
  /// The default implementation does nothing.
  #[allow(unused_variables)]
  fn init(&mut self, weak_client: ump::WeakClient<S, R, E>) {}

  /// Message processing callback.
  ///
  /// Called once for each message `msg` received by the dispatcher, along
  /// with the reply context `rctx` belonging to that message.
  ///
  /// The callback must return `ControlFlow::Continue(())` to keep the
  /// dispatcher loop going.  Returning `ControlFlow::Break(RV)` will cause the
  /// dispatcher loop to terminate and pass the value in `RV` on to
  /// [`term()`](Self::term) as `Some(RV)`.
  async fn proc_req(
    &mut self,
    msg: S,
    rctx: ReplyContext<R, E>
  ) -> ControlFlow<RV, ()>;

  /// Optional termination callback.
  ///
  /// This is called on the dispatcher task just after the main message
  /// processing loop has been terminated.
  ///
  /// The `rv` argument is set to the return value returned from the dispatcher
  /// loop.  It will be set to `Some(RV)` if a request handler returned
  /// `ControlFlow::Break(RV)`.  It will be set to `None` if the dispatch loop
  /// terminated because the queue is empty and all of the linked clients have
  /// been dropped.
  ///
  /// The value returned from this callback is returned from the dispatcher
  /// task when it is joined.
  ///
  /// The default implementation simply returns the `rv` parameter.
  fn term(&mut self, rv: Option<RV>) -> Option<RV> {
    rv
  }
}
112
113/// Run a task which will process incoming messages from an ump server
114/// end-point.
115///
116/// `hbldr` is a closure that should spawn a [`Handler`].
117///
118/// See top module's documentation for an overview of the [dispatch
119/// loop](crate#dispatch-loop) and what the [generic
120/// parameters](crate#generics) are for.
121///
122/// # Errors
123/// An application-defined error `E` is returned if the dispatch loop is
124/// terminated by a handler returning `ControlFlow::Break(E)`.
125#[allow(clippy::type_complexity)]
126pub fn spawn<S, R, E, RV, F>(
127  hbldr: impl FnOnce(&Client<S, R, E>) -> Result<F, E>
128) -> Result<(Client<S, R, E>, JoinHandle<Option<RV>>), E>
129where
130  S: 'static + Send,
131  R: 'static + Send,
132  E: 'static + Send,
133  RV: 'static + Send,
134  F: Handler<S, R, E, RV> + Send + 'static
135{
136  let (server, client) = channel();
137
138  let mut handler = hbldr(&client)?;
139
140  #[cfg(feature = "watchdog")]
141  let wdog = crate::wdog::run();
142
143  let weak_client = client.weak();
144  let jh = task::spawn(async move {
145    handler.init(weak_client);
146    let ret = loop {
147      let Ok((msg, rctx)) = server.async_wait().await else {
148        break None;
149      };
150
151      #[cfg(feature = "watchdog")]
152      wdog.begin_process();
153
154      let res = handler.proc_req(msg, rctx).await;
155
156      #[cfg(feature = "watchdog")]
157      wdog.end_process();
158
159      match res {
160        ControlFlow::Continue(()) => {}
161        ControlFlow::Break(rv) => break Some(rv)
162      }
163    };
164
165    #[cfg(feature = "watchdog")]
166    let _ = wdog.kill();
167
168    handler.term(ret)
169  });
170
171  Ok((client, jh))
172}
173
174// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :