more_itertools/others/
replace.rs

1use std::collections::{LinkedList, VecDeque};
2
3use crate::error::{self, Error};
4
5struct ReplaceOutputItem<T> {
6    items: VecDeque<T>
7}
8
9struct Replace<T> {
10    output_list: LinkedList<ReplaceOutputItem<T>>,
11    cache: VecDeque<T>,
12    iter: Box<dyn Iterator<Item = Result<T,Error>>>,
13    iter_finished: bool,
14    query: Vec<T>,
15    sub: Vec<T>,
16    error: Option<Error>,
17    emitted_error: bool
18}
19
20impl<T> Replace<T> 
21where T: Clone
22{
23    fn flush_sub(&mut self) {
24        let mut item = ReplaceOutputItem {
25            items: VecDeque::new()
26        };
27
28        for i in self.sub.iter() {
29            item.items.push_back(i.clone());
30        }
31
32        self.cache.clear();
33
34        self.output_list.push_back(item);
35    }
36
37    fn flush_cache(&mut self) {
38        let mut item = ReplaceOutputItem {
39            items: VecDeque::new()
40        };
41
42        std::mem::swap(&mut item.items, &mut self.cache);
43
44        self.output_list.push_back(item);
45    }
46
47    fn push_and_pop(&mut self, v: T) -> Option<T> {
48        if self.cache.len() < self.query.len() {
49            self.cache.push_back(v);
50            return None;
51        } else {
52            let ret = self.cache.pop_front();
53            self.cache.push_back(v);
54            return ret;
55        }
56    }
57
58    fn push_back(&mut self, v: T) {
59        if self.output_list.len() == 0 {
60            self.output_list.push_back(ReplaceOutputItem{
61                items: VecDeque::new()
62            });
63            self.output_list.back_mut().unwrap().items.push_back(v);
64        }
65    }
66}
67
68fn is_equals<T>(a: &VecDeque<T>, b: &Vec<T>) -> bool 
69where T: PartialEq
70{
71    if a.len() != b.len() {
72        return false;
73    }
74
75    let mut i: usize = 0;
76    for a_v in a.iter() { 
77        if let Some(b_v) = b.get(i) {
78            if *a_v != *b_v {
79                return false;
80            }
81        } else {
82            return false;
83        }
84        i += 1;
85    }
86
87    return true;
88}
89
90impl<T> Iterator for Replace<T> 
91where T: Clone + PartialEq + 'static
92{
93    type Item = Result<T,Error>;
94
95    fn next(&mut self) -> Option<Self::Item> {
96        loop {
97            if self.output_list.len() > 0 {
98                let front = self.output_list.front_mut();
99                if front.as_ref().unwrap().items.len() == 0 {
100                    self.output_list.pop_front();
101                    continue;
102                } else {
103                    return Some(Ok(front.unwrap().items.pop_front().unwrap()));
104                }
105            }
106
107            if self.error.is_some() {
108                if !self.emitted_error {
109                    self.emitted_error = true;
110                    return Some(Err(self.error.as_ref().unwrap().clone()));
111                }
112            }
113            
114            if self.iter_finished {
115                return None;
116            }
117
118            if is_equals(&self.cache, &self.query) {
119                self.flush_sub();
120            } else {
121                if let Some(_next) = self.iter.next() {
122                    match _next {
123                        Ok(ok_next) => {
124                            if let Some(poped_value) = self.push_and_pop(ok_next) {
125                                self.push_back(poped_value);
126                            } else {
127                                // cache is not full
128                            }
129                        },
130                        Err(err_next) => { // upstream error
131                            self.flush_cache();
132                            self.error = Some(err_next);
133                            self.iter_finished = true;
134                        }
135                    }
136                    
137                } else {
138                    self.flush_cache();
139                    self.iter_finished = true;
140                }
141            }
142        }
143    }
144}
145
146
147
148pub fn replace<T>(mut iter: Box<dyn Iterator<Item = Result<T,Error>>>, query: Vec<T>, sub: Vec<T>) -> Box<dyn Iterator<Item = Result<T,Error>>> 
149where T: Clone + PartialEq + 'static
150{
151    let mut iter_finished = false;
152    let mut cache = VecDeque::new();
153    let cache_size = query.len();
154    let mut error: Option<Error> = None;
155    if cache_size == 0 {
156        error = Some(error::value_error("[replace:len of query should gt 0]".to_string()));
157    } else {
158        loop {
159            if let Some(v) = iter.next() {
160                match v {
161                    Ok(ok_v) => {
162                        cache.push_back(ok_v);
163                        if cache.len() == cache_size {
164                            break;
165                        }
166                    },
167                    Err(err_v) => { // upstream error
168                        iter_finished = true;
169                        error = Some(err_v)
170                    }
171                }
172            } else {
173                iter_finished = true;
174                break;
175            }
176        }
177    }
178    
179    Box::new(Replace{
180        output_list: LinkedList::new(),
181        cache,
182        iter,
183        iter_finished,
184        query,
185        sub,
186        error,
187        emitted_error: false
188    })
189
190    //preload items into cache
191
192}
193
194
195#[cfg(test)]
196mod tests {
197
198    use crate::utils::{generate_okok_iterator, generate_okokerr_iterator};
199
200    use super::*;
201
202    #[test]
203    fn test1() {
204        let mut r = replace(generate_okok_iterator(vec![1,1,1,1,1]), vec![1,1,1], vec![3,4]);
205        assert_eq!(Some(Ok(3)), r.next());
206        assert_eq!(Some(Ok(4)), r.next());
207        assert_eq!(Some(Ok(1)), r.next());
208        assert_eq!(Some(Ok(1)), r.next());
209        assert_eq!(None, r.next());
210        assert_eq!(None, r.next());
211
212        // println!("{:?}", r.next());
213        // println!("{:?}", r.next());
214        // println!("{:?}", r.next());
215        // println!("{:?}", r.next());
216        // println!("{:?}", r.next());
217        // println!("{:?}", r.next());
218
219
220        let mut r = replace(generate_okok_iterator(vec![0, 1, 2, 5, 0, 1, 2, 5]), vec![1,2,5], vec![3,4]);
221        assert_eq!(Some(Ok(0)), r.next());
222        assert_eq!(Some(Ok(3)), r.next());
223        assert_eq!(Some(Ok(4)), r.next());
224        assert_eq!(Some(Ok(0)), r.next());
225        assert_eq!(Some(Ok(3)), r.next());
226        assert_eq!(Some(Ok(4)), r.next());
227        assert_eq!(None, r.next());
228
229        let mut r = replace(generate_okok_iterator(vec![0, 1, 2, 5, 0, 1, 2, 5]), vec![0,1,2], vec![3,4]);
230        
231        assert_eq!(Some(Ok(3)), r.next());
232        assert_eq!(Some(Ok(4)), r.next());
233        assert_eq!(Some(Ok(5)), r.next());
234        assert_eq!(Some(Ok(3)), r.next());
235        assert_eq!(Some(Ok(4)), r.next());
236        assert_eq!(Some(Ok(5)), r.next());
237        assert_eq!(None, r.next());
238    }
239
240
241    #[test]
242    fn test1_error() {
243        let mut r = replace(generate_okokerr_iterator(vec![1,1,1,1,1], error::overflow_error("[test]".to_string())), vec![1,1,1], vec![3,4]);
244        // println!("{:?}", r.next());
245        // println!("{:?}", r.next());
246        // println!("{:?}", r.next());
247        // println!("{:?}", r.next());
248        // println!("{:?}", r.next());
249        // println!("{:?}", r.next());
250
251        assert_eq!(Some(Ok(3)), r.next());
252        assert_eq!(Some(Ok(4)), r.next());
253        assert_eq!(Some(Ok(1)), r.next());
254        assert_eq!(Some(Ok(1)), r.next());
255        assert_eq!(error::Kind::OverflowError, r.next().unwrap().err().unwrap().kind());
256        assert_eq!(None, r.next());
257    }
258}