1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
//! Provides an abstraction over WebSocket frames called `Message`.
use humphrey::stream::Stream;
use crate::error::WebsocketError;
use crate::frame::{Frame, Opcode};
use crate::restion::Restion;
use std::io::{Read, Write};
/// Represents a WebSocket message.
#[derive(Debug, Clone)]
pub struct Message {
payload: Vec<u8>,
text: bool,
}
impl Message {
/// Creates a new message with the given payload.
///
/// Marks the message as text if the payload is valid UTF-8.
/// To avoid this behaviour, use the `Message::new_binary` constructor.
pub fn new<T>(payload: T) -> Self
where
T: AsRef<[u8]>,
{
Self {
payload: payload.as_ref().to_vec(),
text: std::str::from_utf8(payload.as_ref()).is_ok(),
}
}
/// Creates a new binary message with the given payload.
pub fn new_binary<T>(payload: T) -> Self
where
T: AsRef<[u8]>,
{
Self {
payload: payload.as_ref().to_vec(),
text: false,
}
}
/// Attemps to read a message from the given stream.
///
/// Silently responds to pings with pongs, as specified in [RFC 6455 Section 5.5.2](https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.2).
pub fn from_stream<T>(mut stream: T) -> Result<Self, WebsocketError>
where
T: Read + Write,
{
let mut frames: Vec<Frame> = Vec::new();
// Keep reading frames until we get the finish frame
while frames.last().map(|f| !f.fin).unwrap_or(true) {
let frame = Frame::from_stream(&mut stream)?;
// If this is a ping, respond with a pong
if frame.opcode == Opcode::Ping {
let pong = Frame::new(Opcode::Pong, frame.payload);
stream
.write_all(pong.as_ref())
.map_err(|_| WebsocketError::WriteError)?;
continue;
}
// If this closes the connection, return the error
if frame.opcode == Opcode::Close {
let close = Frame::new(Opcode::Close, frame.payload);
stream
.write_all(close.as_ref())
.map_err(|_| WebsocketError::WriteError)?;
return Err(WebsocketError::ConnectionClosed);
}
frames.push(frame);
}
// Concatenate the payloads of all frames into a single payload
let payload = frames.iter().fold(Vec::new(), |mut acc, frame| {
acc.extend(frame.payload.iter());
acc
});
Ok(Self {
payload,
text: frames
.first()
.map(|f| f.opcode == Opcode::Text)
.unwrap_or(false),
})
}
/// Attemps to read a message from the given stream without blocking.
///
/// Silently responds to pings with pongs, as specified in [RFC 6455 Section 5.5.2](https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.2).
pub fn from_stream_nonblocking(mut stream: &mut Stream) -> Restion<Self, WebsocketError> {
let mut frames: Vec<Frame> = Vec::new();
let mut is_first_frame = true;
// Keep reading frames until we get the finish frame
while frames.last().map(|f| !f.fin).unwrap_or(true) {
let frame = if is_first_frame {
Frame::from_stream_nonblocking(stream)
} else {
Frame::from_stream(&mut stream).into()
};
match frame {
Restion::Ok(frame) => {
// If this is a ping, respond with a pong
if frame.opcode == Opcode::Ping {
let pong = Frame::new(Opcode::Pong, frame.payload);
if stream.write_all(pong.as_ref()).is_err() {
return Restion::Err(WebsocketError::WriteError);
}
continue;
}
// If this closes the connection, return the error
if frame.opcode == Opcode::Close {
let close = Frame::new(Opcode::Close, frame.payload);
if stream.write_all(close.as_ref()).is_err() {
return Restion::Err(WebsocketError::WriteError);
}
return Restion::Err(WebsocketError::ConnectionClosed);
}
frames.push(frame);
}
Restion::Err(e) => return Restion::Err(e),
Restion::None => return Restion::None,
}
is_first_frame = false;
}
// Concatenate the payloads of all frames into a single payload
let payload = frames.iter().fold(Vec::new(), |mut acc, frame| {
acc.extend(frame.payload.iter());
acc
});
Restion::Ok(Self {
payload,
text: frames
.first()
.map(|f| f.opcode == Opcode::Text)
.unwrap_or(false),
})
}
/// Returns whether the sender of this message specified that it contains text.
pub fn is_text(&self) -> bool {
self.text
}
/// Returns the payload as a string, if possible.
///
/// If the opcode is `Opcode::Text` (`0x1`), but the payload is not valid UTF-8, the function will return `None`.
/// Otherwise, it will not attempt to convert the payload to a string and will immediately return `None`.
pub fn text(&self) -> Option<&str> {
if self.text {
std::str::from_utf8(&self.payload).ok()
} else {
None
}
}
/// Returns the payload as a slice of bytes.
pub fn bytes(&self) -> &[u8] {
&self.payload
}
/// Converts the message to a `Vec<u8>` for transmission.
pub fn to_frame(self) -> Vec<u8> {
if self.text {
Frame::new(Opcode::Text, self.payload).into()
} else {
Frame::new(Opcode::Binary, self.payload).into()
}
}
}
impl AsRef<[u8]> for Message {
fn as_ref(&self) -> &[u8] {
&self.payload
}
}