use anyhow::{Result, bail};
use std::{
self,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
};
use crate::{
blob::source::SourceBlob, data::source::SourceData, define::*, es::source::SourceEs, flv::source::SourceFlv,
fmp4::source::SourceFmp4, g711::source::SourceG711, hls::source::SourceHls, ps::source::SourcePs,
relay::SourceRelay, rtp::source::SourceRtp, traits::ISource, ts::source::SourceTs,
};
/// Wrapper around one concrete stream source, chosen by scheme at construction time.
///
/// All source work is delegated to the boxed [`ISource`] implementation; this type
/// additionally records whether a fetch or subscribe has ever been requested.
pub struct SourceElement {
/// The scheme-specific source implementation that `fetch_resource` and `subscribe` delegate to.
source: Box<dyn ISource>,
/// Starts as `false` in `new`; set to `true` by `fetch_resource` and `subscribe`
/// before they delegate, and never reset by this type.
have_relay: Arc<AtomicBool>,
}
impl SourceElement {
pub async fn new(
source_relay: &SourceRelay,
is_idle: Arc<AtomicBool>,
config: &SSConfig,
data_bus_rx: SourceEleReceiver,
) -> Result<Self> {
let scheme = source_relay.scheme();
Ok(Self {
source: match scheme {
SSScheme::Hls => Box::new(SourceHls::new(source_relay, config, is_idle, data_bus_rx)?),
SSScheme::MpegTs => Box::new(SourceTs::new(config, is_idle, data_bus_rx)?),
SSScheme::Flv => Box::new(SourceFlv::new(config, false, is_idle, data_bus_rx)?),
SSScheme::EnhanceFlv => Box::new(SourceFlv::new(config, true, is_idle, data_bus_rx)?),
SSScheme::Fmp4 => Box::new(SourceFmp4::new(source_relay, config, is_idle, data_bus_rx)?),
SSScheme::Es => Box::new(SourceEs::new(config, is_idle, data_bus_rx)?),
SSScheme::Rtp => Box::new(SourceRtp::new(source_relay, config, is_idle, "rtpMuxer", data_bus_rx)?),
SSScheme::Mp2p => Box::new(SourceRtp::new(source_relay, config, is_idle, "mp2pMuxer", data_bus_rx)?),
SSScheme::Ps => Box::new(SourcePs::new(source_relay, config, is_idle, data_bus_rx)?),
SSScheme::Data => Box::new(SourceData::new(config, is_idle, data_bus_rx)?),
SSScheme::Blob => Box::new(SourceBlob::new(config, is_idle, data_bus_rx).await?),
SSScheme::G711a => Box::new(SourceG711::new(config, is_idle, data_bus_rx)?),
SSScheme::Unkown => bail!("no support scheme {:?}", scheme),
},
have_relay: Arc::new(AtomicBool::new(false)),
})
}
pub async fn fetch_resource(&self, res: SourceResourceFileRet, resource_name: &str) -> Result<()> {
self.have_relay.store(true, Ordering::SeqCst);
self.source.fetch(res, resource_name).await
}
pub async fn subscribe(&self) -> Result<SourceDataReceiver> {
self.have_relay.store(true, Ordering::SeqCst);
self.source.subscribe().await
}
pub fn have_relay(&self) -> bool {
self.have_relay.load(Ordering::SeqCst)
}
}