use std::{
    sync::{Arc, atomic::AtomicBool},
    time::Duration,
};

use better_tokio_select::tokio_select;
use parking_lot::Mutex;
use tokio::{
    sync::{
        broadcast::{self, error::RecvError},
        mpsc::channel,
    },
    task::JoinHandle,
    time::timeout,
};
use tracing::Instrument;

use crate::{define::*, gops::Gops, packet::PacketEs, utils::spawn_idle};

/// How stream initialization data is handled for new subscribers.
///
/// The variant is chosen once in `GopCache::new` from the `have_init_data` flag.
enum InitDataCache {
    /// No separate init packet is tracked; subscribers start directly with the GOP replay.
    // NOTE(review): presumably Annex-B streams carry their parameter sets in-band — confirm.
    Annexb,
    /// An init packet must be delivered to every subscriber before any media data.
    // NOTE(review): presumably the MP4 init segment — confirm against the producer.
    Mp4 {
        /// Set to `true` once the first non-empty init packet has been stored;
        /// later calls to `set_init_data` are ignored.
        init_cache_suc: bool,
        /// The stored init packet, cloned for subscribers that arrive after it was set.
        init_cache: Mutex<Option<PacketEs>>,
        /// Broadcast sender (capacity 1) that hands the init packet to subscribers
        /// which subscribed before it was available.
        init_tx: EsDataSender,
    },
}

/// Caches recent GOPs of a source and fans packets out to subscribers.
///
/// Every subscriber first receives the init packet (if the stream has one), then a
/// replay of the cached GOPs, and finally the live packets.
pub struct GopCache {
    /// GOP storage; every input packet passes through it and a snapshot of it is
    /// replayed to new subscribers.
    gops: Gops,
    /// Broadcast sender for live, indexed packets.
    data_tx: IndexDataSender,
    /// Init packet handling, see [`InitDataCache`].
    init_data_cache: InitDataCache,
    /// Maximum time a subscriber task waits for the init packet or for the next live packet.
    timeout_data: Duration,
    /// Handle of the task started by `spawn_idle`; aborted when the cache is dropped.
    th_idle: JoinHandle<()>,
}

impl GopCache {
    /// Builds a cache from the service configuration.
    ///
    /// Reads the data timeout, idle timeout, protect time, maximum GOP count and
    /// client jitter buffer size from `config` and forwards them to [`GopCache::new`].
    ///
    /// * `is_idle` - shared flag handed to the idle-watch task.
    /// * `have_init_data` - whether subscribers must receive an init packet first.
    pub fn new_from_config(config: &SSConfig, is_idle: Arc<AtomicBool>, have_init_data: bool) -> Self {
        GopCache::new(
            is_idle,
            config.source_data_IdleTime.into_inner(),
            config.source_idleTime.into_inner(),
            config.source_protectTime.into_inner(),
            have_init_data,
            *config.source_data_GopMax,
            *config.source_client_jitter_max,
        )
    }

    /// Creates the cache and starts the idle-watch task.
    ///
    /// * `timeout_data` - how long a subscriber task waits for init data / the next packet.
    /// * `timeout_idle`, `protect_time`, `is_idle` - passed through to `spawn_idle`.
    /// * `gop_max` - limit handed to [`Gops::new`].
    /// * `jitter_max` - capacity of the live broadcast channel; `0` is clamped to `1`.
    fn new(
        is_idle: Arc<AtomicBool>,
        timeout_data: Duration,
        timeout_idle: Duration,
        protect_time: Duration,
        have_init_data: bool,
        gop_max: usize,
        jitter_max: usize,
    ) -> Self {
        // `broadcast::channel` panics on a zero capacity; degrade to the smallest
        // possible buffer instead of aborting on a misconfigured jitter size.
        if jitter_max == 0 {
            tracing::warn!("jitter_max is 0, using a broadcast capacity of 1 instead");
        }
        let (data_tx, _) = broadcast::channel::<IndexPacket>(jitter_max.max(1));

        let th_idle = spawn_idle(data_tx.clone(), timeout_idle, protect_time, is_idle);
        let init_data_cache = if have_init_data {
            InitDataCache::Mp4 {
                init_cache_suc: false,
                init_cache: Mutex::new(None),
                init_tx: broadcast::channel::<PacketEs>(1).0,
            }
        } else {
            InitDataCache::Annexb
        };

        Self {
            gops: Gops::new(gop_max),
            data_tx,
            init_data_cache,
            timeout_data,
            th_idle,
        }
    }

    /// Stores the stream's init packet and hands it to subscribers already waiting for it.
    ///
    /// Empty packets are ignored. Only the first non-empty packet is kept; later calls
    /// are no-ops, as are all calls when the cache was created without init data support.
    pub fn set_init_data(&mut self, init_data: PacketEs) {
        if init_data.is_empty() {
            return;
        }
        tracing::debug!(
            "InitData len: {}, type: {:?}",
            init_data.payload().len(),
            init_data.info()
        );

        let InitDataCache::Mp4 {
            init_cache_suc,
            init_cache,
            init_tx,
        } = &mut self.init_data_cache
        else {
            return;
        };
        if *init_cache_suc {
            return;
        }

        // A send error only means no subscriber is waiting right now; later
        // subscribers read the packet from `init_cache` instead.
        let _ = init_tx.send(init_data.clone());
        *init_cache.lock() = Some(init_data);
        *init_cache_suc = true;
    }

    /// Feeds one group of packets into the GOP store and broadcasts the resulting
    /// indexed packet to live subscribers.
    ///
    /// Input rejected by the GOP store is dropped silently.
    pub fn input_packet(&mut self, payload: Vec<PacketEs>, is_key: bool) {
        let Ok(pkt) = self.gops.input_data(payload, is_key) else {
            return;
        };
        // A send error only means there is no live subscriber at the moment.
        let _ = self.data_tx.send(pkt);
    }

    /// Registers a new subscriber and returns the receiving end of its packet stream.
    ///
    /// A task is spawned (in the caller's tracing span) that forwards, in order:
    /// 1. the init packet, if the stream has one — waiting up to `timeout_data` for it
    ///    when it has not been set yet,
    /// 2. a replay of the GOPs cached at subscription time,
    /// 3. live packets newer than the replayed ones.
    ///
    /// The task ends, closing the returned channel, when the receiver is dropped, the
    /// cache is dropped, or no live packet arrives within `timeout_data`. The channel
    /// has capacity 1, so the task waits for the consumer before sending more.
    pub fn subscribe_output(&self) -> EsDataSubscriber {
        /// Backlog size above which a subscriber is reported as slow.
        const SLOW_RECEIVER_THRESHOLD: usize = 10;

        let timeout_data = self.timeout_data;
        // Subscribe to live data *before* snapshotting the GOPs so nothing published in
        // between is lost; packets present in both are filtered out by index below.
        let mut subscriber = self.data_tx.subscribe();
        let gops = self.gops.view().clone();

        // `Some((receiver, cached packet))` only when the stream needs init data.
        let init = match &self.init_data_cache {
            InitDataCache::Mp4 {
                init_cache, init_tx, ..
            } => {
                let init_rx = init_tx.subscribe();
                let cached = init_cache.lock().clone();
                Some((init_rx, cached))
            },
            InitDataCache::Annexb => None,
        };

        let current_span = tracing::span::Span::current();
        let (data_tx, data_rx) = channel::<PacketEs>(1);
        tokio::spawn(
            async move {
                // send init data
                if let Some((mut init_rx, cached)) = init {
                    let init_data = match cached {
                        Some(init_data) => init_data,
                        None => match timeout(timeout_data, init_rx.recv()).await {
                            Ok(Ok(init_data)) => init_data,
                            Ok(Err(e)) => {
                                tracing::debug!("Init data channel ended before init data arrived: {}", e);
                                return;
                            },
                            Err(e) => {
                                tracing::warn!(
                                    "Client timeout waiting for init data, timeout: {:?}, error: {}",
                                    timeout_data,
                                    e
                                );
                                return;
                            },
                        },
                    };
                    if data_tx.send(init_data).await.is_err() {
                        return;
                    }
                    tracing::debug!("Init data send done");
                }

                // send gop cache
                let mut final_index = None;
                for index_data in gops {
                    final_index = Some(index_data.index);
                    for payload in index_data.payload {
                        if data_tx.send(payload).await.is_err() {
                            return;
                        }
                    }
                }
                if let Some(final_index) = &final_index {
                    tracing::debug!("Gop cache was send, packet_count: {}", final_index);
                }

                // send data after gop
                loop {
                    tokio_select!(match .. {
                        .. if let result = timeout(timeout_data, subscriber.recv()) => {
                            match result {
                                Ok(Ok(data)) => {
                                    if subscriber.len() > SLOW_RECEIVER_THRESHOLD {
                                        tracing::trace!(
                                            "`Slow receiver` detect, jetter left: {}",
                                            subscriber.len()
                                        );
                                    }

                                    // Already delivered as part of the GOP replay.
                                    if matches!(final_index, Some(i) if data.index <= i) {
                                        continue;
                                    }

                                    for payload in data.payload {
                                        // The consumer is gone: stop the whole task, not
                                        // just this packet group.
                                        if data_tx.send(payload).await.is_err() {
                                            return;
                                        }
                                    }
                                },
                                Ok(Err(RecvError::Lagged(num))) => {
                                    tracing::warn!("`Slow receiver` detect, frame drop, lagged:{}", num);
                                    continue;
                                },
                                Ok(Err(RecvError::Closed)) => break,
                                Err(e) => {
                                    tracing::warn!(
                                        "Client timeout receive data, timeout: {:?}, error: {}",
                                        timeout_data,
                                        e
                                    );
                                    break;
                                },
                            }
                        },
                        .. if let _ = data_tx.closed() => {
                            break;
                        },
                    })
                }
            }
            .instrument(current_span),
        );

        data_rx
    }
}
/// Stops the idle-watch task together with the cache that started it.
impl Drop for GopCache {
    fn drop(&mut self) {
        // Only the background task handle needs explicit cleanup; the channels
        // close on their own once their senders are dropped.
        let Self { th_idle, .. } = self;
        th_idle.abort();
    }
}

#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use super::*;

    /// Packets fed into the cache before anyone subscribed must be replayed, in
    /// order, to a subscriber that joins afterwards.
    #[tokio::test]
    async fn test_gop_cache() {
        let have_init_data = false;
        let one_second = Duration::from_millis(1000);

        let mut cache = GopCache::new(
            Arc::new(AtomicBool::new(false)),
            one_second,
            one_second,
            one_second,
            have_init_data,
            2,
            128,
        );
        if have_init_data {
            todo!()
        }

        // Three key packets followed by ten non-key packets.
        let mut expected = vec![PacketEs::new_com(Bytes::from_static(b"I")); 3];
        expected.extend(vec![PacketEs::new_com(Bytes::from_static(b"P")); 10]);

        for packet in &expected {
            let is_key = packet.payload() == Bytes::from_static(b"I");
            cache.input_packet(vec![packet.clone()], is_key);
        }

        // The loop ends once the forwarding task gives up waiting for live data
        // (after `timeout_data`) and drops its sender.
        let mut output = cache.subscribe_output();
        let mut received = Vec::new();
        while let Some(packet) = output.recv().await {
            received.push(packet);
        }
        assert_eq!(received, expected);
    }
}