1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
use std::future::Future;
use tokio::sync::broadcast;
use super::{func::Func, FromRequest, Request};
/// Post-processing hook for the value produced by a handler future.
///
/// `App::handler` calls [`Return::on_return`] on whatever the handler's
/// future resolves to, so each supported output type decides for itself
/// what (if anything) should happen with the result.
pub trait Return {
    /// Consumes the handler's output and performs any follow-up work.
    fn on_return(self);
}
/// Handlers that resolve to `()` have nothing to report.
impl Return for () {
    /// Intentionally a no-op: a unit result carries no information.
    fn on_return(self) {}
}
/// Fallible handlers: a failure is logged at `warn` level and otherwise
/// discarded, so an erroring handler does not propagate its error further.
impl Return for Result<(), anyhow::Error> {
    /// Logs the error (with its `Debug` representation) when the handler
    /// failed; does nothing on success.
    fn on_return(self) {
        match self {
            Ok(()) => {}
            Err(err) => {
                warn!("handler exec failed: {:?}", err);
            }
        }
    }
}
pub trait App: Sized + Clone + Send + Sync + 'static {
type Message: Clone + Send + 'static;
fn event_bus(&self) -> broadcast::Sender<Self::Message>;
fn handler<F, I, Fut>(self, f: F) -> Self
where
F: Func<I, Fut>,
I: FromRequest<Self> + Send + 'static,
Fut: Future + Send,
Fut::Output: Return + Send,
{
let receiver = self.event_bus().subscribe();
let app = self.clone();
let task = async move {
let mut receiver = receiver;
loop {
let recv = receiver.recv().await;
match recv {
Ok(message) => {
let request = Request::<Self> {
message,
app: app.clone(),
};
if let Some(input) = I::from_request(request) {
let fut = async move {
let ret = (f).call(input).await;
ret.on_return();
};
tokio::spawn(fut);
};
}
Err(broadcast::error::RecvError::Lagged(i)) => {
warn!("broadcast lagged {} messages.", i);
}
Err(broadcast::error::RecvError::Closed) => {
break;
}
}
}
};
tokio::spawn(task);
self
}
}