mockforge_kafka/
fixtures.rs

1use chrono::Utc;
2use serde::{Deserialize, Serialize};
3use serde_yaml;
4use std::collections::HashMap;
5use std::fs;
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9use tokio::time::{interval, Duration};
10
11/// Kafka fixture for message generation
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct KafkaFixture {
14    pub identifier: String,
15    pub name: String,
16    pub topic: String,
17    pub partition: Option<i32>,      // None = all partitions
18    pub key_pattern: Option<String>, // Template
19    pub value_template: serde_json::Value,
20    pub headers: std::collections::HashMap<String, String>,
21    pub auto_produce: Option<AutoProduceConfig>,
22}
23
24/// Configuration for auto-producing messages
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct AutoProduceConfig {
27    pub enabled: bool,
28    pub rate_per_second: u64,
29    pub duration_seconds: Option<u64>,
30    pub total_count: Option<usize>,
31}
32
33/// Auto-producer for fixtures
34pub struct AutoProducer {
35    fixtures: Arc<RwLock<HashMap<String, KafkaFixture>>>,
36    template_engine: mockforge_core::templating::TemplateEngine,
37    broker: Arc<super::broker::KafkaMockBroker>,
38}
39
40impl AutoProducer {
41    /// Create a new auto-producer
42    pub fn new(
43        broker: Arc<super::broker::KafkaMockBroker>,
44        template_engine: mockforge_core::templating::TemplateEngine,
45    ) -> Self {
46        Self {
47            fixtures: Arc::new(RwLock::new(HashMap::new())),
48            template_engine,
49            broker,
50        }
51    }
52
53    /// Add a fixture for auto-production
54    pub async fn add_fixture(&self, fixture: KafkaFixture) {
55        if fixture.auto_produce.as_ref().is_some_and(|ap| ap.enabled) {
56            let fixture_id = fixture.identifier.clone();
57            self.fixtures.write().await.insert(fixture_id, fixture);
58        }
59    }
60
61    /// Start auto-producing messages
62    pub async fn start(&self) -> anyhow::Result<()> {
63        let fixtures = self.fixtures.clone();
64        let _template_engine = self.template_engine.clone();
65        let _broker = self.broker.clone();
66
67        tokio::spawn(async move {
68            let mut interval = interval(Duration::from_secs(1));
69
70            loop {
71                interval.tick().await;
72
73                let fixtures_read = fixtures.read().await.clone();
74                for fixture in fixtures_read.values() {
75                    if let Some(auto_produce) = &fixture.auto_produce {
76                        if auto_produce.enabled {
77                            // Generate and produce messages
78                            for _ in 0..auto_produce.rate_per_second {
79                                if let Ok(message) = fixture.generate_message(&HashMap::new()) {
80                                    // Produce the message to the broker
81                                    let mut topics = _broker.topics.write().await;
82                                    if let Some(topic) = topics.get_mut(&fixture.topic) {
83                                        let partition = fixture.partition.unwrap_or_else(|| {
84                                            topic.assign_partition(message.key.as_deref())
85                                        });
86
87                                        if let Err(e) = topic.produce(partition, message).await {
88                                            tracing::error!(
89                                                "Failed to produce message to topic {}: {}",
90                                                fixture.topic,
91                                                e
92                                            );
93                                        } else {
94                                            tracing::debug!(
95                                                "Auto-produced message to topic {} partition {}",
96                                                fixture.topic,
97                                                partition
98                                            );
99                                        }
100                                    } else {
101                                        tracing::warn!(
102                                            "Topic {} not found for auto-production",
103                                            fixture.topic
104                                        );
105                                    }
106                                }
107                            }
108                        }
109                    }
110                }
111            }
112        });
113
114        Ok(())
115    }
116
117    /// Stop auto-producing for a specific fixture
118    pub async fn stop_fixture(&self, fixture_id: &str) {
119        if let Some(fixture) = self.fixtures.write().await.get_mut(fixture_id) {
120            if let Some(auto_produce) = &mut fixture.auto_produce {
121                auto_produce.enabled = false;
122            }
123        }
124    }
125}
126
127impl KafkaFixture {
128    /// Load fixtures from a directory
129    pub fn load_from_dir(dir: &PathBuf) -> mockforge_core::Result<Vec<Self>> {
130        let mut fixtures = Vec::new();
131        for entry in fs::read_dir(dir)? {
132            let entry = entry?;
133            let path = entry.path();
134            if path.extension().and_then(|s| s.to_str()) == Some("yaml")
135                || path.extension().and_then(|s| s.to_str()) == Some("yml")
136            {
137                let file = fs::File::open(&path)?;
138                let file_fixtures: Vec<Self> = serde_yaml::from_reader(file)?;
139                fixtures.extend(file_fixtures);
140            }
141        }
142        Ok(fixtures)
143    }
144
145    /// Generate a message using the fixture
146    pub fn generate_message(
147        &self,
148        context: &std::collections::HashMap<String, String>,
149    ) -> mockforge_core::Result<crate::partitions::KafkaMessage> {
150        // Render key if pattern provided
151        let key = self.key_pattern.as_ref().map(|pattern| self.render_template(pattern, context));
152
153        // Render value template
154        let value_str = serde_json::to_string(&self.value_template)?;
155        let value_rendered = self.render_template(&value_str, context);
156        let value = value_rendered.into_bytes();
157
158        // Render headers
159        let headers = self
160            .headers
161            .iter()
162            .map(|(k, v)| (k.clone(), self.render_template(v, context).into_bytes()))
163            .collect();
164
165        Ok(crate::partitions::KafkaMessage {
166            offset: 0,
167            timestamp: Utc::now().timestamp_millis(),
168            key: key.map(|k| k.into_bytes()),
169            value,
170            headers,
171        })
172    }
173
174    fn render_template(
175        &self,
176        template: &str,
177        context: &std::collections::HashMap<String, String>,
178    ) -> String {
179        let mut result = template.to_string();
180        for (key, value) in context {
181            result = result.replace(&format!("{{{{{}}}}}", key), value);
182        }
183        result
184    }
185}