use std::sync::{Arc, atomic::AtomicBool};
use crate::{define::*, gop_cache::*, packet::*, traits::ISource};
use anyhow::Result;
use better_tokio_select::tokio_select;
/// Handle to an elementary-stream source served by a background task.
///
/// The task itself is spawned in [`SourceEs::new`]; this handle only keeps the
/// command sender needed to ask that task for new subscriptions.
pub struct SourceEs {
/// Caller side of the `Sub` actor channel created in `new`; used by
/// `subscribe` to request a new output from the task's cache.
cmd_tx: SubCaller,
}
#[async_trait::async_trait]
impl ISource for SourceEs {
    /// Asks the background task for a new subscription and wraps the reply in
    /// a [`SourceDataReceiver`].
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `cmd_tx.sub`.
    async fn subscribe(&self) -> Result<SourceDataReceiver> {
        let receiver = SourceDataReceiver {
            full_data: self.cmd_tx.sub(()).await?,
        };
        Ok(receiver)
    }
}
/// Feeds one elementary-stream packet into the GOP cache.
///
/// * Video packets are forwarded with the key flag set only when their frame
///   type is `FrameType::I`.
/// * Audio packets are forwarded and never flagged as key.
/// * Every other packet kind (including `RtpFull`) is dropped without touching
///   the cache.
fn process_data(cache: &mut GopCache, pkt: PacketEs) {
    // Decide first whether the packet is cacheable and whether it is a key
    // frame; the borrow of `pkt` ends here so it can be moved afterwards.
    let key_frame = match pkt.info() {
        EsInfo::Video(Video { frame_type, .. }) => frame_type == &FrameType::I,
        EsInfo::Audio(..) => false,
        _ => return,
    };
    cache.input_packet(vec![pkt], key_frame);
}
/// Answers one subscription command against the GOP cache.
///
/// * `Sub::Sub` is answered with a fresh output obtained from
///   `cache.subscribe_output()`.
/// * `Sub::Fet` is answered with a "No support yet" failure.
///
/// The outcome of delivering the reply is deliberately ignored in both cases.
fn process_sub(cache: &mut GopCache, sub: Sub) {
    match sub {
        Sub::Fet(request) => {
            let _ = request.return_fail(anyhow::anyhow!("No support yet"));
        }
        Sub::Sub(request) => {
            // Only the reply half of the request is needed.
            let (_, responder) = request.split();
            let _ = responder.return_ok(cache.subscribe_output());
        }
    }
}
impl SourceEs {
    /// Creates the source and spawns the background task that serves it.
    ///
    /// The task owns a [`GopCache`] built from `config` and `is_idle`, and
    /// repeatedly waits on two inputs:
    ///
    /// * packets received from `data_rx`, which are fed into the cache through
    ///   `process_data`;
    /// * subscription commands from the `Sub` actor channel, which are answered
    ///   through `process_sub`.
    ///
    /// The task ends when `data_rx` reports `Closed`, or when the command
    /// receiver yields `None`. A `Lagged` error from `data_rx` only logs a
    /// warning; the loop keeps running.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`; the `Result` return
    /// type is part of the existing interface.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside of a Tokio runtime.
    pub fn new(config: &SSConfig, is_idle: Arc<AtomicBool>, mut data_rx: SourceEleReceiver) -> Result<Self> {
        // The config is only borrowed while building the cache and is never
        // moved into the task, so the caller's reference is passed straight
        // through instead of cloning it first.
        let mut cache = GopCache::new_from_config(config, is_idle, false);
        let (cmd_tx, mut cmd_rx) = Sub::actor();
        tokio::spawn(async move {
            loop {
                tokio_select!(match .. {
                    .. if let pkt = data_rx.recv() => {
                        match pkt {
                            Ok(pkt) => process_data(&mut cache, pkt),
                            // The data channel is closed: nothing more will arrive, stop the task.
                            Err(BroadcastRecvError::Closed) => {
                                return;
                            },
                            // This receiver fell behind and packets were skipped; keep going.
                            Err(BroadcastRecvError::Lagged(skiped)) => {
                                tracing::warn!("`Slow source` detect, data drop, lagged:{}", skiped)
                            },
                        }
                    },
                    .. if let ret = cmd_rx.recv() => {
                        match ret {
                            Some(ret) => process_sub(&mut cache, ret),
                            // The command receiver yielded `None`: stop the task.
                            None => break,
                        }
                    },
                })
            }
        });
        Ok(Self { cmd_tx })
    }
}