1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use std::ops::ControlFlow;

use tokio::task::{self, JoinHandle};

use async_trait::async_trait;

use super::{channel, Client, MsgType, ReplyContext, Server};

/// Callbacks invoked by the dispatch loop tasks started by `spawn()` and
/// `spawn_preinit()`.
///
/// Type parameters:
/// - `P` is the message type delivered to `post()`.
/// - `S` is the message type delivered to `req()`.
/// - `R` and `E` parameterize the `ReplyContext` handed to `req()`
///   (presumably the reply and error types -- confirm against
///   `ReplyContext`'s own documentation).
/// - `RV` is the value carried by `ControlFlow::Break` and, wrapped in an
///   `Option`, returned from the dispatcher task.
#[async_trait]
pub trait Handler<P, S, R, E, RV> {
  /// Optional initialization callback.
  ///
  /// This is called on the dispatcher task before the main message
  /// processing loop is entered.
  ///
  /// `spawn()` calls this with a weak client derived from the client it
  /// returns to the caller.  `spawn_preinit()` never calls it.
  ///
  /// The default implementation does nothing.
  // The default implementation intentionally ignores `weak_client`.
  #[allow(unused_variables)]
  fn init(&mut self, weak_client: ump_ng::WeakClient<P, S, R, E>) {}

  /// Put message processing callback.
  ///
  /// Called by the dispatch loop for each received `MsgType::Put` message.
  ///
  /// The callback must return `ControlFlow::Continue(())` to keep the
  /// dispatcher loop going.  Returning `ControlFlow::Break(RV)` will cause the
  /// dispatcher loop to abort and returns the value in `RV` from the task.
  async fn post(&mut self, msg: P) -> ControlFlow<RV, ()>;

  /// Request message processing callback.
  ///
  /// Called by the dispatch loop for each received `MsgType::Request`
  /// message; `rctx` is the reply context that arrived with the request.
  ///
  /// The callback must return `ControlFlow::Continue(())` to keep the
  /// dispatcher loop going.  Returning `ControlFlow::Break(RV)` will cause the
  /// dispatcher loop to abort and returns the value in `RV` from the task.
  async fn req(
    &mut self,
    msg: S,
    rctx: ReplyContext<R, E>
  ) -> ControlFlow<RV, ()>;

  /// Optional termination callback.
  ///
  /// This is called on the dispatcher task just after the main message
  /// processing loop has been terminated.
  ///
  /// `rv` is `Some(value)` if one of the callbacks returned
  /// `ControlFlow::Break(value)`, and `None` if the loop ended because
  /// waiting for the next message returned an error.
  ///
  /// The value returned from this method is what the dispatcher task
  /// returns.  The default implementation passes `rv` through unchanged.
  fn term(&mut self, rv: Option<RV>) -> Option<RV> {
    rv
  }
}

/// Create a channel and start a task that dispatches the messages arriving
/// at its server end-point to `handler`.
///
/// On the spawned task, the handler's `init()` is called first with a weak
/// client.  Each incoming message is then passed to `post()` or `req()`
/// until one of them returns `ControlFlow::Break`, or until waiting for a
/// message returns an error.  Finally `term()` is called and its return
/// value becomes the task's result.
///
/// Returns the client end-point together with the task's join handle.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
pub fn spawn<P, S, R, E, RV>(
  mut handler: impl Handler<P, S, R, E, RV> + Send + 'static
) -> (Client<P, S, R, E>, JoinHandle<Option<RV>>)
where
  P: Send + 'static,
  S: Send + 'static,
  R: Send + 'static,
  E: Send + 'static,
  RV: Send + 'static
{
  let (server, client) = channel();
  let weak_client = client.weak();

  let dispatcher = async move {
    // Initialization happens on the dispatcher task itself.
    handler.init(weak_client);

    // Stays `None` unless a callback explicitly breaks out with a value.
    let mut outcome = None;
    while let Ok(msg) = server.async_wait().await {
      let flow = match msg {
        MsgType::Put(m) => handler.post(m).await,
        MsgType::Request(m, rctx) => handler.req(m, rctx).await
      };
      if let ControlFlow::Break(rv) = flow {
        outcome = Some(rv);
        break;
      }
    }

    handler.term(outcome)
  };

  (client, task::spawn(dispatcher))
}


/// Start a dispatch task for a handler that has already been initialized.
///
/// The caller supplies the `server` end-point and is expected to have set up
/// the handler beforehand; the handler's `init()` method is therefore not
/// called here.
///
/// Each incoming message is passed to `post()` or `req()` until one of them
/// returns `ControlFlow::Break`, or until waiting for a message returns an
/// error.  `term()` is then called and its return value becomes the task's
/// result, which can be retrieved through the returned join handle.
pub fn spawn_preinit<P, S, R, E, RV>(
  server: Server<P, S, R, E>,
  mut handler: impl Handler<P, S, R, E, RV> + Send + 'static
) -> JoinHandle<Option<RV>>
where
  P: Send + 'static,
  S: Send + 'static,
  R: Send + 'static,
  E: Send + 'static,
  RV: Send + 'static
{
  let dispatcher = async move {
    // Stays `None` unless a callback explicitly breaks out with a value.
    let mut result = None;
    while let Ok(incoming) = server.async_wait().await {
      let verdict = match incoming {
        MsgType::Put(m) => handler.post(m).await,
        MsgType::Request(m, rctx) => handler.req(m, rctx).await
      };
      if let ControlFlow::Break(rv) = verdict {
        result = Some(rv);
        break;
      }
    }

    handler.term(result)
  };

  task::spawn(dispatcher)
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :