use better_tokio_select::tokio_select;
use std::{
    pin::Pin,
    sync::{Arc, atomic::AtomicBool},
};
use tracing::error;

use bytes::Bytes;
use xux_rs::{define::*, muxer::Muxer};

use crate::{
    define::*,
    gop_cache::*,
    packet::*,
    relay::SourceRelay,
    traits::ISource,
    utils::{DataInstant, ElapsedState},
};
use anyhow::{Context as _, Result, anyhow, bail};

/// Muxer type used by this source; an alias for `xux_rs::muxer::Muxer`.
type ThisMuxer = Muxer;
/// Scheme tag of this source; in this file it is only used when logging an init-data failure.
const SCHEME: SSScheme = SSScheme::Rtp;

/// Mutable state owned by the background task spawned in `SourceRtp::new`.
pub struct Ctx {
    /// Muxer instance; `None` until `probe` creates it.
    muxer: Option<Pin<Box<ThisMuxer>>>,
    /// Cache that receives the init data and the muxed packets, and hands out subscriptions.
    cache: GopCache,
    /// Copy of the source configuration (`source_enable_audio`, `source_client_jitter_max` are read here).
    config: SSConfig,
    /// Packets buffered while probing has not succeeded yet; drained and replayed on success.
    probe_cache: Vec<PacketEs>,
    /// Built from `config.source_wait_audio`; its `elapsed` result decides whether to stop
    /// waiting for audio before probing.
    got_data_time: DataInstant,
    /// `true` once probing has completed successfully.
    is_probe: bool,
    /// Relay handle; supplies the muxer configuration when the muxer is created.
    source_relay: SourceRelay,
    /// Name passed to `ThisMuxer::new_with_name`.
    muxer_name: String,
}
/// Handle to an RTP source; all work happens in the task spawned by `SourceRtp::new`.
pub struct SourceRtp {
    /// Sending side of the command channel created by `Sub::actor()`; used to request subscriptions.
    cmd_tx: SubCaller,
}
#[async_trait::async_trait]
impl ISource for SourceRtp {
    /// Requests a new subscription through the command channel and wraps the
    /// returned value in a `SourceDataReceiver`.
    ///
    /// # Errors
    /// Propagates whatever error the `sub` call on the command channel reports.
    async fn subscribe(&self) -> Result<SourceDataReceiver> {
        let receiver = SourceDataReceiver {
            full_data: self.cmd_tx.sub(()).await?,
        };
        Ok(receiver)
    }
}
/// Routes one elementary-stream packet to the video or audio handler.
///
/// Packets whose info is neither video nor audio are accepted and ignored.
///
/// # Errors
/// Returns whatever the selected handler returns.
fn update_detail(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
    match pkt.into_splite() {
        (payload, EsInfo::Video(video)) => updata_video(ctx, video, payload),
        (payload, EsInfo::Audio(audio)) => update_audio(ctx, audio, payload),
        // Every other kind of packet (including `EsInfo::Com`) is a no-op.
        _ => Ok(()),
    }
}

/// Ensures the stream has been probed before packets are forwarded to the muxer.
///
/// Returns `ProbeState::Already` when probing finished on an earlier call.
/// Otherwise the packet is appended to the probe cache and a probe is attempted
/// once one of these holds: audio is disabled in the config, the audio wait has
/// timed out, or the current packet is an audio packet. On a successful probe
/// every cached packet (including the current one) is replayed through
/// `update_detail` and `ProbeState::Success` is returned.
///
/// # Errors
/// * `"Continue"` when more packets are needed before probing can start.
/// * Any error returned by `probe`.
fn make_sure_probe(ctx: &mut Ctx, pkt: &PacketEs) -> Result<ProbeState> {
    if ctx.is_probe {
        return Ok(ProbeState::Already);
    }
    ctx.probe_cache.push(pkt.clone());

    // `elapsed` is only consulted when audio is enabled (short-circuit on purpose).
    let audio_wait_over = !ctx.config.source_enable_audio
        || ctx.got_data_time.elapsed(pkt.info()) == ElapsedState::Timeout;
    let is_audio = pkt.is_audio_packet();
    if !audio_wait_over && !is_audio {
        bail!("Continue");
    }

    probe(ctx)?;
    ctx.is_probe = true;

    // Take ownership of the buffered packets so they can be replayed while `ctx` is mutated.
    let pending = std::mem::take(&mut ctx.probe_cache);
    let pending_len = pending.len();
    if pending_len > *ctx.config.source_client_jitter_max as _ {
        tracing::warn!(
            "probe cache maybe too much and will lead to tuncate, probe_cache_len: {}, source_client_jitter_max: {}, see source.clientJitterMax for more detail",
            pending_len,
            ctx.config.source_client_jitter_max
        );
    }

    for cached in pending {
        // Replay is best effort: a failure on one cached packet is deliberately ignored.
        let _ = update_detail(ctx, cached);
    }
    Ok(ProbeState::Success)
}

/// Feeds the first cached video key frame (and the first cached audio packet, if
/// any) to the muxer in probe mode, then stores the muxer's init data in the cache.
///
/// The muxer is created on first use and reused afterwards.
///
/// # Errors
/// * No video key frame is present in the probe cache.
/// * Creating the muxer fails.
///
/// NOTE(review): a failing `get_init_data` is only logged and the function still
/// returns `Ok`, so the caller marks the probe as done without init data — confirm
/// this is intended.
fn probe(ctx: &mut Ctx) -> Result<()> {
    let key_packet = ctx
        .probe_cache
        .iter()
        .find(|cached| cached.is_video_key_packet())
        .cloned()
        .ok_or_else(|| anyhow!("Can not wait any video key frame"))?;
    let (video, payload) = key_packet.into_video();

    // Create the muxer lazily, then borrow it for the rest of the probe.
    if ctx.muxer.is_none() {
        let created =
            ThisMuxer::new_with_name(&ctx.muxer_name, ctx.source_relay.muxer_config())?;
        ctx.muxer = Some(created);
    }
    let muxer = ctx.muxer.as_mut().with_context(|| "fail to get muxer")?;

    // The result is ignored on purpose: the muxer may answer with a `Continue` error.
    let _ = mux_write_video(muxer, true, video, payload);

    let first_audio = ctx
        .probe_cache
        .iter()
        .find(|cached| cached.is_audio_packet())
        .cloned();
    if let Some(first_audio) = first_audio {
        let (audio, payload) = first_audio.into_audio();
        // The result is ignored on purpose: the muxer may answer with a `Continue` error.
        let _ = mux_write_audio(
            muxer,
            true,
            audio.codec,
            audio.sample_rate,
            audio.channels,
            audio.bits,
            payload,
        );
    }

    match muxer.get_init_data() {
        Err(e) => {
            error!("Get init data fail {}, scheme: {}", e, SCHEME);
        },
        Ok(mut init_data) => {
            // The separator marks the boundary between the init data and the RTP stream.
            init_data.extend_from_slice(RTP_STREAM_SEPERATE);
            // Stream type 7 means DATA.
            let info = EsInfo::RtpFull(RtpFull { stream_type: 7 });
            ctx.cache.set_init_data(PacketEs::new(init_data, info));
        },
    }

    Ok(())
}

/// Muxes one video payload and wraps every produced RTP buffer in a `PacketEs`.
///
/// `is_probe` selects the stream type handed to the muxer: `9` while probing,
/// `0` otherwise. Every returned packet is tagged `RtpFull { stream_type: 0 }`.
///
/// # Errors
/// Returns the muxer's write error after logging it at debug level.
fn mux_write_video(
    muxer: &mut Pin<Box<ThisMuxer>>,
    is_probe: bool,
    video: Video,
    payload: Bytes,
) -> Result<Vec<PacketEs>> {
    let stream_info = CESStreamInfo {
        streamType: if is_probe { 9 } else { 0 },
        vCodecType: video.codec as _,
        frameType: video.frame_type as _,
        timestamp: video.time_stamp as _,
        frameRate: video.fps as _,
        video_height: video.height,
        video_width: video.width,
        ..Default::default()
    };

    let rtps = match muxer.write(&payload, &stream_info) {
        Ok(rtps) => rtps,
        Err(e) => {
            tracing::debug!("Mux write video data fail, error: {}", e);
            return Err(e);
        },
    };

    let packets = rtps
        .into_iter()
        .map(|rtp| {
            let info = EsInfo::RtpFull(RtpFull { stream_type: 0 });
            PacketEs::new(Bytes::copy_from_slice(rtp), info)
        })
        .collect();
    Ok(packets)
}
#[allow(clippy::too_many_arguments)]
fn mux_write_audio(
    muxer: &mut Pin<Box<ThisMuxer>>,
    is_probe: bool,
    codec: AudioCodec,
    sample_rate: i32,
    channels: i32,
    bits: i32,
    payload: Bytes,
) -> Result<Vec<PacketEs>> {
    let rtps = muxer.write(
        &payload,
        &CESStreamInfo {
            streamType: if is_probe { 10 } else { 1 },
            aCodecType: codec as _,
            aSampleRate: sample_rate,
            aChannels: channels,
            aSampleBits: bits,
            ..Default::default()
        },
    )?;
    let mut rtp_bytes = Vec::new();
    for rtp in rtps {
        let info = EsInfo::RtpFull(RtpFull { stream_type: 1 });
        let es = PacketEs::new(Bytes::copy_from_slice(rtp), info);
        rtp_bytes.push(es);
    }
    Ok(rtp_bytes)
}

/// Muxes a video frame and pushes the resulting packets into the cache.
///
/// Does nothing (and returns `Ok`) while no muxer exists yet. The cache is told
/// whether the frame is a key frame, i.e. whether its frame type is `FrameType::I`.
///
/// # Errors
/// Propagates the error from `mux_write_video`.
fn updata_video(ctx: &mut Ctx, video: Video, payload: Bytes) -> Result<()> {
    if let Some(muxer) = ctx.muxer.as_mut() {
        // Read the frame type before `video` is moved into the mux call.
        let key_frame = video.frame_type == FrameType::I;
        let packets = mux_write_video(muxer, false, video, payload)?;
        ctx.cache.input_packet(packets, key_frame);
    }
    Ok(())
}
/// Muxes an audio frame and pushes the resulting packets into the cache.
///
/// Does nothing (and returns `Ok`) while no muxer exists yet. Audio packets are
/// always handed to the cache with the key-frame flag set to `false`.
///
/// # Errors
/// Propagates the error from `mux_write_audio`.
fn update_audio(ctx: &mut Ctx, audio: Audio, payload: Bytes) -> Result<()> {
    match ctx.muxer.as_mut() {
        None => Ok(()),
        Some(muxer) => {
            let packets = mux_write_audio(
                muxer,
                false,
                audio.codec,
                audio.sample_rate,
                audio.channels,
                audio.bits,
                payload,
            )?;
            ctx.cache.input_packet(packets, false);
            Ok(())
        },
    }
}

/// Handles one incoming packet.
///
/// If the probe completed on an earlier call the packet is forwarded to
/// `update_detail`. If the probe completes on this call nothing more is done,
/// because `make_sure_probe` has already replayed the cached packets, which
/// include this one.
///
/// # Errors
/// Propagates errors from `make_sure_probe` (including the `"Continue"` signal)
/// and from `update_detail`.
fn process_data(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
    let state = make_sure_probe(ctx, &pkt)?;
    if matches!(state, ProbeState::Already) {
        update_detail(ctx, pkt)
    } else {
        Ok(())
    }
}
fn process_sub(ctx: &mut Ctx, sub: Sub) {
    match sub {
        Sub::Sub(actor_pkt) => {
            let (_, ret) = actor_pkt.split();
            let sub = ctx.cache.subscribe_output();
            let _ = ret.return_ok(sub);
        },
        Sub::Fet(actor_pkt) => {
            let _ = actor_pkt.return_fail(anyhow::anyhow!("No support yet"));
        },
    }
}

impl SourceRtp {
    /// Builds the RTP source and spawns the background task that drives it.
    ///
    /// The spawned task owns a `Ctx` and loops over two inputs:
    /// - packets received from `data_rx`, which are handed to `process_data`;
    /// - commands received from the channel created by `Sub::actor()`, which are
    ///   handed to `process_sub`.
    ///
    /// The task ends when `data_rx` reports `Closed` or when the command receiver
    /// yields `None`.
    ///
    /// # Arguments
    /// * `source_relay` - cloned into the task context.
    /// * `config` - cloned into the task context; also used to build the GOP cache,
    ///   and its `source_wait_audio` value seeds the `DataInstant`.
    /// * `is_idle` - forwarded to `GopCache::new_from_config`.
    /// * `muxer_name` - stored in the task context.
    /// * `data_rx` - receiver of the incoming packets.
    ///
    /// # Errors
    /// No failure path is visible in this function; it always returns `Ok`.
    ///
    /// NOTE(review): `tokio::spawn` is called directly, so this presumably has to
    /// run inside a Tokio runtime — confirm against callers.
    pub fn new(
        source_relay: &SourceRelay,
        config: &SSConfig,
        is_idle: Arc<AtomicBool>,
        muxer_name: impl Into<String>,
        mut data_rx: SourceEleReceiver,
    ) -> Result<Self> {
        let cache = GopCache::new_from_config(config, is_idle, true);

        // `cmd_tx` is kept by the returned handle; `cmd_rx` moves into the task below.
        let (cmd_tx, mut cmd_rx) = Sub::actor();

        // The muxer starts absent and the probe flag starts cleared.
        let mut ctx = Ctx {
            muxer: None,
            cache,
            config: config.to_owned(),
            got_data_time: DataInstant::new(*config.source_wait_audio),
            probe_cache: Vec::new(),
            is_probe: false,
            source_relay: source_relay.to_owned(),
            muxer_name: muxer_name.into(),
        };

        tokio::spawn(async move {
            loop {
                // Presumably waits on both receivers and runs the arm of whichever is
                // ready first; see `better_tokio_select` for the exact semantics.
                tokio_select!(match .. {
                    .. if let pkt = data_rx.recv() => {
                        match pkt {
                            Ok(pkt) => {
                                // Errors are ignored here; they include the expected
                                // "Continue" signal raised while probing is pending.
                                let _ = process_data(&mut ctx, pkt);
                            },
                            Err(BroadcastRecvError::Closed) => {
                                // The data channel is closed: stop the task.
                                return;
                            },
                            Err(BroadcastRecvError::Lagged(skiped)) => {
                                // The receiver fell behind; only report the lag count.
                                tracing::warn!("`Slow source` detect, data drop, lagged:{}", skiped)
                            },
                        }
                    },
                    .. if let ret = cmd_rx.recv() => {
                        match ret {
                            Some(ret) => process_sub(&mut ctx, ret),
                            // No more commands can arrive: leave the loop and end the task.
                            None => break,
                        }
                    },
                });
            }
        });

        Ok(Self { cmd_tx })
    }
}