use crate::{
    define::{EsDataSubscriber, IndexDataSender, IndexPacket, SSConfig},
    packet::PacketEs,
    utils::spawn_idle,
};
use anyhow::{Context, Result};
use std::{
    path::PathBuf,
    sync::{Arc, atomic::AtomicBool},
    time::Duration,
};
use tempfile::Builder;
use tempfile::NamedTempFile;
use tokio::{
    fs::File,
    io::{AsyncSeekExt, AsyncWriteExt},
};
use tokio::{sync::mpsc::UnboundedSender, time::timeout};
use tokio::{
    sync::{broadcast, mpsc::channel},
    task::JoinHandle,
};
use tokio_stream::StreamExt;
use tokio_util::io::ReaderStream;
/// Caches an incoming packet stream in a temporary file and lets readers follow that file.
///
/// Packets passed to `input_packet` are forwarded over a channel to a background task that
/// appends their payload to the temporary file; `subscribe_output` starts a task that reads
/// the file back in chunks.
pub struct BlobCache {
    /// Broadcast sender created in `new`; a clone is handed to `spawn_idle` and every output
    /// subscriber task holds a receiver obtained from it.
    sub: IndexDataSender,
    /// Sending half of the unbounded channel that feeds the background writer task.
    data_tx: UnboundedSender<PacketEs>,
    /// Read from `config.source_data_IdleTime`. Used as the writer task's per-packet receive
    /// timeout and as the "no new data" limit of output subscribers.
    timeout_data: Duration,
    /// Handle of the task returned by `spawn_idle`; aborted when the cache is dropped.
    th_idle: JoinHandle<()>,
    /// Temporary file (name prefix `h5ssblob_`) that stores the received payload bytes.
    file: NamedTempFile,
}

impl BlobCache {
    pub async fn new(config: &SSConfig, is_idle: Arc<AtomicBool>) -> Result<Self> {
        let timeout_data = config.source_data_IdleTime.into_inner();
        let timeout_idle = config.source_idleTime.into_inner();
        let protect_time = config.source_protectTime.into_inner();
        let (sub, _) = broadcast::channel::<IndexPacket>(1);

        let th_idle = spawn_idle(sub.clone(), timeout_idle, protect_time, is_idle);
        let (data_tx, mut data_rx) = tokio::sync::mpsc::unbounded_channel::<PacketEs>();
        let file = Builder::new().prefix("h5ssblob_").rand_bytes(6).tempfile()?;
        let path = file.path().to_path_buf();
        let mut f = File::create(&path).await.with_context(|| "create temp file fail")?;
        tracing::debug!("Blob file create success, path: {:?}", path);
        tokio::spawn(async move {
            while let Ok(Some(packet)) = timeout(timeout_data, data_rx.recv()).await {
                let _ = f.write_all(&packet.payload()).await;
            }
            tracing::debug!("BlobCache data task exit");
        });

        Ok(Self {
            sub,
            timeout_data,
            th_idle,
            data_tx,
            file,
        })
    }
    /// Hands one packet to the background writer through the unbounded data channel.
    ///
    /// # Errors
    /// Returns an error carrying the context `write to buffer fail` when the channel send
    /// fails.
    pub async fn input_packet(&mut self, pkt: PacketEs) -> Result<()> {
        self.data_tx.send(pkt).context("write to buffer fail")
    }
    /// Returns a one-shot receiver that yields the temporary file's path once the data
    /// channel has been closed.
    ///
    /// A helper task is spawned that waits on a clone of the data sender until the channel
    /// is closed and then sends the path.
    pub async fn source_readied(&self) -> tokio::sync::oneshot::Receiver<PathBuf> {
        let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<PathBuf>();
        let blob_path = self.file.path().to_path_buf();
        let sender = self.data_tx.clone();

        tokio::spawn(async move {
            sender.closed().await;
            // The caller may already have dropped the receiver; nothing to do in that case.
            let _ = ready_tx.send(blob_path);
        });

        ready_rx
    }
    /// Starts a reader task that follows the temporary file and returns the receiving end of
    /// the channel it feeds.
    ///
    /// The task repeatedly opens the file, seeks to the last delivered offset, forwards every
    /// chunk it can read as a `PacketEs` and then sleeps 200 ms before polling again. It
    /// stops when:
    /// * the returned receiver has been dropped (sending fails),
    /// * the file cannot be opened or seeked,
    /// * the read offset has not advanced for `timeout_data`.
    ///
    /// A read error is logged and retried from the same offset on the next poll. The task
    /// keeps a broadcast receiver obtained from `self.sub` alive for as long as it runs.
    pub fn subscribe_output(&self) -> EsDataSubscriber {
        let live_time = self.sub.subscribe();
        let (tx, rx) = channel::<PacketEs>(1);
        let path = self.file.path().to_path_buf();

        let mut position: u64 = 0;
        let mut last_size: u64 = 0;
        let mut last_change = tokio::time::Instant::now();
        let poll_interval = Duration::from_millis(200);
        let idle_timeout = self.timeout_data;

        tokio::spawn(async move {
            // Never read from; held only so the subscription lives until the task ends,
            // whichever exit path is taken.
            let _live_time = live_time;
            loop {
                let Ok(mut f) = File::open(&path).await else {
                    tracing::error!("open file fail");
                    break;
                };
                if position > 0 && f.seek(std::io::SeekFrom::Start(position)).await.is_err() {
                    tracing::warn!("BlobCache subscribe: seek failed");
                    break;
                }
                let mut stream = ReaderStream::new(f);
                while let Some(item) = stream.next().await {
                    match item {
                        Ok(chunk) => {
                            position += chunk.len() as u64;
                            if tx.send(PacketEs::new_com(chunk)).await.is_err() {
                                return;
                            }
                        }
                        Err(err) => {
                            // Do not mistake an I/O failure for end-of-file silently; the
                            // next poll reopens the file and resumes at `position`.
                            tracing::warn!("BlobCache subscribe: read failed: {:?}", err);
                            break;
                        }
                    }
                }
                if position != last_size {
                    last_size = position;
                    last_change = tokio::time::Instant::now();
                } else if last_change.elapsed() >= idle_timeout {
                    tracing::debug!("BlobCache subscribe: no new data for {:?}, exiting", idle_timeout);
                    return;
                }
                tokio::time::sleep(poll_interval).await;
            }
        });
        rx
    }
}
impl Drop for BlobCache {
    /// Aborts the task whose handle is stored in `th_idle` when the cache goes away.
    fn drop(&mut self) {
        let Self { th_idle, .. } = self;
        th_idle.abort();
    }
}