use std::{
    cmp,
    path::PathBuf,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicI64, Ordering},
    },
    time::Duration,
};

use better_tokio_select::tokio_select;
use bytes::Bytes;
use chrono::Utc;
use tokio::sync::{mpsc::channel, oneshot};
use tokio_util::sync::CancellationToken;
use xux_rs::{
    define::*,
    hls::{
        hlx::{Hls, HlsConfigBuilder},
        io_fs::{HlsIoFs, HlsIoFsConfigBuilder},
    },
};

use tracing::Instrument;

use crate::{
    define::*,
    packet::{EsInfo, PacketEs, Video},
    relay::SourceRelay,
    traits::ISource,
    utils::wait_file_ready,
};

use anyhow::{Result, anyhow, bail};

/// State owned by the worker task that `SourceHls::new` spawns.
struct Ctx {
    /// Unix timestamp in seconds. Initialised when the source is created and
    /// refreshed each time a requested HLS file becomes ready; the idle
    /// watcher compares it against the current time.
    last_client_time: Arc<AtomicI64>,
    /// Owned copy of the configuration passed to `SourceHls::new`.
    config: SSConfig,
    /// Starts as `false` and is set to `true` once a requested HLS file has
    /// become ready; the idle watcher uses it to pick which timeout applies.
    have_relay: Arc<AtomicBool>,
    /// HLS instance that receives video frames (`updata`), resolves file
    /// names (`query_hls`) and is cleared on shutdown (`clear`).
    hls: Hls,
    /// Identifier of the source; also the stem of the `<source_id>.m3u8`
    /// playlist name.
    source_id: String,
}

/// Handle to an HLS source whose work is done by background tasks.
///
/// Dropping the handle cancels `cancel_token`, which the background tasks
/// observe through child tokens.
pub struct SourceHls {
    /// Sender side of the command channel served by the worker task
    /// (`sub` and `fet` requests).
    cmd_tx: SubCaller,
    /// Parent token of the child tokens handed to the background tasks.
    cancel_token: CancellationToken,
}

#[async_trait::async_trait]
impl ISource for SourceHls {
    /// Asks the worker for `resource_name` without making the caller wait.
    ///
    /// The request is issued from a freshly spawned task and whatever the
    /// worker answers is pushed into `res`; a failed `send` (receiver already
    /// gone) is ignored. The method itself always returns `Ok(())`.
    async fn fetch(&self, res: SourceResourceFileRet, resource_name: &str) -> Result<()> {
        let caller = self.cmd_tx.clone();
        let wanted = resource_name.to_owned();

        tokio::spawn(async move {
            let outcome = caller.fet(wanted).await;
            let _ = res.send(outcome);
        });

        Ok(())
    }

    /// Sends a `sub` command to the worker and wraps the value it returns in
    /// a [`SourceDataReceiver`].
    ///
    /// # Errors
    /// Propagates the error returned by the `sub` call.
    async fn subscribe(&self) -> Result<SourceDataReceiver> {
        Ok(SourceDataReceiver {
            full_data: self.cmd_tx.sub(()).await?,
        })
    }
}
/// Feeds one elementary-stream packet into the HLS instance.
///
/// The packet is split into payload and metadata and handled inside
/// `tokio::task::block_in_place`:
/// * `EsInfo::Video` is forwarded to [`updata_video`];
/// * `EsInfo::Com` is rejected with an error;
/// * every other kind is accepted and ignored.
fn process_data(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
    let (payload, info) = pkt.into_splite();

    let handle_packet = move || -> Result<()> {
        match info {
            EsInfo::Video(video) => updata_video(ctx, video, payload),
            EsInfo::Com => bail!("Hls no support ps data input "),
            _ => Ok(()),
        }
    };

    tokio::task::block_in_place(handle_packet)
}
/// Builds a subscriber that yields the `<source_id>.m3u8` playlist once.
///
/// A spawned task waits until the playlist is ready, reads it from disk and
/// sends its bytes as a single `PacketEs::new_com` packet. If waiting or
/// reading fails nothing is sent and the sender is dropped when the task
/// ends. The function itself always returns `Ok`.
async fn subscribe_output(ctx: &Ctx) -> Result<EsDataSubscriber> {
    let playlist_name = format!("{}.m3u8", ctx.source_id);
    let playlist_ready = wait_resource_ready(ctx, playlist_name).await;
    let (pkt_tx, pkt_rx) = channel::<PacketEs>(1);

    tokio::spawn(async move {
        let Ok(Ok(playlist_path)) = playlist_ready.await else {
            return;
        };
        let Ok(content) = tokio::fs::read(playlist_path).await else {
            return;
        };
        let _ = pkt_tx.send(PacketEs::new_com(content)).await;
    });

    Ok(pkt_rx)
}

/// Serves one command received on the worker's command channel.
///
/// * `Sub::Fet` starts waiting for the requested file and answers from a
///   spawned task, so the worker loop is not held up while the file is
///   pending.
/// * `Sub::Sub` answers with the result of [`subscribe_output`].
///
/// Failures to deliver an answer are ignored.
async fn process_sub(ctx: &Ctx, sub: Sub) {
    match sub {
        Sub::Fet(request) => {
            let (file_name, reply) = request.split();
            let pending = wait_resource_ready(ctx, file_name).await;

            tokio::spawn(async move {
                // Collapse "channel closed" and "file not ready" into one error layer.
                let outcome = match pending.await {
                    Ok(inner) => inner,
                    Err(recv_err) => Err(recv_err.into()),
                };
                let _ = reply.return_(outcome);
            });
        },
        Sub::Sub(request) => {
            let (_, reply) = request.split();
            let _ = reply.return_(subscribe_output(ctx).await);
        },
    }
}

async fn wait_resource_ready(ctx: &Ctx, hls_file: String) -> tokio::sync::oneshot::Receiver<Result<PathBuf>> {
    // resolve the path
    let (tx, rx) = oneshot::channel();

    let Ok(file_path) = ctx.hls.query_hls(&hls_file) else {
        let _ = tx.send(Err(anyhow::anyhow!("query hls fail")));
        return rx;
    };
    let last_client_time = ctx.last_client_time.clone();
    let have_relay = ctx.have_relay.clone();
    let timeout = Duration::from_secs(1) * (ctx.config.hls_retryCnt.into_inner() as _);
    tokio::spawn(async move {
        tokio_select!(match .. {
            .. if let Ok(_) = wait_file_ready(&file_path) => {
                last_client_time.store(Utc::now().timestamp(), Ordering::SeqCst);
                have_relay.store(true, Ordering::SeqCst);

                let _ = tx.send(Ok(PathBuf::from(&file_path)));
            },
            .. if let _ = tokio::time::sleep(timeout) => {
                let _ = tx.send(Err(anyhow::anyhow!("timeout wait hls file, timeout:{:?}", timeout)));
            },
        });
    });
    rx
}
impl SourceHls {
    pub fn new(
        source_relay: &SourceRelay,
        config: &SSConfig,
        is_idle: Arc<AtomicBool>,
        mut data_rx: SourceEleReceiver,
    ) -> Result<Self> {
        let span = tracing::error_span!("Hls", source_id = source_relay.source_id());
        let _guard = span.enter();
        let last_client_time = Arc::new(AtomicI64::new(Utc::now().timestamp()));
        let have_relay = Arc::new(AtomicBool::new(false));
        let cancel_token = CancellationToken::new();
        let source_id = source_relay.source_id().to_string();
        tokio::spawn(
            {
                let cancel_token = cancel_token.child_token();
                // if no one accesses the source during an m3u8 list cycle, then idle
                let last_client_time = last_client_time.clone();
                let have_relay = have_relay.clone();
                let timeout_idle =
                    config.hls_segDur.into_inner() * (*config.hls_liveCnt as _) * (*config.source_hls_idleFactor as _);
                let protect_time = config.source_protectTime.into_inner();
                let scan_time = cmp::min(timeout_idle, protect_time) / 5;
                let mut interval = tokio::time::interval(scan_time);
                async move {
                    cancel_token
                        .run_until_cancelled(async move {
                            loop {
                                interval.tick().await;
                                let have_relay = have_relay.load(Ordering::SeqCst);
                                let idle_time = if have_relay { timeout_idle } else { protect_time };

                                let have_idle = {
                                    let dif = Utc::now().timestamp() - last_client_time.load(Ordering::SeqCst);

                                    if dif < 0 {
                                        true
                                    } else {
                                        dif as u64 >= idle_time.as_secs()
                                    }
                                };
                                is_idle.store(have_idle, Ordering::SeqCst);
                                if have_idle {
                                    tracing::info!(
                                        "Hls source idle, idle_time:{:?}, have_relay: {}",
                                        idle_time,
                                        have_relay
                                    );
                                    return;
                                }
                            }
                        })
                        .await
                }
            }
            .instrument(span.clone()),
        );
        let all_root = std::env::current_exe()?
            .parent()
            .ok_or_else(|| anyhow!("get root hls segment path fail"))?
            .join(&*config.hls_filePath)
            .join(&source_id)
            .display()
            .to_string();
        let hls_muxer_type = *config.hls_muxer_type;
        let hls_config = HlsConfigBuilder::default()
            .hls_live_cnt((*config.hls_liveCnt).try_into()?)
            .hls_segDur(config.hls_segDur.into_inner())
            .muxer_type(hls_muxer_type)
            .use_time_stamp(config.source_ts_enable_timeStamp)
            .segment_root_path(SEGMENT_ROOT_PATH_PLACEHOLDER.to_string())
            .source_enable_audio(config.source_enable_audio)
            .build()?;
        let fs_io_config = HlsIoFsConfigBuilder::default().all_root(&all_root).build()?;

        let hls = Hls::new_with_config(&hls_config, HlsIoFs::new_with_config(&fs_io_config)?)?;

        let mut ctx = Ctx {
            hls,
            last_client_time,
            config: config.to_owned(),
            have_relay,
            source_id,
        };
        let (cmd_tx, mut cmd_rx) = Sub::actor();

        tokio::spawn(
            {
                let cancel_token = cancel_token.child_token();
                async move {
                    loop {
                        tokio_select!(match .. {
                            .. if let Ok(pkt) = data_rx.recv() => {
                                let _ = process_data(&mut ctx, pkt);
                            },
                            .. if let Some(ret) = cmd_rx.recv() => {
                                process_sub(&ctx, ret).await;
                            },
                            .. if let _ = cancel_token.cancelled() => {
                                clean(ctx);
                                return;
                            },
                            _ => {
                                return;
                            },
                        });
                    }
                }
            }
            .instrument(span.clone()),
        );
        Ok(Self { cmd_tx, cancel_token })
    }
    fn clear(&self) {
        self.cancel_token.cancel();
    }
}

/// Hands one video frame to the HLS instance.
///
/// The `Video` metadata is translated into a `CESStreamInfo`; fields that
/// have no counterpart in `Video` keep their default values.
///
/// # Errors
/// Returns whatever `Hls::updata` reports.
fn updata_video(ctx: &mut Ctx, video: Video, payload: Bytes) -> Result<()> {
    let Video {
        frame_type,
        codec,
        height,
        width,
        fps,
        time_stamp,
    } = video;

    let stream_info = CESStreamInfo {
        vCodecType: codec as _,
        frameType: frame_type as _,
        timestamp: time_stamp as _,
        frameRate: fps as _,
        video_height: height,
        video_width: width,
        ..Default::default()
    };

    ctx.hls.updata(&payload, &stream_info)
}
/// Consumes the worker state and calls `clear` on its HLS instance.
///
/// Called by the worker task when its cancellation token fires.
fn clean(ctx: Ctx) {
    ctx.hls.clear();
}

/// Dropping the handle cancels its token so the background tasks stop.
impl Drop for SourceHls {
    fn drop(&mut self) {
        // Delegates to `clear`, which cancels `cancel_token`.
        self.clear();
    }
}