1use std::cmp;
4use std::ffi::CStr;
5use std::fmt;
6use std::future::Future;
7use std::ops::Deref;
8use std::os::raw::c_char;
9use std::os::raw::c_void;
10use std::ptr;
11use std::ptr::NonNull;
12use std::slice;
13use std::sync::Arc;
14#[cfg(feature = "naive-runtime")]
15use std::thread;
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
18#[cfg(feature = "naive-runtime")]
19use futures_channel::oneshot;
20#[cfg(feature = "naive-runtime")]
21use futures_util::future::{FutureExt, Map};
22
23use crate::log::trace;
24
25use rdkafka_sys as rdsys;
26
27pub fn get_rdkafka_version() -> (i32, String) {
30 let version_number = unsafe { rdsys::rd_kafka_version() };
31 let c_str = unsafe { CStr::from_ptr(rdsys::rd_kafka_version_str()) };
32 (version_number, c_str.to_string_lossy().into_owned())
33}
34
/// A point in time by which an operation should finish, or no limit at all.
pub(crate) enum Deadline {
    /// The deadline falls at this instant.
    At(Instant),
    /// There is no deadline.
    Never,
}

impl Deadline {
    /// Largest remaining time that can be reported as `i32` milliseconds.
    const MAX_FLUSH_DURATION: Duration = Duration::from_millis(i32::MAX as u64);

    /// Creates a deadline `duration` from now, or [`Deadline::Never`] for `None`.
    ///
    /// A duration so large that it cannot be added to the current instant is
    /// also treated as [`Deadline::Never`] instead of panicking on overflow.
    pub(crate) fn new(duration: Option<Duration>) -> Self {
        duration
            .and_then(|d| Instant::now().checked_add(d))
            .map_or(Deadline::Never, Deadline::At)
    }

    /// Returns the time left until the deadline.
    ///
    /// Yields `Duration::ZERO` once the deadline has passed and
    /// `Duration::MAX` when there is no deadline.
    pub(crate) fn remaining(&self) -> Duration {
        match self {
            // Explicitly saturating: never panics when the deadline is in the past.
            Deadline::At(at) => at.saturating_duration_since(Instant::now()),
            Deadline::Never => Duration::MAX,
        }
    }

    /// Returns the remaining time in whole milliseconds, capped at `i32::MAX`.
    pub(crate) fn remaining_millis_i32(&self) -> i32 {
        let capped = cmp::min(Self::MAX_FLUSH_DURATION, self.remaining());
        // The cast cannot truncate: `capped` is at most `i32::MAX` milliseconds.
        capped.as_millis() as i32
    }

    /// Returns `true` once no time remains before the deadline.
    pub(crate) fn elapsed(&self) -> bool {
        self.remaining().is_zero()
    }
}
68
/// How long an operation may take: a bounded duration, or no limit.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum Timeout {
    /// Time out after the given duration.
    After(Duration),
    /// Never time out.
    Never,
}

impl Timeout {
    /// Converts the timeout to whole milliseconds as an `i32`.
    ///
    /// [`Timeout::Never`] maps to `-1`. Durations longer than `i32::MAX`
    /// milliseconds are clamped to `i32::MAX`; a plain cast would wrap and
    /// could yield a negative value (for example `u32::MAX` ms would become
    /// `-1`, indistinguishable from `Never`).
    pub(crate) fn as_millis(&self) -> i32 {
        match self {
            // Clamp before narrowing so the cast below can never truncate.
            Timeout::After(d) => d.as_millis().min(i32::MAX as u128) as i32,
            Timeout::Never => -1,
        }
    }

    /// Subtracts `rhs` from the timeout, stopping at zero.
    ///
    /// [`Timeout::Never`] is returned unchanged.
    pub(crate) fn saturating_sub(&self, rhs: Duration) -> Timeout {
        match self {
            Timeout::After(lhs) => Timeout::After(lhs.saturating_sub(rhs)),
            Timeout::Never => Timeout::Never,
        }
    }

    /// Returns `true` only for a bounded timeout of zero duration.
    pub(crate) fn is_zero(&self) -> bool {
        matches!(self, Timeout::After(d) if d.is_zero())
    }
}
103
104impl std::ops::SubAssign for Timeout {
105 fn sub_assign(&mut self, other: Self) {
106 match (self, other) {
107 (Timeout::After(lhs), Timeout::After(rhs)) => *lhs -= rhs,
108 (Timeout::Never, Timeout::After(_)) => (),
109 _ => panic!("subtraction of Timeout::Never is ill-defined"),
110 }
111 }
112}
113
114impl From<Timeout> for Deadline {
115 fn from(t: Timeout) -> Deadline {
116 if let Timeout::After(dur) = t {
117 Deadline::new(Some(dur))
118 } else {
119 Deadline::new(None)
120 }
121 }
122}
123
124impl From<&Deadline> for Timeout {
125 fn from(d: &Deadline) -> Timeout {
126 if let Deadline::Never = d {
127 Timeout::Never
128 } else {
129 Timeout::After(d.remaining())
130 }
131 }
132}
133
134impl From<Duration> for Timeout {
135 fn from(d: Duration) -> Timeout {
136 Timeout::After(d)
137 }
138}
139
140impl From<Option<Duration>> for Timeout {
141 fn from(v: Option<Duration>) -> Timeout {
142 match v {
143 None => Timeout::Never,
144 Some(d) => Timeout::After(d),
145 }
146 }
147}
148
/// Converts `time` to whole milliseconds elapsed since the Unix epoch.
///
/// Times that lie before the epoch yield `0`.
pub fn millis_to_epoch(time: SystemTime) -> i64 {
    match time.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        // `time` is earlier than the epoch.
        Err(_) => 0,
    }
}
155
156pub fn current_time_millis() -> i64 {
158 millis_to_epoch(SystemTime::now())
159}
160
/// Views `size` elements of type `T` starting at `ptr` as a shared slice.
///
/// Returns `None` when `ptr` is null.
///
/// # Safety
///
/// A non-null `ptr` must satisfy the requirements of
/// `slice::from_raw_parts::<T>` for `size` elements, for the whole of the
/// caller-chosen lifetime `'a`.
pub(crate) unsafe fn ptr_to_opt_slice<'a, T>(ptr: *const c_void, size: usize) -> Option<&'a [T]> {
    let elems = ptr as *const T;
    if elems.is_null() {
        return None;
    }
    // SAFETY: `elems` is non-null here; the remaining requirements are the
    // caller's obligation (see `# Safety`).
    Some(unsafe { slice::from_raw_parts(elems, size) })
}
172
/// Views `size` elements of type `T` starting at `ptr` as a mutable slice.
///
/// Returns `None` when `ptr` is null.
///
/// # Safety
///
/// A non-null `ptr` must satisfy the requirements of
/// `slice::from_raw_parts_mut::<T>` for `size` elements, for the whole of the
/// caller-chosen lifetime `'a`. Note that the pointer is accepted as `*const`
/// but is written through, so the memory must really be mutable.
pub(crate) unsafe fn ptr_to_opt_mut_slice<'a, T>(
    ptr: *const c_void,
    size: usize,
) -> Option<&'a mut [T]> {
    let elems = ptr as *mut T;
    if elems.is_null() {
        return None;
    }
    // SAFETY: `elems` is non-null here; the remaining requirements are the
    // caller's obligation (see `# Safety`).
    Some(unsafe { slice::from_raw_parts_mut(elems, size) })
}
185
/// Views `size` elements of type `T` starting at `ptr` as a shared slice.
///
/// A null `ptr` or a `size` of zero yields an empty slice without touching
/// the pointer.
///
/// # Safety
///
/// When `ptr` is non-null and `size` is non-zero, `ptr` must satisfy the
/// requirements of `slice::from_raw_parts::<T>` for `size` elements, for the
/// whole of the caller-chosen lifetime `'a`.
pub(crate) unsafe fn ptr_to_slice<'a, T>(ptr: *const c_void, size: usize) -> &'a [T] {
    if size == 0 || ptr.is_null() {
        return &[];
    }
    // SAFETY: `ptr` is non-null and `size` is non-zero; the remaining
    // requirements are the caller's obligation (see `# Safety`).
    unsafe { slice::from_raw_parts(ptr as *const T, size) }
}
197
/// Conversion of a value to and from an untyped `*mut c_void` pointer.
///
/// Implementors must be `Send + Sync`.
pub trait IntoOpaque: Send + Sync + Sized {
    /// Consumes the value and returns its pointer representation.
    fn into_ptr(self) -> *mut c_void;

    /// Rebuilds a value from its pointer representation.
    ///
    /// # Safety
    ///
    /// NOTE(review): the implementations in this file assume `ptr` was
    /// produced by [`IntoOpaque::into_ptr`] of the same type and, for owning
    /// types, is not converted back more than once — confirm against callers.
    unsafe fn from_ptr(ptr: *mut c_void) -> Self;
}
216
217impl IntoOpaque for () {
218 fn into_ptr(self) -> *mut c_void {
219 ptr::null_mut()
220 }
221
222 unsafe fn from_ptr(_: *mut c_void) -> Self {}
223}
224
225impl IntoOpaque for usize {
226 fn into_ptr(self) -> *mut c_void {
227 self as *mut c_void
228 }
229
230 unsafe fn from_ptr(ptr: *mut c_void) -> Self {
231 ptr as usize
232 }
233}
234
235impl<T: Send + Sync> IntoOpaque for Box<T> {
236 fn into_ptr(self) -> *mut c_void {
237 Box::into_raw(self) as *mut c_void
238 }
239
240 unsafe fn from_ptr(ptr: *mut c_void) -> Self {
241 unsafe { Box::from_raw(ptr as *mut T) }
242 }
243}
244
245impl<T: Send + Sync> IntoOpaque for Arc<T> {
246 fn into_ptr(self) -> *mut c_void {
247 Arc::into_raw(self) as *mut c_void
248 }
249
250 unsafe fn from_ptr(ptr: *mut c_void) -> Self {
251 unsafe { Arc::from_raw(ptr as *const T) }
252 }
253}
254
/// Copies a NUL-terminated C string into an owned `String`.
///
/// Invalid UTF-8 sequences are replaced with U+FFFD.
///
/// # Safety
///
/// `cstr` must satisfy the requirements of `CStr::from_ptr`: non-null and
/// pointing to a valid NUL-terminated string.
pub unsafe fn cstr_to_owned(cstr: *const c_char) -> String {
    // SAFETY: validity of `cstr` is the caller's obligation (see `# Safety`).
    let borrowed = unsafe { CStr::from_ptr(cstr) };
    String::from_utf8_lossy(borrowed.to_bytes()).into_owned()
}
267
/// A fixed-size, zero-initialized byte buffer exposed as a `*mut c_char`.
///
/// NOTE(review): presumably handed to C functions that write a
/// NUL-terminated error message into it — confirm against callers.
pub(crate) struct ErrBuf {
    buf: [u8; ErrBuf::MAX_ERR_LEN],
}

impl ErrBuf {
    /// Size of the buffer in bytes.
    const MAX_ERR_LEN: usize = 512;

    /// Creates a buffer filled with zero bytes.
    pub fn new() -> ErrBuf {
        let buf = [0u8; Self::MAX_ERR_LEN];
        ErrBuf { buf }
    }

    /// Returns a mutable C-string pointer to the start of the buffer.
    pub fn as_mut_ptr(&mut self) -> *mut c_char {
        self.buf.as_mut_ptr().cast::<c_char>()
    }

    /// Returns the bytes up to and including the first NUL byte.
    ///
    /// # Panics
    ///
    /// Panics if the buffer contains no NUL byte at all.
    pub fn filled(&self) -> &[u8] {
        let nul_at = self.buf.iter().position(|&byte| byte == 0).unwrap();
        &self.buf[..=nul_at]
    }

    /// Returns the length of [`ErrBuf::filled`]; the terminating NUL is counted.
    pub fn len(&self) -> usize {
        self.filled().len()
    }

    /// Returns the total size of the buffer in bytes.
    pub fn capacity(&self) -> usize {
        Self::MAX_ERR_LEN
    }
}
298
299impl Default for ErrBuf {
300 fn default() -> ErrBuf {
301 ErrBuf::new()
302 }
303}
304
305impl fmt::Display for ErrBuf {
306 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
307 write!(
308 f,
309 "{}",
310 CStr::from_bytes_with_nul(self.filled())
311 .unwrap()
312 .to_string_lossy()
313 )
314 }
315}
316
/// Exposes a collection as a C-style array of `*mut T` pointers.
pub(crate) trait AsCArray<T> {
    /// Returns a pointer to the first element of the pointer array.
    ///
    /// The method borrows `self`, so the result is only meaningful while the
    /// collection is alive and unmodified.
    fn as_c_array(&self) -> *mut *mut T;
}
320
321impl<T: KafkaDrop> AsCArray<T> for Vec<NativePtr<T>> {
322 fn as_c_array(&self) -> *mut *mut T {
323 self.as_ptr() as *mut *mut T
324 }
325}
326
327pub(crate) struct NativePtr<T>
328where
329 T: KafkaDrop,
330{
331 ptr: NonNull<T>,
332}
333
334impl<T> Drop for NativePtr<T>
335where
336 T: KafkaDrop,
337{
338 fn drop(&mut self) {
339 trace!("Destroying {}: {:?}", T::TYPE, self.ptr);
340 unsafe { T::DROP(self.ptr.as_ptr()) }
341 trace!("Destroyed {}: {:?}", T::TYPE, self.ptr);
342 }
343}
344
/// Describes how a native type is named and destroyed.
///
/// # Safety
///
/// NOTE(review): `DROP` is called on a raw `*mut Self` from the destructor of
/// `NativePtr`; presumably implementors must supply the destructor that
/// actually matches `Self` — confirm against the implementations.
// The lint allowance is retained from the original definition.
#[allow(clippy::missing_safety_doc)]
pub(crate) unsafe trait KafkaDrop {
    /// Human-readable name of the type; used in trace messages on destruction.
    const TYPE: &'static str;
    /// C function that destroys an object of this type.
    const DROP: unsafe extern "C" fn(*mut Self);
}
351
352impl<T> Deref for NativePtr<T>
353where
354 T: KafkaDrop,
355{
356 type Target = T;
357 fn deref(&self) -> &Self::Target {
358 unsafe { self.ptr.as_ref() }
359 }
360}
361
362impl<T> fmt::Debug for NativePtr<T>
363where
364 T: KafkaDrop,
365{
366 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
367 self.ptr.fmt(f)
368 }
369}
370
371impl<T> NativePtr<T>
372where
373 T: KafkaDrop,
374{
375 pub(crate) unsafe fn from_ptr(ptr: *mut T) -> Option<Self> {
376 NonNull::new(ptr).map(|ptr| Self { ptr })
377 }
378
379 pub(crate) fn ptr(&self) -> *mut T {
380 self.ptr.as_ptr()
381 }
382}
383
/// Abstraction over an asynchronous runtime.
///
/// All functions are associated functions: a runtime is selected as a type,
/// not as a value.
pub trait AsyncRuntime: Send + Sync + 'static {
    /// The future returned by [`AsyncRuntime::delay_for`].
    type Delay: Future<Output = ()> + Send;

    /// Spawns `task` on the runtime without returning a handle to it.
    fn spawn<T: Future<Output = ()> + Send + 'static>(task: T);

    /// Returns a future associated with the given `duration`.
    ///
    /// The implementations in this file complete it once `duration` has passed.
    fn delay_for(duration: Duration) -> Self::Delay;
}
416
/// The runtime used by default.
///
/// With neither the `tokio` nor the `naive-runtime` feature enabled this is
/// the unit type.
#[cfg(not(any(feature = "tokio", feature = "naive-runtime")))]
pub type DefaultRuntime = ();

/// The runtime used by default.
///
/// With `naive-runtime` enabled and `tokio` disabled this is `NaiveRuntime`.
#[cfg(all(feature = "naive-runtime", not(feature = "tokio")))]
pub type DefaultRuntime = NaiveRuntime;

/// The runtime used by default.
///
/// With `tokio` enabled this is `TokioRuntime`, regardless of `naive-runtime`.
#[cfg(feature = "tokio")]
pub type DefaultRuntime = TokioRuntime;
452
/// An [`AsyncRuntime`] built on plain OS threads.
///
/// Every spawned task and every delay gets a thread of its own.
#[cfg(feature = "naive-runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "naive-runtime")))]
pub struct NaiveRuntime;

#[cfg(feature = "naive-runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "naive-runtime")))]
impl AsyncRuntime for NaiveRuntime {
    type Delay = Map<oneshot::Receiver<()>, fn(Result<(), oneshot::Canceled>)>;

    /// Drives `task` to completion on a newly spawned thread.
    fn spawn<T: Future<Output = ()> + Send + 'static>(task: T) {
        thread::spawn(move || {
            futures_executor::block_on(task);
        });
    }

    /// Returns a future that resolves after a helper thread has slept for
    /// `duration` and signalled a oneshot channel.
    fn delay_for(duration: Duration) -> Self::Delay {
        let (done_tx, done_rx) = oneshot::channel();
        thread::spawn(move || {
            thread::sleep(duration);
            // The outcome of the send is deliberately ignored.
            let _ = done_tx.send(());
        });
        // The receiver's result is discarded: the future resolves to `()`
        // whether the send arrived or the channel was cancelled.
        let discard: fn(Result<(), oneshot::Canceled>) = |_| ();
        done_rx.map(discard)
    }
}
484
/// An [`AsyncRuntime`] that delegates to Tokio.
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub struct TokioRuntime;

#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
impl AsyncRuntime for TokioRuntime {
    type Delay = tokio::time::Sleep;

    /// Spawns `task` with `tokio::spawn`; the returned handle is discarded.
    fn spawn<T: Future<Output = ()> + Send + 'static>(task: T) {
        drop(tokio::spawn(task));
    }

    /// Returns the future produced by `tokio::time::sleep(duration)`.
    fn delay_for(duration: Duration) -> Self::Delay {
        tokio::time::sleep(duration)
    }
}
509
#[cfg(test)]
mod tests {
    use super::*;

    /// The number returned by `get_rdkafka_version` must equal what
    /// `rd_kafka_version` reports directly.
    #[test]
    fn test_rdkafka_version() {
        let (reported, _name) = get_rdkafka_version();
        // SAFETY: the call takes no arguments.
        let expected = unsafe { rdsys::rd_kafka_version() };
        assert_eq!(expected, reported);
    }
}