pub struct MockCluster<'c, C: ClientContext> { /* private fields */ }
Expand description

Mock Kafka cluster with a configurable number of brokers that support a reasonable subset of Kafka protocol operations, error injection, etc.

Mock clusters provide localhost listeners that can be used as the bootstrap servers by multiple Kafka client instances.

Currently supported functionality:

  • Producer
  • Idempotent Producer
  • Transactional Producer
  • Low-level consumer
  • High-level balanced consumer groups with offset commits
  • Topic Metadata and auto creation

The mock cluster can be either created with MockCluster::new() or by configuring the test.mock.num.brokers property when creating a producer/consumer. This will override that producer/consumer’s bootstrap servers setting and internally create a mock cluster. You can then obtain this mock cluster using Client::mock_cluster().

Warning THIS IS AN EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL.

Implementations§

source§

impl MockCluster<'static, DefaultProducerContext>

source

pub fn new(broker_count: i32) -> KafkaResult<Self>

Creates a new mock cluster with the given number of brokers

source§

impl<'c, C> MockCluster<'c, C>
where C: ClientContext,

source

pub fn bootstrap_servers(&self) -> String

Returns the mock cluster’s bootstrap.servers list

source

pub fn clear_request_errors(&self, api_key: RDKafkaApiKey)

Clear the cluster’s error state for the given ApiKey.

source

pub fn request_errors(&self, api_key: RDKafkaApiKey, errors: &[RDKafkaRespErr])

Push errors onto the cluster’s error stack for the given ApiKey.

The protocol requests matching the given ApiKey will fail with the provided error code and removed from the stack, starting with the first error code, then the second, etc.

Passing RD_KAFKA_RESP_ERR__TRANSPORT will make the mock broker disconnect the client which can be useful to trigger a disconnect on certain requests.

source

pub fn topic_error(&self, topic: &str, error: RDKafkaRespErr) -> KafkaResult<()>

Set the topic error to return in protocol requests.

Currently only used for TopicMetadataRequest and AddPartitionsToTxnRequest.

source

pub fn create_topic( &self, topic: &str, partition_count: i32, replication_factor: i32 ) -> KafkaResult<()>

Create a topic

This is an alternative to automatic topic creation as performed by the client itself.

NOTE: The Topic Admin API (CreateTopics) is not supported by the mock broker

source

pub fn partition_leader( &self, topic: &str, partition: i32, broker_id: Option<i32> ) -> KafkaResult<()>

Sets the partition leader

The topic will be created if it does not exist.

broker_id needs to be an existing broker, or None to make the partition leader-less.

source

pub fn partition_follower( &self, topic: &str, partition: i32, broker_id: i32 ) -> KafkaResult<()>

Sets the partition’s preferred replica / follower.

The topic will be created if it does not exist.

broker_id does not need to point to an existing broker.

source

pub fn follower_watermarks( &self, topic: &str, partition: i32, low_watermark: Option<i64>, high_watermark: Option<i64> ) -> KafkaResult<()>

Sets the partition’s preferred replica / follower low and high watermarks.

The topic will be created if it does not exist.

Setting an offset to None will revert back to the leader’s corresponding watermark.

source

pub fn broker_down(&self, broker_id: i32) -> KafkaResult<()>

Disconnects the broker and disallows any new connections. Use -1 for all brokers, or >= 0 for a specific broker.

NOTE: This does NOT trigger leader change.

source

pub fn broker_up(&self, broker_id: i32) -> KafkaResult<()>

Makes the broker accept connections again. Use -1 for all brokers, or >= 0 for a specific broker.

NOTE: This does NOT trigger leader change.

source

pub fn broker_round_trip_time( &self, broker_id: i32, delay: Duration ) -> KafkaResult<()>

Set broker round-trip-time delay in milliseconds. Use -1 for all brokers, or >= 0 for a specific broker.

source

pub fn broker_rack(&self, broker_id: i32, rack: &str) -> KafkaResult<()>

Sets the broker’s rack as reported in Metadata to the client. Use -1 for all brokers, or >= 0 for a specific broker.

source

pub fn coordinator( &self, coordinator: MockCoordinator, broker_id: i32 ) -> KafkaResult<()>

Explicitly sets the coordinator.

If this API is not a standard hashing scheme will be used.

broker_id does not need to point to an existing broker.

source

pub fn apiversion( &self, api_key: RDKafkaApiKey, min_version: Option<i16>, max_version: Option<i16> ) -> KafkaResult<()>

Set the allowed ApiVersion range for the given ApiKey.

Set min_version and max_version to None to disable the API completely. max_version MUST not exceed the maximum implemented value.

Trait Implementations§

source§

impl<'c, C> Drop for MockCluster<'c, C>
where C: ClientContext,

source§

fn drop(&mut self)

Executes the destructor for this type. Read more

Auto Trait Implementations§

§

impl<'c, C> RefUnwindSafe for MockCluster<'c, C>
where C: RefUnwindSafe,

§

impl<'c, C> !Send for MockCluster<'c, C>

§

impl<'c, C> !Sync for MockCluster<'c, C>

§

impl<'c, C> Unpin for MockCluster<'c, C>

§

impl<'c, C> UnwindSafe for MockCluster<'c, C>
where C: RefUnwindSafe,

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more