mockforge_kafka/
fixtures.rs1use chrono::Utc;
2use serde::{Deserialize, Serialize};
3use serde_yaml;
4use std::collections::HashMap;
5use std::fs;
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9use tokio::time::{interval, Duration};
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct KafkaFixture {
14 pub identifier: String,
15 pub name: String,
16 pub topic: String,
17 pub partition: Option<i32>, pub key_pattern: Option<String>, pub value_template: serde_json::Value,
20 pub headers: std::collections::HashMap<String, String>,
21 pub auto_produce: Option<AutoProduceConfig>,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct AutoProduceConfig {
27 pub enabled: bool,
28 pub rate_per_second: u64,
29 pub duration_seconds: Option<u64>,
30 pub total_count: Option<usize>,
31}
32
33pub struct AutoProducer {
35 fixtures: Arc<RwLock<HashMap<String, KafkaFixture>>>,
36 template_engine: mockforge_core::templating::TemplateEngine,
37 broker: Arc<super::broker::KafkaMockBroker>,
38}
39
40impl AutoProducer {
41 pub fn new(
43 broker: Arc<super::broker::KafkaMockBroker>,
44 template_engine: mockforge_core::templating::TemplateEngine,
45 ) -> Self {
46 Self {
47 fixtures: Arc::new(RwLock::new(HashMap::new())),
48 template_engine,
49 broker,
50 }
51 }
52
53 pub async fn add_fixture(&self, fixture: KafkaFixture) {
55 if fixture.auto_produce.as_ref().is_some_and(|ap| ap.enabled) {
56 let fixture_id = fixture.identifier.clone();
57 self.fixtures.write().await.insert(fixture_id, fixture);
58 }
59 }
60
61 pub async fn start(&self) -> anyhow::Result<()> {
63 let fixtures = self.fixtures.clone();
64 let _template_engine = self.template_engine.clone();
65 let _broker = self.broker.clone();
66
67 tokio::spawn(async move {
68 let mut interval = interval(Duration::from_secs(1));
69
70 loop {
71 interval.tick().await;
72
73 let fixtures_read = fixtures.read().await.clone();
74 for fixture in fixtures_read.values() {
75 if let Some(auto_produce) = &fixture.auto_produce {
76 if auto_produce.enabled {
77 for _ in 0..auto_produce.rate_per_second {
79 if let Ok(message) = fixture.generate_message(&HashMap::new()) {
80 let mut topics = _broker.topics.write().await;
82 if let Some(topic) = topics.get_mut(&fixture.topic) {
83 let partition = fixture.partition.unwrap_or_else(|| {
84 topic.assign_partition(message.key.as_deref())
85 });
86
87 if let Err(e) = topic.produce(partition, message).await {
88 tracing::error!(
89 "Failed to produce message to topic {}: {}",
90 fixture.topic,
91 e
92 );
93 } else {
94 tracing::debug!(
95 "Auto-produced message to topic {} partition {}",
96 fixture.topic,
97 partition
98 );
99 }
100 } else {
101 tracing::warn!(
102 "Topic {} not found for auto-production",
103 fixture.topic
104 );
105 }
106 }
107 }
108 }
109 }
110 }
111 }
112 });
113
114 Ok(())
115 }
116
117 pub async fn stop_fixture(&self, fixture_id: &str) {
119 if let Some(fixture) = self.fixtures.write().await.get_mut(fixture_id) {
120 if let Some(auto_produce) = &mut fixture.auto_produce {
121 auto_produce.enabled = false;
122 }
123 }
124 }
125}
126
127impl KafkaFixture {
128 pub fn load_from_dir(dir: &PathBuf) -> mockforge_core::Result<Vec<Self>> {
130 let mut fixtures = Vec::new();
131 for entry in fs::read_dir(dir)? {
132 let entry = entry?;
133 let path = entry.path();
134 if path.extension().and_then(|s| s.to_str()) == Some("yaml")
135 || path.extension().and_then(|s| s.to_str()) == Some("yml")
136 {
137 let file = fs::File::open(&path)?;
138 let file_fixtures: Vec<Self> = serde_yaml::from_reader(file)?;
139 fixtures.extend(file_fixtures);
140 }
141 }
142 Ok(fixtures)
143 }
144
145 pub fn generate_message(
147 &self,
148 context: &std::collections::HashMap<String, String>,
149 ) -> mockforge_core::Result<crate::partitions::KafkaMessage> {
150 let key = self.key_pattern.as_ref().map(|pattern| self.render_template(pattern, context));
152
153 let value_str = serde_json::to_string(&self.value_template)?;
155 let value_rendered = self.render_template(&value_str, context);
156 let value = value_rendered.into_bytes();
157
158 let headers = self
160 .headers
161 .iter()
162 .map(|(k, v)| (k.clone(), self.render_template(v, context).into_bytes()))
163 .collect();
164
165 Ok(crate::partitions::KafkaMessage {
166 offset: 0,
167 timestamp: Utc::now().timestamp_millis(),
168 key: key.map(|k| k.into_bytes()),
169 value,
170 headers,
171 })
172 }
173
174 fn render_template(
175 &self,
176 template: &str,
177 context: &std::collections::HashMap<String, String>,
178 ) -> String {
179 let mut result = template.to_string();
180 for (key, value) in context {
181 result = result.replace(&format!("{{{{{}}}}}", key), value);
182 }
183 result
184 }
185}