websocat/
lengthprefixed_peer.rs

1use futures::future::ok;
2
3use std::rc::Rc;
4
5use crate::{io_other_error, simple_err, peer_strerr};
6
7use super::{BoxedNewPeerFuture, Peer};
8use super::{ConstructParams, PeerConstructor, Specifier};
9
10use std::io::{Read, Write};
11use tokio_io::{AsyncRead, AsyncWrite};
12
13use std::io::Error as IoError;
14
15#[derive(Debug)]
16pub struct LengthPrefixed<T: Specifier>(pub T);
17impl<T: Specifier> Specifier for LengthPrefixed<T> {
18    fn construct(&self, cp: ConstructParams) -> PeerConstructor {
19        let inner = self.0.construct(cp.clone());
20        inner.map(move |p, _| {
21            lengthprefixed_peer(
22                p,
23                cp.program_options.lengthprefixed_header_bytes,
24                cp.program_options.lengthprefixed_little_endian,
25                cp.program_options.lengthprefixed_skip_read_direction,
26                cp.program_options.lengthprefixed_skip_write_direction,
27            )
28        })
29    }
30    specifier_boilerplate!(noglobalstate has_subspec);
31    self_0_is_subspecifier!(proxy_is_multiconnect);
32}
33specifier_class!(
34    name = LengthPrefixedClass,
35    target = LengthPrefixed,
36    prefixes = ["lengthprefixed:"],
37    arg_handling = subspec,
38    overlay = true,
39    MessageOriented,
40    MulticonnectnessDependsOnInnerType,
41    help = r#"
42Turn stream of bytes to/from data packets with length-prefixed framing.  [A]
43
44You can choose the number of header bytes (1 to 8) and endianness. Default is 4 bytes big endian.
45
46This affects both reading and writing - attach this overlay to stream specifier to turn it into a packet-orineted specifier.
47
48Mind the buffer size (-B). All packets should fit in there.
49
50Examples:
51
52    websocat -u -b udp-l:127.0.0.1:1234 lengthprefixed:writefile:test.dat
53
54    websocat -u -b lengthprefixed:readfile:test.dat udp:127.0.0.1:1235
55
56This would save incoming UDP packets to a file, then replay the datagrams back to UDP socket
57
58    websocat -b lengthprefixed:- ws://127.0.0.1:1234/ --binary-prefix=B --text-prefix=T
59
60This allows to mix and match text and binary WebSocket messages to and from stdio without the base64 overhead.
61"#
62);
63
64pub fn lengthprefixed_peer(
65    inner_peer: Peer,
66    num_bytes_in_length_prefix: usize,
67    little_endian: bool,
68    lengthprefixed_skip_read_direction: bool,
69    lengthprefixed_skip_write_direction: bool,
70) -> BoxedNewPeerFuture {
71    if !(1..=8).contains(&num_bytes_in_length_prefix) {
72        return peer_strerr("Number of header bytes for lengthprefixed overlay should be from 1 to 8");
73    }
74
75    let (length_starting_pos, length_ending_pos) = if little_endian {
76        (0, num_bytes_in_length_prefix)
77    } else {
78        (8 - num_bytes_in_length_prefix, 8)
79    };
80    let reader = Lengthprefixed2PacketWrapper {
81        inner: inner_peer.0,
82        length_buffer: [0; 8],
83        length_starting_pos,
84        length_pos: length_starting_pos,
85        length_ending_pos,
86        little_endian,
87        data_read_so_far: 0,
88    };
89    let writer = Packet2LengthPrefixedWrapper {
90        inner: inner_peer.1,
91        length_buffer: [0; 8],
92        length_starting_pos,
93        length_pos: length_starting_pos,
94        length_ending_pos,
95        little_endian,
96        data_written_so_far: 0,
97    };
98    let thepeer = match (lengthprefixed_skip_read_direction, lengthprefixed_skip_write_direction) {
99        (true, true)   => Peer::new(reader.inner, writer.inner, inner_peer.2),
100        (true, false)  => Peer::new(reader.inner, writer,       inner_peer.2),
101        (false, true)  => Peer::new(reader,       writer.inner, inner_peer.2),
102        (false, false) => Peer::new(reader,       writer,       inner_peer.2),
103    };
104    Box::new(ok(thepeer)) as BoxedNewPeerFuture
105}
106struct Lengthprefixed2PacketWrapper {
107    inner: Box<dyn AsyncRead>,
108    length_buffer: [u8; 8],
109    length_starting_pos: usize,
110    length_ending_pos: usize,
111    length_pos: usize,
112    little_endian: bool,
113    data_read_so_far: usize,
114}
115impl Read for Lengthprefixed2PacketWrapper {
116    fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
117        loop {
118            assert!(self.length_pos <= self.length_ending_pos);
119            assert!(self.length_pos >= self.length_starting_pos);
120            if self.length_ending_pos != self.length_pos {
121                match self
122                    .inner
123                    .read(&mut self.length_buffer[self.length_pos..self.length_ending_pos])
124                {
125                    Err(e) => return Err(e),
126                    Ok(0) => {
127                        if self.length_pos != self.length_starting_pos {
128                            error!("Possibly trimmed length-prefixed data.")
129                        }
130                        return Ok(0);
131                    }
132                    Ok(n) => {
133                        self.length_pos += n;
134                        continue;
135                    }
136                }
137            } else {
138                let packet_len = if self.little_endian {
139                    u64::from_le_bytes(self.length_buffer)
140                } else {
141                    u64::from_be_bytes(self.length_buffer)
142                };
143                if packet_len >= (buf.len() as u64) {
144                    error!("Failed to process too big packet. You may need to increase the -B buffer size.");
145                    return Err(io_other_error(simple_err("Packet length overflow".into())));
146                }
147                let packet_len = packet_len as usize;
148                if packet_len == 0 {
149                    return Ok(0);
150                }
151
152                if self.data_read_so_far == packet_len {
153                    self.data_read_so_far = 0;
154                    self.length_buffer = [0; 8];
155                    self.length_pos = self.length_starting_pos;
156                    return Ok(packet_len);
157                }
158
159                // Assume we are called with the same buffer until we return success, so we
160                // can use buffer as a persistent scratch space
161                match self.inner.read(&mut buf[self.data_read_so_far..packet_len]) {
162                    Err(e) => return Err(e),
163                    Ok(0) => {
164                        return Err(io_other_error(simple_err("Data trimmed".into())));
165                    }
166                    Ok(n) => {
167                        self.data_read_so_far += n;
168                        continue;
169                    }
170                }
171            }
172        }
173    }
174}
175impl AsyncRead for Lengthprefixed2PacketWrapper {}
176
177struct Packet2LengthPrefixedWrapper {
178    inner: Box<dyn AsyncWrite>,
179    length_buffer: [u8; 8],
180    length_starting_pos: usize,
181    length_ending_pos: usize,
182    length_pos: usize,
183    little_endian: bool,
184    data_written_so_far: usize,
185}
186
187impl Write for Packet2LengthPrefixedWrapper {
188    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
189        // Assuming `write` is retried with the same buffer when we return WouldBlock
190        loop {
191            if self.length_pos == self.length_starting_pos {
192                if self.little_endian {
193                    self.length_buffer = (buf.len() as u64).to_le_bytes()
194                } else {
195                    self.length_buffer = (buf.len() as u64).to_be_bytes()
196                }
197            }
198            if self.length_pos < self.length_ending_pos {
199                match self
200                    .inner
201                    .write(&self.length_buffer[self.length_pos..self.length_ending_pos])
202                {
203                    Err(x) => return Err(x),
204                    Ok(n) => self.length_pos += n,
205                }
206                continue;
207            }
208
209            if self.data_written_so_far == buf.len() {
210                self.data_written_so_far = 0;
211                self.length_pos = self.length_starting_pos;
212                self.length_buffer = [0; 8];
213                return Ok(buf.len());
214            }
215
216            match self.inner.write(&buf[self.data_written_so_far..]) {
217                Err(e) => return Err(e),
218                Ok(n) => self.data_written_so_far += n,
219            }
220        }
221    }
222
223    fn flush(&mut self) -> std::io::Result<()> {
224        self.inner.flush()
225    }
226}
227
228impl AsyncWrite for Packet2LengthPrefixedWrapper {
229    fn shutdown(&mut self) -> futures::Poll<(), std::io::Error> {
230        self.inner.shutdown()
231    }
232}