1use std::collections::VecDeque;
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use parking_lot::Mutex;
9use serde::Serialize;
10use tokio::sync::broadcast;
11
12use mabi_core::device::DeviceInfo;
13use mabi_core::tags::Tags;
14use mabi_core::types::{AccessMode, Address, DataPoint, DataPointDef, ModbusRegisterType};
15use mabi_core::value::Value;
16use mabi_runtime::{
17 DevicePort, DevicePortLayer, DynDevicePort, ProtocolDriverRegistry, RuntimeSession,
18 RuntimeSessionSpec,
19};
20
21use crate::error::{ModbusError, ModbusResult};
22use crate::simulator::{
23 ActionBindingSummary, BehaviorBindingSummary, CompiledModbusSession, DatastorePolicySummary,
24};
25
26#[async_trait]
28pub trait SessionControlPort: Send {
29 async fn status(&self) -> ModbusResult<SessionStatus>;
30 async fn snapshot(&self) -> ModbusResult<SessionSnapshot>;
31 async fn reset(&mut self) -> ModbusResult<SessionSnapshot>;
32}
33
34pub trait PointCatalogPort {
36 fn list_points(&self, query: &PointCatalogQuery) -> ModbusResult<Vec<PointDescriptor>>;
37}
38
39#[async_trait]
41pub trait RegisterControlPort {
42 async fn read(&self, target: &PointTarget) -> ModbusResult<DataPoint>;
43 async fn write(&self, target: &PointTarget, value: Value) -> ModbusResult<()>;
44}
45
46pub trait TracePort {
48 fn tail(&self, limit: usize) -> Vec<TraceEntry>;
49 fn clear(&self);
50 fn subscribe(&self) -> broadcast::Receiver<TraceEntry>;
51}
52
53#[async_trait]
55pub trait FaultPresetPort: Send {
56 fn available_fault_presets(&self) -> Vec<String>;
57 fn active_fault_preset(&self) -> Option<String>;
58 async fn apply_fault_preset(&mut self, name: &str) -> ModbusResult<SessionSnapshot>;
59 async fn clear_fault_preset(&mut self) -> ModbusResult<SessionSnapshot>;
60}
61
62#[async_trait]
64pub trait ResponseProfilePort: Send {
65 fn available_response_profiles(&self) -> Vec<String>;
66 fn active_response_profile(&self) -> Option<String>;
67 async fn apply_response_profile(&mut self, name: &str) -> ModbusResult<SessionSnapshot>;
68 async fn clear_response_profile(&mut self) -> ModbusResult<SessionSnapshot>;
69}
70
71#[async_trait]
73pub trait BehaviorSetPort: Send {
74 fn available_behavior_sets(&self) -> Vec<String>;
75 fn active_behavior_set(&self) -> Option<String>;
76 async fn apply_behavior_set(&mut self, name: &str) -> ModbusResult<SessionSnapshot>;
77 async fn clear_behavior_set(&mut self) -> ModbusResult<SessionSnapshot>;
78}
79
80pub trait SessionMetadataPort {
82 fn action_binding_summaries(&self) -> &[ActionBindingSummary];
83 fn behavior_binding_summaries(&self) -> &[BehaviorBindingSummary];
84 fn datastore_policy_summaries(&self) -> &[DatastorePolicySummary];
85}
86
/// Device-level filter used when listing points.
///
/// All populated criteria must hold for a device to be included; the default
/// (everything empty) therefore selects every device.
#[derive(Debug, Clone, Default)]
pub struct PointCatalogQuery {
    /// When set, only the device with exactly this id is considered.
    pub device_id: Option<String>,
    /// `(key, value)` pairs that must all be present in the device's tags.
    pub tag_filters: Vec<(String, String)>,
    /// Labels that must all be present on the device's tags.
    pub labels: Vec<String>,
}
94
95#[derive(Debug, Clone, Serialize)]
97pub struct PointDescriptor {
98 pub device_id: String,
99 pub device_name: String,
100 pub unit_id: Option<u8>,
101 pub point_id: String,
102 pub point_name: String,
103 pub register_type: Option<ModbusRegisterType>,
104 pub address: Option<u16>,
105 pub data_type: String,
106 pub access: String,
107 pub read_only: bool,
108 pub invalid: bool,
109 pub action_bindings: Vec<String>,
110 pub behavior_bindings: Vec<String>,
111 pub source_datastore: Option<String>,
112 pub tags: Tags,
113}
114
115#[derive(Debug, Clone, Default)]
117pub struct PointTarget {
118 pub device_id: Option<String>,
119 pub point_id: Option<String>,
120 pub unit_id: Option<u8>,
121 pub register_type: Option<ModbusRegisterType>,
122 pub address: Option<u16>,
123}
124
/// A [`PointTarget`] reduced to the concrete device and point ids.
#[derive(Debug, Clone)]
struct ResolvedPointTarget {
    /// Id of the device to address.
    device_id: String,
    /// Id of the point on that device.
    point_id: String,
}
130
131#[derive(Debug, Clone, Serialize)]
133pub struct SessionStatus {
134 pub session_name: String,
135 pub active_fault_preset: Option<String>,
136 pub active_response_profile: Option<String>,
137 pub active_behavior_set: Option<String>,
138 pub trace_enabled: bool,
139 pub trace_entries: usize,
140 pub services: usize,
141 pub devices: usize,
142}
143
144#[derive(Debug, Clone, Serialize)]
146pub struct SessionSnapshot {
147 pub status: SessionStatus,
148 pub services: Vec<mabi_runtime::ServiceSnapshot>,
149}
150
151#[derive(Debug, Clone, Serialize)]
153#[serde(rename_all = "snake_case")]
154pub enum TraceOperation {
155 Read,
156 Write,
157}
158
159#[derive(Debug, Clone, Serialize)]
161#[serde(rename_all = "snake_case")]
162pub enum TraceStatus {
163 Ok,
164 Error,
165}
166
167#[derive(Debug, Clone, Serialize)]
169pub struct TraceEntry {
170 pub device_id: String,
171 pub point_id: String,
172 pub operation: TraceOperation,
173 pub status: TraceStatus,
174 pub value: Option<serde_json::Value>,
175 pub error: Option<String>,
176 pub timestamp: chrono::DateTime<chrono::Utc>,
177}
178
179#[derive(Default)]
180struct TraceState {
181 entries: VecDeque<TraceEntry>,
182}
183
184pub struct TraceStore {
186 capacity: usize,
187 state: Mutex<TraceState>,
188 tx: broadcast::Sender<TraceEntry>,
189}
190
191impl TraceStore {
192 pub fn new(capacity: usize) -> Self {
193 let (tx, _) = broadcast::channel(capacity.max(16));
194 Self {
195 capacity: capacity.max(1),
196 state: Mutex::new(TraceState::default()),
197 tx,
198 }
199 }
200
201 pub fn record(&self, entry: TraceEntry) {
202 let mut state = self.state.lock();
203 if state.entries.len() == self.capacity {
204 state.entries.pop_front();
205 }
206 state.entries.push_back(entry.clone());
207 let _ = self.tx.send(entry);
208 }
209
210 pub fn tail(&self, limit: usize) -> Vec<TraceEntry> {
211 let state = self.state.lock();
212 let take = limit.max(1).min(state.entries.len());
213 state
214 .entries
215 .iter()
216 .rev()
217 .take(take)
218 .cloned()
219 .collect::<Vec<_>>()
220 .into_iter()
221 .rev()
222 .collect()
223 }
224
225 pub fn clear(&self) {
226 self.state.lock().entries.clear();
227 }
228
229 pub fn len(&self) -> usize {
230 self.state.lock().entries.len()
231 }
232
233 pub fn subscribe(&self) -> broadcast::Receiver<TraceEntry> {
234 self.tx.subscribe()
235 }
236}
237
238pub struct TraceLayer {
240 store: Arc<TraceStore>,
241}
242
243impl TraceLayer {
244 pub fn new(store: Arc<TraceStore>) -> Self {
245 Self { store }
246 }
247}
248
249impl DevicePortLayer for TraceLayer {
250 fn decorate(
251 &self,
252 _protocol: Option<mabi_core::Protocol>,
253 port: DynDevicePort,
254 ) -> DynDevicePort {
255 Arc::new(TracedDevicePort {
256 inner: port,
257 store: Arc::clone(&self.store),
258 })
259 }
260}
261
262struct TracedDevicePort {
263 inner: DynDevicePort,
264 store: Arc<TraceStore>,
265}
266
267#[async_trait]
268impl DevicePort for TracedDevicePort {
269 fn info(&self) -> DeviceInfo {
270 self.inner.info()
271 }
272
273 async fn start(&self) -> mabi_core::Result<()> {
274 self.inner.start().await
275 }
276
277 async fn stop(&self) -> mabi_core::Result<()> {
278 self.inner.stop().await
279 }
280
281 async fn read(&self, point_id: &str) -> mabi_core::Result<DataPoint> {
282 let result = self.inner.read(point_id).await;
283 self.store.record(match &result {
284 Ok(point) => TraceEntry {
285 device_id: point.id.device_id.clone(),
286 point_id: point.id.point_id.clone(),
287 operation: TraceOperation::Read,
288 status: TraceStatus::Ok,
289 value: serde_json::to_value(&point.value).ok(),
290 error: None,
291 timestamp: chrono::Utc::now(),
292 },
293 Err(error) => TraceEntry {
294 device_id: self.inner.id(),
295 point_id: point_id.to_string(),
296 operation: TraceOperation::Read,
297 status: TraceStatus::Error,
298 value: None,
299 error: Some(error.to_string()),
300 timestamp: chrono::Utc::now(),
301 },
302 });
303 result
304 }
305
306 async fn write(&self, point_id: &str, value: Value) -> mabi_core::Result<()> {
307 let json_value = serde_json::to_value(&value).ok();
308 let result = self.inner.write(point_id, value).await;
309 self.store.record(match &result {
310 Ok(()) => TraceEntry {
311 device_id: self.inner.id(),
312 point_id: point_id.to_string(),
313 operation: TraceOperation::Write,
314 status: TraceStatus::Ok,
315 value: json_value,
316 error: None,
317 timestamp: chrono::Utc::now(),
318 },
319 Err(error) => TraceEntry {
320 device_id: self.inner.id(),
321 point_id: point_id.to_string(),
322 operation: TraceOperation::Write,
323 status: TraceStatus::Error,
324 value: json_value,
325 error: Some(error.to_string()),
326 timestamp: chrono::Utc::now(),
327 },
328 });
329 result
330 }
331
332 fn point_definitions(&self) -> Vec<DataPointDef> {
333 self.inner.point_definitions()
334 }
335}
336
337pub struct ModbusControlSession {
339 registry: ProtocolDriverRegistry,
340 compiled: CompiledModbusSession,
341 fallback_readiness_timeout: Duration,
342 trace_store: Arc<TraceStore>,
343 runtime_session: RuntimeSession,
344}
345
346impl ModbusControlSession {
347 pub async fn new(
348 registry: ProtocolDriverRegistry,
349 compiled: CompiledModbusSession,
350 fallback_readiness_timeout: Duration,
351 ) -> ModbusResult<Self> {
352 let trace_store = Arc::new(TraceStore::new(compiled.trace.buffer_capacity()));
353 let runtime_session = Self::start_runtime(
354 ®istry,
355 &compiled,
356 Arc::clone(&trace_store),
357 fallback_readiness_timeout,
358 )
359 .await?;
360
361 Ok(Self {
362 registry,
363 compiled,
364 fallback_readiness_timeout,
365 trace_store,
366 runtime_session,
367 })
368 }
369
370 async fn start_runtime(
371 registry: &ProtocolDriverRegistry,
372 compiled: &CompiledModbusSession,
373 trace_store: Arc<TraceStore>,
374 fallback_readiness_timeout: Duration,
375 ) -> ModbusResult<RuntimeSession> {
376 let mut extensions = compiled.runtime_extensions();
377 if compiled.trace.enabled {
378 extensions.add_device_layer(Arc::new(TraceLayer::new(trace_store)));
379 }
380
381 let session = RuntimeSession::new(
382 RuntimeSessionSpec {
383 services: vec![compiled.launch.clone()],
384 readiness_timeout: compiled.readiness_timeout_ms,
385 },
386 registry,
387 extensions,
388 )
389 .await
390 .map_err(|error| ModbusError::Server(error.to_string()))?;
391 session
392 .start(fallback_readiness_timeout)
393 .await
394 .map_err(|error| ModbusError::Server(error.to_string()))?;
395 Ok(session)
396 }
397
398 async fn rebuild(
399 &mut self,
400 compiled: CompiledModbusSession,
401 clear_trace: bool,
402 ) -> ModbusResult<()> {
403 self.runtime_session
404 .stop()
405 .await
406 .map_err(|error| ModbusError::Server(error.to_string()))?;
407 if clear_trace {
408 self.trace_store.clear();
409 }
410
411 let runtime_session = Self::start_runtime(
412 &self.registry,
413 &compiled,
414 Arc::clone(&self.trace_store),
415 self.fallback_readiness_timeout,
416 )
417 .await?;
418 self.compiled = compiled;
419 self.runtime_session = runtime_session;
420 Ok(())
421 }
422
423 pub async fn stop(&self) -> ModbusResult<()> {
424 self.runtime_session
425 .stop()
426 .await
427 .map_err(|error| ModbusError::Server(error.to_string()))
428 }
429
430 fn resolve_target(&self, target: &PointTarget) -> ModbusResult<ResolvedPointTarget> {
431 if let (Some(device_id), Some(point_id)) = (&target.device_id, &target.point_id) {
432 return Ok(ResolvedPointTarget {
433 device_id: device_id.clone(),
434 point_id: point_id.clone(),
435 });
436 }
437
438 let descriptors = self.list_points(&PointCatalogQuery {
439 device_id: target.device_id.clone(),
440 ..Default::default()
441 })?;
442
443 let mut matches = descriptors
444 .into_iter()
445 .filter(|descriptor| {
446 let point_match = target
447 .point_id
448 .as_ref()
449 .map(|point_id| descriptor.point_id == *point_id)
450 .unwrap_or(true);
451 let unit_match = target
452 .unit_id
453 .map(|unit_id| descriptor.unit_id == Some(unit_id))
454 .unwrap_or(true);
455 let register_type_match = target
456 .register_type
457 .map(|register_type| descriptor.register_type == Some(register_type))
458 .unwrap_or(true);
459 let address_match = target
460 .address
461 .map(|address| descriptor.address == Some(address))
462 .unwrap_or(true);
463 point_match && unit_match && register_type_match && address_match
464 })
465 .map(|descriptor| ResolvedPointTarget {
466 device_id: descriptor.device_id,
467 point_id: descriptor.point_id,
468 })
469 .collect::<Vec<_>>();
470
471 if matches.is_empty() {
472 return Err(ModbusError::Config(
473 "no point matched the supplied selector".into(),
474 ));
475 }
476 if matches.len() > 1 {
477 return Err(ModbusError::Config(
478 "point selector matched more than one point; add --device or --unit".into(),
479 ));
480 }
481 Ok(matches.remove(0))
482 }
483
484 fn matches_query(info: &DeviceInfo, query: &PointCatalogQuery) -> bool {
485 if let Some(device_id) = &query.device_id {
486 if &info.id != device_id {
487 return false;
488 }
489 }
490
491 if !query
492 .tag_filters
493 .iter()
494 .all(|(key, value)| info.tags.get(key) == Some(value.as_str()))
495 {
496 return false;
497 }
498
499 if !query
500 .labels
501 .iter()
502 .all(|label| info.tags.has_label(label.as_str()))
503 {
504 return false;
505 }
506
507 true
508 }
509}
510
511#[async_trait]
512impl SessionControlPort for ModbusControlSession {
513 async fn status(&self) -> ModbusResult<SessionStatus> {
514 Ok(SessionStatus {
515 session_name: self.compiled.session_name.clone(),
516 active_fault_preset: self.compiled.active_fault_preset.clone(),
517 active_response_profile: self.compiled.active_response_profile.clone(),
518 active_behavior_set: self.compiled.active_behavior_set.clone(),
519 trace_enabled: self.compiled.trace.enabled,
520 trace_entries: self.trace_store.len(),
521 services: self.runtime_session.handles().len(),
522 devices: self.runtime_session.devices().len(),
523 })
524 }
525
526 async fn snapshot(&self) -> ModbusResult<SessionSnapshot> {
527 let status = self.status().await?;
528 let services = self
529 .runtime_session
530 .snapshots()
531 .await
532 .map_err(|error| ModbusError::Server(error.to_string()))?;
533 Ok(SessionSnapshot { status, services })
534 }
535
536 async fn reset(&mut self) -> ModbusResult<SessionSnapshot> {
537 let mut compiled = self.compiled.clone();
538 if self.compiled.reset.clear_fault_preset {
539 compiled = compiled.with_active_fault_preset(None)?;
540 }
541 if self.compiled.reset.clear_response_profile {
542 compiled = compiled.with_active_response_profile(None)?;
543 }
544 if self.compiled.reset.clear_behavior_set {
545 compiled = compiled.with_active_behavior_set(None)?;
546 }
547 self.rebuild(compiled, self.compiled.reset.clear_trace_buffer)
548 .await?;
549 self.snapshot().await
550 }
551}
552
553impl PointCatalogPort for ModbusControlSession {
554 fn list_points(&self, query: &PointCatalogQuery) -> ModbusResult<Vec<PointDescriptor>> {
555 let mut points = Vec::new();
556
557 for (_device_id, port) in self.runtime_session.devices().entries() {
558 let info = port.info();
559 if !Self::matches_query(&info, query) {
560 continue;
561 }
562
563 let unit_id = info
564 .metadata
565 .get("unit_id")
566 .and_then(|value| value.parse::<u8>().ok());
567 for point in port.point_definitions() {
568 let (register_type, address) = match point.address.clone() {
569 Some(Address::Modbus(address)) => {
570 (Some(address.register_type), Some(address.address))
571 }
572 _ => (None, None),
573 };
574 points.push(PointDescriptor {
575 device_id: info.id.clone(),
576 device_name: info.name.clone(),
577 unit_id,
578 point_id: point.id.clone(),
579 point_name: point.name.clone(),
580 register_type,
581 address,
582 data_type: format!("{:?}", point.data_type),
583 access: access_mode_name(point.access),
584 read_only: self
585 .compiled
586 .point_metadata(&info.id, &point.id)
587 .map(|metadata| metadata.read_only)
588 .unwrap_or(matches!(point.access, AccessMode::ReadOnly)),
589 invalid: self
590 .compiled
591 .point_metadata(&info.id, &point.id)
592 .map(|metadata| metadata.invalid)
593 .unwrap_or(false),
594 action_bindings: self
595 .compiled
596 .point_metadata(&info.id, &point.id)
597 .map(|metadata| metadata.action_bindings.clone())
598 .unwrap_or_default(),
599 behavior_bindings: self
600 .compiled
601 .point_metadata(&info.id, &point.id)
602 .map(|metadata| metadata.behavior_bindings.clone())
603 .unwrap_or_default(),
604 source_datastore: self
605 .compiled
606 .point_metadata(&info.id, &point.id)
607 .and_then(|metadata| metadata.source_datastore.clone()),
608 tags: info.tags.clone(),
609 });
610 }
611 }
612
613 points.sort_by(|left, right| {
614 left.device_id
615 .cmp(&right.device_id)
616 .then(left.point_id.cmp(&right.point_id))
617 });
618 Ok(points)
619 }
620}
621
622#[async_trait]
623impl RegisterControlPort for ModbusControlSession {
624 async fn read(&self, target: &PointTarget) -> ModbusResult<DataPoint> {
625 let resolved = self.resolve_target(target)?;
626 let port = self
627 .runtime_session
628 .devices()
629 .get(&resolved.device_id)
630 .ok_or_else(|| {
631 ModbusError::Config(format!("unknown device '{}'", resolved.device_id))
632 })?;
633 port.read(&resolved.point_id)
634 .await
635 .map_err(ModbusError::from)
636 }
637
638 async fn write(&self, target: &PointTarget, value: Value) -> ModbusResult<()> {
639 let resolved = self.resolve_target(target)?;
640 let port = self
641 .runtime_session
642 .devices()
643 .get(&resolved.device_id)
644 .ok_or_else(|| {
645 ModbusError::Config(format!("unknown device '{}'", resolved.device_id))
646 })?;
647 port.write(&resolved.point_id, value)
648 .await
649 .map_err(ModbusError::from)
650 }
651}
652
653impl TracePort for ModbusControlSession {
654 fn tail(&self, limit: usize) -> Vec<TraceEntry> {
655 self.trace_store.tail(limit)
656 }
657
658 fn clear(&self) {
659 self.trace_store.clear();
660 }
661
662 fn subscribe(&self) -> broadcast::Receiver<TraceEntry> {
663 self.trace_store.subscribe()
664 }
665}
666
667#[async_trait]
668impl FaultPresetPort for ModbusControlSession {
669 fn available_fault_presets(&self) -> Vec<String> {
670 self.compiled.fault_presets.keys().cloned().collect()
671 }
672
673 fn active_fault_preset(&self) -> Option<String> {
674 self.compiled.active_fault_preset.clone()
675 }
676
677 async fn apply_fault_preset(&mut self, name: &str) -> ModbusResult<SessionSnapshot> {
678 let compiled = self.compiled.with_active_fault_preset(Some(name))?;
679 self.rebuild(compiled, false).await?;
680 self.snapshot().await
681 }
682
683 async fn clear_fault_preset(&mut self) -> ModbusResult<SessionSnapshot> {
684 let compiled = self.compiled.with_active_fault_preset(None)?;
685 self.rebuild(compiled, false).await?;
686 self.snapshot().await
687 }
688}
689
690#[async_trait]
691impl ResponseProfilePort for ModbusControlSession {
692 fn available_response_profiles(&self) -> Vec<String> {
693 self.compiled.response_profiles.keys().cloned().collect()
694 }
695
696 fn active_response_profile(&self) -> Option<String> {
697 self.compiled.active_response_profile.clone()
698 }
699
700 async fn apply_response_profile(&mut self, name: &str) -> ModbusResult<SessionSnapshot> {
701 let compiled = self.compiled.with_active_response_profile(Some(name))?;
702 self.rebuild(compiled, false).await?;
703 self.snapshot().await
704 }
705
706 async fn clear_response_profile(&mut self) -> ModbusResult<SessionSnapshot> {
707 let compiled = self.compiled.with_active_response_profile(None)?;
708 self.rebuild(compiled, false).await?;
709 self.snapshot().await
710 }
711}
712
713impl SessionMetadataPort for ModbusControlSession {
714 fn action_binding_summaries(&self) -> &[ActionBindingSummary] {
715 &self.compiled.action_binding_summaries
716 }
717
718 fn behavior_binding_summaries(&self) -> &[BehaviorBindingSummary] {
719 &self.compiled.behavior_binding_summaries
720 }
721
722 fn datastore_policy_summaries(&self) -> &[DatastorePolicySummary] {
723 &self.compiled.datastore_policies
724 }
725}
726
727#[async_trait]
728impl BehaviorSetPort for ModbusControlSession {
729 fn available_behavior_sets(&self) -> Vec<String> {
730 self.compiled.behavior_sets.keys().cloned().collect()
731 }
732
733 fn active_behavior_set(&self) -> Option<String> {
734 self.compiled.active_behavior_set.clone()
735 }
736
737 async fn apply_behavior_set(&mut self, name: &str) -> ModbusResult<SessionSnapshot> {
738 let compiled = self.compiled.with_active_behavior_set(Some(name))?;
739 self.rebuild(compiled, false).await?;
740 self.snapshot().await
741 }
742
743 async fn clear_behavior_set(&mut self) -> ModbusResult<SessionSnapshot> {
744 let compiled = self.compiled.with_active_behavior_set(None)?;
745 self.rebuild(compiled, false).await?;
746 self.snapshot().await
747 }
748}
749
750fn access_mode_name(mode: AccessMode) -> String {
751 match mode {
752 AccessMode::ReadOnly => "read_only",
753 AccessMode::WriteOnly => "write_only",
754 AccessMode::ReadWrite => "read_write",
755 }
756 .to_string()
757}
758
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::time::Duration;

    use super::{
        BehaviorSetPort, FaultPresetPort, ModbusControlSession, PointCatalogPort, PointTarget,
        RegisterControlPort, ResponseProfilePort, SessionControlPort, TracePort,
    };
    use crate::fault_injection::FaultInjectionConfig;
    use crate::profile::{PointProfile, SimulatorProfile, UnitProfile};
    use crate::rtu::RtuServerConfig;
    use crate::simulator::{
        CompiledModbusSession, CompiledPointMetadata, CompiledTransportKind,
        ModbusServiceLaunchConfig, ModbusTransportLaunch, ResponseProfileDefinition,
        SessionControlConfig, SessionResetPolicy, SessionTraceConfig,
    };
    use mabi_core::types::{DataType, ModbusRegisterType};
    use mabi_core::value::Value;
    use mabi_runtime::ProtocolDriverRegistry;

    /// Registry containing only this crate's driver.
    fn registry() -> ProtocolDriverRegistry {
        let mut registry = ProtocolDriverRegistry::new();
        registry.register(crate::driver());
        registry
    }

    /// Profile with one unit (`1`, "Pump") exposing a single holding register.
    fn pump_profile() -> SimulatorProfile {
        let temperature = PointProfile::new(
            "temperature",
            "Temperature",
            ModbusRegisterType::HoldingRegister,
            0,
            DataType::UInt16,
        );
        SimulatorProfile::new().with_unit(UnitProfile::new(1, "Pump").with_point(temperature))
    }

    /// Launch spec that runs `profile` on the RTU testing transport.
    fn launch_spec(profile: &SimulatorProfile) -> mabi_runtime::ProtocolLaunchSpec {
        let config = ModbusServiceLaunchConfig {
            transport: ModbusTransportLaunch::Rtu {
                config: RtuServerConfig::for_testing(),
            },
            profile: Some(profile.clone()),
            devices: None,
            points_per_device: None,
        };
        mabi_runtime::ProtocolLaunchSpec {
            protocol: "modbus".into(),
            name: Some("demo".into()),
            config: serde_json::to_value(config).unwrap(),
        }
    }

    /// Compiled session fixture with one preset, profile and behavior set defined and none active.
    fn compiled_session() -> CompiledModbusSession {
        let profile = pump_profile();
        let launch = launch_spec(&profile);

        let temperature_metadata = CompiledPointMetadata {
            device_id: "modbus-1".into(),
            point_id: "temperature".into(),
            source_datastore: Some("inline".into()),
            read_only: false,
            invalid: false,
            action_bindings: vec!["clamp_temp@on_write".into()],
            behavior_bindings: vec!["temperature_guard@maintenance".into()],
        };
        let slow_profile = ResponseProfileDefinition {
            delay_ms: Some(10),
            ..Default::default()
        };
        let maintenance_set = crate::simulator::BehaviorSetDefinition {
            behaviors: vec!["temperature_guard".into()],
        };

        CompiledModbusSession {
            session_name: "demo".into(),
            launch,
            transport_kind: CompiledTransportKind::Rtu,
            profile,
            trace: SessionTraceConfig {
                enabled: true,
                capacity: Some(32),
            },
            reset: SessionResetPolicy::default(),
            control: SessionControlConfig::default(),
            fault_presets: BTreeMap::from([("drop".to_string(), FaultInjectionConfig::default())]),
            active_fault_preset: None,
            response_profiles: BTreeMap::from([("slow".to_string(), slow_profile)]),
            active_response_profile: None,
            actions: BTreeMap::new(),
            behaviors: BTreeMap::new(),
            behavior_sets: BTreeMap::from([("maintenance".to_string(), maintenance_set)]),
            active_behavior_set: None,
            point_catalog: BTreeMap::from([(
                "modbus-1/temperature".to_string(),
                temperature_metadata,
            )]),
            datastore_policies: Vec::new(),
            action_binding_summaries: Vec::new(),
            behavior_binding_summaries: Vec::new(),
            compiled_behavior_bindings: Vec::new(),
            readiness_timeout_ms: Some(500),
        }
    }

    /// Starts a control session for `compiled` with a one second fallback timeout.
    async fn start_session(compiled: CompiledModbusSession) -> ModbusControlSession {
        ModbusControlSession::new(registry(), compiled, Duration::from_secs(1))
            .await
            .unwrap()
    }

    /// Selector that names the fixture's point by id only.
    fn temperature_target() -> PointTarget {
        PointTarget {
            point_id: Some("temperature".into()),
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn control_session_lists_points_and_reads_back_writes() {
        let session = start_session(compiled_session()).await;

        let points = session.list_points(&Default::default()).unwrap();
        assert_eq!(points.len(), 1);
        assert_eq!(points[0].action_bindings, vec!["clamp_temp@on_write"]);
        assert_eq!(
            points[0].behavior_bindings,
            vec!["temperature_guard@maintenance"]
        );
        assert_eq!(points[0].source_datastore.as_deref(), Some("inline"));

        session
            .write(&temperature_target(), Value::U16(42))
            .await
            .unwrap();

        let point = session.read(&temperature_target()).await.unwrap();
        assert_eq!(point.value, Value::U16(42));
        // One entry for the write, one for the read.
        assert_eq!(session.tail(10).len(), 2);

        session.stop().await.unwrap();
    }

    #[tokio::test]
    async fn control_session_can_apply_and_clear_fault_presets() {
        let mut session = start_session(compiled_session()).await;

        let applied = session.apply_fault_preset("drop").await.unwrap();
        assert_eq!(applied.status.active_fault_preset.as_deref(), Some("drop"));

        let cleared = session.clear_fault_preset().await.unwrap();
        assert!(cleared.status.active_fault_preset.is_none());

        session.stop().await.unwrap();
    }

    #[tokio::test]
    async fn session_reset_clears_traces_and_fault_preset() {
        let mut compiled = compiled_session();
        compiled.active_fault_preset = Some("drop".into());
        compiled.active_response_profile = Some("slow".into());
        compiled.active_behavior_set = Some("maintenance".into());

        let mut session = start_session(compiled).await;
        session
            .write(&temperature_target(), Value::U16(7))
            .await
            .unwrap();
        assert_eq!(session.tail(10).len(), 1);

        let snapshot = session.reset().await.unwrap();
        assert!(snapshot.status.active_fault_preset.is_none());
        assert!(snapshot.status.active_response_profile.is_none());
        assert!(snapshot.status.active_behavior_set.is_none());
        assert_eq!(snapshot.status.trace_entries, 0);

        session.stop().await.unwrap();
    }

    #[tokio::test]
    async fn control_session_can_apply_and_clear_response_profiles() {
        let mut session = start_session(compiled_session()).await;

        let applied = session.apply_response_profile("slow").await.unwrap();
        assert_eq!(
            applied.status.active_response_profile.as_deref(),
            Some("slow")
        );

        let cleared = session.clear_response_profile().await.unwrap();
        assert!(cleared.status.active_response_profile.is_none());

        session.stop().await.unwrap();
    }

    #[tokio::test]
    async fn control_session_can_apply_and_clear_behavior_sets() {
        let mut session = start_session(compiled_session()).await;

        let applied = session.apply_behavior_set("maintenance").await.unwrap();
        assert_eq!(
            applied.status.active_behavior_set.as_deref(),
            Some("maintenance")
        );

        let cleared = session.clear_behavior_set().await.unwrap();
        assert!(cleared.status.active_behavior_set.is_none());

        session.stop().await.unwrap();
    }
}