ump_server/
task.rs

1//! ump server running in an async task.
2//!
3//! # Example
4//! ```
5//! # tokio_test::block_on(async {
6//! use std::ops::ControlFlow;
7//! use ump_server::{
8//!   async_trait,
9//!   task::{Handler, spawn},
10//!   ReplyContext, WeakClient
11//! };
12//!
13//! enum Request {
14//!   Add(usize, usize)
15//! }
16//! enum Reply {
17//!   Sum(usize)
18//! }
19//! #[derive(Debug)]
20//! enum MyError { }
21//!
22//! struct MyHandler {
23//!   wclnt: WeakClient<Request, Reply, MyError>
24//! };
25//! #[async_trait]
26//! impl Handler<Request, Reply, MyError, ()> for MyHandler {
27//!   async fn proc_req(
28//!     &mut self,
29//!     msg: Request,
30//!     rctx: ReplyContext<Reply, MyError>
31//!   ) -> ControlFlow<(), ()> {
32//!     match msg {
33//!       Request::Add(a, b) => {
34//!         rctx.reply(Reply::Sum(a + b));
35//!         ControlFlow::Continue(())
36//!       }
37//!     }
38//!   }
39//! }
40//!
41//! let (clnt, jh) = spawn(|clnt| {
42//!   // Store a weak client in the handler so it doesn't keep the dispatch
43//!   // loop alive when the Client returned to the application is dropped.
44//!   Ok(MyHandler {
45//!     wclnt: clnt.weak()
46//!   })
47//! }).unwrap();
48//!
49//! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else {
50//!   panic!("Unexpected reply");
51//! };
52//! assert_eq!(sum, 10);
53//!
54//! // Dropping the only client will terminate the dispatch loop
55//! drop(clnt);
56//!
57//! let _ = jh.await;
58//! # });
59//! ```
60
61use std::ops::ControlFlow;
62
63use tokio::task::{self, JoinHandle};
64
65use async_trait::async_trait;
66
67use super::{channel, Client, ReplyContext};
68
69/// Message processing trait for an async handler.
70#[async_trait]
71pub trait Handler<S, R, E, RV> {
72  /// Optional initialization callback.
73  ///
74  /// This is called on the dispatcher task before the main message dispatch
75  /// loop is entered.
76  #[allow(unused_variables)]
77  fn init(&mut self, weak_client: ump::WeakClient<S, R, E>) {}
78
79  /// Message processing callback.
80  ///
81  /// The callback must return `ControlFlow::Continue(())` to keep the
82  /// dispatcher loop going.  Returning `ControlFlow::Break(RV)` will cause the
83  /// dispatcher loop to abort and returns the value in `RV` from the task.
84  async fn proc_req(
85    &mut self,
86    msg: S,
87    rctx: ReplyContext<R, E>
88  ) -> ControlFlow<RV, ()>;
89
90  /// Optional termination callback.
91  ///
92  /// This is called on the dispatcher task just after the main message
93  /// processing loop has been terminated.
94  ///
95  /// The `rv` argument is set to the return value returned from the dispatcher
96  /// loop.  It will be set to `Some()` value if a request handler returned
97  /// `ControlFlow::Break(RV)`.  If will be set to `None` if the dispatch loop
98  /// terminated because the queue is empty and all of the linked clients have
99  /// been dropped.
100  ///
101  /// The value returned from this callback is returned from the dispatcher
102  /// task when it is joined.
103  ///
104  /// The default implementation simply returns the `rv` parameter.
105  fn term(&mut self, rv: Option<RV>) -> Option<RV> {
106    rv
107  }
108}
109
110/// Run a task which will process incoming messages from an ump server
111/// end-point.
112///
113/// See top module's documentation for an overview of the [dispatch
114/// loop](crate#dispatch-loop).
115///
116/// # Errors
117/// An application-defined error `E` is returned if the dispatch loop is
118/// terminated by a handler returning `ControlFlow::Break(E)`.
119#[allow(clippy::type_complexity)]
120pub fn spawn<S, R, E, RV, F>(
121  hbldr: impl FnOnce(&Client<S, R, E>) -> Result<F, E>
122) -> Result<(Client<S, R, E>, JoinHandle<Option<RV>>), E>
123where
124  S: 'static + Send,
125  R: 'static + Send,
126  E: 'static + Send,
127  RV: 'static + Send,
128  F: Handler<S, R, E, RV> + Send + 'static
129{
130  let (server, client) = channel();
131
132  let mut handler = hbldr(&client)?;
133
134  #[cfg(feature = "watchdog")]
135  let wdog = crate::wdog::run();
136
137  let weak_client = client.weak();
138  let jh = task::spawn(async move {
139    handler.init(weak_client);
140    let ret = loop {
141      let (msg, rctx) = match server.async_wait().await {
142        Ok(d) => d,
143        Err(_) => break None
144      };
145
146      #[cfg(feature = "watchdog")]
147      wdog.begin_process();
148
149      let res = handler.proc_req(msg, rctx).await;
150
151      #[cfg(feature = "watchdog")]
152      wdog.end_process();
153
154      match res {
155        ControlFlow::Continue(()) => {}
156        ControlFlow::Break(rv) => break Some(rv)
157      }
158    };
159
160    #[cfg(feature = "watchdog")]
161    let _ = wdog.kill();
162
163    handler.term(ret)
164  });
165
166  Ok((client, jh))
167}
168
169// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :