use std::{
    pin::Pin,
    sync::{Arc, atomic::AtomicBool},
};

use crate::{
    define::*,
    gop_cache::GopCache,
    packet::*,
    relay::SourceRelay,
    traits::ISource,
    utils::{DataInstant, ElapsedState},
};
use anyhow::{Result, anyhow, bail};
use better_tokio_select::tokio_select;
use bytes::Bytes;
use th_rs::define::*;

use xux_rs::{define::*, muxer::Muxer};

const SCHEME: SSScheme = SSScheme::Ps;

pub struct SourcePs {
    cmd_tx: SubCaller,
}
type ThisMuxer = Muxer;

struct Ctx {
    muxer: Option<Pin<Box<ThisMuxer>>>,
    cache: GopCache,
    config: SSConfig,
    got_data_time: DataInstant,
    is_probe: bool,
    probe_cache: Vec<PacketEs>,
    source_relay: SourceRelay,
}
#[async_trait::async_trait]
impl ISource for SourcePs {
    async fn subscribe(&self) -> Result<SourceDataReceiver> {
        let full_data = self.cmd_tx.sub(()).await?;
        Ok(SourceDataReceiver { full_data })
    }
}
fn make_sure_probe(ctx: &mut Ctx, pkt: &PacketEs) -> Result<ProbeState> {
    if ctx.is_probe {
        return Ok(ProbeState::Already);
    }
    ctx.probe_cache.push(pkt.clone());
    let mut goto_probe = false;

    if ctx.config.source_enable_audio {
        if ctx.got_data_time.elapsed(pkt.info()) == ElapsedState::Timeout {
            goto_probe = true;
        }
    } else {
        goto_probe = true;
    }

    if pkt.is_audio_packet() {
        goto_probe = true;
    }

    if !goto_probe {
        bail!("Continue");
    }
    probe(ctx)?;

    ctx.is_probe = true;

    let probe_cache = std::mem::take(&mut ctx.probe_cache);

    if probe_cache.len() > *ctx.config.source_client_jitter_max as _ {
        tracing::warn!(
            "probe cache maybe too much and will lead to tuncate, probe_cache_len: {}, source_client_jitter_max: {}, see source.clientJitterMax for more detail",
            probe_cache.len(),
            ctx.config.source_client_jitter_max
        );
    }
    for pkt in probe_cache {
        // Note: here just ignore value
        let _ = update_detail(ctx, pkt);
    }
    Ok(ProbeState::Success)
}
fn probe(ctx: &mut Ctx) -> Result<()> {
    let (video, payload) = ctx
        .probe_cache
        .iter()
        .find(|e| e.is_video_key_packet())
        .ok_or_else(|| anyhow!("Can not wait any video key frame"))?
        .clone()
        .into_video();
    let muxer = match &mut ctx.muxer {
        Some(muxer) => muxer,
        None => ctx
            .muxer
            .insert(ThisMuxer::new_with_name("psMuxer", ctx.source_relay.muxer_config())?),
    };

    // Just ignore error here, because it can be `Continue error`
    let _ = mux_write_video(muxer, true, video, payload);

    if let Some(first_audio) = ctx.probe_cache.iter().find(|e| e.is_audio_packet()) {
        let (audio, payload) = first_audio.to_owned().into_audio();
        // Just ignore error here, because it can be `Continue error`
        let _ = mux_write_audio(
            muxer,
            true,
            audio.codec,
            audio.sample_rate,
            audio.channels,
            audio.bits,
            payload,
        );
    }
    match muxer.get_init_data() {
        Ok(init_data) => {
            let es = PacketEs::new(init_data, EsInfo::MpegPs);

            ctx.cache.set_init_data(es);
        },
        Err(e) => {
            tracing::error!("Get init data fail, error: {}, scheme: {}", e, SCHEME);
        },
    }

    Ok(())
}
fn process_data(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
    match make_sure_probe(ctx, &pkt)? {
        ProbeState::Success => Ok(()),
        ProbeState::Already => update_detail(ctx, pkt),
    }
}
fn update_detail(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
    let (payload, info) = pkt.into_splite();
    match info {
        EsInfo::Video(video) => updata_video(ctx, video, payload),
        EsInfo::Audio(audio) => update_audio(ctx, audio, payload),
        EsInfo::Com => Ok(()),
        _ => Ok(()),
    }
}
fn process_sub(ctx: &mut Ctx, sub: Sub) {
    match sub {
        Sub::Sub(actor_pkt) => {
            let (_, ret) = actor_pkt.split();
            let sub = ctx.cache.subscribe_output();
            let _ = ret.return_ok(sub);
        },
        Sub::Fet(actor_pkt) => {
            let _ = actor_pkt.return_fail(anyhow!("No support yet"));
        },
    }
}
fn mux_write_video(
    muxer: &mut Pin<Box<ThisMuxer>>,
    is_probe: bool,
    Video {
        frame_type,
        codec,
        width,
        height,
        fps,
        time_stamp: _,
    }: Video,
    payload: Bytes,
) -> Result<Vec<PacketEs>> {
    let info = CESStreamInfo {
        streamType: if is_probe {
            TH_DATATYPE_TH_DT_EXTRA_VIDEO
        } else {
            TH_DATATYPE_TH_DT_VIDEO
        } as _,
        vCodecType: codec as _,
        frameType: frame_type as _,
        frameRate: fps as _,
        video_height: height,
        video_width: width,
        ..Default::default()
    };

    let muxeds = muxer.write(&payload, &info)?;
    let mut outs = Vec::new();
    for muxed in muxeds {
        let info = EsInfo::MpegPs;
        let es = PacketEs::new(Bytes::copy_from_slice(muxed), info);
        outs.push(es);
    }
    Ok(outs)
}

fn updata_video(ctx: &mut Ctx, video: Video, payload: Bytes) -> Result<()> {
    let Some(muxer) = &mut ctx.muxer else {
        return Ok(());
    };
    let is_key = video.frame_type == FrameType::I;

    let out = mux_write_video(muxer, false, video, payload)?;

    ctx.cache.input_packet(out, is_key);
    Ok(())
}
fn update_audio(ctx: &mut Ctx, audio: Audio, payload: Bytes) -> Result<()> {
    let Some(muxer) = &mut ctx.muxer else {
        return Ok(());
    };

    let outs = mux_write_audio(
        muxer,
        false,
        audio.codec,
        audio.sample_rate,
        audio.channels,
        audio.bits,
        payload,
    )?;
    ctx.cache.input_packet(outs, false);
    Ok(())
}
#[allow(clippy::too_many_arguments)]
fn mux_write_audio(
    muxer: &mut Pin<Box<ThisMuxer>>,
    is_probe: bool,
    codec: AudioCodec,
    sample_rate: i32,
    channels: i32,
    bits: i32,
    payload: Bytes,
) -> Result<Vec<PacketEs>> {
    let info = CESStreamInfo {
        streamType: if is_probe {
            TH_DATATYPE_TH_DT_EXTRA_AUDIO
        } else {
            TH_DATATYPE_TH_DT_AUDIO
        } as _,
        aCodecType: codec as _,
        aSampleRate: sample_rate,
        aChannels: channels,
        aSampleBits: bits,
        ..Default::default()
    };

    let muxeds = muxer.write(&payload, &info)?;
    let mut outs = Vec::new();

    for muxed in muxeds {
        let info = EsInfo::MpegPs;
        let es = PacketEs::new(Bytes::copy_from_slice(muxed), info);
        outs.push(es);
    }
    Ok(outs)
}

impl SourcePs {
    pub fn new(
        source_relay: &SourceRelay,
        config: &SSConfig,
        is_idle: Arc<AtomicBool>,
        mut data_rx: SourceEleReceiver,
    ) -> Result<Self> {
        let cache = GopCache::new_from_config(config, is_idle, false);
        let config = config.clone();

        let (cmd_tx, mut cmd_rx) = Sub::actor();

        let mut ctx = Ctx {
            muxer: None,
            cache,
            config: config.to_owned(),
            got_data_time: DataInstant::new(*config.source_wait_audio),
            probe_cache: Vec::new(),
            is_probe: false,
            source_relay: source_relay.clone(),
        };
        tokio::spawn(async move {
            loop {
                tokio_select!(match .. {
                    .. if let pkt = data_rx.recv() => {
                        match pkt {
                            Ok(pkt) => {
                                let _ = process_data(&mut ctx, pkt);
                            },
                            Err(BroadcastRecvError::Closed) => {
                                return;
                            },
                            Err(BroadcastRecvError::Lagged(skiped)) => {
                                tracing::warn!("`Slow source` detect, data drop, lagged:{}", skiped)
                            },
                        }
                    },
                    .. if let ret = cmd_rx.recv() => {
                        match ret {
                            Some(ret) => process_sub(&mut ctx, ret),
                            None => break,
                        }
                    },
                });
            }
        });

        Ok(Self { cmd_tx })
    }
}