use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::Mutex;
/// Minimum spacing, in milliseconds, between two lookups that return the same
/// packet: `get_seq_no_pairs` returns a record again only when more than this
/// much time has passed since it was last returned.
const IGNORE_RETRANSMISSION: u8 = 100;
/// Per-packet record stored by the sequencer for later lookup.
#[derive(Default, Clone)]
pub struct PacketMeta {
    /// Sequence number handed to `push` as `sn`.
    pub source_seq_no: u16,
    /// Sequence number handed to `push` as `off_sn`; lookups match against it.
    pub target_seq_no: u16,
    /// Timestamp value supplied when the packet was pushed.
    pub timestamp: u32,
    /// Milliseconds (relative to the sequencer's start time) at which this
    /// record was last returned by a lookup; `0` means it never was.
    last_nack: u128,
    /// Layer value supplied when the packet was pushed.
    pub layer: u8,
    /// Packed VP8 payload info: bits 16..24 hold `tlz0_idx`, bits 0..16 hold
    /// `pic_id`.
    misc: u32,
}

impl PacketMeta {
    /// Packs `tlz0_idx` and `pic_id` into the private `misc` word, replacing
    /// whatever was stored there before.
    pub fn set_vp8_payload_meta(&mut self, tlz0_idx: u8, pic_id: u16) {
        let upper = u32::from(tlz0_idx) << 16;
        let lower = u32::from(pic_id);
        self.misc = upper | lower;
    }

    /// Unpacks the `(tlz0_idx, pic_id)` pair previously stored with
    /// [`set_vp8_payload_meta`](Self::set_vp8_payload_meta); a fresh record
    /// yields `(0, 0)`.
    pub fn get_vp8_payload_meta(&self) -> (u8, u16) {
        let tlz0_idx = (self.misc >> 16) as u8;
        let pic_id = (self.misc & 0xFFFF) as u16;
        (tlz0_idx, pic_id)
    }
}
/// Ring-style bookkeeping of recently pushed packets, addressed by slot index.
#[derive(Default)]
struct Sequencer {
/// Set to `true` by the first `push`, which also seeds `head_sn`.
init: bool,
/// Number of slots in the ring; `step` wraps back to 0 when it reaches this.
max: i32,
/// Slot index -> stored packet record. Slots that were never written have no
/// entry.
seq: HashMap<i32, PacketMeta>,
/// Index of the slot the next pushed packet will be written to.
step: i32,
/// `off_sn` of the most recent push made with `head == true` (or of the very
/// first push).
head_sn: u16,
/// Milliseconds since the UNIX epoch at construction; `last_nack` values are
/// measured relative to this.
start_time: u128,
}
/// Wrapper that keeps a `Sequencer` behind `Arc<tokio::sync::Mutex<_>>` so its
/// async methods can lock it before reading or updating the ring.
///
/// A `Default`-constructed value wraps `Sequencer::default()`, whose `max` is 0.
#[derive(Default)]
pub struct AtomicSequencer {
// The guarded sequencer state; locked for the duration of each method call.
sequencer: Arc<Mutex<Sequencer>>,
}
impl Sequencer {
pub fn new(max_track: i32) -> Self {
Self {
max: max_track,
seq: HashMap::new(),
start_time: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis(),
..Default::default()
}
}
}
impl AtomicSequencer {
    /// Creates a sequencer whose ring has `max_track` slots.
    pub fn new(max_track: i32) -> Self {
        Self {
            sequencer: Arc::new(Mutex::new(Sequencer::new(max_track))),
        }
    }

    /// Stores a record for one packet in the current ring slot and returns a
    /// copy of that record.
    ///
    /// * `sn` – stored as `source_seq_no`.
    /// * `off_sn` – stored as `target_seq_no`; this is the number later matched
    ///   by [`get_seq_no_pairs`](Self::get_seq_no_pairs).
    /// * `timastamp` – stored as `timestamp`.
    /// * `layer` – stored as `layer`.
    /// * `head` – when `true`, the packet becomes the new head: one slot is
    ///   skipped for every sequence number between the previous head and
    ///   `off_sn`, and `head_sn` is updated.
    ///
    /// The very first push seeds `head_sn` with `off_sn` regardless of `head`.
    ///
    /// Returns `None` (and stores nothing) only for a non-head packet that lies
    /// a full ring (`max` slots) or more behind the current position. The
    /// returned value is a clone; mutating it does not change the stored record.
    pub async fn push(
        &mut self,
        sn: u16,
        off_sn: u16,
        timastamp: u32,
        layer: u8,
        head: bool,
    ) -> Option<PacketMeta> {
        let mut sequencer = self.sequencer.lock().await;
        if !sequencer.init {
            sequencer.head_sn = off_sn;
            sequencer.init = true;
        }
        if head {
            // Sequence numbers live modulo 2^16, so the forward distance must be
            // computed with wrapping arithmetic (plain `-` panics in debug
            // builds when the counter rolls over).
            let inc = off_sn.wrapping_sub(sequencer.head_sn);
            // Leave one empty slot for each sequence number that was skipped.
            for _ in 1..inc {
                sequencer.step += 1;
                if sequencer.step >= sequencer.max {
                    sequencer.step = 0;
                }
            }
            sequencer.head_sn = off_sn;
        } else {
            // The offset is only used to reject packets that fell out of the
            // ring; the record itself is still written at the current slot.
            // NOTE(review): confirm that storing at `step` rather than at the
            // computed offset is intended.
            let behind = i32::from(sequencer.head_sn.wrapping_sub(off_sn));
            let step = sequencer.step - behind;
            if step < 0 && -step >= sequencer.max {
                return None;
            }
        }
        let meta = PacketMeta {
            source_seq_no: sn,
            target_seq_no: off_sn,
            timestamp: timastamp,
            layer,
            ..Default::default()
        };
        let slot = sequencer.step;
        sequencer.seq.insert(slot, meta.clone());
        sequencer.step += 1;
        if sequencer.step >= sequencer.max {
            sequencer.step = 0;
        }
        // Hand back the record that was just stored, not whatever happens to
        // occupy the slot the cursor moved on to.
        Some(meta)
    }

    /// Looks up the stored records for the requested target sequence numbers.
    ///
    /// For each entry of `seq_nos` the ring slot is derived from its distance
    /// behind `head_sn`. A record is returned (as a clone) when its
    /// `target_seq_no` matches and it has either never been returned before or
    /// was last returned more than `IGNORE_RETRANSMISSION` milliseconds ago;
    /// its `last_nack` is then set to the current relative time. Numbers that
    /// fall outside the ring, or whose slot was never written, are skipped.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time earlier than the UNIX epoch.
    pub async fn get_seq_no_pairs(&self, seq_nos: &[u16]) -> Vec<PacketMeta> {
        let mut sequencer = self.sequencer.lock().await;
        let now_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis();
        // The wall clock is not monotonic; saturate instead of underflowing if
        // it was stepped back past the recorded start time.
        let ref_time = now_ms.saturating_sub(sequencer.start_time);
        let min_gap = u128::from(IGNORE_RETRANSMISSION);
        // None of these change while the lock is held.
        let (cursor, head_sn, max) = (sequencer.step, sequencer.head_sn, sequencer.max);

        let mut meta: Vec<PacketMeta> = Vec::with_capacity(seq_nos.len());
        for &sn in seq_nos {
            // Wrapping distance behind the head; `cursor - 1` is the head's slot.
            let mut step = cursor - i32::from(head_sn.wrapping_sub(sn)) - 1;
            if step < 0 {
                if -step >= max {
                    continue;
                }
                step += max;
            }
            // A slot that was never written has no entry; skip it.
            if let Some(seq) = sequencer.seq.get_mut(&step) {
                let due =
                    seq.last_nack == 0 || ref_time.saturating_sub(seq.last_nack) > min_gap;
                if seq.target_seq_no == sn && due {
                    seq.last_nack = ref_time;
                    meta.push(seq.clone());
                }
            }
        }
        meta
    }
}