rust_rsm/net_ext/
fec.rs

1#![allow(non_snake_case)]
2#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4
5use super::*;
6use crate::common::{self,errcode};
7use std::mem;
8
9const FEC_TYPE_ENCODER:u8=0;
10const FEC_TYPE_DECODER:u8=1;
11
12const MAX_FEC_RATIO:usize = 10;
13const MAX_FEC_BUF_SIZE:usize = MAX_PKT_BUF_SIZE as usize;
14///报文头部
15#[derive(Clone,Debug)]
16#[repr(C)]
17struct fec_header_t {
18    start_seq:u64,
19    src_pkts_num:u8,
20    alg_type:u8,
21    max_pkt_len:u16,
22    src_pkts_len:[u16;MAX_FEC_RATIO],
23}
24const FEC_HDR_SIZE:usize = mem::size_of::<fec_header_t>();
25
26///FEC报文,实现对FEC的处理
27#[derive(Clone,Debug)]
28#[repr(C)]
29pub struct fec_packet_t {
30    hdr:fec_header_t,
31    body:[u8;MAX_FEC_BUF_SIZE],
32    real_len:u16,
33    fec_type:u8,
34    ///FEC的N:1比例,多少个报文计算一个FEC报文
35    ratio:u8, 
36    src_pkts_added:[bool;MAX_FEC_RATIO],
37    fec_exist:bool,
38}
39
40
41///FEC处理,Encoder/Decoder同一套代码
42impl fec_packet_t {
43    pub fn new()->Self {
44        return Self {
45            hdr:unsafe { std::mem::zeroed::<fec_header_t>() },
46            body:[0;MAX_FEC_BUF_SIZE],
47            real_len:0,
48            fec_type:FEC_TYPE_ENCODER,
49            ratio:5,
50            src_pkts_added:[false;MAX_FEC_RATIO],
51            fec_exist:false,
52        }
53    }
54
55    fn reset(&mut self,start_seq:u64,ratio:u8) {
56        //let fec_type=self.fec_type;
57        self.hdr = unsafe { std::mem::zeroed::<fec_header_t>() };
58        self.hdr.start_seq = start_seq;
59        self.hdr.src_pkts_num = ratio;
60        self.real_len = 0;
61        self.fec_exist = false;
62        self.ratio = ratio;
63        self.body.fill(0);
64        self.src_pkts_added.fill(false);
65    }
66    //启动一次Encoder会话
67    pub fn start_encoder(&mut self,start_seq:u64,ratio:u8) {
68        self.fec_type = FEC_TYPE_ENCODER;
69        self.reset(start_seq, ratio)
70    }
71
72    //启动一次Decoder会话
73    pub fn start_decoder(&mut self,start_seq:u64,ratio:u8) {
74        self.fec_type = FEC_TYPE_DECODER;
75        self.reset(start_seq, ratio);
76    }
77
78    //接收端报文是否已经收齐
79    fn is_packet_recv_finished(&self)->bool {
80        for i in 0..self.ratio as usize {
81            if !self.src_pkts_added[i] {
82                return false;
83            }
84        }
85        return true
86    }
87    //判断FEC计算过程是否已经结束
88    pub fn is_fec_finished(&self)->bool {        
89        if self.fec_type==FEC_TYPE_ENCODER {
90            return self.is_packet_recv_finished();
91        } else {
92            //如果是Decoder,收齐N-1个原始报文和一个FEC报文可以恢复原始报文,但是如果收齐N个报文,FEC应该结束,等待进入下一次处理周期
93            let mut expect = 0;
94            for i in 0..self.ratio as usize {
95                if self.src_pkts_added[i] {
96                    expect+=1;
97                }
98            }
99            if self.fec_exist {
100                expect+=1;
101            }
102            //println!("ration={},expect={}",self.ratio,expect);
103            if expect>=self.ratio {
104                return true;
105            } else {
106                return false;
107            }
108        }
109    }
110
111    pub fn is_first_packet(&self)->bool {
112        for i in 0..self.ratio as usize {
113            if self.src_pkts_added[i] {
114                return false
115            }
116        }
117        if self.fec_exist {
118            return false
119        }
120
121        return true
122    }
123
124    fn set_raw_packet_state(&mut self,idx:usize,len:u16) {
125        self.src_pkts_added[idx] = true;
126        self.hdr.src_pkts_len[idx]=len;
127        if self.real_len< len {
128            self.real_len = len;
129            self.hdr.max_pkt_len = len;
130        }
131
132    }
133    //加入一个RawPacket,如果FEC没有结束,返回ERROR_NO_OP,成功则应用需要读取相应的报文
134    // 如果报文丢包同时且发生乱序,目前代码还需要优化
135    pub fn add_raw_packet(&mut self,seq:u64,pkt:&[u8])->errcode::RESULT {
136        if seq<self.hdr.start_seq || seq>=self.hdr.start_seq+self.ratio as u64 || pkt.len()>MAX_FEC_BUF_SIZE{
137            return errcode::ERROR_INVALID_MSG
138       }  
139        if self.fec_type == FEC_TYPE_DECODER && self.is_fec_finished() {
140            return errcode::ERROR_NO_OP;
141        } else if self.is_packet_recv_finished() {
142            return errcode::ERROR_NO_OP;
143        }
144       
145       let idx=(seq-self.hdr.start_seq) as usize;
146       if self.src_pkts_added[idx] {
147        return errcode::ERROR_ALREADY_EXIST;
148       }
149
150       if self.is_first_packet() {
151        self.set_raw_packet_state(idx,pkt.len() as u16);
152        self.body[0..pkt.len()].copy_from_slice(pkt);
153        return errcode::ERROR_NO_OP
154       } 
155
156       self.set_raw_packet_state(idx,pkt.len() as u16);
157       self.calc_fec_packet(pkt);
158       if self.is_fec_finished() {
159            errcode::RESULT_SUCCESS
160        } else {
161            return errcode::ERROR_NO_OP
162        }
163    }
164
165    ///FEC Encoder和Decoder进入下一个处理会话,Seq顺延
166    pub fn next_session(&mut self,new_ratio:u8) {
167        let seq = self.hdr.start_seq+(new_ratio as u64);
168        self.reset(seq, new_ratio)
169    }
170    pub fn next_start_seq(&self)->u64 {
171        return self.hdr.start_seq+(self.ratio as u64);
172    }
173
174    //加入一个FecPacket,如果FEC没有结束,返回ERROR_NO_OP,成功则返回编码的FEC,或者解码的原始报文   
175    pub fn add_fec_packet(&mut self,seq:u64,pkt:&[u8])->errcode::RESULT {
176        //let hdr_ptr = unsafe {&*(&pkt[0] as *const u8 as *const fec_header_t)};
177
178        if seq<self.hdr.start_seq || seq>=self.hdr.start_seq+self.ratio as u64 || pkt.len()<=FEC_HDR_SIZE || pkt.len()>MAX_FEC_BUF_SIZE+FEC_HDR_SIZE{
179            return errcode::ERROR_INVALID_MSG
180       }
181       
182        if self.is_fec_finished() {
183            self.fec_exist = true;
184            return errcode::ERROR_NO_OP;
185        }
186        self.fec_exist = true;
187        
188        unsafe {
189            std::ptr::copy_nonoverlapping(pkt.as_ptr(), &mut self.hdr as *mut _ as * mut u8,FEC_HDR_SIZE);
190        }
191        self.fec_exist = true;
192        
193        if self.is_first_packet() {
194            self.body[0..pkt.len()-FEC_HDR_SIZE].copy_from_slice(&pkt[FEC_HDR_SIZE..]);
195            return errcode::ERROR_NO_OP
196        }
197
198        self.calc_fec_packet(&pkt[FEC_HDR_SIZE..]);
199        if self.is_fec_finished() {
200            errcode::RESULT_SUCCESS
201        } else {
202            return errcode::ERROR_NO_OP
203        }
204        
205    }
206
207    //内部实际计算FEC Packet
208    //FEC计算需要对报文进行填充,填充到最大MTU
209    fn calc_fec_packet(&mut self,pkt:&[u8])->errcode::RESULT {
210
211        let pkt_len = pkt.len();
212        let u64_len:usize = pkt_len as usize/mem::size_of::<u64>();
213        
214        //let mut tmp_pkt = [0u8;MAX_PKT_BUF_SIZE as usize];
215        
216
217        //tmp_pkt[0..pkt_len].copy_from_slice(pkt);
218
219        let u64_dst = self.body.as_mut_ptr();
220        let u64_src = pkt.as_ptr();
221        //let u64_dst = unsafe { &mut *(&mut self.body[0] as *mut _ as *mut [u64;u64_len]) };
222        //let u64_src = unsafe { &*(tmp_pkt.as_ptr() as *const _ as *const [u64;u64_len]) };
223
224        for i in 0..u64_len {
225            unsafe {
226                *((u64_dst as usize + i*mem::size_of::<u64>()) as *mut u64) ^= *((u64_src as usize + i*mem::size_of::<u64>()) as *const u64);
227            }
228        }
229
230        for j in u64_len*mem::size_of::<u64>()..pkt_len as usize {
231            self.body[j] ^=pkt[j];
232        }
233
234        for k in pkt_len..MAX_FEC_BUF_SIZE {
235            self.body[k] ^=0u8;
236        }
237
238        errcode::RESULT_SUCCESS
239    }
240
241    ///获取丢失报文的序号
242    fn get_lost_packet_seq(&self)->(u64,usize) {
243        for i in 0..self.ratio as usize{
244            if !self.src_pkts_added[i] {
245                return (self.hdr.start_seq+(i as u64),i)
246            }
247        }
248        return (0,0);
249    }
250
251    ///读取RawPacket,用于FEC计算恢复的原始报文;buf是应用传入的缓冲区,应该大于MTU长度
252    /// 返回报文实际长度和恢复的rawPacket的序号,如果无丢包,则返回(0,0)
253    pub fn get_raw_packet(&self,buf:&mut [u8])->(usize,u64) {
254        if buf.len() < self.real_len as usize {
255            return (0,0)
256        }
257        let (seq,idx)=self.get_lost_packet_seq();
258        if seq==0 {
259            return (0,0);
260        }
261        unsafe {
262            std::ptr::copy_nonoverlapping(self.body.as_ptr(), buf.as_mut_ptr(), self.hdr.src_pkts_len[idx] as usize);
263        }
264
265        (self.hdr.src_pkts_len[idx] as usize,seq)
266    }
267
268    pub fn get_raw_packet_slice(&self)->Option<(&[u8],u64)> {
269        let (seq,idx)=self.get_lost_packet_seq();
270       
271        if seq==0  || idx>=self.ratio as usize || self.hdr.src_pkts_len[idx]>=MAX_FEC_BUF_SIZE as u16{
272            return None;
273        }
274       // assert!(idx<self.ratio as usize && self.hdr.src_pkts_len[idx]<MAX_FEC_BUF_SIZE as u16);
275        return Some((&self.body[0..self.hdr.src_pkts_len[idx] as usize],seq));
276    }
277
278    ///读取FecPacket,用于FEC计算结果;buf是应用传入的缓冲区,应该大于MTU长度
279    /// 返回FEC报文实际长度和起始rawPacket序号
280    pub fn get_fec_packet(&self,buf:&mut [u8])->(usize,u64) {
281        let fec_len = self.real_len as usize+FEC_HDR_SIZE;
282        if buf.len() < fec_len {
283            return (0,0)
284        }
285        unsafe {
286            std::ptr::copy_nonoverlapping(&self.hdr as *const _ as *const u8, buf.as_mut_ptr(), fec_len);
287        }
288        (fec_len,self.hdr.start_seq+self.ratio as u64-1)
289    }
290
291    pub fn get_packet_len(&self)->usize {
292        self.real_len as usize + FEC_HDR_SIZE
293    }
294    pub fn get_payload_len(&self)->usize {
295        self.real_len as usize
296    }
297
298    pub fn as_slice(&self)->&[u8] {
299        return unsafe {
300            &(*(&self.hdr as *const fec_header_t as *const u8 as *const [u8;MAX_FEC_BUF_SIZE+FEC_HDR_SIZE]))[0..FEC_HDR_SIZE+self.real_len as usize]
301        };
302    }
303
304    pub fn as_mut_slice(&mut self)->&[u8] {
305        return unsafe {
306            &mut (*(&mut self.hdr as *mut fec_header_t as *mut u8 as *mut [u8;MAX_FEC_BUF_SIZE+FEC_HDR_SIZE]))[0..FEC_HDR_SIZE+self.real_len as usize]
307        };
308    }
309
310    pub fn get_recv_pkt_count(&self)->usize {
311        let mut count=0;
312        for b in self.src_pkts_added {
313            if b {
314                count+=1;
315            }
316        }
317        count
318    }
319
320    pub fn is_fec_pkt_recved(&self)->bool {
321        self.fec_exist
322    }
323    pub fn get_start_seq(&self)->u64 {
324        self.hdr.start_seq
325    }
326}
327
328///FEC Streaming Decoder,实现接收端流式的FEC报文恢复,自动移动序号
329const MAX_FEC_PACKET_IN_STREAM:usize=16;
330pub struct fec_decoder_stream_t {
331    fecs:[fec_packet_t;MAX_FEC_PACKET_IN_STREAM],
332    start_idx:usize,
333    start_seq:u64,
334    ratio:u8,
335    max_seq:u64,
336    fec_buf:[u8;MAX_FEC_BUF_SIZE],
337}
338
339impl fec_decoder_stream_t {
340    pub fn new(start_seq:u64,ratio:u8)->fec_decoder_stream_t {
341        let mut stream=unsafe { mem::zeroed::<fec_decoder_stream_t>() };
342        stream.init(start_seq,ratio);
343        return stream
344    }
345    fn init(&mut self,start_seq:u64,ratio:u8) {
346        self.start_idx = 0;
347        self.ratio = ratio;
348        self.start_seq=start_seq;
349        self.max_seq=start_seq+(MAX_FEC_PACKET_IN_STREAM *ratio as usize) as u64-1;
350        for i in 0..MAX_FEC_PACKET_IN_STREAM as usize {
351            self.fecs[i].start_decoder(start_seq+ i as u64 * (ratio as u64),ratio);
352        }
353    }
354
355    pub fn start(&mut self,start_seq:u64,ratio:u8) {
356        self.init(start_seq,ratio);
357    }
358
359    pub fn cleanup(&mut self) {
360        while self.fecs[self.start_idx].is_fec_finished() {
361            self.start_seq+= self.ratio as u64;
362            self.fecs[self.start_idx].start_decoder(self.max_seq+1,self.ratio);
363            self.max_seq +=self.ratio as u64;
364            self.start_idx = (self.start_idx +1) % MAX_FEC_PACKET_IN_STREAM;
365        }
366    }
367    ///将当前序号移动至包含new_seq的区间
368    fn move_to_seq(&mut self,new_seq:u64) {        
369        self.cleanup();
370        if new_seq<= self.max_seq {
371            return
372        }
373        //如果new_seq超过当前窗口2倍,重新初始化
374        if new_seq>=self.max_seq+self.seq_window_len() {
375            self.init(new_seq/self.ratio as u64 * self.ratio as u64,self.ratio);
376            return
377        }
378        
379        //计算需要移动几个FEC Packet,每个Packet的seq长度为self.ratio
380        let step = common::ceiling(new_seq - self.max_seq,self.ratio as u64) as usize;
381        let new_idx = (self.start_idx + step) % MAX_FEC_PACKET_IN_STREAM;
382        let new_start_seq = self.start_seq+(step as u64 * self.ratio as u64);
383        let mut  idx = self.start_idx;
384        let mut seq = self.start_seq+(MAX_FEC_PACKET_IN_STREAM + step-1) as u64*self.ratio as u64;
385        while idx!=new_idx {
386            self.fecs[idx].start_decoder(seq,self.ratio);
387            idx=(idx + 1) % MAX_FEC_PACKET_IN_STREAM;
388            seq+=self.ratio as u64;
389        }
390        //self.fecs[idx].start_decoder(seq,self.ratio);
391        self.start_idx = new_idx;
392        self.start_seq = new_start_seq;
393        self.max_seq = new_start_seq+(self.ratio as u64 * MAX_FEC_PACKET_IN_STREAM as u64)-1;
394    }
395
396    fn get_index_by_seq(&self,seq:u64)->usize {
397        if seq<self.start_seq {
398            return 0;
399        }
400
401        (((seq-self.start_seq) / self.ratio as u64)  as usize + self.start_idx)  % MAX_FEC_PACKET_IN_STREAM
402    }
403
404    fn seq_window_len(&self)->u64 {
405        self.ratio as u64*MAX_FEC_PACKET_IN_STREAM as u64
406    }
407
408    ///产生了序号失步,需要重同步
409    fn need_resync(&mut self,recv_seq:u64)->bool {
410        if recv_seq<self.start_seq {
411            if recv_seq + self.seq_window_len()< self.start_seq || (self.start_seq - recv_seq+1) % self.ratio as u64!=0 {
412                return true;
413            }
414            return false;
415        } else if (recv_seq - self.start_seq+1) % self.ratio as u64!=0 {
416            return true
417        }
418        false
419    }
420    //fn get_decoder_by_seq(&mut self,seq:u64)->
421    pub fn add_fec_packet(&mut self,seq:u64,pkt:&[u8])->Result<(&[u8],u64),errcode::RESULT> {        
422        if self.need_resync(seq) {
423            self.init(seq+1, self.ratio);
424            return Err(errcode::ERROR_NO_OP);
425        }
426        if seq<self.start_seq {
427            return Err(errcode::ERROR_INVALID_MSG);
428        }
429
430        if seq>self.max_seq {
431            self.move_to_seq(seq);
432        }
433        let idx= self.get_index_by_seq(seq);
434        let res = self.fecs[idx].add_fec_packet(seq,pkt);
435        if res==errcode::RESULT_SUCCESS {
436            let (seq,len) = match self.fecs[idx].get_raw_packet_slice() {
437                None=> return Err(errcode::ERROR_COMMON),
438                Some((pkt,seq))=>  { 
439                    self.fec_buf[0..pkt.len()].copy_from_slice(pkt);
440                    (seq,pkt.len())                 
441                },
442            };
443            self.cleanup();
444            return Ok((&self.fec_buf[0..len],seq));
445        } else if self.fecs[idx].is_fec_finished() {
446            self.cleanup();
447        }
448
449        Err(errcode::ERROR_NO_OP)
450    }
451
452    ///add_raw_packet,加入一个RawPacket,判断是否已经解码成功
453    pub fn add_raw_packet(&mut self,seq:u64,pkt:&[u8])->Result<(&[u8],u64),errcode::RESULT> {
454        if seq<self.start_seq {
455            return Err(errcode::ERROR_INVALID_MSG);
456        }
457        if seq>self.max_seq {
458            self.move_to_seq(seq);
459        }
460        let idx= self.get_index_by_seq(seq);
461        let res = self.fecs[idx].add_raw_packet(seq,pkt);
462        if res==errcode::RESULT_SUCCESS {
463            let (seq,len) = match self.fecs[idx].get_raw_packet_slice() {
464                None=> return Err(errcode::ERROR_COMMON),
465                Some((pkt,seq))=>  { 
466                    self.fec_buf[0..pkt.len()].copy_from_slice(pkt);
467                    (seq,pkt.len())                 
468                },
469            };
470            self.cleanup();
471            return Ok((&self.fec_buf[0..len],seq));
472        } else if self.fecs[idx].is_fec_finished() {
473            self.cleanup();
474        }
475
476        Err(errcode::ERROR_NO_OP)
477    }
478
479    pub fn get_start_seq(&self)->u64 {
480        self.start_seq
481    }
482
483    pub fn get_start_idx(&self)->usize {
484        self.start_idx
485    }
486    
487    pub fn print_stats(&self) {
488        let mut idx=self.start_idx;
489        println!("Fec Stream internal State, start_seq={},Max_seq={},ratio={},start_idx={}",
490            self.start_seq,self.max_seq,self.ratio,self.start_idx);
491        while idx!= (self.start_idx+MAX_FEC_PACKET_IN_STREAM-1) % MAX_FEC_PACKET_IN_STREAM {
492            let p = &self.fecs[idx];
493            println!("No {} fec packet,start_seq={},recv {} raw packet,recv_fec={}",idx,
494            p.get_start_seq(),p.get_recv_pkt_count(),p.fec_exist);
495            idx = (idx+1) % MAX_FEC_PACKET_IN_STREAM;
496        }
497        let p = &self.fecs[idx];
498            println!("No {} fec packet,start_seq={},recv {} raw packet,recv_fec={}",idx,
499            p.get_start_seq(),p.get_recv_pkt_count(),p.fec_exist);
500    }
501}