depthai 0.1.3

Experimental Rust bindings and idiomatic wrapper for Luxonis DepthAI-Core v3.
Documentation
use std::ffi::{c_void, CString};
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::ptr;
use std::sync::{Arc, Mutex};

use depthai_sys::{depthai, DaiBuffer, DaiMessageGroup, DaiNode};

use crate::camera::ImageFrame;
use crate::error::{clear_error_flag, last_error, take_error_if_any, Result};
use crate::output::{Input, Output};
use crate::pipeline::{Node, Pipeline, PipelineInner};

pub trait HostNodeImpl: Send + 'static {
    fn process_group(&mut self, group: &MessageGroup) -> Option<Buffer>;
    fn on_start(&mut self) {}
    fn on_stop(&mut self) {}
}

#[derive(Clone)]
pub struct HostNode {
    node: Node,
}

impl HostNode {
    pub(crate) fn from_handle(pipeline: Arc<PipelineInner>, handle: DaiNode) -> Self {
        Self {
            node: Node::from_handle(pipeline, handle),
        }
    }

    pub fn as_node(&self) -> &Node {
        &self.node
    }

    pub fn input(&self, name: &str) -> Result<Input> {
        clear_error_flag();
        let name_c = CString::new(name).map_err(|_| last_error("invalid host input name"))?;
        let handle = unsafe { depthai::dai_hostnode_get_input(self.node.handle(), name_c.as_ptr()) };
        if handle.is_null() {
            Err(last_error("failed to get host node input"))
        } else {
            Ok(Input::from_handle(Arc::clone(&self.node.pipeline), handle))
        }
    }

    pub fn out(&self) -> Result<Output> {
        self.node.output("out")
    }

    pub fn run_syncing_on_host(&self) -> Result<()> {
        clear_error_flag();
        unsafe { depthai::dai_hostnode_run_sync_on_host(self.node.handle()) };
        if let Some(err) = take_error_if_any("failed to configure host syncing") {
            Err(err)
        } else {
            Ok(())
        }
    }

    pub fn run_syncing_on_device(&self) -> Result<()> {
        clear_error_flag();
        unsafe { depthai::dai_hostnode_run_sync_on_device(self.node.handle()) };
        if let Some(err) = take_error_if_any("failed to configure device syncing") {
            Err(err)
        } else {
            Ok(())
        }
    }

    pub fn send_processing_to_pipeline(&self, send: bool) -> Result<()> {
        clear_error_flag();
        unsafe { depthai::dai_hostnode_send_processing_to_pipeline(self.node.handle(), send) };
        if let Some(err) = take_error_if_any("failed to set host processing mode") {
            Err(err)
        } else {
            Ok(())
        }
    }
}

pub struct MessageGroup {
    handle: DaiMessageGroup,
}

impl Drop for MessageGroup {
    fn drop(&mut self) {
        if !self.handle.is_null() {
            unsafe { depthai::dai_message_group_release(self.handle) };
            self.handle = ptr::null_mut();
        }
    }
}

impl MessageGroup {
    pub(crate) fn from_handle(handle: DaiMessageGroup) -> Self {
        Self { handle }
    }

    pub fn get_buffer(&self, name: &str) -> Result<Option<Buffer>> {
        clear_error_flag();
        let name_c = CString::new(name).map_err(|_| last_error("invalid message name"))?;
        let handle = unsafe { depthai::dai_message_group_get_buffer(self.handle, name_c.as_ptr()) };
        if handle.is_null() {
            if let Some(err) = take_error_if_any("failed to get buffer from group") {
                Err(err)
            } else {
                Ok(None)
            }
        } else {
            Ok(Some(Buffer::from_handle(handle)))
        }
    }

    pub fn get_frame(&self, name: &str) -> Result<Option<ImageFrame>> {
        clear_error_flag();
        let name_c = CString::new(name).map_err(|_| last_error("invalid message name"))?;
        let handle = unsafe { depthai::dai_message_group_get_img_frame(self.handle, name_c.as_ptr()) };
        if handle.is_null() {
            if let Some(err) = take_error_if_any("failed to get frame from group") {
                Err(err)
            } else {
                Ok(None)
            }
        } else {
            Ok(Some(ImageFrame::from_handle(handle)))
        }
    }
}

pub struct Buffer {
    handle: DaiBuffer,
}

impl Drop for Buffer {
    fn drop(&mut self) {
        if !self.handle.is_null() {
            unsafe { depthai::dai_buffer_release(self.handle) };
            self.handle = ptr::null_mut();
        }
    }
}

impl Buffer {
    pub(crate) fn from_handle(handle: DaiBuffer) -> Self {
        Self { handle }
    }

    pub fn new(size: usize) -> Result<Self> {
        clear_error_flag();
        let handle = depthai::dai_buffer_new(size);
        if handle.is_null() {
            Err(last_error("failed to allocate buffer"))
        } else {
            Ok(Self { handle })
        }
    }

    pub fn from_bytes(data: &[u8]) -> Result<Self> {
        let buffer = Self::new(data.len())?;
        buffer.set_data(data)?;
        Ok(buffer)
    }

    pub fn set_data(&self, data: &[u8]) -> Result<()> {
        clear_error_flag();
        unsafe { depthai::dai_buffer_set_data(self.handle, data.as_ptr() as *const _, data.len()) };
        if let Some(err) = take_error_if_any("failed to set buffer data") {
            Err(err)
        } else {
            Ok(())
        }
    }

    pub(crate) fn handle(&self) -> DaiBuffer {
        self.handle
    }

    pub(crate) fn into_raw(self) -> DaiBuffer {
        let me = std::mem::ManuallyDrop::new(self);
        me.handle
    }
}

pub(crate) fn create_host_node<T: HostNodeImpl>(pipeline: &Pipeline, node: T) -> Result<HostNode> {
    clear_error_flag();
    let state = Box::new(HostNodeState {
        inner: Mutex::new(node),
    });
    let ctx = Box::into_raw(state) as *mut c_void;
    let handle = unsafe {
        depthai::dai_pipeline_create_host_node(
            pipeline.handle(),
            ctx as *mut _,
            Some(hostnode_process::<T>),
            Some(hostnode_on_start::<T>),
            Some(hostnode_on_stop::<T>),
            Some(hostnode_drop::<T>),
        )
    };
    if handle.is_null() {
        unsafe { drop(Box::from_raw(ctx as *mut HostNodeState<T>)) };
        Err(last_error("failed to create host node"))
    } else {
        Ok(HostNode::from_handle(pipeline.inner_arc(), handle))
    }
}

struct HostNodeState<T: HostNodeImpl> {
    inner: Mutex<T>,
}

unsafe extern "C" fn hostnode_process<T: HostNodeImpl>(ctx: *mut c_void, group: DaiMessageGroup) -> DaiBuffer {
    if ctx.is_null() || group.is_null() {
        return ptr::null_mut();
    }
    let state = unsafe { &*(ctx as *mut HostNodeState<T>) };
    let mut guard = match state.inner.lock() {
        Ok(g) => g,
        Err(e) => e.into_inner(),
    };
    let group = MessageGroup::from_handle(group);
    let result = catch_unwind(AssertUnwindSafe(|| guard.process_group(&group)));
    match result {
        Ok(Some(buffer)) => buffer.into_raw(),
        Ok(None) => ptr::null_mut(),
        Err(_) => ptr::null_mut(),
    }
}

unsafe extern "C" fn hostnode_on_start<T: HostNodeImpl>(ctx: *mut c_void) {
    if ctx.is_null() {
        return;
    }
    let state = unsafe { &*(ctx as *mut HostNodeState<T>) };
    let mut guard = match state.inner.lock() {
        Ok(g) => g,
        Err(e) => e.into_inner(),
    };
    let _ = catch_unwind(AssertUnwindSafe(|| guard.on_start()));
}

unsafe extern "C" fn hostnode_on_stop<T: HostNodeImpl>(ctx: *mut c_void) {
    if ctx.is_null() {
        return;
    }
    let state = unsafe { &*(ctx as *mut HostNodeState<T>) };
    let mut guard = match state.inner.lock() {
        Ok(g) => g,
        Err(e) => e.into_inner(),
    };
    let _ = catch_unwind(AssertUnwindSafe(|| guard.on_stop()));
}

unsafe extern "C" fn hostnode_drop<T: HostNodeImpl>(ctx: *mut c_void) {
    if ctx.is_null() {
        return;
    }
    unsafe { drop(Box::from_raw(ctx as *mut HostNodeState<T>)) };
}