1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use crate::{
talker::{Talker, TalkerCompatible},
Protocol, WsIncoming,
};
use anyhow::Error;
use async_trait::async_trait;
use futures::channel::mpsc;
use meio::{ActionHandler, Actor, Address, LiteTask, ShutdownReceiver};
use std::net::SocketAddr;
use warp::ws::WebSocket;
/// Connection-side state that is handed over to the worker task.
///
/// Built once in `WsHandler::new` and moved out of the handler when the
/// worker is created.
struct WsInfo<P>
where
    P: Protocol,
{
    /// Socket address supplied when the handler was created; used to label
    /// the worker task.
    addr: SocketAddr,
    /// The warp WebSocket this handler was created for.
    connection: WebSocket,
    /// Receiving half of the unbounded channel that carries outgoing
    /// (`P::ToClient`) messages; the sending half lives in `WsHandler`.
    rx: mpsc::UnboundedReceiver<P::ToClient>,
}
/// Handle for a single WebSocket connection.
///
/// Keeps the sending half of the outgoing-message channel and, until a worker
/// is created, the connection state that the worker will take ownership of.
pub struct WsHandler<P>
where
    P: Protocol,
{
    /// Connection state; `Some` until `worker` takes it, `None` afterwards.
    info: Option<WsInfo<P>>,
    /// Sending half of the unbounded channel for `P::ToClient` messages.
    tx: mpsc::UnboundedSender<P::ToClient>,
}
impl<P> WsHandler<P>
where
    P: Protocol,
{
    /// Creates a handler for `websocket`, remembering `addr` alongside it.
    ///
    /// An unbounded channel is created here: the sender is kept by the
    /// handler for `send`, the receiver is stored with the connection so it
    /// can be passed to the worker later.
    pub fn new(addr: SocketAddr, websocket: WebSocket) -> Self {
        let (tx, rx) = mpsc::unbounded();
        Self {
            info: Some(WsInfo {
                addr,
                connection: websocket,
                rx,
            }),
            tx,
        }
    }

    /// Builds the `WsProcessor` task that owns the connection and reports to
    /// the actor at `address`.
    ///
    /// # Panics
    ///
    /// Panics with `"already started"` if the connection state has already
    /// been taken, i.e. when this method is called more than once on the same
    /// handler.
    pub fn worker<A>(&mut self, address: Address<A>) -> WsProcessor<P, A>
    where
        A: Actor + ActionHandler<WsIncoming<P::ToServer>>,
    {
        // `take` leaves `None` behind so a second call is detected.
        let taken = self.info.take();
        WsProcessor {
            info: taken.expect("already started"),
            address,
        }
    }

    /// Queues `msg` on the outgoing channel.
    ///
    /// A failed send is logged at error level and otherwise ignored; nothing
    /// is returned to the caller.
    pub fn send(&mut self, msg: P::ToClient) {
        match self.tx.unbounded_send(msg) {
            Ok(()) => {}
            Err(err) => {
                log::error!("Can't send outgoing WS message: {}", err);
            }
        }
    }
}
/// Task value produced by `WsHandler::worker`.
///
/// Bundles the connection state with the address of the actor `A` it was
/// created for.
pub struct WsProcessor<P, A>
where
    P: Protocol,
    A: Actor,
{
    /// Connection state moved out of the originating `WsHandler`.
    info: WsInfo<P>,
    /// Address of the actor supplied to `WsHandler::worker`.
    address: Address<A>,
}
/// Type-level configuration that lets `WsProcessor` be used as the parameter
/// of `Talker`.
///
/// NOTE(review): the exact requirements on each associated type are defined
/// by `TalkerCompatible` in `crate::talker`, which is not visible here.
impl<P, A> TalkerCompatible for WsProcessor<P, A>
where
    P: Protocol,
    A: Actor + ActionHandler<WsIncoming<P::ToServer>>,
{
    // Transport types, all taken from warp.
    type WebSocket = WebSocket;
    type Message = warp::ws::Message;
    type Error = warp::Error;

    // Actor type this processor is addressed to.
    type Actor = A;

    // Protocol-defined codec and message types.
    type Codec = P::Codec;
    type Incoming = P::ToServer;
    type Outgoing = P::ToClient;
}
#[async_trait]
impl<P, A> LiteTask for WsProcessor<P, A>
where
P: Protocol,
A: Actor + ActionHandler<WsIncoming<P::ToServer>>,
{
fn name(&self) -> String {
format!("WsProcessor({})", self.info.addr)
}
async fn routine(self, signal: ShutdownReceiver) -> Result<(), Error> {
let mut talker = Talker::<Self>::new(
self.address,
self.info.connection,
self.info.rx,
signal.into(),
);
talker.routine().await.map(drop)
}
}