use anyhow::{Result, bail};
use std::{
    self,
    sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    },
};

use crate::{
    blob::source::SourceBlob, data::source::SourceData, define::*, es::source::SourceEs, flv::source::SourceFlv,
    fmp4::source::SourceFmp4, g711::source::SourceG711, hls::source::SourceHls, ps::source::SourcePs,
    relay::SourceRelay, rtp::source::SourceRtp, traits::ISource, ts::source::SourceTs,
};

pub struct SourceElement {
    source: Box<dyn ISource>,
    have_relay: Arc<AtomicBool>,
}
impl SourceElement {
    pub async fn new(
        source_relay: &SourceRelay,
        is_idle: Arc<AtomicBool>,
        config: &SSConfig,
        data_bus_rx: SourceEleReceiver,
    ) -> Result<Self> {
        let scheme = source_relay.scheme();

        Ok(Self {
            source: match scheme {
                SSScheme::Hls => Box::new(SourceHls::new(source_relay, config, is_idle, data_bus_rx)?),
                SSScheme::MpegTs => Box::new(SourceTs::new(config, is_idle, data_bus_rx)?),
                SSScheme::Flv => Box::new(SourceFlv::new(config, false, is_idle, data_bus_rx)?),
                SSScheme::EnhanceFlv => Box::new(SourceFlv::new(config, true, is_idle, data_bus_rx)?),
                SSScheme::Fmp4 => Box::new(SourceFmp4::new(source_relay, config, is_idle, data_bus_rx)?),
                SSScheme::Es => Box::new(SourceEs::new(config, is_idle, data_bus_rx)?),
                SSScheme::Rtp => Box::new(SourceRtp::new(source_relay, config, is_idle, "rtpMuxer", data_bus_rx)?),
                SSScheme::Mp2p => Box::new(SourceRtp::new(source_relay, config, is_idle, "mp2pMuxer", data_bus_rx)?),
                SSScheme::Ps => Box::new(SourcePs::new(source_relay, config, is_idle, data_bus_rx)?),
                SSScheme::Data => Box::new(SourceData::new(config, is_idle, data_bus_rx)?),
                SSScheme::Blob => Box::new(SourceBlob::new(config, is_idle, data_bus_rx).await?),
                SSScheme::G711a => Box::new(SourceG711::new(config, is_idle, data_bus_rx)?),
                SSScheme::Unkown => bail!("no support scheme {:?}", scheme),
            },
            have_relay: Arc::new(AtomicBool::new(false)),
        })
    }

    pub async fn fetch_resource(&self, res: SourceResourceFileRet, resource_name: &str) -> Result<()> {
        self.have_relay.store(true, Ordering::SeqCst);
        self.source.fetch(res, resource_name).await
    }

    pub async fn subscribe(&self) -> Result<SourceDataReceiver> {
        self.have_relay.store(true, Ordering::SeqCst);
        self.source.subscribe().await
    }

    pub fn have_relay(&self) -> bool {
        self.have_relay.load(Ordering::SeqCst)
    }
}