mockforge_kafka/
topics.rs1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::Arc;
3
4use crate::fixtures::KafkaFixture;
5use crate::partitions::Partition;
6
7#[derive(Debug)]
9pub struct Topic {
10 pub name: String,
11 pub partitions: Vec<Partition>,
12 pub config: TopicConfig,
13 pub fixtures: Vec<Arc<KafkaFixture>>,
14 round_robin_counter: AtomicUsize,
15}
16
/// Per-topic settings.
///
/// Only `num_partitions` is read in this file (to create partitions and to
/// pick a partition for a message); the remaining fields are carried as data.
#[derive(Clone, Debug)]
pub struct TopicConfig {
    /// Number of partitions the topic is created with.
    pub num_partitions: i32,
    /// Replication factor of the topic.
    /// NOTE(review): not consulted anywhere in this file.
    pub replication_factor: i16,
    /// Retention setting, presumably in milliseconds as the name suggests.
    /// NOTE(review): not enforced anywhere in this file.
    pub retention_ms: i64,
    /// Maximum message size, presumably in bytes as the name suggests.
    /// NOTE(review): not enforced anywhere in this file.
    pub max_message_bytes: i32,
}
24
25impl Default for TopicConfig {
26 fn default() -> Self {
27 Self {
28 num_partitions: 3,
29 replication_factor: 1,
30 retention_ms: 604800000, max_message_bytes: 1048576, }
33 }
34}
35
36impl Topic {
37 pub fn new(name: String, config: TopicConfig) -> Self {
39 let partitions = (0..config.num_partitions).map(Partition::new).collect();
40
41 Self {
42 name,
43 partitions,
44 config,
45 fixtures: vec![],
46 round_robin_counter: AtomicUsize::new(0),
47 }
48 }
49
50 pub fn assign_partition(&mut self, key: Option<&[u8]>) -> i32 {
52 match key {
53 Some(key_bytes) => {
54 use std::collections::hash_map::DefaultHasher;
56 use std::hash::{Hash, Hasher};
57 let mut hasher = DefaultHasher::new();
58 key_bytes.hash(&mut hasher);
59 let hash = hasher.finish();
60 (hash % self.config.num_partitions as u64) as i32
61 }
62 None => {
63 let partition = self.round_robin_counter.fetch_add(1, Ordering::Relaxed)
65 % self.config.num_partitions as usize;
66 partition as i32
67 }
68 }
69 }
70
71 pub async fn produce(
73 &mut self,
74 partition: i32,
75 record: crate::partitions::KafkaMessage,
76 ) -> mockforge_core::Result<i64> {
77 if let Some(partition) = self.partitions.get_mut(partition as usize) {
78 Ok(partition.append(record))
79 } else {
80 Err(mockforge_core::Error::generic(format!(
81 "Partition {} does not exist",
82 partition
83 )))
84 }
85 }
86
87 pub fn get_partition(&self, id: i32) -> Option<&Partition> {
89 self.partitions.get(id as usize)
90 }
91
92 pub fn get_partition_mut(&mut self, id: i32) -> Option<&mut Partition> {
94 self.partitions.get_mut(id as usize)
95 }
96}