use std::{
    collections::HashMap,
    mem,
    sync::{atomic, Arc, Mutex, MutexGuard},
};
use gst::{glib, prelude::*};
use once_cell::sync::Lazy;
use thiserror::Error;
#[derive(Debug)]
struct WrappedAtomicU64 {
    #[cfg(not(target_has_atomic = "64"))]
    atomic: Mutex<u64>,
    #[cfg(target_has_atomic = "64")]
    atomic: atomic::AtomicU64,
}
#[cfg(target_has_atomic = "64")]
impl WrappedAtomicU64 {
    fn new(value: u64) -> WrappedAtomicU64 {
        WrappedAtomicU64 {
            atomic: atomic::AtomicU64::new(value),
        }
    }
    fn fetch_add(&self, value: u64, order: atomic::Ordering) -> u64 {
        self.atomic.fetch_add(value, order)
    }
    fn store(&self, value: u64, order: atomic::Ordering) {
        self.atomic.store(value, order);
    }
    fn load(&self, order: atomic::Ordering) -> u64 {
        self.atomic.load(order)
    }
}
#[cfg(not(target_has_atomic = "64"))]
impl WrappedAtomicU64 {
    fn new(value: u64) -> WrappedAtomicU64 {
        WrappedAtomicU64 {
            atomic: Mutex::new(value),
        }
    }
    fn fetch_add(&self, value: u64, _order: atomic::Ordering) -> u64 {
        let mut guard = self.atomic.lock().unwrap();
        let old = *guard;
        *guard += value;
        old
    }
    fn store(&self, value: u64, _order: atomic::Ordering) {
        *self.atomic.lock().unwrap() = value;
    }
    fn load(&self, _order: atomic::Ordering) -> u64 {
        *self.atomic.lock().unwrap()
    }
}
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
    gst::DebugCategory::new(
        "utilsrs-stream-producer",
        gst::DebugColorFlags::empty(),
        Some("gst_app Stream Producer interface"),
    )
});
#[derive(Debug, Clone)]
pub struct StreamProducer {
    appsink: gst_app::AppSink,
    consumers: Arc<Mutex<StreamConsumers>>,
}
impl PartialEq for StreamProducer {
    fn eq(&self, other: &Self) -> bool {
        self.appsink.eq(&other.appsink)
    }
}
impl Eq for StreamProducer {}
#[derive(Debug)]
#[must_use]
pub struct ConsumptionLink {
    consumer: gst_app::AppSrc,
    producer: Option<StreamProducer>,
    dropped: Arc<WrappedAtomicU64>,
    pushed: Arc<WrappedAtomicU64>,
    discard: Arc<atomic::AtomicBool>,
}
impl ConsumptionLink {
    pub fn disconnected(consumer: gst_app::AppSrc) -> ConsumptionLink {
        ConsumptionLink {
            consumer,
            producer: None,
            dropped: Arc::new(WrappedAtomicU64::new(0)),
            pushed: Arc::new(WrappedAtomicU64::new(0)),
            discard: Arc::new(atomic::AtomicBool::new(false)),
        }
    }
    pub fn change_producer(
        &mut self,
        new_producer: &StreamProducer,
        reset_stats: bool,
    ) -> Result<(), AddConsumerError> {
        self.disconnect();
        if reset_stats {
            self.dropped.store(0, atomic::Ordering::SeqCst);
            self.pushed.store(0, atomic::Ordering::SeqCst);
        }
        new_producer.add_consumer_internal(
            &self.consumer,
            self.dropped.clone(),
            self.pushed.clone(),
            self.discard.clone(),
        )?;
        self.producer = Some(new_producer.clone());
        Ok(())
    }
    pub fn disconnect(&mut self) {
        if let Some(producer) = self.producer.take() {
            producer.remove_consumer(&self.consumer);
        }
    }
    pub fn dropped(&self) -> u64 {
        self.dropped.load(atomic::Ordering::SeqCst)
    }
    pub fn pushed(&self) -> u64 {
        self.pushed.load(atomic::Ordering::SeqCst)
    }
    pub fn discard(&self) -> bool {
        self.discard.load(atomic::Ordering::SeqCst)
    }
    pub fn set_discard(&self, discard: bool) {
        self.discard.store(discard, atomic::Ordering::SeqCst)
    }
    pub fn appsrc(&self) -> &gst_app::AppSrc {
        &self.consumer
    }
}
impl Drop for ConsumptionLink {
    fn drop(&mut self) {
        self.disconnect();
    }
}
#[derive(Debug, Error)]
pub enum AddConsumerError {
    #[error("Consumer already added")]
    AlreadyAdded,
}
impl StreamProducer {
    pub fn configure_consumer(consumer: &gst_app::AppSrc) {
        consumer.set_latency(gst::ClockTime::ZERO, gst::ClockTime::NONE);
        consumer.set_format(gst::Format::Time);
        consumer.set_is_live(true);
        consumer.set_handle_segment_change(true);
        consumer.set_max_buffers(0);
        consumer.set_max_bytes(0);
        consumer.set_max_time(500 * gst::ClockTime::MSECOND);
        consumer.set_leaky_type(gst_app::AppLeakyType::Downstream);
        consumer.set_automatic_eos(false);
    }
    pub fn add_consumer(
        &self,
        consumer: &gst_app::AppSrc,
    ) -> Result<ConsumptionLink, AddConsumerError> {
        let dropped = Arc::new(WrappedAtomicU64::new(0));
        let pushed = Arc::new(WrappedAtomicU64::new(0));
        let discard = Arc::new(atomic::AtomicBool::new(false));
        self.add_consumer_internal(consumer, dropped.clone(), pushed.clone(), discard.clone())?;
        Ok(ConsumptionLink {
            consumer: consumer.clone(),
            producer: Some(self.clone()),
            dropped,
            pushed,
            discard,
        })
    }
    fn add_consumer_internal(
        &self,
        consumer: &gst_app::AppSrc,
        dropped: Arc<WrappedAtomicU64>,
        pushed: Arc<WrappedAtomicU64>,
        discard: Arc<atomic::AtomicBool>,
    ) -> Result<(), AddConsumerError> {
        let mut consumers = self.consumers.lock().unwrap();
        if consumers.consumers.contains_key(consumer) {
            gst::error!(
                CAT,
                obj = &self.appsink,
                "Consumer {} ({:?}) already added",
                consumer.name(),
                consumer
            );
            return Err(AddConsumerError::AlreadyAdded);
        }
        gst::debug!(
            CAT,
            obj = &self.appsink,
            "Adding consumer {} ({:?})",
            consumer.name(),
            consumer
        );
        Self::configure_consumer(consumer);
        let srcpad = consumer.static_pad("src").unwrap();
        let appsink = &self.appsink;
        let fku_probe_id = srcpad
            .add_probe(
                gst::PadProbeType::EVENT_UPSTREAM,
                glib::clone!(
                    #[weak]
                    appsink,
                    #[upgrade_or_panic]
                    move |_pad, info| {
                        let Some(event) = info.event() else {
                            return gst::PadProbeReturn::Ok;
                        };
                        if gst_video::UpstreamForceKeyUnitEvent::parse(event).is_ok() {
                            gst::debug!(CAT, obj = &appsink, "Requesting keyframe");
                            let pad = appsink.static_pad("sink").unwrap();
                            let _ = pad.push_event(event.clone());
                        }
                        gst::PadProbeReturn::Ok
                    }
                ),
            )
            .unwrap();
        let stream_consumer = StreamConsumer::new(consumer, fku_probe_id, dropped, pushed, discard);
        consumers
            .consumers
            .insert(consumer.clone(), stream_consumer);
        let events_to_forward = consumers.events_to_forward.clone();
        drop(consumers);
        let appsink_pad = self.appsink.static_pad("sink").unwrap();
        appsink_pad.sticky_events_foreach(|event| {
            if events_to_forward.contains(&event.type_()) {
                gst::debug!(CAT, obj = &self.appsink, "forward sticky event {:?}", event);
                consumer.send_event(event.clone());
            }
            std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
        });
        Ok(())
    }
    fn process_sample(
        sample: gst::Sample,
        appsink: &gst_app::AppSink,
        mut consumers: MutexGuard<StreamConsumers>,
    ) -> Result<gst::FlowSuccess, gst::FlowError> {
        let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() {
            let flags = buf.flags();
            (
                flags.contains(gst::BufferFlags::DISCONT),
                !flags.contains(gst::BufferFlags::DELTA_UNIT),
            )
        } else {
            (false, true)
        };
        gst::trace!(
            CAT,
            obj = appsink,
            "processing sample {:?}",
            sample.buffer()
        );
        let latency = consumers.current_latency;
        let latency_updated = mem::replace(&mut consumers.latency_updated, false);
        let mut needs_keyframe_request = false;
        let current_consumers = consumers
            .consumers
            .values()
            .filter_map(|consumer| {
                if let Some(latency) = latency {
                    if consumer
                        .forwarded_latency
                        .compare_exchange(
                            false,
                            true,
                            atomic::Ordering::SeqCst,
                            atomic::Ordering::SeqCst,
                        )
                        .is_ok()
                        || latency_updated
                    {
                        consumer.appsrc.set_latency(latency, gst::ClockTime::NONE);
                    }
                }
                if consumer.discard.load(atomic::Ordering::SeqCst) {
                    consumer
                        .needs_keyframe
                        .store(false, atomic::Ordering::SeqCst);
                    return None;
                }
                if is_discont && !is_keyframe {
                    consumer
                        .needs_keyframe
                        .store(true, atomic::Ordering::SeqCst);
                }
                if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) {
                    if !needs_keyframe_request {
                        gst::debug!(CAT, obj = appsink, "Requesting keyframe for first buffer");
                        needs_keyframe_request = true;
                    }
                    consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst);
                    gst::debug!(
                        CAT,
                        obj = appsink,
                        "Ignoring frame for {} while waiting for a keyframe",
                        consumer.appsrc.name()
                    );
                    None
                } else {
                    consumer
                        .needs_keyframe
                        .store(false, atomic::Ordering::SeqCst);
                    consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst);
                    Some(consumer.appsrc.clone())
                }
            })
            .collect::<Vec<_>>();
        drop(consumers);
        if needs_keyframe_request {
            let pad = appsink.static_pad("sink").unwrap();
            pad.push_event(
                gst_video::UpstreamForceKeyUnitEvent::builder()
                    .all_headers(true)
                    .build(),
            );
        }
        for consumer in current_consumers {
            if let Err(err) = consumer.push_sample(&sample) {
                gst::warning!(CAT, obj = appsink, "Failed to push sample: {}", err);
            }
        }
        Ok(gst::FlowSuccess::Ok)
    }
    pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) {
        let name = consumer.name();
        if self
            .consumers
            .lock()
            .unwrap()
            .consumers
            .remove(consumer)
            .is_some()
        {
            gst::debug!(
                CAT,
                obj = &self.appsink,
                "Removed consumer {} ({:?})",
                name,
                consumer
            );
            consumer.set_callbacks(gst_app::AppSrcCallbacks::builder().build());
        } else {
            gst::debug!(
                CAT,
                obj = &self.appsink,
                "Consumer {} ({:?}) not found",
                name,
                consumer
            );
        }
    }
    pub fn set_forward_events(&self, events_to_forward: impl IntoIterator<Item = gst::EventType>) {
        self.consumers.lock().unwrap().events_to_forward = events_to_forward.into_iter().collect();
    }
    pub fn set_forward_preroll(&self, forward_preroll: bool) {
        self.consumers.lock().unwrap().forward_preroll = forward_preroll;
    }
    pub fn appsink(&self) -> &gst_app::AppSink {
        &self.appsink
    }
    pub fn error(&self, error: &gst::glib::Error, debug: Option<&str>) {
        let consumers = self.consumers.lock().unwrap();
        for consumer in consumers.consumers.keys() {
            let mut msg_builder =
                gst::message::Error::builder_from_error(error.clone()).src(consumer);
            if let Some(debug) = debug {
                msg_builder = msg_builder.debug(debug);
            }
            let _ = consumer.post_message(msg_builder.build());
        }
    }
    pub fn last_sample(&self) -> Option<gst::Sample> {
        self.appsink.property("last-sample")
    }
}
impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
    fn from(appsink: &'a gst_app::AppSink) -> Self {
        let consumers = Arc::new(Mutex::new(StreamConsumers {
            current_latency: None,
            latency_updated: false,
            consumers: HashMap::new(),
            events_to_forward: vec![gst::EventType::Eos],
            forward_preroll: true,
            just_forwarded_preroll: false,
        }));
        appsink.set_callbacks(
            gst_app::AppSinkCallbacks::builder()
                .new_sample(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        let mut consumers = consumers.lock().unwrap();
                        let sample = match appsink.pull_sample() {
                            Ok(sample) => sample,
                            Err(_err) => {
                                gst::debug!(CAT, obj = appsink, "Failed to pull sample");
                                return Err(gst::FlowError::Flushing);
                            }
                        };
                        let just_forwarded_preroll =
                            mem::replace(&mut consumers.just_forwarded_preroll, false);
                        if just_forwarded_preroll {
                            return Ok(gst::FlowSuccess::Ok);
                        }
                        StreamProducer::process_sample(sample, appsink, consumers)
                    }
                ))
                .new_preroll(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        let mut consumers = consumers.lock().unwrap();
                        let sample = match appsink.pull_preroll() {
                            Ok(sample) => sample,
                            Err(_err) => {
                                gst::debug!(CAT, obj = appsink, "Failed to pull preroll");
                                return Err(gst::FlowError::Flushing);
                            }
                        };
                        if consumers.forward_preroll {
                            consumers.just_forwarded_preroll = true;
                            StreamProducer::process_sample(sample, appsink, consumers)
                        } else {
                            Ok(gst::FlowSuccess::Ok)
                        }
                    }
                ))
                .new_event(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        match appsink
                            .pull_object()
                            .map(|obj| obj.downcast::<gst::Event>())
                        {
                            Ok(Ok(event)) => {
                                let (events_to_forward, appsrcs) = {
                                    let consumers = consumers.lock().unwrap();
                                    let events = consumers.events_to_forward.clone();
                                    let appsrcs =
                                        consumers.consumers.keys().cloned().collect::<Vec<_>>();
                                    (events, appsrcs)
                                };
                                if events_to_forward.contains(&event.type_()) {
                                    for appsrc in appsrcs {
                                        appsrc.send_event(event.clone());
                                    }
                                }
                            }
                            Ok(Err(_)) => {} Err(_err) => gst::warning!(CAT, obj = appsink, "Failed to pull event"),
                        }
                        false
                    }
                ))
                .eos(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        let stream_consumers = consumers.lock().unwrap();
                        if stream_consumers
                            .events_to_forward
                            .contains(&gst::EventType::Eos)
                        {
                            let current_consumers = stream_consumers
                                .consumers
                                .values()
                                .map(|c| c.appsrc.clone())
                                .collect::<Vec<_>>();
                            drop(stream_consumers);
                            for consumer in current_consumers {
                                gst::debug!(
                                    CAT,
                                    obj = appsink,
                                    "set EOS on consumer {}",
                                    consumer.name()
                                );
                                let _ = consumer.end_of_stream();
                            }
                        } else {
                            gst::debug!(CAT, obj = appsink, "don't forward EOS to consumers");
                        }
                    }
                ))
                .build(),
        );
        let sinkpad = appsink.static_pad("sink").unwrap();
        sinkpad.add_probe(
            gst::PadProbeType::EVENT_UPSTREAM,
            glib::clone!(
                #[strong]
                consumers,
                move |_pad, info| {
                    let Some(event) = info.event() else {
                        return gst::PadProbeReturn::Ok;
                    };
                    let gst::EventView::Latency(event) = event.view() else {
                        return gst::PadProbeReturn::Ok;
                    };
                    let latency = event.latency();
                    let mut consumers = consumers.lock().unwrap();
                    consumers.current_latency = Some(latency);
                    gst::PadProbeReturn::Ok
                }
            ),
        );
        StreamProducer {
            appsink: appsink.clone(),
            consumers,
        }
    }
}
#[derive(Debug)]
struct StreamConsumers {
    current_latency: Option<gst::ClockTime>,
    latency_updated: bool,
    consumers: HashMap<gst_app::AppSrc, StreamConsumer>,
    events_to_forward: Vec<gst::EventType>,
    forward_preroll: bool,
    just_forwarded_preroll: bool,
}
#[derive(Debug)]
struct StreamConsumer {
    appsrc: gst_app::AppSrc,
    fku_probe_id: Option<gst::PadProbeId>,
    forwarded_latency: atomic::AtomicBool,
    needs_keyframe: Arc<atomic::AtomicBool>,
    dropped: Arc<WrappedAtomicU64>,
    pushed: Arc<WrappedAtomicU64>,
    discard: Arc<atomic::AtomicBool>,
}
impl StreamConsumer {
    fn new(
        appsrc: &gst_app::AppSrc,
        fku_probe_id: gst::PadProbeId,
        dropped: Arc<WrappedAtomicU64>,
        pushed: Arc<WrappedAtomicU64>,
        discard: Arc<atomic::AtomicBool>,
    ) -> Self {
        let needs_keyframe = Arc::new(atomic::AtomicBool::new(true));
        let needs_keyframe_clone = needs_keyframe.clone();
        let dropped_clone = dropped.clone();
        appsrc.set_callbacks(
            gst_app::AppSrcCallbacks::builder()
                .enough_data(move |appsrc| {
                    gst::debug!(
                        CAT,
                        obj = appsrc,
                        "consumer {} ({:?}) is not consuming fast enough, old samples are getting dropped",
                        appsrc.name(),
                        appsrc,
                    );
                    needs_keyframe_clone.store(true, atomic::Ordering::SeqCst);
                    dropped_clone.fetch_add(1, atomic::Ordering::SeqCst);
                })
                .build(),
        );
        StreamConsumer {
            appsrc: appsrc.clone(),
            fku_probe_id: Some(fku_probe_id),
            forwarded_latency: atomic::AtomicBool::new(false),
            needs_keyframe,
            dropped,
            pushed,
            discard,
        }
    }
}
impl Drop for StreamConsumer {
    fn drop(&mut self) {
        if let Some(fku_probe_id) = self.fku_probe_id.take() {
            let srcpad = self.appsrc.static_pad("src").unwrap();
            srcpad.remove_probe(fku_probe_id);
        }
    }
}
impl PartialEq for StreamConsumer {
    fn eq(&self, other: &Self) -> bool {
        self.appsrc.eq(&other.appsrc)
    }
}
impl Eq for StreamConsumer {}
impl std::hash::Hash for StreamConsumer {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        std::hash::Hash::hash(&self.appsrc, state);
    }
}
impl std::borrow::Borrow<gst_app::AppSrc> for StreamConsumer {
    #[inline]
    fn borrow(&self) -> &gst_app::AppSrc {
        &self.appsrc
    }
}
#[cfg(test)]
mod tests {
    use std::{
        str::FromStr,
        sync::{Arc, Mutex},
    };
    use futures::{
        channel::{mpsc, mpsc::Receiver},
        SinkExt, StreamExt,
    };
    use gst::prelude::*;
    use crate::{ConsumptionLink, StreamProducer};
    fn create_producer() -> (
        gst::Pipeline,
        gst_app::AppSrc,
        gst_app::AppSink,
        StreamProducer,
    ) {
        let producer_pipe =
            gst::parse::launch("appsrc name=producer_src ! appsink name=producer_sink")
                .unwrap()
                .downcast::<gst::Pipeline>()
                .unwrap();
        let producer_sink = producer_pipe
            .by_name("producer_sink")
            .unwrap()
            .downcast::<gst_app::AppSink>()
            .unwrap();
        (
            producer_pipe.clone(),
            producer_pipe
                .by_name("producer_src")
                .unwrap()
                .downcast::<gst_app::AppSrc>()
                .unwrap(),
            producer_sink.clone(),
            StreamProducer::from(&producer_sink),
        )
    }
    struct Consumer {
        pipeline: gst::Pipeline,
        src: gst_app::AppSrc,
        sink: gst_app::AppSink,
        receiver: Mutex<Receiver<gst::Sample>>,
        connected: Mutex<bool>,
    }
    impl Consumer {
        fn new(id: &str) -> Self {
            let pipeline = gst::parse::launch(&format!("appsrc name={id} ! appsink name=sink"))
                .unwrap()
                .downcast::<gst::Pipeline>()
                .unwrap();
            let (sender, receiver) = mpsc::channel::<gst::Sample>(1000);
            let sender = Arc::new(Mutex::new(sender));
            let sink = pipeline
                .by_name("sink")
                .unwrap()
                .downcast::<gst_app::AppSink>()
                .unwrap();
            sink.set_callbacks(
                gst_app::AppSinkCallbacks::builder()
                    .new_sample(move |appsink| {
                        let sender_clone = sender.clone();
                        futures::executor::block_on(
                            sender_clone
                                .lock()
                                .unwrap()
                                .send(appsink.pull_sample().unwrap()),
                        )
                        .unwrap();
                        Ok(gst::FlowSuccess::Ok)
                    })
                    .build(),
            );
            Self {
                pipeline: pipeline.clone(),
                src: pipeline
                    .by_name(id)
                    .unwrap()
                    .downcast::<gst_app::AppSrc>()
                    .unwrap(),
                sink,
                receiver: Mutex::new(receiver),
                connected: Mutex::new(false),
            }
        }
        fn connect(&self, producer: &StreamProducer) -> ConsumptionLink {
            {
                let mut connected = self.connected.lock().unwrap();
                *connected = true;
            }
            producer.add_consumer(&self.src).unwrap()
        }
        fn disconnect(&self, producer: &StreamProducer) {
            {
                let mut connected = self.connected.lock().unwrap();
                *connected = false;
            }
            producer.remove_consumer(&self.src);
        }
    }
    #[test]
    fn simple() {
        gst::init().unwrap();
        let (producer_pipe, producer_src, _producer_sink, producer) = create_producer();
        producer_pipe
            .set_state(gst::State::Playing)
            .expect("Couldn't set producer pipeline state");
        let mut consumers: Vec<Consumer> = Vec::new();
        let consumer = Consumer::new("consumer1");
        let link1 = consumer.connect(&producer);
        consumer
            .pipeline
            .set_state(gst::State::Playing)
            .expect("Couldn't set producer pipeline state");
        consumers.push(consumer);
        let consumer = Consumer::new("consumer2");
        let link2 = consumer.connect(&producer);
        consumer
            .pipeline
            .set_state(gst::State::Playing)
            .expect("Couldn't set producer pipeline state");
        consumers.push(consumer);
        assert!(producer.last_sample().is_none());
        for i in 0..10 {
            let caps = gst::Caps::from_str(&format!("test,n={i}")).unwrap();
            producer_src.set_caps(Some(&caps));
            producer_src.push_buffer(gst::Buffer::new()).unwrap();
            for consumer in &consumers {
                if *consumer.connected.lock().unwrap() {
                    let sample =
                        futures::executor::block_on(consumer.receiver.lock().unwrap().next())
                            .expect("Received an empty buffer?");
                    sample.buffer().expect("No buffer on the sample?");
                    assert_eq!(sample.caps(), Some(caps.as_ref()));
                } else {
                    debug_assert!(
                        consumer
                            .sink
                            .try_pull_sample(gst::ClockTime::from_nseconds(0))
                            .is_none(),
                        "Disconnected consumer got a new sample?!"
                    );
                }
            }
            if i == 5 {
                consumers.first().unwrap().disconnect(&producer);
            }
        }
        assert!(producer.last_sample().is_some());
        assert_eq!(link1.pushed(), 6);
        assert_eq!(link1.dropped(), 0);
        assert_eq!(link2.pushed(), 10);
        assert_eq!(link2.dropped(), 0);
    }
}