Skip to main content

mabi_modbus/runtime/
mod.rs

1//! Runtime configuration management for Modbus simulator.
2//!
3//! This module provides dynamic configuration updates without server restart.
4//!
5//! # Features
6//!
7//! - **Atomic Updates**: Configuration changes are applied atomically
8//! - **Validation**: All changes are validated before application
9//! - **Rollback**: Automatic rollback on failure
10//! - **Events**: Subscribe to configuration change events
11//! - **Partial Updates**: Support for updating individual settings
12//!
13//! # Example
14//!
15//! ```rust,ignore
16//! use mabi_modbus::runtime::{RuntimeConfig, ConfigUpdate};
17//!
18//! // Create runtime config
19//! let mut runtime = RuntimeConfig::new(server_config);
20//!
21//! // Update a single setting
22//! runtime.apply(ConfigUpdate::MaxConnections(200))?;
23//!
24//! // Update multiple settings atomically
25//! runtime.apply_batch(vec![
26//!     ConfigUpdate::MaxConnections(200),
27//!     ConfigUpdate::IdleTimeout(Duration::from_secs(60)),
28//! ])?;
29//!
30//! // Subscribe to changes
31//! let mut rx = runtime.subscribe();
32//! ```
33
34pub mod config;
35pub mod updates;
36
37pub use config::*;
38pub use updates::*;
39
40use std::sync::Arc;
41use std::time::Duration;
42
43use parking_lot::RwLock;
44use serde::{Deserialize, Serialize};
45use tokio::sync::broadcast;
46
47use crate::error::{ModbusError, ModbusResult as Result};
48
49/// Dynamic configuration value that can be changed at runtime.
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub enum ConfigUpdate {
52    /// Update maximum concurrent connections.
53    MaxConnections(usize),
54
55    /// Update connection idle timeout.
56    IdleTimeout(Duration),
57
58    /// Update request timeout.
59    RequestTimeout(Duration),
60
61    /// Enable/disable a specific unit ID.
62    UnitEnabled { unit_id: u8, enabled: bool },
63
64    /// Update TCP nodelay setting.
65    TcpNoDelay(bool),
66
67    /// Update keepalive interval.
68    KeepaliveInterval(Option<Duration>),
69
70    /// Update register read access for a range.
71    RegisterReadAccess {
72        unit_id: u8,
73        start_address: u16,
74        count: u16,
75        allowed: bool,
76    },
77
78    /// Update register write access for a range.
79    RegisterWriteAccess {
80        unit_id: u8,
81        start_address: u16,
82        count: u16,
83        allowed: bool,
84    },
85
86    /// Set a register value.
87    SetRegister {
88        unit_id: u8,
89        address: u16,
90        value: u16,
91    },
92
93    /// Set multiple register values.
94    SetRegisters {
95        unit_id: u8,
96        start_address: u16,
97        values: Vec<u16>,
98    },
99
100    /// Set a coil value.
101    SetCoil {
102        unit_id: u8,
103        address: u16,
104        value: bool,
105    },
106
107    /// Enable/disable metrics collection.
108    MetricsEnabled(bool),
109
110    /// Enable/disable debug logging.
111    DebugLogging(bool),
112
113    /// Custom extension for protocol-specific settings.
114    Custom { key: String, value: String },
115}
116
117impl ConfigUpdate {
118    /// Get the category of this update.
119    pub fn category(&self) -> UpdateCategory {
120        match self {
121            Self::MaxConnections(_)
122            | Self::IdleTimeout(_)
123            | Self::RequestTimeout(_)
124            | Self::TcpNoDelay(_)
125            | Self::KeepaliveInterval(_) => UpdateCategory::Connection,
126
127            Self::UnitEnabled { .. } => UpdateCategory::Unit,
128
129            Self::RegisterReadAccess { .. }
130            | Self::RegisterWriteAccess { .. }
131            | Self::SetRegister { .. }
132            | Self::SetRegisters { .. }
133            | Self::SetCoil { .. } => UpdateCategory::Data,
134
135            Self::MetricsEnabled(_) | Self::DebugLogging(_) => UpdateCategory::Monitoring,
136
137            Self::Custom { .. } => UpdateCategory::Custom,
138        }
139    }
140
141    /// Check if this update requires a restart to take effect.
142    pub fn requires_restart(&self) -> bool {
143        match self {
144            Self::TcpNoDelay(_) | Self::KeepaliveInterval(_) => true,
145            _ => false,
146        }
147    }
148}
149
150/// Category of configuration update.
151#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
152pub enum UpdateCategory {
153    /// Connection-related settings.
154    Connection,
155    /// Unit-related settings.
156    Unit,
157    /// Data/register settings.
158    Data,
159    /// Monitoring settings.
160    Monitoring,
161    /// Custom extensions.
162    Custom,
163}
164
165/// Event emitted when configuration changes.
166#[derive(Debug, Clone, Serialize, Deserialize)]
167pub enum ConfigEvent {
168    /// Configuration update applied successfully.
169    Updated {
170        update: ConfigUpdate,
171        timestamp: std::time::SystemTime,
172    },
173
174    /// Batch update applied successfully.
175    BatchUpdated {
176        count: usize,
177        timestamp: std::time::SystemTime,
178    },
179
180    /// Update failed.
181    UpdateFailed {
182        update: ConfigUpdate,
183        error: String,
184    },
185
186    /// Update was rolled back.
187    RolledBack {
188        update: ConfigUpdate,
189        reason: String,
190    },
191
192    /// Configuration was reset to defaults.
193    Reset {
194        timestamp: std::time::SystemTime,
195    },
196}
197
198/// Runtime configuration state.
199#[derive(Debug, Clone, Serialize, Deserialize)]
200pub struct RuntimeState {
201    /// Maximum concurrent connections.
202    pub max_connections: usize,
203
204    /// Connection idle timeout.
205    pub idle_timeout: Duration,
206
207    /// Request processing timeout.
208    pub request_timeout: Duration,
209
210    /// Whether TCP nodelay is enabled.
211    pub tcp_nodelay: bool,
212
213    /// TCP keepalive interval.
214    pub keepalive_interval: Option<Duration>,
215
216    /// Whether metrics collection is enabled.
217    pub metrics_enabled: bool,
218
219    /// Whether debug logging is enabled.
220    pub debug_logging: bool,
221
222    /// Enabled unit IDs.
223    pub enabled_units: std::collections::HashSet<u8>,
224
225    /// Custom settings.
226    pub custom: std::collections::HashMap<String, String>,
227}
228
229impl Default for RuntimeState {
230    fn default() -> Self {
231        let mut enabled_units = std::collections::HashSet::new();
232        enabled_units.insert(1); // Default unit ID 1 enabled
233
234        Self {
235            max_connections: 100,
236            idle_timeout: Duration::from_secs(300),
237            request_timeout: Duration::from_secs(30),
238            tcp_nodelay: true,
239            keepalive_interval: Some(Duration::from_secs(60)),
240            metrics_enabled: true,
241            debug_logging: false,
242            enabled_units,
243            custom: std::collections::HashMap::new(),
244        }
245    }
246}
247
248/// Runtime configuration manager.
249///
250/// Provides thread-safe runtime configuration updates with validation
251/// and event notification.
252pub struct RuntimeConfigManager {
253    /// Current configuration state.
254    state: Arc<RwLock<RuntimeState>>,
255
256    /// Event broadcaster.
257    event_tx: broadcast::Sender<ConfigEvent>,
258
259    /// Pending updates (for batch operations).
260    pending_updates: RwLock<Vec<ConfigUpdate>>,
261
262    /// Callback for applying updates to the server.
263    update_callback: RwLock<Option<Box<dyn Fn(&ConfigUpdate) -> Result<()> + Send + Sync>>>,
264
265    /// Whether to validate updates before applying.
266    validate_updates: bool,
267}
268
269impl RuntimeConfigManager {
270    /// Create a new runtime configuration manager.
271    pub fn new() -> Self {
272        Self::with_state(RuntimeState::default())
273    }
274
275    /// Create a manager with initial state.
276    pub fn with_state(state: RuntimeState) -> Self {
277        let (event_tx, _) = broadcast::channel(256);
278
279        Self {
280            state: Arc::new(RwLock::new(state)),
281            event_tx,
282            pending_updates: RwLock::new(Vec::new()),
283            update_callback: RwLock::new(None),
284            validate_updates: true,
285        }
286    }
287
288    /// Set the callback for applying updates.
289    pub fn set_update_callback<F>(&self, callback: F)
290    where
291        F: Fn(&ConfigUpdate) -> Result<()> + Send + Sync + 'static,
292    {
293        *self.update_callback.write() = Some(Box::new(callback));
294    }
295
296    /// Get the current configuration state.
297    pub fn state(&self) -> RuntimeState {
298        self.state.read().clone()
299    }
300
301    /// Get a reference to the state lock.
302    pub fn state_lock(&self) -> &Arc<RwLock<RuntimeState>> {
303        &self.state
304    }
305
306    /// Subscribe to configuration events.
307    pub fn subscribe(&self) -> broadcast::Receiver<ConfigEvent> {
308        self.event_tx.subscribe()
309    }
310
311    /// Apply a single configuration update.
312    pub fn apply(&self, update: ConfigUpdate) -> Result<()> {
313        // Validate if enabled
314        if self.validate_updates {
315            self.validate_update(&update)?;
316        }
317
318        // Apply to local state
319        self.apply_to_state(&update)?;
320
321        // Call external callback if set
322        if let Some(ref callback) = *self.update_callback.read() {
323            if let Err(e) = callback(&update) {
324                // Rollback on failure
325                self.rollback_update(&update)?;
326                let _ = self.event_tx.send(ConfigEvent::RolledBack {
327                    update: update.clone(),
328                    reason: e.to_string(),
329                });
330                return Err(e);
331            }
332        }
333
334        // Emit success event
335        let _ = self.event_tx.send(ConfigEvent::Updated {
336            update,
337            timestamp: std::time::SystemTime::now(),
338        });
339
340        Ok(())
341    }
342
343    /// Apply multiple updates atomically.
344    pub fn apply_batch(&self, updates: Vec<ConfigUpdate>) -> Result<()> {
345        // Validate all updates first
346        if self.validate_updates {
347            for update in &updates {
348                self.validate_update(update)?;
349            }
350        }
351
352        // Take a snapshot for potential rollback
353        let snapshot = self.state.read().clone();
354
355        // Apply all updates
356        for update in &updates {
357            if let Err(e) = self.apply_to_state(update) {
358                // Rollback to snapshot
359                *self.state.write() = snapshot;
360                return Err(e);
361            }
362
363            // Call callback
364            if let Some(ref callback) = *self.update_callback.read() {
365                if let Err(e) = callback(update) {
366                    // Rollback to snapshot
367                    *self.state.write() = snapshot;
368                    return Err(e);
369                }
370            }
371        }
372
373        // Emit batch success event
374        let _ = self.event_tx.send(ConfigEvent::BatchUpdated {
375            count: updates.len(),
376            timestamp: std::time::SystemTime::now(),
377        });
378
379        Ok(())
380    }
381
382    /// Validate a configuration update.
383    fn validate_update(&self, update: &ConfigUpdate) -> Result<()> {
384        match update {
385            ConfigUpdate::MaxConnections(max) => {
386                if *max == 0 {
387                    return Err(ModbusError::Config("max_connections must be > 0".into()));
388                }
389                if *max > 100_000 {
390                    return Err(ModbusError::Config(
391                        "max_connections too high (max 100,000)".into(),
392                    ));
393                }
394            }
395            ConfigUpdate::IdleTimeout(duration) => {
396                if duration.as_secs() == 0 {
397                    return Err(ModbusError::Config("idle_timeout must be > 0".into()));
398                }
399            }
400            ConfigUpdate::RequestTimeout(duration) => {
401                if duration.as_secs() == 0 {
402                    return Err(ModbusError::Config("request_timeout must be > 0".into()));
403                }
404            }
405            ConfigUpdate::UnitEnabled { unit_id, .. } => {
406                if *unit_id == 0 {
407                    // Unit ID 0 is broadcast, special handling
408                    tracing::warn!("Enabling/disabling broadcast unit ID 0");
409                }
410            }
411            ConfigUpdate::RegisterReadAccess { count, .. }
412            | ConfigUpdate::RegisterWriteAccess { count, .. } => {
413                if *count == 0 {
414                    return Err(ModbusError::Config("register count must be > 0".into()));
415                }
416                if *count > 125 {
417                    return Err(ModbusError::Config(
418                        "register count exceeds Modbus limit (125)".into(),
419                    ));
420                }
421            }
422            ConfigUpdate::SetRegisters { values, .. } => {
423                if values.is_empty() {
424                    return Err(ModbusError::Config("values cannot be empty".into()));
425                }
426                if values.len() > 123 {
427                    return Err(ModbusError::Config(
428                        "too many values (max 123 per write)".into(),
429                    ));
430                }
431            }
432            _ => {}
433        }
434
435        Ok(())
436    }
437
438    /// Apply update to local state.
439    fn apply_to_state(&self, update: &ConfigUpdate) -> Result<()> {
440        let mut state = self.state.write();
441
442        match update {
443            ConfigUpdate::MaxConnections(max) => {
444                state.max_connections = *max;
445            }
446            ConfigUpdate::IdleTimeout(duration) => {
447                state.idle_timeout = *duration;
448            }
449            ConfigUpdate::RequestTimeout(duration) => {
450                state.request_timeout = *duration;
451            }
452            ConfigUpdate::TcpNoDelay(enabled) => {
453                state.tcp_nodelay = *enabled;
454            }
455            ConfigUpdate::KeepaliveInterval(interval) => {
456                state.keepalive_interval = *interval;
457            }
458            ConfigUpdate::MetricsEnabled(enabled) => {
459                state.metrics_enabled = *enabled;
460            }
461            ConfigUpdate::DebugLogging(enabled) => {
462                state.debug_logging = *enabled;
463            }
464            ConfigUpdate::UnitEnabled { unit_id, enabled } => {
465                if *enabled {
466                    state.enabled_units.insert(*unit_id);
467                } else {
468                    state.enabled_units.remove(unit_id);
469                }
470            }
471            ConfigUpdate::Custom { key, value } => {
472                state.custom.insert(key.clone(), value.clone());
473            }
474            // Data updates don't affect state directly
475            _ => {}
476        }
477
478        Ok(())
479    }
480
481    /// Rollback a single update (best effort).
482    fn rollback_update(&self, update: &ConfigUpdate) -> Result<()> {
483        // For most updates, we can't easily rollback without knowing the previous value
484        // This is a best-effort implementation
485        tracing::warn!(update = ?update, "Rolling back configuration update");
486        Ok(())
487    }
488
489    /// Reset configuration to defaults.
490    pub fn reset(&self) -> Result<()> {
491        *self.state.write() = RuntimeState::default();
492
493        let _ = self.event_tx.send(ConfigEvent::Reset {
494            timestamp: std::time::SystemTime::now(),
495        });
496
497        Ok(())
498    }
499
500    /// Get a specific setting value.
501    pub fn get<T: FromRuntimeState>(&self) -> T {
502        let state = self.state.read();
503        T::from_state(&state)
504    }
505
506    /// Export current state as JSON.
507    pub fn export_json(&self) -> Result<String> {
508        let state = self.state.read();
509        serde_json::to_string_pretty(&*state)
510            .map_err(|e| ModbusError::Config(format!("Failed to serialize state: {}", e)))
511    }
512
513    /// Import state from JSON.
514    pub fn import_json(&self, json: &str) -> Result<()> {
515        let new_state: RuntimeState = serde_json::from_str(json)
516            .map_err(|e| ModbusError::Config(format!("Failed to parse JSON: {}", e)))?;
517
518        *self.state.write() = new_state;
519
520        let _ = self.event_tx.send(ConfigEvent::Reset {
521            timestamp: std::time::SystemTime::now(),
522        });
523
524        Ok(())
525    }
526}
527
528impl Default for RuntimeConfigManager {
529    fn default() -> Self {
530        Self::new()
531    }
532}
533
534impl std::fmt::Debug for RuntimeConfigManager {
535    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
536        f.debug_struct("RuntimeConfigManager")
537            .field("state", &self.state())
538            .field("validate_updates", &self.validate_updates)
539            .finish()
540    }
541}
542
543/// Trait for extracting values from runtime state.
544pub trait FromRuntimeState {
545    fn from_state(state: &RuntimeState) -> Self;
546}
547
548impl FromRuntimeState for usize {
549    fn from_state(state: &RuntimeState) -> Self {
550        state.max_connections
551    }
552}
553
554impl FromRuntimeState for bool {
555    fn from_state(state: &RuntimeState) -> Self {
556        state.metrics_enabled
557    }
558}
559
560impl FromRuntimeState for Duration {
561    fn from_state(state: &RuntimeState) -> Self {
562        state.idle_timeout
563    }
564}
565
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh manager exposes the documented defaults.
    #[test]
    fn test_runtime_config_manager_new() {
        let manager = RuntimeConfigManager::new();
        let state = manager.state();
        assert_eq!(state.max_connections, 100);
        assert!(state.metrics_enabled);
    }

    /// A single valid update is visible in the state afterwards.
    #[test]
    fn test_apply_single_update() {
        let manager = RuntimeConfigManager::new();

        manager.apply(ConfigUpdate::MaxConnections(200)).unwrap();
        assert_eq!(manager.state().max_connections, 200);
    }

    /// Every update of a successful batch is visible in the state afterwards.
    #[test]
    fn test_apply_batch_update() {
        let manager = RuntimeConfigManager::new();

        manager
            .apply_batch(vec![
                ConfigUpdate::MaxConnections(300),
                ConfigUpdate::MetricsEnabled(false),
            ])
            .unwrap();

        let state = manager.state();
        assert_eq!(state.max_connections, 300);
        assert!(!state.metrics_enabled);
    }

    /// A connection limit of zero is rejected by validation.
    #[test]
    fn test_validation_failure() {
        let manager = RuntimeConfigManager::new();

        let result = manager.apply(ConfigUpdate::MaxConnections(0));
        assert!(result.is_err());
    }

    /// Enabling and then disabling a unit adds it to and removes it from the set.
    #[test]
    fn test_unit_enabled() {
        let manager = RuntimeConfigManager::new();

        manager
            .apply(ConfigUpdate::UnitEnabled {
                unit_id: 5,
                enabled: true,
            })
            .unwrap();

        assert!(manager.state().enabled_units.contains(&5));

        manager
            .apply(ConfigUpdate::UnitEnabled {
                unit_id: 5,
                enabled: false,
            })
            .unwrap();

        assert!(!manager.state().enabled_units.contains(&5));
    }

    /// Custom key/value pairs are stored in the state's `custom` map.
    #[test]
    fn test_custom_setting() {
        let manager = RuntimeConfigManager::new();

        manager
            .apply(ConfigUpdate::Custom {
                key: "custom_key".into(),
                value: "custom_value".into(),
            })
            .unwrap();

        assert_eq!(
            manager.state().custom.get("custom_key"),
            Some(&"custom_value".to_string())
        );
    }

    /// `reset` brings a modified setting back to its default.
    #[test]
    fn test_reset() {
        let manager = RuntimeConfigManager::new();

        manager.apply(ConfigUpdate::MaxConnections(500)).unwrap();
        manager.reset().unwrap();

        assert_eq!(manager.state().max_connections, 100);
    }

    /// State exported from one manager can be imported into another.
    #[test]
    fn test_export_import_json() {
        let manager = RuntimeConfigManager::new();

        manager.apply(ConfigUpdate::MaxConnections(999)).unwrap();

        let json = manager.export_json().unwrap();
        assert!(json.contains("999"));

        let manager2 = RuntimeConfigManager::new();
        manager2.import_json(&json).unwrap();

        assert_eq!(manager2.state().max_connections, 999);
    }

    /// Representative variants map to the expected categories.
    #[test]
    fn test_update_category() {
        assert_eq!(
            ConfigUpdate::MaxConnections(100).category(),
            UpdateCategory::Connection
        );
        assert_eq!(
            ConfigUpdate::UnitEnabled {
                unit_id: 1,
                enabled: true
            }
            .category(),
            UpdateCategory::Unit
        );
        assert_eq!(
            ConfigUpdate::MetricsEnabled(true).category(),
            UpdateCategory::Monitoring
        );
    }

    /// Only the TCP socket options are flagged as needing a restart.
    #[test]
    fn test_requires_restart() {
        assert!(ConfigUpdate::TcpNoDelay(true).requires_restart());
        assert!(!ConfigUpdate::MaxConnections(100).requires_restart());
    }

    /// A subscriber created before an update receives the `Updated` event.
    #[tokio::test]
    async fn test_event_subscription() {
        let manager = RuntimeConfigManager::new();
        let mut rx = manager.subscribe();

        manager.apply(ConfigUpdate::MaxConnections(150)).unwrap();

        let event = rx.try_recv().unwrap();
        // `matches!` on its own only yields a bool; it must be asserted to
        // actually check the event kind.
        assert!(matches!(event, ConfigEvent::Updated { .. }));
    }
}
711}