use std::{
sync::{Arc, atomic::AtomicBool},
time::Duration,
};
use better_tokio_select::tokio_select;
use parking_lot::Mutex;
use tokio::{
sync::{
broadcast::{self, error::RecvError},
mpsc::channel,
},
task::JoinHandle,
time::timeout,
};
use tracing::Instrument;
use crate::{define::*, gops::Gops, packet::PacketEs, utils::spawn_idle};
/// How a `GopCache` handles stream initialization data.
///
/// The variant is chosen once in `GopCache::new` from the `have_init_data` flag.
enum InitDataCache {
/// No initialization data is stored or sent; selected when `have_init_data` is `false`.
Annexb,
/// Initialization data is stored once and delivered to every subscriber before any
/// other packet; selected when `have_init_data` is `true`.
Mp4 {
/// Becomes `true` after the first non-empty init packet has been stored;
/// later calls to `set_init_data` are then ignored.
init_cache_suc: bool,
/// The stored init packet, `None` until one has been provided.
init_cache: Mutex<Option<PacketEs>>,
/// Broadcast sender that hands the init packet to subscribers which were created
/// before it arrived and are still waiting for it.
init_tx: EsDataSender,
},
}
/// Buffers incoming packets in a `Gops` store and fans them out to subscribers.
///
/// New subscribers first receive the init data (if enabled), then a replay of the
/// buffered packets, then the live stream.
pub struct GopCache {
/// Packet store: `input_packet` feeds it, `subscribe_output` replays its current view.
gops: Gops,
/// Broadcast sender carrying every packet accepted by `input_packet`.
data_tx: IndexDataSender,
/// Init-data handling mode chosen at construction time.
init_data_cache: InitDataCache,
/// Longest time a subscriber task waits for the init data or for the next live packet.
timeout_data: Duration,
/// Handle of the task started by `spawn_idle`; aborted when the cache is dropped.
th_idle: JoinHandle<()>,
}
impl GopCache {
    /// Builds a cache from the server configuration.
    ///
    /// Reads the data timeout, idle timeout, protect time, maximum GOP count and maximum
    /// client jitter from `config` and forwards them, together with `is_idle` and
    /// `have_init_data`, to [`GopCache::new`].
    pub fn new_from_config(config: &SSConfig, is_idle: Arc<AtomicBool>, have_init_data: bool) -> Self {
        GopCache::new(
            is_idle,
            config.source_data_IdleTime.into_inner(),
            config.source_idleTime.into_inner(),
            config.source_protectTime.into_inner(),
            have_init_data,
            *config.source_data_GopMax,
            *config.source_client_jitter_max,
        )
    }

    /// Creates a cache with explicit settings.
    ///
    /// * `is_idle`, `timeout_idle`, `protect_time` - passed unchanged to `spawn_idle`
    ///   together with a clone of the packet broadcast sender.
    /// * `timeout_data` - longest time a subscriber task waits for init data or for the
    ///   next live packet.
    /// * `have_init_data` - `true` enables init-data caching (`InitDataCache::Mp4`),
    ///   `false` disables it (`InitDataCache::Annexb`).
    /// * `gop_max` - capacity handed to `Gops::new`.
    /// * `jitter_max` - capacity of the packet broadcast channel.
    ///
    /// # Panics
    ///
    /// Tokio's `broadcast::channel` panics when its capacity is `0`, so `jitter_max`
    /// must be at least `1`.
    fn new(
        is_idle: Arc<AtomicBool>,
        timeout_data: Duration,
        timeout_idle: Duration,
        protect_time: Duration,
        have_init_data: bool,
        gop_max: usize,
        jitter_max: usize,
    ) -> Self {
        let (data_tx, _) = broadcast::channel::<IndexPacket>(jitter_max);
        let th_idle = spawn_idle(data_tx.clone(), timeout_idle, protect_time, is_idle);
        let init_data_cache = if have_init_data {
            InitDataCache::Mp4 {
                init_cache_suc: false,
                init_cache: Mutex::new(None),
                // Capacity 1 is enough: the init packet is broadcast at most once.
                init_tx: broadcast::channel::<PacketEs>(1).0,
            }
        } else {
            InitDataCache::Annexb
        };
        Self {
            gops: Gops::new(gop_max),
            data_tx,
            init_data_cache,
            timeout_data,
            th_idle,
        }
    }

    /// Stores the stream initialization packet.
    ///
    /// Empty packets are ignored. In `Mp4` mode the first non-empty packet is broadcast to
    /// subscribers that are already waiting and cached for subscribers created later;
    /// every subsequent call is a no-op. In `Annexb` mode nothing is stored.
    pub fn set_init_data(&mut self, init_data: PacketEs) {
        if init_data.is_empty() {
            return;
        }
        tracing::debug!(
            "InitData len: {}, type: {:?}",
            init_data.payload().len(),
            init_data.info()
        );
        let InitDataCache::Mp4 {
            init_cache_suc,
            init_cache,
            init_tx,
        } = &mut self.init_data_cache
        else {
            return;
        };
        if *init_cache_suc {
            return;
        }
        // A send error only means that nobody is waiting for the init data right now.
        let _ = init_tx.send(init_data.clone());
        *init_cache.lock() = Some(init_data);
        *init_cache_suc = true;
    }

    /// Feeds one group of payloads into the packet store and broadcasts the resulting
    /// indexed packet to live subscribers.
    ///
    /// Nothing is broadcast when the store rejects the input. A broadcast error (no live
    /// subscriber) is ignored on purpose.
    pub fn input_packet(&mut self, payload: Vec<PacketEs>, is_key: bool) {
        let Ok(pkt) = self.gops.input_data(payload, is_key) else {
            return;
        };
        let _ = self.data_tx.send(pkt);
    }

    /// Creates a new output stream for one consumer.
    ///
    /// A task is spawned that writes into the returned receiver, in order:
    ///
    /// 1. the init packet (only in `Mp4` mode) - taken from the cache, or awaited for at
    ///    most `timeout_data` when it has not been set yet;
    /// 2. every payload of the packets currently held by the packet store;
    /// 3. live packets, skipping those whose index was already covered by step 2.
    ///
    /// The task ends when the receiver is dropped, when the init data or the next live
    /// packet does not arrive within `timeout_data`, or when the broadcast channel is
    /// closed. The receiver then yields `None`.
    ///
    /// # Panics
    ///
    /// Panics when called outside of a Tokio runtime, because it uses `tokio::spawn`.
    pub fn subscribe_output(&self) -> EsDataSubscriber {
        let timeout_data = self.timeout_data;
        // Subscribe before snapshotting the store so no packet can fall in between; the
        // resulting overlap is removed via `final_index` below.
        let mut subscriber = self.data_tx.subscribe();
        let gops = self.gops.view().clone();
        // `Some` only in `Mp4` mode: the receiver for a late init packet plus the cached
        // init packet, if one is already known.
        let init_source = match &self.init_data_cache {
            InitDataCache::Mp4 {
                init_cache, init_tx, ..
            } => Some((init_tx.subscribe(), init_cache.lock().clone())),
            InitDataCache::Annexb => None,
        };
        let current_span = tracing::span::Span::current();
        let (data_tx, data_rx) = channel::<PacketEs>(1);
        tokio::spawn(
            async move {
                if let Some((mut init_rx, cached_init)) = init_source {
                    let init_data = match cached_init {
                        Some(init_data) => init_data,
                        None => match timeout(timeout_data, init_rx.recv()).await {
                            Ok(Ok(init_data)) => init_data,
                            // Timed out or the init channel failed: give up on this consumer.
                            Ok(Err(_)) | Err(_) => return,
                        },
                    };
                    if data_tx.send(init_data).await.is_err() {
                        return;
                    }
                    tracing::debug!("Init data send done");
                }

                // Replay the buffered packets and remember the last index that was sent.
                let mut final_index = None;
                for index_data in gops {
                    final_index = Some(index_data.index);
                    for payload in index_data.payload {
                        if data_tx.send(payload).await.is_err() {
                            return;
                        }
                    }
                }
                if let Some(final_index) = &final_index {
                    tracing::debug!("Gop cache was send, packet_count: {}", final_index);
                }

                // Forward the live stream until the consumer or the producer goes away.
                loop {
                    tokio_select!(match .. {
                        .. if let result = timeout(timeout_data, subscriber.recv()) => {
                            match result {
                                Ok(Ok(data)) => {
                                    if subscriber.len() > 10 {
                                        tracing::trace!(
                                            "`Slow receiver` detect, jetter left: {}",
                                            subscriber.len()
                                        );
                                    }
                                    // Already delivered as part of the replay above.
                                    if matches!(final_index, Some(i) if data.index <= i) {
                                        continue;
                                    }
                                    for payload in data.payload {
                                        if data_tx.send(payload).await.is_err() {
                                            // The consumer is gone: stop the whole task,
                                            // not just this packet.
                                            return;
                                        }
                                    }
                                },
                                Ok(Err(RecvError::Lagged(num))) => {
                                    tracing::warn!("`Slow receiver` detect, frame drop, lagged:{}", num);
                                    continue;
                                },
                                Ok(Err(RecvError::Closed)) => break,
                                Err(e) => {
                                    tracing::warn!(
                                        "Client timeout receive data, timeout: {:?}, error: {}",
                                        timeout_data,
                                        e
                                    );
                                    break;
                                },
                            }
                        },
                        .. if let _ = data_tx.closed() => {
                            break;
                        },
                    })
                }
            }
            .instrument(current_span),
        );
        data_rx
    }
}
impl Drop for GopCache {
    /// Aborts the task started by `spawn_idle` so it does not outlive the cache.
    fn drop(&mut self) {
        let Self { th_idle, .. } = self;
        th_idle.abort();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Feeds three "I" packets and ten "P" packets into a cache without init data and
    /// expects a subscriber created afterwards to receive exactly that sequence.
    #[tokio::test]
    async fn test_gop_cache() {
        let one_second = Duration::from_millis(1000);
        let have_init_data = false;
        let mut cache = GopCache::new(
            Arc::new(AtomicBool::new(false)),
            one_second, // timeout_data
            one_second, // timeout_idle
            one_second, // protect_time
            have_init_data,
            2,   // gop_max
            128, // jitter_max
        );
        if have_init_data {
            todo!()
        }

        let key_packet = PacketEs::new_com(Bytes::from_static(b"I"));
        let delta_packet = PacketEs::new_com(Bytes::from_static(b"P"));
        let mut payloads_input = vec![key_packet; 3];
        payloads_input.extend(vec![delta_packet; 10]);

        for payload in &payloads_input {
            let is_key = payload.payload() == Bytes::from_static(b"I");
            cache.input_packet(vec![payload.clone()], is_key);
        }

        // The subscriber task stops once no packet arrives within `timeout_data`, which
        // closes the channel and ends this loop.
        let mut subscriber = cache.subscribe_output();
        let mut payloads = Vec::new();
        while let Some(payload) = subscriber.recv().await {
            payloads.push(payload);
        }
        assert_eq!(payloads, payloads_input);
    }
}