1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use crate::{
base::DEFAULT_TIMEOUT,
common::*,
error::{Error as RsError, ErrorChecker, Result},
frame::{AnyFrame, Frame, GenericFrameEx},
frame_kind::FrameKind,
};
/// Handle to a native `rs2_frame_queue`.
///
/// The handle is passed to `rs2_delete_frame_queue` when the value is dropped,
/// unless ownership was given up beforehand through `into_raw`.
#[derive(Debug)]
pub struct FrameQueue {
/// Non-null raw pointer to the native queue object.
pub(crate) ptr: NonNull<sys::rs2_frame_queue>,
}
impl FrameQueue {
    /// Creates a queue able to hold up to `capacity` frames.
    ///
    /// The native API takes the capacity as a C `int`; values that do not fit are
    /// clamped to `c_int::MAX` instead of silently wrapping to a negative number.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `rs2_create_frame_queue`, if any.
    pub fn with_capacity(capacity: usize) -> Result<Self> {
        let capacity = capacity.min(c_int::MAX as usize) as c_int;
        let mut checker = ErrorChecker::new();
        let ptr = unsafe { sys::rs2_create_frame_queue(capacity, checker.inner_mut_ptr()) };
        checker.check()?;
        // A null pointer here would make `from_raw` panic; the error check above
        // has already passed at this point.
        Ok(unsafe { Self::from_raw(ptr) })
    }

    /// Pushes a frame into the queue, consuming it.
    pub fn enqueue<Kind>(&mut self, frame: Frame<Kind>)
    where
        Kind: FrameKind,
    {
        let frame_ptr = frame.ptr.as_ptr();
        // librealsense documents that `rs2_enqueue_frame` passes ownership of the
        // frame handle to the queue, so the Rust wrapper must not run its own
        // destructor afterwards.
        // NOTE(review): this assumes `Frame` releases its handle on drop; if it
        // does not, the `forget` is simply a no-op.
        mem::forget(frame);
        unsafe {
            sys::rs2_enqueue_frame(frame_ptr, self.ptr.cast::<c_void>().as_ptr());
        }
    }

    /// Blocks the current thread until a frame can be taken from the queue.
    ///
    /// With `Some(timeout)` a single native wait of that length is performed.
    /// With `None` the native wait is repeated in `DEFAULT_TIMEOUT` slices for as
    /// long as it keeps reporting a timeout, i.e. the call waits indefinitely.
    ///
    /// # Errors
    ///
    /// Returns any error reported by `rs2_wait_for_frame`. A timeout error is only
    /// surfaced when an explicit `timeout` was given.
    pub fn wait(&mut self, timeout: Option<Duration>) -> Result<AnyFrame> {
        let timeout_ms = Self::saturating_millis(timeout.unwrap_or(DEFAULT_TIMEOUT));
        loop {
            let mut checker = ErrorChecker::new();
            let ptr = unsafe {
                sys::rs2_wait_for_frame(self.ptr.as_ptr(), timeout_ms, checker.inner_mut_ptr())
            };
            match (timeout, checker.check()) {
                // No deadline requested: a timed-out slice just means "try again".
                (None, Err(RsError::Timeout(..))) => continue,
                (_, outcome) => {
                    outcome?;
                }
            }
            return Ok(unsafe { Frame::from_raw(ptr) });
        }
    }

    /// Waits for a frame without blocking the calling task.
    ///
    /// The blocking native wait runs on a freshly spawned thread and the result is
    /// delivered through a oneshot channel. Timeout handling mirrors [`wait`](Self::wait),
    /// except that the slice length for `None` is `sys::RS2_DEFAULT_TIMEOUT`.
    ///
    /// If the returned future is dropped while waiting without a deadline, the
    /// worker thread notices the cancellation after its current slice times out
    /// and exits instead of polling forever.
    ///
    /// NOTE(review): the worker holds a raw copy of the queue pointer. If the
    /// future is dropped and the queue is then destroyed while the worker is
    /// still inside the native wait, the worker would use a dangling pointer.
    /// Keep the queue alive until the worker has had time to finish.
    ///
    /// # Errors
    ///
    /// Returns any error reported by `rs2_wait_for_frame`. A timeout error is only
    /// surfaced when an explicit `timeout` was given.
    ///
    /// # Panics
    ///
    /// Panics if the worker thread terminates without sending a result.
    pub async fn wait_async(&mut self, timeout: Option<Duration>) -> Result<AnyFrame> {
        let timeout_ms = timeout
            .map(Self::saturating_millis)
            .unwrap_or(sys::RS2_DEFAULT_TIMEOUT as c_uint);
        let (tx, rx) = futures::channel::oneshot::channel();
        // Wrapping the raw pointer in an `AtomicPtr` lets the closure be `Send`.
        let queue_ptr = AtomicPtr::new(self.ptr.as_ptr());

        thread::spawn(move || {
            let outcome = loop {
                let mut checker = ErrorChecker::new();
                let ptr = unsafe {
                    sys::rs2_wait_for_frame(
                        queue_ptr.load(Ordering::Relaxed),
                        timeout_ms,
                        checker.inner_mut_ptr(),
                    )
                };
                match (timeout, checker.check()) {
                    (None, Err(RsError::Timeout(..))) => {
                        // Nobody is listening any more: stop instead of spinning forever.
                        if tx.is_canceled() {
                            return;
                        }
                        continue;
                    }
                    (_, checked) => break checked.map(|_| unsafe { Frame::from_raw(ptr) }),
                }
            };
            // The receiver may have been dropped in the meantime; the result
            // (and any frame in it) is then simply dropped here.
            let _ = tx.send(outcome);
        });

        let frame = rx
            .await
            .expect("frame queue worker thread terminated without sending a result")?;
        Ok(frame)
    }

    /// Takes a frame from the queue if one is immediately available.
    ///
    /// Returns `Ok(None)` when `rs2_poll_for_frame` reports that no frame was
    /// written to the output handle.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `rs2_poll_for_frame`, if any.
    pub fn try_wait(&mut self) -> Result<Option<AnyFrame>> {
        let mut checker = ErrorChecker::new();
        let mut frame_ptr: *mut sys::rs2_frame = ptr::null_mut();
        let has_frame = unsafe {
            sys::rs2_poll_for_frame(self.ptr.as_ptr(), &mut frame_ptr, checker.inner_mut_ptr())
        };
        checker.check()?;

        if has_frame == 0 {
            return Ok(None);
        }
        Ok(Some(unsafe { Frame::from_raw(frame_ptr) }))
    }

    /// Releases ownership of the native queue and returns its raw pointer.
    ///
    /// The destructor is not run; the caller becomes responsible for the handle
    /// (for example by passing it back to [`from_raw`](Self::from_raw)).
    pub fn into_raw(self) -> *mut sys::rs2_frame_queue {
        let raw = self.ptr.as_ptr();
        mem::forget(self);
        raw
    }

    /// Wraps a raw native queue pointer.
    ///
    /// # Safety
    ///
    /// `ptr` must point to a live `rs2_frame_queue` that is not owned by any other
    /// `FrameQueue`, because the returned value deletes the queue when dropped.
    ///
    /// # Panics
    ///
    /// Panics if `ptr` is null.
    pub unsafe fn from_raw(ptr: *mut sys::rs2_frame_queue) -> Self {
        Self {
            ptr: NonNull::new(ptr).expect("rs2_frame_queue pointer must not be null"),
        }
    }

    /// Creates a second handle that aliases the same native queue.
    ///
    /// # Safety
    ///
    /// Both handles delete the same native queue when dropped, so the caller must
    /// make sure that only one of them is ever dropped (e.g. by forgetting the
    /// other or turning it into a raw pointer).
    pub(crate) unsafe fn unsafe_clone(&self) -> Self {
        Self { ptr: self.ptr }
    }

    /// Converts a duration to whole milliseconds for the C API, saturating at
    /// `c_uint::MAX` instead of wrapping around for very long durations.
    fn saturating_millis(duration: Duration) -> c_uint {
        duration.as_millis().min(u128::from(c_uint::MAX)) as c_uint
    }
}
impl Drop for FrameQueue {
    /// Deletes the native queue through `rs2_delete_frame_queue`.
    fn drop(&mut self) {
        let raw_queue = self.ptr.as_ptr();
        // SAFETY: `ptr` is a `NonNull`, so the handle passed on is never null.
        // NOTE(review): soundness also relies on no other live handle aliasing
        // this queue (see `unsafe_clone`) — confirm at the call sites.
        unsafe { sys::rs2_delete_frame_queue(raw_queue) };
    }
}
// The raw pointer field makes `FrameQueue` `!Send` by default; this impl opts back in.
// NOTE(review): presumes the native frame queue may be used from a thread other than
// the one that created it — confirm against the librealsense documentation.
unsafe impl Send for FrameQueue {}