snakeice_rdkafka/
util.rs

1//! Utility functions and types.
2
3use std::cmp;
4use std::ffi::CStr;
5use std::fmt;
6use std::future::Future;
7use std::ops::Deref;
8use std::os::raw::c_char;
9use std::os::raw::c_void;
10use std::ptr;
11use std::ptr::NonNull;
12use std::slice;
13use std::sync::Arc;
14#[cfg(feature = "naive-runtime")]
15use std::thread;
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
18#[cfg(feature = "naive-runtime")]
19use futures_channel::oneshot;
20#[cfg(feature = "naive-runtime")]
21use futures_util::future::{FutureExt, Map};
22
23use crate::log::trace;
24
25use rdkafka_sys as rdsys;
26
27/// Returns a tuple representing the version of `librdkafka` in hexadecimal and
28/// string format.
29pub fn get_rdkafka_version() -> (i32, String) {
30    let version_number = unsafe { rdsys::rd_kafka_version() };
31    let c_str = unsafe { CStr::from_ptr(rdsys::rd_kafka_version_str()) };
32    (version_number, c_str.to_string_lossy().into_owned())
33}
34
/// A point in time by which an operation has to finish.
pub(crate) enum Deadline {
    /// The operation must finish before the contained instant.
    At(Instant),
    /// The operation may take arbitrarily long.
    Never,
}

impl Deadline {
    // librdkafka's flush api requires an i32 millisecond timeout
    const MAX_FLUSH_DURATION: Duration = Duration::from_millis(i32::MAX as u64);

    /// Creates a deadline `duration` from now, or [`Deadline::Never`] when no
    /// duration is given.
    ///
    /// A duration so large that the resulting instant cannot be represented
    /// also yields [`Deadline::Never`] instead of panicking on overflow.
    pub(crate) fn new(duration: Option<Duration>) -> Self {
        duration
            .and_then(|d| Instant::now().checked_add(d))
            .map_or(Self::Never, Self::At)
    }

    /// Returns the time left until the deadline.
    ///
    /// Yields [`Duration::ZERO`] once the deadline has passed and
    /// [`Duration::MAX`] for [`Deadline::Never`].
    pub(crate) fn remaining(&self) -> Duration {
        match self {
            // Saturate explicitly rather than relying on the behavior of
            // `Instant - Instant` when the deadline lies in the past.
            Deadline::At(at) => at.saturating_duration_since(Instant::now()),
            Deadline::Never => Duration::MAX,
        }
    }

    /// Returns the remaining time in whole milliseconds, capped at
    /// `i32::MAX`.
    pub(crate) fn remaining_millis_i32(&self) -> i32 {
        let capped = cmp::min(Self::MAX_FLUSH_DURATION, self.remaining());
        // `capped` never exceeds `i32::MAX` milliseconds, so the conversion
        // cannot fail; the fallback only avoids a bare truncating cast.
        i32::try_from(capped.as_millis()).unwrap_or(i32::MAX)
    }

    /// Returns `true` once no time remains before the deadline.
    pub(crate) fn elapsed(&self) -> bool {
        self.remaining().is_zero()
    }
}
68
/// Specifies a timeout for a Kafka operation.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum Timeout {
    /// Time out after the specified duration elapses.
    After(Duration),
    /// Block forever.
    Never,
}

impl Timeout {
    /// Converts a timeout to Kafka's expected representation.
    ///
    /// [`Timeout::After`] becomes its length in whole milliseconds, saturated
    /// at `i32::MAX`; [`Timeout::Never`] becomes `-1`.
    ///
    /// Saturation matters: a truncating cast would turn some large durations
    /// into `-1` (indistinguishable from "never"), `0`, or a negative value.
    pub(crate) fn as_millis(&self) -> i32 {
        match self {
            Timeout::After(d) => i32::try_from(d.as_millis()).unwrap_or(i32::MAX),
            Timeout::Never => -1,
        }
    }

    /// Saturating `Duration` subtraction to Timeout.
    ///
    /// [`Timeout::After`] is reduced by `rhs`, stopping at zero;
    /// [`Timeout::Never`] is returned unchanged.
    pub(crate) fn saturating_sub(&self, rhs: Duration) -> Timeout {
        match self {
            Timeout::After(lhs) => Timeout::After(lhs.saturating_sub(rhs)),
            Timeout::Never => Timeout::Never,
        }
    }

    /// Returns `true` if the timeout is zero.
    ///
    /// [`Timeout::Never`] is never zero.
    pub(crate) fn is_zero(&self) -> bool {
        match self {
            Timeout::After(d) => d.is_zero(),
            Timeout::Never => false,
        }
    }
}
103
104impl std::ops::SubAssign for Timeout {
105    fn sub_assign(&mut self, other: Self) {
106        match (self, other) {
107            (Timeout::After(lhs), Timeout::After(rhs)) => *lhs -= rhs,
108            (Timeout::Never, Timeout::After(_)) => (),
109            _ => panic!("subtraction of Timeout::Never is ill-defined"),
110        }
111    }
112}
113
114impl From<Timeout> for Deadline {
115    fn from(t: Timeout) -> Deadline {
116        if let Timeout::After(dur) = t {
117            Deadline::new(Some(dur))
118        } else {
119            Deadline::new(None)
120        }
121    }
122}
123
124impl From<&Deadline> for Timeout {
125    fn from(d: &Deadline) -> Timeout {
126        if let Deadline::Never = d {
127            Timeout::Never
128        } else {
129            Timeout::After(d.remaining())
130        }
131    }
132}
133
134impl From<Duration> for Timeout {
135    fn from(d: Duration) -> Timeout {
136        Timeout::After(d)
137    }
138}
139
140impl From<Option<Duration>> for Timeout {
141    fn from(v: Option<Duration>) -> Timeout {
142        match v {
143            None => Timeout::Never,
144            Some(d) => Timeout::After(d),
145        }
146    }
147}
148
/// Expresses `time` as whole milliseconds elapsed since the Unix epoch.
///
/// Times that lie before the epoch are reported as `0`.
pub fn millis_to_epoch(time: SystemTime) -> i64 {
    match time.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        // `time` precedes the epoch: clamp to zero.
        Err(_) => 0,
    }
}
155
156/// Returns the current time in milliseconds since the Unix epoch.
157pub fn current_time_millis() -> i64 {
158    millis_to_epoch(SystemTime::now())
159}
160
/// Views `size` elements of type `T` starting at `ptr` as a shared slice, or
/// returns `None` when `ptr` is null.
///
/// # Safety
///
/// When `ptr` is non-null, the caller must uphold the requirements of
/// [`slice::from_raw_parts`] for `ptr` cast to `*const T`, `size`, and the
/// chosen lifetime `'a`.
pub(crate) unsafe fn ptr_to_opt_slice<'a, T>(ptr: *const c_void, size: usize) -> Option<&'a [T]> {
    if ptr.is_null() {
        return None;
    }
    // SAFETY: `ptr` is non-null; everything else is the caller's obligation.
    Some(unsafe { slice::from_raw_parts(ptr.cast::<T>(), size) })
}
172
/// Views `size` elements of type `T` starting at `ptr` as a mutable slice, or
/// returns `None` when `ptr` is null.
///
/// # Safety
///
/// When `ptr` is non-null, the caller must uphold the requirements of
/// [`slice::from_raw_parts_mut`] for `ptr` cast to `*mut T`, `size`, and the
/// chosen lifetime `'a`. Note that the pointer is accepted as `*const` but
/// the memory is handed out mutably.
pub(crate) unsafe fn ptr_to_opt_mut_slice<'a, T>(
    ptr: *const c_void,
    size: usize,
) -> Option<&'a mut [T]> {
    if ptr.is_null() {
        return None;
    }
    let elements = ptr as *mut T;
    // SAFETY: `ptr` is non-null; everything else is the caller's obligation.
    Some(unsafe { slice::from_raw_parts_mut(elements, size) })
}
185
/// Views `size` elements of type `T` starting at `ptr` as a shared slice.
///
/// A null pointer or a size of zero produces an empty slice.
///
/// # Safety
///
/// When `ptr` is non-null and `size` is non-zero, the caller must uphold the
/// requirements of [`slice::from_raw_parts`] for `ptr` cast to `*const T`,
/// `size`, and the chosen lifetime `'a`.
pub(crate) unsafe fn ptr_to_slice<'a, T>(ptr: *const c_void, size: usize) -> &'a [T] {
    let has_data = !ptr.is_null() && size != 0;
    if has_data {
        // SAFETY: `ptr` is non-null; everything else is the caller's
        // obligation.
        unsafe { slice::from_raw_parts(ptr.cast::<T>(), size) }
    } else {
        &[]
    }
}
197
/// A Rust value that can round-trip through a raw pointer.
///
/// The C library treats such pointers as opaque: a value is handed over with
/// [`into_ptr`](IntoOpaque::into_ptr) and recovered later with
/// [`from_ptr`](IntoOpaque::from_ptr).
pub trait IntoOpaque: Sized + Send + Sync {
    /// Consumes the value and returns it as a raw pointer.
    fn into_ptr(self) -> *mut c_void;

    /// Rebuilds the original Rust value from a raw pointer.
    ///
    /// # Safety
    ///
    /// The pointer must be created with [into_ptr](IntoOpaque::into_ptr).
    ///
    /// Care must be taken to not call more than once if it would result
    /// in an aliasing violation (e.g. [Box]).
    unsafe fn from_ptr(ptr: *mut c_void) -> Self;
}
216
217impl IntoOpaque for () {
218    fn into_ptr(self) -> *mut c_void {
219        ptr::null_mut()
220    }
221
222    unsafe fn from_ptr(_: *mut c_void) -> Self {}
223}
224
225impl IntoOpaque for usize {
226    fn into_ptr(self) -> *mut c_void {
227        self as *mut c_void
228    }
229
230    unsafe fn from_ptr(ptr: *mut c_void) -> Self {
231        ptr as usize
232    }
233}
234
235impl<T: Send + Sync> IntoOpaque for Box<T> {
236    fn into_ptr(self) -> *mut c_void {
237        Box::into_raw(self) as *mut c_void
238    }
239
240    unsafe fn from_ptr(ptr: *mut c_void) -> Self {
241        unsafe { Box::from_raw(ptr as *mut T) }
242    }
243}
244
245impl<T: Send + Sync> IntoOpaque for Arc<T> {
246    fn into_ptr(self) -> *mut c_void {
247        Arc::into_raw(self) as *mut c_void
248    }
249
250    unsafe fn from_ptr(ptr: *mut c_void) -> Self {
251        unsafe { Arc::from_raw(ptr as *const T) }
252    }
253}
254
/// Copies a C string into an owned [`String`].
///
/// Byte sequences that are not valid UTF-8 are replaced lossily.
///
/// # Safety
///
/// `cstr` must point to a valid, null-terminated C string.
pub unsafe fn cstr_to_owned(cstr: *const c_char) -> String {
    // SAFETY: the caller guarantees `cstr` is a valid, null-terminated string.
    let borrowed = unsafe { CStr::from_ptr(cstr) };
    borrowed.to_string_lossy().into_owned()
}
267
/// A fixed-size, zero-initialised byte buffer into which C code can write a
/// NUL-terminated error message.
pub(crate) struct ErrBuf {
    buf: [u8; ErrBuf::MAX_ERR_LEN],
}

impl ErrBuf {
    /// Size of the buffer in bytes.
    const MAX_ERR_LEN: usize = 512;

    /// Creates a buffer filled with zero bytes.
    pub fn new() -> ErrBuf {
        Self {
            buf: [0u8; Self::MAX_ERR_LEN],
        }
    }

    /// Returns a mutable C-character pointer to the start of the buffer.
    pub fn as_mut_ptr(&mut self) -> *mut c_char {
        self.buf.as_mut_ptr().cast::<c_char>()
    }

    /// Returns the written part of the buffer, up to and including the first
    /// NUL byte.
    ///
    /// # Panics
    ///
    /// Panics if the buffer contains no NUL byte.
    pub fn filled(&self) -> &[u8] {
        let nul_at = self.buf.iter().position(|&byte| byte == 0).unwrap();
        &self.buf[..=nul_at]
    }

    /// Returns the length of [`filled`](ErrBuf::filled), which counts the
    /// terminating NUL byte.
    pub fn len(&self) -> usize {
        let written = self.filled();
        written.len()
    }

    /// Returns the total size of the buffer in bytes.
    pub fn capacity(&self) -> usize {
        self.buf.len()
    }
}

impl Default for ErrBuf {
    /// Equivalent to [`ErrBuf::new`].
    fn default() -> ErrBuf {
        Self::new()
    }
}

impl fmt::Display for ErrBuf {
    /// Writes the buffered message, replacing invalid UTF-8 lossily.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `filled` ends at the first NUL, so there is no interior NUL and the
        // conversion is expected to succeed.
        let message = CStr::from_bytes_with_nul(self.filled()).unwrap();
        write!(f, "{}", message.to_string_lossy())
    }
}
316
/// Exposes a Rust collection as a C-style array of `*mut T` pointers.
pub(crate) trait AsCArray<T> {
    /// Returns a pointer to the first element of the pointer array.
    fn as_c_array(&self) -> *mut *mut T;
}
320
321impl<T: KafkaDrop> AsCArray<T> for Vec<NativePtr<T>> {
322    fn as_c_array(&self) -> *mut *mut T {
323        self.as_ptr() as *mut *mut T
324    }
325}
326
327pub(crate) struct NativePtr<T>
328where
329    T: KafkaDrop,
330{
331    ptr: NonNull<T>,
332}
333
334impl<T> Drop for NativePtr<T>
335where
336    T: KafkaDrop,
337{
338    fn drop(&mut self) {
339        trace!("Destroying {}: {:?}", T::TYPE, self.ptr);
340        unsafe { T::DROP(self.ptr.as_ptr()) }
341        trace!("Destroyed {}: {:?}", T::TYPE, self.ptr);
342    }
343}
344
// This trait is an internal implementation detail.
/// Describes how a native object wrapped in a `NativePtr` is named and
/// destroyed.
#[allow(clippy::missing_safety_doc)]
pub(crate) unsafe trait KafkaDrop {
    /// Name of the type, used in the trace logs emitted on drop.
    const TYPE: &'static str;
    /// C function that `NativePtr` calls on the wrapped pointer when dropped.
    const DROP: unsafe extern "C" fn(*mut Self);
}
351
352impl<T> Deref for NativePtr<T>
353where
354    T: KafkaDrop,
355{
356    type Target = T;
357    fn deref(&self) -> &Self::Target {
358        unsafe { self.ptr.as_ref() }
359    }
360}
361
362impl<T> fmt::Debug for NativePtr<T>
363where
364    T: KafkaDrop,
365{
366    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
367        self.ptr.fmt(f)
368    }
369}
370
371impl<T> NativePtr<T>
372where
373    T: KafkaDrop,
374{
375    pub(crate) unsafe fn from_ptr(ptr: *mut T) -> Option<Self> {
376        NonNull::new(ptr).map(|ptr| Self { ptr })
377    }
378
379    pub(crate) fn ptr(&self) -> *mut T {
380        self.ptr.as_ptr()
381    }
382}
383
/// An abstraction over asynchronous runtimes.
///
/// Rust has several asynchronous runtimes. rust-rdkafka defaults to Tokio,
/// through the [`TokioRuntime`], but accepts any runtime for which this
/// trait can be implemented.
///
/// For an example of using the [smol] runtime with rust-rdkafka, see the
/// [runtime_smol] example.
///
/// For an example of using the [async-std] runtime with rust-rdkafka, see the
/// [runtime_async_std] example.
///
/// [smol]: https://docs.rs/smol
/// [async-std]: https://docs.rs/async-std
/// [runtime_smol]: https://github.com/fede1024/rust-rdkafka/tree/master/examples/runtime_smol.rs
/// [runtime_async_std]: https://github.com/fede1024/rust-rdkafka/tree/master/examples/runtime_async_std.rs
pub trait AsyncRuntime: Send + Sync + 'static {
    /// The future type produced by [`delay_for`](AsyncRuntime::delay_for).
    type Delay: Future<Output = ()> + Send;

    /// Spawns an asynchronous task.
    ///
    /// The task should be polled to completion, unless the runtime exits
    /// first. With some runtimes this requires an explicit "detach" step.
    fn spawn<T>(task: T)
    where
        T: Future<Output = ()> + Send + 'static;

    /// Builds a future that resolves once `duration` has elapsed.
    fn delay_for(duration: Duration) -> Self::Delay;
}
416
/// The [`AsyncRuntime`] that is used when none is named explicitly.
///
/// With the `tokio` feature enabled this is the [`TokioRuntime`]; otherwise,
/// with the `naive-runtime` feature enabled, it is the [`NaiveRuntime`].
///
/// With neither feature enabled it is `()`. That is not a valid
/// `AsyncRuntime`, so using it as one fails to compile and a custom runtime
/// has to be specified explicitly wherever one is required.
#[cfg(not(any(feature = "tokio", feature = "naive-runtime")))]
pub type DefaultRuntime = ();

/// The [`AsyncRuntime`] that is used when none is named explicitly.
///
/// With the `tokio` feature enabled this is the [`TokioRuntime`]; otherwise,
/// with the `naive-runtime` feature enabled, it is the [`NaiveRuntime`].
///
/// With neither feature enabled it is `()`. That is not a valid
/// `AsyncRuntime`, so using it as one fails to compile and a custom runtime
/// has to be specified explicitly wherever one is required.
#[cfg(all(not(feature = "tokio"), feature = "naive-runtime"))]
pub type DefaultRuntime = NaiveRuntime;

/// The [`AsyncRuntime`] that is used when none is named explicitly.
///
/// With the `tokio` feature enabled this is the [`TokioRuntime`]; otherwise,
/// with the `naive-runtime` feature enabled, it is the [`NaiveRuntime`].
///
/// With neither feature enabled it is `()`. That is not a valid
/// `AsyncRuntime`, so using it as one fails to compile and a custom runtime
/// has to be specified explicitly wherever one is required.
#[cfg(feature = "tokio")]
pub type DefaultRuntime = TokioRuntime;
452
/// An [`AsyncRuntime`] implementation backed by the executor in the
/// [`futures_executor`](futures_executor) crate.
///
/// Avoid this runtime when performance is a concern: the futures executor
/// has no timer, so every spawned task and every delay gets its own thread.
#[cfg(feature = "naive-runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "naive-runtime")))]
pub struct NaiveRuntime;

#[cfg(feature = "naive-runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "naive-runtime")))]
impl AsyncRuntime for NaiveRuntime {
    type Delay = Map<oneshot::Receiver<()>, fn(Result<(), oneshot::Canceled>)>;

    /// Drives `task` to completion on a newly spawned thread.
    fn spawn<T>(task: T)
    where
        T: Future<Output = ()> + Send + 'static,
    {
        thread::spawn(move || futures_executor::block_on(task));
    }

    /// Sleeps for `duration` on a helper thread, then signals the returned
    /// future through a oneshot channel.
    fn delay_for(duration: Duration) -> Self::Delay {
        let (done_tx, done_rx) = oneshot::channel();
        thread::spawn(move || {
            thread::sleep(duration);
            // The result of the send is not inspected by anyone.
            done_tx.send(())
        });
        // Completion and cancellation both resolve the delay.
        done_rx.map(|_| ())
    }
}
484
/// An [`AsyncRuntime`] implementation backed by [Tokio](tokio).
///
/// Unless the `tokio` feature is disabled, this is the runtime the crate
/// uses by default.
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub struct TokioRuntime;

#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
impl AsyncRuntime for TokioRuntime {
    type Delay = tokio::time::Sleep;

    /// Hands `task` to `tokio::spawn`; the join handle is discarded.
    fn spawn<T>(task: T)
    where
        T: Future<Output = ()> + Send + 'static,
    {
        drop(tokio::spawn(task));
    }

    /// Returns the future produced by `tokio::time::sleep(duration)`.
    fn delay_for(duration: Duration) -> Self::Delay {
        let sleep = tokio::time::sleep(duration);
        sleep
    }
}
509
#[cfg(test)]
mod tests {
    use super::*;

    /// The numeric version reported by `get_rdkafka_version` must match the
    /// value returned by the native `rd_kafka_version` call.
    #[test]
    fn test_rdkafka_version() {
        let expected = unsafe { rdsys::rd_kafka_version() };
        let (reported, _version_str) = get_rdkafka_version();
        assert_eq!(expected, reported);
    }
}