1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use bytes::{BufMut, Bytes, BytesMut};
use nu_errors::ShellError;
extern crate encoding_rs;
use encoding_rs::{CoderResult, Decoder, Encoding, UTF_8};
/// Capacity, in bytes, of the `String` buffers that `decode` allocates as
/// decoder output space in regular (non-test) builds.
#[cfg(not(test))]
const OUTPUT_BUFFER_SIZE: usize = 8192;
/// Test builds use a tiny capacity so that a short input (the unit test in
/// this file feeds 10 bytes) is already larger than the output buffer and
/// exercises the overflow path of `decode`.
#[cfg(test)]
const OUTPUT_BUFFER_SIZE: usize = 4;
/// Item type consumed by the `Encoder` impl and produced by the `Decoder`
/// impl of `MaybeTextCodec`.
#[derive(Debug, Eq, PartialEq)]
pub enum StringOrBinary {
/// Text payload; `decode` yields this when the decoder reported no
/// replacements for the input.
String(String),
/// Raw byte payload; `decode` yields this (a copy of the source bytes)
/// when the decoder reported that replacements were needed.
Binary(Vec<u8>),
}
/// Codec that writes `StringOrBinary` items as bytes and reads bytes back as
/// either text or binary, depending on whether they decode cleanly.
pub struct MaybeTextCodec {
// `encoding_rs` decoder used by `decode`; it is kept across calls because
// `decode` always invokes it with `last = false` (streaming mode).
decoder: Decoder,
}
impl MaybeTextCodec {
    /// Creates a codec that decodes incoming bytes with `encoding`.
    ///
    /// With `Some(encoding)` the decoder is built via
    /// `new_decoder_with_bom_removal`; with `None` the plain `UTF_8` decoder
    /// (`UTF_8.new_decoder()`) is used.
    pub fn new(encoding: Option<&'static Encoding>) -> Self {
        let decoder = if let Some(requested) = encoding {
            requested.new_decoder_with_bom_removal()
        } else {
            UTF_8.new_decoder()
        };

        Self { decoder }
    }
}
impl Default for MaybeTextCodec {
    /// Builds a codec backed by the plain `UTF_8` decoder.
    fn default() -> Self {
        let decoder = UTF_8.new_decoder();
        Self { decoder }
    }
}
impl futures_codec::Encoder for MaybeTextCodec {
    type Item = StringOrBinary;
    type Error = std::io::Error;

    /// Appends the payload of `item` to `dst` unchanged: the UTF-8 bytes of a
    /// `String`, or the raw bytes of a `Binary`. This never returns an error.
    fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
        match item {
            StringOrBinary::String(text) => {
                let raw = text.as_bytes();
                // Make room first, then copy the bytes in.
                dst.reserve(raw.len());
                dst.put(raw);
            }
            StringOrBinary::Binary(raw) => {
                dst.reserve(raw.len());
                dst.put(Bytes::from(raw));
            }
        }

        Ok(())
    }
}
impl futures_codec::Decoder for MaybeTextCodec {
    type Item = StringOrBinary;
    type Error = ShellError;

    /// Decodes everything currently buffered in `src` as a single item.
    ///
    /// Returns `Ok(None)` when `src` is empty. Otherwise the bytes are run
    /// through the configured decoder in streaming mode (`last = false`):
    ///
    /// * if the decoder never has to insert a replacement character, the
    ///   decoded text is returned as `StringOrBinary::String`;
    /// * if any part of the input is malformed for the encoding, the
    ///   original bytes are returned untouched as `StringOrBinary::Binary`.
    ///
    /// In both cases `src` is fully drained before returning. This method
    /// itself never produces an `Err`.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.is_empty() {
            return Ok(None);
        }

        let mut text = String::with_capacity(OUTPUT_BUFFER_SIZE);
        let (status, read, mut malformed) = self.decoder.decode_to_string(src, &mut text, false);

        if !malformed {
            // The output buffer filled up before the input was exhausted:
            // keep decoding the remainder chunk by chunk through a scratch
            // buffer and append each chunk to the result.
            if let CoderResult::OutputFull = status {
                // Resume from the number of *input* bytes the decoder has
                // consumed. The length of the decoded text is not a valid
                // input offset: it differs whenever a BOM was stripped or the
                // source encoding is not byte-for-byte identical to UTF-8.
                let mut consumed = read;
                let mut chunk = String::with_capacity(OUTPUT_BUFFER_SIZE);

                loop {
                    let (status, read, replaced) =
                        self.decoder
                            .decode_to_string(&src[consumed..], &mut chunk, false);
                    consumed += read;

                    // Malformed input can show up in any chunk, not just the
                    // first one; treat it the same way everywhere.
                    if replaced {
                        malformed = true;
                        break;
                    }

                    text.push_str(&chunk);
                    chunk.clear();

                    if let CoderResult::InputEmpty = status {
                        break;
                    }
                }
            }
        }

        let result = if malformed {
            // Replacements were needed, so this is not clean text: hand back
            // the raw bytes instead of a lossy string.
            StringOrBinary::Binary(src.to_vec())
        } else {
            StringOrBinary::String(text)
        };

        src.clear();

        Ok(Some(result))
    }
}
#[cfg(test)]
mod tests {
    use super::{MaybeTextCodec, StringOrBinary};
    use bytes::BytesMut;
    use futures_codec::Decoder;

    /// The input is longer than the test-build output buffer, so decoding
    /// has to continue past the first chunk and must still drain the source.
    #[test]
    fn should_consume_all_bytes_from_source_when_temporary_buffer_overflows() {
        let mut codec = MaybeTextCodec::new(None);
        let mut input = BytesMut::from("0123456789");

        let decoded = codec.decode(&mut input);

        let expected = Ok(Some(StringOrBinary::String("0123456789".to_string())));
        assert_eq!(expected, decoded);
        assert!(input.is_empty());
    }
}