1use std::{
2 ops::Range,
3 time::Duration,
4 collections::LinkedList,
5};
6use serde::{
7 Deserialize,
8 Serialize,
9};
10use serde_json::{Map, Value};
11use cyfs_base::*;
12use crate::{
13 types::*
14};
15use super::{
16 channel::protocol::v0::*
17};
18
19
/// Describes one piece of a chunk as it travels on the wire.
///
/// Both variants carry a `u32` piece index followed by a `u16` parameter.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PieceDesc {
    /// Raptor piece: `(index, k)` (the second field is named `k` by the encoder).
    Raptor(u32, u16),
    /// Stream piece: `(index, range)`; `range` is the piece size in bytes that
    /// is multiplied with `index` to obtain byte offsets inside the chunk.
    Range(u32, u16),
}
25
26impl PieceDesc {
27 pub fn raw_raptor_bytes() -> usize {
28 u8::raw_bytes().unwrap() + u32::raw_bytes().unwrap() + u16::raw_bytes().unwrap()
29 }
30
31 pub fn raw_stream_bytes() -> usize {
32 u8::raw_bytes().unwrap() + u32::raw_bytes().unwrap() + u16::raw_bytes().unwrap()
33 }
34
35 pub fn unwrap_as_stream(&self) -> (u32, u16) {
36 match self {
37 Self::Range(index, range) => (*index, *range),
38 Self::Raptor(..) => unreachable!()
39 }
40 }
41
42 pub fn stream_end_index(chunk: &ChunkId, range: u32) -> u32 {
43 (chunk.len() as u32 + range - 1) / range - 1
44 }
45
46 pub fn stream_piece_range(&self, chunk: &ChunkId) -> (u32, Range<u64>) {
47 match self {
48 Self::Range(index, range) => {
49 if *index == Self::stream_end_index(chunk, *range as u32) {
50 (*index, (*index * (*range) as u32) as u64..chunk.len() as u64)
51 } else {
52 (*index, (*index * (*range) as u32) as u64..((*index + 1) * (*range) as u32) as u64)
53 }
54 },
55 Self::Raptor(..) => unreachable!()
56 }
57 }
58
59 pub fn from_stream_offset(range: usize, offset: u32) -> (Self, u32) {
60 let index = offset / range as u32;
61 let offset = offset - index * range as u32;
62 (Self::Range(index, range as u16), offset)
63 }
64}
65
66impl RawFixedBytes for PieceDesc {
67 fn raw_bytes() -> Option<usize> {
68 Some(Self::raw_raptor_bytes())
69 }
70}
71
72impl RawEncode for PieceDesc {
73 fn raw_measure(&self, _purpose: &Option<RawEncodePurpose>) -> BuckyResult<usize> {
74 Ok(Self::raw_bytes().unwrap())
75 }
76
77 fn raw_encode<'a>(
78 &self,
79 buf: &'a mut [u8],
80 purpose: &Option<RawEncodePurpose>,
81 ) -> BuckyResult<&'a mut [u8]> {
82 match self {
83 Self::Raptor(index, k) => {
84 let buf = 0u8.raw_encode(buf, purpose)?;
85 let buf = index.raw_encode(buf, purpose)?;
86 k.raw_encode(buf, purpose)
87 },
88 Self::Range(index, len) => {
89 let buf = 1u8.raw_encode(buf, purpose)?;
90 let buf = index.raw_encode(buf, purpose)?;
91 len.raw_encode(buf, purpose)
92 }
93 }
94 }
95}
96
97impl<'de> RawDecode<'de> for PieceDesc {
98 fn raw_decode(buf: &'de [u8]) -> BuckyResult<(Self, &'de [u8])> {
99 let (code, buf) = u8::raw_decode(buf)?;
100 match code {
101 0u8 => {
102 let (index, buf) = u32::raw_decode(buf)?;
103 let (k, buf) = u16::raw_decode(buf)?;
104 Ok((Self::Raptor(index, k), buf))
105 },
106 1u8 => {
107 let (index, buf) = u32::raw_decode(buf)?;
108 let (len, buf) = u16::raw_decode(buf)?;
109 Ok((Self::Range(index, len), buf))
110 },
111 _ => Err(BuckyError::new(BuckyErrorCode::InvalidData, "invalid piece desc type code"))
112 }
113 }
114}
115
116
// Bit flags stored in the leading `u16` of an encoded `ChunkCodecDesc`.

// No codec selected.
const PIECE_SESSION_FLAGS_UNKNOWN: u16 = 0x0000;

// Codec selector bits.
const PIECE_SESSION_FLAGS_STREAM: u16 = 0x0001;
const PIECE_SESSION_FLAGS_RAPTOR: u16 = 0x0002;

// Presence bits for the optional fields of the stream codec.
const PIECE_SESSION_FLAGS_STREAM_START: u16 = 0x0004;
const PIECE_SESSION_FLAGS_STREAM_END: u16 = 0x0008;
const PIECE_SESSION_FLAGS_STREAM_STEP: u16 = 0x0010;

// Presence bits for the optional fields of the raptor codec; they reuse the
// same bit positions as the stream ones and are told apart by the selector bit.
const PIECE_SESSION_FLAGS_RAPTOR_K: u16 = 0x0004;
const PIECE_SESSION_FLAGS_RAPTOR_SEQ: u16 = 0x0008;
const PIECE_SESSION_FLAGS_RAPTOR_STEP: u16 = 0x0010;
126
127#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
128pub enum ChunkCodecDesc {
129 Unknown,
130 Stream(Option<u32>, Option<u32>, Option<i32>),
131 Raptor(Option<u32>, Option<u32>, Option<i32>)
132}
133
134impl ChunkCodecDesc {
135 pub fn reverse_stream(start: Option<u32>, end: Option<u32>) -> Self {
136 Self::Stream(start, end, Some(-(PieceData::max_payload() as i32)))
137 }
138
139 pub fn fill_values(&self, chunk: &ChunkId) -> Self {
140 match self {
141 Self::Unknown => Self::Unknown,
142 Self::Stream(start, end, step) => {
143 let start = start.clone().unwrap_or(0);
144 let range = step.map(|s| s.abs() as u32).unwrap_or(PieceData::max_payload() as u32);
145 let end = end.clone().unwrap_or(PieceDesc::stream_end_index(chunk, range) + 1);
146 let step = step.clone().unwrap_or(range as i32);
147 Self::Stream(Some(start), Some(end), Some(step))
148 },
149 Self::Raptor(..) => unimplemented!()
150 }
151 }
152
153 pub fn unwrap_as_stream(&self) -> (u32, u32, i32) {
154 match self {
155 Self::Stream(start, end, step) => ((*start).unwrap(), (*end).unwrap(), (*step).unwrap()),
156 _ => unreachable!()
157 }
158 }
159
160 pub fn support_desc(&self, other: &Self) -> bool {
161 match self {
162 Self::Unknown => true,
163 Self::Stream(self_start, self_end, self_step) => {
164 match other {
165 Self::Unknown => true,
166 Self::Stream(..) => {
167 let (other_start, other_end, other_step) = other.unwrap_as_stream();
168 if let Some(self_step) = self_step {
169 if *self_step * other_step < 0 {
170 return false
171 }
172
173 if other_step.abs() > self_step.abs() {
174 return false;
175 }
176 }
177
178 if let Some(self_start) = self_start {
179 if *self_start > other_start {
180 return false;
181 }
182 }
183
184 if let Some(self_end) = self_end {
185 if *self_end < other_end {
186 return false;
187 }
188 }
189
190 true
191 },
192 Self::Raptor(..) => false
193 }
194 },
195 Self::Raptor(..) => unimplemented!()
196 }
197
198 }
199}
200
201impl RawEncode for ChunkCodecDesc {
202 fn raw_measure(&self, _: &Option<RawEncodePurpose>) -> BuckyResult<usize> {
203 match self {
204 Self::Unknown => Ok(u16::raw_bytes().unwrap()),
205 Self::Stream(start, end, step) => {
206 let mut s = u16::raw_bytes().unwrap();
207 s += start.as_ref().map(|_| u32::raw_bytes().unwrap()).unwrap_or_default();
208 s += end.as_ref().map(|_| u32::raw_bytes().unwrap()).unwrap_or_default();
209 s += step.as_ref().map(|_| i32::raw_bytes().unwrap()).unwrap_or_default();
210 Ok(s)
211 },
212 Self::Raptor(k, seq, step) => {
213 let mut s = u16::raw_bytes().unwrap();
214 s += k.as_ref().map(|_| u32::raw_bytes().unwrap()).unwrap_or_default();
215 s += seq.as_ref().map(|_| u32::raw_bytes().unwrap()).unwrap_or_default();
216 s += step.as_ref().map(|_| i32::raw_bytes().unwrap()).unwrap_or_default();
217 Ok(s)
218 },
219 }
220 }
221
222 fn raw_encode<'a>(
223 &self,
224 buf: &'a mut [u8],
225 purpose: &Option<RawEncodePurpose>,
226 ) -> BuckyResult<&'a mut [u8]> {
227 match self {
228 Self::Unknown => PIECE_SESSION_FLAGS_UNKNOWN.raw_encode(buf, purpose),
229 Self::Stream(start, end, step) => {
230 let flags = PIECE_SESSION_FLAGS_STREAM
231 | start.as_ref().map(|_| PIECE_SESSION_FLAGS_STREAM_START).unwrap_or_default()
232 | end.as_ref().map(|_| PIECE_SESSION_FLAGS_STREAM_END).unwrap_or_default()
233 | step.as_ref().map(|_| PIECE_SESSION_FLAGS_STREAM_STEP).unwrap_or_default();
234
235 let buf = flags.raw_encode(buf, purpose)?;
236 let buf = if let Some(start) = start {
237 start.raw_encode(buf, purpose)?
238 } else {
239 buf
240 };
241 let buf = if let Some(end) = end {
242 end.raw_encode(buf, purpose)?
243 } else {
244 buf
245 };
246
247 if let Some(step) = step {
248 step.raw_encode(buf, purpose)
249 } else {
250 Ok(buf)
251 }
252 },
253 Self::Raptor(k, seq, step) => {
254 let flags = PIECE_SESSION_FLAGS_RAPTOR
255 | k.as_ref().map(|_| PIECE_SESSION_FLAGS_RAPTOR_K).unwrap_or_default()
256 | seq.as_ref().map(|_| PIECE_SESSION_FLAGS_RAPTOR_SEQ).unwrap_or_default()
257 | step.as_ref().map(|_| PIECE_SESSION_FLAGS_RAPTOR_STEP).unwrap_or_default();
258
259 let buf = flags.raw_encode(buf, purpose)?;
260 let buf = if let Some(k) = k {
261 k.raw_encode(buf, purpose)?
262 } else {
263 buf
264 };
265 let buf = if let Some(seq) = seq {
266 seq.raw_encode(buf, purpose)?
267 } else {
268 buf
269 };
270
271 if let Some(step) = step {
272 step.raw_encode(buf, purpose)
273 } else {
274 Ok(buf)
275 }
276 },
277 }
278 }
279}
280
281
282impl<'de> RawDecode<'de> for ChunkCodecDesc {
283 fn raw_decode(buf: &'de [u8]) -> BuckyResult<(Self, &'de [u8])> {
284 let (flags, buf) = u16::raw_decode(buf)?;
285 if flags == PIECE_SESSION_FLAGS_UNKNOWN {
286 Ok((Self::Unknown, buf))
287 } else if flags & PIECE_SESSION_FLAGS_STREAM > 0 {
288 let (start, buf) = if flags & PIECE_SESSION_FLAGS_STREAM_START > 0 {
289 let (start, buf) = u32::raw_decode(buf)?;
290 (Some(start), buf)
291 } else {
292 (None, buf)
293 };
294 let (end, buf) = if flags & PIECE_SESSION_FLAGS_STREAM_END > 0 {
295 let (end, buf) = u32::raw_decode(buf)?;
296 (Some(end), buf)
297 } else {
298 (None, buf)
299 };
300 let (step, buf) = if flags & PIECE_SESSION_FLAGS_STREAM_STEP > 0 {
301 let (step, buf) = i32::raw_decode(buf)?;
302 (Some(step), buf)
303 } else {
304 (None, buf)
305 };
306 Ok((Self::Stream(start, end, step), buf))
307 } else if flags & PIECE_SESSION_FLAGS_RAPTOR > 0 {
308 let (k, buf) = if flags & PIECE_SESSION_FLAGS_RAPTOR_K > 0 {
309 let (k, buf) = u32::raw_decode(buf)?;
310 (Some(k), buf)
311 } else {
312 (None, buf)
313 };
314 let (seq, buf) = if flags & PIECE_SESSION_FLAGS_RAPTOR_SEQ > 0 {
315 let (seq, buf) = u32::raw_decode(buf)?;
316 (Some(seq), buf)
317 } else {
318 (None, buf)
319 };
320 let (step, buf) = if flags & PIECE_SESSION_FLAGS_RAPTOR_STEP > 0 {
321 let (step, buf) = i32::raw_decode(buf)?;
322 (Some(step), buf)
323 } else {
324 (None, buf)
325 };
326 Ok((Self::Raptor(k, seq, step), buf))
327 } else {
328 Err(BuckyError::new(BuckyErrorCode::InvalidData, "invalid flags"))
329 }
330 }
331}
332
333
334impl JsonCodec<ChunkCodecDesc> for ChunkCodecDesc {
335 fn encode_json(&self) -> Map<String, Value> {
336 let mut obj = Map::new();
337 match self {
338 Self::Unknown => JsonCodecHelper::encode_string_field(&mut obj, "type", "Unknown"),
339 Self::Stream(start, end, step) => {
340 JsonCodecHelper::encode_string_field(&mut obj, "type", "Stream");
341 JsonCodecHelper::encode_option_number_field(&mut obj, "stream_start", start.clone());
342 JsonCodecHelper::encode_option_number_field(&mut obj, "stream_end", end.clone());
343 JsonCodecHelper::encode_option_number_field(&mut obj, "stream_step", step.clone());
344 },
345 Self::Raptor(k, seq, step) => {
346 JsonCodecHelper::encode_string_field(&mut obj, "type", "Raptor");
347 JsonCodecHelper::encode_option_number_field(&mut obj, "raptor_k", k.clone());
348 JsonCodecHelper::encode_option_number_field(&mut obj, "raptor_seq", seq.clone());
349 JsonCodecHelper::encode_option_number_field(&mut obj, "raptor_step", step.clone());
350 },
351 }
352 obj
353 }
354
355 fn decode_json(obj: &Map<String, Value>) -> BuckyResult<Self> {
356 let prefer_type: String = JsonCodecHelper::decode_string_field(obj, "type")?;
357 match prefer_type.as_str() {
358 "Unknown" => Ok(Self::Unknown),
359 "Stream" => {
360 let start = JsonCodecHelper::decode_option_int_field(obj, "stream_start")?;
361 let end = JsonCodecHelper::decode_option_int_field(obj, "stream_end")?;
362 let step = JsonCodecHelper::decode_option_int_field(obj, "stream_step")?;
363 Ok(Self::Stream(start, end, step))
364 },
365 "Raptor" => {
366 let k = JsonCodecHelper::decode_option_int_field(obj, "raptor_k")?;
367 let seq = JsonCodecHelper::decode_option_int_field(obj, "raptor_seq")?;
368 let step = JsonCodecHelper::decode_option_int_field(obj, "raptor_step")?;
369 Ok(Self::Raptor(k, seq, step))
370 },
371 _ => Err(BuckyError::new(BuckyErrorCode::InvalidInput, format!("invalid type {}", prefer_type)))
372 }
373 }
374}
375
376
377
/// Tuning parameters for `HistorySpeed`.
#[derive(Clone)]
pub struct HistorySpeedConfig {
    /// Factor every stored sample is multiplied by each time a new sample is
    /// appended.
    pub attenuation: f64,
    /// Sampling interval: one sample is appended per elapsed `atomic`.
    pub atomic: Duration,
    /// History window; `expire / atomic` is the number of samples kept.
    pub expire: Duration,
}
384
385#[derive(Clone)]
386pub struct HistorySpeed {
388 expire_count: usize,
389 config: HistorySpeedConfig,
390 intermediate: LinkedList<f64>,
391 last_update: Timestamp
392}
393
394impl HistorySpeed {
395 pub fn new(initial: u32, config: HistorySpeedConfig) -> Self {
396 let mut intermediate = LinkedList::new();
397 intermediate.push_back(initial as f64);
398
399 Self {
400 expire_count: (config.expire.as_micros() / config.atomic.as_micros()) as usize,
401 config,
402 intermediate,
403 last_update: bucky_time_now()
404 }
405 }
406
407 pub fn update(&mut self, cur_speed: Option<u32>, when: Timestamp) {
408 let cur_speed = cur_speed.unwrap_or(self.latest());
409
410 if when > self.last_update {
411 let mut count = ((when - self.last_update) / self.config.atomic.as_micros() as u64) as usize;
412
413 if count > self.expire_count {
414 self.intermediate.clear();
415 count = self.expire_count;
416 }
417
418 for _ in 0..count {
419 self.intermediate.iter_mut().for_each(|v| *v = (*v) * self.config.attenuation);
420 self.intermediate.push_back(cur_speed as f64);
421 if self.intermediate.len() > self.expire_count {
422 self.intermediate.pop_front();
423 }
424 }
425
426 self.last_update = when;
427 };
428 }
429
430 pub fn average(&self) -> u32 {
431 let total: f64 = self.intermediate.iter().sum();
432 (total / self.intermediate.len() as f64) as u32
433 }
434
435 pub fn latest(&self) -> u32 {
436 self.intermediate.back().cloned().unwrap() as u32
437 }
438
439 pub fn config(&self) -> &HistorySpeedConfig {
440 &self.config
441 }
442}
443
444
445pub struct SpeedCounter {
446 last_recv: u64,
447 last_update: Timestamp,
448 cur_speed: u32
449}
450
451
452impl SpeedCounter {
453 pub fn new(init_recv: usize) -> Self {
454 Self {
455 last_recv: init_recv as u64,
456 last_update: bucky_time_now(),
457 cur_speed: 0
458 }
459 }
460
461 pub fn on_recv(&mut self, recv: usize) {
462 self.last_recv += recv as u64;
463 }
464
465 pub fn update(&mut self, when: Timestamp) -> u32 {
466 if when > self.last_update {
467 let last_recv = self.last_recv;
468 self.cur_speed = ((last_recv * 1000 * 1000) as f64 / (when - self.last_update) as f64) as u32;
469 self.last_recv = 0;
470 self.last_update = when;
471 self.cur_speed
472 } else {
473 self.cur_speed
474 }
475 }
476
477 pub fn cur(&self) -> u32 {
478 self.cur_speed
479 }
480}
481
482
483
484pub struct ProgressCounter {
485 last_recv: u64,
486 last_update: Timestamp,
487 cur_speed: u32
488}
489
490
491impl ProgressCounter {
492 pub fn new(init_recv: u64) -> Self {
493 Self {
494 last_recv: init_recv,
495 last_update: bucky_time_now(),
496 cur_speed: 0
497 }
498 }
499
500 pub fn update(&mut self, cur_recv: u64, when: Timestamp) -> u32 {
501 if cur_recv < self.last_recv {
502 return 0;
503 }
504
505 if when > self.last_update {
506 let last_recv = cur_recv - self.last_recv;
507 self.cur_speed = ((last_recv * 1000 * 1000) as f64 / (when - self.last_update) as f64) as u32;
508 self.last_recv = cur_recv;
509 self.last_update = when;
510 self.cur_speed
511 } else {
512 self.cur_speed
513 }
514 }
515
516 pub fn cur_speed(&self) -> u32 {
517 self.cur_speed
518 }
519}
520
521
522#[derive(Debug, Serialize, Deserialize)]
523pub enum NdnTaskState {
524 Running,
525 Paused,
526 Error(BuckyError),
527 Finished
528}
529
530#[derive(Clone, Debug, Serialize, Deserialize)]
531pub enum NdnTaskControlState {
532 Normal,
533 Paused,
534 Canceled,
535}
536
537pub trait NdnTask: Send + Sync {
538 fn clone_as_task(&self) -> Box<dyn NdnTask>;
539 fn state(&self) -> NdnTaskState;
540 fn control_state(&self) -> NdnTaskControlState;
541
542 fn resume(&self) -> BuckyResult<NdnTaskControlState> {
543 Ok(NdnTaskControlState::Normal)
544 }
545 fn cancel(&self) -> BuckyResult<NdnTaskControlState> {
546 self.cancel_by_error(BuckyError::new(BuckyErrorCode::Interrupted, "user canceled"))
547 }
548 fn cancel_by_error(&self, _err: BuckyError) -> BuckyResult<NdnTaskControlState> {
549 Ok(NdnTaskControlState::Normal)
550 }
551 fn pause(&self) -> BuckyResult<NdnTaskControlState> {
552 Ok(NdnTaskControlState::Normal)
553 }
554
555 fn close(&self, _recursion: bool) -> BuckyResult<()> {
556 Ok(())
557 }
558
559 fn cur_speed(&self) -> u32;
560 fn history_speed(&self) -> u32;
561 fn transfered(&self) -> u64;
562}