websocat/
lengthprefixed_peer.rs1use futures::future::ok;
2
3use std::rc::Rc;
4
5use crate::{io_other_error, simple_err, peer_strerr};
6
7use super::{BoxedNewPeerFuture, Peer};
8use super::{ConstructParams, PeerConstructor, Specifier};
9
10use std::io::{Read, Write};
11use tokio_io::{AsyncRead, AsyncWrite};
12
13use std::io::Error as IoError;
14
15#[derive(Debug)]
16pub struct LengthPrefixed<T: Specifier>(pub T);
17impl<T: Specifier> Specifier for LengthPrefixed<T> {
18 fn construct(&self, cp: ConstructParams) -> PeerConstructor {
19 let inner = self.0.construct(cp.clone());
20 inner.map(move |p, _| {
21 lengthprefixed_peer(
22 p,
23 cp.program_options.lengthprefixed_header_bytes,
24 cp.program_options.lengthprefixed_little_endian,
25 cp.program_options.lengthprefixed_skip_read_direction,
26 cp.program_options.lengthprefixed_skip_write_direction,
27 )
28 })
29 }
30 specifier_boilerplate!(noglobalstate has_subspec);
31 self_0_is_subspecifier!(proxy_is_multiconnect);
32}
33specifier_class!(
34 name = LengthPrefixedClass,
35 target = LengthPrefixed,
36 prefixes = ["lengthprefixed:"],
37 arg_handling = subspec,
38 overlay = true,
39 MessageOriented,
40 MulticonnectnessDependsOnInnerType,
41 help = r#"
42Turn stream of bytes to/from data packets with length-prefixed framing. [A]
43
44You can choose the number of header bytes (1 to 8) and endianness. Default is 4 bytes big endian.
45
46This affects both reading and writing - attach this overlay to stream specifier to turn it into a packet-orineted specifier.
47
48Mind the buffer size (-B). All packets should fit in there.
49
50Examples:
51
52 websocat -u -b udp-l:127.0.0.1:1234 lengthprefixed:writefile:test.dat
53
54 websocat -u -b lengthprefixed:readfile:test.dat udp:127.0.0.1:1235
55
56This would save incoming UDP packets to a file, then replay the datagrams back to UDP socket
57
58 websocat -b lengthprefixed:- ws://127.0.0.1:1234/ --binary-prefix=B --text-prefix=T
59
60This allows to mix and match text and binary WebSocket messages to and from stdio without the base64 overhead.
61"#
62);
63
64pub fn lengthprefixed_peer(
65 inner_peer: Peer,
66 num_bytes_in_length_prefix: usize,
67 little_endian: bool,
68 lengthprefixed_skip_read_direction: bool,
69 lengthprefixed_skip_write_direction: bool,
70) -> BoxedNewPeerFuture {
71 if !(1..=8).contains(&num_bytes_in_length_prefix) {
72 return peer_strerr("Number of header bytes for lengthprefixed overlay should be from 1 to 8");
73 }
74
75 let (length_starting_pos, length_ending_pos) = if little_endian {
76 (0, num_bytes_in_length_prefix)
77 } else {
78 (8 - num_bytes_in_length_prefix, 8)
79 };
80 let reader = Lengthprefixed2PacketWrapper {
81 inner: inner_peer.0,
82 length_buffer: [0; 8],
83 length_starting_pos,
84 length_pos: length_starting_pos,
85 length_ending_pos,
86 little_endian,
87 data_read_so_far: 0,
88 };
89 let writer = Packet2LengthPrefixedWrapper {
90 inner: inner_peer.1,
91 length_buffer: [0; 8],
92 length_starting_pos,
93 length_pos: length_starting_pos,
94 length_ending_pos,
95 little_endian,
96 data_written_so_far: 0,
97 };
98 let thepeer = match (lengthprefixed_skip_read_direction, lengthprefixed_skip_write_direction) {
99 (true, true) => Peer::new(reader.inner, writer.inner, inner_peer.2),
100 (true, false) => Peer::new(reader.inner, writer, inner_peer.2),
101 (false, true) => Peer::new(reader, writer.inner, inner_peer.2),
102 (false, false) => Peer::new(reader, writer, inner_peer.2),
103 };
104 Box::new(ok(thepeer)) as BoxedNewPeerFuture
105}
106struct Lengthprefixed2PacketWrapper {
107 inner: Box<dyn AsyncRead>,
108 length_buffer: [u8; 8],
109 length_starting_pos: usize,
110 length_ending_pos: usize,
111 length_pos: usize,
112 little_endian: bool,
113 data_read_so_far: usize,
114}
115impl Read for Lengthprefixed2PacketWrapper {
116 fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
117 loop {
118 assert!(self.length_pos <= self.length_ending_pos);
119 assert!(self.length_pos >= self.length_starting_pos);
120 if self.length_ending_pos != self.length_pos {
121 match self
122 .inner
123 .read(&mut self.length_buffer[self.length_pos..self.length_ending_pos])
124 {
125 Err(e) => return Err(e),
126 Ok(0) => {
127 if self.length_pos != self.length_starting_pos {
128 error!("Possibly trimmed length-prefixed data.")
129 }
130 return Ok(0);
131 }
132 Ok(n) => {
133 self.length_pos += n;
134 continue;
135 }
136 }
137 } else {
138 let packet_len = if self.little_endian {
139 u64::from_le_bytes(self.length_buffer)
140 } else {
141 u64::from_be_bytes(self.length_buffer)
142 };
143 if packet_len >= (buf.len() as u64) {
144 error!("Failed to process too big packet. You may need to increase the -B buffer size.");
145 return Err(io_other_error(simple_err("Packet length overflow".into())));
146 }
147 let packet_len = packet_len as usize;
148 if packet_len == 0 {
149 return Ok(0);
150 }
151
152 if self.data_read_so_far == packet_len {
153 self.data_read_so_far = 0;
154 self.length_buffer = [0; 8];
155 self.length_pos = self.length_starting_pos;
156 return Ok(packet_len);
157 }
158
159 match self.inner.read(&mut buf[self.data_read_so_far..packet_len]) {
162 Err(e) => return Err(e),
163 Ok(0) => {
164 return Err(io_other_error(simple_err("Data trimmed".into())));
165 }
166 Ok(n) => {
167 self.data_read_so_far += n;
168 continue;
169 }
170 }
171 }
172 }
173 }
174}
175impl AsyncRead for Lengthprefixed2PacketWrapper {}
176
177struct Packet2LengthPrefixedWrapper {
178 inner: Box<dyn AsyncWrite>,
179 length_buffer: [u8; 8],
180 length_starting_pos: usize,
181 length_ending_pos: usize,
182 length_pos: usize,
183 little_endian: bool,
184 data_written_so_far: usize,
185}
186
187impl Write for Packet2LengthPrefixedWrapper {
188 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
189 loop {
191 if self.length_pos == self.length_starting_pos {
192 if self.little_endian {
193 self.length_buffer = (buf.len() as u64).to_le_bytes()
194 } else {
195 self.length_buffer = (buf.len() as u64).to_be_bytes()
196 }
197 }
198 if self.length_pos < self.length_ending_pos {
199 match self
200 .inner
201 .write(&self.length_buffer[self.length_pos..self.length_ending_pos])
202 {
203 Err(x) => return Err(x),
204 Ok(n) => self.length_pos += n,
205 }
206 continue;
207 }
208
209 if self.data_written_so_far == buf.len() {
210 self.data_written_so_far = 0;
211 self.length_pos = self.length_starting_pos;
212 self.length_buffer = [0; 8];
213 return Ok(buf.len());
214 }
215
216 match self.inner.write(&buf[self.data_written_so_far..]) {
217 Err(e) => return Err(e),
218 Ok(n) => self.data_written_so_far += n,
219 }
220 }
221 }
222
223 fn flush(&mut self) -> std::io::Result<()> {
224 self.inner.flush()
225 }
226}
227
228impl AsyncWrite for Packet2LengthPrefixedWrapper {
229 fn shutdown(&mut self) -> futures::Poll<(), std::io::Error> {
230 self.inner.shutdown()
231 }
232}