kex/streamer.rs

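//! Streaming hex-dump writer: buffers incoming bytes into row-sized
//! chunks, emits an optional address column, the grouped byte column, and
//! an optional printable-text column, and can collapse repeated rows into
//! a single `*` placeholder, in the spirit of `xxd`.
//!
//! A sketch of the expected driving sequence (the concrete formatter
//! values are assumptions; they come from `super::format`):
//!
//! ```ignore
//! let mut streamer = Streamer::new(Some(addr_fmt), byte_fmt, Some(char_fmt), 0, true);
//! streamer.push(chunk, &mut out)?; // any chunk size, called repeatedly
//! streamer.write_tail(&mut out)?;  // flush the final partial row
//! ```
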
use super::format::*;
use std::io::*;

const ROW_SEPARATOR: &[u8] = b"\n";
const DUPLICATE_PLACEHOLDER: &[u8] = b"*";

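/// Incremental formatting state. `cache` holds exactly one row
/// (`bytes_per_row()` bytes), `available` counts how many of them are
/// filled, and `total_formatted` is the absolute offset used for the
/// address column (shifted by `printable_offset`).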
pub(super) struct Streamer<A: AddressFormatting, B: ByteFormatting, C: CharFormatting> {
    addr_fmt: Option<A>,
    byte_fmt: B,
    char_fmt: Option<C>,

    total_formatted: usize,
    printable_offset: usize,

    cache: Vec<u8>,
    available: usize,

    dedup_enabled: bool,
    row_state: RowState,
}

impl<A: AddressFormatting, B: ByteFormatting, C: CharFormatting> Streamer<A, B, C> {
    pub(super) fn new(
        addr_fmt: Option<A>,
        byte_fmt: B,
        char_fmt: Option<C>,
        printable_offset: usize,
        dedup_enabled: bool,
    ) -> Self {
        let bpr = byte_fmt.groupping().bytes_per_row();
        Self {
            addr_fmt,
            byte_fmt,
            char_fmt,
            total_formatted: 0,
            printable_offset,
            cache: vec![0u8; bpr],
            available: 0,
            dedup_enabled,
            row_state: RowState::Changed,
        }
    }

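    /// Feeds `bytes` to the formatter, writing every completed row to
    /// `out`; dispatches to the deduplicating path when `dedup_enabled`
    /// is set.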
    pub(crate) fn push<O: Write>(&mut self, bytes: &[u8], out: &mut O) -> Result<()> {
        if self.dedup_enabled {
            self.push_deduplicated(bytes, out)
        } else {
            self.push_groupped(bytes, out)
        }
    }

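    /// Non-deduplicating path: buffers bytes in `cache` and formats each
    /// group as soon as it is complete, finishing the row when the cache
    /// fills up.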
    fn push_groupped<O: Write>(&mut self, bytes: &[u8], out: &mut O) -> Result<()> {
        use std::cmp::min;
        let mut tmp = &bytes[..];

        let gr = &self.byte_fmt.groupping();
        let bpr = gr.bytes_per_row();
        let group_size = gr.max_group_size();

        while !tmp.is_empty() {
            let byte_in_row = self.total_formatted % bpr;

            if self.available == 0 {
                self.start_row(out)?;
            }

            let to_cache = min(self.cache.len() - self.available, tmp.len());

            let old_available = self.available;
            if to_cache != 0 {
                tmp.read_exact(&mut self.cache[old_available..old_available + to_cache])?;
                self.available += to_cache;
            }

            assert!(self.available <= bpr, "Too many bytes written");

            let group_cache = self.calculated_group_cache(old_available, self.available);
            assert_eq!(group_cache.len() % group_size, 0, "Unaligned group cache");

            // Format the groups that this read just completed
            if !group_cache.is_empty() {
                self.total_formatted += self.byte_fmt.format(group_cache, byte_in_row, out)?;
            }

            // A full cache means the row is complete
            if self.available == bpr {
                self.finish_row(out)?;
            }
        }

        Ok(())
    }

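    /// Deduplicating path: incoming bytes are compared against the
    /// previous row, which still sits in `cache`. Only rows that differ
    /// are formatted; runs of duplicates are later rendered as a single
    /// `*` placeholder by `finish_row`. The first row is always printed.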
    fn push_deduplicated<O: Write>(&mut self, bytes: &[u8], out: &mut O) -> Result<()> {
        use std::cmp::min;
        let mut tmp = &bytes[..];

        let gr = &self.byte_fmt.groupping();
        let bpr = gr.bytes_per_row();

        while !tmp.is_empty() {
            // The very first row is always printed
            let ignore_dedup = self.total_formatted < bpr;

            let to_check = min(self.cache.len() - self.available, tmp.len());

            // `cache` still holds the previous row's bytes at these positions
            let cache_part = &self.cache[self.available..self.available + to_check];
            let should_write = ignore_dedup || &tmp[..to_check] != cache_part;

            if should_write {
                self.row_state = RowState::Changed;
            }

            let mut cache_part = &mut self.cache[self.available..self.available + to_check];
            if should_write {
                tmp.read_exact(&mut cache_part)?;
            } else {
                tmp = &tmp[to_check..];
            }

            // The fill position advances whether the bytes were copied or
            // merely matched against the previous row
            self.available += to_check;

            if self.available == self.cache.len() {
                match self.row_state {
                    RowState::Changed => {
                        self.start_row(out)?;
                        self.total_formatted += self.byte_fmt.format(&self.cache, 0, out)?;
                    }
                    RowState::NeedsPlaceholder | RowState::Skipped => {
                        // Count skipped duplicate bytes as formatted so the
                        // offset column stays in sync
                        self.total_formatted += self.cache.len();
                    }
                }

                self.finish_row(out)?;
            }
        }

        Ok(())
    }

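    /// Flushes the final, possibly partial, row and terminates the
    /// output, writing a closing line with the end offset when there are
    /// cached bytes left to format.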
    pub(crate) fn write_tail<O: Write>(&mut self, out: &mut O) -> Result<()> {
        if self.dedup_enabled {
            self.start_row(out)?;
        }

        if self.available == 0 {
            out.write_all(ROW_SEPARATOR)?;
            return Ok(());
        }

        let bpr = self.byte_fmt.groupping().bytes_per_row();
        let formatted_in_row = self.total_formatted % bpr;
        assert!(
            self.available >= formatted_in_row,
            "Formatted more bytes than are available"
        );

        let remaining = &self.cache[formatted_in_row..self.available];
        self.total_formatted += self.byte_fmt.format(remaining, formatted_in_row, out)?;

        self.finish_row(out)?;

        self.write_current_offset(out)?;

        out.write_all(ROW_SEPARATOR)?;

        Ok(())
    }

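    /// Returns the slice of `cache` between the group boundary at or
    /// below `at_number` and the group boundary at or below `available`,
    /// i.e. the groups completed by the latest read.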
    #[inline(always)]
    fn calculated_group_cache(&self, at_number: usize, available: usize) -> &[u8] {
        assert!(
            at_number <= available,
            "Unbalanced grouping: start position is greater than end position"
        );

        let gr = self.byte_fmt.groupping();
        let group_size = gr.max_group_size();

        let start = gr.group_of_byte(at_number) * group_size;
        let end = gr.group_of_byte(available) * group_size;

        &self.cache[start..end]
    }

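    /// Starts a row: writes the current offset (when an address formatter
    /// is configured) followed by the byte column's `trailing` separator.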
    fn start_row<O: Write>(&self, out: &mut O) -> Result<()> {
        self.write_current_offset(out)?;
        out.write_all(&self.byte_fmt.separators().trailing)?;

        Ok(())
    }

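    /// Writes the absolute offset of the next byte to be formatted,
    /// wrapped in the address formatter's separators; a no-op when no
    /// address formatter is configured.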
    fn write_current_offset<O: Write>(&self, out: &mut O) -> Result<()> {
        if let Some(fmt) = &self.addr_fmt {
            out.write_all(&fmt.separators().trailing)?;
            fmt.format(self.total_formatted + self.printable_offset, out)?;
            out.write_all(&fmt.separators().leading)?;
        }

        Ok(())
    }

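    /// Completes the current row. Changed rows (or all rows when
    /// deduplication is off) get byte padding, the text column, and a row
    /// separator; the first duplicate of a run gets the `*` placeholder
    /// instead. Resets the fill counter in both cases.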
    fn finish_row<O: Write>(&mut self, out: &mut O) -> Result<()> {
        if self.row_state.is_changed() || !self.dedup_enabled {
            self.byte_fmt.format_padding(self.available, out)?;

            out.write_all(&self.byte_fmt.separators().leading)?;

            self.write_text(out)?;

            out.write_all(ROW_SEPARATOR)?;

            self.row_state = RowState::NeedsPlaceholder;
        } else if let RowState::NeedsPlaceholder = self.row_state {
            self.replace_row_with_placeholder(out)?;
        }

        self.available = 0;
        Ok(())
    }

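    /// Emits the `*` placeholder for the first of a run of duplicate rows
    /// and marks the run as `Skipped`, so later duplicates print nothing.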
    fn replace_row_with_placeholder<O: Write>(&mut self, out: &mut O) -> Result<()> {
        match self.row_state {
            RowState::NeedsPlaceholder => {
                self.row_state = RowState::Skipped;

                out.write_all(DUPLICATE_PLACEHOLDER)?;
                out.write_all(ROW_SEPARATOR)
            }
            _ => panic!("replace_row_with_placeholder(): Row does not need a placeholder"),
        }
    }

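    /// Writes the printable-character column for the current row, padded
    /// to full row width, when a char formatter is configured.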
    fn write_text<O: Write>(&self, out: &mut O) -> Result<()> {
        if let Some(fmt) = &self.char_fmt {
            out.write_all(&fmt.separators().trailing)?;

            fmt.format(&self.cache[..self.available], out)?;

            let tail_len = self.byte_fmt.groupping().bytes_per_row() - self.available;
            fmt.format_padding(tail_len, out)?;

            out.write_all(&fmt.separators().leading)?;
        }

        Ok(())
    }
}

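/// Deduplication state of the row currently being collected in `cache`:
/// `Changed` means it differs from its predecessor and must be printed;
/// `NeedsPlaceholder` means it is a duplicate whose `*` has not been
/// printed yet; `Skipped` means the `*` for the run was already printed.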
enum RowState {
    Changed,
    NeedsPlaceholder,
    Skipped,
}

impl RowState {
    fn is_changed(&self) -> bool {
        matches!(self, RowState::Changed)
    }
}