use std::sync::{Arc, atomic::AtomicBool};

use crate::{define::*, gop_cache::*, packet::*, traits::ISource};
use anyhow::Result;
use better_tokio_select::tokio_select;
pub struct SourceEs {
    cmd_tx: SubCaller,
}
#[async_trait::async_trait]
impl ISource for SourceEs {
    async fn subscribe(&self) -> Result<SourceDataReceiver> {
        let full_data = self.cmd_tx.sub(()).await?;
        Ok(SourceDataReceiver { full_data })
    }
}
fn process_data(cache: &mut GopCache, pkt: PacketEs) {
    match pkt.info() {
        EsInfo::Video(Video { frame_type, .. }) => {
            let is_key = frame_type == &FrameType::I;
            cache.input_packet(vec![pkt], is_key);
        },
        EsInfo::Audio(..) => {
            cache.input_packet(vec![pkt], false);
        },

        EsInfo::RtpFull(_) => {},
        _ => {},
    }
}
fn process_sub(cache: &mut GopCache, sub: Sub) {
    match sub {
        Sub::Sub(actor_pkt) => {
            let (_, ret) = actor_pkt.split();
            let sub = cache.subscribe_output();
            let _ = ret.return_ok(sub);
        },
        Sub::Fet(actor_pkt) => {
            let _ = actor_pkt.return_fail(anyhow::anyhow!("No support yet"));
        },
    }
}
impl SourceEs {
    pub fn new(config: &SSConfig, is_idle: Arc<AtomicBool>, mut data_rx: SourceEleReceiver) -> Result<Self> {
        let config = config.clone();

        let mut cache = GopCache::new_from_config(&config, is_idle, false);

        let (cmd_tx, mut cmd_rx) = Sub::actor();

        tokio::spawn(async move {
            loop {
                tokio_select!(match .. {
                    .. if let pkt = data_rx.recv() => {
                        match pkt {
                            Ok(pkt) => process_data(&mut cache, pkt),
                            Err(BroadcastRecvError::Closed) => {
                                return;
                            },
                            Err(BroadcastRecvError::Lagged(skiped)) => {
                                tracing::warn!("`Slow source` detect, data drop, lagged:{}", skiped)
                            },
                        }
                    },
                    .. if let ret = cmd_rx.recv() => {
                        match ret {
                            Some(ret) => process_sub(&mut cache, ret),
                            None => break,
                        }
                    },
                })
            }
        });
        Ok(Self { cmd_tx })
    }
}