use std::sync::{Arc, atomic::AtomicBool};

use crate::{blob_cache::*, define::*, packet::*, traits::ISource};
use anyhow::Result;
use better_tokio_select::tokio_select;
use tracing::{Instrument as _, instrument};

pub struct SourceBlob {
    cmd_tx: SubCaller,
}
#[async_trait::async_trait]
impl ISource for SourceBlob {
    async fn subscribe(&self) -> Result<SourceDataReceiver> {
        let full_data = self.cmd_tx.sub(()).await?;
        Ok(SourceDataReceiver { full_data })
    }

    async fn fetch(&self, res: SourceResourceFileRet, resource_name: &str) -> Result<()> {
        let resource_name = resource_name.to_string();
        let cmd_tx = self.cmd_tx.clone();
        tokio::spawn(async move {
            let result = cmd_tx.fet(resource_name).await;
            let _ = res.send(result);
        });
        Ok(())
    }
}
async fn process_data(cache: &mut BlobCache, pkt: PacketEs) {
    let _ = cache.input_packet(pkt).await;
}

async fn process_sub(cache: &mut BlobCache, sub: Sub) {
    match sub {
        Sub::Sub(actor_pkt) => {
            let (_, ret) = actor_pkt.split();
            let sub = cache.subscribe_output();
            let _ = ret.return_ok(sub);
        },
        Sub::Fet(actor_pkt) => {
            let rx = cache.source_readied().await;
            tokio::spawn(async move {
                let resource_path = rx.await.map_err(|e| e.into());
                let _ = actor_pkt.return_(resource_path);
            });
        },
    }
}
impl SourceBlob {
    #[instrument(name = "Blob", level = "error", skip_all)]
    pub async fn new(config: &SSConfig, is_idle: Arc<AtomicBool>, mut data_rx: SourceEleReceiver) -> Result<Self> {
        let span = tracing::Span::current();

        let mut cache = BlobCache::new(config, is_idle).await?;

        let (cmd_tx, mut cmd_rx) = Sub::actor();

        tokio::spawn(
            async move {
                loop {
                    tokio_select!(match .. {
                        .. if let pkt = data_rx.recv() => {
                            match pkt {
                                Ok(pkt) => process_data(&mut cache, pkt).await,
                                Err(BroadcastRecvError::Closed) => {
                                    return;
                                },
                                Err(BroadcastRecvError::Lagged(skiped)) => {
                                    tracing::warn!("`Slow source` detect, data drop, lagged:{}", skiped)
                                },
                            }
                        },
                        .. if let ret = cmd_rx.recv() => {
                            match ret {
                                Some(ret) => process_sub(&mut cache, ret).await,
                                None => break,
                            }
                        },
                    })
                }
            }
            .instrument(span),
        );
        Ok(Self { cmd_tx })
    }
}