cyfs_bdt/ndn/chunk/cache/
encode.rs1use std::{
2 collections::LinkedList,
3 ops::Range,
4};
5use cyfs_base::*;
6use crate::{
7 types::*
8};
9use super::super::super::{
10 channel::{protocol::v0::*},
11 types::*
12};
13
/// State value with two variants: still decoding, or ready.
///
/// NOTE(review): judging by the name this describes the progress of a chunk
/// decoder; nothing in this file constructs or matches on it, so confirm the
/// exact meaning against the decoder implementations.
#[derive(Clone, Eq, PartialEq)]
pub enum ChunkDecoderState {
    /// Decoding is in progress. The meaning of the `u32` payload is not visible
    /// here (presumably a piece index or count — TODO confirm).
    Decoding(u32),
    /// Decoding has finished.
    Ready,
}
19
20pub trait ChunkDecoder: Send + Sync {
21 fn clone_as_decoder(&self) -> Box<dyn ChunkDecoder>;
22 fn chunk(&self) -> &ChunkId;
23 fn desc(&self) -> &ChunkCodecDesc;
24 fn require_index(&self) -> Option<(Option<u32>, Option<Vec<Range<u32>>>)>;
25 fn push_piece_data(&self, piece: &PieceData) -> BuckyResult<PushIndexResult>;
26}
27
28pub trait ChunkEncoder: Send + Sync {
29 fn clone_as_encoder(&self) -> Box<dyn ChunkEncoder>;
30 fn chunk(&self) -> &ChunkId;
31 fn desc(&self) -> &ChunkCodecDesc;
32 fn next_piece(
33 &self,
34 session_id: &TempSeq,
35 buf: &mut [u8]
36 ) -> BuckyResult<usize>;
37 fn reset(&self) -> bool;
38 fn merge(
39 &self,
40 max_index: u32,
41 lost_index: Vec<Range<u32>>
42 ) -> bool;
43}
44
45
/// Outcome of offering an index range to an `IncomeIndexQueue`.
#[derive(Clone, Copy, Debug)]
pub struct PushIndexResult {
    /// `false` when the offered range starts at or beyond the end of the queue.
    pub valid: bool,
    /// `true` when the offered range was already fully covered by the queue.
    pub exists: bool,
    /// `true` when the queue covers its whole index space after the call.
    pub finished: bool,
}

impl PushIndexResult {
    /// Returns `true` only when the range was not already present and the
    /// queue is not finished.
    ///
    /// Note that `valid` is not consulted, and a push that completes the queue
    /// reports `false` here because `finished` is set.
    pub fn pushed(&self) -> bool {
        !(self.finished || self.exists)
    }
}
58
59pub struct IncomeIndexQueue {
60 end: u32,
61 queue: LinkedList<Range<u32>>
62}
63
64impl IncomeIndexQueue {
65 pub fn new(end: u32) -> Self {
66 Self {
67 end,
68 queue: LinkedList::new()
69 }
70 }
71
72 pub fn require(&self, start: u32, end: u32, step: i32) -> Option<(Option<u32>, Option<Vec<Range<u32>>>)> {
73 if self.finished() {
74 return None;
75 }
76
77 let mut exists = LinkedList::new();
78 for exist in &self.queue {
79 if exist.end <= start {
80 continue;
81 }
82 if exist.start >= end {
83 break;
84 }
85 exists.push_back(u32::max(start, exist.start)..u32::min(end, exist.end));
86 }
87
88 let mut require = LinkedList::new();
89 let mut remain = start..end;
90 for exist in exists {
91 let cur = remain.start..exist.start;
92 if cur.end > cur.start {
93 require.push_back(cur);
94 }
95 remain.start = exist.end
96 }
97 if remain.end > remain.start {
98 require.push_back(remain);
99 }
100
101 if require.len() > 0 {
102 if step > 0 {
103 Some((Some(self.queue.back().unwrap().end - 1), Some(require.into_iter().collect())))
104 } else {
105 Some((Some(self.queue.front().unwrap().start), Some(require.into_iter().collect())))
106 }
107 } else {
108 None
109 }
110 }
111
112 pub fn finished(&self) -> bool {
113 if self.queue.len() != 1 {
114 return false;
115 }
116 let index = self.queue.front().unwrap();
117 index.start == 0 && index.end == self.end
118 }
119
120 pub fn end(&self) -> u32 {
121 self.end
122 }
123
124 pub fn try_push(&self, index: Range<u32>) -> PushIndexResult {
125 if index.start >= self.end {
126 return PushIndexResult {
127 valid: false,
128 exists: false,
129 finished: self.finished()
130 };
131 }
132
133 let mut exists = false;
134
135 for range in self.queue.iter() {
136 if index.start >= range.start && index.end < range.end {
137 exists = true;
138 break;
139 }
140 }
141
142 PushIndexResult {
143 valid: true,
144 exists,
145 finished: self.finished()
146 }
147 }
148
149 pub fn push(&mut self, index: Range<u32>) -> PushIndexResult {
150 if index.start >= self.end {
151 return PushIndexResult {
152 valid: false,
153 exists: false,
154 finished: self.finished()
155 };
156 }
157
158 enum ChangeQueue {
159 None,
160 Insert(usize),
161 CheckMerge(usize),
162 PushBack
163 }
164
165 let mut exists = false;
166 if self.queue.len() > 0 {
167 let mut change = ChangeQueue::PushBack;
168 for (i, next) in self.queue.iter_mut().enumerate() {
169 if index.start >= next.start
170 && index.end <= next.end {
171 change = ChangeQueue::None;
173 exists = true;
174 break;
175 } else if index.end < next.start {
176 change = ChangeQueue::Insert(i);
178 break;
179 } else if index.end == next.start {
180 next.start = index.start;
182 change = ChangeQueue::None;
183 break;
184 } else if index.start <= next.end {
185 next.start = std::cmp::min(index.start, next.start);
187 next.end = index.end;
188 change = ChangeQueue::CheckMerge(i);
189 break;
190 } else {
191 continue;
192 }
193 }
194
195
196 match change {
197 ChangeQueue::None => {
198 },
200 ChangeQueue::Insert(i) => {
201 let mut last_part = self.queue.split_off(i);
202 last_part.push_front(index);
203 self.queue.append(&mut last_part);
204 },
205 ChangeQueue::CheckMerge(i) => {
206 let mut merged_len = 0;
207 let mut iter = self.queue.iter().skip(i);
208 let base = iter.next().unwrap().clone();
209 for next in iter {
210 if next.start > base.end {
211 break;
212 }
213 merged_len += 1;
214 }
215 if merged_len > 0 {
216 let mut last_part = self.queue.split_off(i + 1);
217 let mut append_back = last_part.split_off(merged_len);
218 let base_ref = self.queue.back_mut().unwrap();
219 let merge_end = last_part.back().unwrap().end;
220 if base_ref.end < merge_end {
221 base_ref.end = merge_end;
222 }
223 self.queue.append(&mut append_back);
224 }
225 },
226 ChangeQueue::PushBack => {
227 self.queue.push_back(index);
228 }
229 };
230 } else {
231 self.queue.push_back(index);
232 }
233
234 PushIndexResult {
235 valid: true,
236 exists,
237 finished: self.finished()
238 }
239
240 }
241
242 pub fn exists(&self, index: u32) -> BuckyResult<bool> {
243 if index >= self.end {
244 return Err(BuckyError::new(BuckyErrorCode::OutOfLimit, ""));
245 }
246 for exists in self.queue.iter() {
247 if index >= exists.start && index < exists.end {
248 return Ok(true);
249 }
250 }
251 Ok(false)
252 }
253}
254
255
256
/// Exercises `IncomeIndexQueue` with single-piece pushes arriving in ascending,
/// descending and shuffled order, then checks the gap reported for an
/// incomplete queue.
#[test]
fn test_income_index_queue() {
    /// Pushes `index..index + 1`, which must be new and valid, and checks
    /// whether that push completed the queue.
    fn push_new(indices: &mut IncomeIndexQueue, index: u32, finished: bool) {
        let result = indices.push(index..index + 1);
        assert!(!result.exists && result.valid && result.finished == finished);
    }

    // Ascending order: 0, 1, ..., 9.
    {
        let mut indices = IncomeIndexQueue::new(10);

        push_new(&mut indices, 0, false);
        assert!(indices.require(0, 10, 1).is_some());

        for index in 1..9 {
            push_new(&mut indices, index, false);
        }
        assert!(indices.require(0, 10, 1).is_some());

        push_new(&mut indices, 9, true);
        assert!(indices.require(0, 10, 1).is_none());
    }

    // Descending order: 9, 8, ..., 0.
    {
        let mut indices = IncomeIndexQueue::new(10);

        push_new(&mut indices, 9, false);
        assert!(indices.require(0, 10, 1).is_some());

        for index in (1..9).rev() {
            push_new(&mut indices, index, false);
        }
        assert!(indices.require(0, 10, 1).is_some());

        push_new(&mut indices, 0, true);
        assert!(indices.require(0, 10, 1).is_none());
    }

    // Shuffled order: 8 arrives after 7 and has to bridge 7..8 and 9..10.
    {
        let mut indices = IncomeIndexQueue::new(10);

        push_new(&mut indices, 9, false);
        assert!(indices.require(0, 10, 1).is_some());

        for &index in &[7u32, 8, 6, 5, 4, 3, 2, 1] {
            push_new(&mut indices, index, false);
        }
        assert!(indices.require(0, 10, 1).is_some());

        push_new(&mut indices, 0, true);
        assert!(indices.require(0, 10, 1).is_none());
    }

    // Incomplete queue: indices 1 and 2 are never pushed.
    {
        let mut indices = IncomeIndexQueue::new(10);

        push_new(&mut indices, 9, false);
        assert!(indices.require(0, 10, 1).is_some());

        for &index in &[7u32, 8, 6, 5, 4, 3] {
            push_new(&mut indices, index, false);
        }
        push_new(&mut indices, 0, false);

        let (_, lost_indices) = indices.require(0, 10, -1).unwrap();
        let lost = &lost_indices.unwrap()[0];
        assert!(lost.start == 1 && lost.end == 3);
    }
}
424
425
426
427
/// Indices of `start..end` that are still pending, handed out one at a time.
///
/// Pending indices are kept as half-open ranges sorted by start. A positive
/// `step` hands indices out from the front in ascending order; any other value
/// hands them out from the back in descending order.
#[derive(Debug)]
pub struct OutcomeIndexQueue {
    /// Direction selector: `> 0` ascending, otherwise descending.
    step: i32,
    /// Inclusive lower bound of the index space.
    start: u32,
    /// Exclusive upper bound of the index space.
    end: u32,
    /// Sorted ranges of indices still pending.
    queue: LinkedList<Range<u32>>,
}

impl OutcomeIndexQueue {
    /// Creates a queue with the whole range `start..end` pending.
    pub fn new(start: u32, end: u32, step: i32) -> Self {
        let mut queue = LinkedList::new();
        queue.push_back(start..end);
        Self {
            step,
            start,
            end,
            queue,
        }
    }

    /// Makes the whole range `start..end` pending again.
    ///
    /// Returns `false` when the queue already was exactly that single range,
    /// `true` when it had to be replaced.
    pub fn reset(&mut self) -> bool {
        let untouched = self.queue.len() == 1
            && self
                .queue
                .front()
                .map_or(false, |range| range.start == self.start && range.end == self.end);
        if untouched {
            return false;
        }

        let mut queue = LinkedList::new();
        queue.push_back(self.start..self.end);
        self.queue = queue;
        true
    }

    /// Puts the given ranges back into the pending set.
    ///
    /// Every range of `lost_index` is merged in. Then everything beyond
    /// `max_index` in the direction of travel is merged in as well:
    /// `max_index + 1..end` when `step > 0`, `start..max_index` otherwise.
    ///
    /// Returns `true` when the pending set changed. Unlike the previous
    /// implementation this also holds when a range had to be appended at the
    /// back or the queue was empty, and ranges are always inserted in sorted
    /// position.
    pub fn merge(&mut self, max_index: u32, lost_index: Vec<Range<u32>>) -> bool {
        let mut changed = false;
        for lost in lost_index {
            changed |= Self::merge_range(&mut self.queue, lost);
        }

        let beyond = if self.step > 0 {
            // `saturating_sub` keeps `end == 0` from underflowing.
            if max_index < self.end.saturating_sub(1) {
                Some(max_index + 1..self.end)
            } else {
                None
            }
        } else if max_index > self.start {
            Some(self.start..max_index)
        } else {
            None
        };
        if let Some(range) = beyond {
            changed |= Self::merge_range(&mut self.queue, range);
        }

        changed
    }

    /// Merges one range into the sorted `queue`; returns whether it changed.
    fn merge_range(queue: &mut LinkedList<Range<u32>>, lost: Range<u32>) -> bool {
        // An empty range adds nothing and would corrupt `pop_next`.
        if lost.start >= lost.end {
            return false;
        }

        // Structural change still to apply once the scan has released its borrow.
        enum ChangeQueue {
            Covered,
            Extended,
            Insert(usize),
            CheckMerge(usize),
            PushBack,
        }

        // Stays `PushBack` when the queue is empty or `lost` lies after every entry.
        let mut change = ChangeQueue::PushBack;
        for (i, next) in queue.iter_mut().enumerate() {
            if lost.start >= next.start && lost.end <= next.end {
                // Already pending.
                change = ChangeQueue::Covered;
                break;
            } else if lost.end < next.start {
                // Strictly before this entry with a gap in between.
                change = ChangeQueue::Insert(i);
                break;
            } else if lost.end == next.start {
                // Touches the front of this entry: extend it downwards.
                next.start = lost.start;
                change = ChangeQueue::Extended;
                break;
            } else if lost.start <= next.end {
                // Overlaps or touches the back of this entry. Take the union so
                // a range ending inside the entry cannot shrink it.
                next.start = std::cmp::min(lost.start, next.start);
                next.end = std::cmp::max(lost.end, next.end);
                change = ChangeQueue::CheckMerge(i);
                break;
            }
        }

        match change {
            ChangeQueue::Covered => return false,
            ChangeQueue::Extended => {}
            ChangeQueue::Insert(i) => {
                let mut tail = queue.split_off(i);
                queue.push_back(lost);
                queue.append(&mut tail);
            }
            ChangeQueue::CheckMerge(i) => {
                // The grown entry may now reach following entries: absorb them.
                let mut tail = queue.split_off(i + 1);
                if let Some(base) = queue.back_mut() {
                    while tail.front().map_or(false, |next| next.start <= base.end) {
                        if let Some(absorbed) = tail.pop_front() {
                            base.end = u32::max(base.end, absorbed.end);
                        }
                    }
                }
                queue.append(&mut tail);
            }
            ChangeQueue::PushBack => {
                queue.push_back(lost);
            }
        }
        true
    }

    /// Returns the index `pop_next` would hand out, without removing it.
    pub fn next(&self) -> Option<u32> {
        if self.step > 0 {
            self.queue.front().map(|range| range.start)
        } else {
            self.queue.back().map(|range| range.end - 1)
        }
    }

    /// Removes and returns the next pending index, or `None` when nothing is
    /// pending.
    pub fn pop_next(&mut self) -> Option<u32> {
        if self.step > 0 {
            let range = self.queue.front_mut()?;
            let index = range.start;
            if range.end - range.start == 1 {
                // Last index of this range: drop the whole entry.
                self.queue.pop_front();
            } else {
                range.start += 1;
            }
            Some(index)
        } else {
            let range = self.queue.back_mut()?;
            let index = range.end - 1;
            if range.end - range.start == 1 {
                // Last index of this range: drop the whole entry.
                self.queue.pop_back();
            } else {
                range.end -= 1;
            }
            Some(index)
        }
    }
}
606
607
/// Exercises `OutcomeIndexQueue` in both directions: indices are popped,
/// `merge` re-queues everything beyond the acknowledged `max_index`, and the
/// queue then drains to `None`.
#[test]
fn test_outcome_index_queue() {
    // Ascending: indices come out of the front.
    let mut queue = OutcomeIndexQueue::new(0, 9, 1);
    for expected in 0..6 {
        assert_eq!(queue.pop_next(), Some(expected));
    }

    // Everything after 5 is still pending, so nothing is re-queued.
    queue.merge(5, vec![]);
    assert_eq!(queue.pop_next(), Some(6));

    // Only 4 was acknowledged: 5 and 6 become pending again.
    queue.merge(4, vec![]);
    for expected in 5..9 {
        assert_eq!(queue.pop_next(), Some(expected));
    }
    assert_eq!(queue.pop_next(), None);

    // Descending: indices come out of the back.
    let mut queue = OutcomeIndexQueue::new(0, 9, -1);
    for expected in (4..9).rev() {
        assert_eq!(queue.pop_next(), Some(expected));
    }

    // Only 5 was acknowledged: 4 becomes pending again.
    queue.merge(5, vec![]);
    assert_eq!(queue.pop_next(), Some(4));

    queue.merge(5, vec![]);
    for expected in (0..5).rev() {
        assert_eq!(queue.pop_next(), Some(expected));
    }
    assert_eq!(queue.pop_next(), None);
}
648
649
650
651