websocket/
receiver.rs

1//! The default implementation of a WebSocket Receiver.
2
3use std::io::Read;
4use std::io::Result as IoResult;
5
6use hyper::buffer::BufReader;
7
8use crate::dataframe::{DataFrame, Opcode};
9use crate::message::OwnedMessage;
10use crate::result::{WebSocketError, WebSocketResult};
11pub use crate::stream::sync::Shutdown;
12use crate::stream::sync::{AsTcpStream, Stream};
13use crate::ws;
14use crate::ws::receiver::Receiver as ReceiverTrait;
15use crate::ws::receiver::{DataFrameIterator, MessageIterator};
16
17const DEFAULT_MAX_DATAFRAME_SIZE : usize = 1024*1024*100;
18const DEFAULT_MAX_MESSAGE_SIZE : usize = 1024*1024*200;
19const MAX_DATAFRAMES_IN_ONE_MESSAGE: usize = 1024*1024;
20const PER_DATAFRAME_OVERHEAD : usize = 64; // not actually measured, just to prevent filling memory with empty buffers
21
22/// This reader bundles an existing stream with a parsing algorithm.
23/// It is used by the client in its `.split()` function as the reading component.
24pub struct Reader<R>
25where
26	R: Read,
27{
28	/// the stream to be read from
29	pub stream: BufReader<R>,
30	/// the parser to parse bytes into messages
31	pub receiver: Receiver,
32}
33
34impl<R> Reader<R>
35where
36	R: Read,
37{
38	/// Reads a single data frame from the remote endpoint.
39	pub fn recv_dataframe(&mut self) -> WebSocketResult<DataFrame> {
40		self.receiver.recv_dataframe(&mut self.stream)
41	}
42
43	/// Returns an iterator over incoming data frames.
44	pub fn incoming_dataframes(&mut self) -> DataFrameIterator<Receiver, BufReader<R>> {
45		self.receiver.incoming_dataframes(&mut self.stream)
46	}
47
48	/// Reads a single message from this receiver.
49	pub fn recv_message(&mut self) -> WebSocketResult<OwnedMessage> {
50		self.receiver.recv_message(&mut self.stream)
51	}
52
53	/// An iterator over incoming messsages.
54	/// This iterator will block until new messages arrive and will never halt.
55	pub fn incoming_messages<'a>(&'a mut self) -> MessageIterator<'a, Receiver, BufReader<R>> {
56		self.receiver.incoming_messages(&mut self.stream)
57	}
58}
59
60impl<S> Reader<S>
61where
62	S: AsTcpStream + Stream + Read,
63{
64	/// Closes the receiver side of the connection, will cause all pending and future IO to
65	/// return immediately with an appropriate value.
66	pub fn shutdown(&self) -> IoResult<()> {
67		self.stream.get_ref().as_tcp().shutdown(Shutdown::Read)
68	}
69
70	/// Shuts down both Sender and Receiver, will cause all pending and future IO to
71	/// return immediately with an appropriate value.
72	pub fn shutdown_all(&self) -> IoResult<()> {
73		self.stream.get_ref().as_tcp().shutdown(Shutdown::Both)
74	}
75}
76
77/// A Receiver that wraps a Reader and provides a default implementation using
78/// DataFrames and Messages.
79pub struct Receiver {
80	buffer: Vec<DataFrame>,
81	mask: bool,
82	// u32s instead uf usizes to economize used memory by this struct
83	max_dataframe_size: u32,
84	max_message_size: u32,
85}
86
87impl Receiver {
88	/// Create a new Receiver using the specified Reader.
89	/// 
90	/// Uses built-in limits for dataframe and message sizes. 
91	pub fn new(mask: bool) -> Receiver {
92		Receiver::new_with_limits(mask, DEFAULT_MAX_DATAFRAME_SIZE, DEFAULT_MAX_MESSAGE_SIZE)
93	}
94
95	/// Create a new Receiver using the specified Reader, with configurable limits
96	/// 
97	/// Sizes should not be larger than `u32::MAX`.
98	/// 
99	/// Note that `max_message_size` denotes message size where no new dataframes would be read,
100	/// so actual maximum message size is larger.
101	pub fn new_with_limits(mask: bool, max_dataframe_size: usize, max_message_size: usize) -> Receiver {
102		let max_dataframe_size: u32 = max_dataframe_size.min(u32::MAX as usize) as u32;
103		let max_message_size: u32 = max_message_size.min(u32::MAX as usize) as u32;
104		Receiver {
105			buffer: Vec::new(),
106			mask,
107			max_dataframe_size,
108			max_message_size,
109		}
110	}
111}
112
113impl ws::Receiver for Receiver {
114	type F = DataFrame;
115
116	type M = OwnedMessage;
117
118	/// Reads a single data frame from the remote endpoint.
119	fn recv_dataframe<R>(&mut self, reader: &mut R) -> WebSocketResult<DataFrame>
120	where
121		R: Read,
122	{
123		DataFrame::read_dataframe_with_limit(reader, self.mask, self.max_dataframe_size as usize)
124	}
125
126	/// Returns the data frames that constitute one message.
127	fn recv_message_dataframes<R>(&mut self, reader: &mut R) -> WebSocketResult<Vec<DataFrame>>
128	where
129		R: Read,
130	{
131		let mut current_message_length : usize = self.buffer.iter().map(|x|x.data.len()).sum();
132		let mut finished = if self.buffer.is_empty() {
133			let first = self.recv_dataframe(reader)?;
134
135			if first.opcode == Opcode::Continuation {
136				return Err(WebSocketError::ProtocolError(
137					"Unexpected continuation data frame opcode",
138				));
139			}
140
141			let finished = first.finished;
142			current_message_length += first.data.len() + PER_DATAFRAME_OVERHEAD;
143			self.buffer.push(first);
144			finished
145		} else {
146			false
147		};
148
149		while !finished {
150			let next = self.recv_dataframe(reader)?;
151			finished = next.finished;
152
153			match next.opcode as u8 {
154				// Continuation opcode
155				0 => {
156					current_message_length += next.data.len() + PER_DATAFRAME_OVERHEAD;
157					self.buffer.push(next)
158				}
159				// Control frame
160				8..=15 => {
161					return Ok(vec![next]);
162				}
163				// Others
164				_ => {
165					return Err(WebSocketError::ProtocolError(
166						"Unexpected data frame opcode",
167					));
168				}
169			}
170
171			if !finished {
172				if self.buffer.len() >= MAX_DATAFRAMES_IN_ONE_MESSAGE {
173					return Err(WebSocketError::ProtocolError(
174						"Exceeded count of data frames in one WebSocket message",
175					));
176				}
177				if current_message_length >= self.max_message_size as usize {
178					return Err(WebSocketError::ProtocolError(
179						"Exceeded maximum WebSocket message size",
180					));
181				}
182			}
183		}
184
185		Ok(::std::mem::replace(&mut self.buffer, Vec::new()))
186	}
187}