use std::{
    pin::Pin,
    sync::{Arc, atomic::AtomicBool},
};

use crate::{
    cpu_bound::AACEncoder,
    utils::{DataInstant, ElapsedState},
};
use crate::{define::*, gop_cache::GopCache, packet::*, traits::ISource};
use anyhow::{Result, anyhow, bail};
use better_tokio_select::tokio_select;
use bytes::Bytes;

use xux_rs::{define::*, flvMuxer::FlvMuxer};

/// Scheme tag of this source; interpolated into this file's log messages.
const SCHEME: SSScheme = SSScheme::Flv;

/// Handle to an FLV source.
///
/// Holds only the command sender created by `Sub::actor()` in `SourceFlv::new`; the
/// matching receiver is drained by the task spawned there.
pub struct SourceFlv {
    /// Sends subscribe requests to the worker task.
    cmd_tx: SubCaller,
}
/// Concrete muxer type used by the helpers in this file.
type ThisMuxer = FlvMuxer;

/// Mutable state owned by the worker task spawned in `SourceFlv::new`.
struct Ctx {
    /// FLV muxer; `None` until `probe` creates it.
    muxer: Option<Pin<Box<ThisMuxer>>>,
    /// Receives the init data and every muxed packet; also hands out subscriber outputs.
    cache: GopCache,
    /// AAC encoder, created on demand by `update_audio` when
    /// `source_flv_audioForceAAC` is set and non-AAC audio arrives.
    encoder: Option<AACEncoder>,
    /// Copy of the source configuration.
    config: SSConfig,
    /// Timer built from `config.source_wait_audio`; consulted in `make_sure_probe`
    /// to decide when to stop waiting for audio.
    got_data_time: DataInstant,
    /// When `true`, `probe` calls `enable_enhance` on the muxer.
    enhance: bool,
    /// `true` once probing has completed.
    is_probe: bool,
    /// Packets buffered while probing is pending; replayed through `update_detail`
    /// once probing succeeds.
    probe_cache: Vec<PacketEs>,
}
#[async_trait::async_trait]
impl ISource for SourceFlv {
    /// Sends a subscribe command over the command channel and wraps the returned
    /// value in a [`SourceDataReceiver`].
    ///
    /// # Errors
    /// Propagates any error reported by the command call.
    async fn subscribe(&self) -> Result<SourceDataReceiver> {
        let receiver = SourceDataReceiver {
            full_data: self.cmd_tx.sub(()).await?,
        };
        Ok(receiver)
    }
}
/// Buffers `pkt` until the stream can be probed, then probes and replays the buffer.
///
/// Returns `ProbeState::Already` when probing finished on an earlier call (the packet is
/// not buffered in that case) and `ProbeState::Success` when this call completed it. In
/// the latter case every buffered packet, `pkt` included, has already been passed to
/// `update_detail`.
///
/// # Errors
/// * `"Continue"` while more packets are needed before probing can start.
/// * Whatever `probe` returns; the buffered packets are kept, so a later call retries.
fn make_sure_probe(ctx: &mut Ctx, pkt: &PacketEs) -> Result<ProbeState> {
    if ctx.is_probe {
        return Ok(ProbeState::Already);
    }
    ctx.probe_cache.push(pkt.clone());

    // With audio enabled, keep waiting for an audio packet until the wait-audio timer
    // reports a timeout. With audio disabled there is nothing to wait for.
    let stop_waiting = if ctx.config.source_enable_audio {
        let timed_out = ctx.got_data_time.elapsed(pkt.info()) == ElapsedState::Timeout;
        if timed_out {
            tracing::warn!(
                "Got to probe due to wait audio timeout, scheme:{}, source_wait_audio: {:?}",
                SCHEME,
                ctx.got_data_time.timeout()
            );
        }
        timed_out
    } else {
        true
    };
    // An audio packet always triggers probing.
    let is_audio = pkt.is_audio_packet();

    if !(stop_waiting || is_audio) {
        bail!("Continue");
    }

    probe(ctx)?;
    ctx.is_probe = true;

    // Take the buffer out of `ctx` so it can be replayed while `ctx` is borrowed mutably.
    let pending = std::mem::take(&mut ctx.probe_cache);
    let pending_len = pending.len();
    if pending_len > *ctx.config.source_client_jitter_max as _ {
        tracing::warn!(
            "probe cache maybe too much and will lead to tuncate, probe_cache_len: {}, source_client_jitter_max: {}, see source.clientJitterMax for more detail",
            pending_len,
            ctx.config.source_client_jitter_max
        );
    }

    for buffered in pending {
        // Replay is best effort: a failure on one packet must not stop the rest.
        let _ = update_detail(ctx, buffered);
    }
    Ok(ProbeState::Success)
}
fn probe(ctx: &mut Ctx) -> Result<()> {
    let (video, payload) = ctx
        .probe_cache
        .iter()
        .find(|e| e.is_video_key_packet())
        .ok_or_else(|| anyhow!("Can not wait any video key frame"))?
        .clone()
        .into_video();

    let muxer = ctx.muxer.get_or_insert_with(ThisMuxer::new);
    // Just ignore error here, because it can be `Continue error`
    let _ = mux_write_video(muxer, true, video, payload);
    if ctx.enhance {
        muxer.enable_enhance();
    }
    if let Some(first_audio) = ctx.probe_cache.iter().find(|e| e.is_audio_packet()) {
        let (audio, payload) = first_audio.to_owned().into_audio();
        // Just ignore error here, because it can be `Continue error`
        let _ = mux_write_audio(
            muxer,
            true,
            audio.codec,
            audio.sample_rate,
            audio.channels,
            audio.bits,
            payload,
        );
    }
    match muxer.get_init_data() {
        Ok(init_data) => {
            let es = PacketEs::new(init_data, EsInfo::Flv);

            ctx.cache.set_init_data(es);
        },
        Err(e) => {
            tracing::error!("Get init data fail, error: {}, scheme: {}", e, SCHEME);
        },
    }

    Ok(())
}
/// Handles one incoming packet: probes first if needed, then muxes it.
///
/// # Errors
/// Propagates errors from `make_sure_probe` (including the `"Continue"` marker) and from
/// `update_detail`.
fn process_data(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
    let state = make_sure_probe(ctx, &pkt)?;
    match state {
        // Probing was finished earlier: mux the packet right away.
        ProbeState::Already => update_detail(ctx, pkt),
        // Probing completed on this call; `pkt` was replayed together with the buffer.
        ProbeState::Success => Ok(()),
    }
}
/// Splits a packet into payload and info and dispatches it to the video or audio path.
///
/// # Errors
/// Propagates errors from `updata_video` / `update_audio`.
fn update_detail(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
    match pkt.into_splite() {
        (payload, EsInfo::Video(video)) => updata_video(ctx, video, payload),
        (payload, EsInfo::Audio(audio)) => update_audio(ctx, audio, payload),
        // `EsInfo::Com` and every other kind are ignored.
        _ => Ok(()),
    }
}
/// Answers one command received on the command channel.
///
/// `Sub::Sub` is answered with a fresh output from the GOP cache; `Sub::Fet` is rejected
/// with a "No support yet" error. Failures to deliver the reply are ignored.
fn process_sub(ctx: &mut Ctx, sub: Sub) {
    match sub {
        Sub::Fet(request) => {
            let _ = request.return_fail(anyhow!("No support yet"));
        },
        Sub::Sub(request) => {
            let (_, reply) = request.split();
            let _ = reply.return_ok(ctx.cache.subscribe_output());
        },
    }
}
/// Writes one video payload to the muxer and wraps each produced chunk in a `PacketEs`
/// tagged `EsInfo::Flv`.
///
/// `is_probe` selects `streamType` 9 instead of 0 (presumably the muxer's probe mode for
/// video — confirm against the muxer API). The video time stamp is not forwarded.
///
/// # Errors
/// Propagates the error returned by `muxer.write`.
fn mux_write_video(
    muxer: &mut Pin<Box<ThisMuxer>>,
    is_probe: bool,
    video: Video,
    payload: Bytes,
) -> Result<Vec<PacketEs>> {
    let stream_info = CESStreamInfo {
        streamType: if is_probe { 9 } else { 0 },
        vCodecType: video.codec as _,
        frameType: video.frame_type as _,
        frameRate: video.fps as _,
        video_height: video.height,
        video_width: video.width,
        ..Default::default()
    };
    let tags = muxer.write(&payload, &stream_info)?;

    // Each chunk is copied so the returned packets own their bytes.
    Ok(tags
        .into_iter()
        .map(|tag| PacketEs::new(Bytes::copy_from_slice(tag), EsInfo::Flv))
        .collect())
}

/// Muxes one video frame and pushes the result into the GOP cache.
///
/// Does nothing when the muxer has not been created yet. The cache is told whether the
/// frame is a key frame (`FrameType::I`).
///
/// # Errors
/// Propagates errors from `mux_write_video`.
fn updata_video(ctx: &mut Ctx, video: Video, payload: Bytes) -> Result<()> {
    if let Some(muxer) = ctx.muxer.as_mut() {
        // Read the frame type before `video` is moved into the muxer call.
        let is_key = video.frame_type == FrameType::I;
        let packets = mux_write_video(muxer, false, video, payload)?;
        ctx.cache.input_packet(packets, is_key);
    }
    Ok(())
}
/// Muxes one audio frame and pushes the result into the GOP cache.
///
/// Does nothing when the muxer has not been created yet. When
/// `source_flv_audioForceAAC` is set and the frame is not AAC, an `AACEncoder` is created
/// (once); from then on every frame goes through that encoder. Without an encoder, AAC
/// frames pass through untouched and other codecs are dropped with a warning.
///
/// # Errors
/// Propagates errors from the encoder and from `mux_write_audio`.
fn update_audio(ctx: &mut Ctx, audio: Audio, payload: Bytes) -> Result<()> {
    let Some(muxer) = ctx.muxer.as_mut() else {
        return Ok(());
    };

    let needs_encoder = ctx.config.source_flv_audioForceAAC && audio.codec != AudioCodec::Aac;
    if needs_encoder {
        ctx.encoder.get_or_insert_with(AACEncoder::new);
    }

    let aac = match ctx.encoder.as_mut() {
        Some(encoder) => encoder.write(&audio, payload)?,
        None if audio.codec == AudioCodec::Aac => payload,
        None => {
            tracing::warn!("Flv audio only support aac");
            return Ok(());
        },
    };

    let packets = mux_write_audio(
        muxer,
        false,
        audio.codec,
        audio.sample_rate,
        audio.channels,
        audio.bits,
        aac,
    )?;
    // Audio packets are never flagged as key frames.
    ctx.cache.input_packet(packets, false);
    Ok(())
}
/// Writes one audio payload to the muxer and wraps each produced chunk in a `PacketEs`
/// tagged `EsInfo::Flv`.
///
/// `is_probe` selects `streamType` 10 instead of 1 (presumably the muxer's probe mode for
/// audio — confirm against the muxer API). `_codec` is ignored: `aCodecType` is always 2,
/// since only AAC is allowed here.
///
/// # Errors
/// Propagates the error returned by `muxer.write`.
// The audio parameters are passed individually, hence the long argument list.
#[allow(clippy::too_many_arguments)]
fn mux_write_audio(
    muxer: &mut Pin<Box<ThisMuxer>>,
    is_probe: bool,
    _codec: AudioCodec, // Note: only AAC is allowed here
    sample_rate: i32,
    channels: i32,
    bits: i32,
    payload: Bytes,
) -> Result<Vec<PacketEs>> {
    let stream_info = CESStreamInfo {
        streamType: if is_probe { 10 } else { 1 },
        aCodecType: 2,
        aSampleRate: sample_rate,
        aChannels: channels,
        aSampleBits: bits,
        ..Default::default()
    };
    let tags = muxer.write(&payload, &stream_info)?;

    // Each chunk is copied so the returned packets own their bytes.
    Ok(tags
        .into_iter()
        .map(|tag| PacketEs::new(Bytes::copy_from_slice(tag), EsInfo::Flv))
        .collect())
}

impl SourceFlv {
    /// Creates the FLV source and spawns its worker task.
    ///
    /// The worker owns all muxing state. It passes every packet received on `data_rx` to
    /// `process_data` and answers commands via `process_sub`. It stops when `data_rx`
    /// reports `Closed` or when the command receiver yields `None`.
    ///
    /// * `config` - source configuration; cloned once into the worker state.
    /// * `enhance` - when `true`, `enable_enhance` is called on the muxer during probing.
    /// * `is_idle` - flag handed to the GOP cache.
    /// * `data_rx` - receiver of the packets to mux.
    ///
    /// # Errors
    /// Currently always returns `Ok`.
    ///
    /// # Panics
    /// `tokio::spawn` panics when called outside of a Tokio runtime.
    pub fn new(
        config: &SSConfig,
        enhance: bool,
        is_idle: Arc<AtomicBool>,
        mut data_rx: SourceEleReceiver,
    ) -> Result<Self> {
        let cache = GopCache::new_from_config(config, is_idle, true);

        let (cmd_tx, mut cmd_rx) = Sub::actor();

        let mut ctx = Ctx {
            muxer: None,
            cache,
            encoder: None,
            // The only clone of the configuration; the timer below reads the borrowed one.
            config: config.clone(),
            got_data_time: DataInstant::new(*config.source_wait_audio),
            enhance,
            probe_cache: Vec::new(),
            is_probe: false,
        };

        // The join handle is dropped, so the task runs detached.
        tokio::spawn(async move {
            loop {
                tokio_select!(match .. {
                    .. if let pkt = data_rx.recv() => {
                        match pkt {
                            Ok(pkt) => {
                                // Errors are ignored on purpose: `Continue` is the normal
                                // result while probing is still pending.
                                let _ = process_data(&mut ctx, pkt);
                            },
                            Err(BroadcastRecvError::Closed) => {
                                return;
                            },
                            Err(BroadcastRecvError::Lagged(skipped)) => {
                                tracing::warn!("`Slow source` detect, data drop, lagged:{}", skipped)
                            },
                        }
                    },
                    .. if let ret = cmd_rx.recv() => {
                        match ret {
                            Some(ret) => process_sub(&mut ctx, ret),
                            None => break,
                        }
                    },
                });
            }
        });

        Ok(Self { cmd_tx })
    }
}