1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4use std::time::{Duration, Instant};
5
6use dashmap::DashMap;
7use parking_lot::RwLock;
8use tokio::sync::broadcast;
9use tracing::{debug, error, info, instrument};
10
11use crate::config::EngineConfig;
12use crate::device::{BoxedDevice, DeviceHandle, DeviceInfo, DeviceState};
13use crate::error::{Error, Result};
14use crate::metrics::MetricsCollector;
15use crate::protocol::Protocol;
16use crate::types::DataPoint;
17use crate::value::Value;
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub enum EngineState {
22 Stopped,
24 Starting,
26 Running,
28 Stopping,
30 Error,
32}
33
34pub struct SimulatorEngine {
36 config: EngineConfig,
38
39 state: RwLock<EngineState>,
41
42 devices: DashMap<String, DeviceHandle>,
44
45 devices_by_protocol: DashMap<Protocol, Vec<String>>,
47
48 metrics: MetricsCollector,
50
51 start_time: RwLock<Option<Instant>>,
53
54 shutdown: AtomicBool,
56
57 event_tx: broadcast::Sender<EngineEvent>,
59
60 tick_count: AtomicU64,
62}
63
64impl SimulatorEngine {
65 pub fn new(config: EngineConfig) -> Self {
67 let (event_tx, _) = broadcast::channel(10_000);
68
69 Self {
70 config,
71 state: RwLock::new(EngineState::Stopped),
72 devices: DashMap::new(),
73 devices_by_protocol: DashMap::new(),
74 metrics: MetricsCollector::new(),
75 start_time: RwLock::new(None),
76 shutdown: AtomicBool::new(false),
77 event_tx,
78 tick_count: AtomicU64::new(0),
79 }
80 }
81
82 pub fn config(&self) -> &EngineConfig {
84 &self.config
85 }
86
87 pub fn state(&self) -> EngineState {
89 *self.state.read()
90 }
91
92 pub fn metrics(&self) -> &MetricsCollector {
94 &self.metrics
95 }
96
97 pub fn subscribe(&self) -> broadcast::Receiver<EngineEvent> {
99 self.event_tx.subscribe()
100 }
101
102 pub fn device_count(&self) -> usize {
104 self.devices.len()
105 }
106
107 pub fn point_count(&self) -> usize {
109 self.devices
110 .iter()
111 .map(|d| d.value().info().point_count)
112 .sum()
113 }
114
115 #[instrument(skip(self, device), fields(device_id))]
117 pub async fn add_device(&self, device: BoxedDevice) -> Result<()> {
118 let device_id = device.id().to_string();
119 tracing::Span::current().record("device_id", &device_id);
120
121 if self.devices.len() >= self.config.max_devices {
123 return Err(Error::capacity_exceeded(
124 self.devices.len(),
125 self.config.max_devices,
126 "devices",
127 ));
128 }
129
130 if self.devices.contains_key(&device_id) {
132 return Err(Error::DeviceAlreadyExists { device_id });
133 }
134
135 let protocol = device.protocol();
136 let handle = DeviceHandle::new(device);
137
138 handle.initialize().await?;
140
141 self.devices.insert(device_id.clone(), handle);
143 self.devices_by_protocol
144 .entry(protocol)
145 .or_default()
146 .push(device_id.clone());
147
148 self.metrics.set_devices_active(self.devices.len() as i64);
150
151 let _ = self.event_tx.send(EngineEvent::DeviceAdded {
153 device_id: device_id.clone(),
154 protocol,
155 });
156
157 info!(device_id = %device_id, protocol = ?protocol, "Device added");
158 Ok(())
159 }
160
161 #[instrument(skip(self))]
163 pub async fn remove_device(&self, device_id: &str) -> Result<()> {
164 let (_, handle) = self
165 .devices
166 .remove(device_id)
167 .ok_or_else(|| Error::device_not_found(device_id))?;
168
169 let protocol = handle.info().protocol;
170
171 handle.stop().await?;
173
174 if let Some(mut devices) = self.devices_by_protocol.get_mut(&protocol) {
176 devices.retain(|id| id != device_id);
177 }
178
179 self.metrics.set_devices_active(self.devices.len() as i64);
181
182 let _ = self.event_tx.send(EngineEvent::DeviceRemoved {
184 device_id: device_id.to_string(),
185 });
186
187 info!(device_id = %device_id, "Device removed");
188 Ok(())
189 }
190
191 pub fn device(&self, device_id: &str) -> Option<DeviceHandle> {
193 self.devices.get(device_id).map(|d| d.value().clone())
194 }
195
196 pub fn list_devices(&self) -> Vec<DeviceInfo> {
198 self.devices.iter().map(|d| d.value().info()).collect()
199 }
200
201 pub fn list_devices_by_protocol(&self, protocol: Protocol) -> Vec<DeviceInfo> {
203 if let Some(device_ids) = self.devices_by_protocol.get(&protocol) {
204 device_ids
205 .iter()
206 .filter_map(|id| self.devices.get(id))
207 .map(|d| d.value().info())
208 .collect()
209 } else {
210 Vec::new()
211 }
212 }
213
214 #[instrument(skip(self))]
216 pub async fn read_point(&self, device_id: &str, point_id: &str) -> Result<DataPoint> {
217 let device = self
218 .device(device_id)
219 .ok_or_else(|| Error::device_not_found(device_id))?;
220
221 let start = Instant::now();
222 let result = device.read(point_id).await;
223 let duration = start.elapsed();
224
225 let protocol = device.info().protocol.to_string();
226 self.metrics
227 .record_read(&protocol, result.is_ok(), duration);
228
229 result
230 }
231
232 #[instrument(skip(self, value))]
234 pub async fn write_point(&self, device_id: &str, point_id: &str, value: Value) -> Result<()> {
235 let device = self
236 .device(device_id)
237 .ok_or_else(|| Error::device_not_found(device_id))?;
238
239 let start = Instant::now();
240 let result = device.write(point_id, value).await;
241 let duration = start.elapsed();
242
243 let protocol = device.info().protocol.to_string();
244 self.metrics
245 .record_write(&protocol, result.is_ok(), duration);
246
247 result
248 }
249
250 #[instrument(skip(self))]
252 pub async fn start(&self) -> Result<()> {
253 {
254 let mut state = self.state.write();
255 if *state != EngineState::Stopped {
256 return Err(Error::Engine("Engine is not stopped".into()));
257 }
258 *state = EngineState::Starting;
259 }
260
261 info!("Starting simulator engine");
262 self.shutdown.store(false, Ordering::SeqCst);
263 *self.start_time.write() = Some(Instant::now());
264
265 for device in self.devices.iter() {
267 if let Err(e) = device.value().start().await {
268 error!(device_id = %device.key(), error = %e, "Failed to start device");
269 }
270 }
271
272 {
273 let mut state = self.state.write();
274 *state = EngineState::Running;
275 }
276
277 let _ = self.event_tx.send(EngineEvent::Started);
278 info!("Simulator engine started");
279 Ok(())
280 }
281
282 #[instrument(skip(self))]
284 pub async fn stop(&self) -> Result<()> {
285 {
286 let mut state = self.state.write();
287 if *state != EngineState::Running {
288 return Err(Error::Engine("Engine is not running".into()));
289 }
290 *state = EngineState::Stopping;
291 }
292
293 info!("Stopping simulator engine");
294 self.shutdown.store(true, Ordering::SeqCst);
295
296 for device in self.devices.iter() {
298 if let Err(e) = device.value().stop().await {
299 error!(device_id = %device.key(), error = %e, "Failed to stop device");
300 }
301 }
302
303 {
304 let mut state = self.state.write();
305 *state = EngineState::Stopped;
306 }
307
308 let _ = self.event_tx.send(EngineEvent::Stopped);
309 info!("Simulator engine stopped");
310 Ok(())
311 }
312
313 #[instrument(skip(self))]
315 pub async fn tick(&self) -> Result<()> {
316 if self.state() != EngineState::Running {
317 return Ok(());
318 }
319
320 let start = Instant::now();
321 let tick_num = self.tick_count.fetch_add(1, Ordering::Relaxed);
322
323 let futures: Vec<_> = self
325 .devices
326 .iter()
327 .map(|d| {
328 let handle = d.value().clone();
329 async move {
330 if let Err(e) = handle.tick().await {
331 error!(device_id = %d.key(), error = %e, "Device tick failed");
332 }
333 }
334 })
335 .collect();
336
337 futures::future::join_all(futures).await;
338
339 let duration = start.elapsed();
340 self.metrics.record_tick(duration);
341
342 if tick_num % 1000 == 0 {
343 debug!(
344 tick = tick_num,
345 devices = self.devices.len(),
346 duration_ms = duration.as_millis(),
347 "Engine tick"
348 );
349 }
350
351 Ok(())
352 }
353
354 #[instrument(skip(self))]
356 pub async fn run(&self) -> Result<()> {
357 self.start().await?;
358
359 let tick_interval = self.config.tick_interval();
360
361 while !self.shutdown.load(Ordering::SeqCst) {
362 let tick_start = Instant::now();
363
364 self.tick().await?;
365
366 let tick_duration = tick_start.elapsed();
367 if tick_duration < tick_interval {
368 tokio::time::sleep(tick_interval - tick_duration).await;
369 }
370 }
371
372 self.stop().await?;
373 Ok(())
374 }
375
376 pub fn uptime(&self) -> Option<Duration> {
378 self.start_time.read().map(|t| t.elapsed())
379 }
380
381 pub fn tick_count(&self) -> u64 {
383 self.tick_count.load(Ordering::Relaxed)
384 }
385
386 pub fn is_shutdown_requested(&self) -> bool {
388 self.shutdown.load(Ordering::SeqCst)
389 }
390
391 pub fn request_shutdown(&self) {
393 self.shutdown.store(true, Ordering::SeqCst);
394 }
395}
396
397impl Drop for SimulatorEngine {
398 fn drop(&mut self) {
399 self.shutdown.store(true, Ordering::SeqCst);
400 }
401}
402
403#[derive(Debug, Clone)]
405pub enum EngineEvent {
406 Started,
408 Stopped,
410 DeviceAdded {
412 device_id: String,
413 protocol: Protocol,
414 },
415 DeviceRemoved { device_id: String },
417 DeviceStateChanged {
419 device_id: String,
420 old_state: DeviceState,
421 new_state: DeviceState,
422 },
423 Error { message: String },
425}
426
427#[derive(Default)]
442pub struct SimulatorEngineBuilder {
443 config: EngineConfig,
444}
445
446impl SimulatorEngineBuilder {
447 pub fn new() -> Self {
449 Self::default()
450 }
451
452 pub fn name(mut self, name: impl Into<String>) -> Self {
454 self.config.name = name.into();
455 self
456 }
457
458 pub fn max_devices(mut self, max: usize) -> Self {
460 self.config.max_devices = max;
461 self
462 }
463
464 pub fn max_points(mut self, max: usize) -> Self {
466 self.config.max_points = max;
467 self
468 }
469
470 pub fn tick_interval(mut self, interval: Duration) -> Self {
472 self.config.tick_interval_ms = interval.as_millis() as u64;
473 self
474 }
475
476 pub fn tick_interval_ms(mut self, ms: u64) -> Self {
478 self.config.tick_interval_ms = ms;
479 self
480 }
481
482 pub fn workers(mut self, count: usize) -> Self {
484 self.config.workers = count;
485 self
486 }
487
488 pub fn enable_metrics(mut self, enable: bool) -> Self {
490 self.config.enable_metrics = enable;
491 self
492 }
493
494 pub fn metrics_interval(mut self, interval: Duration) -> Self {
496 self.config.metrics_interval_secs = interval.as_secs();
497 self
498 }
499
500 pub fn log_level(mut self, level: impl Into<String>) -> Self {
502 self.config.log_level = level.into();
503 self
504 }
505
506 pub fn preset(mut self, preset: EnginePreset) -> Self {
508 match preset {
509 EnginePreset::Development => {
510 self.config.max_devices = 100;
511 self.config.max_points = 10_000;
512 self.config.tick_interval_ms = 500;
513 self.config.log_level = "debug".to_string();
514 }
515 EnginePreset::Production => {
516 self.config.max_devices = 50_000;
517 self.config.max_points = 5_000_000;
518 self.config.tick_interval_ms = 100;
519 self.config.log_level = "info".to_string();
520 }
521 EnginePreset::Testing => {
522 self.config.max_devices = 10;
523 self.config.max_points = 100;
524 self.config.tick_interval_ms = 10;
525 self.config.log_level = "trace".to_string();
526 }
527 EnginePreset::StressTest => {
528 self.config.max_devices = 100_000;
529 self.config.max_points = 10_000_000;
530 self.config.tick_interval_ms = 50;
531 self.config.log_level = "warn".to_string();
532 }
533 }
534 self
535 }
536
537 pub fn with_config(mut self, config: EngineConfig) -> Self {
539 self.config = config;
540 self
541 }
542
543 pub fn build(self) -> SimulatorEngine {
545 SimulatorEngine::new(self.config)
546 }
547
548 pub async fn build_and_start(self) -> Result<SimulatorEngine> {
550 let engine = self.build();
551 engine.start().await?;
552 Ok(engine)
553 }
554}
555
556#[derive(Debug, Clone, Copy, PartialEq, Eq)]
558pub enum EnginePreset {
559 Development,
561 Production,
563 Testing,
565 StressTest,
567}
568
569pub fn engine() -> SimulatorEngineBuilder {
571 SimulatorEngineBuilder::new()
572}
573
574#[cfg(test)]
575mod tests {
576 use super::*;
577
578 #[tokio::test]
579 async fn test_engine_creation() {
580 let config = EngineConfig::default();
581 let engine = SimulatorEngine::new(config);
582
583 assert_eq!(engine.state(), EngineState::Stopped);
584 assert_eq!(engine.device_count(), 0);
585 }
586
587 #[tokio::test]
588 async fn test_engine_lifecycle() {
589 let config = EngineConfig::default();
590 let engine = SimulatorEngine::new(config);
591
592 engine.start().await.unwrap();
594 assert_eq!(engine.state(), EngineState::Running);
595
596 for _ in 0..5 {
598 engine.tick().await.unwrap();
599 }
600 assert_eq!(engine.tick_count(), 5);
601
602 engine.stop().await.unwrap();
604 assert_eq!(engine.state(), EngineState::Stopped);
605 }
606
607 #[tokio::test]
608 async fn test_engine_capacity() {
609 let config = EngineConfig::default().with_max_devices(5);
610 let engine = SimulatorEngine::new(config);
611
612 assert_eq!(engine.config().max_devices, 5);
613 }
614
615 #[tokio::test]
616 async fn test_engine_builder() {
617 let engine = SimulatorEngineBuilder::new()
618 .name("Test Engine")
619 .max_devices(1000)
620 .tick_interval_ms(50)
621 .build();
622
623 assert_eq!(engine.config().name, "Test Engine");
624 assert_eq!(engine.config().max_devices, 1000);
625 assert_eq!(engine.config().tick_interval_ms, 50);
626 }
627
628 #[tokio::test]
629 async fn test_engine_builder_presets() {
630 let dev_engine = SimulatorEngineBuilder::new()
631 .preset(EnginePreset::Development)
632 .build();
633 assert_eq!(dev_engine.config().max_devices, 100);
634 assert_eq!(dev_engine.config().log_level, "debug");
635
636 let prod_engine = SimulatorEngineBuilder::new()
637 .preset(EnginePreset::Production)
638 .build();
639 assert_eq!(prod_engine.config().max_devices, 50_000);
640 assert_eq!(prod_engine.config().log_level, "info");
641
642 let stress_engine = SimulatorEngineBuilder::new()
643 .preset(EnginePreset::StressTest)
644 .build();
645 assert_eq!(stress_engine.config().max_devices, 100_000);
646 }
647
648 #[tokio::test]
649 async fn test_engine_shorthand() {
650 let e = engine()
651 .name("Quick Engine")
652 .max_devices(500)
653 .build();
654
655 assert_eq!(e.config().name, "Quick Engine");
656 assert_eq!(e.config().max_devices, 500);
657 }
658}