1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
use std::time::Instant;
use futures_util::{
future::{select, Either},
StreamExt as _,
};
use tokio::sync::mpsc;
use tokio::{pin, time::interval};
use actix_ws::Message;
use crate::core::websocat::echo::output::{Level, OutputMessage, Ty};
use crate::core::{
auth0::Requestor,
websocat::echo::{CLIENT_TIMEOUT, CLIENT_TIMEOUT_WARNNING, HEARTBEAT_INTERVAL},
};
use super::{command::Command, UserInfo};
/// Handle and command sender for chat server.
///
/// Reduces boilerplate of setting up response channels in WebSocket handlers.
#[derive(Clone)]
pub struct Connection {
pub cmd_tx: mpsc::UnboundedSender<Command>,
}
/// Echo text & binary messages received from the client, respond to ping messages, and monitor
/// connection health to detect network issues and free up resources.
pub async fn create(
connection: Connection,
requestor: Requestor,
mut session: actix_ws::Session,
mut msg_stream: actix_ws::MessageStream,
) {
let user_info: UserInfo = if let Some(user) = requestor.get_user() {
(
user.user_id.to_owned(),
user.user_name.to_owned(),
user.user_role.to_owned(),
)
} else {
(
format!("guest_{}", uuid::Uuid::now_v7()),
"guest_user".to_string(),
"guest".to_string(),
)
};
let mut last_heartbeat = Instant::now();
let mut interval = interval(HEARTBEAT_INTERVAL);
let mut should_be_closed = false;
let (conn_tx, mut conn_rx) = mpsc::unbounded_channel();
// unwrap: chat server is not dropped before the HTTP server
let conn_id = connection.connect(conn_tx, user_info).await;
let curr_user = requestor.get_user();
let close_reason = loop {
// most of the futures we process need to be stack-pinned to work with select()
let tick = interval.tick();
pin!(tick);
let msg_rx = conn_rx.recv();
pin!(msg_rx);
// TODO: nested select is pretty gross for readability on the match
let messages = select(msg_stream.next(), msg_rx);
pin!(messages);
match select(messages, tick).await {
// commands & messages received from client
Either::Left((Either::Left((Some(Ok(msg)), _)), _)) => {
match msg {
// 客户端上报心跳(ping), 回复一个 pong.
Message::Ping(bytes) => {
if !should_be_closed {
last_heartbeat = Instant::now();
// unwrap :(
session.pong(&bytes).await.unwrap();
}
}
// 客户端响应心跳(ping), 此处更新心跳时间
Message::Pong(_) => {
if !should_be_closed {
last_heartbeat = Instant::now();
}
}
// 在文本消息上,(这个我们会做最多的!)将其发送到大厅。大厅将负责将其代理到需要去的地方。
Message::Text(text) => {
connection.process_message(conn_id, curr_user, text.trim());
}
// 客户端发送的二进制消息,我们将把它发送到 WebSocket 上下文,WebSocket 上下文会弄清楚如何处理它。
// 实际上,这应该永远不会被触发。
Message::Binary(_bin) => {
log::warn!("unexpected binary message");
}
// 我们不会响应连续帧(简而言之,这些是无法适应一个消息的 WebSocket 消息)。
Message::Continuation(_) => {}
// 在 nop 时执行 nop(无操作)。
Message::Nop => {}
// 客户端主动断开连接。
Message::Close(reason) => {
break reason;
}
}
}
// client WebSocket stream error
Either::Left((Either::Left((Some(Err(err)), _)), _)) => {
log::error!("{}", err);
break None;
}
// client WebSocket stream ended
Either::Left((Either::Left((None, _)), _)) => break None,
// chat messages received from other room participants
Either::Left((Either::Right((Some(msg), _)), _)) => {
if msg != "Close" {
match session.text(&*msg).await {
Ok(_) => {}
Err(e) => {
log::error!("msg={}, error={}", &msg, e);
}
}
} else {
// 当前连接将要被关闭, 停止更新心跳
log::info!("当前连接将要被关闭, 停止更新心跳: connid={}", conn_id);
should_be_closed = true;
}
}
// all connection's message senders were dropped
Either::Left((Either::Right((None, _)), _)) => unreachable!(
"all connection message senders were dropped; chat server may have panicked"
),
// heartbeat internal tick
Either::Right((_inst, _)) => {
// if no heartbeat ping/pong received recently, close the connection
if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT {
log::info!(
"client has not sent heartbeat in over ({CLIENT_TIMEOUT:?}), connid={}",
conn_id
);
break None;
} else if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT_WARNNING {
#[rustfmt::skip]
let msg = format!(
"Your conneciton will be closed in {}ms",
CLIENT_TIMEOUT.as_millis() - Instant::now().duration_since(last_heartbeat).as_millis()
);
let out = OutputMessage {
id: None,
level: Level::Warn,
ty: Ty::Disconnect,
msg: Some(msg),
data: None,
};
if let Some(message) = out.message() {
connection.fire_message(conn_id, message);
}
}
// send heartbeat ping
let _ = session.ping(b"Are you there?").await;
}
};
};
connection.disconnect(conn_id);
// attempt to close connection gracefully
let _ = session.close(close_reason).await;
}