1use std::{
2 convert::TryFrom,
3 ops::Range,
4};
5use serde_json::{Map, Value};
6use cyfs_base::*;
7use crate::{
8 types::*,
9 interface::udp::MTU,
10 protocol::{PackageCmdCode, Package},
11 tunnel::{udp::Tunnel as UdpTunnel, DynamicTunnel},
12 datagram::DatagramOptions
13};
14use super::super::super::{
15 types::*
16};
17
/// Codes identifying the channel commands defined in this file.
///
/// The discriminant is the `u8` value that the encoders below write ahead of
/// the flags word (see the `CommandCode::… as u8` casts passed to
/// `FlagsEncodeContext::new`).
#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd)]
pub enum CommandCode {
    /// Code used when encoding an `Interest`.
    Interest = 0,
    /// Code used when encoding a `RespInterest`.
    RespInterest = 1,
}
23
24
25impl TryFrom<u8> for CommandCode {
26 type Error = BuckyError;
27 fn try_from(v: u8) -> std::result::Result<Self, Self::Error> {
28 match v {
29 0u8 => Ok(Self::Interest),
30 1u8 => Ok(Self::RespInterest),
31 _ => Err(BuckyError::new(
32 BuckyErrorCode::InvalidParam,
33 "invalid channel command value",
34 )),
35 }
36 }
37}
38
/// Associates an implementing type with a single [`CommandCode`].
pub trait CommandPackage {
    /// Returns the command code of the implementing type.
    fn command_code() -> CommandCode;
}
42
43struct FlagsEncodeContext {
44 flags: u16,
45 length: usize,
46}
47
48impl FlagsEncodeContext {
49 pub fn new<'a>(
50 command_code: u8,
51 buf: &'a mut [u8]
52 ) -> Result<(Self, &'a mut [u8]), BuckyError> {
53 let buf = command_code.raw_encode(buf, &None)?;
54 let buf = u16::from(0 as u16).raw_encode(buf, &None)?;
55 Ok((
56 Self {
57 flags: 0,
58 length: 3,
59 },
60 buf,
61 ))
62 }
63
64 pub fn encode<'a, T: RawEncode>(
66 &mut self,
67 buf: &'a mut [u8],
68 value: &T
69 ) -> Result<&'a mut [u8], BuckyError> {
70 let pre_len = buf.len();
71 let next_buf = value.raw_encode(buf, &None)?;
72 self.length += pre_len - next_buf.len();
73 Ok(next_buf)
74 }
75
76 pub fn option_encode<'a, T: RawEncode>(
77 &mut self,
78 buf: &'a mut [u8],
79 value: &Option<T>,
80 inc_flags: u16,
81 ) -> Result<&'a mut [u8], BuckyError> {
82 if let Some(v) = value {
83 let pre_len = buf.len();
84 self.flags |= inc_flags;
85 let next_buf = v.raw_encode(buf, &None)?;
86 self.length += pre_len - next_buf.len();
87 Ok(next_buf)
88 } else {
89 Ok(buf)
90 }
91 }
92
93 pub fn set_flags(&mut self, inc_flags: u16) {
94 self.flags |= inc_flags;
95 }
96
97 pub fn get_flags(&self) -> u16 {
98 self.flags
99 }
100
101 pub fn finish<'a>(&self, buf: &'a mut [u8]) -> Result<&'a mut [u8], BuckyError> {
102 let begin_buf = buf;
103 let buf = &mut begin_buf[u8::raw_bytes().unwrap()..];
104 u16::from(self.flags).raw_encode(buf, &None).map(|_| ())?;
105 Ok(&mut begin_buf[self.length..])
106 }
107}
108
109
110struct FlagsDecodeContext {
111 flags: u16,
112}
113
114impl FlagsDecodeContext {
115 pub fn new<'a>(
116 buf: &'a [u8]
117 ) -> Result<(Self, &'a [u8]), BuckyError> {
118 let (flags, buf) = u16::raw_decode(buf)?;
119 Ok((
120 Self {
121 flags,
122 },
123 buf,
124 ))
125 }
126
127
128 pub fn decode<'a, T: RawDecode<'a>>(
131 &mut self,
132 buf: &'a [u8]
133 ) -> Result<(T, &'a [u8]), BuckyError> {
134 T::raw_decode(buf)
135 }
136
137 pub fn option_decode<'a, T: RawDecode<'a>>(
138 &mut self,
139 buf: &'a [u8],
140 check_flags: u16,
141 ) -> Result<(Option<T>, &'a [u8]), BuckyError> {
142 if self.flags & check_flags == 0 {
143 Ok((None, buf))
144 } else {
145 T::raw_decode(buf).map(|(v, buf)| (Some(v), buf))
146 }
147 }
148
149 pub fn check_flags(&self, bits: u16) -> bool {
150 self.flags & bits != 0
151 }
152
153 pub fn flags(&self) -> u16 {
154 self.flags
155 }
156}
157
158
/// Hands out successive single-bit masks (`0x0001`, `0x0002`, …) for the
/// optional fields of a command, so that encoder and decoder assign the same
/// bit to the same field as long as they call [`FlagsCounter::next`] in the
/// same order.
struct FlagsCounter {
    /// Index of the next bit to hand out.
    counter: u8,
}

impl FlagsCounter {
    /// Creates a counter whose first mask is bit 0.
    pub fn new() -> Self {
        Self { counter: 0 }
    }

    /// Returns the mask for the next unused bit and advances the counter.
    ///
    /// # Panics
    /// Panics when more than 16 masks are requested. The flags word is a
    /// `u16`; a plain `1 << n` with `n >= 16` only panics in builds with
    /// overflow checks and otherwise wraps the shift amount, silently
    /// re-issuing bits that were already handed out.
    pub fn next(&mut self) -> u16 {
        let bit = u32::from(self.counter);
        let mask = 1u16
            .checked_shl(bit)
            .expect("FlagsCounter: more than 16 flag bits requested");
        self.counter += 1;
        mask
    }
}
174
175
/// Body of the channel command encoded with `CommandCode::Interest`.
///
/// `session_id` is not part of the encoded body: the codec impls in this file
/// carry it through `DatagramOptions::sequence`.
#[derive(Debug, Clone)]
pub struct Interest {
    /// Session identifier; travels in `DatagramOptions::sequence`.
    pub session_id: TempSeq,
    /// Chunk this command refers to.
    pub chunk: ChunkId,
    /// Codec description sent with the command (presumably the encoding the
    /// sender prefers — confirm against the channel logic).
    pub prefer_type: ChunkCodecDesc,
    /// Optional referer string; encoded only when present.
    pub referer: Option<String>,
    /// Optional device id; encoded only when present.
    pub from: Option<DeviceId>,
    /// Optional group path; encoded only when present.
    pub group_path: Option<String>
}
190
191
192impl RawEncodeWithContext<DatagramOptions> for Interest {
193 fn raw_measure_with_context(
194 &self,
195 _options: &mut DatagramOptions,
196 _purpose: &Option<RawEncodePurpose>
197 ) -> Result<usize, BuckyError> {
198 unimplemented!()
199 }
200 fn raw_encode_with_context<'a>(
201 &self,
202 enc_buf: &'a mut [u8],
203 options: &mut DatagramOptions,
204 _purpose: &Option<RawEncodePurpose>
205 ) -> Result<&'a mut [u8], BuckyError> {
206 options.sequence = Some(self.session_id);
207 let mut flags = FlagsCounter::new();
208 let (mut context, buf) = FlagsEncodeContext::new(CommandCode::Interest as u8, enc_buf)?;
209 let buf = context.encode(buf, &self.chunk)?;
210 let buf = context.encode(buf, &self.prefer_type)?;
211 let buf = context.option_encode(buf, &self.referer, flags.next())?;
212 let buf = context.option_encode(buf, &self.from, flags.next())?;
213 let _ = context.option_encode(buf, &self.group_path, flags.next())?;
214 context.finish(enc_buf)
215 }
216}
217
218impl<'de> RawDecodeWithContext<'de, &DatagramOptions> for Interest {
219 fn raw_decode_with_context(
220 buf: &'de [u8],
221 options: &DatagramOptions,
222 ) -> Result<(Self, &'de [u8]), BuckyError> {
223 let session_id = options.sequence.ok_or_else(||
224 BuckyError::new(BuckyErrorCode::InvalidData, "Interest package should has sequence"))?;
225 let mut flags = FlagsCounter::new();
226 let (mut context, buf) = FlagsDecodeContext::new(buf)?;
227 let (chunk, buf) = context.decode(buf)?;
228 let (prefer_type, buf) = context.decode(buf)?;
229 let (referer, buf) = context.option_decode(buf, flags.next())?;
230 let (from, buf) = context.option_decode(buf, flags.next())?;
231 let (group_path, buf) = context.option_decode(buf, flags.next())?;
232 Ok((
233 Self {
234 session_id,
235 chunk,
236 prefer_type,
237 referer,
238 from,
239 group_path
240 },
241 buf,
242 ))
243 }
244}
245
246impl JsonCodec<Interest> for Interest {
247 fn encode_json(&self) -> Map<String, Value> {
248 let mut obj = Map::new();
249 JsonCodecHelper::encode_number_field(&mut obj, "session_id", self.session_id.value());
250 JsonCodecHelper::encode_string_field(&mut obj, "chunk", &self.chunk);
251 JsonCodecHelper::encode_field(&mut obj, "prefer_type", &self.prefer_type);
252 JsonCodecHelper::encode_option_string_field(&mut obj, "referer", self.referer.as_ref());
253 JsonCodecHelper::encode_option_string_field(&mut obj, "from", self.from.as_ref());
254 JsonCodecHelper::encode_option_string_field(&mut obj, "group_path", self.group_path.as_ref());
255 obj
256 }
257
258 fn decode_json(obj: &Map<String, Value>) -> BuckyResult<Self> {
259 let session_id: u32 = JsonCodecHelper::decode_int_field(obj, "session_id")?;
260 Ok(Self {
261 session_id: TempSeq::from(session_id),
262 chunk: JsonCodecHelper::decode_string_field(obj, "chunk")?,
263 prefer_type: JsonCodecHelper::decode_field(obj, "prefer_type")?,
264 referer: JsonCodecHelper::decode_option_string_field(obj, "referer")?,
265 from: JsonCodecHelper::decode_option_string_field(obj, "from")?,
266 group_path: JsonCodecHelper::decode_option_string_field(obj, "group_path")?,
267 })
268 }
269}
270
271
/// Round-trips an `Interest` through the binary codec and checks that the
/// command code, `chunk` and `referer` survive.
#[test]
fn encode_protocol_ineterest() {
    let src = Interest {
        session_id: TempSeq::from(123),
        chunk: ChunkId::default(),
        prefer_type: ChunkCodecDesc::Stream(None, None, None),
        referer: Some("referer".to_owned()),
        from: None,
        group_path: None,
    };

    let mut options = DatagramOptions::default();
    let mut buf = [0u8; 1500];
    src.raw_encode_with_context(&mut buf, &mut options, &None)
        .unwrap();

    // The command code precedes the body and is consumed separately.
    let (code, body) = u8::raw_decode(&buf).unwrap();
    let cmd = CommandCode::try_from(code).unwrap();
    assert_eq!(cmd, CommandCode::Interest);

    let (dst, _) = Interest::raw_decode_with_context(body, &options).unwrap();
    assert_eq!(src.chunk, dst.chunk);
    assert_eq!(src.referer, dst.referer);
}
293
294
295
/// Body of the channel command encoded with `CommandCode::RespInterest`.
///
/// `session_id` is not part of the encoded body: the codec impls in this file
/// carry it through `DatagramOptions::sequence`.
#[derive(Clone, Debug)]
pub struct RespInterest {
    /// Session identifier; travels in `DatagramOptions::sequence`.
    pub session_id: TempSeq,
    /// Chunk this response refers to.
    pub chunk: ChunkId,
    /// Result code; written as a `u16` in the binary form and as a `u32`
    /// number in JSON.
    pub err: BuckyErrorCode,
    /// Optional device id; encoded only when present (presumably the device
    /// to redirect to — confirm).
    pub redirect: Option<DeviceId>,
    /// Optional referer string accompanying the redirect; encoded only when
    /// present.
    pub redirect_referer: Option<String>,
    /// Optional device id; encoded only when present.
    pub to: Option<DeviceId>,
}
305
306
307impl RawEncodeWithContext<DatagramOptions> for RespInterest {
308 fn raw_measure_with_context(
309 &self,
310 _options: &mut DatagramOptions,
311 _purpose: &Option<RawEncodePurpose>
312 ) -> Result<usize, BuckyError> {
313 unimplemented!()
314 }
315 fn raw_encode_with_context<'a>(
316 &self,
317 enc_buf: &'a mut [u8],
318 options: &mut DatagramOptions,
319 _purpose: &Option<RawEncodePurpose>
320 ) -> Result<&'a mut [u8], BuckyError> {
321 let mut flags = FlagsCounter::new();
322
323 options.sequence = Some(self.session_id);
324 let (mut context, buf) = FlagsEncodeContext::new(CommandCode::RespInterest as u8, enc_buf)?;
326 let buf = context.encode(buf, &self.chunk)?;
327 let buf = context.encode(buf, &(self.err.into_u16()))?;
328 let buf = context.option_encode(buf, &(self.redirect), flags.next())?;
329 let buf = context.option_encode(buf, &(self.redirect_referer), flags.next())?;
330 let _ = context.option_encode(buf, &(self.to), flags.next())?;
331 context.finish(enc_buf)
332 }
333}
334
335impl<'de> RawDecodeWithContext<'de, &DatagramOptions> for RespInterest {
336 fn raw_decode_with_context(
337 buf: &'de [u8],
338 options: &DatagramOptions,
339 ) -> Result<(Self, &'de [u8]), BuckyError> {
340 let session_id = options.sequence.ok_or_else(||
341 BuckyError::new(BuckyErrorCode::InvalidData, "RespInterest package should has sequence"))?;
342 let mut flags = FlagsCounter::new();
343
344 let (mut context, buf) = FlagsDecodeContext::new(buf)?;
345 let (chunk, buf) = context.decode(buf)?;
346 let (err, buf) = context.decode::<u16>(buf)?;
347 let err = BuckyErrorCode::from(err);
348 let (id, buf) = context.option_decode(buf, flags.next())?;
349 let (referer, buf) = context.option_decode(buf, flags.next())?;
350 let (to, buf) = context.option_decode(buf, flags.next())?;
351
352 Ok((
353 Self {
354 session_id,
355 chunk,
356 err,
357 redirect: id,
358 redirect_referer: referer,
359 to
360 },
361 buf,
362 ))
363 }
364}
365
366impl JsonCodec<RespInterest> for RespInterest {
367 fn encode_json(&self) -> Map<String, Value> {
368 let mut obj = Map::new();
369 JsonCodecHelper::encode_number_field(&mut obj, "session_id", self.session_id.value());
370 JsonCodecHelper::encode_string_field(&mut obj, "chunk", &self.chunk);
371 let err: u32 = self.err.into();
372 JsonCodecHelper::encode_number_field(&mut obj, "err", err);
373 JsonCodecHelper::encode_option_string_field(&mut obj, "redirect", self.redirect.as_ref());
374 JsonCodecHelper::encode_option_string_field(&mut obj, "redirect_referer", self.redirect_referer.as_ref());
375 JsonCodecHelper::encode_option_string_field(&mut obj, "to", self.to.as_ref());
376 obj
377 }
378
379 fn decode_json(obj: &Map<String, Value>) -> BuckyResult<Self> {
380 let session_id: u32 = JsonCodecHelper::decode_int_field(obj, "session_id")?;
381 let err: u32 = JsonCodecHelper::decode_int_field(obj, "err")?;
382 Ok(Self {
383 session_id: TempSeq::from(session_id),
384 chunk: JsonCodecHelper::decode_string_field(obj, "chunk")?,
385 err: BuckyErrorCode::from(err),
386 redirect: JsonCodecHelper::decode_option_string_field(obj, "redirect")?,
387 redirect_referer: JsonCodecHelper::decode_option_string_field(obj, "redirect_referer")?,
388 to: JsonCodecHelper::decode_option_string_field(obj, "to")?,
389 })
390 }
391}
392
393
394
/// A decoded piece-data package: a fixed header followed by the raw payload.
pub struct PieceData {
    /// Estimate sequence from the header; `None` when the decoded value
    /// equals `TempSeq::default()`.
    pub est_seq: Option<TempSeq>,
    /// Session identifier from the header.
    pub session_id: TempSeq,
    /// Chunk the piece belongs to.
    pub chunk: ChunkId,
    /// Piece descriptor from the header.
    pub desc: PieceDesc,
    /// All bytes that follow the header.
    pub data: Vec<u8>,
}
402
impl Package for PieceData {
    /// Package version; always `0` for this type.
    fn version(&self) -> u8 {
        0
    }

    /// Command code identifying this package type.
    fn cmd_code() -> PackageCmdCode {
        PackageCmdCode::PieceData
    }
}
412
413impl PieceData {
414 pub fn max_header_len() -> usize {
415 TempSeq::raw_bytes().unwrap()
416 + TempSeq::raw_bytes().unwrap()
417 + ChunkId::raw_bytes().unwrap()
418 + PieceDesc::raw_bytes().unwrap()
419 }
420
421 pub fn max_payload() -> usize {
422 UdpTunnel::raw_data_max_payload_len() - u8::raw_bytes().unwrap() - Self::max_header_len()
423 }
424
425 pub fn encode_header<'a>(
426 buf: &'a mut [u8],
427 session_id: &TempSeq,
428 chunk: &ChunkId,
429 desc: &PieceDesc) -> BuckyResult<&'a mut [u8]> {
430 let buf = (PieceData::cmd_code() as u8).raw_encode(buf, &None)?;
431 let buf = TempSeq::default().raw_encode(buf, &None)?;
432 let buf = session_id.raw_encode(buf, &None)?;
433 let buf = chunk.raw_encode(buf, &None)?;
434 desc.raw_encode(buf, &None)
435 }
436
437 pub fn reset_estimate(buf: &mut [u8], est_seq: TempSeq) {
438 let _ = est_seq.raw_encode(&mut buf[u8::raw_bytes().unwrap()..], &None).unwrap();
439 }
440
441 pub fn decode_from_raw_data(buf: &[u8]) -> BuckyResult<Self> {
442 let (est_seq, buf) = TempSeq::raw_decode(buf).map(|(s, buf)| {
443 (
444 if s == TempSeq::default() {
445 None
446 } else {
447 Some(s)
448 },
449 buf,
450 )
451 })?;
452 let (session_id, buf) = TempSeq::raw_decode(buf)?;
453 let (chunk, buf) = ChunkId::raw_decode(buf)?;
454 let (desc, data) = PieceDesc::raw_decode(buf)?;
455 Ok(Self {
456 est_seq,
457 session_id,
458 chunk,
459 desc,
460 data: Vec::from(data),
462 })
463 }
464}
465
/// Command carried by a `PieceControl` package.
///
/// The `RawEncode`/`RawDecode` impls in this file map the variants to the
/// bytes `0`, `1`, `2` and `3` in declaration order.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PieceControlCommand {
    /// Encoded as `0`.
    Continue,
    /// Encoded as `1`.
    Finish,
    /// Encoded as `2`.
    Pause,
    /// Encoded as `3`.
    Cancel,
}
473
474
475impl RawEncode for PieceControlCommand {
476 fn raw_measure(&self, _purpose: &Option<RawEncodePurpose>) -> BuckyResult<usize> {
477 Ok(u8::raw_bytes().unwrap())
478 }
479
480 fn raw_encode<'a>(
481 &self,
482 buf: &'a mut [u8],
483 purpose: &Option<RawEncodePurpose>,
484 ) -> BuckyResult<&'a mut [u8]> {
485 match self {
486 Self::Continue => 0u8.raw_encode(buf, purpose),
487 Self::Finish => 1u8.raw_encode(buf, purpose),
488 Self::Pause => 2u8.raw_encode(buf, purpose),
489 Self::Cancel => 3u8.raw_encode(buf, purpose),
490 }
491 }
492}
493
494
495impl<'de> RawDecode<'de> for PieceControlCommand {
496 fn raw_decode(buf: &'de [u8]) -> BuckyResult<(Self, &'de [u8])> {
497 let (code, buf) = u8::raw_decode(buf)?;
498 let command = match code {
499 0u8 => Ok(Self::Continue),
500 1u8 => Ok(Self::Finish),
501 2u8 => Ok(Self::Pause),
502 3u8 => Ok(Self::Cancel),
503 _ => Err(BuckyError::new(BuckyErrorCode::InvalidData, "invalid piece control command code"))
504 }?;
505 Ok((command, buf))
506 }
507}
508
509
/// Control package for a piece session.
///
/// Encoded as: command code, flags, `sequence`, `session_id`, `chunk`,
/// `command`, then the optional `max_index` (flag bit 0) and `lost_index`
/// (flag bit 1).
#[derive(Debug)]
pub struct PieceControl {
    /// Sequence of this control package.
    pub sequence: TempSeq,
    /// Session the control applies to.
    pub session_id: TempSeq,
    /// Chunk the control applies to.
    pub chunk: ChunkId,
    /// The control command itself.
    pub command: PieceControlCommand,
    /// Optional index value; encoded only when present (presumably the
    /// highest piece index seen — confirm).
    pub max_index: Option<u32>,
    /// Optional list of index ranges; encoded only when present (presumably
    /// the pieces reported as lost — confirm).
    pub lost_index: Option<Vec<Range<u32>>>
}
519
520impl PieceControl {
521 fn max_index_payload() -> usize {
522 125
523 }
524
525 pub fn split_send(mut self, tunnel: &DynamicTunnel) -> BuckyResult<()> {
526 let send_once = |ctrl: PieceControl| {
527 let mut buffer = vec![0u8; MTU];
528 let len = ctrl.raw_encode(&mut buffer[tunnel.as_ref().raw_data_header_len()..], &None)
529 .map(|buf| MTU - buf.len())?;
530 tunnel.as_ref().send_raw_data(&mut buffer[..len])?;
531 Ok(())
532 };
533
534 match self.command {
535 PieceControlCommand::Continue => {
536 if self.lost_index.is_some() {
537 let lost_index = self.lost_index.as_mut().unwrap();
538 let mut buffer = vec![0u8; MTU];
539
540 let enc_from = tunnel.as_ref().raw_data_header_len();
541
542 let mut flags = FlagsCounter::new();
543 let (mut context, buf_ptr) = FlagsEncodeContext::new(PackageCmdCode::PieceControl as u8, &mut buffer[enc_from..])?;
544 let buf_ptr = context.encode(buf_ptr, &self.sequence)?;
545 let buf_ptr = context.encode(buf_ptr, &self.session_id)?;
546 let buf_ptr = context.encode(buf_ptr, &self.chunk)?;
547 let buf_ptr = context.encode(buf_ptr, &self.command)?;
548 let index_from = MTU - buf_ptr.len();
549 let buf_ptr = context.option_encode(buf_ptr, &self.max_index, flags.next())?;
550 let _ = context.option_encode(buf_ptr, &Some(vec![0u8; 0]), flags.next())?;
551 let _ = context.finish(&mut buffer[enc_from..])?;
552
553 for indices in lost_index.chunks(Self::max_index_payload()) {
554 let buf_ptr = if let Some(max_index) = self.max_index {
555 max_index.raw_encode(&mut buffer[index_from..], &None)?
556 } else {
557 &mut buffer[index_from..]
558 };
559 let buf_ptr = indices.raw_encode(buf_ptr, &None)?;
560
561 let len = MTU - buf_ptr.len();
562 tunnel.as_ref().send_raw_data(&mut buffer[..len])?;
563 }
564 Ok(())
565 } else {
566 send_once(self)
567 }
568 },
569 _ => {
570 send_once(self)
571 }
572 }
573 }
574}
575
impl Package for PieceControl {
    /// Package version; always `0` for this type.
    fn version(&self) -> u8 {
        0
    }

    /// Command code identifying this package type.
    fn cmd_code() -> PackageCmdCode {
        PackageCmdCode::PieceControl
    }
}
585
586impl RawEncode for PieceControl {
587 fn raw_measure(&self, _purpose: &Option<RawEncodePurpose>) -> BuckyResult<usize> {
588 unimplemented!()
589 }
590
591 fn raw_encode<'a>(
592 &self,
593 enc_buf: &'a mut [u8],
594 _purpose: &Option<RawEncodePurpose>,
595 ) -> BuckyResult<&'a mut [u8]> {
596 let mut flags = FlagsCounter::new();
597 let (mut context, buf) = FlagsEncodeContext::new(PackageCmdCode::PieceControl as u8, enc_buf)?;
598 let buf = context.encode(buf, &self.sequence)?;
599 let buf = context.encode(buf, &self.session_id)?;
600 let buf = context.encode(buf, &self.chunk)?;
601 let buf = context.encode(buf, &self.command)?;
602 let buf = context.option_encode(buf, &self.max_index, flags.next())?;
603 let _buf = context.option_encode(buf, &self.lost_index, flags.next())?;
604 context.finish(enc_buf)
605 }
606}
607
608impl<'de> RawDecode<'de> for PieceControl {
609 fn raw_decode(buf: &'de [u8]) -> BuckyResult<(Self, &'de [u8])> {
610 let mut flags = FlagsCounter::new();
611 let (mut context, buf) = FlagsDecodeContext::new(buf)?;
612 let (sequence, buf) = context.decode(buf)?;
613 let (session_id, buf) = context.decode(buf)?;
614 let (chunk, buf) = context.decode(buf)?;
615 let (command, buf) = context.decode(buf)?;
616 let (max_index, buf) = context.option_decode(buf, flags.next())?;
617 let (lost_index, buf) = context.option_decode(buf, flags.next())?;
618 Ok((Self {
619 sequence,
620 session_id,
621 chunk,
622 command,
623 max_index,
624 lost_index
625 }, buf))
626 }
627}
628
629
/// Fixed-size estimate package: a sequence plus a 64-bit counter.
pub struct ChannelEstimate {
    /// Sequence of this estimate.
    pub sequence: TempSeq,
    /// Received counter (presumably an amount of data received so far; unit
    /// not visible here — confirm).
    pub recved: u64,
}
634
635impl Default for ChannelEstimate {
636 fn default() -> Self {
637 Self {
638 sequence: TempSeq::default(),
639 recved: 0
640 }
641 }
642}
643
impl Package for ChannelEstimate {
    /// Package version; always `0` for this type.
    fn version(&self) -> u8 {
        0
    }

    /// Command code identifying this package type.
    fn cmd_code() -> PackageCmdCode {
        PackageCmdCode::ChannelEstimate
    }
}
653
654
655impl RawFixedBytes for ChannelEstimate {
656 fn raw_bytes() -> Option<usize> {
657 Some(
658 u8::raw_bytes().unwrap()
659 + TempSeq::raw_bytes().unwrap()
660 + u64::raw_bytes().unwrap())
661 }
662}
663
664impl RawEncode for ChannelEstimate {
665 fn raw_measure(&self, _purpose: &Option<RawEncodePurpose>) -> BuckyResult<usize> {
666 Ok(Self::raw_bytes().unwrap())
667 }
668
669 fn raw_encode<'a>(
670 &self,
671 buf: &'a mut [u8],
672 purpose: &Option<RawEncodePurpose>,
673 ) -> BuckyResult<&'a mut [u8]> {
674 let buf = (Self::cmd_code() as u8).raw_encode(buf, purpose)?;
675 let buf = self.sequence.raw_encode(buf, purpose)?;
676 self.recved.raw_encode(buf, purpose)
677 }
678}
679
680impl<'de> RawDecode<'de> for ChannelEstimate {
681 fn raw_decode(buf: &'de [u8]) -> BuckyResult<(Self, &'de [u8])> {
682 let (sequence, buf) = TempSeq::raw_decode(buf)?;
683 let (recved, buf) = u64::raw_decode(buf)?;
684 Ok((Self {
685 sequence,
686 recved
687 }, buf))
688 }
689}