use std::cmp;
use std::ffi::CStr;
use std::fmt;
use std::future::Future;
use std::ops::Deref;
use std::os::raw::c_char;
use std::os::raw::c_void;
use std::ptr;
use std::ptr::NonNull;
use std::slice;
use std::sync::Arc;
#[cfg(feature = "naive-runtime")]
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
#[cfg(feature = "naive-runtime")]
use futures_channel::oneshot;
#[cfg(feature = "naive-runtime")]
use futures_util::future::{FutureExt, Map};
use crate::log::trace;
use rdkafka_sys as rdsys;
pub fn get_rdkafka_version() -> (i32, String) {
let version_number = unsafe { rdsys::rd_kafka_version() };
let c_str = unsafe { CStr::from_ptr(rdsys::rd_kafka_version_str()) };
(version_number, c_str.to_string_lossy().into_owned())
}
pub(crate) enum Deadline {
At(Instant),
Never,
}
impl Deadline {
const MAX_FLUSH_DURATION: Duration = Duration::from_millis(i32::MAX as u64);
pub(crate) fn new(duration: Option<Duration>) -> Self {
if let Some(d) = duration {
Self::At(Instant::now() + d)
} else {
Self::Never
}
}
pub(crate) fn remaining(&self) -> Duration {
if let Deadline::At(i) = self {
*i - Instant::now()
} else {
Duration::MAX
}
}
#[allow(dead_code)]
pub(crate) fn remaining_millis_i32(&self) -> i32 {
cmp::min(Deadline::MAX_FLUSH_DURATION, self.remaining()).as_millis() as i32
}
pub(crate) fn elapsed(&self) -> bool {
self.remaining() <= Duration::ZERO
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum Timeout {
After(Duration),
Never,
}
impl Timeout {
pub(crate) fn as_millis(&self) -> i32 {
match self {
Timeout::After(d) => d.as_millis() as i32,
Timeout::Never => -1,
}
}
pub(crate) fn saturating_sub(&self, rhs: Duration) -> Timeout {
match (self, rhs) {
(Timeout::After(lhs), rhs) => Timeout::After(lhs.saturating_sub(rhs)),
(Timeout::Never, _) => Timeout::Never,
}
}
pub(crate) fn is_zero(&self) -> bool {
match self {
Timeout::After(d) => d.is_zero(),
Timeout::Never => false,
}
}
}
impl std::ops::SubAssign for Timeout {
fn sub_assign(&mut self, other: Self) {
match (self, other) {
(Timeout::After(lhs), Timeout::After(rhs)) => *lhs -= rhs,
(Timeout::Never, Timeout::After(_)) => (),
_ => panic!("subtraction of Timeout::Never is ill-defined"),
}
}
}
impl From<Timeout> for Deadline {
fn from(t: Timeout) -> Deadline {
if let Timeout::After(dur) = t {
Deadline::new(Some(dur))
} else {
Deadline::new(None)
}
}
}
impl From<&Deadline> for Timeout {
fn from(d: &Deadline) -> Timeout {
if let Deadline::Never = d {
Timeout::Never
} else {
Timeout::After(d.remaining())
}
}
}
impl From<Duration> for Timeout {
fn from(d: Duration) -> Timeout {
Timeout::After(d)
}
}
impl From<Option<Duration>> for Timeout {
fn from(v: Option<Duration>) -> Timeout {
match v {
None => Timeout::Never,
Some(d) => Timeout::After(d),
}
}
}
pub fn millis_to_epoch(time: SystemTime) -> i64 {
time.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| Duration::from_secs(0))
.as_millis() as i64
}
pub fn current_time_millis() -> i64 {
millis_to_epoch(SystemTime::now())
}
pub(crate) unsafe fn ptr_to_opt_slice<'a, T>(ptr: *const c_void, size: usize) -> Option<&'a [T]> {
if ptr.is_null() {
None
} else {
Some(slice::from_raw_parts::<T>(ptr as *const T, size))
}
}
pub(crate) unsafe fn ptr_to_opt_mut_slice<'a, T>(
ptr: *const c_void,
size: usize,
) -> Option<&'a mut [T]> {
if ptr.is_null() {
None
} else {
Some(slice::from_raw_parts_mut::<T>(ptr as *mut T, size))
}
}
pub(crate) unsafe fn ptr_to_slice<'a, T>(ptr: *const c_void, size: usize) -> &'a [T] {
if ptr.is_null() || size == 0 {
&[][..]
} else {
slice::from_raw_parts::<T>(ptr as *const T, size)
}
}
pub trait IntoOpaque: Send + Sync + Sized {
fn into_ptr(self) -> *mut c_void;
unsafe fn from_ptr(_: *mut c_void) -> Self;
}
impl IntoOpaque for () {
fn into_ptr(self) -> *mut c_void {
ptr::null_mut()
}
unsafe fn from_ptr(_: *mut c_void) -> Self {}
}
impl IntoOpaque for usize {
fn into_ptr(self) -> *mut c_void {
self as *mut c_void
}
unsafe fn from_ptr(ptr: *mut c_void) -> Self {
ptr as usize
}
}
impl<T: Send + Sync> IntoOpaque for Box<T> {
fn into_ptr(self) -> *mut c_void {
Box::into_raw(self) as *mut c_void
}
unsafe fn from_ptr(ptr: *mut c_void) -> Self {
Box::from_raw(ptr as *mut T)
}
}
impl<T: Send + Sync> IntoOpaque for Arc<T> {
fn into_ptr(self) -> *mut c_void {
Arc::into_raw(self) as *mut c_void
}
unsafe fn from_ptr(ptr: *mut c_void) -> Self {
Arc::from_raw(ptr as *const T)
}
}
pub unsafe fn cstr_to_owned(cstr: *const c_char) -> String {
CStr::from_ptr(cstr as *const c_char)
.to_string_lossy()
.into_owned()
}
pub(crate) struct ErrBuf {
buf: [u8; ErrBuf::MAX_ERR_LEN],
}
impl ErrBuf {
const MAX_ERR_LEN: usize = 512;
pub fn new() -> ErrBuf {
ErrBuf {
buf: [0; ErrBuf::MAX_ERR_LEN],
}
}
pub fn as_mut_ptr(&mut self) -> *mut c_char {
self.buf.as_mut_ptr() as *mut c_char
}
pub fn filled(&self) -> &[u8] {
let i = self.buf.iter().position(|c| *c == 0).unwrap();
&self.buf[..i + 1]
}
pub fn len(&self) -> usize {
self.filled().len()
}
pub fn capacity(&self) -> usize {
self.buf.len()
}
}
impl Default for ErrBuf {
fn default() -> ErrBuf {
ErrBuf::new()
}
}
impl fmt::Display for ErrBuf {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}",
CStr::from_bytes_with_nul(self.filled())
.unwrap()
.to_string_lossy()
)
}
}
pub(crate) trait AsCArray<T> {
fn as_c_array(&self) -> *mut *mut T;
}
impl<T: KafkaDrop> AsCArray<T> for Vec<NativePtr<T>> {
fn as_c_array(&self) -> *mut *mut T {
self.as_ptr() as *mut *mut T
}
}
pub(crate) struct NativePtr<T>
where
T: KafkaDrop,
{
ptr: NonNull<T>,
}
impl<T> Drop for NativePtr<T>
where
T: KafkaDrop,
{
fn drop(&mut self) {
trace!("Destroying {}: {:?}", T::TYPE, self.ptr);
unsafe { T::DROP(self.ptr.as_ptr()) }
trace!("Destroyed {}: {:?}", T::TYPE, self.ptr);
}
}
#[allow(clippy::missing_safety_doc)]
pub(crate) unsafe trait KafkaDrop {
const TYPE: &'static str;
const DROP: unsafe extern "C" fn(*mut Self);
}
impl<T> Deref for NativePtr<T>
where
T: KafkaDrop,
{
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { self.ptr.as_ref() }
}
}
impl<T> fmt::Debug for NativePtr<T>
where
T: KafkaDrop,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.ptr.fmt(f)
}
}
impl<T> NativePtr<T>
where
T: KafkaDrop,
{
pub(crate) unsafe fn from_ptr(ptr: *mut T) -> Option<Self> {
NonNull::new(ptr).map(|ptr| Self { ptr })
}
pub(crate) fn ptr(&self) -> *mut T {
self.ptr.as_ptr()
}
}
pub trait AsyncRuntime: Send + Sync + 'static {
type Delay: Future<Output = ()> + Send;
fn spawn<T>(task: T)
where
T: Future<Output = ()> + Send + 'static;
fn delay_for(duration: Duration) -> Self::Delay;
}
#[cfg(not(any(feature = "tokio", feature = "naive-runtime")))]
pub type DefaultRuntime = ();
#[cfg(all(not(feature = "tokio"), feature = "naive-runtime"))]
pub type DefaultRuntime = NaiveRuntime;
#[allow(rustdoc::broken_intra_doc_links)]
#[cfg(feature = "tokio")]
pub type DefaultRuntime = TokioRuntime;
#[cfg(feature = "naive-runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "naive-runtime")))]
pub struct NaiveRuntime;
#[cfg(feature = "naive-runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "naive-runtime")))]
impl AsyncRuntime for NaiveRuntime {
type Delay = Map<oneshot::Receiver<()>, fn(Result<(), oneshot::Canceled>)>;
fn spawn<T>(task: T)
where
T: Future<Output = ()> + Send + 'static,
{
thread::spawn(|| futures_executor::block_on(task));
}
fn delay_for(duration: Duration) -> Self::Delay {
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(duration);
tx.send(())
});
rx.map(|_| ())
}
}
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub struct TokioRuntime;
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
impl AsyncRuntime for TokioRuntime {
type Delay = tokio::time::Sleep;
fn spawn<T>(task: T)
where
T: Future<Output = ()> + Send + 'static,
{
tokio::spawn(task);
}
fn delay_for(duration: Duration) -> Self::Delay {
tokio::time::sleep(duration)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_rdkafka_version() {
let rdk_version = unsafe { rdsys::rd_kafka_version() };
let (version_int, _) = get_rdkafka_version();
assert_eq!(rdk_version, version_int);
}
}