use crate::cpu_bound::AACEncoder;
use crate::define::*;
use crate::gop_cache::GopCache;
use crate::packet::{Audio, AudioCodec, EsInfo, FrameType, PacketEs, Video};
use crate::traits::ISource;
use crate::utils::{DataInstant, ElapsedState};

use anyhow::{Result, anyhow, bail};
use better_tokio_select::tokio_select;
use bytes::Bytes;
use xux_rs::define::{CESStreamInfo, MpegtsMuxerConfigBuilder};
use xux_rs::muxer::Muxer;

use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
/// Streaming scheme served by this source; printed in this file's log messages.
const SCHEME: SSScheme = SSScheme::MpegTs;
/// Concrete muxer type used throughout this file.
type ThisMuxer = Muxer;

/// Mutable state owned by the background task spawned in `SourceTs::new`.
pub struct Ctx {
    /// MPEG-TS muxer; stays `None` until `probe` creates it.
    muxer: Option<Pin<Box<ThisMuxer>>>,
    /// Cache that receives the produced TS packets and hands out subscriptions.
    cache: GopCache,
    /// AAC encoder; only created by `probe` when `source_ts_audioForceAAC` is
    /// set and the first cached audio packet is not AAC.
    encoder: Option<AACEncoder>,
    /// Owned copy of the configuration passed to `SourceTs::new`.
    config: SSConfig,
    /// Packets buffered while probing has not succeeded yet.
    probe_cache: Vec<PacketEs>,
    /// Set to `true` once `probe` has succeeded.
    is_probe: bool,
    /// Timer built from `source_wait_audio`; consulted while waiting for
    /// audio before probing.
    got_data_time: DataInstant,
}

/// Handle to the MPEG-TS source task; subscriptions are requested through `cmd_tx`.
pub struct SourceTs {
    /// Calling side of the `Sub` actor channel created in `SourceTs::new`.
    cmd_tx: SubCaller,
}

#[async_trait::async_trait]
impl ISource for SourceTs {
    /// Requests a new output subscription from the background task and wraps
    /// the result in a `SourceDataReceiver`.
    async fn subscribe(&self) -> Result<SourceDataReceiver> {
        let receiver = SourceDataReceiver {
            full_data: self.cmd_tx.sub(()).await?,
        };
        Ok(receiver)
    }
}
/// Runs one incoming packet through the probe stage and, when probing was
/// already settled for it, forwards it to `update_detail`.
fn process_data(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
    let state = make_sure_probe(ctx, &pkt)?;
    match state {
        ProbeState::Already => update_detail(ctx, pkt),
        // Probing finished during this call; the cached packets (including
        // this one) were already replayed by `make_sure_probe`.
        ProbeState::Success => Ok(()),
    }
}
fn process_sub(ctx: &mut Ctx, sub: Sub) {
    match sub {
        Sub::Sub(actor_pkt) => {
            let (_, ret) = actor_pkt.split();
            let sub = ctx.cache.subscribe_output();
            let _ = ret.return_ok(sub);
        },
        Sub::Fet(actor_pkt) => {
            let _ = actor_pkt.return_fail(anyhow::anyhow!("No support yet"));
        },
    }
}

/// Buffers packets until the stream can be probed, then probes and replays
/// the buffer.
///
/// Returns `ProbeState::Already` for MPEG-TS packets and for every packet that
/// arrives after probing has finished. Returns `ProbeState::Success` when this
/// call completed the probe; in that case all cached packets, `pkt` included,
/// have been pushed through `update_detail`.
///
/// # Errors
/// Fails with "Continue" while the probe conditions are not met yet, or with
/// whatever `probe` reports.
fn make_sure_probe(ctx: &mut Ctx, pkt: &PacketEs) -> Result<ProbeState> {
    if pkt.is_mpegts() || ctx.is_probe {
        return Ok(ProbeState::Already);
    }
    ctx.probe_cache.push(pkt.clone());

    // With audio enabled we wait until the audio timeout fires; without it
    // there is nothing to wait for.
    let wait_over = if ctx.config.source_enable_audio {
        let timed_out = ctx.got_data_time.elapsed(pkt.info()) == ElapsedState::Timeout;
        if timed_out {
            tracing::warn!(
                "Got to probe due to wait audio timeout, scheme:{}, source_wait_audio: {:?}",
                SCHEME,
                ctx.got_data_time.timeout()
            );
        }
        timed_out
    } else {
        true
    };
    // An audio packet also ends the wait.
    let is_audio = pkt.is_audio_packet();

    if !(wait_over || is_audio) {
        bail!("Continue");
    }
    probe(ctx)?;
    ctx.is_probe = true;

    let cached = std::mem::take(&mut ctx.probe_cache);
    let cached_len = cached.len();
    if cached_len > *ctx.config.source_client_jitter_max as _ {
        tracing::warn!(
            "probe cache maybe too much and will lead to tuncate, probe_cache_len: {}, source_client_jitter_max: {}, see source.clientJitterMax for more detail",
            cached_len,
            ctx.config.source_client_jitter_max
        );
    }

    for cached_pkt in cached {
        // Replay is best effort: a failure on one packet is ignored.
        let _ = update_detail(ctx, cached_pkt);
    }
    Ok(ProbeState::Success)
}
/// Dispatches a packet to the handler matching its kind; kinds other than
/// video, audio and MPEG-TS are accepted and ignored.
fn update_detail(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
    match pkt.into_splite() {
        (payload, EsInfo::Video(video)) => updata_video(ctx, video, payload),
        (payload, EsInfo::Audio(audio)) => update_audio(ctx, audio, payload),
        (payload, EsInfo::MpegTs) => update_mpegts(ctx, payload),
        _ => Ok(()),
    }
}
fn probe(ctx: &mut Ctx) -> Result<()> {
    let (video, payload) = ctx
        .probe_cache
        .iter()
        .find(|e| e.is_video_key_packet())
        .ok_or_else(|| anyhow!("Can not wait any video key frame"))?
        .clone()
        .into_video();
    let muxer = match &mut ctx.muxer {
        Some(muxer) => muxer,
        None => {
            let mux_config = MpegtsMuxerConfigBuilder::default()
                .use_time_stamp(ctx.config.source_ts_enable_timeStamp)
                .build()?;
            let config_str = serde_json::to_string(&mux_config)?;
            let muxer = ThisMuxer::new_with_name("mpegtsMuxer", &config_str)?;
            ctx.muxer.insert(muxer)
        },
    };

    let _ = mux_write_video(muxer, true, video, payload)?;

    if let Some(first_audio) = ctx.probe_cache.iter().find(|e| e.is_audio_packet()) {
        let (audio, payload) = first_audio.to_owned().into_audio();

        if ctx.config.source_ts_audioForceAAC && audio.codec != AudioCodec::Aac {
            ctx.encoder.get_or_insert_with(AACEncoder::new);
        }

        let aac = match &mut ctx.encoder {
            Some(encoder) => Some(encoder.write(&audio, payload)?),
            None => {
                if audio.codec != AudioCodec::Aac {
                    None
                } else {
                    Some(payload)
                }
            },
        };
        let Some(aac) = aac else {
            return Ok(());
        };

        let _ = mux_write_audio(muxer, true, audio, aac);
    }

    Ok(())
}
/// Writes one video frame into the muxer and wraps every produced TS chunk
/// in a `PacketEs` tagged `EsInfo::MpegTs`.
///
/// `is_probe` selects stream type `9` (probe) instead of `0`.
///
/// # Errors
/// Propagates the error returned by the muxer write.
fn mux_write_video(
    muxer: &mut Pin<Box<ThisMuxer>>,
    is_probe: bool,
    video: Video,
    payload: Bytes,
) -> Result<Vec<PacketEs>> {
    let Video {
        frame_type,
        codec,
        width,
        height,
        fps,
        time_stamp,
    } = video;

    let stream_info = CESStreamInfo {
        streamType: if is_probe { 9 } else { 0 },
        vCodecType: codec as _,
        frameType: frame_type as _,
        timestamp: time_stamp as _,
        frameRate: fps as _,
        video_height: height,
        video_width: width,
        ..Default::default()
    };

    let packets = muxer
        .write(&payload, &stream_info)?
        .into_iter()
        .map(|ts| PacketEs::new(Bytes::copy_from_slice(ts), EsInfo::MpegTs))
        .collect();
    Ok(packets)
}

/// Muxes a video frame and pushes the resulting TS packets into the cache,
/// flagging whether the frame was an I frame. Does nothing while no muxer
/// exists.
fn updata_video(ctx: &mut Ctx, video: Video, payload: Bytes) -> Result<()> {
    if let Some(muxer) = ctx.muxer.as_mut() {
        let key_frame = video.frame_type == FrameType::I;
        let ts_packets = mux_write_video(muxer, false, video, payload)?;
        ctx.cache.input_packet(ts_packets, key_frame);
    }
    Ok(())
}

/// Writes one audio payload into the muxer and wraps every produced TS chunk
/// in a `PacketEs` tagged `EsInfo::MpegTs`.
///
/// `is_probe` selects stream type `10` (probe) instead of `1`. The audio
/// codec type passed to the muxer is always `2`; the `codec` field of the
/// incoming `Audio` is not consulted.
///
/// # Errors
/// Propagates the error returned by the muxer write.
fn mux_write_audio(
    muxer: &mut Pin<Box<ThisMuxer>>,
    is_probe: bool,
    audio: Audio,
    payload: Bytes,
) -> Result<Vec<PacketEs>> {
    let Audio {
        channels,
        bits,
        sample_rate,
        ..
    } = audio;

    let stream_type = if is_probe { 10 } else { 1 };
    let stream_info = CESStreamInfo {
        streamType: stream_type,
        aCodecType: 2,
        aSampleRate: sample_rate,
        aChannels: channels,
        aSampleBits: bits,
        ..Default::default()
    };

    let chunks = muxer.write(&payload, &stream_info)?;
    Ok(chunks
        .into_iter()
        .map(|chunk| PacketEs::new(Bytes::copy_from_slice(chunk), EsInfo::MpegTs))
        .collect())
}
/// Passes an already-muxed MPEG-TS payload straight into the cache, always
/// with the key flag set.
fn update_mpegts(ctx: &mut Ctx, payload: Bytes) -> Result<()> {
    ctx.cache
        .input_packet(vec![PacketEs::new(payload, EsInfo::MpegTs)], true);
    Ok(())
}
/// Muxes an audio packet and pushes the resulting TS packets into the cache
/// as non-key data.
///
/// Does nothing while no muxer exists. When an encoder is present the payload
/// goes through it first; without an encoder, only AAC payloads are muxed and
/// anything else is dropped.
///
/// # Errors
/// Propagates failures from the encoder and from the muxer write.
fn update_audio(ctx: &mut Ctx, audio: Audio, payload: Bytes) -> Result<()> {
    let Some(muxer) = ctx.muxer.as_mut() else {
        return Ok(());
    };

    let aac = match ctx.encoder.as_mut() {
        Some(encoder) => encoder.write(&audio, payload)?,
        None if audio.codec != AudioCodec::Aac => return Ok(()),
        None => payload,
    };

    let ts_packets = mux_write_audio(muxer, false, audio, aac)?;
    ctx.cache.input_packet(ts_packets, false);
    Ok(())
}

impl SourceTs {
    /// Builds the MPEG-TS source and spawns its background task.
    ///
    /// The task owns a `Ctx` and loops over two inputs: packets from
    /// `data_rx`, which go through `process_data`, and subscription commands
    /// from the `Sub` actor channel, which go through `process_sub`. The
    /// returned handle only keeps the command sender.
    ///
    /// Must be called from within a Tokio runtime, since it calls
    /// `tokio::spawn`.
    pub fn new(config: &SSConfig, is_idle: Arc<AtomicBool>, mut data_rx: SourceEleReceiver) -> Result<Self> {
        let cache = GopCache::new_from_config(config, is_idle, false);

        let (cmd_tx, mut cmd_rx) = Sub::actor();

        // Probing state starts empty: no muxer, no encoder, nothing cached.
        let mut ctx = Ctx {
            muxer: None,
            cache,
            config: config.to_owned(),
            encoder: None,
            probe_cache: Vec::new(),
            is_probe: false,
            got_data_time: DataInstant::new(*config.source_wait_audio),
        };

        tokio::spawn(async move {
            loop {
                tokio_select!(match .. {
                    .. if let pkt = data_rx.recv() => {
                        match pkt {
                            Ok(pkt) => {
                                // Errors are ignored here; this includes the
                                // "Continue" error raised while probing is
                                // still waiting for more packets.
                                let _ = process_data(&mut ctx, pkt);
                            },
                            // Data channel closed: stop the task.
                            Err(BroadcastRecvError::Closed) => {
                                return;
                            },
                            // Receiver fell behind: log the lag and keep going.
                            Err(BroadcastRecvError::Lagged(skiped)) => {
                                tracing::warn!("`Slow source` detect, data drop, lagged:{}", skiped)
                            },
                        }
                    },
                    .. if let ret = cmd_rx.recv() => {
                        match ret {
                            Some(ret) => process_sub(&mut ctx, ret),
                            // Command channel returned `None`: stop the task.
                            None => break,
                        }
                    },
                });
            }
        });

        Ok(Self { cmd_tx })
    }
}