1use std::io::Read;
4use std::io::Result as IoResult;
5
6use hyper::buffer::BufReader;
7
8use crate::dataframe::{DataFrame, Opcode};
9use crate::message::OwnedMessage;
10use crate::result::{WebSocketError, WebSocketResult};
11pub use crate::stream::sync::Shutdown;
12use crate::stream::sync::{AsTcpStream, Stream};
13use crate::ws;
14use crate::ws::receiver::Receiver as ReceiverTrait;
15use crate::ws::receiver::{DataFrameIterator, MessageIterator};
16
/// Default data frame size limit that `Receiver::new` hands to
/// `Receiver::new_with_limits` (100 * 2^20).
const DEFAULT_MAX_DATAFRAME_SIZE: usize = 100 * 1024 * 1024;
/// Default message size limit that `Receiver::new` hands to
/// `Receiver::new_with_limits` (200 * 2^20).
const DEFAULT_MAX_MESSAGE_SIZE: usize = 200 * 1024 * 1024;
/// Upper bound on how many data frames may be buffered for one fragmented
/// message before `recv_message_dataframes` reports a protocol error (2^20).
const MAX_DATAFRAMES_IN_ONE_MESSAGE: usize = 1 << 20;
20const PER_DATAFRAME_OVERHEAD : usize = 64; pub struct Reader<R>
25where
26 R: Read,
27{
28 pub stream: BufReader<R>,
30 pub receiver: Receiver,
32}
33
34impl<R> Reader<R>
35where
36 R: Read,
37{
38 pub fn recv_dataframe(&mut self) -> WebSocketResult<DataFrame> {
40 self.receiver.recv_dataframe(&mut self.stream)
41 }
42
43 pub fn incoming_dataframes(&mut self) -> DataFrameIterator<Receiver, BufReader<R>> {
45 self.receiver.incoming_dataframes(&mut self.stream)
46 }
47
48 pub fn recv_message(&mut self) -> WebSocketResult<OwnedMessage> {
50 self.receiver.recv_message(&mut self.stream)
51 }
52
53 pub fn incoming_messages<'a>(&'a mut self) -> MessageIterator<'a, Receiver, BufReader<R>> {
56 self.receiver.incoming_messages(&mut self.stream)
57 }
58}
59
60impl<S> Reader<S>
61where
62 S: AsTcpStream + Stream + Read,
63{
64 pub fn shutdown(&self) -> IoResult<()> {
67 self.stream.get_ref().as_tcp().shutdown(Shutdown::Read)
68 }
69
70 pub fn shutdown_all(&self) -> IoResult<()> {
73 self.stream.get_ref().as_tcp().shutdown(Shutdown::Both)
74 }
75}
76
77pub struct Receiver {
80 buffer: Vec<DataFrame>,
81 mask: bool,
82 max_dataframe_size: u32,
84 max_message_size: u32,
85}
86
87impl Receiver {
88 pub fn new(mask: bool) -> Receiver {
92 Receiver::new_with_limits(mask, DEFAULT_MAX_DATAFRAME_SIZE, DEFAULT_MAX_MESSAGE_SIZE)
93 }
94
95 pub fn new_with_limits(mask: bool, max_dataframe_size: usize, max_message_size: usize) -> Receiver {
102 let max_dataframe_size: u32 = max_dataframe_size.min(u32::MAX as usize) as u32;
103 let max_message_size: u32 = max_message_size.min(u32::MAX as usize) as u32;
104 Receiver {
105 buffer: Vec::new(),
106 mask,
107 max_dataframe_size,
108 max_message_size,
109 }
110 }
111}
112
113impl ws::Receiver for Receiver {
114 type F = DataFrame;
115
116 type M = OwnedMessage;
117
118 fn recv_dataframe<R>(&mut self, reader: &mut R) -> WebSocketResult<DataFrame>
120 where
121 R: Read,
122 {
123 DataFrame::read_dataframe_with_limit(reader, self.mask, self.max_dataframe_size as usize)
124 }
125
126 fn recv_message_dataframes<R>(&mut self, reader: &mut R) -> WebSocketResult<Vec<DataFrame>>
128 where
129 R: Read,
130 {
131 let mut current_message_length : usize = self.buffer.iter().map(|x|x.data.len()).sum();
132 let mut finished = if self.buffer.is_empty() {
133 let first = self.recv_dataframe(reader)?;
134
135 if first.opcode == Opcode::Continuation {
136 return Err(WebSocketError::ProtocolError(
137 "Unexpected continuation data frame opcode",
138 ));
139 }
140
141 let finished = first.finished;
142 current_message_length += first.data.len() + PER_DATAFRAME_OVERHEAD;
143 self.buffer.push(first);
144 finished
145 } else {
146 false
147 };
148
149 while !finished {
150 let next = self.recv_dataframe(reader)?;
151 finished = next.finished;
152
153 match next.opcode as u8 {
154 0 => {
156 current_message_length += next.data.len() + PER_DATAFRAME_OVERHEAD;
157 self.buffer.push(next)
158 }
159 8..=15 => {
161 return Ok(vec![next]);
162 }
163 _ => {
165 return Err(WebSocketError::ProtocolError(
166 "Unexpected data frame opcode",
167 ));
168 }
169 }
170
171 if !finished {
172 if self.buffer.len() >= MAX_DATAFRAMES_IN_ONE_MESSAGE {
173 return Err(WebSocketError::ProtocolError(
174 "Exceeded count of data frames in one WebSocket message",
175 ));
176 }
177 if current_message_length >= self.max_message_size as usize {
178 return Err(WebSocketError::ProtocolError(
179 "Exceeded maximum WebSocket message size",
180 ));
181 }
182 }
183 }
184
185 Ok(::std::mem::replace(&mut self.buffer, Vec::new()))
186 }
187}