1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#[cfg(any(feature = "blocking-io", feature = "async-io"))]
use crate::MAX_LINE_LEN;
use crate::{PacketLine, U16_HEX_BYTES};
#[cfg(any(feature = "blocking-io", feature = "async-io"))]
type ExhaustiveOutcome<'a> = (
bool,
Option<PacketLine<'static>>,
Option<std::io::Result<Result<PacketLine<'a>, crate::decode::Error>>>,
);
pub struct StreamingPeekableIter<T> {
read: T,
peek_buf: Vec<u8>,
#[cfg(any(feature = "blocking-io", feature = "async-io"))]
buf: Vec<u8>,
fail_on_err_lines: bool,
delimiters: &'static [PacketLine<'static>],
is_done: bool,
stopped_at: Option<PacketLine<'static>>,
}
impl<T> StreamingPeekableIter<T> {
pub fn new(read: T, delimiters: &'static [PacketLine<'static>]) -> Self {
StreamingPeekableIter {
read,
#[cfg(any(feature = "blocking-io", feature = "async-io"))]
buf: vec![0; MAX_LINE_LEN],
peek_buf: Vec::new(),
delimiters,
fail_on_err_lines: false,
is_done: false,
stopped_at: None,
}
}
pub fn peek_buffer_replace_and_truncate(&mut self, position: usize, replace_with: u8) {
let position = position + U16_HEX_BYTES;
self.peek_buf[position] = replace_with;
let new_len = position + 1;
self.peek_buf.truncate(new_len);
self.peek_buf[..4].copy_from_slice(&crate::encode::u16_to_hex((new_len) as u16));
}
pub fn stopped_at(&self) -> Option<PacketLine<'static>> {
self.stopped_at
}
pub fn reset(&mut self) {
let delimiters = std::mem::take(&mut self.delimiters);
self.reset_with(delimiters);
}
pub fn reset_with(&mut self, delimiters: &'static [PacketLine<'static>]) {
self.delimiters = delimiters;
self.is_done = false;
self.stopped_at = None;
}
pub fn fail_on_err_lines(&mut self, value: bool) {
self.fail_on_err_lines = value;
}
pub fn replace(&mut self, read: T) -> T {
let prev = std::mem::replace(&mut self.read, read);
self.reset();
self.fail_on_err_lines = false;
prev
}
pub fn into_inner(self) -> T {
self.read
}
}
#[cfg(feature = "blocking-io")]
mod blocking_io;
#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))]
mod async_io;
mod sidebands;
#[cfg(any(feature = "blocking-io", feature = "async-io"))]
pub use sidebands::WithSidebands;