use std::ffi::CStr;
use std::fmt;
use std::future::Future;
use std::ops::Deref;
use std::os::raw::c_char;
use std::os::raw::c_void;
use std::ptr;
use std::ptr::NonNull;
use std::slice;
use std::sync::Arc;
#[cfg(feature = "naive-runtime")]
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[cfg(feature = "naive-runtime")]
use futures_channel::oneshot;
#[cfg(feature = "naive-runtime")]
use futures_util::future::{FutureExt, Map};
use crate::log::trace;
use rdkafka_sys as rdsys;
pub fn get_rdkafka_version() -> (u16, String) {
let version_number = unsafe { rdsys::rd_kafka_version() } as u16;
let c_str = unsafe { CStr::from_ptr(rdsys::rd_kafka_version_str()) };
(version_number, c_str.to_string_lossy().into_owned())
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum Timeout {
After(Duration),
Never,
}
impl Timeout {
pub(crate) fn as_millis(&self) -> i32 {
match self {
Timeout::After(d) => d.as_millis() as i32,
Timeout::Never => -1,
}
}
}
impl std::ops::SubAssign for Timeout {
fn sub_assign(&mut self, other: Self) {
match (self, other) {
(Timeout::After(lhs), Timeout::After(rhs)) => *lhs -= rhs,
(Timeout::Never, Timeout::After(_)) => (),
_ => panic!("subtraction of Timeout::Never is ill-defined"),
}
}
}
impl From<Duration> for Timeout {
fn from(d: Duration) -> Timeout {
Timeout::After(d)
}
}
impl From<Option<Duration>> for Timeout {
fn from(v: Option<Duration>) -> Timeout {
match v {
None => Timeout::Never,
Some(d) => Timeout::After(d),
}
}
}
pub fn millis_to_epoch(time: SystemTime) -> i64 {
time.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| Duration::from_secs(0))
.as_millis() as i64
}
pub fn current_time_millis() -> i64 {
millis_to_epoch(SystemTime::now())
}
pub(crate) unsafe fn ptr_to_opt_slice<'a, T>(ptr: *const c_void, size: usize) -> Option<&'a [T]> {
if ptr.is_null() {
None
} else {
Some(slice::from_raw_parts::<T>(ptr as *const T, size))
}
}
pub(crate) unsafe fn ptr_to_opt_mut_slice<'a, T>(
ptr: *const c_void,
size: usize,
) -> Option<&'a mut [T]> {
if ptr.is_null() {
None
} else {
Some(slice::from_raw_parts_mut::<T>(ptr as *mut T, size))
}
}
pub(crate) unsafe fn ptr_to_slice<'a, T>(ptr: *const c_void, size: usize) -> &'a [T] {
if ptr.is_null() || size == 0 {
&[][..]
} else {
slice::from_raw_parts::<T>(ptr as *const T, size)
}
}
pub trait IntoOpaque: Send + Sync + Sized {
fn into_ptr(self) -> *mut c_void;
unsafe fn from_ptr(_: *mut c_void) -> Self;
}
impl IntoOpaque for () {
fn into_ptr(self) -> *mut c_void {
ptr::null_mut()
}
unsafe fn from_ptr(_: *mut c_void) -> Self {}
}
impl IntoOpaque for usize {
fn into_ptr(self) -> *mut c_void {
self as *mut c_void
}
unsafe fn from_ptr(ptr: *mut c_void) -> Self {
ptr as usize
}
}
impl<T: Send + Sync> IntoOpaque for Box<T> {
fn into_ptr(self) -> *mut c_void {
Box::into_raw(self) as *mut c_void
}
unsafe fn from_ptr(ptr: *mut c_void) -> Self {
Box::from_raw(ptr as *mut T)
}
}
impl<T: Send + Sync> IntoOpaque for Arc<T> {
fn into_ptr(self) -> *mut c_void {
Arc::into_raw(self) as *mut c_void
}
unsafe fn from_ptr(ptr: *mut c_void) -> Self {
Arc::from_raw(ptr as *const T)
}
}
pub unsafe fn cstr_to_owned(cstr: *const c_char) -> String {
CStr::from_ptr(cstr as *const c_char)
.to_string_lossy()
.into_owned()
}
pub(crate) struct ErrBuf {
buf: [u8; ErrBuf::MAX_ERR_LEN],
}
impl ErrBuf {
const MAX_ERR_LEN: usize = 512;
pub fn new() -> ErrBuf {
ErrBuf {
buf: [0; ErrBuf::MAX_ERR_LEN],
}
}
pub fn as_mut_ptr(&mut self) -> *mut c_char {
self.buf.as_mut_ptr() as *mut c_char
}
pub fn filled(&self) -> &[u8] {
let i = self.buf.iter().position(|c| *c == 0).unwrap();
&self.buf[..i + 1]
}
pub fn len(&self) -> usize {
self.filled().len()
}
pub fn capacity(&self) -> usize {
self.buf.len()
}
}
impl Default for ErrBuf {
fn default() -> ErrBuf {
ErrBuf::new()
}
}
impl fmt::Display for ErrBuf {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}",
CStr::from_bytes_with_nul(self.filled())
.unwrap()
.to_string_lossy()
)
}
}
pub(crate) trait WrappedCPointer {
type Target;
fn ptr(&self) -> *mut Self::Target;
fn is_null(&self) -> bool {
self.ptr().is_null()
}
}
pub(crate) trait AsCArray<T: WrappedCPointer> {
fn as_c_array(&self) -> *mut *mut T::Target;
}
impl<T: WrappedCPointer> AsCArray<T> for Vec<T> {
fn as_c_array(&self) -> *mut *mut T::Target {
self.as_ptr() as *mut *mut T::Target
}
}
pub(crate) struct NativePtr<T>
where
T: KafkaDrop,
{
ptr: NonNull<T>,
}
impl<T> Drop for NativePtr<T>
where
T: KafkaDrop,
{
fn drop(&mut self) {
trace!("Destroying {}: {:?}", T::TYPE, self.ptr);
unsafe { T::DROP(self.ptr.as_ptr()) }
trace!("Destroyed {}: {:?}", T::TYPE, self.ptr);
}
}
#[allow(clippy::missing_safety_doc)]
pub(crate) unsafe trait KafkaDrop {
const TYPE: &'static str;
const DROP: unsafe extern "C" fn(*mut Self);
}
impl<T> WrappedCPointer for NativePtr<T>
where
T: KafkaDrop,
{
type Target = T;
fn ptr(&self) -> *mut T {
self.ptr.as_ptr()
}
}
impl<T> Deref for NativePtr<T>
where
T: KafkaDrop,
{
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { self.ptr.as_ref() }
}
}
impl<T> fmt::Debug for NativePtr<T>
where
T: KafkaDrop,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.ptr.fmt(f)
}
}
impl<T> NativePtr<T>
where
T: KafkaDrop,
{
pub(crate) unsafe fn from_ptr(ptr: *mut T) -> Option<Self> {
NonNull::new(ptr).map(|ptr| Self { ptr })
}
pub(crate) fn ptr(&self) -> *mut T {
self.ptr.as_ptr()
}
}
pub(crate) struct OnDrop<F>(pub F)
where
F: Fn();
impl<F> Drop for OnDrop<F>
where
F: Fn(),
{
fn drop(&mut self) {
(self.0)()
}
}
pub trait AsyncRuntime: Send + Sync + 'static {
type Delay: Future<Output = ()> + Send;
fn spawn<T>(task: T)
where
T: Future<Output = ()> + Send + 'static;
fn delay_for(duration: Duration) -> Self::Delay;
}
#[cfg(not(any(feature = "tokio", feature = "naive-runtime")))]
pub type DefaultRuntime = ();
#[cfg(all(not(feature = "tokio"), feature = "naive-runtime"))]
pub type DefaultRuntime = NaiveRuntime;
#[cfg(feature = "tokio")]
pub type DefaultRuntime = TokioRuntime;
#[cfg(feature = "naive-runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "naive-runtime")))]
pub struct NaiveRuntime;
#[cfg(feature = "naive-runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "naive-runtime")))]
impl AsyncRuntime for NaiveRuntime {
type Delay = Map<oneshot::Receiver<()>, fn(Result<(), oneshot::Canceled>)>;
fn spawn<T>(task: T)
where
T: Future<Output = ()> + Send + 'static,
{
thread::spawn(|| futures_executor::block_on(task));
}
fn delay_for(duration: Duration) -> Self::Delay {
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(duration);
tx.send(())
});
rx.map(|_| ())
}
}
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub struct TokioRuntime;
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
impl AsyncRuntime for TokioRuntime {
type Delay = tokio::time::Sleep;
fn spawn<T>(task: T)
where
T: Future<Output = ()> + Send + 'static,
{
tokio::spawn(task);
}
fn delay_for(duration: Duration) -> Self::Delay {
tokio::time::sleep(duration)
}
}