use crate::{define::*, packet::*, relay::SourceRelay};
use anyhow::{Result, bail};
use parking_lot::RwLock;
use std::{
    self, cmp,
    collections::HashMap,
    sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    },
    time::Duration,
};
use tokio::{
    sync::{
        broadcast,
        mpsc::{UnboundedSender, unbounded_channel},
    },
    time::{self},
};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;

use crate::source_ele::SourceElement;
/// A media source: accepts input packets on one channel and fans them out,
/// through a broadcast bus, to one `SourceElement` per registered scheme.
///
/// Background tasks spawned by [`Source::new`] run under child tokens of
/// `cancel_token` and are stopped when the `Source` is dropped.
pub struct Source {
    /// Registered schemes, keyed by `SchemeKey`; filled by `add_scheme`.
    sources: Arc<RwLock<HashMap<SchemeKey, Arc<SourceElement>>>>,
    /// Copy of the configuration given to `new`; handed to every `SourceElement`.
    config: SSConfig,
    /// One idle flag per registered scheme (the same `Arc` is given to its
    /// `SourceElement`). When the list is non-empty and every flag is `true`,
    /// the watcher task sends `SourceEvent::Idles`.
    is_idles: Arc<RwLock<Vec<Arc<AtomicBool>>>>,
    /// Sending half of the unbounded input channel; cloned by `subscribe_input`.
    data_tx: UnboundedSender<PacketEs>,
    /// Broadcast bus the input packets are forwarded to; every scheme added
    /// through `add_scheme` subscribes to it.
    data_bus_tx: broadcast::Sender<PacketEs>,
    /// Root cancellation token; cancelled in `Drop`.
    cancel_token: CancellationToken,
}

impl Source {
    /// Strong count of the input sender at which the input side is treated as
    /// having no producer: only the `data_tx` stored in `Source` itself is
    /// left, i.e. no clone handed out by `subscribe_input` is alive.
    const ACCESS_IDLE_CNT: usize = 1;

    pub async fn new(event_tx: SourceEventSender, source_id: &str, config: &SSConfig) -> Result<Self> {
        let is_idles: Arc<RwLock<Vec<Arc<AtomicBool>>>> = Arc::new(RwLock::new(Vec::new()));
        let cancel_token = CancellationToken::new();
        tokio::spawn({
            let event_tx = event_tx.clone();
            let source_id = source_id.to_owned();
            let config = config.clone();
            let is_idles = is_idles.clone();

            let scan_time = cmp::min(
                config.source_idleTime.into_inner(),
                config.source_protectTime.into_inner(),
            ) / 10;
            let mut interval = tokio::time::interval(scan_time);
            let task = async move {
                loop {
                    interval.tick().await;
                    // Note:
                    // If have no scheme, no idle will happen
                    if is_idles.read().is_empty() {
                        continue;
                    }

                    let all_idle = is_idles.read().iter().all(|e| e.load(Ordering::SeqCst));
                    if all_idle {
                        let _ = event_tx.send(SourceEvent::Idles(source_id.to_owned()));
                        break;
                    }
                }
            };

            let cancel_token = cancel_token.child_token();
            async move { cancel_token.run_until_cancelled(task).await }
        });

        let (data_tx, mut data_rx) = unbounded_channel::<PacketEs>();
        tokio::spawn(
            {
                let data_tx = data_tx.downgrade();
                let event_tx = event_tx.clone();
                let source_id = source_id.to_owned();
                let timeout_idle = config.source_idleTime.into_inner();
                let protect_time = config.source_protectTime.into_inner();
                let scan_time = cmp::min(timeout_idle, protect_time) / 5;
                let mut interval = time::interval(scan_time);
                let mut start_time = time::Instant::now();
                let mut have_access = false;
                let cancel_token = cancel_token.child_token();

                let task = async move {
                    loop {
                        interval.tick().await;
                        let cnt = data_tx.strong_count();
                        let have_idle = if cnt == Source::ACCESS_IDLE_CNT {
                            if have_access {
                                start_time.elapsed() >= timeout_idle
                            } else {
                                start_time.elapsed() >= protect_time
                            }
                        } else {
                            have_access = true;
                            start_time = time::Instant::now();
                            false
                        };

                        if have_idle {
                            let _ = event_tx.send(SourceEvent::PushIdle {
                                source_id,
                                hint: format!("Source input-data subscribe idle, have_access: {}", have_access),
                            });

                            return;
                        }
                    }
                };

                async move { cancel_token.run_until_cancelled(task).await }
            }
            .instrument(tracing::info_span!("IdleTask", source_id = source_id)),
        );
        let source_jitter_max = *config.source_jitter_max;
        let (data_bus_tx, _) = broadcast::channel(source_jitter_max);

        tokio::spawn(
            {
                let source_enable_audio = config.source_enable_audio;
                let data_bus_tx = data_bus_tx.clone();
                let once = std::sync::Once::new();
                let task = async move {
                    loop {
                        while let Some(data) = data_rx.recv().await {
                            if !source_enable_audio && data.is_audio_packet() {
                                once.call_once(|| {
                                    tracing::warn!("Got audio, but audio was disabled, see `source.enable_audio`");
                                });
                                continue;
                            }
                            // wait avalible capacity in data bus, avoid lose data when input_data `super fast`
                            while data_bus_tx.len() == source_jitter_max {
                                tracing::warn!("Source data bus maybe full, source_jitter_max: {}, see source.jitterMax for more detail", source_jitter_max);
                                tokio::time::sleep(Duration::from_millis(5)).await;
                            }
                            // Note: As long as there are subscribers, send will be successful.

                            if let Err(broadcast::error::SendError(drop_data)) = data_bus_tx.send(data) {
                                tracing::info!(
                                    "No scheme subscribers for source, data drop, size: {}",
                                    drop_data.payload().len()
                                );
                            }
                        }
                    }
                };
                let cancel_token = cancel_token.child_token();

                async move { cancel_token.run_until_cancelled(task).await }
            }
            .instrument(tracing::error_span!("DataInput", source_id = source_id)),
        );

        Ok(Self {
            sources: Arc::new(RwLock::new(HashMap::new())),
            config: config.clone(),
            is_idles,
            data_tx,
            data_bus_tx,
            cancel_token,
        })
    }
    /// Registers the scheme described by `source_relay`, creating its
    /// `SourceElement` and subscribing it to the data bus.
    ///
    /// Does nothing if a scheme with the same key is already registered.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `SourceElement::new`.
    pub async fn add_scheme(&self, source_relay: &SourceRelay) -> Result<()> {
        // Fast path: avoid building an element for a scheme we already have.
        if self.sources.read().contains_key(&source_relay.scheme_key()) {
            return Ok(());
        }
        let is_idle = Arc::new(AtomicBool::new(false));
        let data_bus_rx = self.data_bus_tx.subscribe();
        let source = SourceElement::new(source_relay, is_idle.clone(), &self.config, data_bus_rx).await?;

        // Another caller may have registered the same key while we were
        // awaiting above. Re-check under the write lock and register the idle
        // flag only when our element is actually stored; otherwise a flag
        // belonging to a discarded element would stay in `is_idles` and could
        // prevent the "all schemes idle" condition from ever being reached.
        {
            let mut sources = self.sources.write();
            if let std::collections::hash_map::Entry::Vacant(slot) = sources.entry(source_relay.scheme_key()) {
                self.is_idles.write().push(is_idle);
                slot.insert(Arc::new(source));
            }
        }

        Ok(())
    }

    /// Forwards a resource request to the element registered for `scheme_key`.
    ///
    /// # Errors
    ///
    /// Fails if no scheme is registered under `scheme_key`, or with whatever
    /// error the element's own `fetch_resource` returns.
    pub async fn fetch_resource(
        &self,
        scheme_key: &SchemeKey,
        res: SourceResourceFileRet,
        resource_name: &str,
    ) -> Result<()> {
        // Clone the `Arc` out so the read lock is released before awaiting.
        let element = self.sources.read().get(scheme_key).cloned();
        match element {
            Some(element) => element.fetch_resource(res, resource_name).await,
            None => bail!("scheme no found, scheme {:?}", scheme_key),
        }
    }
    /// Subscribes to the output of the element registered for `scheme_key`.
    ///
    /// # Errors
    ///
    /// Fails if no scheme is registered under `scheme_key`, or if the
    /// element's own `subscribe` fails.
    pub async fn subscribe(&self, scheme_key: &SchemeKey) -> Result<SourceDataReceiver> {
        // Clone the `Arc` out so the read lock is released before awaiting.
        let element = self.sources.read().get(scheme_key).cloned();
        let Some(element) = element else {
            bail!("scheme no found, scheme_key {:?}", scheme_key);
        };
        Ok(element.subscribe().await?)
    }
    /// Returns a new sender for pushing packets into this source.
    ///
    /// Each returned sender holds a clone of the input channel's sending half.
    pub fn subscribe_input(&self) -> Result<IndexPacketSender> {
        let input_data = self.data_tx.clone();
        Ok(IndexPacketSender { input_data })
    }
    /// Returns `true` if at least one registered scheme element reports a relay.
    pub fn have_relay(&self) -> bool {
        let sources = self.sources.read();
        sources.values().any(|element| element.have_relay())
    }
}
/// Stops the background tasks when the `Source` goes away.
impl Drop for Source {
    fn drop(&mut self) {
        // The spawned tasks run under child tokens of `cancel_token`,
        // so cancelling the parent ends all of them.
        let Self { cancel_token, .. } = self;
        cancel_token.cancel();
    }
}