1#![allow(non_snake_case)]
2#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4
5use super::*;
6use crate::common::{self,errcode};
7use std::mem;
8
// Values stored in `fec_packet_t::fec_type` to select encoder or decoder behaviour.
const FEC_TYPE_ENCODER:u8=0;
const FEC_TYPE_DECODER:u8=1;

// Capacity of the per-group bookkeeping arrays (`src_pkts_len`, `src_pkts_added`);
// a group's `ratio` indexes into them, so it must not exceed this value.
const MAX_FEC_RATIO:usize = 10;
// Size of the XOR accumulator buffer. `MAX_PKT_BUF_SIZE` is not defined in this
// file (presumably brought in by `use super::*` — confirm).
const MAX_FEC_BUF_SIZE:usize = MAX_PKT_BUF_SIZE as usize;
/// Header placed in front of the XOR payload of an FEC packet.
///
/// The struct is `#[repr(C)]` and is copied to and from byte buffers as raw
/// memory by `fec_packet_t` (`add_fec_packet`, `get_fec_packet`, `as_slice`),
/// so its field order and sizes define the byte layout, in native byte order.
#[derive(Clone,Debug)]
#[repr(C)]
struct fec_header_t {
    // Sequence number of the first source packet of the group (set in `reset`).
    start_seq:u64,
    // Number of source packets in the group; `reset` stores the group's ratio here.
    src_pkts_num:u8,
    // Never written with a non-zero value anywhere in this file.
    alg_type:u8,
    // Length of the longest source packet added so far (see `set_raw_packet_state`).
    max_pkt_len:u16,
    // Length of each source packet, indexed by its offset from `start_seq`.
    src_pkts_len:[u16;MAX_FEC_RATIO],
}
// Number of bytes the header occupies in a serialized FEC packet.
const FEC_HDR_SIZE:usize = mem::size_of::<fec_header_t>();
25
/// State of one FEC group: the header, the XOR accumulator and bookkeeping
/// about which packets of the group have been seen.
///
/// `hdr` and `body` are the first two fields of a `#[repr(C)]` struct;
/// `as_slice`/`get_fec_packet` rely on them being contiguous to expose
/// "header + payload" as one byte range.
#[derive(Clone,Debug)]
#[repr(C)]
pub struct fec_packet_t {
    hdr:fec_header_t,
    // XOR of every packet payload added to the group so far.
    body:[u8;MAX_FEC_BUF_SIZE],
    // Length of the longest raw packet added so far (see `set_raw_packet_state`).
    real_len:u16,
    // FEC_TYPE_ENCODER or FEC_TYPE_DECODER.
    fec_type:u8,
    // Number of source packets per group.
    ratio:u8,
    // src_pkts_added[i] is true once the raw packet `start_seq + i` was added.
    src_pkts_added:[bool;MAX_FEC_RATIO],
    // True once an FEC packet was offered to `add_fec_packet` for this group.
    fec_exist:bool,
}
39
40
41impl fec_packet_t {
43 pub fn new()->Self {
44 return Self {
45 hdr:unsafe { std::mem::zeroed::<fec_header_t>() },
46 body:[0;MAX_FEC_BUF_SIZE],
47 real_len:0,
48 fec_type:FEC_TYPE_ENCODER,
49 ratio:5,
50 src_pkts_added:[false;MAX_FEC_RATIO],
51 fec_exist:false,
52 }
53 }
54
55 fn reset(&mut self,start_seq:u64,ratio:u8) {
56 self.hdr = unsafe { std::mem::zeroed::<fec_header_t>() };
58 self.hdr.start_seq = start_seq;
59 self.hdr.src_pkts_num = ratio;
60 self.real_len = 0;
61 self.fec_exist = false;
62 self.ratio = ratio;
63 self.body.fill(0);
64 self.src_pkts_added.fill(false);
65 }
66 pub fn start_encoder(&mut self,start_seq:u64,ratio:u8) {
68 self.fec_type = FEC_TYPE_ENCODER;
69 self.reset(start_seq, ratio)
70 }
71
72 pub fn start_decoder(&mut self,start_seq:u64,ratio:u8) {
74 self.fec_type = FEC_TYPE_DECODER;
75 self.reset(start_seq, ratio);
76 }
77
78 fn is_packet_recv_finished(&self)->bool {
80 for i in 0..self.ratio as usize {
81 if !self.src_pkts_added[i] {
82 return false;
83 }
84 }
85 return true
86 }
87 pub fn is_fec_finished(&self)->bool {
89 if self.fec_type==FEC_TYPE_ENCODER {
90 return self.is_packet_recv_finished();
91 } else {
92 let mut expect = 0;
94 for i in 0..self.ratio as usize {
95 if self.src_pkts_added[i] {
96 expect+=1;
97 }
98 }
99 if self.fec_exist {
100 expect+=1;
101 }
102 if expect>=self.ratio {
104 return true;
105 } else {
106 return false;
107 }
108 }
109 }
110
111 pub fn is_first_packet(&self)->bool {
112 for i in 0..self.ratio as usize {
113 if self.src_pkts_added[i] {
114 return false
115 }
116 }
117 if self.fec_exist {
118 return false
119 }
120
121 return true
122 }
123
124 fn set_raw_packet_state(&mut self,idx:usize,len:u16) {
125 self.src_pkts_added[idx] = true;
126 self.hdr.src_pkts_len[idx]=len;
127 if self.real_len< len {
128 self.real_len = len;
129 self.hdr.max_pkt_len = len;
130 }
131
132 }
133 pub fn add_raw_packet(&mut self,seq:u64,pkt:&[u8])->errcode::RESULT {
136 if seq<self.hdr.start_seq || seq>=self.hdr.start_seq+self.ratio as u64 || pkt.len()>MAX_FEC_BUF_SIZE{
137 return errcode::ERROR_INVALID_MSG
138 }
139 if self.fec_type == FEC_TYPE_DECODER && self.is_fec_finished() {
140 return errcode::ERROR_NO_OP;
141 } else if self.is_packet_recv_finished() {
142 return errcode::ERROR_NO_OP;
143 }
144
145 let idx=(seq-self.hdr.start_seq) as usize;
146 if self.src_pkts_added[idx] {
147 return errcode::ERROR_ALREADY_EXIST;
148 }
149
150 if self.is_first_packet() {
151 self.set_raw_packet_state(idx,pkt.len() as u16);
152 self.body[0..pkt.len()].copy_from_slice(pkt);
153 return errcode::ERROR_NO_OP
154 }
155
156 self.set_raw_packet_state(idx,pkt.len() as u16);
157 self.calc_fec_packet(pkt);
158 if self.is_fec_finished() {
159 errcode::RESULT_SUCCESS
160 } else {
161 return errcode::ERROR_NO_OP
162 }
163 }
164
165 pub fn next_session(&mut self,new_ratio:u8) {
167 let seq = self.hdr.start_seq+(new_ratio as u64);
168 self.reset(seq, new_ratio)
169 }
170 pub fn next_start_seq(&self)->u64 {
171 return self.hdr.start_seq+(self.ratio as u64);
172 }
173
174 pub fn add_fec_packet(&mut self,seq:u64,pkt:&[u8])->errcode::RESULT {
176 if seq<self.hdr.start_seq || seq>=self.hdr.start_seq+self.ratio as u64 || pkt.len()<=FEC_HDR_SIZE || pkt.len()>MAX_FEC_BUF_SIZE+FEC_HDR_SIZE{
179 return errcode::ERROR_INVALID_MSG
180 }
181
182 if self.is_fec_finished() {
183 self.fec_exist = true;
184 return errcode::ERROR_NO_OP;
185 }
186 self.fec_exist = true;
187
188 unsafe {
189 std::ptr::copy_nonoverlapping(pkt.as_ptr(), &mut self.hdr as *mut _ as * mut u8,FEC_HDR_SIZE);
190 }
191 self.fec_exist = true;
192
193 if self.is_first_packet() {
194 self.body[0..pkt.len()-FEC_HDR_SIZE].copy_from_slice(&pkt[FEC_HDR_SIZE..]);
195 return errcode::ERROR_NO_OP
196 }
197
198 self.calc_fec_packet(&pkt[FEC_HDR_SIZE..]);
199 if self.is_fec_finished() {
200 errcode::RESULT_SUCCESS
201 } else {
202 return errcode::ERROR_NO_OP
203 }
204
205 }
206
207 fn calc_fec_packet(&mut self,pkt:&[u8])->errcode::RESULT {
210
211 let pkt_len = pkt.len();
212 let u64_len:usize = pkt_len as usize/mem::size_of::<u64>();
213
214 let u64_dst = self.body.as_mut_ptr();
220 let u64_src = pkt.as_ptr();
221 for i in 0..u64_len {
225 unsafe {
226 *((u64_dst as usize + i*mem::size_of::<u64>()) as *mut u64) ^= *((u64_src as usize + i*mem::size_of::<u64>()) as *const u64);
227 }
228 }
229
230 for j in u64_len*mem::size_of::<u64>()..pkt_len as usize {
231 self.body[j] ^=pkt[j];
232 }
233
234 for k in pkt_len..MAX_FEC_BUF_SIZE {
235 self.body[k] ^=0u8;
236 }
237
238 errcode::RESULT_SUCCESS
239 }
240
241 fn get_lost_packet_seq(&self)->(u64,usize) {
243 for i in 0..self.ratio as usize{
244 if !self.src_pkts_added[i] {
245 return (self.hdr.start_seq+(i as u64),i)
246 }
247 }
248 return (0,0);
249 }
250
251 pub fn get_raw_packet(&self,buf:&mut [u8])->(usize,u64) {
254 if buf.len() < self.real_len as usize {
255 return (0,0)
256 }
257 let (seq,idx)=self.get_lost_packet_seq();
258 if seq==0 {
259 return (0,0);
260 }
261 unsafe {
262 std::ptr::copy_nonoverlapping(self.body.as_ptr(), buf.as_mut_ptr(), self.hdr.src_pkts_len[idx] as usize);
263 }
264
265 (self.hdr.src_pkts_len[idx] as usize,seq)
266 }
267
268 pub fn get_raw_packet_slice(&self)->Option<(&[u8],u64)> {
269 let (seq,idx)=self.get_lost_packet_seq();
270
271 if seq==0 || idx>=self.ratio as usize || self.hdr.src_pkts_len[idx]>=MAX_FEC_BUF_SIZE as u16{
272 return None;
273 }
274 return Some((&self.body[0..self.hdr.src_pkts_len[idx] as usize],seq));
276 }
277
278 pub fn get_fec_packet(&self,buf:&mut [u8])->(usize,u64) {
281 let fec_len = self.real_len as usize+FEC_HDR_SIZE;
282 if buf.len() < fec_len {
283 return (0,0)
284 }
285 unsafe {
286 std::ptr::copy_nonoverlapping(&self.hdr as *const _ as *const u8, buf.as_mut_ptr(), fec_len);
287 }
288 (fec_len,self.hdr.start_seq+self.ratio as u64-1)
289 }
290
291 pub fn get_packet_len(&self)->usize {
292 self.real_len as usize + FEC_HDR_SIZE
293 }
294 pub fn get_payload_len(&self)->usize {
295 self.real_len as usize
296 }
297
298 pub fn as_slice(&self)->&[u8] {
299 return unsafe {
300 &(*(&self.hdr as *const fec_header_t as *const u8 as *const [u8;MAX_FEC_BUF_SIZE+FEC_HDR_SIZE]))[0..FEC_HDR_SIZE+self.real_len as usize]
301 };
302 }
303
304 pub fn as_mut_slice(&mut self)->&[u8] {
305 return unsafe {
306 &mut (*(&mut self.hdr as *mut fec_header_t as *mut u8 as *mut [u8;MAX_FEC_BUF_SIZE+FEC_HDR_SIZE]))[0..FEC_HDR_SIZE+self.real_len as usize]
307 };
308 }
309
310 pub fn get_recv_pkt_count(&self)->usize {
311 let mut count=0;
312 for b in self.src_pkts_added {
313 if b {
314 count+=1;
315 }
316 }
317 count
318 }
319
320 pub fn is_fec_pkt_recved(&self)->bool {
321 self.fec_exist
322 }
323 pub fn get_start_seq(&self)->u64 {
324 self.hdr.start_seq
325 }
326}
327
// Number of FEC groups tracked at the same time by a decoder stream.
const MAX_FEC_PACKET_IN_STREAM:usize=16;
/// Decoder for a stream of FEC groups: a ring of `MAX_FEC_PACKET_IN_STREAM`
/// consecutive groups that slides forward as groups complete or newer sequence
/// numbers arrive.
pub struct fec_decoder_stream_t {
    // Ring buffer of groups; slot `start_idx` holds the oldest one.
    fecs:[fec_packet_t;MAX_FEC_PACKET_IN_STREAM],
    // Ring index of the oldest tracked group.
    start_idx:usize,
    // First sequence number covered by the window.
    start_seq:u64,
    // Number of source packets per group, shared by all groups in the window.
    ratio:u8,
    // Last sequence number covered by the window.
    max_seq:u64,
    // Scratch buffer a recovered packet is copied into before it is returned.
    fec_buf:[u8;MAX_FEC_BUF_SIZE],
}
338
339impl fec_decoder_stream_t {
340 pub fn new(start_seq:u64,ratio:u8)->fec_decoder_stream_t {
341 let mut stream=unsafe { mem::zeroed::<fec_decoder_stream_t>() };
342 stream.init(start_seq,ratio);
343 return stream
344 }
345 fn init(&mut self,start_seq:u64,ratio:u8) {
346 self.start_idx = 0;
347 self.ratio = ratio;
348 self.start_seq=start_seq;
349 self.max_seq=start_seq+(MAX_FEC_PACKET_IN_STREAM *ratio as usize) as u64-1;
350 for i in 0..MAX_FEC_PACKET_IN_STREAM as usize {
351 self.fecs[i].start_decoder(start_seq+ i as u64 * (ratio as u64),ratio);
352 }
353 }
354
355 pub fn start(&mut self,start_seq:u64,ratio:u8) {
356 self.init(start_seq,ratio);
357 }
358
359 pub fn cleanup(&mut self) {
360 while self.fecs[self.start_idx].is_fec_finished() {
361 self.start_seq+= self.ratio as u64;
362 self.fecs[self.start_idx].start_decoder(self.max_seq+1,self.ratio);
363 self.max_seq +=self.ratio as u64;
364 self.start_idx = (self.start_idx +1) % MAX_FEC_PACKET_IN_STREAM;
365 }
366 }
367 fn move_to_seq(&mut self,new_seq:u64) {
369 self.cleanup();
370 if new_seq<= self.max_seq {
371 return
372 }
373 if new_seq>=self.max_seq+self.seq_window_len() {
375 self.init(new_seq/self.ratio as u64 * self.ratio as u64,self.ratio);
376 return
377 }
378
379 let step = common::ceiling(new_seq - self.max_seq,self.ratio as u64) as usize;
381 let new_idx = (self.start_idx + step) % MAX_FEC_PACKET_IN_STREAM;
382 let new_start_seq = self.start_seq+(step as u64 * self.ratio as u64);
383 let mut idx = self.start_idx;
384 let mut seq = self.start_seq+(MAX_FEC_PACKET_IN_STREAM + step-1) as u64*self.ratio as u64;
385 while idx!=new_idx {
386 self.fecs[idx].start_decoder(seq,self.ratio);
387 idx=(idx + 1) % MAX_FEC_PACKET_IN_STREAM;
388 seq+=self.ratio as u64;
389 }
390 self.start_idx = new_idx;
392 self.start_seq = new_start_seq;
393 self.max_seq = new_start_seq+(self.ratio as u64 * MAX_FEC_PACKET_IN_STREAM as u64)-1;
394 }
395
396 fn get_index_by_seq(&self,seq:u64)->usize {
397 if seq<self.start_seq {
398 return 0;
399 }
400
401 (((seq-self.start_seq) / self.ratio as u64) as usize + self.start_idx) % MAX_FEC_PACKET_IN_STREAM
402 }
403
404 fn seq_window_len(&self)->u64 {
405 self.ratio as u64*MAX_FEC_PACKET_IN_STREAM as u64
406 }
407
408 fn need_resync(&mut self,recv_seq:u64)->bool {
410 if recv_seq<self.start_seq {
411 if recv_seq + self.seq_window_len()< self.start_seq || (self.start_seq - recv_seq+1) % self.ratio as u64!=0 {
412 return true;
413 }
414 return false;
415 } else if (recv_seq - self.start_seq+1) % self.ratio as u64!=0 {
416 return true
417 }
418 false
419 }
420 pub fn add_fec_packet(&mut self,seq:u64,pkt:&[u8])->Result<(&[u8],u64),errcode::RESULT> {
422 if self.need_resync(seq) {
423 self.init(seq+1, self.ratio);
424 return Err(errcode::ERROR_NO_OP);
425 }
426 if seq<self.start_seq {
427 return Err(errcode::ERROR_INVALID_MSG);
428 }
429
430 if seq>self.max_seq {
431 self.move_to_seq(seq);
432 }
433 let idx= self.get_index_by_seq(seq);
434 let res = self.fecs[idx].add_fec_packet(seq,pkt);
435 if res==errcode::RESULT_SUCCESS {
436 let (seq,len) = match self.fecs[idx].get_raw_packet_slice() {
437 None=> return Err(errcode::ERROR_COMMON),
438 Some((pkt,seq))=> {
439 self.fec_buf[0..pkt.len()].copy_from_slice(pkt);
440 (seq,pkt.len())
441 },
442 };
443 self.cleanup();
444 return Ok((&self.fec_buf[0..len],seq));
445 } else if self.fecs[idx].is_fec_finished() {
446 self.cleanup();
447 }
448
449 Err(errcode::ERROR_NO_OP)
450 }
451
452 pub fn add_raw_packet(&mut self,seq:u64,pkt:&[u8])->Result<(&[u8],u64),errcode::RESULT> {
454 if seq<self.start_seq {
455 return Err(errcode::ERROR_INVALID_MSG);
456 }
457 if seq>self.max_seq {
458 self.move_to_seq(seq);
459 }
460 let idx= self.get_index_by_seq(seq);
461 let res = self.fecs[idx].add_raw_packet(seq,pkt);
462 if res==errcode::RESULT_SUCCESS {
463 let (seq,len) = match self.fecs[idx].get_raw_packet_slice() {
464 None=> return Err(errcode::ERROR_COMMON),
465 Some((pkt,seq))=> {
466 self.fec_buf[0..pkt.len()].copy_from_slice(pkt);
467 (seq,pkt.len())
468 },
469 };
470 self.cleanup();
471 return Ok((&self.fec_buf[0..len],seq));
472 } else if self.fecs[idx].is_fec_finished() {
473 self.cleanup();
474 }
475
476 Err(errcode::ERROR_NO_OP)
477 }
478
479 pub fn get_start_seq(&self)->u64 {
480 self.start_seq
481 }
482
483 pub fn get_start_idx(&self)->usize {
484 self.start_idx
485 }
486
487 pub fn print_stats(&self) {
488 let mut idx=self.start_idx;
489 println!("Fec Stream internal State, start_seq={},Max_seq={},ratio={},start_idx={}",
490 self.start_seq,self.max_seq,self.ratio,self.start_idx);
491 while idx!= (self.start_idx+MAX_FEC_PACKET_IN_STREAM-1) % MAX_FEC_PACKET_IN_STREAM {
492 let p = &self.fecs[idx];
493 println!("No {} fec packet,start_seq={},recv {} raw packet,recv_fec={}",idx,
494 p.get_start_seq(),p.get_recv_pkt_count(),p.fec_exist);
495 idx = (idx+1) % MAX_FEC_PACKET_IN_STREAM;
496 }
497 let p = &self.fecs[idx];
498 println!("No {} fec packet,start_seq={},recv {} raw packet,recv_fec={}",idx,
499 p.get_start_seq(),p.get_recv_pkt_count(),p.fec_exist);
500 }
501}