// dataforge/core.rs

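//! 核心引擎模块 (core engine module)
//!
//! 提供基于Rayon的并行流水线、生成策略控制等核心功能。
//! Provides the Rayon-based parallel generation pipeline, generation-strategy
//! control, and related core functionality.
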
use crate::error::Result;
use crate::generators::Generator;
use rayon::prelude::*;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
use std::time::{Duration, Instant};

/// Strategy used by [`CoreEngine::generate_batch`] to produce a batch of records.
///
/// Derives `PartialEq`/`Eq`/`Hash` so strategies can be compared directly
/// (e.g. in tests or configuration checks) and used as map/set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum GenerationStrategy {
    /// Generate every record in index order on the calling thread.
    Sequential,
    /// "Random" generation; `CoreEngine::generate_batch` currently maps this
    /// variant to the Rayon-parallel generation path.
    Random,
    /// Mixed mode: half of the batch is generated in parallel, the rest
    /// sequentially.
    Mixed,
}

24/// 生成配置
25#[derive(Debug, Clone)]
26pub struct GenConfig {
27    /// 每批次生成数量
28    pub batch_size: usize,
29    /// 区域设置
30    pub locale: crate::Language,
31    /// 空值概率 (0.0-1.0)
32    pub null_probability: f32,
33    /// 生成策略
34    pub strategy: GenerationStrategy,
35    /// 并行度 (0表示自动检测)
36    pub parallelism: usize,
37}
38
39impl Default for GenConfig {
40    fn default() -> Self {
41        Self {
42            batch_size: 1000,
43            locale: crate::Language::ZhCN,
44            null_probability: 0.05,
45            strategy: GenerationStrategy::Random,
46            parallelism: 0, // 自动检测
47        }
48    }
49}
50
51/// 核心生成引擎
52pub struct CoreEngine {
53    config: GenConfig,
54    generators: HashMap<String, Arc<dyn Generator>>,
55    metrics: Arc<GeneratorMetrics>,
56}
57
/// Thread-safe statistics about the engine's generation activity.
///
/// The counters are atomics; the accumulated duration sits behind a `Mutex`
/// because `Duration` has no atomic counterpart. Every lock acquisition
/// recovers from poisoning, so a panic on another thread never turns a
/// metrics call into a second panic.
#[derive(Debug, Default)]
pub struct GeneratorMetrics {
    /// Total records reported through `record_success`.
    generated_count: AtomicUsize,
    /// Number of `record_error` calls.
    error_count: AtomicUsize,
    /// Sum of the durations reported through `record_success`.
    total_duration: Mutex<Duration>,
}

impl GeneratorMetrics {
    /// Records a successful batch of `count` records that took `duration`.
    ///
    /// The accumulated duration saturates at `Duration::MAX` instead of
    /// panicking on overflow.
    pub fn record_success(&self, count: usize, duration: Duration) {
        self.generated_count.fetch_add(count, Ordering::Relaxed);
        let mut total = self.lock_total_duration();
        *total = total.saturating_add(duration);
    }

    /// Records one failed generation.
    pub fn record_error(&self) {
        self.error_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns the total number of generated records.
    pub fn generated_count(&self) -> usize {
        self.generated_count.load(Ordering::Relaxed)
    }

    /// Returns the total number of recorded errors.
    pub fn error_count(&self) -> usize {
        self.error_count.load(Ordering::Relaxed)
    }

    /// Returns the average time spent per generated record.
    ///
    /// This is the summed duration divided by the record count, truncated to
    /// whole nanoseconds, or `Duration::ZERO` when nothing has been generated.
    /// The division uses 128-bit nanosecond counts, so record counts above
    /// `u32::MAX` are handled exactly (no truncating cast, no divide-by-zero).
    pub fn average_latency(&self) -> Duration {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        let total = *self.lock_total_duration();
        let count = self.generated_count();
        if count == 0 {
            return Duration::ZERO;
        }

        // `usize` -> `u128` is lossless.
        let average_nanos = total.as_nanos() / count as u128;
        // `average_nanos <= total.as_nanos()`, so the whole-second part fits in
        // `u64` (just as it did in `total`) and the remainder (< 1e9) fits `u32`.
        Duration::new(
            (average_nanos / NANOS_PER_SEC) as u64,
            (average_nanos % NANOS_PER_SEC) as u32,
        )
    }

    /// Renders the metrics in the Prometheus text exposition format.
    pub fn export_prometheus(&self) -> String {
        format!(
            r#"# HELP dataforge_generated_total Total number of generated records
# TYPE dataforge_generated_total counter
dataforge_generated_total {}

# HELP dataforge_errors_total Total number of generation errors
# TYPE dataforge_errors_total counter
dataforge_errors_total {}

# HELP dataforge_average_latency_seconds Average generation latency in seconds
# TYPE dataforge_average_latency_seconds gauge
dataforge_average_latency_seconds {:.6}
"#,
            self.generated_count(),
            self.error_count(),
            self.average_latency().as_secs_f64()
        )
    }

    /// Locks the accumulated duration, recovering the guard if a previous
    /// holder panicked. Every write is a whole `Duration` assignment, so the
    /// stored value is never left half-updated.
    fn lock_total_duration(&self) -> MutexGuard<'_, Duration> {
        self.total_duration
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
    }
}

125impl CoreEngine {
126    /// 创建新的核心引擎
127    pub fn new(config: GenConfig) -> Self {
128        // 设置并行度
129        let parallelism = if config.parallelism == 0 {
130            rayon::current_num_threads()
131        } else {
132            config.parallelism
133        };
134
135        // 配置Rayon线程池
136        if let Err(e) = rayon::ThreadPoolBuilder::new()
137            .num_threads(parallelism)
138            .build_global()
139        {
140            log::warn!("Failed to configure Rayon thread pool: {}", e);
141        }
142
143        Self {
144            config,
145            generators: HashMap::new(),
146            metrics: Arc::new(GeneratorMetrics::default()),
147        }
148    }
149
150    /// 注册生成器
151    pub fn register_generator<T: Generator + 'static>(&mut self, name: String, generator: T) {
152        self.generators.insert(name, Arc::new(generator));
153    }
154
155    /// 批量生成数据
156    pub fn generate_batch(&self, count: usize) -> Result<Vec<Value>> {
157        let start = Instant::now();
158
159        let result = match self.config.strategy {
160            GenerationStrategy::Sequential => self.generate_sequential(count),
161            GenerationStrategy::Random => self.generate_parallel(count),
162            GenerationStrategy::Mixed => self.generate_mixed(count),
163        };
164
165        let duration = start.elapsed();
166
167        match &result {
168            Ok(data) => {
169                self.metrics.record_success(data.len(), duration);
170                log::debug!("Generated {} records in {:?}", data.len(), duration);
171            }
172            Err(_) => {
173                self.metrics.record_error();
174                log::error!("Failed to generate batch of {} records", count);
175            }
176        }
177
178        result
179    }
180
181    /// 顺序生成
182    fn generate_sequential(&self, count: usize) -> Result<Vec<Value>> {
183        let mut results = Vec::with_capacity(count);
184
185        for i in 0..count {
186            let value = self.generate_single_record(i)?;
187            results.push(value);
188        }
189
190        Ok(results)
191    }
192
193    /// 并行生成
194    fn generate_parallel(&self, count: usize) -> Result<Vec<Value>> {
195        let batch_size = self.config.batch_size.min(count);
196        let chunks: Vec<_> = (0..count)
197            .collect::<Vec<_>>()
198            .chunks(batch_size)
199            .map(|chunk| chunk.to_vec())
200            .collect();
201
202        let results: Result<Vec<Vec<Value>>> = chunks
203            .into_par_iter()
204            .map(|chunk| {
205                chunk
206                    .into_iter()
207                    .map(|i| self.generate_single_record(i))
208                    .collect::<Result<Vec<_>>>()
209            })
210            .collect();
211
212        Ok(results?.into_iter().flatten().collect())
213    }
214
215    /// 混合模式生成
216    fn generate_mixed(&self, count: usize) -> Result<Vec<Value>> {
217        // 一半并行,一半顺序
218        let parallel_count = count / 2;
219        let sequential_count = count - parallel_count;
220
221        let mut parallel_results = self.generate_parallel(parallel_count)?;
222        let mut sequential_results = self.generate_sequential(sequential_count)?;
223
224        parallel_results.append(&mut sequential_results);
225        Ok(parallel_results)
226    }
227
228    /// 生成单条记录
229    fn generate_single_record(&self, index: usize) -> Result<Value> {
230        // 这里应该根据具体的生成器来生成数据
231        // 暂时返回一个示例结构
232        use crate::generators::*;
233
234        // 检查是否应该生成null值
235        if self.should_generate_null() {
236            return Ok(Value::Null);
237        }
238
239        Ok(serde_json::json!({
240            "id": index,
241            "uuid": uuid_v4(),
242            "name": name::zh_cn_fullname(),
243            "email": internet::email(),
244            "created_at": datetime::iso8601(),
245            "metadata": {
246                "generated_by": "dataforge",
247                "strategy": format!("{:?}", self.config.strategy),
248                "locale": format!("{:?}", self.config.locale)
249            }
250        }))
251    }
252
253    /// 判断是否应该生成null值
254    fn should_generate_null(&self) -> bool {
255        use rand::Rng;
256        let mut rng = rand::thread_rng();
257        rng.gen::<f32>() < self.config.null_probability
258    }
259
260    /// 获取指标
261    pub fn metrics(&self) -> Arc<GeneratorMetrics> {
262        Arc::clone(&self.metrics)
263    }
264
265    /// 获取配置
266    pub fn config(&self) -> &GenConfig {
267        &self.config
268    }
269
270    /// 更新配置
271    pub fn update_config(&mut self, config: GenConfig) {
272        self.config = config;
273    }
274
275    /// 重置指标
276    pub fn reset_metrics(&self) {
277        self.metrics
278            .generated_count
279            .store(0, std::sync::atomic::Ordering::Relaxed);
280        self.metrics
281            .error_count
282            .store(0, std::sync::atomic::Ordering::Relaxed);
283        *self.metrics.total_duration.lock().unwrap() = std::time::Duration::ZERO;
284    }
285}
286
287/// 生成器工厂
288pub struct GeneratorFactory;
289
290impl GeneratorFactory {
291    /// 创建默认生成器集合
292    pub fn create_default_generators() -> HashMap<String, Arc<dyn Generator>> {
293        let generators = HashMap::new();
294
295        // 这里可以注册各种内置生成器
296        // generators.insert("uuid".to_string(), Arc::new(UuidGenerator));
297        // generators.insert("name".to_string(), Arc::new(NameGenerator));
298
299        generators
300    }
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_core_engine_creation() {
        let engine = CoreEngine::new(GenConfig::default());

        assert_eq!(engine.config().batch_size, 1000);
        assert_eq!(engine.metrics().generated_count(), 0);
    }

    #[test]
    fn test_batch_generation() {
        let engine = CoreEngine::new(GenConfig {
            batch_size: 10,
            ..Default::default()
        });

        let outcome = engine.generate_batch(5);
        assert!(outcome.is_ok());

        let records = outcome.unwrap();
        assert_eq!(records.len(), 5);
        assert_eq!(engine.metrics().generated_count(), 5);
    }

    #[test]
    fn test_parallel_generation() {
        let engine = CoreEngine::new(GenConfig {
            strategy: GenerationStrategy::Random,
            batch_size: 100,
            ..Default::default()
        });

        let outcome = engine.generate_batch(1000);
        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap().len(), 1000);
    }

    #[test]
    fn test_metrics() {
        let engine = CoreEngine::new(GenConfig::default());

        // Produce a batch; only its effect on the metrics is checked below.
        let _ = engine.generate_batch(100);

        let metrics = engine.metrics();
        assert_eq!(metrics.generated_count(), 100);
        assert_eq!(metrics.error_count(), 0);

        // The Prometheus export must expose the same counter value.
        let exported = metrics.export_prometheus();
        assert!(exported.contains("dataforge_generated_total 100"));
    }

    #[test]
    fn test_null_probability() {
        // With a null probability of 1.0, every single draw must yield null.
        let engine = CoreEngine::new(GenConfig {
            null_probability: 1.0,
            ..Default::default()
        });

        let null_draws = (0..100).filter(|_| engine.should_generate_null()).count();
        assert_eq!(null_draws, 100);
    }
}