1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use futures::{stream::SplitStream, FutureExt, Stream, StreamExt};
use reqwest::Client;
use std::{
collections::VecDeque,
future::Future,
pin::Pin,
task::{Context, Poll},
};
type WebSocketStream = async_tungstenite::WebSocketStream<async_tungstenite::tokio::ConnectStream>;
use async_tungstenite::tungstenite::Error as WsError;
use crate::ws_protocol;
type WsResult<T> = Result<T, WsError>;
pub fn new_client() -> reqwest::Result<Client> {
const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
trace!("user agent name: {}", USER_AGENT);
Client::builder().user_agent(USER_AGENT).build()
}
pub struct LiveConnection {
room_id: u64,
heartbeat_future: Pin<Box<dyn Future<Output = WsResult<()>> + Send>>,
read: SplitStream<WebSocketStream>,
buffered_msg: VecDeque<ws_protocol::Packet>,
}
impl LiveConnection {
pub async fn new(url: &str, room_id: u64, token: String) -> WsResult<Self> {
let (websocket, _http) = async_tungstenite::tokio::connect_async(url).await?;
let (write, read) = websocket.split();
let heartbeat_future = Box::pin(async move {
use futures::prelude::*;
let mut write = write;
write
.send(ws_protocol::Packet::auth(room_id, &token).into())
.await?;
loop {
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
debug!("sending heartbeat...");
write.send(ws_protocol::Packet::heartbeat().into()).await?;
}
});
Ok(Self {
room_id,
heartbeat_future,
read,
buffered_msg: VecDeque::new(),
})
}
}
impl Stream for LiveConnection {
type Item = crate::Result<ws_protocol::Packet>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.heartbeat_future.poll_unpin(cx) {
Poll::Ready(Err(e)) => {
warn!("The heartbeat future exited unexpectedly: {:?}", e);
return Poll::Ready(Some(Err(e.into())));
}
Poll::Ready(Ok(_)) => unreachable!(),
Poll::Pending => {}
}
if let Some(msg) = self.buffered_msg.pop_front() {
return Poll::Ready(Some(Ok(msg)));
}
match self.read.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(ws_message))) => {
let msgs = ws_protocol::Packet::from_ws_message(ws_message, self.room_id)?;
self.buffered_msg.extend(msgs);
match self.buffered_msg.pop_front() {
Some(msg) => Poll::Ready(Some(Ok(msg))),
None => Poll::Pending,
}
}
Poll::Pending => Poll::Pending,
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
Poll::Ready(None) => Poll::Ready(None),
}
}
}