pub struct KafkaMockBroker {
pub topics: Arc<RwLock<HashMap<String, Topic>>>,
pub consumer_groups: Arc<RwLock<ConsumerGroupManager>>,
/* private fields */
}Expand description
Mock Kafka broker implementation
The KafkaMockBroker simulates a complete Apache Kafka broker, handling
TCP connections and responding to Kafka protocol requests. It supports
multiple concurrent connections and provides comprehensive metrics collection.
§Architecture
The broker maintains several key components:
- Topics: Managed topic and partition storage
- Consumer Groups: Consumer group coordination and partition assignment
- Spec Registry: Fixture-based request/response handling
- Metrics: Real-time performance and usage statistics
§Supported Operations
- Produce: Message production with acknowledgments
- Fetch: Message consumption with offset tracking
- Metadata: Topic and broker discovery
- ListGroups/DescribeGroups: Consumer group management
- ApiVersions: Protocol version negotiation
- CreateTopics/DeleteTopics: Dynamic topic management
- DescribeConfigs: Configuration retrieval
§Example
use mockforge_kafka::KafkaMockBroker;
use mockforge_core::config::KafkaConfig;
let config = KafkaConfig {
port: 9092,
..Default::default()
};
let broker = KafkaMockBroker::new(config).await?;
broker.start().await?;Fields§
§topics: Arc<RwLock<HashMap<String, Topic>>>Topic storage with thread-safe access
consumer_groups: Arc<RwLock<ConsumerGroupManager>>Consumer group manager
Implementations§
Source§impl KafkaMockBroker
impl KafkaMockBroker
Sourcepub async fn new(config: KafkaConfig) -> Result<Self>
pub async fn new(config: KafkaConfig) -> Result<Self>
Create a new Kafka mock broker
Initializes the broker with the provided configuration, setting up internal data structures for topics, consumer groups, and metrics.
§Arguments
config- Kafka broker configuration including port, timeouts, and fixture paths
§Returns
Returns a Result containing the initialized broker or an error if initialization fails.
§Example
use mockforge_kafka::KafkaMockBroker;
use mockforge_core::config::KafkaConfig;
let config = KafkaConfig::default();
let broker = KafkaMockBroker::new(config).await?;Sourcepub async fn start(&self) -> Result<()>
pub async fn start(&self) -> Result<()>
Start the Kafka broker server
Binds to the configured host and port, then begins accepting TCP connections. Each connection is handled in a separate async task, allowing concurrent client connections.
The broker will run indefinitely until the task is cancelled or an error occurs.
§Returns
Returns a Result that indicates whether the broker started successfully.
The method only returns on error, as it runs an infinite accept loop.
§Example
use mockforge_kafka::KafkaMockBroker;
use mockforge_core::config::KafkaConfig;
let config = KafkaConfig::default();
let broker = KafkaMockBroker::new(config).await?;
// Start the broker (this will run indefinitely)
broker.start().await?;Sourcepub async fn test_commit_offsets(
&self,
group_id: &str,
offsets: HashMap<(String, i32), i64>,
) -> Result<()>
pub async fn test_commit_offsets( &self, group_id: &str, offsets: HashMap<(String, i32), i64>, ) -> Result<()>
Test helper: Commit offsets for a consumer group (only available in tests)
Sourcepub async fn test_get_committed_offsets(
&self,
group_id: &str,
) -> HashMap<(String, i32), i64>
pub async fn test_get_committed_offsets( &self, group_id: &str, ) -> HashMap<(String, i32), i64>
Test helper: Get committed offsets for a consumer group (only available in tests)
Sourcepub async fn test_create_topic(&self, name: &str, config: TopicConfig)
pub async fn test_create_topic(&self, name: &str, config: TopicConfig)
Test helper: Create a topic (only available in tests)
Sourcepub async fn test_join_group(
&self,
group_id: &str,
member_id: &str,
client_id: &str,
) -> Result<()>
pub async fn test_join_group( &self, group_id: &str, member_id: &str, client_id: &str, ) -> Result<()>
Test helper: Join a consumer group (only available in tests)
Sourcepub async fn test_sync_group(
&self,
group_id: &str,
assignments: Vec<PartitionAssignment>,
) -> Result<()>
pub async fn test_sync_group( &self, group_id: &str, assignments: Vec<PartitionAssignment>, ) -> Result<()>
Test helper: Sync group assignment (only available in tests)
Sourcepub async fn test_get_assignments(
&self,
group_id: &str,
member_id: &str,
) -> Vec<PartitionAssignment>
pub async fn test_get_assignments( &self, group_id: &str, member_id: &str, ) -> Vec<PartitionAssignment>
Test helper: Get group member assignments (only available in tests)
Sourcepub async fn test_simulate_lag(
&self,
group_id: &str,
topic: &str,
lag: i64,
) -> Result<()>
pub async fn test_simulate_lag( &self, group_id: &str, topic: &str, lag: i64, ) -> Result<()>
Test helper: Simulate consumer lag (only available in tests)
Sourcepub async fn test_reset_offsets(
&self,
group_id: &str,
topic: &str,
to: &str,
) -> Result<()>
pub async fn test_reset_offsets( &self, group_id: &str, topic: &str, to: &str, ) -> Result<()>
Test helper: Reset consumer offsets (only available in tests)
Sourcepub fn metrics(&self) -> &Arc<KafkaMetrics>
pub fn metrics(&self) -> &Arc<KafkaMetrics>
Get a reference to the metrics collector
This method provides access to the Kafka metrics for monitoring and statistics. The metrics are thread-safe and can be accessed concurrently.
§Example
use mockforge_kafka::KafkaMockBroker;
let broker = KafkaMockBroker::new(Default::default()).await?;
let metrics = broker.metrics();
let snapshot = metrics.snapshot();
println!("Messages produced: {}", snapshot.messages_produced_total);Trait Implementations§
Source§impl Clone for KafkaMockBroker
impl Clone for KafkaMockBroker
Source§fn clone(&self) -> KafkaMockBroker
fn clone(&self) -> KafkaMockBroker
1.0.0§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read moreAuto Trait Implementations§
impl Freeze for KafkaMockBroker
impl !RefUnwindSafe for KafkaMockBroker
impl Send for KafkaMockBroker
impl Sync for KafkaMockBroker
impl Unpin for KafkaMockBroker
impl !UnwindSafe for KafkaMockBroker
Blanket Implementations§
§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
§unsafe fn clone_to_uninit(&self, dest: *mut u8)
unsafe fn clone_to_uninit(&self, dest: *mut u8)
clone_to_uninit)