cyfs_bdt/ndn/chunk/cache/
encode.rs

1use std::{
2    collections::LinkedList, 
3    ops::Range, 
4};
5use cyfs_base::*;
6use crate::{
7    types::*
8};
9use super::super::super::{
10    channel::{protocol::v0::*}, 
11    types::*
12};
13
14#[derive(Clone, Eq, PartialEq)]
15pub enum ChunkDecoderState {
16    Decoding(u32), 
17    Ready, 
18}
19
20pub trait ChunkDecoder: Send + Sync {
21    fn clone_as_decoder(&self) -> Box<dyn ChunkDecoder>;
22    fn chunk(&self) -> &ChunkId;
23    fn desc(&self) -> &ChunkCodecDesc;
24    fn require_index(&self) -> Option<(Option<u32>, Option<Vec<Range<u32>>>)>;
25    fn push_piece_data(&self, piece: &PieceData) -> BuckyResult<PushIndexResult>;
26}
27
28pub trait ChunkEncoder: Send + Sync {
29    fn clone_as_encoder(&self) -> Box<dyn ChunkEncoder>;
30    fn chunk(&self) -> &ChunkId;
31    fn desc(&self) -> &ChunkCodecDesc;
32    fn next_piece(
33        &self, 
34        session_id: &TempSeq, 
35        buf: &mut [u8]
36    ) -> BuckyResult<usize>;
37    fn reset(&self) -> bool;
38    fn merge(
39        &self, 
40        max_index: u32, 
41        lost_index: Vec<Range<u32>>
42    ) -> bool;
43}
44
45
46#[derive(Clone, Copy, Debug)]
47pub struct PushIndexResult {
48    pub valid: bool, 
49    pub exists: bool, 
50    pub finished: bool
51}
52
53impl PushIndexResult {
54    pub fn pushed(&self) -> bool {
55        !self.finished && !self.exists
56    }
57}
58
59pub struct IncomeIndexQueue {
60    end: u32, 
61    queue: LinkedList<Range<u32>>
62}
63
64impl IncomeIndexQueue {
65    pub fn new(end: u32) -> Self {
66        Self {
67            end, 
68            queue: LinkedList::new()
69        }
70    }
71
72    pub fn require(&self, start: u32, end: u32, step: i32) -> Option<(Option<u32>, Option<Vec<Range<u32>>>)> {
73        if self.finished() {
74            return None;
75        }
76
77        let mut exists = LinkedList::new();
78        for exist in &self.queue {
79            if exist.end <= start {
80                continue;
81            }
82            if exist.start >= end {
83                break;
84            }
85            exists.push_back(u32::max(start, exist.start)..u32::min(end, exist.end));
86        }
87        
88        let mut require = LinkedList::new();
89        let mut remain = start..end;
90        for exist in exists {
91            let cur = remain.start..exist.start;
92            if cur.end > cur.start {
93                require.push_back(cur);
94            }
95            remain.start = exist.end
96        }
97        if remain.end > remain.start {
98            require.push_back(remain);
99        }
100
101        if require.len() > 0 {
102            if step > 0 {
103                Some((Some(self.queue.back().unwrap().end - 1), Some(require.into_iter().collect())))
104            } else {
105                Some((Some(self.queue.front().unwrap().start), Some(require.into_iter().collect())))
106            }
107        } else {
108            None
109        }
110    }
111
112    pub fn finished(&self) -> bool {
113        if self.queue.len() != 1 {
114            return false;
115        }
116        let index = self.queue.front().unwrap();
117        index.start == 0 && index.end == self.end
118    }
119
120    pub fn end(&self) -> u32 {
121        self.end
122    }
123
124    pub fn try_push(&self, index: Range<u32>) -> PushIndexResult {
125        if index.start >= self.end {
126            return PushIndexResult {
127                valid: false, 
128                exists: false,
129                finished: self.finished()
130            };
131        }
132
133        let mut exists = false;
134        
135        for range in self.queue.iter() {
136            if index.start >= range.start && index.end < range.end {
137                exists = true;
138                break;
139            } 
140        }
141        
142        PushIndexResult {
143            valid: true, 
144            exists,
145            finished: self.finished()
146        }
147    }
148
149    pub fn push(&mut self, index: Range<u32>) -> PushIndexResult {
150        if index.start >= self.end {
151            return PushIndexResult {
152                valid: false, 
153                exists: false,
154                finished: self.finished()
155            };
156        }
157        
158        enum ChangeQueue {
159            None, 
160            Insert(usize), 
161            CheckMerge(usize), 
162            PushBack
163        }
164
165        let mut exists = false;
166        if self.queue.len() > 0 {
167            let mut change = ChangeQueue::PushBack;
168            for (i, next) in self.queue.iter_mut().enumerate() {
169                if index.start >= next.start 
170                    && index.end <= next.end {
171                    // 最常见的情况,包含在其中
172                    change = ChangeQueue::None;
173                    exists = true;
174                    break;
175                } else if index.end < next.start {
176                    // 朝前附加
177                    change = ChangeQueue::Insert(i);
178                    break;
179                } else if index.end == next.start {
180                    // 和当前合并
181                    next.start = index.start;
182                    change = ChangeQueue::None;
183                    break;
184                } else if index.start <= next.end {
185                    // 扩展当前,检查后面的是否合并
186                    next.start = std::cmp::min(index.start, next.start);
187                    next.end = index.end;
188                    change = ChangeQueue::CheckMerge(i);
189                    break;
190                } else {
191                    continue;
192                }
193            }
194
195            
196            match change {
197                ChangeQueue::None => {
198                    // skip 不变
199                },  
200                ChangeQueue::Insert(i) => {
201                    let mut last_part = self.queue.split_off(i);
202                    last_part.push_front(index);
203                    self.queue.append(&mut last_part);
204                },
205                ChangeQueue::CheckMerge(i) => {
206                    let mut merged_len = 0;
207                    let mut iter = self.queue.iter().skip(i);
208                    let base = iter.next().unwrap().clone();
209                    for next in iter {
210                        if next.start > base.end {
211                            break;
212                        } 
213                        merged_len += 1;
214                    }
215                    if merged_len > 0 {
216                        let mut last_part = self.queue.split_off(i + 1);
217                        let mut append_back = last_part.split_off(merged_len);
218                        let base_ref = self.queue.back_mut().unwrap();
219                        let merge_end = last_part.back().unwrap().end;
220                        if base_ref.end < merge_end {
221                            base_ref.end = merge_end;
222                        }
223                        self.queue.append(&mut append_back);
224                    }
225                }, 
226                ChangeQueue::PushBack => {
227                    self.queue.push_back(index);
228                }
229            };
230        } else {
231            self.queue.push_back(index);
232        }
233
234        PushIndexResult {
235            valid: true, 
236            exists, 
237            finished: self.finished()
238        }
239        
240    }
241
242    pub fn exists(&self, index: u32) -> BuckyResult<bool> {
243        if index >= self.end {
244            return Err(BuckyError::new(BuckyErrorCode::OutOfLimit, ""));
245        }
246        for exists in self.queue.iter() {
247            if index >= exists.start && index < exists.end {
248                return Ok(true);
249            } 
250        }
251        Ok(false)
252    }
253}
254
255
256
257#[test]
258fn test_income_index_queue() {
259    {
260        let mut indices = IncomeIndexQueue {
261            end: 10u32, 
262            queue: LinkedList::new()
263        };
264
265        let result = indices.push(0..1);
266        assert!(!result.exists && result.valid && !result.finished);
267        assert!(indices.require(0, 10, 1).is_some());
268
269        let result = indices.push(1..2);
270        assert!(!result.exists && result.valid && !result.finished);
271        let result = indices.push(2..3);
272        assert!(!result.exists && result.valid && !result.finished);
273        let result = indices.push(3..4);
274        assert!(!result.exists && result.valid && !result.finished);
275        let result = indices.push(4..5);
276        assert!(!result.exists && result.valid && !result.finished);
277        let result = indices.push(5..6);
278        assert!(!result.exists && result.valid && !result.finished);
279        let result = indices.push(6..7);
280        assert!(!result.exists && result.valid && !result.finished);
281        let result = indices.push(7..8);
282        assert!(!result.exists && result.valid && !result.finished);
283        let result = indices.push(8..9);
284        assert!(!result.exists && result.valid && !result.finished);
285        assert!(indices.require(0, 10, 1).is_some());
286
287        let result = indices.push(9..10);
288        assert!(!result.exists && result.valid && result.finished);
289        assert!(indices.require(0, 10, 1).is_none());
290    }
291
292
293    {
294        let mut indices = IncomeIndexQueue {
295            end: 10u32, 
296            queue: LinkedList::new()
297        };
298
299        let index = 9;
300        let result = indices.push(index..index + 1);
301        assert!(!result.exists && result.valid && !result.finished);
302        assert!(indices.require(0, 10, 1).is_some());
303
304        let index = 8;
305        let result = indices.push(index..index + 1);
306        assert!(!result.exists && result.valid && !result.finished);
307        let index = 7;
308        let result = indices.push(index..index + 1);
309        assert!(!result.exists && result.valid && !result.finished);
310        let index = 6;
311        let result = indices.push(index..index + 1);
312        assert!(!result.exists && result.valid && !result.finished);
313        let index = 5;
314        let result = indices.push(index..index + 1);
315        assert!(!result.exists && result.valid && !result.finished);
316        let index = 4;
317        let result = indices.push(index..index + 1);
318        assert!(!result.exists && result.valid && !result.finished);
319        let index = 3;
320        let result = indices.push(index..index + 1);
321        assert!(!result.exists && result.valid && !result.finished);
322        let index = 2;
323        let result = indices.push(index..index + 1);
324        assert!(!result.exists && result.valid && !result.finished);
325        let index = 1;
326        let result = indices.push(index..index + 1);
327        assert!(!result.exists && result.valid && !result.finished);
328        assert!(indices.require(0, 10, 1).is_some());
329
330        let index = 0;
331        let result = indices.push(index..index + 1);
332        assert!(!result.exists && result.valid && result.finished);
333        assert!(indices.require(0, 10, 1).is_none());
334    }
335
336
337    {
338        let mut indices = IncomeIndexQueue {
339            end: 10u32, 
340            queue: LinkedList::new()
341        };
342
343        let index = 9;
344        let result = indices.push(index..index + 1);
345        assert!(!result.exists && result.valid && !result.finished);
346        assert!(indices.require(0, 10, 1).is_some());
347
348        let index = 7;
349        let result = indices.push(index..index + 1);
350        assert!(!result.exists && result.valid && !result.finished);
351        let index = 8;
352        let result = indices.push(index..index + 1);
353        assert!(!result.exists && result.valid && !result.finished);
354        let index = 6;
355        let result = indices.push(index..index + 1);
356        assert!(!result.exists && result.valid && !result.finished);
357        let index = 5;
358        let result = indices.push(index..index + 1);
359        assert!(!result.exists && result.valid && !result.finished);
360        let index = 4;
361        let result = indices.push(index..index + 1);
362        assert!(!result.exists && result.valid && !result.finished);
363        let index = 3;
364        let result = indices.push(index..index + 1);
365        assert!(!result.exists && result.valid && !result.finished);
366        let index = 2;
367        let result = indices.push(index..index + 1);
368        assert!(!result.exists && result.valid && !result.finished);
369        let index = 1;
370        let result = indices.push(index..index + 1);
371        assert!(!result.exists && result.valid && !result.finished);
372        assert!(indices.require(0, 10, 1).is_some());
373
374        let index = 0;
375        let result = indices.push(index..index + 1);
376        assert!(!result.exists && result.valid && result.finished);
377        assert!(indices.require(0, 10, 1).is_none());
378    }
379
380
381
382    {
383        let mut indices = IncomeIndexQueue {
384            end: 10u32, 
385            queue: LinkedList::new()
386        };
387
388        let index = 9;
389        let result = indices.push(index..index + 1);
390        assert!(!result.exists && result.valid && !result.finished);
391        assert!(indices.require(0, 10, 1).is_some());
392
393        let index = 7;
394        let result = indices.push(index..index + 1);
395        assert!(!result.exists && result.valid && !result.finished);
396        let index = 8;
397        let result = indices.push(index..index + 1);
398        assert!(!result.exists && result.valid && !result.finished);
399        let index = 6;
400        let result = indices.push(index..index + 1);
401        assert!(!result.exists && result.valid && !result.finished);
402        let index = 5;
403        let result = indices.push(index..index + 1);
404        assert!(!result.exists && result.valid && !result.finished);
405        let index = 4;
406        let result = indices.push(index..index + 1);
407        assert!(!result.exists && result.valid && !result.finished);
408        let index = 3;
409        let result = indices.push(index..index + 1);
410        assert!(!result.exists && result.valid && !result.finished);
411        
412
413        let index = 0;
414        let result = indices.push(index..index + 1);
415        assert!(!result.exists && result.valid && !result.finished);
416        
417        let (_, lost_indices) = indices.require(0, 10, -1).unwrap();
418        let lost = &lost_indices.unwrap()[0];
419        assert!(lost.start == 1 && lost.end == 3);
420
421    }
422    
423}   
424
425
426
427
428#[derive(Debug)]
429pub struct OutcomeIndexQueue {
430    step: i32, 
431    start: u32, 
432    end: u32, 
433    queue: LinkedList<Range<u32>>
434}
435
436
437impl OutcomeIndexQueue {
438    pub fn new(start: u32, end: u32, step: i32) -> Self {
439        let mut queue = LinkedList::new();
440        queue.push_back(start..end);
441        Self {
442            step, 
443            start, 
444            end, 
445            queue 
446        }
447    }
448
449    pub fn reset(&mut self) -> bool {
450        if self.queue.len() == 1 {
451            let r = self.queue.front().unwrap();
452            if r.start == self.start && r.end == self.end {
453                return false;
454            }
455        } 
456        let mut queue = LinkedList::new();
457        queue.push_back(self.start..self.end);
458        self.queue = queue;
459        true
460    }
461
462    pub fn merge(&mut self, max_index: u32, lost_index: Vec<Range<u32>>) -> bool {
463        enum ChangeQueue {
464            None, 
465            Insert(usize), 
466            CheckMerge(usize), 
467            PushBack
468        }
469
470        let mut changed = false;
471        
472        let mut merge_one = |lost: Range<u32>, skip| {
473            if self.queue.len() > 0 {
474                let mut change = ChangeQueue::PushBack;
475                let mut skip = skip;
476                for (i, next) in self.queue.iter_mut().enumerate().skip(skip) {
477                    if lost.start >= next.start 
478                    && lost.end <= next.end {
479                        // 最常见的情况,包含在其中
480                        change = ChangeQueue::None;
481                        break;
482                    } else if lost.end < next.start {
483                        // 朝前附加
484                        changed = true;
485                        ChangeQueue::Insert(i);
486                        break;
487                    } else if lost.end == next.start {
488                        // 和当前合并
489                        changed = true;
490                        next.start = lost.start;
491                        change = ChangeQueue::None;
492                        break;
493                    } else if lost.start <= next.end {
494                        changed = true;
495                        // 扩展当前,检查后面的是否合并
496                        next.start = std::cmp::min(lost.start, next.start);
497                        next.end = lost.end;
498                        change = ChangeQueue::CheckMerge(i);
499                        break;
500                    } else {
501                        skip += 1;
502                        continue;
503                    }
504                }
505
506                match change {
507                    ChangeQueue::None => {
508                        // skip 不变
509                    },  
510                    ChangeQueue::Insert(i) => {
511                        let mut last_part = self.queue.split_off(i);
512                        last_part.push_front(lost);
513                        self.queue.append(&mut last_part);
514                        skip += 1;
515                    },
516                    ChangeQueue::CheckMerge(i) => {
517                        let mut merged_len = 0;
518                        let mut iter = self.queue.iter().skip(i);
519                        let base = iter.next().unwrap().clone();
520                        for next in iter {
521                            if next.start > base.end {
522                                break;
523                            } 
524                            merged_len += 1;
525                        }
526                        if merged_len > 0 {
527                            let mut last_part = self.queue.split_off(i + 1);
528                            let mut append_back = last_part.split_off(merged_len);
529                            let base_ref = self.queue.back_mut().unwrap();
530                            let merge_end = last_part.back().unwrap().end;
531                            if base_ref.end < merge_end {
532                                base_ref.end = merge_end;
533                            }
534                            self.queue.append(&mut append_back);
535                        }
536                    }, 
537                    ChangeQueue::PushBack => {
538                        self.queue.push_back(lost);
539                        skip += 1;
540                    }
541                }
542                skip
543            } else {
544                self.queue.push_back(lost);
545                1
546            }
547        };
548        
549
550        let mut skip = 0;
551        for lost in lost_index {
552            skip = merge_one(lost.clone(), skip);
553        }
554
555        if self.step > 0 {
556            if max_index < (self.end - 1) {
557                merge_one(max_index + 1..self.end, skip);
558            }
559        } else {
560            if max_index > self.start {
561                merge_one(self.start..max_index, skip);
562            }
563        }
564
565
566        changed
567        
568    } 
569
570    pub fn next(&self) -> Option<u32> {
571        if self.step > 0 {
572            self.queue.front().map(|r| r.start)
573        } else {
574            self.queue.back().map(|r| r.end - 1)
575        }
576    }
577
578    pub fn pop_next(&mut self) -> Option<u32> {
579        if self.queue.len() > 0 {
580            if self.step > 0 {
581                let range = self.queue.front_mut().unwrap();
582                let index = if (range.end - range.start) == 1 {
583                    self.queue.pop_front().unwrap().start
584                } else {
585                    let index = range.start;
586                    range.start += 1;
587                    index
588                };
589                Some(index)
590            } else {
591                let range = self.queue.back_mut().unwrap();
592                let index = if (range.end - range.start) == 1 {
593                    self.queue.pop_back().unwrap().end - 1
594                } else {
595                    let index = range.end - 1;
596                    range.end -= 1;
597                    index
598                };
599                Some(index)
600            }
601        } else {
602            None
603        }
604    }
605}
606
607
608#[test]
609fn test_outcome_index_queue() {
610    let mut queue = OutcomeIndexQueue::new(0, 9, 1);
611    assert_eq!(queue.pop_next(), Some(0));
612    assert_eq!(queue.pop_next(), Some(1));
613    assert_eq!(queue.pop_next(), Some(2));
614    assert_eq!(queue.pop_next(), Some(3));
615    assert_eq!(queue.pop_next(), Some(4));
616    assert_eq!(queue.pop_next(), Some(5));
617
618    queue.merge(5, vec![]);
619    assert_eq!(queue.pop_next(), Some(6));
620
621    queue.merge(4, vec![]);
622    assert_eq!(queue.pop_next(), Some(5));
623    assert_eq!(queue.pop_next(), Some(6));
624    assert_eq!(queue.pop_next(), Some(7));
625    assert_eq!(queue.pop_next(), Some(8));
626    assert_eq!(queue.pop_next(), None);
627
628
629
630    let mut queue = OutcomeIndexQueue::new(0, 9, -1);
631    assert_eq!(queue.pop_next(), Some(8));
632    assert_eq!(queue.pop_next(), Some(7));
633    assert_eq!(queue.pop_next(), Some(6));
634    assert_eq!(queue.pop_next(), Some(5));
635    assert_eq!(queue.pop_next(), Some(4));
636
637    queue.merge(5, vec![]);
638    assert_eq!(queue.pop_next(), Some(4));
639
640    queue.merge(5, vec![]);
641    assert_eq!(queue.pop_next(), Some(4));
642    assert_eq!(queue.pop_next(), Some(3));
643    assert_eq!(queue.pop_next(), Some(2));
644    assert_eq!(queue.pop_next(), Some(1));
645    assert_eq!(queue.pop_next(), Some(0));
646    assert_eq!(queue.pop_next(), None);
647}
648
649
650
651