realsense_rust/pipeline/
active.rs

1//! Defines the pipeline type.
2
3use super::{inactive::InactivePipeline, profile::PipelineProfile};
4use crate::{check_rs2_error, frame::CompositeFrame, kind::Rs2Exception};
5use anyhow::Result;
6use realsense_sys as sys;
7use std::{ptr::NonNull, task::Poll, time::Duration};
8use thiserror::Error;
9
10/// Enumeration over possible errors that can occur when waiting for a frame.
11#[derive(Error, Debug)]
12pub enum FrameWaitError {
13    /// librealsense2 had an internal error occur while waiting for frames.
14    #[error("An internal error occurred while waiting for frames. Type: {0}; Reason: {1}")]
15    DidErrorDuringFrameWait(Rs2Exception, String),
16    /// librealsense2 had an internal error while polling for the next set of frames.
17    #[error("An internal error occurred while polling for the next set of frames. Type: {0}; Reason: {1}")]
18    DidErrorDuringFramePoll(Rs2Exception, String),
19    /// The associated function timed out while waiting for frames.
20    #[error("Timed out while waiting for frame.")]
21    DidTimeoutBeforeFrameArrival,
22}
23
24/// Type representing an "active" pipeline which is configured and can acquire frames.
25#[derive(Debug)]
26pub struct ActivePipeline {
27    /// A (non-null) pointer to the pipeline.
28    pipeline_ptr: NonNull<sys::rs2_pipeline>,
29    /// The pipeline's profile, which contains the device the pipeline is configured for alongside
30    /// the stream profiles for streams in the pipeline.
31    profile: PipelineProfile,
32}
33
34impl Drop for ActivePipeline {
35    fn drop(&mut self) {
36        unsafe {
37            sys::rs2_delete_pipeline(self.pipeline_ptr.as_ptr());
38        }
39    }
40}
41
42unsafe impl Send for ActivePipeline {}
43
44impl ActivePipeline {
45    /// Constructs a new active pipeline from the constituent components
46    ///
47    /// This is only to be used / called from the [`InactivePipeline`] type.
48    pub(crate) fn new(pipeline_ptr: NonNull<sys::rs2_pipeline>, profile: PipelineProfile) -> Self {
49        Self {
50            pipeline_ptr,
51            profile,
52        }
53    }
54
55    /// Gets the active profile of pipeline.
56    pub fn profile(&self) -> &PipelineProfile {
57        &self.profile
58    }
59
60    /// Stop the pipeline.
61    ///
62    /// This method consumes the pipeline instance and returns pipeline markered inactive.
63    pub fn stop(self) -> InactivePipeline {
64        unsafe {
65            let mut err = std::ptr::null_mut::<sys::rs2_error>();
66
67            // The only "error" that can occur here is if the pipeline pointer is null.
68            //
69            // We know it is not (state is managed so that this isn't a possibility, and we use
70            // `NonNull` to try and guarantee that even beyond our state management), so there
71            // dealing with the error (and thus returning a result type) is superfluous here.
72            sys::rs2_pipeline_stop(self.pipeline_ptr.as_ptr(), &mut err);
73
74            let inactive = InactivePipeline::new(self.pipeline_ptr);
75
76            std::mem::forget(self);
77            inactive
78        }
79    }
80
81    /// Waits to get a new composite frame, blocking the calling thread.
82    ///
83    /// Returns a composite frame from the pipeline, blocking the calling thread until a frame is
84    /// available. This method can return an error if an internal exception occurs or if the thread
85    /// waits more than the duration provided by `timeout_ms` (in milliseconds).
86    ///
87    /// # Arguments
88    ///
89    /// * `timeout_ms` - The timeout in milliseconds. If the thread blocks for longer than this
90    /// duration, it will exit early with a [`FrameWaitError::DidTimeoutBeforeFrameArrival`]. If
91    /// `None` is passed in, the [default timeout](realsense_sys::RS2_DEFAULT_TIMEOUT) is applied.
92    ///
93    /// # Errors
94    ///
95    /// Returns [`FrameWaitError::DidErrorDuringFrameWait`] if an internal error occurs while
96    /// waiting for next frame(s).
97    ///
98    /// Returns [`FrameWaitError::DidTimeoutBeforeFrameArrival`] if the thread waits more than
99    /// `timeout_ms` (in milliseconds) without returning a frame.
100    pub fn wait(&mut self, timeout_ms: Option<Duration>) -> Result<CompositeFrame, FrameWaitError> {
101        let timeout_ms = match timeout_ms {
102            Some(d) => d.as_millis() as u32,
103            None => sys::RS2_DEFAULT_TIMEOUT,
104        };
105
106        unsafe {
107            let mut err = std::ptr::null_mut::<sys::rs2_error>();
108            let mut frame = std::ptr::null_mut::<sys::rs2_frame>();
109
110            // NOTE: You may notice that there is a `sys::rs2_pipeline_wait_for_frames` and you
111            // might wonder why we only use this variant. Primarily, they do the same thing, but
112            // this API is a bit cleaner since it makes it easy to detect if a timeout occurred.
113            // If you use `rs2_pipeline_wait_for_frames` instead of
114            // `rs2_pipeline_try_wait_for_frames` then you need to parse the returned `rs2_error`
115            // message to determine if a timeout occurred. Here, we can just check if
116            // `did_get_frame` is false (0), and provided no other errors occurred, then that is
117            // indicative of a timeout.
118            let did_get_frame = sys::rs2_pipeline_try_wait_for_frames(
119                self.pipeline_ptr.as_ptr(),
120                &mut frame,
121                timeout_ms,
122                &mut err,
123            );
124            check_rs2_error!(err, FrameWaitError::DidErrorDuringFrameWait)?;
125
126            if did_get_frame != 0 {
127                Ok(CompositeFrame::from(NonNull::new(frame).unwrap()))
128            } else {
129                Err(FrameWaitError::DidTimeoutBeforeFrameArrival)
130            }
131        }
132    }
133
134    /// Poll if next frame is immediately available.
135    ///
136    /// Unlike [`ActivePipeline::wait`], the method does not block and returns None immediately if
137    /// the next frame is not available. Returns [`Poll::Pending`] if no frame is yet available,
138    /// and returns [`Poll::Ready`] if the next composite frame is found.
139    ///
140    /// # Errors
141    ///
142    /// Returns [`FrameWaitError::DidErrorDuringFramePoll`] if an internal error occurs while
143    /// polling for the next frame.
144    pub fn poll(&mut self) -> Result<Poll<CompositeFrame>, FrameWaitError> {
145        unsafe {
146            let mut err = std::ptr::null_mut::<sys::rs2_error>();
147            let mut frame_ptr = std::ptr::null_mut::<sys::rs2_frame>();
148            let did_get_frame = sys::rs2_pipeline_poll_for_frames(
149                self.pipeline_ptr.as_ptr(),
150                &mut frame_ptr,
151                &mut err,
152            );
153            check_rs2_error!(err, FrameWaitError::DidErrorDuringFramePoll)?;
154
155            if did_get_frame != 0 {
156                Ok(Poll::Ready(CompositeFrame::from(
157                    NonNull::new(frame_ptr).unwrap(),
158                )))
159            } else {
160                Ok(Poll::Pending)
161            }
162        }
163    }
164}