use std::{
    pin::Pin,
    sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    },
};

use crate::{
    define::*,
    gop_cache::GopCache,
    packet::*,
    relay::SourceRelay,
    traits::ISource,
    utils::{DataInstant, ElapsedState},
};
use anyhow::{Result, anyhow, bail};
use better_tokio_select::tokio_select;
use bytes::Bytes;

use th_rs::define::*;

use crate::cpu_bound::AACEncoder;

use xux_rs::{define::*, muxer::Muxer};

/// Scheme tag of this source; it is included in this module's log messages.
const SCHEME: SSScheme = SSScheme::Fmp4;

/// Handle to the fMP4 source.
///
/// The actual work happens in a task spawned by `SourceFmp4::new`; this
/// handle only keeps the command sender used by `subscribe`.
pub struct SourceFmp4 {
    /// Sender side of the command channel created by `Sub::actor()`.
    cmd_tx: SubCaller,
}
/// Muxer type used by this source (alias of `xux_rs::muxer::Muxer`).
type ThisMuxer = Muxer;

/// Mutable state owned by the worker task spawned in `SourceFmp4::new`.
struct Ctx {
    /// Muxer instance; `None` until `probe` creates it.
    muxer: Option<Pin<Box<ThisMuxer>>>,
    /// Cache that receives the init data and every muxed packet, and hands
    /// out subscriptions.
    cache: GopCache,
    /// AAC encoder, created on demand in `update_audio` when AAC is forced by
    /// the config and a non-AAC audio packet arrives.
    encoder: Option<AACEncoder>,
    /// Copy of the source configuration.
    config: SSConfig,
    /// Built from `config.source_wait_audio`; queried in `make_sure_probe` to
    /// detect the wait-for-audio timeout.
    got_data_time: DataInstant,
    /// `true` once probing has completed successfully.
    is_probe: bool,
    /// Packets buffered while probing has not completed yet.
    probe_cache: Vec<PacketEs>,
    /// Set after the "unsupported audio codec" warning was logged, so that it
    /// is emitted only once.
    once_audio_warn: AtomicBool,
    /// Supplies the muxer configuration when the muxer is created.
    source_relay: SourceRelay,
}
#[async_trait::async_trait]
impl ISource for SourceFmp4 {
    /// Requests a new subscription from the worker task and wraps the reply
    /// in a `SourceDataReceiver`.
    ///
    /// # Errors
    /// Propagates the error returned by the command call.
    async fn subscribe(&self) -> Result<SourceDataReceiver> {
        let reply = self.cmd_tx.sub(()).await;
        Ok(SourceDataReceiver { full_data: reply? })
    }
}
/// Makes sure the stream has been probed before regular muxing starts.
///
/// While probing is pending, `pkt` is cloned into `ctx.probe_cache`. Probing
/// is attempted when audio is disabled in the config, when the wait-for-audio
/// timeout has elapsed, or when `pkt` itself is an audio packet. After a
/// successful probe every buffered packet (including `pkt`) is passed to
/// `update_detail`; the results of those calls are ignored.
///
/// # Returns
/// * `ProbeState::Already` - probing was completed by an earlier call; `pkt`
///   has not been processed here.
/// * `ProbeState::Success` - this call completed probing and replayed the
///   buffered packets.
///
/// # Errors
/// Returns a `"Continue"` error while more packets are needed, and propagates
/// any error from `probe`.
fn make_sure_probe(ctx: &mut Ctx, pkt: &PacketEs) -> Result<ProbeState> {
    if ctx.is_probe {
        return Ok(ProbeState::Already);
    }
    ctx.probe_cache.push(pkt.clone());

    // With audio disabled there is nothing to wait for; otherwise only the
    // wait-for-audio timeout ends the waiting on its own.
    let waiting_is_over = if ctx.config.source_enable_audio {
        let timed_out = ctx.got_data_time.elapsed(pkt.info()) == ElapsedState::Timeout;
        if timed_out {
            tracing::warn!(
                "Got to probe due to wait audio timeout, scheme:{}, source_wait_audio: {:?}",
                SCHEME,
                ctx.got_data_time.timeout()
            );
        }
        timed_out
    } else {
        true
    };
    // An audio packet always triggers probing.
    let audio_arrived = pkt.is_audio_packet();

    if !(waiting_is_over || audio_arrived) {
        bail!("Continue");
    }

    probe(ctx)?;
    ctx.is_probe = true;

    let pending = std::mem::take(&mut ctx.probe_cache);
    if pending.len() > *ctx.config.source_client_jitter_max as _ {
        tracing::warn!(
            "probe cache maybe too much and will lead to tuncate, probe_cache_len: {}, source_client_jitter_max: {}, see source.clientJitterMax for more detail",
            pending.len(),
            ctx.config.source_client_jitter_max
        );
    }

    // Replay everything that was buffered; per-packet results are ignored on
    // purpose.
    pending.into_iter().for_each(|cached| {
        let _ = update_detail(ctx, cached);
    });
    Ok(ProbeState::Success)
}
fn probe(ctx: &mut Ctx) -> Result<()> {
    let (video, payload) = ctx
        .probe_cache
        .iter()
        .find(|e| e.is_video_key_packet())
        .ok_or_else(|| anyhow!("Can not wait any video key frame"))?
        .clone()
        .into_video();
    let muxer = match &mut ctx.muxer {
        Some(muxer) => muxer,
        None => ctx
            .muxer
            .insert(ThisMuxer::new_with_name("fmp4Muxer", ctx.source_relay.muxer_config())?),
    };

    // Just ignore error here, because it can be `Continue error`
    let _ = mux_write_video(muxer, true, video, payload);

    if let Some(first_audio) = ctx.probe_cache.iter().find(|e| e.is_audio_packet()) {
        let (audio, payload) = first_audio.to_owned().into_audio();
        // Just ignore error here, because it can be `Continue error`
        let _ = mux_write_audio(
            muxer,
            true,
            audio.codec,
            audio.sample_rate,
            audio.channels,
            audio.bits,
            payload,
        );
    }
    match muxer.get_init_data() {
        Ok(init_data) => {
            let es = PacketEs::new(init_data, EsInfo::Fmp4);

            ctx.cache.set_init_data(es);
        },
        Err(e) => {
            tracing::error!("Get init data fail, error: {}, scheme: {}", e, SCHEME);
        },
    }

    Ok(())
}
/// Handles one incoming packet.
///
/// The packet first goes through `make_sure_probe`. If probing had already
/// been completed earlier, the packet is forwarded to `update_detail`; if
/// this very call completed probing, there is nothing left to do because the
/// packet was replayed together with the probe buffer.
///
/// # Errors
/// Propagates errors from `make_sure_probe` and `update_detail`.
fn process_data(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
    let state = make_sure_probe(ctx, &pkt)?;
    match state {
        ProbeState::Already => update_detail(ctx, pkt),
        ProbeState::Success => Ok(()),
    }
}
/// Routes a packet to the video or audio path according to its `EsInfo`.
///
/// Packets that are neither video nor audio are accepted and dropped.
fn update_detail(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
    let (data, kind) = pkt.into_splite();
    match kind {
        EsInfo::Audio(audio) => update_audio(ctx, audio, data),
        EsInfo::Video(video) => updata_video(ctx, video, data),
        // Everything else is silently ignored.
        _ => Ok(()),
    }
}
/// Answers one command received from a `SourceFmp4` handle.
///
/// * `Sub::Sub` - replies with a new subscription taken from the cache.
/// * `Sub::Fet` - replies with a "No support yet" failure.
///
/// A failure to deliver the reply is ignored.
fn process_sub(ctx: &mut Ctx, sub: Sub) {
    match sub {
        Sub::Fet(request) => {
            let _ = request.return_fail(anyhow!("No support yet"));
        },
        Sub::Sub(request) => {
            let (_, reply) = request.split();
            let _ = reply.return_ok(ctx.cache.subscribe_output());
        },
    }
}
/// Writes one video payload to the muxer and returns the muxed output.
///
/// # Arguments
/// * `muxer` - muxer to write to.
/// * `is_probe` - when `true` the payload is written with the "extra video"
///   stream type, otherwise with the regular video stream type.
/// * the `Video` argument - its frame type, codec, size and fps are copied
///   into the stream info; its time stamp is not used.
/// * `payload` - the video data.
///
/// # Returns
/// One `EsInfo::Fmp4` packet per chunk returned by the muxer, each holding a
/// copy of the chunk.
///
/// # Errors
/// Propagates the error returned by the muxer write.
fn mux_write_video(
    muxer: &mut Pin<Box<ThisMuxer>>,
    is_probe: bool,
    Video {
        frame_type,
        codec,
        width,
        height,
        fps,
        time_stamp: _,
    }: Video,
    payload: Bytes,
) -> Result<Vec<PacketEs>> {
    let stream_type = if is_probe {
        TH_DATATYPE_TH_DT_EXTRA_VIDEO
    } else {
        TH_DATATYPE_TH_DT_VIDEO
    };
    let stream_info = CESStreamInfo {
        streamType: stream_type as _,
        vCodecType: codec as _,
        frameType: frame_type as _,
        frameRate: fps as _,
        video_height: height,
        video_width: width,
        ..Default::default()
    };

    let packets: Vec<PacketEs> = muxer
        .write(&payload, &stream_info)?
        .into_iter()
        .map(|chunk| PacketEs::new(Bytes::copy_from_slice(chunk), EsInfo::Fmp4))
        .collect();
    Ok(packets)
}

/// Muxes one video packet and pushes the result into the cache.
///
/// Does nothing when the muxer has not been created yet. The packet is
/// flagged as a key packet for the cache when its frame type is
/// `FrameType::I`.
///
/// # Errors
/// Propagates the error from `mux_write_video`.
fn updata_video(ctx: &mut Ctx, video: Video, payload: Bytes) -> Result<()> {
    if let Some(muxer) = ctx.muxer.as_mut() {
        let key_frame = video.frame_type == FrameType::I;
        let muxed = mux_write_video(muxer, false, video, payload)?;
        ctx.cache.input_packet(muxed, key_frame);
    }
    Ok(())
}
/// Muxes one audio packet and pushes the result into the cache.
///
/// Does nothing when the muxer has not been created yet. When the config
/// forces AAC and the packet is not AAC, an `AACEncoder` is created (once);
/// as soon as an encoder exists, every audio payload is passed through it.
/// Without an encoder only AAC payloads are muxed; other codecs are dropped
/// and a warning is logged the first time this happens.
///
/// # Errors
/// Propagates errors from the encoder and from `mux_write_audio`.
fn update_audio(ctx: &mut Ctx, audio: Audio, payload: Bytes) -> Result<()> {
    let Some(muxer) = ctx.muxer.as_mut() else {
        return Ok(());
    };

    let is_aac = audio.codec == AudioCodec::Aac;
    if ctx.config.source_fmp4_audioForceAAC && !is_aac {
        ctx.encoder.get_or_insert_with(AACEncoder::new);
    }

    let aac = if let Some(encoder) = ctx.encoder.as_mut() {
        encoder.write(&audio, payload)?
    } else if is_aac {
        payload
    } else {
        // `swap` returns the previous value, so the warning is logged once.
        let warned_before = ctx.once_audio_warn.swap(true, Ordering::Relaxed);
        if !warned_before {
            tracing::warn!("Fmp4 audio only support aac, but got {:?}", audio);
        }
        return Ok(());
    };

    let muxed = mux_write_audio(
        muxer,
        false,
        audio.codec,
        audio.sample_rate,
        audio.channels,
        audio.bits,
        aac,
    )?;
    // Audio packets are never flagged as key packets.
    ctx.cache.input_packet(muxed, false);
    Ok(())
}
/// Writes one audio payload to the muxer and returns the muxed output.
///
/// The stream info always declares AAC as the audio codec; `_codec` is
/// accepted but not used.
///
/// # Arguments
/// * `muxer` - muxer to write to.
/// * `is_probe` - when `true` the payload is written with the "extra audio"
///   stream type, otherwise with the regular audio stream type.
/// * `sample_rate`, `channels`, `bits` - copied into the stream info.
/// * `payload` - the audio data.
///
/// # Returns
/// One `EsInfo::Fmp4` packet per chunk returned by the muxer, each holding a
/// copy of the chunk.
///
/// # Errors
/// Propagates the error returned by the muxer write.
#[allow(clippy::too_many_arguments)]
fn mux_write_audio(
    muxer: &mut Pin<Box<ThisMuxer>>,
    is_probe: bool,
    _codec: AudioCodec, // Note: only AAC is allowed here
    sample_rate: i32,
    channels: i32,
    bits: i32,
    payload: Bytes,
) -> Result<Vec<PacketEs>> {
    let stream_type = if is_probe {
        TH_DATATYPE_TH_DT_EXTRA_AUDIO
    } else {
        TH_DATATYPE_TH_DT_AUDIO
    };
    let stream_info = CESStreamInfo {
        streamType: stream_type as _,
        aCodecType: TH_AUDIOCODECTYPE_TH_ACT_AAC as _,
        aSampleRate: sample_rate,
        aChannels: channels,
        aSampleBits: bits,
        ..Default::default()
    };

    let packets: Vec<PacketEs> = muxer
        .write(&payload, &stream_info)?
        .into_iter()
        .map(|chunk| PacketEs::new(Bytes::copy_from_slice(chunk), EsInfo::Fmp4))
        .collect();
    Ok(packets)
}

impl SourceFmp4 {
    /// Creates the fMP4 source and spawns its worker task.
    ///
    /// The worker owns all muxing state and loops over two inputs:
    /// packets from `data_rx`, which are fed to `process_data` (its result is
    /// ignored), and commands from the returned handle, which are answered by
    /// `process_sub`.
    ///
    /// The worker stops when `data_rx` reports that it is closed or when the
    /// command channel yields `None`. A lagged `data_rx` only produces a
    /// warning.
    ///
    /// # Arguments
    /// * `source_relay` - cloned into the worker; supplies the muxer config.
    /// * `config` - source configuration; cloned into the worker.
    /// * `is_idle` - passed on to the `GopCache`.
    /// * `data_rx` - receiver of the incoming packets.
    ///
    /// # Errors
    /// This implementation never returns an error.
    pub fn new(
        source_relay: &SourceRelay,
        config: &SSConfig,
        is_idle: Arc<AtomicBool>,
        mut data_rx: SourceEleReceiver,
    ) -> Result<Self> {
        let cache = GopCache::new_from_config(config, is_idle, true);

        let (cmd_tx, mut cmd_rx) = Sub::actor();

        let mut ctx = Ctx {
            muxer: None,
            cache,
            encoder: None,
            // The config is cloned exactly once, straight into the context.
            config: config.clone(),
            got_data_time: DataInstant::new(*config.source_wait_audio),
            probe_cache: Vec::new(),
            is_probe: false,
            once_audio_warn: AtomicBool::new(false),
            source_relay: source_relay.clone(),
        };

        tokio::spawn(async move {
            loop {
                tokio_select!(match .. {
                    .. if let pkt = data_rx.recv() => {
                        match pkt {
                            Ok(pkt) => {
                                // Errors (including the `Continue` marker used
                                // while probing) are ignored on purpose.
                                let _ = process_data(&mut ctx, pkt);
                            },
                            Err(BroadcastRecvError::Closed) => {
                                return;
                            },
                            Err(BroadcastRecvError::Lagged(skiped)) => {
                                tracing::warn!("`Slow source` detect, data drop, lagged:{}", skiped)
                            },
                        }
                    },
                    .. if let ret = cmd_rx.recv() => {
                        match ret {
                            Some(ret) => process_sub(&mut ctx, ret),
                            // No command can arrive any more: stop the worker.
                            None => break,
                        }
                    },
                });
            }
        });

        Ok(Self { cmd_tx })
    }
}