// json_key_remover/lib.rs

use std::io::{ErrorKind, Read, Write};

mod scanner;
use scanner::{ChrIndex, Message, Scanner, ScannerState};

10#[derive(Debug)]
11pub struct KeyRemover {
12    scanner: Scanner,
13    buffer_size: usize,
14    first_buffer_index: usize,
15    buffer_queue: Vec<Vec<u8>>,
16    buffer_length_queue: Vec<usize>,
17    message_queue: Vec<Message>,
18}
19
20impl KeyRemover {
21    pub fn init(
22        buffer_size: usize,
23        keys_to_remove: Vec<String>,
24    ) -> Self {
25        let scanner = Scanner::new(keys_to_remove);
26
27        Self {
28            scanner,
29            buffer_size,
30            first_buffer_index: 0,
31            buffer_queue: Vec::new(),
32            buffer_length_queue: Vec::new(),
33            message_queue: Vec::new(),
34        }
35    }
36    pub fn process<R, W>(&mut self, mut reader: R, mut writer: W) where
37        R: Read, W: Write,
38    {
39        let mut mode = Mode::Remain;
40
41        // (1) Read first buffer
42        let mut first_buffer = vec![0; self.buffer_size];
43        let mut filled_byte_size = reader.read(&mut first_buffer).unwrap(); // TODO: Deal error
44        self.scanner.process_new_buffer(&first_buffer[..filled_byte_size]);
45        self.buffer_queue.push(first_buffer);
46        self.buffer_length_queue.push(filled_byte_size);
47
48        // (2) While file end
49        while filled_byte_size != 0 { // TODO: Handle with slow stream
50            // (1) Pull out messages
51            self.message_queue.append(&mut self.scanner.queue);
52
53            // (2) Get range of data to write
54            let optional_data_index_to_write = match self.scanner.state {
55                ScannerState::WaitingNextKey => {
56                    // Write all message and buffer
57                    let last_buffer_index_to_write = self.scanner.next_buffer_index - 1;
58                    let message_end_index = self.message_queue.len();
59
60                    Some((last_buffer_index_to_write, message_end_index))
61                },
62                ScannerState::CheckingValueRange => {
63                    if self.scanner.next_buffer_index == 1 {
64                        None
65                    } else {
66                        let last_buffer_index_to_write = self.scanner.next_buffer_index - 2;
67                        if last_buffer_index_to_write < self.first_buffer_index {
68                            None
69                        } else {
70                            let message_end_index = self.get_message_end_index(last_buffer_index_to_write);
71                            Some((last_buffer_index_to_write, message_end_index))
72                        }
73                    }
74                },
75                ScannerState::FindingNextComma => {
76                    let cached_skip_end_msg = &self.scanner.skip_end_msg_cache;
77                    let last_buffer_index_to_write = match self.get_optional_skip_end_index(cached_skip_end_msg) {
78                        Some(chr_index) => chr_index.0 - 1,
79                        None => panic!("Error 1"), // TODO: Unreachable error msg
80                    };
81                    if last_buffer_index_to_write < self.first_buffer_index {
82                        None
83                    } else {
84                        let message_end_index = self.get_message_end_index(last_buffer_index_to_write);
85                        Some((last_buffer_index_to_write, message_end_index))
86                    }
87                },
88                _ => {
89                    // ScannerState::MeetKeyCandOpener | ScannerState::ConfirmingKey | ScannerState::DefiningValueType
90                    // Defer
91                    let chr_index = self.scanner.key_cand_opener_index();
92                    if self.first_buffer_index < chr_index.0 {
93                        let last_buffer_index_to_write = chr_index.0 - 1;
94                        let message_end_index = self.get_message_end_index(last_buffer_index_to_write);
95                        Some((last_buffer_index_to_write, message_end_index))
96                    }else {
97                        None
98                    }
99                },
100            };
101
102            // (3) Write data
103            if let Some((last_buffer_index_to_write, message_end_index)) = optional_data_index_to_write {
104                // (1) Get messages
105                let mut messages_to_write: Vec<Message> = self.message_queue.drain(0..message_end_index).collect();
106
107                // (2) Adjust messages by mode
108                if let Some(Message::SkipEndPreviousTo(chr_index)) = messages_to_write.first() {
109                    if chr_index.1 == 0 && chr_index.0 == self.first_buffer_index {
110                        messages_to_write.remove(0);
111                        mode = Mode::Remain;
112                    }
113                }
114                if let Mode::Skip = mode {
115                    messages_to_write.insert(0, Message::SkipStartFrom((self.first_buffer_index, 0)));
116                }
117                if let Some(Message::SkipStartFrom(_)) = messages_to_write.last() {
118                    messages_to_write.push(Message::SkipEndTo((
119                        last_buffer_index_to_write,
120                        self.buffer_length_queue[last_buffer_index_to_write-self.first_buffer_index]-1
121                    )));
122                    mode = Mode::Skip;
123                } else {
124                    mode = Mode::Remain;
125                }
126
127                // (3) Transform buffers to write
128                messages_to_write.chunks(2).rev().for_each(|messages| {
129                    let skip_start_chr_index = match messages[0] {
130                        Message::SkipStartFrom(chr_index) => {
131                            (chr_index.0 - self.first_buffer_index, chr_index.1)
132                        },
133                        _ => panic!("Error 2"),
134                    };
135                    let skip_end_chr_index = match messages[1] {
136                        Message::SkipEndTo(chr_index) => {
137                            (chr_index.0 - self.first_buffer_index, chr_index.1)
138                        },
139                        Message::SkipEndPreviousTo(chr_index) => {
140                            let chr_index = self.previous_chr_index(&chr_index);
141                            (chr_index.0 - self.first_buffer_index, chr_index.1)
142                        },
143                        _ => panic!("Error 3"),
144                    };
145
146                    if skip_start_chr_index.0 < skip_end_chr_index.0 {
147                        let buffer_index = skip_end_chr_index.0;
148                        let buffer = &mut self.buffer_queue[buffer_index];
149                        let buffer_length = &mut self.buffer_length_queue[buffer_index];
150                        buffer.drain(..=skip_end_chr_index.1);
151                        *buffer_length -= skip_end_chr_index.1 + 1;
152                        // middle buffer
153                        for buffer_index in skip_start_chr_index.0+1..skip_end_chr_index.0 {
154                            let buffer = &mut self.buffer_queue[buffer_index];
155                            let buffer_length = &mut self.buffer_length_queue[buffer_index];
156                            buffer.clear();
157                            *buffer_length = 0;
158                        }
159                        // first buffer
160                        let buffer_index = skip_start_chr_index.0;
161                        let buffer = &mut self.buffer_queue[buffer_index];
162                        let buffer_length = &mut self.buffer_length_queue[buffer_index];
163                        buffer.drain(skip_start_chr_index.1..);
164                        *buffer_length = skip_start_chr_index.1;
165                    } else { // if skip_start_chr == skip_end_chr
166                        let buffer_index = skip_start_chr_index.0;
167                        let buffer = &mut self.buffer_queue[buffer_index];
168                        let buffer_length = &mut self.buffer_length_queue[buffer_index];
169                        buffer.drain(skip_start_chr_index.1..=skip_end_chr_index.1);
170                        *buffer_length -= skip_end_chr_index.1 - skip_start_chr_index.1 + 1;
171                    }
172                });
173
174                // (3) Write buffers
175                let count_of_buffer_to_write = last_buffer_index_to_write - self.first_buffer_index + 1;
176                for buffer_index in 0..count_of_buffer_to_write {
177                    let buffer = &self.buffer_queue[buffer_index];
178                    let length = self.buffer_length_queue[buffer_index];
179                    writer.write_all(&buffer[..length]).unwrap();
180                };
181                self.buffer_queue.drain(..count_of_buffer_to_write);
182                self.buffer_length_queue.drain(..count_of_buffer_to_write);
183                self.first_buffer_index += count_of_buffer_to_write;
184            }
185
186            // (4) Load next buffer
187            let mut next_buffer = vec![0; self.buffer_size];
188            filled_byte_size = reader.read(&mut next_buffer).unwrap(); // TODO: Deal error
189            self.buffer_length_queue.push(filled_byte_size);
190            self.scanner.process_new_buffer(&next_buffer[..filled_byte_size]);
191            self.buffer_queue.push(next_buffer);
192        }
193
194        writer.flush().unwrap();
195    }
196    fn get_message_end_index(&self, last_buffer_index_to_write: usize) -> usize {
197        let mut message_end_index = 0;
198        for message in self.message_queue.iter() {
199            let chr_index = match &message {
200                Message::SkipStartFrom(v) => v,
201                Message::SkipEndTo(v) => v,
202                Message::SkipEndPreviousTo(v) => v,
203            };
204            if chr_index.0 > last_buffer_index_to_write {
205                break
206            }
207            message_end_index += 1;
208        }
209        message_end_index
210    }
211    fn get_optional_skip_end_index(&self, message: &Message) -> Option<ChrIndex> {
212        match message {
213            Message::SkipStartFrom(_) => {
214                None
215            },
216            Message::SkipEndTo(chr_index) => {
217                Some(chr_index.clone())
218            },
219            Message::SkipEndPreviousTo(chr_index) => {
220                Some(self.previous_chr_index(chr_index))
221            },
222        }
223    }
224    fn previous_chr_index(&self, chr_index: &ChrIndex) -> ChrIndex {
225        if chr_index.1 == 0 {
226            let previous_buffer_index = chr_index.0 - 1;
227            let buffer_queue_index = previous_buffer_index - self.first_buffer_index;
228            (chr_index.0-1, self.buffer_length_queue[buffer_queue_index] - 1)
229        } else {
230            (chr_index.0, chr_index.1-1)
231        }
232    }
233}
234
/// Whether a skip range was still open when the previous chunk of output was written.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    /// No skip is pending; the next write starts by keeping bytes.
    Remain,
    /// A skip is still open; the next write resumes it at the first queued byte.
    Skip,
}

#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    /// Sample document: `key_0_2_0` holds an array of three objects nested inside
    /// `key_0` -> `key_0_2`.
    fn get_sample_json_string() -> String {
        "{
            \"key_0\": {
              \"key_0_0\": \"val_0_0\",
              \"key_0_1\": \"val_0_1\",
              \"key_0_2\": {
                \"key_0_2_0\": [
                  {
                    \"key_0_2_0_0\": \"val_0_2_0_0\",
                    \"key_0_2_0_1\": \"val_0_2_0_1\"
                  },
                  {
                    \"key_0_2_1_0\": \"val_0_2_1_0\",
                    \"key_0_2_1_1\": \"val_0_2_1_1\"
                  },
                  {
                    \"key_0_2_2_0\": \"val_0_2_2_0\",
                    \"key_0_2_2_1\": \"val_0_2_2_1\"
                  }
                ]
              }
            }
          }"
        .to_string()
    }

    /// Manual debugging aid: prints each byte of the sample with its offset.
    // Not called by any test; kept for inspecting offsets when debugging the scanner.
    #[allow(dead_code)]
    fn print_sample_json() {
        for (idx, byte) in get_sample_json_string().bytes().enumerate() {
            println!("# {}: {}", idx, char::from(byte));
        }
    }

    #[test]
    fn test_1() {
        let sample_json_string = get_sample_json_string();
        let mut key_remover = KeyRemover::init(128, vec!["key_0_2_0".to_string()]);

        let mut output = Vec::new();
        key_remover.process(Cursor::new(sample_json_string.clone()), &mut output);
        let output = String::from_utf8(output).expect("output must stay valid UTF-8");

        println!("# Input");
        println!("{:?}", sample_json_string);
        println!("# Output");
        println!("{:?}", output);

        // The removed entry, including everything nested in its value, must be gone.
        assert!(!output.contains("\"key_0_2_0\""), "removed key still present");
        for nested in ["key_0_2_0_0", "val_0_2_0_1", "key_0_2_1_0", "val_0_2_1_1"] {
            assert!(!output.contains(nested), "nested content {} still present", nested);
        }
        // Entries that precede the removed one must survive untouched.
        for kept in [
            "\"key_0\"",
            "\"key_0_0\": \"val_0_0\"",
            "\"key_0_1\": \"val_0_1\"",
            "\"key_0_2\"",
        ] {
            assert!(output.contains(kept), "expected {} in output", kept);
        }
    }

    #[test]
    fn empty_input_produces_empty_output() {
        let mut key_remover = KeyRemover::init(16, vec!["key".to_string()]);
        let mut output = Vec::new();
        key_remover.process(Cursor::new(Vec::<u8>::new()), &mut output);
        assert!(output.is_empty());
    }
}