pub mod device_node;
pub mod node;
use autocxx::c_int;
use depthai_sys::{depthai, DaiPipeline};
pub use device_node::{CreateInPipeline, CreateInPipelineWith, DeviceNode, DeviceNodeWithParams};
pub use node::Node;
use std::collections::HashMap;
use std::sync::Arc;
use std::{
ffi::{CStr, CString},
path::{Path, PathBuf},
};
use crate::{
camera::{CameraBoardSocket, CameraNode},
device::Device,
error::{clear_error_flag, last_error, DepthaiError, Result},
host_node::{create_host_node, HostNode, HostNodeImpl},
threaded_host_node::{create_threaded_host_node, ThreadedHostNode, ThreadedHostNodeImpl},
};
/// OpenVINO version selector accepted by `Pipeline::set_openvino_version` and
/// `PipelineBuilder::openvino_version`.
///
/// The enum is `#[repr(i32)]` because the discriminant itself is what crosses the FFI boundary:
/// `set_openvino_version` casts the variant to `i32` and passes it on as a C `int`. The numeric
/// values therefore must not be reordered or renumbered.
// NOTE(review): the values are assumed to mirror the version enum of depthai-core — confirm
// against the native headers whenever the native library is upgraded.
#[repr(i32)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum OpenVinoVersion {
    /// Raw value `0`.
    V2020_3 = 0,
    /// Raw value `1`.
    V2020_4 = 1,
    /// Raw value `2`.
    V2021_1 = 2,
    /// Raw value `3`.
    V2021_2 = 3,
    /// Raw value `4`.
    V2021_3 = 4,
    /// Raw value `5`.
    V2021_4 = 5,
    /// Raw value `6`.
    V2022_1 = 6,
    /// Raw value `7`.
    Universal = 7,
}
/// Serialization format selector accepted by `Pipeline::schema_json`.
///
/// `#[repr(i32)]` because `schema_json` casts the variant to `i32` and hands it to the native
/// layer as a C `int`; the numeric values must stay as they are.
// NOTE(review): the values are assumed to mirror the serialization-type enum of depthai-core —
// confirm against the native headers.
#[repr(i32)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SerializationType {
    /// Raw value `0`.
    LibNop = 0,
    /// Raw value `1`.
    Json = 1,
    /// Raw value `2`.
    JsonMsgPack = 2,
}
/// Summary of one pipeline node, as listed by `Pipeline::all_nodes` and
/// `Pipeline::source_nodes`.
///
/// Both methods deserialize a JSON array produced by the native layer into a
/// `Vec<PipelineNodeInfo>`, so the field names below are also the JSON keys that are expected.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PipelineNodeInfo {
/// Numeric node id (JSON key `id`).
// NOTE(review): presumably the same id space that `Pipeline::node_by_id` accepts — confirm.
pub id: i32,
/// Node alias (JSON key `alias`).
pub alias: String,
/// Node name (JSON key `name`).
pub name: String,
}
/// One output-to-input link, as listed by `Pipeline::connections` and
/// `Pipeline::connection_map`.
///
/// The JSON keys are the camelCase forms of the field names (`outputId`, `outputGroup`,
/// `outputName`, `inputId`, `inputGroup`, `inputName`); the container-level `rename_all`
/// produces exactly those names for both serialization and deserialization.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PipelineConnectionInfo {
    /// Id on the output side of the link (JSON key `outputId`).
    pub output_id: i32,
    /// Group on the output side of the link (JSON key `outputGroup`).
    pub output_group: String,
    /// Name on the output side of the link (JSON key `outputName`).
    pub output_name: String,
    /// Id on the input side of the link (JSON key `inputId`).
    pub input_id: i32,
    /// Group on the input side of the link (JSON key `inputGroup`).
    pub input_group: String,
    /// Name on the input side of the link (JSON key `inputName`).
    pub input_name: String,
}
/// Copies a C string handed over by the native layer into a Rust `String` and releases the
/// native allocation.
///
/// * `ptr` - string pointer returned by one of the `dai_pipeline_*` calls in this file; a null
///   pointer is treated as failure of that call.
/// * `context` - message passed to `last_error` when `ptr` is null.
///
/// Bytes that are not valid UTF-8 are replaced rather than rejected (`to_string_lossy`).
///
/// # Errors
/// Returns `last_error(context)` when `ptr` is null; nothing is freed in that case.
fn take_owned_json_string(ptr: *mut std::ffi::c_char, context: &str) -> Result<String> {
    if ptr.is_null() {
        return Err(last_error(context));
    }

    // SAFETY: `ptr` is non-null (checked above). Callers in this file only pass pointers that
    // were just returned by the native layer; they are assumed to be NUL-terminated.
    let borrowed = unsafe { CStr::from_ptr(ptr) };
    // Copy out before freeing: `borrowed` points into the native allocation.
    let text = borrowed.to_string_lossy().into_owned();

    // SAFETY: the copy above is complete, so nothing references the buffer any more; ownership
    // of the allocation is handed back to the native layer exactly once.
    unsafe { depthai::dai_free_cstring(ptr) };

    Ok(text)
}
/// Parses `s` as JSON.
///
/// # Errors
/// A parse failure is reported as a `DepthaiError` whose message starts with
/// `invalid JSON from depthai-core:` followed by the `serde_json` error.
fn parse_json_value(s: &str) -> Result<serde_json::Value> {
    match serde_json::from_str::<serde_json::Value>(s) {
        Ok(value) => Ok(value),
        Err(e) => Err(DepthaiError::new(format!(
            "invalid JSON from depthai-core: {e}"
        ))),
    }
}
/// Owner of the native pipeline handle.
///
/// The `Drop` impl for this type calls `dai_pipeline_delete` on a non-null handle, so the native
/// pipeline lives exactly as long as this value. `Pipeline` shares it through an `Arc`.
pub(crate) struct PipelineInner {
// Raw handle obtained from one of the `dai_pipeline_new*` calls.
handle: DaiPipeline,
}
// SAFETY / NOTE(review): these impls assert that the native pipeline handle may be moved to,
// and used from, several threads at once. Nothing in this file synchronizes access, so this
// relies entirely on depthai-core tolerating concurrent calls on one handle — confirm against
// the native library's threading guarantees.
unsafe impl Send for PipelineInner {}
unsafe impl Sync for PipelineInner {}
/// Releases the native pipeline when the last owner goes away.
impl Drop for PipelineInner {
    fn drop(&mut self) {
        // A null handle has nothing to release.
        if self.handle.is_null() {
            return;
        }
        // SAFETY: the handle is non-null and `drop` runs at most once per value, so the native
        // pipeline is deleted exactly once.
        unsafe { depthai::dai_pipeline_delete(self.handle) };
    }
}
/// Handle to a native DepthAI pipeline.
///
/// Cloning only clones the `Arc`, so every clone refers to the same native pipeline. The native
/// object is deleted when the last `Arc<PipelineInner>` holder is dropped (see the `Drop` impl
/// of `PipelineInner`); `inner_arc` hands further `Arc` clones to nodes created from the
/// pipeline.
#[derive(Clone)]
pub struct Pipeline {
// Shared owner of the native handle.
inner: Arc<PipelineInner>,
}
/// Collected options for constructing a `Pipeline`.
///
/// Every field starts as `None` (`Default`); `PipelineBuilder::build` creates the pipeline and
/// then applies only the options that were set, through the `Pipeline` method named on each
/// field below.
#[derive(Clone, Default)]
pub struct PipelineBuilder {
// Device to create the pipeline with; takes priority over `create_implicit_device` in `build`.
device: Option<Device>,
// Passed to `Pipeline::new_with_implicit_device` when no device is set.
create_implicit_device: Option<bool>,
// Applied with `Pipeline::set_xlink_chunk_size`.
xlink_chunk_size: Option<i32>,
// Applied with `Pipeline::set_sipp_buffer_size`.
sipp_buffer_size: Option<i32>,
// Applied with `Pipeline::set_sipp_dma_buffer_size`.
sipp_dma_buffer_size: Option<i32>,
// Applied with `Pipeline::set_camera_tuning_blob_path`.
camera_tuning_blob_path: Option<PathBuf>,
// Applied with `Pipeline::set_openvino_version`.
openvino_version: Option<OpenVinoVersion>,
// Applied with `Pipeline::set_calibration_data_json`.
calibration_data_json: Option<serde_json::Value>,
// Applied with `Pipeline::set_global_properties_json`.
global_properties_json: Option<serde_json::Value>,
// Applied with `Pipeline::set_board_config_json`.
board_config_json: Option<serde_json::Value>,
// Applied with `Pipeline::set_eeprom_data_json`.
eeprom_data_json: Option<serde_json::Value>,
// Applied with `Pipeline::enable_holistic_record_json`.
holistic_record_json: Option<serde_json::Value>,
// Applied with `Pipeline::enable_holistic_replay`.
holistic_replay_path: Option<PathBuf>,
}
/// Fluent setters plus [`PipelineBuilder::build`]. The setters only record values; the native
/// layer is not touched until `build` runs.
impl PipelineBuilder {
    /// Returns a builder with every option unset.
    pub fn new() -> Self {
        Default::default()
    }

    /// Records a clone of `device`. In `build`, a recorded device takes priority over the
    /// implicit-device flag.
    pub fn with_device(self, device: &Device) -> Self {
        Self {
            device: Some(device.clone()),
            ..self
        }
    }

    /// Records the flag that `build` forwards to `Pipeline::new_with_implicit_device` when no
    /// device has been recorded.
    pub fn with_implicit_device(self, create_implicit_device: bool) -> Self {
        Self {
            create_implicit_device: Some(create_implicit_device),
            ..self
        }
    }

    /// Forgets any recorded device and sets the implicit-device flag to `false`.
    pub fn host_only(self) -> Self {
        Self {
            device: None,
            create_implicit_device: Some(false),
            ..self
        }
    }

    /// Records the value later applied with `Pipeline::set_xlink_chunk_size`.
    pub fn xlink_chunk_size(self, size_bytes: i32) -> Self {
        Self {
            xlink_chunk_size: Some(size_bytes),
            ..self
        }
    }

    /// Records the value later applied with `Pipeline::set_sipp_buffer_size`.
    pub fn sipp_buffer_size(self, size_bytes: i32) -> Self {
        Self {
            sipp_buffer_size: Some(size_bytes),
            ..self
        }
    }

    /// Records the value later applied with `Pipeline::set_sipp_dma_buffer_size`.
    pub fn sipp_dma_buffer_size(self, size_bytes: i32) -> Self {
        Self {
            sipp_dma_buffer_size: Some(size_bytes),
            ..self
        }
    }

    /// Records the path later applied with `Pipeline::set_camera_tuning_blob_path`.
    pub fn camera_tuning_blob_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            camera_tuning_blob_path: Some(path.into()),
            ..self
        }
    }

    /// Records the version later applied with `Pipeline::set_openvino_version`.
    pub fn openvino_version(self, version: OpenVinoVersion) -> Self {
        Self {
            openvino_version: Some(version),
            ..self
        }
    }

    /// Records the JSON later applied with `Pipeline::set_calibration_data_json`.
    pub fn calibration_data_json(self, value: serde_json::Value) -> Self {
        Self {
            calibration_data_json: Some(value),
            ..self
        }
    }

    /// Records the JSON later applied with `Pipeline::set_global_properties_json`.
    pub fn global_properties_json(self, value: serde_json::Value) -> Self {
        Self {
            global_properties_json: Some(value),
            ..self
        }
    }

    /// Records the JSON later applied with `Pipeline::set_board_config_json`.
    pub fn board_config_json(self, value: serde_json::Value) -> Self {
        Self {
            board_config_json: Some(value),
            ..self
        }
    }

    /// Records the JSON later applied with `Pipeline::set_eeprom_data_json`.
    pub fn eeprom_data_json(self, value: serde_json::Value) -> Self {
        Self {
            eeprom_data_json: Some(value),
            ..self
        }
    }

    /// Records the JSON later applied with `Pipeline::enable_holistic_record_json`.
    pub fn holistic_record_json(self, value: serde_json::Value) -> Self {
        Self {
            holistic_record_json: Some(value),
            ..self
        }
    }

    /// Records the path later applied with `Pipeline::enable_holistic_replay`.
    pub fn holistic_replay_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            holistic_replay_path: Some(path.into()),
            ..self
        }
    }

    /// Creates the pipeline and applies every recorded option.
    ///
    /// Construction picks the first match of: recorded device, recorded implicit-device flag,
    /// plain `Pipeline::try_new`. Options are then applied in field order.
    ///
    /// # Errors
    /// Stops at, and returns, the first error from construction or from any setter; the
    /// partially configured pipeline is dropped in that case.
    pub fn build(self) -> Result<Pipeline> {
        let Self {
            device,
            create_implicit_device,
            xlink_chunk_size,
            sipp_buffer_size,
            sipp_dma_buffer_size,
            camera_tuning_blob_path,
            openvino_version,
            calibration_data_json,
            global_properties_json,
            board_config_json,
            eeprom_data_json,
            holistic_record_json,
            holistic_replay_path,
        } = self;

        let pipeline = match (&device, create_implicit_device) {
            (Some(dev), _) => Pipeline::create_with_device(dev)?,
            (None, Some(implicit)) => Pipeline::new_with_implicit_device(implicit)?,
            (None, None) => Pipeline::try_new()?,
        };

        if let Some(bytes) = xlink_chunk_size {
            pipeline.set_xlink_chunk_size(bytes)?;
        }
        if let Some(bytes) = sipp_buffer_size {
            pipeline.set_sipp_buffer_size(bytes)?;
        }
        if let Some(bytes) = sipp_dma_buffer_size {
            pipeline.set_sipp_dma_buffer_size(bytes)?;
        }
        if let Some(blob_path) = camera_tuning_blob_path {
            pipeline.set_camera_tuning_blob_path(blob_path)?;
        }
        if let Some(version) = openvino_version {
            pipeline.set_openvino_version(version)?;
        }
        if let Some(json) = calibration_data_json {
            pipeline.set_calibration_data_json(&json)?;
        }
        if let Some(json) = global_properties_json {
            pipeline.set_global_properties_json(&json)?;
        }
        if let Some(json) = board_config_json {
            pipeline.set_board_config_json(&json)?;
        }
        if let Some(json) = eeprom_data_json {
            pipeline.set_eeprom_data_json(&json)?;
        }
        if let Some(json) = holistic_record_json {
            pipeline.enable_holistic_record_json(&json)?;
        }
        if let Some(recording) = holistic_replay_path {
            pipeline.enable_holistic_replay(recording)?;
        }

        Ok(pipeline)
    }
}
impl Pipeline {
pub fn new() -> PipelineBuilder {
PipelineBuilder::new()
}
pub fn builder() -> PipelineBuilder {
PipelineBuilder::new()
}
pub fn try_new() -> Result<Self> {
clear_error_flag();
let handle = depthai::dai_pipeline_new();
if handle.is_null() {
Err(last_error("failed to create pipeline"))
} else {
Ok(Self {
inner: Arc::new(PipelineInner { handle }),
})
}
}
pub fn new_with_implicit_device(create_implicit_device: bool) -> Result<Self> {
clear_error_flag();
let handle = depthai::dai_pipeline_new_ex(create_implicit_device);
if handle.is_null() {
Err(last_error("failed to create pipeline"))
} else {
Ok(Self {
inner: Arc::new(PipelineInner { handle }),
})
}
}
pub fn new_host_only() -> Result<Self> {
Self::new_with_implicit_device(false)
}
pub fn with_device(device: &Device) -> Result<Self> {
Self::create_with_device(device)
}
pub(crate) fn create_with_device(device: &Device) -> Result<Self> {
clear_error_flag();
let handle = unsafe { depthai::dai_pipeline_new_with_device(device.handle()) };
if handle.is_null() {
Err(last_error("failed to create pipeline with device"))
} else {
Ok(Self {
inner: Arc::new(PipelineInner { handle }),
})
}
}
pub fn default_device(&self) -> Result<Device> {
clear_error_flag();
let handle = unsafe { depthai::dai_pipeline_get_default_device(self.inner.handle) };
if handle.is_null() {
Err(last_error("failed to get pipeline default device"))
} else {
Ok(Device::from_handle(handle))
}
}
pub fn create<T: CreateInPipeline>(&self) -> Result<T> {
T::create(self)
}
pub fn create_with<T: CreateInPipelineWith<P>, P>(&self, params: P) -> Result<T> {
T::create_with(self, params)
}
pub fn create_node(&self, name: &str) -> Result<Node> {
node::create_node_by_name(self.inner_arc(), name)
}
pub fn create_host_node<T: HostNodeImpl>(&self, node: T) -> Result<HostNode> {
create_host_node(self, node)
}
pub fn create_threaded_host_node<T: ThreadedHostNodeImpl, F>(&self, init: F) -> Result<ThreadedHostNode>
where
F: FnOnce(&ThreadedHostNode) -> Result<T>,
{
create_threaded_host_node(self, init)
}
pub fn create_camera(&self, socket: CameraBoardSocket) -> Result<CameraNode> {
clear_error_flag();
let handle =
unsafe { depthai::dai_pipeline_create_camera(self.inner.handle, c_int(socket.as_raw())) };
if handle.is_null() {
Err(last_error("failed to create camera node"))
} else {
Ok(CameraNode::from_handle(self.inner_arc(), handle))
}
}
pub fn start(&self) -> Result<()> {
clear_error_flag();
let started = unsafe { depthai::dai_pipeline_start(self.inner.handle) };
if started {
Ok(())
} else {
Err(last_error("failed to start pipeline"))
}
}
pub fn is_running(&self) -> Result<bool> {
clear_error_flag();
let v = unsafe { depthai::dai_pipeline_is_running(self.inner.handle) };
if let Some(e) = crate::error::take_error_if_any("failed to query pipeline running state") {
Err(e)
} else {
Ok(v)
}
}
pub fn is_built(&self) -> Result<bool> {
clear_error_flag();
let v = unsafe { depthai::dai_pipeline_is_built(self.inner.handle) };
if let Some(e) = crate::error::take_error_if_any("failed to query pipeline built state") {
Err(e)
} else {
Ok(v)
}
}
pub fn build(&self) -> Result<()> {
clear_error_flag();
let ok = unsafe { depthai::dai_pipeline_build(self.inner.handle) };
if ok {
Ok(())
} else {
Err(last_error("failed to build pipeline"))
}
}
pub fn wait(&self) -> Result<()> {
clear_error_flag();
let ok = unsafe { depthai::dai_pipeline_wait(self.inner.handle) };
if ok {
Ok(())
} else {
Err(last_error("failed while waiting for pipeline"))
}
}
pub fn stop(&self) -> Result<()> {
clear_error_flag();
let ok = unsafe { depthai::dai_pipeline_stop(self.inner.handle) };
if ok {
Ok(())
} else {
Err(last_error("failed to stop pipeline"))
}
}
pub fn run(&self) -> Result<()> {
clear_error_flag();
let ok = unsafe { depthai::dai_pipeline_run(self.inner.handle) };
if ok {
Ok(())
} else {
Err(last_error("failed to run pipeline"))
}
}
pub fn process_tasks(&self, wait_for_tasks: bool, timeout_seconds: f64) -> Result<()> {
clear_error_flag();
let ok = unsafe {
depthai::dai_pipeline_process_tasks(self.inner.handle, wait_for_tasks, timeout_seconds)
};
if ok {
Ok(())
} else {
Err(last_error("failed to process pipeline tasks"))
}
}
pub fn set_xlink_chunk_size(&self, size_bytes: i32) -> Result<()> {
clear_error_flag();
let ok = unsafe {
depthai::dai_pipeline_set_xlink_chunk_size(self.inner.handle, c_int(size_bytes))
};
if ok {
Ok(())
} else {
Err(last_error("failed to set XLink chunk size"))
}
}
pub fn set_sipp_buffer_size(&self, size_bytes: i32) -> Result<()> {
clear_error_flag();
let ok = unsafe {
depthai::dai_pipeline_set_sipp_buffer_size(self.inner.handle, c_int(size_bytes))
};
if ok {
Ok(())
} else {
Err(last_error("failed to set SIPP buffer size"))
}
}
pub fn set_sipp_dma_buffer_size(&self, size_bytes: i32) -> Result<()> {
clear_error_flag();
let ok = unsafe {
depthai::dai_pipeline_set_sipp_dma_buffer_size(self.inner.handle, c_int(size_bytes))
};
if ok {
Ok(())
} else {
Err(last_error("failed to set SIPP DMA buffer size"))
}
}
pub fn set_camera_tuning_blob_path(&self, path: impl AsRef<Path>) -> Result<()> {
clear_error_flag();
let path = path.as_ref();
let path_str = path
.to_str()
.ok_or_else(|| last_error("camera tuning blob path must be valid UTF-8"))?;
let path_c = CString::new(path_str).map_err(|_| last_error("invalid path"))?;
let ok = unsafe {
depthai::dai_pipeline_set_camera_tuning_blob_path(self.inner.handle, path_c.as_ptr())
};
if ok {
Ok(())
} else {
Err(last_error("failed to set camera tuning blob path"))
}
}
pub fn set_openvino_version(&self, version: OpenVinoVersion) -> Result<()> {
clear_error_flag();
let ok = unsafe {
depthai::dai_pipeline_set_openvino_version(self.inner.handle, c_int(version as i32))
};
if ok {
Ok(())
} else {
Err(last_error("failed to set OpenVINO version"))
}
}
pub fn serialize_to_json(&self, include_assets: bool) -> Result<serde_json::Value> {
clear_error_flag();
let ptr = unsafe { depthai::dai_pipeline_serialize_to_json(self.inner.handle, include_assets) };
let s = take_owned_json_string(ptr, "failed to serialize pipeline to json")?;
parse_json_value(&s)
}
pub fn schema_json(&self, serialization_type: SerializationType) -> Result<serde_json::Value> {
clear_error_flag();
let ptr = unsafe {
depthai::dai_pipeline_get_schema_json(self.inner.handle, c_int(serialization_type as i32))
};
let s = take_owned_json_string(ptr, "failed to get pipeline schema json")?;
parse_json_value(&s)
}
pub fn all_nodes(&self) -> Result<Vec<PipelineNodeInfo>> {
clear_error_flag();
let ptr = unsafe { depthai::dai_pipeline_get_all_nodes_json(self.inner.handle) };
let s = take_owned_json_string(ptr, "failed to get pipeline nodes")?;
let v = parse_json_value(&s)?;
serde_json::from_value(v)
.map_err(|e| DepthaiError::new(format!("invalid nodes JSON from depthai-core: {e}")))
}
pub fn source_nodes(&self) -> Result<Vec<PipelineNodeInfo>> {
clear_error_flag();
let ptr = unsafe { depthai::dai_pipeline_get_source_nodes_json(self.inner.handle) };
let s = take_owned_json_string(ptr, "failed to get pipeline source nodes")?;
let v = parse_json_value(&s)?;
serde_json::from_value(v)
.map_err(|e| DepthaiError::new(format!("invalid source nodes JSON from depthai-core: {e}")))
}
pub fn node_by_id(&self, id: i32) -> Result<Option<Node>> {
clear_error_flag();
let handle = unsafe { depthai::dai_pipeline_get_node_by_id(self.inner.handle, c_int(id)) };
if handle.is_null() {
if let Some(e) = crate::error::take_error_if_any("failed to get node by id") {
Err(e)
} else {
Ok(None)
}
} else {
Ok(Some(Node::from_handle(self.inner_arc(), handle)))
}
}
pub fn remove_node(&self, node: &Node) -> Result<()> {
clear_error_flag();
let ok = unsafe { depthai::dai_pipeline_remove_node(self.inner.handle, node.handle()) };
if ok {
Ok(())
} else {
Err(last_error("failed to remove node from pipeline"))
}
}
pub fn connections(&self) -> Result<Vec<PipelineConnectionInfo>> {
clear_error_flag();
let ptr = unsafe { depthai::dai_pipeline_get_connections_json(self.inner.handle) };
let s = take_owned_json_string(ptr, "failed to get pipeline connections")?;
let v = parse_json_value(&s)?;
serde_json::from_value(v).map_err(|e| {
DepthaiError::new(format!("invalid connections JSON from depthai-core: {e}"))
})
}
pub fn connection_map(&self) -> Result<HashMap<i32, Vec<PipelineConnectionInfo>>> {
clear_error_flag();
let ptr = unsafe { depthai::dai_pipeline_get_connection_map_json(self.inner.handle) };
let s = take_owned_json_string(ptr, "failed to get pipeline connection map")?;
let v = parse_json_value(&s)?;
let raw: HashMap<String, Vec<PipelineConnectionInfo>> = serde_json::from_value(v)
.map_err(|e| DepthaiError::new(format!("invalid connection map JSON from depthai-core: {e}")))?;
let mut out = HashMap::with_capacity(raw.len());
for (k, v) in raw {
let id = k.parse::<i32>().map_err(|e| {
DepthaiError::new(format!("invalid connection map key '{k}': {e}"))
})?;
out.insert(id, v);
}
Ok(out)
}
pub fn is_calibration_data_available(&self) -> Result<bool> {
clear_error_flag();
let v = unsafe { depthai::dai_pipeline_is_calibration_data_available(self.inner.handle) };
if let Some(e) = crate::error::take_error_if_any("failed to query pipeline calibration availability") {
Err(e)
} else {
Ok(v)
}
}
pub fn calibration_data_json(&self) -> Result<Option<serde_json::Value>> {
clear_error_flag();
let ptr = unsafe { depthai::dai_pipeline_get_calibration_data_json(self.inner.handle) };
let s = take_owned_json_string(ptr, "failed to get pipeline calibration data")?;
let v = parse_json_value(&s)?;
if v.is_null() {
Ok(None)
} else {
Ok(Some(v))
}
}
pub fn set_calibration_data_json(&self, eeprom_data: &serde_json::Value) -> Result<()> {
clear_error_flag();
if eeprom_data.is_null() {
return Err(DepthaiError::new(
"calibration data cannot be null (DepthAI does not support clearing it)",
));
}
let s = serde_json::to_string(eeprom_data)
.map_err(|e| DepthaiError::new(format!("failed to serialize JSON: {e}")))?;
let c = CString::new(s).map_err(|_| last_error("invalid JSON (contains NUL)"))?;
let ok = unsafe { depthai::dai_pipeline_set_calibration_data_json(self.inner.handle, c.as_ptr()) };
if ok {
Ok(())
} else {
Err(last_error("failed to set pipeline calibration data"))
}
}
pub fn global_properties_json(&self) -> Result<serde_json::Value> {
clear_error_flag();
let ptr = unsafe { depthai::dai_pipeline_get_global_properties_json(self.inner.handle) };
let s = take_owned_json_string(ptr, "failed to get global properties")?;
parse_json_value(&s)
}
pub fn set_global_properties_json(&self, value: &serde_json::Value) -> Result<()> {
clear_error_flag();
let s = serde_json::to_string(value)
.map_err(|e| DepthaiError::new(format!("failed to serialize JSON: {e}")))?;
let c = CString::new(s).map_err(|_| last_error("invalid JSON (contains NUL)"))?;
let ok = unsafe { depthai::dai_pipeline_set_global_properties_json(self.inner.handle, c.as_ptr()) };
if ok {
Ok(())
} else {
Err(last_error("failed to set global properties"))
}
}
pub fn board_config_json(&self) -> Result<serde_json::Value> {
clear_error_flag();
let ptr = unsafe { depthai::dai_pipeline_get_board_config_json(self.inner.handle) };
let s = take_owned_json_string(ptr, "failed to get board config")?;
parse_json_value(&s)
}
pub fn set_board_config_json(&self, value: &serde_json::Value) -> Result<()> {
clear_error_flag();
let s = serde_json::to_string(value)
.map_err(|e| DepthaiError::new(format!("failed to serialize JSON: {e}")))?;
let c = CString::new(s).map_err(|_| last_error("invalid JSON (contains NUL)"))?;
let ok = unsafe { depthai::dai_pipeline_set_board_config_json(self.inner.handle, c.as_ptr()) };
if ok {
Ok(())
} else {
Err(last_error("failed to set board config"))
}
}
pub fn device_config_json(&self) -> Result<serde_json::Value> {
clear_error_flag();
let ptr = unsafe { depthai::dai_pipeline_get_device_config_json(self.inner.handle) };
let s = take_owned_json_string(ptr, "failed to get device config")?;
parse_json_value(&s)
}
pub fn eeprom_data_json(&self) -> Result<serde_json::Value> {
clear_error_flag();
let ptr = unsafe { depthai::dai_pipeline_get_eeprom_data_json(self.inner.handle) };
let s = take_owned_json_string(ptr, "failed to get EEPROM data")?;
parse_json_value(&s)
}
pub fn set_eeprom_data_json(&self, value: &serde_json::Value) -> Result<()> {
clear_error_flag();
let s = serde_json::to_string(value)
.map_err(|e| DepthaiError::new(format!("failed to serialize JSON: {e}")))?;
let c = CString::new(s).map_err(|_| last_error("invalid JSON (contains NUL)"))?;
let ok = unsafe { depthai::dai_pipeline_set_eeprom_data_json(self.inner.handle, c.as_ptr()) };
if ok {
Ok(())
} else {
Err(last_error("failed to set EEPROM data"))
}
}
pub fn eeprom_id(&self) -> Result<u32> {
clear_error_flag();
let id = unsafe { depthai::dai_pipeline_get_eeprom_id(self.inner.handle) };
if let Some(e) = crate::error::take_error_if_any("failed to get EEPROM id") {
Err(e)
} else {
Ok(id)
}
}
pub fn enable_holistic_record_json(&self, record_config: &serde_json::Value) -> Result<()> {
clear_error_flag();
let s = serde_json::to_string(record_config)
.map_err(|e| DepthaiError::new(format!("failed to serialize JSON: {e}")))?;
let c = CString::new(s).map_err(|_| last_error("invalid JSON (contains NUL)"))?;
let ok = unsafe {
depthai::dai_pipeline_enable_holistic_record_json(self.inner.handle, c.as_ptr())
};
if ok {
Ok(())
} else {
Err(last_error("failed to enable holistic record"))
}
}
pub fn enable_holistic_replay(&self, path_to_recording: impl AsRef<Path>) -> Result<()> {
clear_error_flag();
let path = path_to_recording.as_ref();
let path_str = path
.to_str()
.ok_or_else(|| last_error("recording path must be valid UTF-8"))?;
let c = CString::new(path_str).map_err(|_| last_error("invalid path"))?;
let ok = unsafe {
depthai::dai_pipeline_enable_holistic_replay(self.inner.handle, c.as_ptr())
};
if ok {
Ok(())
} else {
Err(last_error("failed to enable holistic replay"))
}
}
#[deprecated(note = "use Pipeline::start()")]
pub fn start_default(&self) -> Result<()> {
self.start()
}
pub(crate) fn handle(&self) -> DaiPipeline {
self.inner.handle
}
pub(crate) fn inner_arc(&self) -> Arc<PipelineInner> {
Arc::clone(&self.inner)
}
}
// NOTE(review): `Pipeline` holds only an `Arc<PipelineInner>`, and `PipelineInner` is declared
// `Send + Sync` earlier in this file, so `Pipeline` would receive both auto traits without these
// explicit impls. Their soundness therefore rests on the same assumption as the `PipelineInner`
// impls: that depthai-core tolerates concurrent use of one pipeline handle — confirm.
unsafe impl Send for Pipeline {}
unsafe impl Sync for Pipeline {}