Skip to main content

mabi_opcua/
control.rs

1//! In-process control-plane surface for compiled OPC UA simulator sessions.
2
3use std::fs;
4use std::path::PathBuf;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use serde::Serialize;
9
10use mabi_core::types::{Address, DataPoint};
11use mabi_core::value::Value;
12use mabi_runtime::{ProtocolDriverRegistry, RuntimeSession, RuntimeSessionSpec};
13
14use crate::error::{OpcUaError, OpcUaResult};
15use crate::modeling::CompiledOpcUaSession;
16use crate::sdk::subscription::{
17    DurableSubscriptionStatus, DurableSubscriptionStore, SubscriptionDurabilityMode,
18};
19use crate::security::{SecurityAuditStatus, SecurityManager, SecurityStatus};
20
21/// Lifecycle-oriented control surface for a compiled session.
22#[async_trait]
23pub trait SessionControlPort: Send {
24    async fn status(&self) -> OpcUaResult<SessionStatus>;
25    async fn snapshot(&self) -> OpcUaResult<SessionSnapshot>;
26    async fn reset(&mut self) -> OpcUaResult<SessionSnapshot>;
27}
28
29/// Security admin surface used by CLI commands.
30#[async_trait]
31pub trait SecurityControlPort: Send {
32    async fn security_status(&self) -> OpcUaResult<SecurityControlStatus>;
33    async fn trust_reload(&self) -> OpcUaResult<SecurityControlStatus>;
34    async fn rotate_server_certificate(
35        &self,
36        certificate_path: PathBuf,
37        private_key_path: PathBuf,
38    ) -> OpcUaResult<SecurityControlStatus>;
39    async fn audit_summary(&self) -> OpcUaResult<SecurityAuditStatus>;
40}
41
42/// Node catalog inspection surface used by CLI commands.
43pub trait NodeCatalogPort {
44    fn list_nodes(&self) -> OpcUaResult<Vec<NodeDescriptor>>;
45}
46
47/// Point and raw-node read/write surface.
48#[async_trait]
49pub trait NodeValueControlPort {
50    async fn read(&self, target: &NodeTarget) -> OpcUaResult<DataPoint>;
51    async fn write(&self, target: &NodeTarget, value: Value) -> OpcUaResult<()>;
52}
53
54/// Stable session status view.
55#[derive(Debug, Clone, Serialize)]
56pub struct SessionStatus {
57    pub session_name: String,
58    pub services: usize,
59    pub devices: usize,
60    pub nodes: usize,
61    pub namespaces: usize,
62    pub allow_raw_node_access: bool,
63    pub durability_mode: String,
64    pub restore_on_start: bool,
65    pub persisted_state_present: bool,
66    pub restored_subscriptions: usize,
67    pub detached_restored_subscriptions: usize,
68    pub last_durable_flush_at: Option<String>,
69    pub last_durable_flush_result: String,
70    pub diagnostics_summary: String,
71    pub security_profile: String,
72    pub audit_sink: String,
73    pub allow_trust_reload: bool,
74    pub allow_certificate_rotation: bool,
75    pub audit_status: SecurityAuditStatus,
76    pub generated_type_entries: usize,
77    pub generated_type_module: String,
78}
79
80/// Snapshot returned by reset and snapshot operations.
81#[derive(Debug, Clone, Serialize)]
82pub struct SessionSnapshot {
83    pub status: SessionStatus,
84    pub services: Vec<mabi_runtime::ServiceSnapshot>,
85}
86
87/// Stable security admin view.
88#[derive(Debug, Clone, Serialize)]
89pub struct SecurityControlStatus {
90    pub profile_name: String,
91    pub allow_trust_reload: bool,
92    pub allow_certificate_rotation: bool,
93    pub status: SecurityStatus,
94}
95
96/// Operator-facing node catalog record.
97#[derive(Debug, Clone, Serialize)]
98pub struct NodeDescriptor {
99    pub device_id: String,
100    pub point_id: String,
101    pub node_id: String,
102    pub browse_name: String,
103    pub display_name: String,
104    pub node_class: String,
105    pub writable: bool,
106    pub historizing: bool,
107    pub sampling_interval_ms: Option<u32>,
108}
109
110/// Read/write selector used by control commands.
111#[derive(Debug, Clone, Default)]
112pub struct NodeTarget {
113    pub device_id: Option<String>,
114    pub point_id: Option<String>,
115    pub node_id: Option<String>,
116}
117
118/// In-process control session over a compiled OPC UA simulator session.
119pub struct OpcUaControlSession {
120    registry: ProtocolDriverRegistry,
121    compiled: CompiledOpcUaSession,
122    fallback_readiness_timeout: Duration,
123    runtime_session: RuntimeSession,
124    security_manager: SecurityManager,
125}
126
127impl OpcUaControlSession {
128    pub async fn new(
129        registry: ProtocolDriverRegistry,
130        compiled: CompiledOpcUaSession,
131        fallback_readiness_timeout: Duration,
132    ) -> OpcUaResult<Self> {
133        let runtime_session =
134            Self::start_runtime(&registry, &compiled, fallback_readiness_timeout).await?;
135        let security_manager = Self::build_security_manager(&compiled)?;
136        Ok(Self {
137            registry,
138            compiled,
139            fallback_readiness_timeout,
140            runtime_session,
141            security_manager,
142        })
143    }
144
145    fn build_security_manager(compiled: &CompiledOpcUaSession) -> OpcUaResult<SecurityManager> {
146        let manager = SecurityManager::new(compiled.security.manager_config.clone());
147        manager
148            .initialize()
149            .map_err(|error| OpcUaError::Server(error.to_string()))?;
150        Ok(manager)
151    }
152
153    async fn start_runtime(
154        registry: &ProtocolDriverRegistry,
155        compiled: &CompiledOpcUaSession,
156        fallback_readiness_timeout: Duration,
157    ) -> OpcUaResult<RuntimeSession> {
158        let session = RuntimeSession::new(
159            RuntimeSessionSpec {
160                services: vec![compiled.launch.clone()],
161                readiness_timeout: compiled.readiness_timeout_ms,
162            },
163            registry,
164            compiled.runtime_extensions(),
165        )
166        .await
167        .map_err(|error| OpcUaError::Server(error.to_string()))?;
168        session
169            .start(fallback_readiness_timeout)
170            .await
171            .map_err(|error| OpcUaError::Server(error.to_string()))?;
172        Ok(session)
173    }
174
175    async fn rebuild(&mut self, compiled: CompiledOpcUaSession) -> OpcUaResult<()> {
176        self.runtime_session
177            .stop()
178            .await
179            .map_err(|error| OpcUaError::Server(error.to_string()))?;
180        self.runtime_session =
181            Self::start_runtime(&self.registry, &compiled, self.fallback_readiness_timeout).await?;
182        self.security_manager = Self::build_security_manager(&compiled)?;
183        self.compiled = compiled;
184        Ok(())
185    }
186
187    fn durable_store(&self) -> Option<DurableSubscriptionStore> {
188        DurableSubscriptionStore::new(self.compiled.runtime.durability.clone())
189    }
190
191    fn durable_state_file(&self) -> Option<PathBuf> {
192        if self.compiled.runtime.durability.mode != SubscriptionDurabilityMode::Persisted {
193            return None;
194        }
195        let state_dir = self
196            .compiled
197            .runtime
198            .durability
199            .state_dir
200            .clone()
201            .unwrap_or_else(|| std::env::temp_dir().join("mabi-opcua-subscriptions"));
202        Some(state_dir.join("subscriptions.json"))
203    }
204
205    fn durable_status(&self) -> DurableSubscriptionStatus {
206        self.durable_store()
207            .and_then(|store| store.load_status().ok())
208            .unwrap_or_else(|| DurableSubscriptionStatus {
209                persisted_state_present: self
210                    .durable_state_file()
211                    .is_some_and(|path| path.exists()),
212                restored_subscription_count: 0,
213                detached_subscription_count: 0,
214                last_flush_at: None,
215                last_flush_result: "never_flushed".to_string(),
216            })
217    }
218
219    fn diagnostics_summary(&self, durability: &DurableSubscriptionStatus) -> String {
220        format!(
221            "namespaces={} profile={} durability={:?} restored={} detached={}",
222            self.compiled.catalog.namespace_table.len(),
223            self.compiled.security.name,
224            self.compiled.runtime.durability.mode,
225            durability.restored_subscription_count,
226            durability.detached_subscription_count,
227        )
228    }
229
230    fn ensure_trust_reload_allowed(&self) -> OpcUaResult<()> {
231        if self.compiled.security.allow_trust_reload {
232            Ok(())
233        } else {
234            Err(OpcUaError::Config(format!(
235                "security profile '{}' does not allow trust reload operations",
236                self.compiled.security.name
237            )))
238        }
239    }
240
241    fn ensure_certificate_rotation_allowed(&self) -> OpcUaResult<()> {
242        if self.compiled.security.allow_certificate_rotation {
243            Ok(())
244        } else {
245            Err(OpcUaError::Config(format!(
246                "security profile '{}' does not allow certificate rotation operations",
247                self.compiled.security.name
248            )))
249        }
250    }
251
252    fn security_control_status(&self) -> SecurityControlStatus {
253        SecurityControlStatus {
254            profile_name: self.compiled.security.name.clone(),
255            allow_trust_reload: self.compiled.security.allow_trust_reload,
256            allow_certificate_rotation: self.compiled.security.allow_certificate_rotation,
257            status: self.security_manager.security_status(),
258        }
259    }
260
261    fn resolve_device_id(&self, target: &NodeTarget) -> OpcUaResult<String> {
262        if let Some(device_id) = &target.device_id {
263            return Ok(device_id.clone());
264        }
265        self.runtime_session
266            .devices()
267            .device_ids()
268            .into_iter()
269            .next()
270            .ok_or_else(|| OpcUaError::Server("session has no registered OPC UA devices".into()))
271    }
272
273    fn resolve_point_id(&self, target: &NodeTarget) -> OpcUaResult<String> {
274        if let Some(point_id) = &target.point_id {
275            return Ok(point_id.clone());
276        }
277        if let Some(node_id) = &target.node_id {
278            return Ok(node_id.clone());
279        }
280        Err(OpcUaError::Config(
281            "node selection requires either --point or --node-id".into(),
282        ))
283    }
284
285    pub async fn stop(&self) -> OpcUaResult<()> {
286        self.runtime_session
287            .stop()
288            .await
289            .map_err(|error| OpcUaError::Server(error.to_string()))
290    }
291}
292
293#[async_trait]
294impl SessionControlPort for OpcUaControlSession {
295    async fn status(&self) -> OpcUaResult<SessionStatus> {
296        let durability = self.durable_status();
297        let audit_status = self.security_manager.audit_status();
298        let diagnostics_summary = self.diagnostics_summary(&durability);
299        Ok(SessionStatus {
300            session_name: self.compiled.session_name.clone(),
301            services: self.runtime_session.handles().len(),
302            devices: self.runtime_session.devices().len(),
303            nodes: self.compiled.catalog.nodes.len(),
304            namespaces: self.compiled.catalog.namespace_table.len(),
305            allow_raw_node_access: self.compiled.control.allow_raw_node_access,
306            durability_mode: format!("{:?}", self.compiled.runtime.durability.mode),
307            restore_on_start: self.compiled.runtime.durability.restore_on_start,
308            persisted_state_present: durability.persisted_state_present,
309            restored_subscriptions: durability.restored_subscription_count,
310            detached_restored_subscriptions: durability.detached_subscription_count,
311            last_durable_flush_at: durability.last_flush_at.map(|value| value.to_rfc3339()),
312            last_durable_flush_result: durability.last_flush_result,
313            diagnostics_summary,
314            security_profile: self.compiled.security.name.clone(),
315            audit_sink: format!("{:?}", self.compiled.security.audit_sink.kind),
316            allow_trust_reload: self.compiled.security.allow_trust_reload,
317            allow_certificate_rotation: self.compiled.security.allow_certificate_rotation,
318            audit_status,
319            generated_type_entries: self.compiled.generated_types.entries.len(),
320            generated_type_module: self.compiled.generated_types.module_name.clone(),
321        })
322    }
323
324    async fn snapshot(&self) -> OpcUaResult<SessionSnapshot> {
325        Ok(SessionSnapshot {
326            status: self.status().await?,
327            services: self
328                .runtime_session
329                .snapshots()
330                .await
331                .map_err(|error| OpcUaError::Server(error.to_string()))?,
332        })
333    }
334
335    async fn reset(&mut self) -> OpcUaResult<SessionSnapshot> {
336        if self.compiled.control.clear_persisted_subscriptions_on_reset {
337            if let Some(state_file) = self.durable_state_file() {
338                match fs::remove_file(&state_file) {
339                    Ok(()) => {}
340                    Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
341                    Err(error) => {
342                        return Err(OpcUaError::Server(format!(
343                            "failed to clear persisted subscription state '{}': {}",
344                            state_file.display(),
345                            error
346                        )));
347                    }
348                }
349            }
350        }
351        let compiled = self.compiled.clone();
352        self.rebuild(compiled).await?;
353        self.snapshot().await
354    }
355}
356
357impl NodeCatalogPort for OpcUaControlSession {
358    fn list_nodes(&self) -> OpcUaResult<Vec<NodeDescriptor>> {
359        let mut nodes = Vec::new();
360        for (device_id, port) in self.runtime_session.devices().entries() {
361            for point in port.point_definitions() {
362                let node_id = match point.address.as_ref() {
363                    Some(Address::OpcUa { node_id }) => node_id.clone(),
364                    _ => point.id.clone(),
365                };
366                let compiled = self
367                    .compiled
368                    .devices
369                    .iter()
370                    .find(|device| device.device_id == device_id)
371                    .and_then(|device| {
372                        device
373                            .points
374                            .iter()
375                            .find(|binding| binding.point_id == point.id)
376                    });
377                nodes.push(NodeDescriptor {
378                    device_id: device_id.clone(),
379                    point_id: point.id.clone(),
380                    node_id,
381                    browse_name: compiled
382                        .map(|binding| binding.browse_name.clone())
383                        .unwrap_or_else(|| point.name.clone()),
384                    display_name: compiled
385                        .map(|binding| binding.display_name.clone())
386                        .unwrap_or_else(|| point.name.clone()),
387                    node_class: compiled
388                        .map(|binding| binding.node_class.clone())
389                        .unwrap_or_else(|| "variable".into()),
390                    writable: compiled.map(|binding| binding.writable).unwrap_or(false),
391                    historizing: compiled.map(|binding| binding.historizing).unwrap_or(false),
392                    sampling_interval_ms: compiled.and_then(|binding| binding.sampling_interval_ms),
393                });
394            }
395        }
396        nodes.sort_by(|left, right| {
397            left.device_id
398                .cmp(&right.device_id)
399                .then(left.point_id.cmp(&right.point_id))
400        });
401        Ok(nodes)
402    }
403}
404
405#[async_trait]
406impl SecurityControlPort for OpcUaControlSession {
407    async fn security_status(&self) -> OpcUaResult<SecurityControlStatus> {
408        Ok(self.security_control_status())
409    }
410
411    async fn trust_reload(&self) -> OpcUaResult<SecurityControlStatus> {
412        self.ensure_trust_reload_allowed()?;
413        self.security_manager
414            .reload_trust_store()
415            .map_err(|error| OpcUaError::Server(error.to_string()))?;
416        Ok(self.security_control_status())
417    }
418
419    async fn rotate_server_certificate(
420        &self,
421        certificate_path: PathBuf,
422        private_key_path: PathBuf,
423    ) -> OpcUaResult<SecurityControlStatus> {
424        self.ensure_certificate_rotation_allowed()?;
425        self.security_manager
426            .rotate_server_certificate(&certificate_path, &private_key_path)
427            .map_err(|error| OpcUaError::Server(error.to_string()))?;
428        Ok(self.security_control_status())
429    }
430
431    async fn audit_summary(&self) -> OpcUaResult<SecurityAuditStatus> {
432        Ok(self.security_manager.audit_status())
433    }
434}
435
436#[async_trait]
437impl NodeValueControlPort for OpcUaControlSession {
438    async fn read(&self, target: &NodeTarget) -> OpcUaResult<DataPoint> {
439        let device_id = self.resolve_device_id(target)?;
440        let point_id = self.resolve_point_id(target)?;
441        let port = self
442            .runtime_session
443            .devices()
444            .get(&device_id)
445            .ok_or_else(|| OpcUaError::NodeNotFound { node_id: device_id })?;
446        port.read(&point_id).await.map_err(OpcUaError::from)
447    }
448
449    async fn write(&self, target: &NodeTarget, value: Value) -> OpcUaResult<()> {
450        let device_id = self.resolve_device_id(target)?;
451        let point_id = self.resolve_point_id(target)?;
452        let port = self
453            .runtime_session
454            .devices()
455            .get(&device_id)
456            .ok_or_else(|| OpcUaError::NodeNotFound { node_id: device_id })?;
457        port.write(&point_id, value).await.map_err(OpcUaError::from)
458    }
459}
460
461#[cfg(test)]
462mod tests {
463    use std::collections::BTreeMap;
464
465    use super::*;
466    use crate::modeling::{
467        PresetDefinition, SessionDefinition, SimulatorDefaults, TransportDefinition,
468    };
469
470    fn compiled_session() -> CompiledOpcUaSession {
471        let config = crate::modeling::OpcUaSimulatorConfig {
472            defaults: SimulatorDefaults::default(),
473            transports: BTreeMap::from([(
474                "main".into(),
475                TransportDefinition {
476                    port: 0,
477                    ..TransportDefinition::default()
478                },
479            )]),
480            presets: BTreeMap::from([("generated".into(), PresetDefinition::default())]),
481            sessions: BTreeMap::from([(
482                "demo".into(),
483                SessionDefinition {
484                    transport: "main".into(),
485                    preset: Some("generated".into()),
486                    service_name: Some("opcua-control".into()),
487                    ..Default::default()
488                },
489            )]),
490            ..Default::default()
491        };
492        config.compile_session("demo", None).unwrap()
493    }
494
495    fn registry() -> ProtocolDriverRegistry {
496        let mut registry = ProtocolDriverRegistry::new();
497        registry.register(crate::runtime::driver());
498        registry
499    }
500
501    #[tokio::test]
502    async fn control_session_lists_nodes() {
503        let session =
504            OpcUaControlSession::new(registry(), compiled_session(), Duration::from_secs(1))
505                .await
506                .unwrap();
507        let nodes = session.list_nodes().unwrap();
508        assert!(!nodes.is_empty());
509    }
510}