realsense_rust/pipeline/active.rs
1//! Defines the pipeline type.
2
3use super::{inactive::InactivePipeline, profile::PipelineProfile};
4use crate::{check_rs2_error, frame::CompositeFrame, kind::Rs2Exception};
5use anyhow::Result;
6use realsense_sys as sys;
7use std::{ptr::NonNull, task::Poll, time::Duration};
8use thiserror::Error;
9
10/// Enumeration over possible errors that can occur when waiting for a frame.
11#[derive(Error, Debug)]
12pub enum FrameWaitError {
13 /// librealsense2 had an internal error occur while waiting for frames.
14 #[error("An internal error occurred while waiting for frames. Type: {0}; Reason: {1}")]
15 DidErrorDuringFrameWait(Rs2Exception, String),
16 /// librealsense2 had an internal error while polling for the next set of frames.
17 #[error("An internal error occurred while polling for the next set of frames. Type: {0}; Reason: {1}")]
18 DidErrorDuringFramePoll(Rs2Exception, String),
19 /// The associated function timed out while waiting for frames.
20 #[error("Timed out while waiting for frame.")]
21 DidTimeoutBeforeFrameArrival,
22}
23
24/// Type representing an "active" pipeline which is configured and can acquire frames.
25#[derive(Debug)]
26pub struct ActivePipeline {
27 /// A (non-null) pointer to the pipeline.
28 pipeline_ptr: NonNull<sys::rs2_pipeline>,
29 /// The pipeline's profile, which contains the device the pipeline is configured for alongside
30 /// the stream profiles for streams in the pipeline.
31 profile: PipelineProfile,
32}
33
34impl Drop for ActivePipeline {
35 fn drop(&mut self) {
36 unsafe {
37 sys::rs2_delete_pipeline(self.pipeline_ptr.as_ptr());
38 }
39 }
40}
41
42unsafe impl Send for ActivePipeline {}
43
44impl ActivePipeline {
45 /// Constructs a new active pipeline from the constituent components
46 ///
47 /// This is only to be used / called from the [`InactivePipeline`] type.
48 pub(crate) fn new(pipeline_ptr: NonNull<sys::rs2_pipeline>, profile: PipelineProfile) -> Self {
49 Self {
50 pipeline_ptr,
51 profile,
52 }
53 }
54
55 /// Gets the active profile of pipeline.
56 pub fn profile(&self) -> &PipelineProfile {
57 &self.profile
58 }
59
60 /// Stop the pipeline.
61 ///
62 /// This method consumes the pipeline instance and returns pipeline markered inactive.
63 pub fn stop(self) -> InactivePipeline {
64 unsafe {
65 let mut err = std::ptr::null_mut::<sys::rs2_error>();
66
67 // The only "error" that can occur here is if the pipeline pointer is null.
68 //
69 // We know it is not (state is managed so that this isn't a possibility, and we use
70 // `NonNull` to try and guarantee that even beyond our state management), so there
71 // dealing with the error (and thus returning a result type) is superfluous here.
72 sys::rs2_pipeline_stop(self.pipeline_ptr.as_ptr(), &mut err);
73
74 let inactive = InactivePipeline::new(self.pipeline_ptr);
75
76 std::mem::forget(self);
77 inactive
78 }
79 }
80
81 /// Waits to get a new composite frame, blocking the calling thread.
82 ///
83 /// Returns a composite frame from the pipeline, blocking the calling thread until a frame is
84 /// available. This method can return an error if an internal exception occurs or if the thread
85 /// waits more than the duration provided by `timeout_ms` (in milliseconds).
86 ///
87 /// # Arguments
88 ///
89 /// * `timeout_ms` - The timeout in milliseconds. If the thread blocks for longer than this
90 /// duration, it will exit early with a [`FrameWaitError::DidTimeoutBeforeFrameArrival`]. If
91 /// `None` is passed in, the [default timeout](realsense_sys::RS2_DEFAULT_TIMEOUT) is applied.
92 ///
93 /// # Errors
94 ///
95 /// Returns [`FrameWaitError::DidErrorDuringFrameWait`] if an internal error occurs while
96 /// waiting for next frame(s).
97 ///
98 /// Returns [`FrameWaitError::DidTimeoutBeforeFrameArrival`] if the thread waits more than
99 /// `timeout_ms` (in milliseconds) without returning a frame.
100 pub fn wait(&mut self, timeout_ms: Option<Duration>) -> Result<CompositeFrame, FrameWaitError> {
101 let timeout_ms = match timeout_ms {
102 Some(d) => d.as_millis() as u32,
103 None => sys::RS2_DEFAULT_TIMEOUT,
104 };
105
106 unsafe {
107 let mut err = std::ptr::null_mut::<sys::rs2_error>();
108 let mut frame = std::ptr::null_mut::<sys::rs2_frame>();
109
110 // NOTE: You may notice that there is a `sys::rs2_pipeline_wait_for_frames` and you
111 // might wonder why we only use this variant. Primarily, they do the same thing, but
112 // this API is a bit cleaner since it makes it easy to detect if a timeout occurred.
113 // If you use `rs2_pipeline_wait_for_frames` instead of
114 // `rs2_pipeline_try_wait_for_frames` then you need to parse the returned `rs2_error`
115 // message to determine if a timeout occurred. Here, we can just check if
116 // `did_get_frame` is false (0), and provided no other errors occurred, then that is
117 // indicative of a timeout.
118 let did_get_frame = sys::rs2_pipeline_try_wait_for_frames(
119 self.pipeline_ptr.as_ptr(),
120 &mut frame,
121 timeout_ms,
122 &mut err,
123 );
124 check_rs2_error!(err, FrameWaitError::DidErrorDuringFrameWait)?;
125
126 if did_get_frame != 0 {
127 Ok(CompositeFrame::from(NonNull::new(frame).unwrap()))
128 } else {
129 Err(FrameWaitError::DidTimeoutBeforeFrameArrival)
130 }
131 }
132 }
133
134 /// Poll if next frame is immediately available.
135 ///
136 /// Unlike [`ActivePipeline::wait`], the method does not block and returns None immediately if
137 /// the next frame is not available. Returns [`Poll::Pending`] if no frame is yet available,
138 /// and returns [`Poll::Ready`] if the next composite frame is found.
139 ///
140 /// # Errors
141 ///
142 /// Returns [`FrameWaitError::DidErrorDuringFramePoll`] if an internal error occurs while
143 /// polling for the next frame.
144 pub fn poll(&mut self) -> Result<Poll<CompositeFrame>, FrameWaitError> {
145 unsafe {
146 let mut err = std::ptr::null_mut::<sys::rs2_error>();
147 let mut frame_ptr = std::ptr::null_mut::<sys::rs2_frame>();
148 let did_get_frame = sys::rs2_pipeline_poll_for_frames(
149 self.pipeline_ptr.as_ptr(),
150 &mut frame_ptr,
151 &mut err,
152 );
153 check_rs2_error!(err, FrameWaitError::DidErrorDuringFramePoll)?;
154
155 if did_get_frame != 0 {
156 Ok(Poll::Ready(CompositeFrame::from(
157 NonNull::new(frame_ptr).unwrap(),
158 )))
159 } else {
160 Ok(Poll::Pending)
161 }
162 }
163 }
164}