use std::sync::{Arc, atomic::AtomicBool};

use crate::{define::*, gop_cache::*, packet::*, traits::ISource};
use anyhow::Result;
use better_tokio_select::tokio_select;
use bytes::Bytes;
type ChunkerInner = cyberex::buf_pro::chunk::Chunker<u8>;
type Chunker = Option<ChunkerInner>;

pub struct SourceG711 {
    cmd_tx: SubCaller,
}
#[async_trait::async_trait]
impl ISource for SourceG711 {
    async fn subscribe(&self) -> Result<SourceDataReceiver> {
        let full_data = self.cmd_tx.sub(()).await?;
        Ok(SourceDataReceiver { full_data })
    }
}
fn get_chunker_size(sample_rate: i32) -> usize {
    (sample_rate as usize / 8000) * 320
}
fn down_sample<'a>(payload: &'a Bytes, audio: &Audio) -> Vec<&'a [u8]> {
    let chunk_size = ((audio.bits / 8) * (audio.sample_rate / ES_G711_DF_SAMPLE_RATE)) as usize;
    payload.chunks(chunk_size).map(|c| &c[..2]).collect()
}

fn process_data(config: &SSConfig, chunker: &mut Chunker, cache: &mut GopCache, pkt: PacketEs) {
    let Some((payload, mut audio)) = pkt.into_g711a() else {
        return;
    };

    if config.source_g711a_forceSampleRate8000 && audio.sample_rate != ES_G711_DF_SAMPLE_RATE {
        for payload in down_sample(&payload, &audio) {
            audio.sample_rate = ES_G711_DF_SAMPLE_RATE;

            chunk(chunker, payload, &audio, cache);
        }
        return;
    }
    chunk(chunker, &payload, &audio, cache);
}

fn chunk(chunker: &mut Chunker, payload: &[u8], audio: &Audio, cache: &mut GopCache) {
    let chunker = chunker.get_or_insert_with(|| ChunkerInner::new(get_chunker_size(audio.sample_rate)));
    for payload in chunker.chunk(payload) {
        let payload = Bytes::copy_from_slice(payload);
        let info = EsInfo::Audio(audio.to_owned());
        cache.input_packet(vec![PacketEs::new(payload, info)], true);
    }
}

fn process_sub(cache: &mut GopCache, sub: Sub) {
    match sub {
        Sub::Sub(actor_pkt) => {
            let (_, ret) = actor_pkt.split();
            let sub = cache.subscribe_output();
            let _ = ret.return_ok(sub);
        },
        Sub::Fet(actor_pkt) => {
            let _ = actor_pkt.return_fail(anyhow::anyhow!("No support yet"));
        },
    }
}

impl SourceG711 {
    pub fn new(config: &SSConfig, is_idle: Arc<AtomicBool>, mut data_rx: SourceEleReceiver) -> Result<Self> {
        let config = config.clone();

        let mut cache = GopCache::new_from_config(&config, is_idle, false);

        let (cmd_tx, mut cmd_rx) = Sub::actor();
        let mut chunker = None;

        tokio::spawn(async move {
            loop {
                tokio_select!(match .. {
                    .. if let pkt = data_rx.recv() => {
                        match pkt {
                            Ok(pkt) => process_data(&config, &mut chunker, &mut cache, pkt),
                            Err(BroadcastRecvError::Closed) => {
                                return;
                            },
                            Err(BroadcastRecvError::Lagged(skiped)) => {
                                tracing::warn!("`Slow source` detect, data drop, lagged:{}", skiped)
                            },
                        }
                    },
                    .. if let ret = cmd_rx.recv() => {
                        match ret {
                            Some(ret) => process_sub(&mut cache, ret),
                            None => break,
                        }
                    },
                });
            }
        });
        Ok(Self { cmd_tx })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_case() {
        assert_eq!(get_chunker_size(8000), 320);
        assert_eq!(get_chunker_size(16000), 640);
        assert_eq!(get_chunker_size(44100), 1600);
    }
}