// mabi_core/engine.rs

//! Simulator engine - orchestrates all devices and protocols.

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};

use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use parking_lot::RwLock;
use tokio::sync::broadcast;
use tracing::{debug, error, info, instrument};

use crate::config::EngineConfig;
use crate::device::{BoxedDevice, DeviceHandle, DeviceInfo, DeviceState};
use crate::error::{Error, Result};
use crate::metrics::MetricsCollector;
use crate::protocol::Protocol;
use crate::types::DataPoint;
use crate::value::Value;

/// Lifecycle state of a [`SimulatorEngine`].
///
/// `start` moves the engine `Stopped -> Starting -> Running`, and `stop`
/// moves it `Running -> Stopping -> Stopped`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EngineState {
    /// Engine is stopped. This is the initial state and the state after `stop`.
    Stopped,
    /// Engine is starting: `start` has been entered but devices are still being started.
    Starting,
    /// Engine is running: `tick` only does work in this state.
    Running,
    /// Engine is stopping: `stop` has been entered but devices are still being stopped.
    Stopping,
    /// Engine has an error.
    ///
    /// NOTE(review): no engine method in this file assigns this state.
    Error,
}

34/// Simulator engine that manages all virtual devices.
35pub struct SimulatorEngine {
36    /// Engine configuration.
37    config: EngineConfig,
38
39    /// Current state.
40    state: RwLock<EngineState>,
41
42    /// All managed devices.
43    devices: DashMap<String, DeviceHandle>,
44
45    /// Devices by protocol.
46    devices_by_protocol: DashMap<Protocol, Vec<String>>,
47
48    /// Metrics collector.
49    metrics: MetricsCollector,
50
51    /// Start time.
52    start_time: RwLock<Option<Instant>>,
53
54    /// Shutdown flag.
55    shutdown: AtomicBool,
56
57    /// Event broadcaster.
58    event_tx: broadcast::Sender<EngineEvent>,
59
60    /// Tick counter.
61    tick_count: AtomicU64,
62}
63
64impl SimulatorEngine {
65    /// Create a new simulator engine.
66    pub fn new(config: EngineConfig) -> Self {
67        let (event_tx, _) = broadcast::channel(10_000);
68
69        Self {
70            config,
71            state: RwLock::new(EngineState::Stopped),
72            devices: DashMap::new(),
73            devices_by_protocol: DashMap::new(),
74            metrics: MetricsCollector::new(),
75            start_time: RwLock::new(None),
76            shutdown: AtomicBool::new(false),
77            event_tx,
78            tick_count: AtomicU64::new(0),
79        }
80    }
81
82    /// Get the engine configuration.
83    pub fn config(&self) -> &EngineConfig {
84        &self.config
85    }
86
87    /// Get the current engine state.
88    pub fn state(&self) -> EngineState {
89        *self.state.read()
90    }
91
92    /// Get metrics collector.
93    pub fn metrics(&self) -> &MetricsCollector {
94        &self.metrics
95    }
96
97    /// Subscribe to engine events.
98    pub fn subscribe(&self) -> broadcast::Receiver<EngineEvent> {
99        self.event_tx.subscribe()
100    }
101
102    /// Get device count.
103    pub fn device_count(&self) -> usize {
104        self.devices.len()
105    }
106
107    /// Get total data points across all devices.
108    pub fn point_count(&self) -> usize {
109        self.devices
110            .iter()
111            .map(|d| d.value().info().point_count)
112            .sum()
113    }
114
115    /// Add a device to the engine.
116    #[instrument(skip(self, device), fields(device_id))]
117    pub async fn add_device(&self, device: BoxedDevice) -> Result<()> {
118        let device_id = device.id().to_string();
119        tracing::Span::current().record("device_id", &device_id);
120
121        // Check capacity
122        if self.devices.len() >= self.config.max_devices {
123            return Err(Error::capacity_exceeded(
124                self.devices.len(),
125                self.config.max_devices,
126                "devices",
127            ));
128        }
129
130        // Check if device already exists
131        if self.devices.contains_key(&device_id) {
132            return Err(Error::DeviceAlreadyExists { device_id });
133        }
134
135        let protocol = device.protocol();
136        let handle = DeviceHandle::new(device);
137
138        // Initialize device
139        handle.initialize().await?;
140
141        // Add to maps
142        self.devices.insert(device_id.clone(), handle);
143        self.devices_by_protocol
144            .entry(protocol)
145            .or_default()
146            .push(device_id.clone());
147
148        // Update metrics
149        self.metrics.set_devices_active(self.devices.len() as i64);
150
151        // Broadcast event
152        let _ = self.event_tx.send(EngineEvent::DeviceAdded {
153            device_id: device_id.clone(),
154            protocol,
155        });
156
157        info!(device_id = %device_id, protocol = ?protocol, "Device added");
158        Ok(())
159    }
160
161    /// Remove a device from the engine.
162    #[instrument(skip(self))]
163    pub async fn remove_device(&self, device_id: &str) -> Result<()> {
164        let (_, handle) = self
165            .devices
166            .remove(device_id)
167            .ok_or_else(|| Error::device_not_found(device_id))?;
168
169        let protocol = handle.info().protocol;
170
171        // Stop device
172        handle.stop().await?;
173
174        // Remove from protocol map
175        if let Some(mut devices) = self.devices_by_protocol.get_mut(&protocol) {
176            devices.retain(|id| id != device_id);
177        }
178
179        // Update metrics
180        self.metrics.set_devices_active(self.devices.len() as i64);
181
182        // Broadcast event
183        let _ = self.event_tx.send(EngineEvent::DeviceRemoved {
184            device_id: device_id.to_string(),
185        });
186
187        info!(device_id = %device_id, "Device removed");
188        Ok(())
189    }
190
191    /// Get device by ID.
192    pub fn device(&self, device_id: &str) -> Option<DeviceHandle> {
193        self.devices.get(device_id).map(|d| d.value().clone())
194    }
195
196    /// List all devices.
197    pub fn list_devices(&self) -> Vec<DeviceInfo> {
198        self.devices.iter().map(|d| d.value().info()).collect()
199    }
200
201    /// List devices by protocol.
202    pub fn list_devices_by_protocol(&self, protocol: Protocol) -> Vec<DeviceInfo> {
203        if let Some(device_ids) = self.devices_by_protocol.get(&protocol) {
204            device_ids
205                .iter()
206                .filter_map(|id| self.devices.get(id))
207                .map(|d| d.value().info())
208                .collect()
209        } else {
210            Vec::new()
211        }
212    }
213
214    /// Read a data point.
215    #[instrument(skip(self))]
216    pub async fn read_point(&self, device_id: &str, point_id: &str) -> Result<DataPoint> {
217        let device = self
218            .device(device_id)
219            .ok_or_else(|| Error::device_not_found(device_id))?;
220
221        let start = Instant::now();
222        let result = device.read(point_id).await;
223        let duration = start.elapsed();
224
225        let protocol = device.info().protocol.to_string();
226        self.metrics
227            .record_read(&protocol, result.is_ok(), duration);
228
229        result
230    }
231
232    /// Write a data point.
233    #[instrument(skip(self, value))]
234    pub async fn write_point(&self, device_id: &str, point_id: &str, value: Value) -> Result<()> {
235        let device = self
236            .device(device_id)
237            .ok_or_else(|| Error::device_not_found(device_id))?;
238
239        let start = Instant::now();
240        let result = device.write(point_id, value).await;
241        let duration = start.elapsed();
242
243        let protocol = device.info().protocol.to_string();
244        self.metrics
245            .record_write(&protocol, result.is_ok(), duration);
246
247        result
248    }
249
250    /// Start the engine.
251    #[instrument(skip(self))]
252    pub async fn start(&self) -> Result<()> {
253        {
254            let mut state = self.state.write();
255            if *state != EngineState::Stopped {
256                return Err(Error::Engine("Engine is not stopped".into()));
257            }
258            *state = EngineState::Starting;
259        }
260
261        info!("Starting simulator engine");
262        self.shutdown.store(false, Ordering::SeqCst);
263        *self.start_time.write() = Some(Instant::now());
264
265        // Start all devices
266        for device in self.devices.iter() {
267            if let Err(e) = device.value().start().await {
268                error!(device_id = %device.key(), error = %e, "Failed to start device");
269            }
270        }
271
272        {
273            let mut state = self.state.write();
274            *state = EngineState::Running;
275        }
276
277        let _ = self.event_tx.send(EngineEvent::Started);
278        info!("Simulator engine started");
279        Ok(())
280    }
281
282    /// Stop the engine.
283    #[instrument(skip(self))]
284    pub async fn stop(&self) -> Result<()> {
285        {
286            let mut state = self.state.write();
287            if *state != EngineState::Running {
288                return Err(Error::Engine("Engine is not running".into()));
289            }
290            *state = EngineState::Stopping;
291        }
292
293        info!("Stopping simulator engine");
294        self.shutdown.store(true, Ordering::SeqCst);
295
296        // Stop all devices
297        for device in self.devices.iter() {
298            if let Err(e) = device.value().stop().await {
299                error!(device_id = %device.key(), error = %e, "Failed to stop device");
300            }
301        }
302
303        {
304            let mut state = self.state.write();
305            *state = EngineState::Stopped;
306        }
307
308        let _ = self.event_tx.send(EngineEvent::Stopped);
309        info!("Simulator engine stopped");
310        Ok(())
311    }
312
313    /// Process one tick (update all devices).
314    #[instrument(skip(self))]
315    pub async fn tick(&self) -> Result<()> {
316        if self.state() != EngineState::Running {
317            return Ok(());
318        }
319
320        let start = Instant::now();
321        let tick_num = self.tick_count.fetch_add(1, Ordering::Relaxed);
322
323        // Process all devices in parallel
324        let futures: Vec<_> = self
325            .devices
326            .iter()
327            .map(|d| {
328                let handle = d.value().clone();
329                async move {
330                    if let Err(e) = handle.tick().await {
331                        error!(device_id = %d.key(), error = %e, "Device tick failed");
332                    }
333                }
334            })
335            .collect();
336
337        futures::future::join_all(futures).await;
338
339        let duration = start.elapsed();
340        self.metrics.record_tick(duration);
341
342        if tick_num % 1000 == 0 {
343            debug!(
344                tick = tick_num,
345                devices = self.devices.len(),
346                duration_ms = duration.as_millis(),
347                "Engine tick"
348            );
349        }
350
351        Ok(())
352    }
353
354    /// Run the engine (blocking).
355    #[instrument(skip(self))]
356    pub async fn run(&self) -> Result<()> {
357        self.start().await?;
358
359        let tick_interval = self.config.tick_interval();
360
361        while !self.shutdown.load(Ordering::SeqCst) {
362            let tick_start = Instant::now();
363
364            self.tick().await?;
365
366            let tick_duration = tick_start.elapsed();
367            if tick_duration < tick_interval {
368                tokio::time::sleep(tick_interval - tick_duration).await;
369            }
370        }
371
372        self.stop().await?;
373        Ok(())
374    }
375
376    /// Get uptime.
377    pub fn uptime(&self) -> Option<Duration> {
378        self.start_time.read().map(|t| t.elapsed())
379    }
380
381    /// Get tick count.
382    pub fn tick_count(&self) -> u64 {
383        self.tick_count.load(Ordering::Relaxed)
384    }
385
386    /// Check if shutdown is requested.
387    pub fn is_shutdown_requested(&self) -> bool {
388        self.shutdown.load(Ordering::SeqCst)
389    }
390
391    /// Request shutdown.
392    pub fn request_shutdown(&self) {
393        self.shutdown.store(true, Ordering::SeqCst);
394    }
395}
396
397impl Drop for SimulatorEngine {
398    fn drop(&mut self) {
399        self.shutdown.store(true, Ordering::SeqCst);
400    }
401}
402
403/// Engine events.
404#[derive(Debug, Clone)]
405pub enum EngineEvent {
406    /// Engine started.
407    Started,
408    /// Engine stopped.
409    Stopped,
410    /// Device added.
411    DeviceAdded {
412        device_id: String,
413        protocol: Protocol,
414    },
415    /// Device removed.
416    DeviceRemoved { device_id: String },
417    /// Device state changed.
418    DeviceStateChanged {
419        device_id: String,
420        old_state: DeviceState,
421        new_state: DeviceState,
422    },
423    /// Error occurred.
424    Error { message: String },
425}
426
427/// Builder for creating SimulatorEngine with fluent API.
428///
429/// # Example
430///
431/// ```rust,ignore
432/// use mabi_core::engine::{SimulatorEngineBuilder, SimulatorEngine};
433///
434/// let engine = SimulatorEngineBuilder::new()
435///     .name("HVAC Simulator")
436///     .max_devices(10_000)
437///     .tick_interval(Duration::from_millis(50))
438///     .enable_metrics(true)
439///     .build();
440/// ```
441#[derive(Default)]
442pub struct SimulatorEngineBuilder {
443    config: EngineConfig,
444}
445
446impl SimulatorEngineBuilder {
447    /// Create a new engine builder.
448    pub fn new() -> Self {
449        Self::default()
450    }
451
452    /// Set the engine name.
453    pub fn name(mut self, name: impl Into<String>) -> Self {
454        self.config.name = name.into();
455        self
456    }
457
458    /// Set the maximum number of devices.
459    pub fn max_devices(mut self, max: usize) -> Self {
460        self.config.max_devices = max;
461        self
462    }
463
464    /// Set the maximum number of data points.
465    pub fn max_points(mut self, max: usize) -> Self {
466        self.config.max_points = max;
467        self
468    }
469
470    /// Set the tick interval.
471    pub fn tick_interval(mut self, interval: Duration) -> Self {
472        self.config.tick_interval_ms = interval.as_millis() as u64;
473        self
474    }
475
476    /// Set the tick interval in milliseconds.
477    pub fn tick_interval_ms(mut self, ms: u64) -> Self {
478        self.config.tick_interval_ms = ms;
479        self
480    }
481
482    /// Set the number of worker threads.
483    pub fn workers(mut self, count: usize) -> Self {
484        self.config.workers = count;
485        self
486    }
487
488    /// Enable or disable metrics collection.
489    pub fn enable_metrics(mut self, enable: bool) -> Self {
490        self.config.enable_metrics = enable;
491        self
492    }
493
494    /// Set the metrics export interval.
495    pub fn metrics_interval(mut self, interval: Duration) -> Self {
496        self.config.metrics_interval_secs = interval.as_secs();
497        self
498    }
499
500    /// Set the log level.
501    pub fn log_level(mut self, level: impl Into<String>) -> Self {
502        self.config.log_level = level.into();
503        self
504    }
505
506    /// Apply a configuration preset.
507    pub fn preset(mut self, preset: EnginePreset) -> Self {
508        match preset {
509            EnginePreset::Development => {
510                self.config.max_devices = 100;
511                self.config.max_points = 10_000;
512                self.config.tick_interval_ms = 500;
513                self.config.log_level = "debug".to_string();
514            }
515            EnginePreset::Production => {
516                self.config.max_devices = 50_000;
517                self.config.max_points = 5_000_000;
518                self.config.tick_interval_ms = 100;
519                self.config.log_level = "info".to_string();
520            }
521            EnginePreset::Testing => {
522                self.config.max_devices = 10;
523                self.config.max_points = 100;
524                self.config.tick_interval_ms = 10;
525                self.config.log_level = "trace".to_string();
526            }
527            EnginePreset::StressTest => {
528                self.config.max_devices = 100_000;
529                self.config.max_points = 10_000_000;
530                self.config.tick_interval_ms = 50;
531                self.config.log_level = "warn".to_string();
532            }
533        }
534        self
535    }
536
537    /// Use an existing configuration.
538    pub fn with_config(mut self, config: EngineConfig) -> Self {
539        self.config = config;
540        self
541    }
542
543    /// Build the simulator engine.
544    pub fn build(self) -> SimulatorEngine {
545        SimulatorEngine::new(self.config)
546    }
547
548    /// Build and start the engine.
549    pub async fn build_and_start(self) -> Result<SimulatorEngine> {
550        let engine = self.build();
551        engine.start().await?;
552        Ok(engine)
553    }
554}
555
/// Engine configuration presets applied by `SimulatorEngineBuilder::preset`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EnginePreset {
    /// Development preset: small scale, verbose logging
    /// (100 devices, 10 000 points, 500 ms tick, `debug` log level).
    Development,
    /// Production preset: large scale, optimized settings
    /// (50 000 devices, 5 000 000 points, 100 ms tick, `info` log level).
    Production,
    /// Testing preset: minimal scale, fast ticks
    /// (10 devices, 100 points, 10 ms tick, `trace` log level).
    Testing,
    /// Stress test preset: maximum scale
    /// (100 000 devices, 10 000 000 points, 50 ms tick, `warn` log level).
    StressTest,
}

569/// Shorthand function for creating an engine builder.
570pub fn engine() -> SimulatorEngineBuilder {
571    SimulatorEngineBuilder::new()
572}
573
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh engine is stopped and holds no devices.
    #[tokio::test]
    async fn test_engine_creation() {
        let config = EngineConfig::default();
        let engine = SimulatorEngine::new(config);

        assert_eq!(engine.state(), EngineState::Stopped);
        assert_eq!(engine.device_count(), 0);
    }

    /// start -> tick x5 -> stop moves through the expected states and counts ticks.
    #[tokio::test]
    async fn test_engine_lifecycle() {
        let config = EngineConfig::default();
        let engine = SimulatorEngine::new(config);

        // Start
        engine.start().await.unwrap();
        assert_eq!(engine.state(), EngineState::Running);

        // Run a few ticks
        for _ in 0..5 {
            engine.tick().await.unwrap();
        }
        assert_eq!(engine.tick_count(), 5);

        // Stop
        engine.stop().await.unwrap();
        assert_eq!(engine.state(), EngineState::Stopped);
    }

    /// The configured device limit is visible through `config()`.
    #[tokio::test]
    async fn test_engine_capacity() {
        let config = EngineConfig::default().with_max_devices(5);
        let engine = SimulatorEngine::new(config);

        assert_eq!(engine.config().max_devices, 5);
    }

    /// Builder setters end up in the engine configuration.
    #[tokio::test]
    async fn test_engine_builder() {
        let engine = SimulatorEngineBuilder::new()
            .name("Test Engine")
            .max_devices(1000)
            .tick_interval_ms(50)
            .build();

        assert_eq!(engine.config().name, "Test Engine");
        assert_eq!(engine.config().max_devices, 1000);
        assert_eq!(engine.config().tick_interval_ms, 50);
    }

    /// Each preset applies its documented limits and log level.
    #[tokio::test]
    async fn test_engine_builder_presets() {
        let dev_engine = SimulatorEngineBuilder::new()
            .preset(EnginePreset::Development)
            .build();
        assert_eq!(dev_engine.config().max_devices, 100);
        assert_eq!(dev_engine.config().log_level, "debug");

        let prod_engine = SimulatorEngineBuilder::new()
            .preset(EnginePreset::Production)
            .build();
        assert_eq!(prod_engine.config().max_devices, 50_000);
        assert_eq!(prod_engine.config().log_level, "info");

        let test_engine = SimulatorEngineBuilder::new()
            .preset(EnginePreset::Testing)
            .build();
        assert_eq!(test_engine.config().max_devices, 10);
        assert_eq!(test_engine.config().log_level, "trace");

        let stress_engine = SimulatorEngineBuilder::new()
            .preset(EnginePreset::StressTest)
            .build();
        assert_eq!(stress_engine.config().max_devices, 100_000);
    }

    /// The `engine()` shorthand behaves like `SimulatorEngineBuilder::new()`.
    #[tokio::test]
    async fn test_engine_shorthand() {
        let e = engine()
            .name("Quick Engine")
            .max_devices(500)
            .build();

        assert_eq!(e.config().name, "Quick Engine");
        assert_eq!(e.config().max_devices, 500);
    }
}