1use std::io::{Read, Write};
2
3mod scanner;
4use scanner::{
5 Scanner,
6 ScannerState,
7 ChrIndex, Message,
8};
9
10#[derive(Debug)]
11pub struct KeyRemover {
12 scanner: Scanner,
13 buffer_size: usize,
14 first_buffer_index: usize,
15 buffer_queue: Vec<Vec<u8>>,
16 buffer_length_queue: Vec<usize>,
17 message_queue: Vec<Message>,
18}
19
20impl KeyRemover {
21 pub fn init(
22 buffer_size: usize,
23 keys_to_remove: Vec<String>,
24 ) -> Self {
25 let scanner = Scanner::new(keys_to_remove);
26
27 Self {
28 scanner,
29 buffer_size,
30 first_buffer_index: 0,
31 buffer_queue: Vec::new(),
32 buffer_length_queue: Vec::new(),
33 message_queue: Vec::new(),
34 }
35 }
36 pub fn process<R, W>(&mut self, mut reader: R, mut writer: W) where
37 R: Read, W: Write,
38 {
39 let mut mode = Mode::Remain;
40
41 let mut first_buffer = vec![0; self.buffer_size];
43 let mut filled_byte_size = reader.read(&mut first_buffer).unwrap(); self.scanner.process_new_buffer(&first_buffer[..filled_byte_size]);
45 self.buffer_queue.push(first_buffer);
46 self.buffer_length_queue.push(filled_byte_size);
47
48 while filled_byte_size != 0 { self.message_queue.append(&mut self.scanner.queue);
52
53 let optional_data_index_to_write = match self.scanner.state {
55 ScannerState::WaitingNextKey => {
56 let last_buffer_index_to_write = self.scanner.next_buffer_index - 1;
58 let message_end_index = self.message_queue.len();
59
60 Some((last_buffer_index_to_write, message_end_index))
61 },
62 ScannerState::CheckingValueRange => {
63 if self.scanner.next_buffer_index == 1 {
64 None
65 } else {
66 let last_buffer_index_to_write = self.scanner.next_buffer_index - 2;
67 if last_buffer_index_to_write < self.first_buffer_index {
68 None
69 } else {
70 let message_end_index = self.get_message_end_index(last_buffer_index_to_write);
71 Some((last_buffer_index_to_write, message_end_index))
72 }
73 }
74 },
75 ScannerState::FindingNextComma => {
76 let cached_skip_end_msg = &self.scanner.skip_end_msg_cache;
77 let last_buffer_index_to_write = match self.get_optional_skip_end_index(cached_skip_end_msg) {
78 Some(chr_index) => chr_index.0 - 1,
79 None => panic!("Error 1"), };
81 if last_buffer_index_to_write < self.first_buffer_index {
82 None
83 } else {
84 let message_end_index = self.get_message_end_index(last_buffer_index_to_write);
85 Some((last_buffer_index_to_write, message_end_index))
86 }
87 },
88 _ => {
89 let chr_index = self.scanner.key_cand_opener_index();
92 if self.first_buffer_index < chr_index.0 {
93 let last_buffer_index_to_write = chr_index.0 - 1;
94 let message_end_index = self.get_message_end_index(last_buffer_index_to_write);
95 Some((last_buffer_index_to_write, message_end_index))
96 }else {
97 None
98 }
99 },
100 };
101
102 if let Some((last_buffer_index_to_write, message_end_index)) = optional_data_index_to_write {
104 let mut messages_to_write: Vec<Message> = self.message_queue.drain(0..message_end_index).collect();
106
107 if let Some(Message::SkipEndPreviousTo(chr_index)) = messages_to_write.first() {
109 if chr_index.1 == 0 && chr_index.0 == self.first_buffer_index {
110 messages_to_write.remove(0);
111 mode = Mode::Remain;
112 }
113 }
114 if let Mode::Skip = mode {
115 messages_to_write.insert(0, Message::SkipStartFrom((self.first_buffer_index, 0)));
116 }
117 if let Some(Message::SkipStartFrom(_)) = messages_to_write.last() {
118 messages_to_write.push(Message::SkipEndTo((
119 last_buffer_index_to_write,
120 self.buffer_length_queue[last_buffer_index_to_write-self.first_buffer_index]-1
121 )));
122 mode = Mode::Skip;
123 } else {
124 mode = Mode::Remain;
125 }
126
127 messages_to_write.chunks(2).rev().for_each(|messages| {
129 let skip_start_chr_index = match messages[0] {
130 Message::SkipStartFrom(chr_index) => {
131 (chr_index.0 - self.first_buffer_index, chr_index.1)
132 },
133 _ => panic!("Error 2"),
134 };
135 let skip_end_chr_index = match messages[1] {
136 Message::SkipEndTo(chr_index) => {
137 (chr_index.0 - self.first_buffer_index, chr_index.1)
138 },
139 Message::SkipEndPreviousTo(chr_index) => {
140 let chr_index = self.previous_chr_index(&chr_index);
141 (chr_index.0 - self.first_buffer_index, chr_index.1)
142 },
143 _ => panic!("Error 3"),
144 };
145
146 if skip_start_chr_index.0 < skip_end_chr_index.0 {
147 let buffer_index = skip_end_chr_index.0;
148 let buffer = &mut self.buffer_queue[buffer_index];
149 let buffer_length = &mut self.buffer_length_queue[buffer_index];
150 buffer.drain(..=skip_end_chr_index.1);
151 *buffer_length -= skip_end_chr_index.1 + 1;
152 for buffer_index in skip_start_chr_index.0+1..skip_end_chr_index.0 {
154 let buffer = &mut self.buffer_queue[buffer_index];
155 let buffer_length = &mut self.buffer_length_queue[buffer_index];
156 buffer.clear();
157 *buffer_length = 0;
158 }
159 let buffer_index = skip_start_chr_index.0;
161 let buffer = &mut self.buffer_queue[buffer_index];
162 let buffer_length = &mut self.buffer_length_queue[buffer_index];
163 buffer.drain(skip_start_chr_index.1..);
164 *buffer_length = skip_start_chr_index.1;
165 } else { let buffer_index = skip_start_chr_index.0;
167 let buffer = &mut self.buffer_queue[buffer_index];
168 let buffer_length = &mut self.buffer_length_queue[buffer_index];
169 buffer.drain(skip_start_chr_index.1..=skip_end_chr_index.1);
170 *buffer_length -= skip_end_chr_index.1 - skip_start_chr_index.1 + 1;
171 }
172 });
173
174 let count_of_buffer_to_write = last_buffer_index_to_write - self.first_buffer_index + 1;
176 for buffer_index in 0..count_of_buffer_to_write {
177 let buffer = &self.buffer_queue[buffer_index];
178 let length = self.buffer_length_queue[buffer_index];
179 writer.write_all(&buffer[..length]).unwrap();
180 };
181 self.buffer_queue.drain(..count_of_buffer_to_write);
182 self.buffer_length_queue.drain(..count_of_buffer_to_write);
183 self.first_buffer_index += count_of_buffer_to_write;
184 }
185
186 let mut next_buffer = vec![0; self.buffer_size];
188 filled_byte_size = reader.read(&mut next_buffer).unwrap(); self.buffer_length_queue.push(filled_byte_size);
190 self.scanner.process_new_buffer(&next_buffer[..filled_byte_size]);
191 self.buffer_queue.push(next_buffer);
192 }
193
194 writer.flush().unwrap();
195 }
196 fn get_message_end_index(&self, last_buffer_index_to_write: usize) -> usize {
197 let mut message_end_index = 0;
198 for message in self.message_queue.iter() {
199 let chr_index = match &message {
200 Message::SkipStartFrom(v) => v,
201 Message::SkipEndTo(v) => v,
202 Message::SkipEndPreviousTo(v) => v,
203 };
204 if chr_index.0 > last_buffer_index_to_write {
205 break
206 }
207 message_end_index += 1;
208 }
209 message_end_index
210 }
211 fn get_optional_skip_end_index(&self, message: &Message) -> Option<ChrIndex> {
212 match message {
213 Message::SkipStartFrom(_) => {
214 None
215 },
216 Message::SkipEndTo(chr_index) => {
217 Some(chr_index.clone())
218 },
219 Message::SkipEndPreviousTo(chr_index) => {
220 Some(self.previous_chr_index(chr_index))
221 },
222 }
223 }
224 fn previous_chr_index(&self, chr_index: &ChrIndex) -> ChrIndex {
225 if chr_index.1 == 0 {
226 let previous_buffer_index = chr_index.0 - 1;
227 let buffer_queue_index = previous_buffer_index - self.first_buffer_index;
228 (chr_index.0-1, self.buffer_length_queue[buffer_queue_index] - 1)
229 } else {
230 (chr_index.0, chr_index.1-1)
231 }
232 }
233}
234
235#[derive(Debug)]
236enum Mode {
237 Remain,
238 Skip,
239}
240
241#[cfg(test)]
242#[allow(dead_code)]
243mod tests {
244 use std::io::Cursor;
245
246 use super::*;
247
248 fn get_sample_json_string() -> String {
249 let str = "{
250 \"key_0\": {
251 \"key_0_0\": \"val_0_0\",
252 \"key_0_1\": \"val_0_1\",
253 \"key_0_2\": {
254 \"key_0_2_0\": [
255 {
256 \"key_0_2_0_0\": \"val_0_2_0_0\",
257 \"key_0_2_0_1\": \"val_0_2_0_1\"
258 },
259 {
260 \"key_0_2_1_0\": \"val_0_2_1_0\",
261 \"key_0_2_1_1\": \"val_0_2_1_1\"
262 },
263 {
264 \"key_0_2_2_0\": \"val_0_2_2_0\",
265 \"key_0_2_2_1\": \"val_0_2_2_1\"
266 }
267 ]
268 }
269 }
270 }";
271
272 str.to_string()
273 }
274
275 fn print_sample_json() {
276 let sample_json_string = get_sample_json_string();
277 for (idx, string) in sample_json_string.as_bytes().iter().enumerate() {
278 println!("# {}: {}", idx, String::from_utf8([*string].to_vec()).unwrap());
279 }
280 }
281
282 #[test]
283 fn test_1() {
284 let sample_json_string = get_sample_json_string();
285 let cloned_input_string = sample_json_string.clone();
286 let mut key_remover = KeyRemover::init(128, vec!["key_0_2_0".to_string()]);
287
288 let input = Cursor::new(sample_json_string);
289 let mut output = Vec::new();
290 key_remover.process(input, &mut output);
291
292 println!("# Input");
293 println!("{:?}", cloned_input_string);
294 println!("# Output");
295 println!("{:?}", String::from_utf8(output).unwrap());
296 }
297}