Skip to main content

mabi_modbus/
control.rs

1//! In-process control-plane ports for DX-oriented simulator workflows.
2
3use std::collections::VecDeque;
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use parking_lot::Mutex;
9use serde::Serialize;
10use tokio::sync::broadcast;
11
12use mabi_core::device::DeviceInfo;
13use mabi_core::tags::Tags;
14use mabi_core::types::{AccessMode, Address, DataPoint, DataPointDef, ModbusRegisterType};
15use mabi_core::value::Value;
16use mabi_runtime::{
17    DevicePort, DevicePortLayer, DynDevicePort, ProtocolDriverRegistry, RuntimeSession,
18    RuntimeSessionSpec,
19};
20
21use crate::error::{ModbusError, ModbusResult};
22use crate::simulator::{
23    ActionBindingSummary, BehaviorBindingSummary, CompiledModbusSession, DatastorePolicySummary,
24};
25
/// Lifecycle-oriented control surface for a simulator session.
///
/// Implementors must be `Send`. The async methods are provided through
/// `#[async_trait]`.
#[async_trait]
pub trait SessionControlPort: Send {
    /// Returns a summary of the session's current state.
    async fn status(&self) -> ModbusResult<SessionStatus>;
    /// Returns the session status together with per-service snapshots.
    async fn snapshot(&self) -> ModbusResult<SessionSnapshot>;
    /// Resets the session and returns the resulting snapshot.
    async fn reset(&mut self) -> ModbusResult<SessionSnapshot>;
}
33
/// Point catalog surface used by CLI inspection and filtering.
pub trait PointCatalogPort {
    /// Lists the descriptors of all points selected by `query`.
    fn list_points(&self, query: &PointCatalogQuery) -> ModbusResult<Vec<PointDescriptor>>;
}
38
/// Point-oriented read/write surface used by CLI control commands.
#[async_trait]
pub trait RegisterControlPort {
    /// Reads the point selected by `target`.
    async fn read(&self, target: &PointTarget) -> ModbusResult<DataPoint>;
    /// Writes `value` to the point selected by `target`.
    async fn write(&self, target: &PointTarget, value: Value) -> ModbusResult<()>;
}
45
/// Trace inspection surface for recent control-plane activity.
pub trait TracePort {
    /// Returns up to `limit` of the most recent trace entries.
    fn tail(&self, limit: usize) -> Vec<TraceEntry>;
    /// Discards the buffered trace entries.
    fn clear(&self);
    /// Returns a receiver for trace entries published after this call.
    fn subscribe(&self) -> broadcast::Receiver<TraceEntry>;
}
52
/// Named fault preset control surface.
#[async_trait]
pub trait FaultPresetPort: Send {
    /// Names of the fault presets that can be applied.
    fn available_fault_presets(&self) -> Vec<String>;
    /// Name of the currently active fault preset, if any.
    fn active_fault_preset(&self) -> Option<String>;
    /// Activates the preset called `name` and returns the resulting snapshot.
    async fn apply_fault_preset(&mut self, name: &str) -> ModbusResult<SessionSnapshot>;
    /// Deactivates any active preset and returns the resulting snapshot.
    async fn clear_fault_preset(&mut self) -> ModbusResult<SessionSnapshot>;
}
61
/// Named response profile control surface.
#[async_trait]
pub trait ResponseProfilePort: Send {
    /// Names of the response profiles that can be applied.
    fn available_response_profiles(&self) -> Vec<String>;
    /// Name of the currently active response profile, if any.
    fn active_response_profile(&self) -> Option<String>;
    /// Activates the profile called `name` and returns the resulting snapshot.
    async fn apply_response_profile(&mut self, name: &str) -> ModbusResult<SessionSnapshot>;
    /// Deactivates any active profile and returns the resulting snapshot.
    async fn clear_response_profile(&mut self) -> ModbusResult<SessionSnapshot>;
}
70
/// Named behavior-set control surface.
#[async_trait]
pub trait BehaviorSetPort: Send {
    /// Names of the behavior sets that can be applied.
    fn available_behavior_sets(&self) -> Vec<String>;
    /// Name of the currently active behavior set, if any.
    fn active_behavior_set(&self) -> Option<String>;
    /// Activates the behavior set called `name` and returns the resulting snapshot.
    async fn apply_behavior_set(&mut self, name: &str) -> ModbusResult<SessionSnapshot>;
    /// Deactivates any active behavior set and returns the resulting snapshot.
    async fn clear_behavior_set(&mut self) -> ModbusResult<SessionSnapshot>;
}
79
/// Static simulator metadata exposed through the control plane.
///
/// All accessors return borrowed slices; nothing is copied.
pub trait SessionMetadataPort {
    /// Summaries of the configured action bindings.
    fn action_binding_summaries(&self) -> &[ActionBindingSummary];
    /// Summaries of the configured behavior bindings.
    fn behavior_binding_summaries(&self) -> &[BehaviorBindingSummary];
    /// Summaries of the configured datastore policies.
    fn datastore_policy_summaries(&self) -> &[DatastorePolicySummary];
}
86
/// Query parameters for point catalog operations.
///
/// The derived `Default` is a query with no device restriction and empty
/// filter lists.
#[derive(Debug, Clone, Default)]
pub struct PointCatalogQuery {
    /// When set, only the device with exactly this id is considered.
    pub device_id: Option<String>,
    /// `(key, value)` tag pairs; a device must carry every pair to match.
    pub tag_filters: Vec<(String, String)>,
    /// Labels; a device must carry every listed label to match.
    pub labels: Vec<String>,
}
94
/// Stable point catalog record for operator-facing control flows.
///
/// Field notes below describe how `ModbusControlSession::list_points`
/// populates the record.
#[derive(Debug, Clone, Serialize)]
pub struct PointDescriptor {
    /// Id of the device that owns the point.
    pub device_id: String,
    /// Display name of the owning device.
    pub device_name: String,
    /// Parsed from the device's `unit_id` metadata entry; `None` when the
    /// entry is absent or is not a valid `u8`.
    pub unit_id: Option<u8>,
    /// Id of the point within its device.
    pub point_id: String,
    /// Display name of the point.
    pub point_name: String,
    /// Register type when the point has a Modbus address, otherwise `None`.
    pub register_type: Option<ModbusRegisterType>,
    /// Register address when the point has a Modbus address, otherwise `None`.
    pub address: Option<u16>,
    /// `Debug` rendering of the point's data type.
    pub data_type: String,
    /// One of `read_only`, `write_only` or `read_write`.
    pub access: String,
    /// Taken from the compiled point metadata when present; otherwise derived
    /// from the point's access mode.
    pub read_only: bool,
    /// Taken from the compiled point metadata; `false` when there is none.
    pub invalid: bool,
    /// Action binding labels from the compiled metadata (empty when none).
    pub action_bindings: Vec<String>,
    /// Behavior binding labels from the compiled metadata (empty when none).
    pub behavior_bindings: Vec<String>,
    /// Name of the datastore the point came from, when recorded.
    pub source_datastore: Option<String>,
    /// Tags of the owning device.
    pub tags: Tags,
}
114
/// Point selection used by read/write commands.
///
/// As resolved by `ModbusControlSession`: when both `device_id` and
/// `point_id` are set they are used directly; otherwise every field that is
/// set acts as a filter and the combination must match exactly one point.
#[derive(Debug, Clone, Default)]
pub struct PointTarget {
    /// Restricts the selection to one device.
    pub device_id: Option<String>,
    /// Restricts the selection to points with this id.
    pub point_id: Option<String>,
    /// Restricts the selection to devices with this unit id.
    pub unit_id: Option<u8>,
    /// Restricts the selection to points of this register type.
    pub register_type: Option<ModbusRegisterType>,
    /// Restricts the selection to points at this register address.
    pub address: Option<u16>,
}
124
/// A `PointTarget` narrowed down to one concrete device/point pair.
#[derive(Debug, Clone)]
struct ResolvedPointTarget {
    /// Id of the device to address.
    device_id: String,
    /// Id of the point on that device.
    point_id: String,
}
130
/// Human-oriented session status for CLI output.
#[derive(Debug, Clone, Serialize)]
pub struct SessionStatus {
    /// Name of the compiled session.
    pub session_name: String,
    /// Currently active fault preset, if any.
    pub active_fault_preset: Option<String>,
    /// Currently active response profile, if any.
    pub active_response_profile: Option<String>,
    /// Currently active behavior set, if any.
    pub active_behavior_set: Option<String>,
    /// Whether tracing is enabled in the session configuration.
    pub trace_enabled: bool,
    /// Number of entries currently held in the trace buffer.
    pub trace_entries: usize,
    /// Number of service handles in the runtime session.
    pub services: usize,
    /// Number of devices registered in the runtime session.
    pub devices: usize,
}
143
/// Snapshot surface returned by reset/fault operations.
#[derive(Debug, Clone, Serialize)]
pub struct SessionSnapshot {
    /// Session-level status at the time of the snapshot.
    pub status: SessionStatus,
    /// One snapshot per runtime service.
    pub services: Vec<mabi_runtime::ServiceSnapshot>,
}
150
/// Trace operation kind.
///
/// Serialized in `snake_case` (`read`, `write`).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TraceOperation {
    /// A point read.
    Read,
    /// A point write.
    Write,
}
158
/// Trace entry status.
///
/// Serialized in `snake_case` (`ok`, `error`).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TraceStatus {
    /// The traced operation succeeded.
    Ok,
    /// The traced operation returned an error.
    Error,
}
166
/// In-memory control-plane trace record.
#[derive(Debug, Clone, Serialize)]
pub struct TraceEntry {
    /// Id of the device the operation addressed.
    pub device_id: String,
    /// Id of the point the operation addressed.
    pub point_id: String,
    /// Whether this was a read or a write.
    pub operation: TraceOperation,
    /// Whether the operation succeeded.
    pub status: TraceStatus,
    /// JSON rendering of the value read or written; `None` when unavailable
    /// (for example on a failed read, or when the value cannot be serialized).
    pub value: Option<serde_json::Value>,
    /// Error text for failed operations.
    pub error: Option<String>,
    /// UTC time at which the entry was created.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
178
/// Mutable part of a `TraceStore`, kept behind its mutex.
#[derive(Default)]
struct TraceState {
    /// Buffered entries, oldest at the front.
    entries: VecDeque<TraceEntry>,
}
183
/// Bounded trace store with SSE-like subscription for CLI consumers.
pub struct TraceStore {
    /// Maximum number of entries kept in the buffer (at least one).
    capacity: usize,
    /// Buffered entries, guarded by a mutex.
    state: Mutex<TraceState>,
    /// Broadcast sender used to publish every recorded entry to subscribers.
    tx: broadcast::Sender<TraceEntry>,
}
190
191impl TraceStore {
192    pub fn new(capacity: usize) -> Self {
193        let (tx, _) = broadcast::channel(capacity.max(16));
194        Self {
195            capacity: capacity.max(1),
196            state: Mutex::new(TraceState::default()),
197            tx,
198        }
199    }
200
201    pub fn record(&self, entry: TraceEntry) {
202        let mut state = self.state.lock();
203        if state.entries.len() == self.capacity {
204            state.entries.pop_front();
205        }
206        state.entries.push_back(entry.clone());
207        let _ = self.tx.send(entry);
208    }
209
210    pub fn tail(&self, limit: usize) -> Vec<TraceEntry> {
211        let state = self.state.lock();
212        let take = limit.max(1).min(state.entries.len());
213        state
214            .entries
215            .iter()
216            .rev()
217            .take(take)
218            .cloned()
219            .collect::<Vec<_>>()
220            .into_iter()
221            .rev()
222            .collect()
223    }
224
225    pub fn clear(&self) {
226        self.state.lock().entries.clear();
227    }
228
229    pub fn len(&self) -> usize {
230        self.state.lock().entries.len()
231    }
232
233    pub fn subscribe(&self) -> broadcast::Receiver<TraceEntry> {
234        self.tx.subscribe()
235    }
236}
237
/// Device-layer decorator that records point reads and writes.
pub struct TraceLayer {
    /// Store that receives one entry per traced operation.
    store: Arc<TraceStore>,
}
242
impl TraceLayer {
    /// Creates a layer that records into `store`.
    pub fn new(store: Arc<TraceStore>) -> Self {
        Self { store }
    }
}
248
impl DevicePortLayer for TraceLayer {
    /// Wraps `port` in a `TracedDevicePort` that shares this layer's store.
    ///
    /// The protocol is ignored: every port is traced the same way.
    fn decorate(
        &self,
        _protocol: Option<mabi_core::Protocol>,
        port: DynDevicePort,
    ) -> DynDevicePort {
        Arc::new(TracedDevicePort {
            inner: port,
            store: Arc::clone(&self.store),
        })
    }
}
261
/// Device port wrapper that records reads and writes into a `TraceStore`.
struct TracedDevicePort {
    /// The wrapped port that performs the actual work.
    inner: DynDevicePort,
    /// Destination for the trace entries.
    store: Arc<TraceStore>,
}
266
267#[async_trait]
268impl DevicePort for TracedDevicePort {
269    fn info(&self) -> DeviceInfo {
270        self.inner.info()
271    }
272
273    async fn start(&self) -> mabi_core::Result<()> {
274        self.inner.start().await
275    }
276
277    async fn stop(&self) -> mabi_core::Result<()> {
278        self.inner.stop().await
279    }
280
281    async fn read(&self, point_id: &str) -> mabi_core::Result<DataPoint> {
282        let result = self.inner.read(point_id).await;
283        self.store.record(match &result {
284            Ok(point) => TraceEntry {
285                device_id: point.id.device_id.clone(),
286                point_id: point.id.point_id.clone(),
287                operation: TraceOperation::Read,
288                status: TraceStatus::Ok,
289                value: serde_json::to_value(&point.value).ok(),
290                error: None,
291                timestamp: chrono::Utc::now(),
292            },
293            Err(error) => TraceEntry {
294                device_id: self.inner.id(),
295                point_id: point_id.to_string(),
296                operation: TraceOperation::Read,
297                status: TraceStatus::Error,
298                value: None,
299                error: Some(error.to_string()),
300                timestamp: chrono::Utc::now(),
301            },
302        });
303        result
304    }
305
306    async fn write(&self, point_id: &str, value: Value) -> mabi_core::Result<()> {
307        let json_value = serde_json::to_value(&value).ok();
308        let result = self.inner.write(point_id, value).await;
309        self.store.record(match &result {
310            Ok(()) => TraceEntry {
311                device_id: self.inner.id(),
312                point_id: point_id.to_string(),
313                operation: TraceOperation::Write,
314                status: TraceStatus::Ok,
315                value: json_value,
316                error: None,
317                timestamp: chrono::Utc::now(),
318            },
319            Err(error) => TraceEntry {
320                device_id: self.inner.id(),
321                point_id: point_id.to_string(),
322                operation: TraceOperation::Write,
323                status: TraceStatus::Error,
324                value: json_value,
325                error: Some(error.to_string()),
326                timestamp: chrono::Utc::now(),
327            },
328        });
329        result
330    }
331
332    fn point_definitions(&self) -> Vec<DataPointDef> {
333        self.inner.point_definitions()
334    }
335}
336
/// In-process control session over a compiled simulator session.
pub struct ModbusControlSession {
    /// Driver registry, kept so the runtime can be started again on rebuild.
    registry: ProtocolDriverRegistry,
    /// The compiled configuration the current runtime was started from.
    compiled: CompiledModbusSession,
    /// Timeout handed to `RuntimeSession::start` on every (re)start.
    fallback_readiness_timeout: Duration,
    /// Trace buffer shared with the trace layer; it outlives runtime rebuilds.
    trace_store: Arc<TraceStore>,
    /// The runtime session currently backing this control session.
    runtime_session: RuntimeSession,
}
345
346impl ModbusControlSession {
347    pub async fn new(
348        registry: ProtocolDriverRegistry,
349        compiled: CompiledModbusSession,
350        fallback_readiness_timeout: Duration,
351    ) -> ModbusResult<Self> {
352        let trace_store = Arc::new(TraceStore::new(compiled.trace.buffer_capacity()));
353        let runtime_session = Self::start_runtime(
354            &registry,
355            &compiled,
356            Arc::clone(&trace_store),
357            fallback_readiness_timeout,
358        )
359        .await?;
360
361        Ok(Self {
362            registry,
363            compiled,
364            fallback_readiness_timeout,
365            trace_store,
366            runtime_session,
367        })
368    }
369
370    async fn start_runtime(
371        registry: &ProtocolDriverRegistry,
372        compiled: &CompiledModbusSession,
373        trace_store: Arc<TraceStore>,
374        fallback_readiness_timeout: Duration,
375    ) -> ModbusResult<RuntimeSession> {
376        let mut extensions = compiled.runtime_extensions();
377        if compiled.trace.enabled {
378            extensions.add_device_layer(Arc::new(TraceLayer::new(trace_store)));
379        }
380
381        let session = RuntimeSession::new(
382            RuntimeSessionSpec {
383                services: vec![compiled.launch.clone()],
384                readiness_timeout: compiled.readiness_timeout_ms,
385            },
386            registry,
387            extensions,
388        )
389        .await
390        .map_err(|error| ModbusError::Server(error.to_string()))?;
391        session
392            .start(fallback_readiness_timeout)
393            .await
394            .map_err(|error| ModbusError::Server(error.to_string()))?;
395        Ok(session)
396    }
397
398    async fn rebuild(
399        &mut self,
400        compiled: CompiledModbusSession,
401        clear_trace: bool,
402    ) -> ModbusResult<()> {
403        self.runtime_session
404            .stop()
405            .await
406            .map_err(|error| ModbusError::Server(error.to_string()))?;
407        if clear_trace {
408            self.trace_store.clear();
409        }
410
411        let runtime_session = Self::start_runtime(
412            &self.registry,
413            &compiled,
414            Arc::clone(&self.trace_store),
415            self.fallback_readiness_timeout,
416        )
417        .await?;
418        self.compiled = compiled;
419        self.runtime_session = runtime_session;
420        Ok(())
421    }
422
423    pub async fn stop(&self) -> ModbusResult<()> {
424        self.runtime_session
425            .stop()
426            .await
427            .map_err(|error| ModbusError::Server(error.to_string()))
428    }
429
430    fn resolve_target(&self, target: &PointTarget) -> ModbusResult<ResolvedPointTarget> {
431        if let (Some(device_id), Some(point_id)) = (&target.device_id, &target.point_id) {
432            return Ok(ResolvedPointTarget {
433                device_id: device_id.clone(),
434                point_id: point_id.clone(),
435            });
436        }
437
438        let descriptors = self.list_points(&PointCatalogQuery {
439            device_id: target.device_id.clone(),
440            ..Default::default()
441        })?;
442
443        let mut matches = descriptors
444            .into_iter()
445            .filter(|descriptor| {
446                let point_match = target
447                    .point_id
448                    .as_ref()
449                    .map(|point_id| descriptor.point_id == *point_id)
450                    .unwrap_or(true);
451                let unit_match = target
452                    .unit_id
453                    .map(|unit_id| descriptor.unit_id == Some(unit_id))
454                    .unwrap_or(true);
455                let register_type_match = target
456                    .register_type
457                    .map(|register_type| descriptor.register_type == Some(register_type))
458                    .unwrap_or(true);
459                let address_match = target
460                    .address
461                    .map(|address| descriptor.address == Some(address))
462                    .unwrap_or(true);
463                point_match && unit_match && register_type_match && address_match
464            })
465            .map(|descriptor| ResolvedPointTarget {
466                device_id: descriptor.device_id,
467                point_id: descriptor.point_id,
468            })
469            .collect::<Vec<_>>();
470
471        if matches.is_empty() {
472            return Err(ModbusError::Config(
473                "no point matched the supplied selector".into(),
474            ));
475        }
476        if matches.len() > 1 {
477            return Err(ModbusError::Config(
478                "point selector matched more than one point; add --device or --unit".into(),
479            ));
480        }
481        Ok(matches.remove(0))
482    }
483
484    fn matches_query(info: &DeviceInfo, query: &PointCatalogQuery) -> bool {
485        if let Some(device_id) = &query.device_id {
486            if &info.id != device_id {
487                return false;
488            }
489        }
490
491        if !query
492            .tag_filters
493            .iter()
494            .all(|(key, value)| info.tags.get(key) == Some(value.as_str()))
495        {
496            return false;
497        }
498
499        if !query
500            .labels
501            .iter()
502            .all(|label| info.tags.has_label(label.as_str()))
503        {
504            return false;
505        }
506
507        true
508    }
509}
510
511#[async_trait]
512impl SessionControlPort for ModbusControlSession {
513    async fn status(&self) -> ModbusResult<SessionStatus> {
514        Ok(SessionStatus {
515            session_name: self.compiled.session_name.clone(),
516            active_fault_preset: self.compiled.active_fault_preset.clone(),
517            active_response_profile: self.compiled.active_response_profile.clone(),
518            active_behavior_set: self.compiled.active_behavior_set.clone(),
519            trace_enabled: self.compiled.trace.enabled,
520            trace_entries: self.trace_store.len(),
521            services: self.runtime_session.handles().len(),
522            devices: self.runtime_session.devices().len(),
523        })
524    }
525
526    async fn snapshot(&self) -> ModbusResult<SessionSnapshot> {
527        let status = self.status().await?;
528        let services = self
529            .runtime_session
530            .snapshots()
531            .await
532            .map_err(|error| ModbusError::Server(error.to_string()))?;
533        Ok(SessionSnapshot { status, services })
534    }
535
536    async fn reset(&mut self) -> ModbusResult<SessionSnapshot> {
537        let mut compiled = self.compiled.clone();
538        if self.compiled.reset.clear_fault_preset {
539            compiled = compiled.with_active_fault_preset(None)?;
540        }
541        if self.compiled.reset.clear_response_profile {
542            compiled = compiled.with_active_response_profile(None)?;
543        }
544        if self.compiled.reset.clear_behavior_set {
545            compiled = compiled.with_active_behavior_set(None)?;
546        }
547        self.rebuild(compiled, self.compiled.reset.clear_trace_buffer)
548            .await?;
549        self.snapshot().await
550    }
551}
552
553impl PointCatalogPort for ModbusControlSession {
554    fn list_points(&self, query: &PointCatalogQuery) -> ModbusResult<Vec<PointDescriptor>> {
555        let mut points = Vec::new();
556
557        for (_device_id, port) in self.runtime_session.devices().entries() {
558            let info = port.info();
559            if !Self::matches_query(&info, query) {
560                continue;
561            }
562
563            let unit_id = info
564                .metadata
565                .get("unit_id")
566                .and_then(|value| value.parse::<u8>().ok());
567            for point in port.point_definitions() {
568                let (register_type, address) = match point.address.clone() {
569                    Some(Address::Modbus(address)) => {
570                        (Some(address.register_type), Some(address.address))
571                    }
572                    _ => (None, None),
573                };
574                points.push(PointDescriptor {
575                    device_id: info.id.clone(),
576                    device_name: info.name.clone(),
577                    unit_id,
578                    point_id: point.id.clone(),
579                    point_name: point.name.clone(),
580                    register_type,
581                    address,
582                    data_type: format!("{:?}", point.data_type),
583                    access: access_mode_name(point.access),
584                    read_only: self
585                        .compiled
586                        .point_metadata(&info.id, &point.id)
587                        .map(|metadata| metadata.read_only)
588                        .unwrap_or(matches!(point.access, AccessMode::ReadOnly)),
589                    invalid: self
590                        .compiled
591                        .point_metadata(&info.id, &point.id)
592                        .map(|metadata| metadata.invalid)
593                        .unwrap_or(false),
594                    action_bindings: self
595                        .compiled
596                        .point_metadata(&info.id, &point.id)
597                        .map(|metadata| metadata.action_bindings.clone())
598                        .unwrap_or_default(),
599                    behavior_bindings: self
600                        .compiled
601                        .point_metadata(&info.id, &point.id)
602                        .map(|metadata| metadata.behavior_bindings.clone())
603                        .unwrap_or_default(),
604                    source_datastore: self
605                        .compiled
606                        .point_metadata(&info.id, &point.id)
607                        .and_then(|metadata| metadata.source_datastore.clone()),
608                    tags: info.tags.clone(),
609                });
610            }
611        }
612
613        points.sort_by(|left, right| {
614            left.device_id
615                .cmp(&right.device_id)
616                .then(left.point_id.cmp(&right.point_id))
617        });
618        Ok(points)
619    }
620}
621
622#[async_trait]
623impl RegisterControlPort for ModbusControlSession {
624    async fn read(&self, target: &PointTarget) -> ModbusResult<DataPoint> {
625        let resolved = self.resolve_target(target)?;
626        let port = self
627            .runtime_session
628            .devices()
629            .get(&resolved.device_id)
630            .ok_or_else(|| {
631                ModbusError::Config(format!("unknown device '{}'", resolved.device_id))
632            })?;
633        port.read(&resolved.point_id)
634            .await
635            .map_err(ModbusError::from)
636    }
637
638    async fn write(&self, target: &PointTarget, value: Value) -> ModbusResult<()> {
639        let resolved = self.resolve_target(target)?;
640        let port = self
641            .runtime_session
642            .devices()
643            .get(&resolved.device_id)
644            .ok_or_else(|| {
645                ModbusError::Config(format!("unknown device '{}'", resolved.device_id))
646            })?;
647        port.write(&resolved.point_id, value)
648            .await
649            .map_err(ModbusError::from)
650    }
651}
652
impl TracePort for ModbusControlSession {
    /// Delegates to the session's trace store.
    fn tail(&self, limit: usize) -> Vec<TraceEntry> {
        self.trace_store.tail(limit)
    }

    /// Empties the session's trace store.
    fn clear(&self) {
        self.trace_store.clear();
    }

    /// Subscribes to entries recorded by the session's trace store.
    fn subscribe(&self) -> broadcast::Receiver<TraceEntry> {
        self.trace_store.subscribe()
    }
}
666
#[async_trait]
impl FaultPresetPort for ModbusControlSession {
    /// Names of the fault presets defined in the compiled session.
    fn available_fault_presets(&self) -> Vec<String> {
        self.compiled.fault_presets.keys().cloned().collect()
    }

    /// The fault preset the current runtime was started with, if any.
    fn active_fault_preset(&self) -> Option<String> {
        self.compiled.active_fault_preset.clone()
    }

    /// Derives a configuration with `name` as the active fault preset and
    /// rebuilds the runtime from it. The trace buffer is kept.
    async fn apply_fault_preset(&mut self, name: &str) -> ModbusResult<SessionSnapshot> {
        let compiled = self.compiled.with_active_fault_preset(Some(name))?;
        self.rebuild(compiled, false).await?;
        self.snapshot().await
    }

    /// Derives a configuration without an active fault preset and rebuilds
    /// the runtime from it. The trace buffer is kept.
    async fn clear_fault_preset(&mut self) -> ModbusResult<SessionSnapshot> {
        let compiled = self.compiled.with_active_fault_preset(None)?;
        self.rebuild(compiled, false).await?;
        self.snapshot().await
    }
}
689
#[async_trait]
impl ResponseProfilePort for ModbusControlSession {
    /// Names of the response profiles defined in the compiled session.
    fn available_response_profiles(&self) -> Vec<String> {
        self.compiled.response_profiles.keys().cloned().collect()
    }

    /// The response profile the current runtime was started with, if any.
    fn active_response_profile(&self) -> Option<String> {
        self.compiled.active_response_profile.clone()
    }

    /// Derives a configuration with `name` as the active response profile and
    /// rebuilds the runtime from it. The trace buffer is kept.
    async fn apply_response_profile(&mut self, name: &str) -> ModbusResult<SessionSnapshot> {
        let compiled = self.compiled.with_active_response_profile(Some(name))?;
        self.rebuild(compiled, false).await?;
        self.snapshot().await
    }

    /// Derives a configuration without an active response profile and
    /// rebuilds the runtime from it. The trace buffer is kept.
    async fn clear_response_profile(&mut self) -> ModbusResult<SessionSnapshot> {
        let compiled = self.compiled.with_active_response_profile(None)?;
        self.rebuild(compiled, false).await?;
        self.snapshot().await
    }
}
712
impl SessionMetadataPort for ModbusControlSession {
    /// Borrows the action binding summaries of the compiled session.
    fn action_binding_summaries(&self) -> &[ActionBindingSummary] {
        &self.compiled.action_binding_summaries
    }

    /// Borrows the behavior binding summaries of the compiled session.
    fn behavior_binding_summaries(&self) -> &[BehaviorBindingSummary] {
        &self.compiled.behavior_binding_summaries
    }

    /// Borrows the datastore policies of the compiled session.
    fn datastore_policy_summaries(&self) -> &[DatastorePolicySummary] {
        &self.compiled.datastore_policies
    }
}
726
#[async_trait]
impl BehaviorSetPort for ModbusControlSession {
    /// Names of the behavior sets defined in the compiled session.
    fn available_behavior_sets(&self) -> Vec<String> {
        self.compiled.behavior_sets.keys().cloned().collect()
    }

    /// The behavior set the current runtime was started with, if any.
    fn active_behavior_set(&self) -> Option<String> {
        self.compiled.active_behavior_set.clone()
    }

    /// Derives a configuration with `name` as the active behavior set and
    /// rebuilds the runtime from it. The trace buffer is kept.
    async fn apply_behavior_set(&mut self, name: &str) -> ModbusResult<SessionSnapshot> {
        let compiled = self.compiled.with_active_behavior_set(Some(name))?;
        self.rebuild(compiled, false).await?;
        self.snapshot().await
    }

    /// Derives a configuration without an active behavior set and rebuilds
    /// the runtime from it. The trace buffer is kept.
    async fn clear_behavior_set(&mut self) -> ModbusResult<SessionSnapshot> {
        let compiled = self.compiled.with_active_behavior_set(None)?;
        self.rebuild(compiled, false).await?;
        self.snapshot().await
    }
}
749
750fn access_mode_name(mode: AccessMode) -> String {
751    match mode {
752        AccessMode::ReadOnly => "read_only",
753        AccessMode::WriteOnly => "write_only",
754        AccessMode::ReadWrite => "read_write",
755    }
756    .to_string()
757}
758
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::time::Duration;

    use super::{
        BehaviorSetPort, FaultPresetPort, ModbusControlSession, PointCatalogPort,
        PointCatalogQuery, PointTarget, RegisterControlPort, ResponseProfilePort,
        SessionControlPort, TracePort,
    };
    use crate::fault_injection::FaultInjectionConfig;
    use crate::profile::{PointProfile, SimulatorProfile, UnitProfile};
    use crate::rtu::RtuServerConfig;
    use crate::simulator::{
        CompiledModbusSession, CompiledPointMetadata, CompiledTransportKind,
        ModbusServiceLaunchConfig, ModbusTransportLaunch, ResponseProfileDefinition,
        SessionControlConfig, SessionResetPolicy, SessionTraceConfig,
    };
    use mabi_core::types::{DataType, ModbusRegisterType};
    use mabi_core::value::Value;
    use mabi_runtime::ProtocolDriverRegistry;

    /// Registry containing only this crate's driver.
    fn registry() -> ProtocolDriverRegistry {
        let mut drivers = ProtocolDriverRegistry::new();
        drivers.register(crate::driver());
        drivers
    }

    /// Compiled session with one unit and one holding-register point named
    /// `temperature`, tracing enabled, and one fault preset (`drop`), one
    /// response profile (`slow`) and one behavior set (`maintenance`)
    /// defined but not active.
    fn compiled_session() -> CompiledModbusSession {
        let temperature = PointProfile::new(
            "temperature",
            "Temperature",
            ModbusRegisterType::HoldingRegister,
            0,
            DataType::UInt16,
        );
        let profile =
            SimulatorProfile::new().with_unit(UnitProfile::new(1, "Pump").with_point(temperature));

        let launch_config = serde_json::to_value(ModbusServiceLaunchConfig {
            transport: ModbusTransportLaunch::Rtu {
                config: RtuServerConfig::for_testing(),
            },
            profile: Some(profile.clone()),
            devices: None,
            points_per_device: None,
        })
        .unwrap();

        CompiledModbusSession {
            session_name: "demo".into(),
            launch: mabi_runtime::ProtocolLaunchSpec {
                protocol: "modbus".into(),
                name: Some("demo".into()),
                config: launch_config,
            },
            transport_kind: CompiledTransportKind::Rtu,
            profile,
            trace: SessionTraceConfig {
                enabled: true,
                capacity: Some(32),
            },
            reset: SessionResetPolicy::default(),
            control: SessionControlConfig::default(),
            fault_presets: BTreeMap::from([("drop".to_string(), FaultInjectionConfig::default())]),
            active_fault_preset: None,
            response_profiles: BTreeMap::from([(
                "slow".to_string(),
                ResponseProfileDefinition {
                    delay_ms: Some(10),
                    ..Default::default()
                },
            )]),
            active_response_profile: None,
            actions: BTreeMap::new(),
            behaviors: BTreeMap::new(),
            behavior_sets: BTreeMap::from([(
                "maintenance".to_string(),
                crate::simulator::BehaviorSetDefinition {
                    behaviors: vec!["temperature_guard".into()],
                },
            )]),
            active_behavior_set: None,
            point_catalog: BTreeMap::from([(
                "modbus-1/temperature".to_string(),
                CompiledPointMetadata {
                    device_id: "modbus-1".into(),
                    point_id: "temperature".into(),
                    source_datastore: Some("inline".into()),
                    read_only: false,
                    invalid: false,
                    action_bindings: vec!["clamp_temp@on_write".into()],
                    behavior_bindings: vec!["temperature_guard@maintenance".into()],
                },
            )]),
            datastore_policies: Vec::new(),
            action_binding_summaries: Vec::new(),
            behavior_binding_summaries: Vec::new(),
            compiled_behavior_bindings: Vec::new(),
            readiness_timeout_ms: Some(500),
        }
    }

    /// Starts a control session over `compiled` with a one-second fallback
    /// readiness timeout.
    async fn start_session(compiled: CompiledModbusSession) -> ModbusControlSession {
        ModbusControlSession::new(registry(), compiled, Duration::from_secs(1))
            .await
            .unwrap()
    }

    /// Selector that names the `temperature` point by point id only.
    fn temperature_target() -> PointTarget {
        PointTarget {
            point_id: Some("temperature".into()),
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn control_session_lists_points_and_reads_back_writes() {
        let session = start_session(compiled_session()).await;

        let points = session.list_points(&PointCatalogQuery::default()).unwrap();
        assert_eq!(points.len(), 1);
        let descriptor = &points[0];
        assert_eq!(descriptor.action_bindings, vec!["clamp_temp@on_write"]);
        assert_eq!(
            descriptor.behavior_bindings,
            vec!["temperature_guard@maintenance"]
        );
        assert_eq!(descriptor.source_datastore.as_deref(), Some("inline"));

        let target = temperature_target();
        session.write(&target, Value::U16(42)).await.unwrap();

        let point = session.read(&target).await.unwrap();
        assert_eq!(point.value, Value::U16(42));
        // One trace entry for the write, one for the read.
        assert_eq!(session.tail(10).len(), 2);

        session.stop().await.unwrap();
    }

    #[tokio::test]
    async fn control_session_can_apply_and_clear_fault_presets() {
        let mut session = start_session(compiled_session()).await;

        let applied = session.apply_fault_preset("drop").await.unwrap();
        assert_eq!(applied.status.active_fault_preset.as_deref(), Some("drop"));

        let cleared = session.clear_fault_preset().await.unwrap();
        assert!(cleared.status.active_fault_preset.is_none());

        session.stop().await.unwrap();
    }

    #[tokio::test]
    async fn session_reset_clears_traces_and_fault_preset() {
        let mut compiled = compiled_session();
        compiled.active_fault_preset = Some("drop".into());
        compiled.active_response_profile = Some("slow".into());
        compiled.active_behavior_set = Some("maintenance".into());

        let mut session = start_session(compiled).await;
        session
            .write(&temperature_target(), Value::U16(7))
            .await
            .unwrap();
        assert_eq!(session.tail(10).len(), 1);

        let snapshot = session.reset().await.unwrap();
        let status = &snapshot.status;
        assert!(status.active_fault_preset.is_none());
        assert!(status.active_response_profile.is_none());
        assert!(status.active_behavior_set.is_none());
        assert_eq!(status.trace_entries, 0);

        session.stop().await.unwrap();
    }

    #[tokio::test]
    async fn control_session_can_apply_and_clear_response_profiles() {
        let mut session = start_session(compiled_session()).await;

        let applied = session.apply_response_profile("slow").await.unwrap();
        assert_eq!(
            applied.status.active_response_profile.as_deref(),
            Some("slow")
        );

        let cleared = session.clear_response_profile().await.unwrap();
        assert!(cleared.status.active_response_profile.is_none());

        session.stop().await.unwrap();
    }

    #[tokio::test]
    async fn control_session_can_apply_and_clear_behavior_sets() {
        let mut session = start_session(compiled_session()).await;

        let applied = session.apply_behavior_set("maintenance").await.unwrap();
        assert_eq!(
            applied.status.active_behavior_set.as_deref(),
            Some("maintenance")
        );

        let cleared = session.clear_behavior_set().await.unwrap();
        assert!(cleared.status.active_behavior_set.is_none());

        session.stop().await.unwrap();
    }
}
983}