1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use crate::error::WebsocketError;
use crate::frame::{Frame, Opcode};
use std::io::{Read, Write};
/// Represents a WebSocket message.
#[derive(Debug)]
pub struct Message {
payload: Vec<u8>,
text: bool,
}
impl Message {
/// Creates a new message with the given payload.
///
/// Marks the message as text if the payload is valid UTF-8.
/// To avoid this behaviour, use the `Message::new_binary` constructor.
pub fn new<T>(payload: T) -> Self
where
T: AsRef<[u8]>,
{
Self {
payload: payload.as_ref().to_vec(),
text: std::str::from_utf8(payload.as_ref()).is_ok(),
}
}
/// Creates a new binary message with the given payload.
pub fn new_binary<T>(payload: T) -> Self
where
T: AsRef<[u8]>,
{
Self {
payload: payload.as_ref().to_vec(),
text: false,
}
}
/// Attemps to read a message from the given stream.
///
/// Silently responds to pings with pongs, as specified in [RFC 6455 Section 5.5.2](https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.2).
pub fn from_stream<T>(mut stream: T) -> Result<Self, WebsocketError>
where
T: Read + Write,
{
let mut frames: Vec<Frame> = Vec::new();
// Keep reading frames until we get the finish frame
while frames.last().map(|f| !f.fin).unwrap_or(true) {
let frame = Frame::from_stream(&mut stream)?;
// If this is a ping, respond with a pong
if frame.opcode == Opcode::Ping {
let pong = Frame::new(Opcode::Pong, frame.payload);
stream
.write_all(pong.as_ref())
.map_err(|_| WebsocketError::WriteError)?;
continue;
}
// If this closes the connection, return the error
if frame.opcode == Opcode::Close {
let close = Frame::new(Opcode::Close, frame.payload);
stream
.write_all(close.as_ref())
.map_err(|_| WebsocketError::WriteError)?;
return Err(WebsocketError::ConnectionClosed);
}
frames.push(frame);
}
// Concatenate the payloads of all frames into a single payload
let payload = frames.iter().fold(Vec::new(), |mut acc, frame| {
acc.extend(frame.payload.iter());
acc
});
Ok(Self {
payload,
text: frames
.first()
.map(|f| f.opcode == Opcode::Text)
.unwrap_or(false),
})
}
/// Returns whether the sender of this message specified that it contains text.
pub fn is_text(&self) -> bool {
self.text
}
/// Returns the payload as a string, if possible.
///
/// If the opcode is `Opcode::Text` (`0x1`), but the payload is not valid UTF-8, the function will return `None`.
/// Otherwise, it will not attempt to convert the payload to a string and will immediately return `None`.
pub fn text(&self) -> Option<&str> {
if self.text {
std::str::from_utf8(&self.payload).ok()
} else {
None
}
}
/// Returns the payload as a slice of bytes.
pub fn bytes(&self) -> &[u8] {
&self.payload
}
/// Converts the message to a `Vec<u8>` for transmission.
pub fn to_bytes(&self) -> Vec<u8> {
if self.text {
Frame::new(Opcode::Text, self.payload.clone()).into()
} else {
Frame::new(Opcode::Binary, self.payload.clone()).into()
}
}
}
impl AsRef<[u8]> for Message {
fn as_ref(&self) -> &[u8] {
&self.payload
}
}