1use crate::error::Result;
6use crate::generators::Generator;
7use rayon::prelude::*;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::Instant;
12
/// How [`CoreEngine::generate_batch`] produces a batch of records.
///
/// The strategy only changes how the work is scheduled; each record is built the same way
/// under every strategy (the strategy name is merely echoed into the record's metadata).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum GenerationStrategy {
    /// Generate every record on the calling thread, in index order.
    Sequential,
    /// Generate records in parallel on Rayon's thread pool.
    ///
    /// NOTE(review): despite its name, this variant selects the parallel path; it does not
    /// change how record fields are generated.
    Random,
    /// Generate the first half of the batch in parallel and the remainder sequentially.
    Mixed,
}
23
24#[derive(Debug, Clone)]
26pub struct GenConfig {
27 pub batch_size: usize,
29 pub locale: crate::Language,
31 pub null_probability: f32,
33 pub strategy: GenerationStrategy,
35 pub parallelism: usize,
37}
38
39impl Default for GenConfig {
40 fn default() -> Self {
41 Self {
42 batch_size: 1000,
43 locale: crate::Language::ZhCN,
44 null_probability: 0.05,
45 strategy: GenerationStrategy::Random,
46 parallelism: 0, }
48 }
49}
50
51pub struct CoreEngine {
53 config: GenConfig,
54 generators: HashMap<String, Arc<dyn Generator>>,
55 metrics: Arc<GeneratorMetrics>,
56}
57
/// Counters describing an engine's generation activity.
///
/// The record and error counts are relaxed atomics and the accumulated duration sits behind a
/// mutex, so a shared `Arc<GeneratorMetrics>` can be read while batches are being recorded.
/// Reads of different fields are not a consistent snapshot.
#[derive(Default)]
pub struct GeneratorMetrics {
    /// Total records reported through `record_success`.
    generated_count: std::sync::atomic::AtomicUsize,
    /// Number of failures reported through `record_error`.
    error_count: std::sync::atomic::AtomicUsize,
    /// Sum of the durations reported through `record_success`.
    total_duration: std::sync::Mutex<std::time::Duration>,
}

impl GeneratorMetrics {
    /// Records a successful batch of `count` records that took `duration` to produce.
    pub fn record_success(&self, count: usize, duration: std::time::Duration) {
        self.generated_count
            .fetch_add(count, std::sync::atomic::Ordering::Relaxed);
        let mut total = self.lock_total_duration();
        // Saturate instead of panicking: a panic here would poison the lock for every reader.
        *total = total.saturating_add(duration);
    }

    /// Records one failed batch.
    pub fn record_error(&self) {
        self.error_count
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }

    /// Total number of records reported as successfully generated.
    pub fn generated_count(&self) -> usize {
        self.generated_count
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Total number of failures reported.
    pub fn error_count(&self) -> usize {
        self.error_count.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Mean time per generated record: the accumulated duration divided by
    /// [`generated_count`](Self::generated_count), or `Duration::ZERO` before any record
    /// has been generated.
    pub fn average_latency(&self) -> std::time::Duration {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        let total = *self.lock_total_duration();
        let count = self.generated_count();
        if count == 0 {
            return std::time::Duration::ZERO;
        }
        // Divide in u128 nanoseconds. `Duration / u32` would need `count as u32`, which wraps
        // past `u32::MAX` records: the average goes wrong, and a count that is a multiple of
        // 2^32 becomes a divide-by-zero panic.
        let nanos = total.as_nanos() / count as u128;
        // The quotient never exceeds `total`, so the seconds part always fits in a u64.
        let secs = u64::try_from(nanos / NANOS_PER_SEC).unwrap_or(u64::MAX);
        // The remainder is below 1e9 and therefore fits in a u32.
        let subsec_nanos = (nanos % NANOS_PER_SEC) as u32;
        std::time::Duration::new(secs, subsec_nanos)
    }

    /// Renders the metrics in the Prometheus text exposition format.
    pub fn export_prometheus(&self) -> String {
        format!(
            r#"# HELP dataforge_generated_total Total number of generated records
# TYPE dataforge_generated_total counter
dataforge_generated_total {}

# HELP dataforge_errors_total Total number of generation errors
# TYPE dataforge_errors_total counter
dataforge_errors_total {}

# HELP dataforge_average_latency_seconds Average generation latency in seconds
# TYPE dataforge_average_latency_seconds gauge
dataforge_average_latency_seconds {:.6}
"#,
            self.generated_count(),
            self.error_count(),
            self.average_latency().as_secs_f64()
        )
    }

    /// Locks the accumulated duration, recovering the value if a previous holder panicked
    /// (a `Duration` cannot be left half-updated, so the poisoned value is still valid).
    fn lock_total_duration(&self) -> std::sync::MutexGuard<'_, std::time::Duration> {
        self.total_duration
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}
124
125impl CoreEngine {
126 pub fn new(config: GenConfig) -> Self {
128 let parallelism = if config.parallelism == 0 {
130 rayon::current_num_threads()
131 } else {
132 config.parallelism
133 };
134
135 if let Err(e) = rayon::ThreadPoolBuilder::new()
137 .num_threads(parallelism)
138 .build_global()
139 {
140 log::warn!("Failed to configure Rayon thread pool: {}", e);
141 }
142
143 Self {
144 config,
145 generators: HashMap::new(),
146 metrics: Arc::new(GeneratorMetrics::default()),
147 }
148 }
149
150 pub fn register_generator<T: Generator + 'static>(&mut self, name: String, generator: T) {
152 self.generators.insert(name, Arc::new(generator));
153 }
154
155 pub fn generate_batch(&self, count: usize) -> Result<Vec<Value>> {
157 let start = Instant::now();
158
159 let result = match self.config.strategy {
160 GenerationStrategy::Sequential => self.generate_sequential(count),
161 GenerationStrategy::Random => self.generate_parallel(count),
162 GenerationStrategy::Mixed => self.generate_mixed(count),
163 };
164
165 let duration = start.elapsed();
166
167 match &result {
168 Ok(data) => {
169 self.metrics.record_success(data.len(), duration);
170 log::debug!("Generated {} records in {:?}", data.len(), duration);
171 }
172 Err(_) => {
173 self.metrics.record_error();
174 log::error!("Failed to generate batch of {} records", count);
175 }
176 }
177
178 result
179 }
180
181 fn generate_sequential(&self, count: usize) -> Result<Vec<Value>> {
183 let mut results = Vec::with_capacity(count);
184
185 for i in 0..count {
186 let value = self.generate_single_record(i)?;
187 results.push(value);
188 }
189
190 Ok(results)
191 }
192
193 fn generate_parallel(&self, count: usize) -> Result<Vec<Value>> {
195 let batch_size = self.config.batch_size.min(count);
196 let chunks: Vec<_> = (0..count)
197 .collect::<Vec<_>>()
198 .chunks(batch_size)
199 .map(|chunk| chunk.to_vec())
200 .collect();
201
202 let results: Result<Vec<Vec<Value>>> = chunks
203 .into_par_iter()
204 .map(|chunk| {
205 chunk
206 .into_iter()
207 .map(|i| self.generate_single_record(i))
208 .collect::<Result<Vec<_>>>()
209 })
210 .collect();
211
212 Ok(results?.into_iter().flatten().collect())
213 }
214
215 fn generate_mixed(&self, count: usize) -> Result<Vec<Value>> {
217 let parallel_count = count / 2;
219 let sequential_count = count - parallel_count;
220
221 let mut parallel_results = self.generate_parallel(parallel_count)?;
222 let mut sequential_results = self.generate_sequential(sequential_count)?;
223
224 parallel_results.append(&mut sequential_results);
225 Ok(parallel_results)
226 }
227
228 fn generate_single_record(&self, index: usize) -> Result<Value> {
230 use crate::generators::*;
233
234 if self.should_generate_null() {
236 return Ok(Value::Null);
237 }
238
239 Ok(serde_json::json!({
240 "id": index,
241 "uuid": uuid_v4(),
242 "name": name::zh_cn_fullname(),
243 "email": internet::email(),
244 "created_at": datetime::iso8601(),
245 "metadata": {
246 "generated_by": "dataforge",
247 "strategy": format!("{:?}", self.config.strategy),
248 "locale": format!("{:?}", self.config.locale)
249 }
250 }))
251 }
252
253 fn should_generate_null(&self) -> bool {
255 use rand::Rng;
256 let mut rng = rand::thread_rng();
257 rng.gen::<f32>() < self.config.null_probability
258 }
259
260 pub fn metrics(&self) -> Arc<GeneratorMetrics> {
262 Arc::clone(&self.metrics)
263 }
264
265 pub fn config(&self) -> &GenConfig {
267 &self.config
268 }
269
270 pub fn update_config(&mut self, config: GenConfig) {
272 self.config = config;
273 }
274
275 pub fn reset_metrics(&self) {
277 self.metrics
278 .generated_count
279 .store(0, std::sync::atomic::Ordering::Relaxed);
280 self.metrics
281 .error_count
282 .store(0, std::sync::atomic::Ordering::Relaxed);
283 *self.metrics.total_duration.lock().unwrap() = std::time::Duration::ZERO;
284 }
285}
286
287pub struct GeneratorFactory;
289
290impl GeneratorFactory {
291 pub fn create_default_generators() -> HashMap<String, Arc<dyn Generator>> {
293 let generators = HashMap::new();
294
295 generators
300 }
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    /// A default-configured engine reports its config and starts with zeroed metrics.
    #[test]
    fn test_core_engine_creation() {
        let engine = CoreEngine::new(GenConfig::default());

        assert_eq!(engine.config().batch_size, 1000);
        assert_eq!(engine.metrics().generated_count(), 0);
    }

    /// A small batch returns exactly the requested number of records and counts them.
    #[test]
    fn test_batch_generation() {
        let engine = CoreEngine::new(GenConfig {
            batch_size: 10,
            ..Default::default()
        });

        let outcome = engine.generate_batch(5);
        assert!(outcome.is_ok());

        let records = outcome.unwrap();
        assert_eq!(records.len(), 5);
        assert_eq!(engine.metrics().generated_count(), 5);
    }

    /// The parallel path returns every requested record across several chunks.
    #[test]
    fn test_parallel_generation() {
        let engine = CoreEngine::new(GenConfig {
            batch_size: 100,
            strategy: GenerationStrategy::Random,
            ..Default::default()
        });

        let outcome = engine.generate_batch(1000);
        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap().len(), 1000);
    }

    /// Metrics and the Prometheus export reflect a completed batch.
    #[test]
    fn test_metrics() {
        let engine = CoreEngine::new(GenConfig::default());
        let _ = engine.generate_batch(100);

        let metrics = engine.metrics();
        assert_eq!(metrics.generated_count(), 100);
        assert_eq!(metrics.error_count(), 0);
        assert!(metrics
            .export_prometheus()
            .contains("dataforge_generated_total 100"));
    }

    /// A null probability of 1.0 turns every record into `null`.
    #[test]
    fn test_null_probability() {
        let engine = CoreEngine::new(GenConfig {
            null_probability: 1.0,
            ..Default::default()
        });

        let null_count = (0..100)
            .filter(|_| engine.should_generate_null())
            .count();

        assert_eq!(null_count, 100);
    }
}