cu-gstreamer 0.15.0

This is a Copper GStreamer sink.
Documentation
use cu29::prelude::*;
use gstreamer::prelude::*;

use bincode::de::Decoder;
use bincode::enc::Encoder;
use bincode::error::{DecodeError, EncodeError};
use bincode::{Decode, Encode};
use circular_buffer::CircularBuffer;
use gstreamer::{Buffer, BufferRef, Caps, FlowSuccess, Pipeline, parse};
use gstreamer_app::{AppSink, AppSinkCallbacks};
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::ops::{Deref, DerefMut};
use std::str::FromStr;
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Default, Reflect)]
#[reflect(opaque, from_reflect = false)]
pub struct CuGstBuffer(pub Buffer);

impl Serialize for CuGstBuffer {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let Self(r) = self;
        r.as_ref()
            .map_readable()
            .map_err(|_| serde::ser::Error::custom("Could not map readable"))?
            .serialize(serializer)
    }
}

impl<'de> Deserialize<'de> for CuGstBuffer {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let data = Vec::<u8>::deserialize(deserializer)?;
        Ok(CuGstBuffer(Buffer::from_slice(data)))
    }
}

impl Deref for CuGstBuffer {
    type Target = Buffer;

    fn deref(&self) -> &Self::Target {
        let Self(r) = self;
        r
    }
}

impl DerefMut for CuGstBuffer {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self(r) = self;
        r
    }
}

impl Decode<()> for CuGstBuffer {
    fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, DecodeError> {
        let vec: Vec<u8> = Vec::decode(decoder)?;
        let buffer = Buffer::from_slice(vec);
        Ok(CuGstBuffer(buffer))
    }
}

impl Encode for CuGstBuffer {
    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
        let Self(r) = self;
        r.as_ref()
            .map_readable()
            .map_err(|_| EncodeError::Other("Could not map readable"))?
            .encode(encoder)
    }
}

pub type CuDefaultGStreamer = CuGStreamer<8>;

#[derive(Reflect)]
#[reflect(from_reflect = false)]
pub struct CuGStreamer<const N: usize> {
    #[reflect(ignore)]
    pipeline: Pipeline,
    #[reflect(ignore)]
    circular_buffer: Arc<Mutex<CircularBuffer<N, CuGstBuffer>>>,
    #[reflect(ignore)]
    _appsink: AppSink,
}

impl<const N: usize> Freezable for CuGStreamer<N> {}

impl<const N: usize> CuSrcTask for CuGStreamer<N> {
    type Resources<'r> = ();
    type Output<'m> = output_msg!(CuGstBuffer);

    fn new(config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
    where
        Self: Sized,
    {
        if !gstreamer::INITIALIZED.load(std::sync::atomic::Ordering::SeqCst) {
            gstreamer::init()
                .map_err(|e| CuError::new_with_cause("Failed to initialize gstreamer.", e))?;
        } else {
            debug!("Gstreamer already initialized.");
        }

        let config = config.ok_or_else(|| CuError::from("No config provided."))?;

        let pipeline = if let Some(pipeline_str) = config.get::<String>("pipeline")? {
            debug!("Creating with pipeline: {}", &pipeline_str);
            let pipeline = parse::launch(pipeline_str.as_str())
                .map_err(|e| CuError::new_with_cause("Failed to parse pipeline.", e))?;
            Ok(pipeline)
        } else {
            Err(CuError::from("No pipeline provided."))
        }?;
        let caps_str = if let Some(caps_str) = config.get::<String>("caps")? {
            debug!("Creating with caps: {}", &caps_str);
            Ok(caps_str)
        } else {
            Err(CuError::from(
                "No Caps (ie format for example \"video/x-raw, format=NV12, width=1920, height=1080\") provided for the appsink element.",
            ))
        }?;

        let pipeline = pipeline
            .dynamic_cast::<Pipeline>()
            .map_err(|_| CuError::from("Failed to cast pipeline to gstreamer::Pipeline."))?;

        let appsink = pipeline.by_name("copper").ok_or::<CuError>("Failed to get find the \"appsink\" element in the pipeline string, be sure you have an appsink name=copper to feed this task.".into())?;
        let appsink = appsink
            .dynamic_cast::<AppSink>()
            .map_err(|_| CuError::from("Failed to cast appsink to gstreamer::AppSink."))?;
        let caps = Caps::from_str(caps_str.as_str())
            .map_err(|e| CuError::new_with_cause("Failed to create caps for appsink.", e))?;

        appsink.set_caps(Some(&caps));

        let circular_buffer = Arc::new(Mutex::new(CircularBuffer::new()));

        // Configure `appsink` to handle incoming buffers
        appsink.set_callbacks(
            AppSinkCallbacks::builder()
                .new_sample({
                    let circular_buffer = circular_buffer.clone();
                    move |appsink| {
                        let sample = appsink
                            .pull_sample()
                            .map_err(|_| gstreamer::FlowError::Eos)?;
                        let buffer: &BufferRef =
                            sample.buffer().ok_or(gstreamer::FlowError::Error)?;
                        circular_buffer
                            .lock()
                            .unwrap()
                            .push_back(CuGstBuffer(buffer.to_owned()));
                        Ok(FlowSuccess::Ok)
                    }
                })
                .build(),
        );

        let s = CuGStreamer {
            pipeline,
            circular_buffer,
            _appsink: appsink,
        };
        Ok(s)
    }

    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
        debug!("Gstreamer: Starting pipeline.");
        self.circular_buffer.lock().unwrap().clear();
        self.pipeline
            .set_state(gstreamer::State::Playing)
            .map_err(|e| CuError::new_with_cause("Failed to start the gstreamer pipeline.", e))?;
        debug!("Gstreamer: Starting pipeline OK.");
        Ok(())
    }

    fn process(&mut self, ctx: &CuContext, new_msg: &mut Self::Output<'_>) -> CuResult<()> {
        let mut circular_buffer = self.circular_buffer.lock().unwrap();
        if let Some(buffer) = circular_buffer.pop_front() {
            // TODO: do precise timing metadata from gstreamer
            new_msg.tov = ctx.now().into();
            new_msg.set_payload(buffer);
        } else {
            debug!("Gstreamer: Empty circular buffer, sending no payload.");
            new_msg.clear_payload();
        }
        Ok(())
    }

    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
        self.pipeline
            .set_state(gstreamer::State::Null)
            .map_err(|e| CuError::new_with_cause("Failed to stop the gstreamer pipeline.", e))?;
        self.circular_buffer.lock().unwrap().clear();
        Ok(())
    }
}

// No test here, see the integration tests.