ump-ng-server 0.4.1

Server message dispatch loop for ump-ng.
Documentation
//! ump-ng dispatch server running on a task.
//!
//! # Example
//! ```
//! # tokio_test::block_on(async {
//! use std::ops::ControlFlow;
//! use ump_ng_server::{
//!   async_trait,
//!   task::{Handler, spawn},
//!   ump_ng::{ReplyContext, WeakClient}
//! };
//!
//! enum Post {
//!   ShoutIntoVoid
//! }
//! enum Request {
//!   Add(usize, usize)
//! }
//! enum Reply {
//!   Sum(usize)
//! }
//! #[derive(Debug)]
//! enum MyError { }
//!
//! struct MyHandler {
//!   wclnt: WeakClient<Post, Request, Reply, MyError>
//! }
//! #[async_trait]
//! impl Handler<Post, Request, Reply, MyError, ()> for MyHandler {
//!   async fn post(&mut self, msg: Post) -> ControlFlow<(), ()> {
//!     match msg {
//!       Post::ShoutIntoVoid => {
//!         // No reply .. but keep on trudging on
//!         ControlFlow::Continue(())
//!       }
//!     }
//!   }
//!   async fn req(
//!     &mut self,
//!     msg: Request,
//!     rctx: ReplyContext<Reply, MyError>
//!   ) -> ControlFlow<(), ()> {
//!     match msg {
//!       Request::Add(a, b) => {
//!         rctx.reply(Reply::Sum(a+b)).unwrap();
//!         ControlFlow::Continue(())
//!       }
//!     }
//!   }
//! }
//!
//! let (clnt, jh) = spawn(|clnt| {
//!   // Store a weak client in the handler so it doesn't keep the dispatch
//!   // loop alive when the Client returned to the application is dropped.
//!   Ok(MyHandler {
//!     wclnt: clnt.weak()
//!   })
//! }).unwrap();
//!
//! clnt.post(Post::ShoutIntoVoid).unwrap();
//!
//! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else {
//!   panic!("Unexpected reply");
//! };
//! assert_eq!(sum, 10);
//!
//! // drop client to force dispatch loop to terminate
//! drop(clnt);
//!
//! jh.await;
//! # });
//! ```

use std::ops::ControlFlow;

use tokio::task::{self, JoinHandle};

use async_trait::async_trait;

use super::{Client, MsgType, ReplyContext, channel};

/// Message processing trait for an async handler.
///
/// An implementor is built by the closure passed to [`spawn()`] and is then
/// driven from the dispatcher task: `init()` is called once, `post()` or
/// `req()` is called for each received message, and `term()` is called once
/// after the loop has ended.
///
/// The generic parameters, as used by the callbacks below:
/// - `P` is the message type delivered to `post()`.
/// - `S` is the message type delivered to `req()`.
/// - `R` and `E` parameterize the [`ReplyContext`] handed to `req()` (the
///   reply type and the error type, respectively).
/// - `RV` is the value carried by `ControlFlow::Break` when a callback asks
///   the dispatch loop to stop.
///
/// See top module documentation for more information about [application
/// message handlers](crate#application-message-handlers).
#[async_trait]
pub trait Handler<P, S, R, E, RV> {
  /// Optional initialization callback.
  ///
  /// This is called on the dispatcher task before the main message
  /// processing loop is entered.
  ///
  /// `weak_client` is a weak reference to the client end-point of the channel
  /// the dispatch loop is serving.  The default implementation ignores it and
  /// does nothing.
  // The default body is a no-op, so the parameter is deliberately unused.
  #[allow(unused_variables)]
  fn init(&mut self, weak_client: ump_ng::WeakClient<P, S, R, E>) {}

  /// Post message processing callback.
  ///
  /// Called for each received message that does not carry a reply context.
  ///
  /// The callback must return `ControlFlow::Continue(())` to keep the
  /// dispatcher loop going.  Returning `ControlFlow::Break(RV)` will cause the
  /// dispatcher loop to abort; the value in `RV` is then handed to
  /// [`term()`](Self::term), and whatever `term()` returns is returned from
  /// the task.
  async fn post(&mut self, msg: P) -> ControlFlow<RV, ()>;

  /// Request message processing callback.
  ///
  /// Called for each received request.  `rctx` is the reply context that
  /// belongs to the request `msg`.
  ///
  /// The callback must return `ControlFlow::Continue(())` to keep the
  /// dispatcher loop going.  Returning `ControlFlow::Break(RV)` will cause the
  /// dispatcher loop to abort; the value in `RV` is then handed to
  /// [`term()`](Self::term), and whatever `term()` returns is returned from
  /// the task.
  async fn req(
    &mut self,
    msg: S,
    rctx: ReplyContext<R, E>
  ) -> ControlFlow<RV, ()>;

  /// Optional termination callback.
  ///
  /// This is called on the dispatcher task just after the main message
  /// processing loop has been terminated.
  ///
  /// `rv` is `Some(RV)` if a callback ended the loop by returning
  /// `ControlFlow::Break(RV)`, and `None` if the loop ended because waiting
  /// for the next message failed (for instance, as the module example shows,
  /// once the client has been dropped).
  ///
  /// The returned value becomes the task's return value.  The default
  /// implementation passes `rv` through unchanged.
  fn term(&mut self, rv: Option<RV>) -> Option<RV> {
    rv
  }
}

/// Spawn a task that runs a dispatch loop for an ump-ng server end-point,
/// and return a [`Client`] for sending messages to it.
///
/// A fresh channel is created and a reference to its client end-point is
/// lent to `hbldr`, a closure that is expected to construct the [`Handler`]
/// object responsible for processing received messages.  The handler is then
/// moved onto the spawned task, where:
///
/// 1. [`Handler::init()`] is called with a weak client.
/// 2. [`Handler::post()`] or [`Handler::req()`] is called for each received
///    message, until one of them returns `ControlFlow::Break(RV)` or waiting
///    for the next message fails.
/// 3. [`Handler::term()`] is called with `Some(RV)` in the former case and
///    `None` in the latter; its return value is the task's return value.
///
/// The task's return value can be retrieved by awaiting the returned
/// `JoinHandle`.
///
/// If the `watchdog` feature is enabled, each handler invocation is
/// bracketed by calls to the watchdog.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop) and what the [generic
/// parameters](crate#generics) are for.
///
/// # Errors
/// If `hbldr` fails, its application-defined error `E` is returned and no
/// task is spawned.
///
/// A handler terminating the dispatch loop with `ControlFlow::Break(RV)` is
/// not reported here; that value is delivered through the `JoinHandle`.
#[allow(clippy::type_complexity)]
pub fn spawn<P, S, R, E, RV, F>(
  hbldr: impl FnOnce(&Client<P, S, R, E>) -> Result<F, E>
) -> Result<(Client<P, S, R, E>, JoinHandle<Option<RV>>), E>
where
  P: 'static + Send,
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send,
  RV: 'static + Send,
  F: Handler<P, S, R, E, RV> + Send + 'static
{
  let (server, client) = channel();

  // Construct the handler up front so that a failure bails out before the
  // watchdog is started or a task is spawned.
  let mut handler = hbldr(&client)?;

  #[cfg(feature = "watchdog")]
  let wdog = crate::wdog::run();

  // The task only gets a weak client, so it does not hold a strong client
  // reference of its own.
  let weak_client = client.weak();

  let dispatch = async move {
    handler.init(weak_client);

    let outcome = loop {
      // A failed wait ends the loop without a handler-supplied value.
      let Ok(msg) = server.async_wait().await else {
        break None;
      };

      #[cfg(feature = "watchdog")]
      wdog.begin_process();

      let flow = match msg {
        MsgType::Post(m) => handler.post(m).await,
        MsgType::Request(m, rctx) => handler.req(m, rctx).await
      };

      #[cfg(feature = "watchdog")]
      wdog.end_process();

      // The handler asked to stop; carry its value out of the loop.
      if let ControlFlow::Break(rv) = flow {
        break Some(rv);
      }
    };

    // Best-effort watchdog shutdown; its result is intentionally ignored.
    #[cfg(feature = "watchdog")]
    let _ = wdog.kill();

    // Give the handler the final say over the task's return value.
    handler.term(outcome)
  };

  Ok((client, task::spawn(dispatch)))
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :