use std::convert::TryInto;
use std::ffi::{CStr, CString};
use std::os::raw::c_int;
use std::time::Duration;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;
use crate::client::Client;
use crate::config::ClientConfig;
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::producer::DefaultProducerContext;
use crate::ClientContext;
/// Whether the `MockCluster` owns its backing librdkafka client or merely
/// borrows one owned by the caller. Only the `Owned` case triggers
/// destruction of the native mock cluster on drop (see the `Drop` impl).
enum MockClusterClient<'c, C: ClientContext> {
/// Client created internally by `MockCluster::new`; lives and dies with
/// the cluster handle.
#[allow(dead_code)]
Owned(Client<C>),
/// Caller-owned client supplied to `MockCluster::from_client`; the
/// cluster handle must not outlive it (hence the `'c` lifetime).
#[allow(dead_code)]
Borrowed(&'c Client<C>),
}
/// Handle to a mock Kafka cluster, created either with its own internal
/// client (`MockCluster::new`) or attached to an existing client
/// (`MockCluster::from_client`).
pub struct MockCluster<'c, C: ClientContext> {
// Raw pointer to the native librdkafka mock cluster object.
mock_cluster: *mut RDKafkaMockCluster,
// Keeps the backing client alive; its variant decides whether `Drop`
// destroys the native cluster.
client: MockClusterClient<'c, C>,
}
/// Maps the response-error value produced by `$op` (typically a
/// `rd_kafka_mock_*` FFI call) to a `KafkaResult<()>`: values for which
/// `is_error()` is true become `Err(KafkaError::MockCluster(..))`; anything
/// else becomes `Ok(())`.
///
/// NOTE(review): the expansion is deliberately a bare `match` expression so
/// that a brace-delimited invocation in tail position yields the function's
/// return value; `$op` is bound by the match scrutinee and thus evaluated
/// exactly once.
macro_rules! return_mock_op {
($op:expr) => {
match $op {
err if err.is_error() => Err(KafkaError::MockCluster(err.into())),
_ => Ok(()),
}
};
}
/// Selects which kind of coordinator `MockCluster::coordinator` pins to a
/// particular broker.
pub enum MockCoordinator {
/// Transaction coordinator, addressed by transactional id.
Transaction(String),
/// Group coordinator, addressed by consumer group id.
Group(String),
}
impl MockCluster<'static, DefaultProducerContext> {
    /// Creates a new mock cluster with `broker_count` brokers.
    ///
    /// A dedicated producer client is created internally (with an empty
    /// default `ClientConfig`) to host the mock cluster; the returned
    /// `MockCluster` owns that client and tears the native cluster down
    /// when dropped.
    ///
    /// # Errors
    ///
    /// Returns an error if the native client cannot be created, or
    /// `KafkaError::MockCluster(Fail)` if librdkafka returns a null
    /// cluster handle.
    pub fn new(broker_count: i32) -> KafkaResult<Self> {
        let client_config = ClientConfig::new();
        let native = client_config.create_native_config()?;
        let owner = Client::new(
            &client_config,
            native,
            RDKafkaType::RD_KAFKA_PRODUCER,
            DefaultProducerContext {},
        )?;
        let cluster_ptr =
            unsafe { rdsys::rd_kafka_mock_cluster_new(owner.native_ptr(), broker_count) };
        if cluster_ptr.is_null() {
            return Err(KafkaError::MockCluster(rdsys::RDKafkaErrorCode::Fail));
        }
        Ok(MockCluster {
            mock_cluster: cluster_ptr,
            client: MockClusterClient::Owned(owner),
        })
    }
}
impl<'c, C> MockCluster<'c, C>
where
    C: ClientContext,
{
    /// Returns the mock cluster attached to `client`, or `None` when
    /// librdkafka reports that the client has no mock cluster.
    pub(crate) fn from_client(client: &'c Client<C>) -> Option<Self> {
        let handle = unsafe { rdsys::rd_kafka_handle_mock_cluster(client.native_ptr()) };
        if handle.is_null() {
            None
        } else {
            Some(MockCluster {
                mock_cluster: handle,
                client: MockClusterClient::Borrowed(client),
            })
        }
    }

    /// Returns the `bootstrap.servers` string clients should use to connect
    /// to this mock cluster (lossily converted from the native C string).
    pub fn bootstrap_servers(&self) -> String {
        unsafe {
            CStr::from_ptr(rdsys::rd_kafka_mock_cluster_bootstraps(self.mock_cluster))
                .to_string_lossy()
                .into_owned()
        }
    }

    /// Clears any previously pushed request errors for the given API key.
    pub fn clear_request_errors(&self, api_key: RDKafkaApiKey) {
        unsafe { rdsys::rd_kafka_mock_clear_request_errors(self.mock_cluster, api_key.into()) }
    }

    /// Pushes a sequence of response errors to be returned for upcoming
    /// requests with the given API key.
    pub fn request_errors(&self, api_key: RDKafkaApiKey, errors: &[RDKafkaRespErr]) {
        let count = errors.len();
        let first = errors.as_ptr();
        unsafe {
            rdsys::rd_kafka_mock_push_request_errors_array(
                self.mock_cluster,
                api_key.into(),
                count,
                first,
            )
        }
    }

    /// Sets the error code the mock cluster reports for `topic`.
    ///
    /// # Errors
    ///
    /// Fails only if `topic` contains an interior NUL byte.
    pub fn topic_error(&self, topic: &str, error: RDKafkaRespErr) -> KafkaResult<()> {
        let c_topic = CString::new(topic)?;
        unsafe { rdsys::rd_kafka_mock_topic_set_error(self.mock_cluster, c_topic.as_ptr(), error) }
        Ok(())
    }

    /// Creates `topic` in the mock cluster with the requested partition
    /// count and replication factor.
    pub fn create_topic(
        &self,
        topic: &str,
        partition_count: i32,
        replication_factor: i32,
    ) -> KafkaResult<()> {
        let c_topic = CString::new(topic)?;
        let rsp = unsafe {
            rdsys::rd_kafka_mock_topic_create(
                self.mock_cluster,
                c_topic.as_ptr(),
                partition_count,
                replication_factor,
            )
        };
        return_mock_op!(rsp)
    }

    /// Assigns the leader for one partition of `topic`. A `broker_id` of
    /// `None` is forwarded to librdkafka as -1 (presumably its "use
    /// default" sentinel — confirm against rdkafka_mock.h).
    pub fn partition_leader(
        &self,
        topic: &str,
        partition: i32,
        broker_id: Option<i32>,
    ) -> KafkaResult<()> {
        let c_topic = CString::new(topic)?;
        let leader = broker_id.unwrap_or(-1);
        let rsp = unsafe {
            rdsys::rd_kafka_mock_partition_set_leader(
                self.mock_cluster,
                c_topic.as_ptr(),
                partition,
                leader,
            )
        };
        return_mock_op!(rsp)
    }

    /// Makes `broker_id` a follower for one partition of `topic`.
    pub fn partition_follower(
        &self,
        topic: &str,
        partition: i32,
        broker_id: i32,
    ) -> KafkaResult<()> {
        let c_topic = CString::new(topic)?;
        let rsp = unsafe {
            rdsys::rd_kafka_mock_partition_set_follower(
                self.mock_cluster,
                c_topic.as_ptr(),
                partition,
                broker_id,
            )
        };
        return_mock_op!(rsp)
    }

    /// Sets the low/high watermarks reported by followers of the given
    /// partition. `None` is forwarded to librdkafka as -1.
    pub fn follower_watermarks(
        &self,
        topic: &str,
        partition: i32,
        low_watermark: Option<i64>,
        high_watermark: Option<i64>,
    ) -> KafkaResult<()> {
        let c_topic = CString::new(topic)?;
        let lo = low_watermark.unwrap_or(-1);
        let hi = high_watermark.unwrap_or(-1);
        let rsp = unsafe {
            rdsys::rd_kafka_mock_partition_set_follower_wmarks(
                self.mock_cluster,
                c_topic.as_ptr(),
                partition,
                lo,
                hi,
            )
        };
        return_mock_op!(rsp)
    }

    /// Takes the given mock broker down.
    pub fn broker_down(&self, broker_id: i32) -> KafkaResult<()> {
        let rsp = unsafe { rdsys::rd_kafka_mock_broker_set_down(self.mock_cluster, broker_id) };
        return_mock_op!(rsp)
    }

    /// Brings the given mock broker back up.
    pub fn broker_up(&self, broker_id: i32) -> KafkaResult<()> {
        let rsp = unsafe { rdsys::rd_kafka_mock_broker_set_up(self.mock_cluster, broker_id) };
        return_mock_op!(rsp)
    }

    /// Sets an artificial round-trip time for the given broker. Delays
    /// longer than `c_int::MAX` milliseconds saturate to `c_int::MAX`.
    pub fn broker_round_trip_time(&self, broker_id: i32, delay: Duration) -> KafkaResult<()> {
        let rtt_ms: c_int = delay.as_millis().try_into().unwrap_or(c_int::MAX);
        let rsp =
            unsafe { rdsys::rd_kafka_mock_broker_set_rtt(self.mock_cluster, broker_id, rtt_ms) };
        return_mock_op!(rsp)
    }

    /// Sets the rack name reported for the given broker.
    ///
    /// # Errors
    ///
    /// Fails if `rack` contains an interior NUL byte, or if librdkafka
    /// reports an error.
    pub fn broker_rack(&self, broker_id: i32, rack: &str) -> KafkaResult<()> {
        let c_rack = CString::new(rack)?;
        let rsp = unsafe {
            rdsys::rd_kafka_mock_broker_set_rack(self.mock_cluster, broker_id, c_rack.as_ptr())
        };
        return_mock_op!(rsp)
    }

    /// Pins the coordinator for a transactional id or consumer group to the
    /// given broker.
    pub fn coordinator(&self, coordinator: MockCoordinator, broker_id: i32) -> KafkaResult<()> {
        let (kind, key) = match coordinator {
            MockCoordinator::Transaction(id) => ("transaction", id),
            MockCoordinator::Group(id) => ("group", id),
        };
        let c_kind = CString::new(kind)?;
        let c_key = CString::new(key)?;
        let rsp = unsafe {
            rdsys::rd_kafka_mock_coordinator_set(
                self.mock_cluster,
                c_kind.as_ptr(),
                c_key.as_ptr(),
                broker_id,
            )
        };
        return_mock_op!(rsp)
    }

    /// Restricts the version range the cluster advertises for the given API
    /// key. `None` is forwarded to librdkafka as -1.
    pub fn apiversion(
        &self,
        api_key: RDKafkaApiKey,
        min_version: Option<i16>,
        max_version: Option<i16>,
    ) -> KafkaResult<()> {
        let min = min_version.unwrap_or(-1);
        let max = max_version.unwrap_or(-1);
        let rsp = unsafe {
            rdsys::rd_kafka_mock_set_apiversion(self.mock_cluster, api_key.into(), min, max)
        };
        return_mock_op!(rsp)
    }
}
impl<C> Drop for MockCluster<'_, C>
where
    C: ClientContext,
{
    /// Destroys the native mock cluster, but only when this handle owns the
    /// client it was created with; borrowed clusters are left untouched so
    /// the caller's client remains usable.
    fn drop(&mut self) {
        match self.client {
            MockClusterClient::Owned(..) => unsafe {
                rdsys::rd_kafka_mock_cluster_destroy(self.mock_cluster);
            },
            MockClusterClient::Borrowed(..) => {}
        }
    }
}
#[cfg(test)]
mod tests {
use crate::consumer::{Consumer, StreamConsumer};
use crate::message::ToBytes;
use crate::producer::{FutureProducer, FutureRecord};
use crate::Message;
use tokio;
use super::*;
// End-to-end smoke test: stand up a two-broker mock cluster, produce a
// single message, then consume it back and check the key/payload survive
// the round trip.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_mockcluster() {
const TOPIC: &str = "test_topic";
let mock_cluster = MockCluster::new(2).unwrap();
// Both clients connect via the bootstrap list exposed by the mock cluster.
let bootstrap_servers = mock_cluster.bootstrap_servers();
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", &bootstrap_servers)
.create()
.expect("Producer creation error");
// `auto.offset.reset = earliest` so the consumer still sees the message
// even though it is produced before the subscription below.
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", &bootstrap_servers)
.set("group.id", "rust-rdkafka-mockcluster-test")
.set("auto.offset.reset", "earliest")
.create()
.expect("Client creation error");
let rec = FutureRecord::to(TOPIC).key("msg1").payload("test");
// Await full delivery confirmation before subscribing.
producer.send_result(rec).unwrap().await.unwrap().unwrap();
consumer.subscribe(&[TOPIC]).unwrap();
let msg = consumer.recv().await.unwrap();
assert_eq!(msg.key(), Some("msg1".to_bytes()));
assert_eq!(msg.payload(), Some("test".to_bytes()));
}
}