1pub mod config;
35pub mod updates;
36
37pub use config::*;
38pub use updates::*;
39
40use std::sync::Arc;
41use std::time::Duration;
42
43use parking_lot::RwLock;
44use serde::{Deserialize, Serialize};
45use tokio::sync::broadcast;
46
47use crate::error::{ModbusError, ModbusResult as Result};
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
51pub enum ConfigUpdate {
52 MaxConnections(usize),
54
55 IdleTimeout(Duration),
57
58 RequestTimeout(Duration),
60
61 UnitEnabled { unit_id: u8, enabled: bool },
63
64 TcpNoDelay(bool),
66
67 KeepaliveInterval(Option<Duration>),
69
70 RegisterReadAccess {
72 unit_id: u8,
73 start_address: u16,
74 count: u16,
75 allowed: bool,
76 },
77
78 RegisterWriteAccess {
80 unit_id: u8,
81 start_address: u16,
82 count: u16,
83 allowed: bool,
84 },
85
86 SetRegister {
88 unit_id: u8,
89 address: u16,
90 value: u16,
91 },
92
93 SetRegisters {
95 unit_id: u8,
96 start_address: u16,
97 values: Vec<u16>,
98 },
99
100 SetCoil {
102 unit_id: u8,
103 address: u16,
104 value: bool,
105 },
106
107 MetricsEnabled(bool),
109
110 DebugLogging(bool),
112
113 Custom { key: String, value: String },
115}
116
117impl ConfigUpdate {
118 pub fn category(&self) -> UpdateCategory {
120 match self {
121 Self::MaxConnections(_)
122 | Self::IdleTimeout(_)
123 | Self::RequestTimeout(_)
124 | Self::TcpNoDelay(_)
125 | Self::KeepaliveInterval(_) => UpdateCategory::Connection,
126
127 Self::UnitEnabled { .. } => UpdateCategory::Unit,
128
129 Self::RegisterReadAccess { .. }
130 | Self::RegisterWriteAccess { .. }
131 | Self::SetRegister { .. }
132 | Self::SetRegisters { .. }
133 | Self::SetCoil { .. } => UpdateCategory::Data,
134
135 Self::MetricsEnabled(_) | Self::DebugLogging(_) => UpdateCategory::Monitoring,
136
137 Self::Custom { .. } => UpdateCategory::Custom,
138 }
139 }
140
141 pub fn requires_restart(&self) -> bool {
143 match self {
144 Self::TcpNoDelay(_) | Self::KeepaliveInterval(_) => true,
145 _ => false,
146 }
147 }
148}
149
150#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
152pub enum UpdateCategory {
153 Connection,
155 Unit,
157 Data,
159 Monitoring,
161 Custom,
163}
164
165#[derive(Debug, Clone, Serialize, Deserialize)]
167pub enum ConfigEvent {
168 Updated {
170 update: ConfigUpdate,
171 timestamp: std::time::SystemTime,
172 },
173
174 BatchUpdated {
176 count: usize,
177 timestamp: std::time::SystemTime,
178 },
179
180 UpdateFailed {
182 update: ConfigUpdate,
183 error: String,
184 },
185
186 RolledBack {
188 update: ConfigUpdate,
189 reason: String,
190 },
191
192 Reset {
194 timestamp: std::time::SystemTime,
195 },
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize)]
200pub struct RuntimeState {
201 pub max_connections: usize,
203
204 pub idle_timeout: Duration,
206
207 pub request_timeout: Duration,
209
210 pub tcp_nodelay: bool,
212
213 pub keepalive_interval: Option<Duration>,
215
216 pub metrics_enabled: bool,
218
219 pub debug_logging: bool,
221
222 pub enabled_units: std::collections::HashSet<u8>,
224
225 pub custom: std::collections::HashMap<String, String>,
227}
228
229impl Default for RuntimeState {
230 fn default() -> Self {
231 let mut enabled_units = std::collections::HashSet::new();
232 enabled_units.insert(1); Self {
235 max_connections: 100,
236 idle_timeout: Duration::from_secs(300),
237 request_timeout: Duration::from_secs(30),
238 tcp_nodelay: true,
239 keepalive_interval: Some(Duration::from_secs(60)),
240 metrics_enabled: true,
241 debug_logging: false,
242 enabled_units,
243 custom: std::collections::HashMap::new(),
244 }
245 }
246}
247
248pub struct RuntimeConfigManager {
253 state: Arc<RwLock<RuntimeState>>,
255
256 event_tx: broadcast::Sender<ConfigEvent>,
258
259 pending_updates: RwLock<Vec<ConfigUpdate>>,
261
262 update_callback: RwLock<Option<Box<dyn Fn(&ConfigUpdate) -> Result<()> + Send + Sync>>>,
264
265 validate_updates: bool,
267}
268
269impl RuntimeConfigManager {
270 pub fn new() -> Self {
272 Self::with_state(RuntimeState::default())
273 }
274
275 pub fn with_state(state: RuntimeState) -> Self {
277 let (event_tx, _) = broadcast::channel(256);
278
279 Self {
280 state: Arc::new(RwLock::new(state)),
281 event_tx,
282 pending_updates: RwLock::new(Vec::new()),
283 update_callback: RwLock::new(None),
284 validate_updates: true,
285 }
286 }
287
288 pub fn set_update_callback<F>(&self, callback: F)
290 where
291 F: Fn(&ConfigUpdate) -> Result<()> + Send + Sync + 'static,
292 {
293 *self.update_callback.write() = Some(Box::new(callback));
294 }
295
296 pub fn state(&self) -> RuntimeState {
298 self.state.read().clone()
299 }
300
301 pub fn state_lock(&self) -> &Arc<RwLock<RuntimeState>> {
303 &self.state
304 }
305
306 pub fn subscribe(&self) -> broadcast::Receiver<ConfigEvent> {
308 self.event_tx.subscribe()
309 }
310
311 pub fn apply(&self, update: ConfigUpdate) -> Result<()> {
313 if self.validate_updates {
315 self.validate_update(&update)?;
316 }
317
318 self.apply_to_state(&update)?;
320
321 if let Some(ref callback) = *self.update_callback.read() {
323 if let Err(e) = callback(&update) {
324 self.rollback_update(&update)?;
326 let _ = self.event_tx.send(ConfigEvent::RolledBack {
327 update: update.clone(),
328 reason: e.to_string(),
329 });
330 return Err(e);
331 }
332 }
333
334 let _ = self.event_tx.send(ConfigEvent::Updated {
336 update,
337 timestamp: std::time::SystemTime::now(),
338 });
339
340 Ok(())
341 }
342
343 pub fn apply_batch(&self, updates: Vec<ConfigUpdate>) -> Result<()> {
345 if self.validate_updates {
347 for update in &updates {
348 self.validate_update(update)?;
349 }
350 }
351
352 let snapshot = self.state.read().clone();
354
355 for update in &updates {
357 if let Err(e) = self.apply_to_state(update) {
358 *self.state.write() = snapshot;
360 return Err(e);
361 }
362
363 if let Some(ref callback) = *self.update_callback.read() {
365 if let Err(e) = callback(update) {
366 *self.state.write() = snapshot;
368 return Err(e);
369 }
370 }
371 }
372
373 let _ = self.event_tx.send(ConfigEvent::BatchUpdated {
375 count: updates.len(),
376 timestamp: std::time::SystemTime::now(),
377 });
378
379 Ok(())
380 }
381
382 fn validate_update(&self, update: &ConfigUpdate) -> Result<()> {
384 match update {
385 ConfigUpdate::MaxConnections(max) => {
386 if *max == 0 {
387 return Err(ModbusError::Config("max_connections must be > 0".into()));
388 }
389 if *max > 100_000 {
390 return Err(ModbusError::Config(
391 "max_connections too high (max 100,000)".into(),
392 ));
393 }
394 }
395 ConfigUpdate::IdleTimeout(duration) => {
396 if duration.as_secs() == 0 {
397 return Err(ModbusError::Config("idle_timeout must be > 0".into()));
398 }
399 }
400 ConfigUpdate::RequestTimeout(duration) => {
401 if duration.as_secs() == 0 {
402 return Err(ModbusError::Config("request_timeout must be > 0".into()));
403 }
404 }
405 ConfigUpdate::UnitEnabled { unit_id, .. } => {
406 if *unit_id == 0 {
407 tracing::warn!("Enabling/disabling broadcast unit ID 0");
409 }
410 }
411 ConfigUpdate::RegisterReadAccess { count, .. }
412 | ConfigUpdate::RegisterWriteAccess { count, .. } => {
413 if *count == 0 {
414 return Err(ModbusError::Config("register count must be > 0".into()));
415 }
416 if *count > 125 {
417 return Err(ModbusError::Config(
418 "register count exceeds Modbus limit (125)".into(),
419 ));
420 }
421 }
422 ConfigUpdate::SetRegisters { values, .. } => {
423 if values.is_empty() {
424 return Err(ModbusError::Config("values cannot be empty".into()));
425 }
426 if values.len() > 123 {
427 return Err(ModbusError::Config(
428 "too many values (max 123 per write)".into(),
429 ));
430 }
431 }
432 _ => {}
433 }
434
435 Ok(())
436 }
437
438 fn apply_to_state(&self, update: &ConfigUpdate) -> Result<()> {
440 let mut state = self.state.write();
441
442 match update {
443 ConfigUpdate::MaxConnections(max) => {
444 state.max_connections = *max;
445 }
446 ConfigUpdate::IdleTimeout(duration) => {
447 state.idle_timeout = *duration;
448 }
449 ConfigUpdate::RequestTimeout(duration) => {
450 state.request_timeout = *duration;
451 }
452 ConfigUpdate::TcpNoDelay(enabled) => {
453 state.tcp_nodelay = *enabled;
454 }
455 ConfigUpdate::KeepaliveInterval(interval) => {
456 state.keepalive_interval = *interval;
457 }
458 ConfigUpdate::MetricsEnabled(enabled) => {
459 state.metrics_enabled = *enabled;
460 }
461 ConfigUpdate::DebugLogging(enabled) => {
462 state.debug_logging = *enabled;
463 }
464 ConfigUpdate::UnitEnabled { unit_id, enabled } => {
465 if *enabled {
466 state.enabled_units.insert(*unit_id);
467 } else {
468 state.enabled_units.remove(unit_id);
469 }
470 }
471 ConfigUpdate::Custom { key, value } => {
472 state.custom.insert(key.clone(), value.clone());
473 }
474 _ => {}
476 }
477
478 Ok(())
479 }
480
481 fn rollback_update(&self, update: &ConfigUpdate) -> Result<()> {
483 tracing::warn!(update = ?update, "Rolling back configuration update");
486 Ok(())
487 }
488
489 pub fn reset(&self) -> Result<()> {
491 *self.state.write() = RuntimeState::default();
492
493 let _ = self.event_tx.send(ConfigEvent::Reset {
494 timestamp: std::time::SystemTime::now(),
495 });
496
497 Ok(())
498 }
499
500 pub fn get<T: FromRuntimeState>(&self) -> T {
502 let state = self.state.read();
503 T::from_state(&state)
504 }
505
506 pub fn export_json(&self) -> Result<String> {
508 let state = self.state.read();
509 serde_json::to_string_pretty(&*state)
510 .map_err(|e| ModbusError::Config(format!("Failed to serialize state: {}", e)))
511 }
512
513 pub fn import_json(&self, json: &str) -> Result<()> {
515 let new_state: RuntimeState = serde_json::from_str(json)
516 .map_err(|e| ModbusError::Config(format!("Failed to parse JSON: {}", e)))?;
517
518 *self.state.write() = new_state;
519
520 let _ = self.event_tx.send(ConfigEvent::Reset {
521 timestamp: std::time::SystemTime::now(),
522 });
523
524 Ok(())
525 }
526}
527
528impl Default for RuntimeConfigManager {
529 fn default() -> Self {
530 Self::new()
531 }
532}
533
534impl std::fmt::Debug for RuntimeConfigManager {
535 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
536 f.debug_struct("RuntimeConfigManager")
537 .field("state", &self.state())
538 .field("validate_updates", &self.validate_updates)
539 .finish()
540 }
541}
542
543pub trait FromRuntimeState {
545 fn from_state(state: &RuntimeState) -> Self;
546}
547
548impl FromRuntimeState for usize {
549 fn from_state(state: &RuntimeState) -> Self {
550 state.max_connections
551 }
552}
553
554impl FromRuntimeState for bool {
555 fn from_state(state: &RuntimeState) -> Self {
556 state.metrics_enabled
557 }
558}
559
560impl FromRuntimeState for Duration {
561 fn from_state(state: &RuntimeState) -> Self {
562 state.idle_timeout
563 }
564}
565
566#[cfg(test)]
567mod tests {
568 use super::*;
569
570 #[test]
571 fn test_runtime_config_manager_new() {
572 let manager = RuntimeConfigManager::new();
573 let state = manager.state();
574 assert_eq!(state.max_connections, 100);
575 assert!(state.metrics_enabled);
576 }
577
578 #[test]
579 fn test_apply_single_update() {
580 let manager = RuntimeConfigManager::new();
581
582 manager.apply(ConfigUpdate::MaxConnections(200)).unwrap();
583 assert_eq!(manager.state().max_connections, 200);
584 }
585
586 #[test]
587 fn test_apply_batch_update() {
588 let manager = RuntimeConfigManager::new();
589
590 manager
591 .apply_batch(vec![
592 ConfigUpdate::MaxConnections(300),
593 ConfigUpdate::MetricsEnabled(false),
594 ])
595 .unwrap();
596
597 let state = manager.state();
598 assert_eq!(state.max_connections, 300);
599 assert!(!state.metrics_enabled);
600 }
601
602 #[test]
603 fn test_validation_failure() {
604 let manager = RuntimeConfigManager::new();
605
606 let result = manager.apply(ConfigUpdate::MaxConnections(0));
607 assert!(result.is_err());
608 }
609
610 #[test]
611 fn test_unit_enabled() {
612 let manager = RuntimeConfigManager::new();
613
614 manager
615 .apply(ConfigUpdate::UnitEnabled {
616 unit_id: 5,
617 enabled: true,
618 })
619 .unwrap();
620
621 assert!(manager.state().enabled_units.contains(&5));
622
623 manager
624 .apply(ConfigUpdate::UnitEnabled {
625 unit_id: 5,
626 enabled: false,
627 })
628 .unwrap();
629
630 assert!(!manager.state().enabled_units.contains(&5));
631 }
632
633 #[test]
634 fn test_custom_setting() {
635 let manager = RuntimeConfigManager::new();
636
637 manager
638 .apply(ConfigUpdate::Custom {
639 key: "custom_key".into(),
640 value: "custom_value".into(),
641 })
642 .unwrap();
643
644 assert_eq!(
645 manager.state().custom.get("custom_key"),
646 Some(&"custom_value".to_string())
647 );
648 }
649
650 #[test]
651 fn test_reset() {
652 let manager = RuntimeConfigManager::new();
653
654 manager.apply(ConfigUpdate::MaxConnections(500)).unwrap();
655 manager.reset().unwrap();
656
657 assert_eq!(manager.state().max_connections, 100);
658 }
659
660 #[test]
661 fn test_export_import_json() {
662 let manager = RuntimeConfigManager::new();
663
664 manager.apply(ConfigUpdate::MaxConnections(999)).unwrap();
665
666 let json = manager.export_json().unwrap();
667 assert!(json.contains("999"));
668
669 let manager2 = RuntimeConfigManager::new();
670 manager2.import_json(&json).unwrap();
671
672 assert_eq!(manager2.state().max_connections, 999);
673 }
674
675 #[test]
676 fn test_update_category() {
677 assert_eq!(
678 ConfigUpdate::MaxConnections(100).category(),
679 UpdateCategory::Connection
680 );
681 assert_eq!(
682 ConfigUpdate::UnitEnabled {
683 unit_id: 1,
684 enabled: true
685 }
686 .category(),
687 UpdateCategory::Unit
688 );
689 assert_eq!(
690 ConfigUpdate::MetricsEnabled(true).category(),
691 UpdateCategory::Monitoring
692 );
693 }
694
695 #[test]
696 fn test_requires_restart() {
697 assert!(ConfigUpdate::TcpNoDelay(true).requires_restart());
698 assert!(!ConfigUpdate::MaxConnections(100).requires_restart());
699 }
700
701 #[tokio::test]
702 async fn test_event_subscription() {
703 let manager = RuntimeConfigManager::new();
704 let mut rx = manager.subscribe();
705
706 manager.apply(ConfigUpdate::MaxConnections(150)).unwrap();
707
708 let event = rx.try_recv().unwrap();
709 matches!(event, ConfigEvent::Updated { .. });
710 }
711}