cyfs_bdt/ndn/
types.rs

1use std::{
2    ops::Range, 
3    time::Duration, 
4    collections::LinkedList, 
5};
6use serde::{
7    Deserialize,
8    Serialize,
9};
10use serde_json::{Map, Value};
11use cyfs_base::*;
12use crate::{
13    types::*
14};
15use super::{
16    channel::protocol::v0::*
17};
18
19
20#[derive(Clone, Debug, Eq, PartialEq)]
21pub enum PieceDesc {
22    Raptor(u32 /*raptor seq*/, u16 /*raptor k*/),
23    Range(u32 /*range index*/, u16 /*range size*/),
24}
25
26impl PieceDesc {
27    pub fn raw_raptor_bytes() -> usize {
28        u8::raw_bytes().unwrap() + u32::raw_bytes().unwrap() + u16::raw_bytes().unwrap()
29    }
30
31    pub fn raw_stream_bytes() -> usize {
32        u8::raw_bytes().unwrap() + u32::raw_bytes().unwrap() + u16::raw_bytes().unwrap()
33    }
34
35    pub fn unwrap_as_stream(&self) -> (u32, u16) {
36        match self {
37            Self::Range(index, range) => (*index, *range), 
38            Self::Raptor(..) => unreachable!()
39        }
40    }
41
42    pub fn stream_end_index(chunk: &ChunkId, range: u32) -> u32 {
43        (chunk.len() as u32 + range - 1) / range - 1
44    }
45
46    pub fn stream_piece_range(&self, chunk: &ChunkId) -> (u32, Range<u64>) {
47        match self {
48            Self::Range(index, range) => {
49                if *index == Self::stream_end_index(chunk, *range as u32) {
50                    (*index, (*index * (*range) as u32) as u64..chunk.len() as u64)
51                } else {
52                    (*index, (*index * (*range) as u32) as u64..((*index + 1) * (*range) as u32) as u64)
53                }
54            }, 
55            Self::Raptor(..) => unreachable!()
56        }
57    }
58
59    pub fn from_stream_offset(range: usize, offset: u32) -> (Self, u32) {
60        let index = offset / range as u32;
61        let offset = offset - index * range as u32;
62        (Self::Range(index, range as u16), offset)
63    }
64}
65
66impl RawFixedBytes for PieceDesc {
67    fn raw_bytes() -> Option<usize> {
68        Some(Self::raw_raptor_bytes())
69    }
70}
71
72impl RawEncode for PieceDesc {
73    fn raw_measure(&self, _purpose: &Option<RawEncodePurpose>) -> BuckyResult<usize> {
74        Ok(Self::raw_bytes().unwrap())
75    }
76
77    fn raw_encode<'a>(
78        &self,
79        buf: &'a mut [u8],
80        purpose: &Option<RawEncodePurpose>,
81    ) -> BuckyResult<&'a mut [u8]> {
82        match self {
83            Self::Raptor(index, k) => {
84                let buf = 0u8.raw_encode(buf, purpose)?;
85                let buf = index.raw_encode(buf, purpose)?;
86                k.raw_encode(buf, purpose)
87            }, 
88            Self::Range(index, len) => {
89                let buf = 1u8.raw_encode(buf, purpose)?;
90                let buf = index.raw_encode(buf, purpose)?;
91                len.raw_encode(buf, purpose)
92            }
93        }
94    }
95}
96
97impl<'de> RawDecode<'de> for PieceDesc {
98    fn raw_decode(buf: &'de [u8]) -> BuckyResult<(Self, &'de [u8])> {
99        let (code, buf) = u8::raw_decode(buf)?;
100        match code {
101            0u8 => {
102                let (index, buf) = u32::raw_decode(buf)?;
103                let (k, buf) = u16::raw_decode(buf)?;
104                Ok((Self::Raptor(index, k), buf))
105            }, 
106            1u8 => {
107                let (index, buf) = u32::raw_decode(buf)?;
108                let (len, buf) = u16::raw_decode(buf)?;
109                Ok((Self::Range(index, len), buf))
110            }, 
111            _ => Err(BuckyError::new(BuckyErrorCode::InvalidData, "invalid piece desc type code"))
112        }
113    }
114}
115
116
// Bit flags stored in the leading `u16` of an encoded `ChunkCodecDesc`.
// The STREAM_* and RAPTOR_* field-presence bits share the same bit positions;
// which set applies is decided by the STREAM / RAPTOR codec bit.

/// No codec selected.
const PIECE_SESSION_FLAGS_UNKNOWN: u16 = 0x0000;
/// Stream codec.
const PIECE_SESSION_FLAGS_STREAM: u16 = 0x0001;
/// Raptor codec.
const PIECE_SESSION_FLAGS_RAPTOR: u16 = 0x0002;
/// Stream: the `start` field is present.
const PIECE_SESSION_FLAGS_STREAM_START: u16 = 0x0004;
/// Stream: the `end` field is present.
const PIECE_SESSION_FLAGS_STREAM_END: u16 = 0x0008;
/// Stream: the `step` field is present.
const PIECE_SESSION_FLAGS_STREAM_STEP: u16 = 0x0010;
/// Raptor: the `k` field is present.
const PIECE_SESSION_FLAGS_RAPTOR_K: u16 = 0x0004;
/// Raptor: the `seq` field is present.
const PIECE_SESSION_FLAGS_RAPTOR_SEQ: u16 = 0x0008;
/// Raptor: the `step` field is present.
const PIECE_SESSION_FLAGS_RAPTOR_STEP: u16 = 0x0010;
126
127#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
128pub enum ChunkCodecDesc {
129    Unknown,
130    Stream(Option<u32>, Option<u32>, Option<i32>), 
131    Raptor(Option<u32>, Option<u32>, Option<i32>)
132} 
133
134impl ChunkCodecDesc {
135    pub fn reverse_stream(start: Option<u32>, end: Option<u32>) -> Self {
136        Self::Stream(start, end, Some(-(PieceData::max_payload() as i32)))
137    }
138
139    pub fn fill_values(&self, chunk: &ChunkId) -> Self {
140        match self {
141            Self::Unknown => Self::Unknown, 
142            Self::Stream(start, end, step) => {
143                let start = start.clone().unwrap_or(0);
144                let range = step.map(|s| s.abs() as u32).unwrap_or(PieceData::max_payload() as u32);
145                let end = end.clone().unwrap_or(PieceDesc::stream_end_index(chunk, range) + 1);
146                let step = step.clone().unwrap_or(range as i32);
147                Self::Stream(Some(start), Some(end), Some(step))
148            }, 
149            Self::Raptor(..) => unimplemented!()
150        }
151    }
152
153    pub fn unwrap_as_stream(&self) -> (u32, u32, i32) {
154        match self {
155            Self::Stream(start, end, step) => ((*start).unwrap(), (*end).unwrap(), (*step).unwrap()), 
156            _ => unreachable!()
157        }
158    }
159
160    pub fn support_desc(&self, other: &Self) -> bool {
161        match self {
162            Self::Unknown => true, 
163            Self::Stream(self_start, self_end, self_step) => {
164                match other {
165                    Self::Unknown => true,
166                    Self::Stream(..) => {
167                        let (other_start, other_end, other_step) = other.unwrap_as_stream();
168                        if let Some(self_step) = self_step {
169                            if *self_step * other_step < 0 {
170                                return false
171                            }
172
173                            if other_step.abs() > self_step.abs() {
174                                return false;
175                            }
176                        }
177
178                        if let Some(self_start) = self_start {
179                            if *self_start > other_start {
180                                return false;
181                            }
182                        }
183
184                        if let Some(self_end) = self_end {
185                            if *self_end < other_end {
186                                return false;
187                            }
188                        }
189
190                        true
191                    }, 
192                    Self::Raptor(..) => false
193                }
194            }, 
195            Self::Raptor(..) => unimplemented!()
196        }
197
198    }
199}
200
201impl RawEncode for ChunkCodecDesc {
202    fn raw_measure(&self, _: &Option<RawEncodePurpose>) -> BuckyResult<usize> {
203        match self {
204            Self::Unknown => Ok(u16::raw_bytes().unwrap()), 
205            Self::Stream(start, end, step) => {
206                let mut s = u16::raw_bytes().unwrap();
207                s += start.as_ref().map(|_| u32::raw_bytes().unwrap()).unwrap_or_default();
208                s += end.as_ref().map(|_| u32::raw_bytes().unwrap()).unwrap_or_default();
209                s += step.as_ref().map(|_| i32::raw_bytes().unwrap()).unwrap_or_default();
210                Ok(s)
211            },
212            Self::Raptor(k, seq, step) => {
213                let mut s = u16::raw_bytes().unwrap();
214                s += k.as_ref().map(|_| u32::raw_bytes().unwrap()).unwrap_or_default();
215                s += seq.as_ref().map(|_| u32::raw_bytes().unwrap()).unwrap_or_default();
216                s += step.as_ref().map(|_| i32::raw_bytes().unwrap()).unwrap_or_default();
217                Ok(s)
218            },
219        }
220    }
221
222    fn raw_encode<'a>(
223        &self,
224        buf: &'a mut [u8],
225        purpose: &Option<RawEncodePurpose>,
226    ) -> BuckyResult<&'a mut [u8]> {
227        match self {
228            Self::Unknown => PIECE_SESSION_FLAGS_UNKNOWN.raw_encode(buf, purpose), 
229            Self::Stream(start, end, step) => {
230                let flags = PIECE_SESSION_FLAGS_STREAM 
231                    | start.as_ref().map(|_| PIECE_SESSION_FLAGS_STREAM_START).unwrap_or_default()
232                    | end.as_ref().map(|_| PIECE_SESSION_FLAGS_STREAM_END).unwrap_or_default()
233                    | step.as_ref().map(|_| PIECE_SESSION_FLAGS_STREAM_STEP).unwrap_or_default();
234
235                let buf = flags.raw_encode(buf, purpose)?;
236                let buf = if let Some(start) = start {
237                    start.raw_encode(buf, purpose)?
238                } else {
239                    buf
240                };
241                let buf = if let Some(end) = end {
242                    end.raw_encode(buf, purpose)?
243                } else {
244                    buf
245                };
246                
247                if let Some(step) = step {
248                    step.raw_encode(buf, purpose)
249                } else {
250                    Ok(buf)
251                }
252            },
253            Self::Raptor(k, seq, step) => {
254                let flags = PIECE_SESSION_FLAGS_RAPTOR 
255                    | k.as_ref().map(|_| PIECE_SESSION_FLAGS_RAPTOR_K).unwrap_or_default()
256                    | seq.as_ref().map(|_| PIECE_SESSION_FLAGS_RAPTOR_SEQ).unwrap_or_default()
257                    | step.as_ref().map(|_| PIECE_SESSION_FLAGS_RAPTOR_STEP).unwrap_or_default();
258
259                let buf = flags.raw_encode(buf, purpose)?;
260                let buf = if let Some(k) = k {
261                    k.raw_encode(buf, purpose)?
262                } else {
263                    buf
264                };
265                let buf = if let Some(seq) = seq {
266                    seq.raw_encode(buf, purpose)?
267                } else {
268                    buf
269                };
270                
271                if let Some(step) = step {
272                    step.raw_encode(buf, purpose)
273                } else {
274                    Ok(buf)
275                }
276            },
277        }
278    }
279}
280
281
282impl<'de> RawDecode<'de> for ChunkCodecDesc {
283    fn raw_decode(buf: &'de [u8]) -> BuckyResult<(Self, &'de [u8])> {
284        let (flags, buf) = u16::raw_decode(buf)?;
285        if flags == PIECE_SESSION_FLAGS_UNKNOWN {
286            Ok((Self::Unknown, buf))
287        } else if flags & PIECE_SESSION_FLAGS_STREAM > 0 {
288            let (start, buf) = if flags & PIECE_SESSION_FLAGS_STREAM_START > 0 {
289                let (start, buf) = u32::raw_decode(buf)?;
290                (Some(start), buf)
291            } else {
292                (None, buf)
293            };
294            let (end, buf) = if flags & PIECE_SESSION_FLAGS_STREAM_END > 0 {
295                let (end, buf) = u32::raw_decode(buf)?;
296                (Some(end), buf)
297            } else {
298                (None, buf)
299            };
300            let (step, buf) = if flags & PIECE_SESSION_FLAGS_STREAM_STEP > 0 {
301                let (step, buf) = i32::raw_decode(buf)?;
302                (Some(step), buf)
303            } else {
304                (None, buf)
305            };
306            Ok((Self::Stream(start, end, step), buf))
307        } else if flags & PIECE_SESSION_FLAGS_RAPTOR > 0 {
308            let (k, buf) = if flags & PIECE_SESSION_FLAGS_RAPTOR_K > 0 {
309                let (k, buf) = u32::raw_decode(buf)?;
310                (Some(k), buf)
311            } else {
312                (None, buf)
313            };
314            let (seq, buf) = if flags & PIECE_SESSION_FLAGS_RAPTOR_SEQ > 0 {
315                let (seq, buf) = u32::raw_decode(buf)?;
316                (Some(seq), buf)
317            } else {
318                (None, buf)
319            };
320            let (step, buf) = if flags & PIECE_SESSION_FLAGS_RAPTOR_STEP > 0 {
321                let (step, buf) = i32::raw_decode(buf)?;
322                (Some(step), buf)
323            } else {
324                (None, buf)
325            };
326            Ok((Self::Raptor(k, seq, step), buf))
327        } else {
328            Err(BuckyError::new(BuckyErrorCode::InvalidData, "invalid flags"))
329        }
330    }
331}
332
333
334impl JsonCodec<ChunkCodecDesc> for ChunkCodecDesc {
335    fn encode_json(&self) -> Map<String, Value> {
336        let mut obj = Map::new();
337        match self {
338            Self::Unknown => JsonCodecHelper::encode_string_field(&mut obj, "type", "Unknown"), 
339            Self::Stream(start, end, step) => {
340                JsonCodecHelper::encode_string_field(&mut obj, "type", "Stream");
341                JsonCodecHelper::encode_option_number_field(&mut obj, "stream_start", start.clone());
342                JsonCodecHelper::encode_option_number_field(&mut obj, "stream_end", end.clone());
343                JsonCodecHelper::encode_option_number_field(&mut obj, "stream_step", step.clone());
344            }, 
345            Self::Raptor(k, seq, step) => {
346                JsonCodecHelper::encode_string_field(&mut obj, "type", "Raptor");
347                JsonCodecHelper::encode_option_number_field(&mut obj, "raptor_k", k.clone());
348                JsonCodecHelper::encode_option_number_field(&mut obj, "raptor_seq", seq.clone());
349                JsonCodecHelper::encode_option_number_field(&mut obj, "raptor_step", step.clone());
350            }, 
351        }
352        obj
353    }
354
355    fn decode_json(obj: &Map<String, Value>) -> BuckyResult<Self> {
356        let prefer_type: String = JsonCodecHelper::decode_string_field(obj, "type")?;
357        match prefer_type.as_str() {
358            "Unknown" => Ok(Self::Unknown), 
359            "Stream" => {
360                let start = JsonCodecHelper::decode_option_int_field(obj, "stream_start")?;
361                let end = JsonCodecHelper::decode_option_int_field(obj, "stream_end")?;
362                let step = JsonCodecHelper::decode_option_int_field(obj, "stream_step")?;
363                Ok(Self::Stream(start, end, step))
364            },
365            "Raptor" => {
366                let k = JsonCodecHelper::decode_option_int_field(obj, "raptor_k")?;
367                let seq = JsonCodecHelper::decode_option_int_field(obj, "raptor_seq")?;
368                let step = JsonCodecHelper::decode_option_int_field(obj, "raptor_step")?;
369                Ok(Self::Raptor(k, seq, step))
370            },
371            _ => Err(BuckyError::new(BuckyErrorCode::InvalidInput, format!("invalid type {}", prefer_type)))
372        }
373    }
374}
375
376
377
/// Tuning parameters for `HistorySpeed`.
#[derive(Clone)]
pub struct HistorySpeedConfig {
    /// Factor every stored sample is multiplied by each time a new sample is appended.
    pub attenuation: f64,
    /// Length of one sampling slot.
    pub atomic: Duration,
    /// Length of the history window; `expire / atomic` samples are retained.
    pub expire: Duration,
}
384
385#[derive(Clone)]
386// 计算历史速度的方法, 在过去的一段时间内,  Sum(speed(t)*(衰减^t))/样本数
387pub struct HistorySpeed {
388    expire_count: usize, 
389    config: HistorySpeedConfig, 
390    intermediate: LinkedList<f64>, 
391    last_update: Timestamp
392}
393
394impl HistorySpeed {
395    pub fn new(initial: u32, config: HistorySpeedConfig) -> Self {
396        let mut intermediate = LinkedList::new();
397        intermediate.push_back(initial as f64);
398
399        Self {
400            expire_count: (config.expire.as_micros() / config.atomic.as_micros()) as usize, 
401            config, 
402            intermediate, 
403            last_update: bucky_time_now() 
404        }   
405    }
406
407    pub fn update(&mut self, cur_speed: Option<u32>, when: Timestamp) {
408        let cur_speed = cur_speed.unwrap_or(self.latest());
409
410        if when > self.last_update {
411            let mut count = ((when - self.last_update) / self.config.atomic.as_micros() as u64) as usize;
412
413            if count > self.expire_count {
414                self.intermediate.clear();
415                count = self.expire_count;
416            }
417
418            for _ in 0..count {
419                self.intermediate.iter_mut().for_each(|v| *v = (*v) * self.config.attenuation);
420                self.intermediate.push_back(cur_speed as f64);
421                if self.intermediate.len() > self.expire_count {
422                    self.intermediate.pop_front();
423                }
424            }
425
426            self.last_update = when;
427        };
428    }
429
430    pub fn average(&self) -> u32 {
431        let total: f64 = self.intermediate.iter().sum();
432        (total / self.intermediate.len() as f64) as u32
433    }
434
435    pub fn latest(&self) -> u32 {
436        self.intermediate.back().cloned().unwrap() as u32
437    }
438
439    pub fn config(&self) -> &HistorySpeedConfig {
440        &self.config
441    }
442}
443
444
445pub struct SpeedCounter {
446    last_recv: u64, 
447    last_update: Timestamp, 
448    cur_speed: u32
449}
450
451
452impl SpeedCounter {
453    pub fn new(init_recv: usize) -> Self {
454        Self {
455            last_recv: init_recv as u64, 
456            last_update: bucky_time_now(), 
457            cur_speed: 0
458        }
459    }
460
461    pub fn on_recv(&mut self, recv: usize) {
462        self.last_recv += recv as u64;
463    }
464
465    pub fn update(&mut self, when: Timestamp) -> u32 {
466        if when > self.last_update {
467            let last_recv = self.last_recv;
468            self.cur_speed = ((last_recv * 1000 * 1000) as f64 / (when - self.last_update) as f64) as u32;
469            self.last_recv = 0;
470            self.last_update = when;
471            self.cur_speed
472        } else {
473            self.cur_speed
474        }
475    }
476
477    pub fn cur(&self) -> u32 {
478        self.cur_speed
479    }
480}
481
482
483
484pub struct ProgressCounter {
485    last_recv: u64, 
486    last_update: Timestamp, 
487    cur_speed: u32
488}
489
490
491impl ProgressCounter {
492    pub fn new(init_recv: u64) -> Self {
493        Self {
494            last_recv: init_recv,  
495            last_update: bucky_time_now(), 
496            cur_speed: 0
497        }
498    }
499
500    pub fn update(&mut self, cur_recv: u64, when: Timestamp) -> u32 {
501        if cur_recv < self.last_recv {
502            return 0;
503        }
504
505        if when > self.last_update {
506            let last_recv = cur_recv - self.last_recv;
507            self.cur_speed = ((last_recv * 1000 * 1000) as f64 / (when - self.last_update) as f64) as u32;
508            self.last_recv = cur_recv;
509            self.last_update = when;
510            self.cur_speed
511        } else {
512            self.cur_speed
513        }
514    }
515
516    pub fn cur_speed(&self) -> u32 {
517        self.cur_speed
518    }
519}
520
521
522#[derive(Debug, Serialize, Deserialize)]
523pub enum NdnTaskState {
524    Running,
525    Paused,
526    Error(BuckyError/*被cancel的原因*/), 
527    Finished
528}
529
530#[derive(Clone, Debug, Serialize, Deserialize)]
531pub enum NdnTaskControlState {
532    Normal, 
533    Paused, 
534    Canceled, 
535}
536
537pub trait NdnTask: Send + Sync {
538    fn clone_as_task(&self) -> Box<dyn NdnTask>;
539    fn state(&self) -> NdnTaskState;
540    fn control_state(&self) -> NdnTaskControlState;
541
542    fn resume(&self) -> BuckyResult<NdnTaskControlState> {
543        Ok(NdnTaskControlState::Normal)
544    }
545    fn cancel(&self) -> BuckyResult<NdnTaskControlState> {
546        self.cancel_by_error(BuckyError::new(BuckyErrorCode::Interrupted, "user canceled"))
547	}
548    fn cancel_by_error(&self, _err: BuckyError) -> BuckyResult<NdnTaskControlState> {
549        Ok(NdnTaskControlState::Normal)
550    }
551    fn pause(&self) -> BuckyResult<NdnTaskControlState> {
552        Ok(NdnTaskControlState::Normal)
553    }
554    
555    fn close(&self, _recursion: bool) -> BuckyResult<()> {
556        Ok(())
557    }
558
559    fn cur_speed(&self) -> u32;
560    fn history_speed(&self) -> u32;
561    fn transfered(&self) -> u64;
562}