1use std::fs;
4use std::path::PathBuf;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use serde::Serialize;
9
10use mabi_core::types::{Address, DataPoint};
11use mabi_core::value::Value;
12use mabi_runtime::{ProtocolDriverRegistry, RuntimeSession, RuntimeSessionSpec};
13
14use crate::error::{OpcUaError, OpcUaResult};
15use crate::modeling::CompiledOpcUaSession;
16use crate::sdk::subscription::{
17 DurableSubscriptionStatus, DurableSubscriptionStore, SubscriptionDurabilityMode,
18};
19use crate::security::{SecurityAuditStatus, SecurityManager, SecurityStatus};
20
21#[async_trait]
23pub trait SessionControlPort: Send {
24 async fn status(&self) -> OpcUaResult<SessionStatus>;
25 async fn snapshot(&self) -> OpcUaResult<SessionSnapshot>;
26 async fn reset(&mut self) -> OpcUaResult<SessionSnapshot>;
27}
28
29#[async_trait]
31pub trait SecurityControlPort: Send {
32 async fn security_status(&self) -> OpcUaResult<SecurityControlStatus>;
33 async fn trust_reload(&self) -> OpcUaResult<SecurityControlStatus>;
34 async fn rotate_server_certificate(
35 &self,
36 certificate_path: PathBuf,
37 private_key_path: PathBuf,
38 ) -> OpcUaResult<SecurityControlStatus>;
39 async fn audit_summary(&self) -> OpcUaResult<SecurityAuditStatus>;
40}
41
42pub trait NodeCatalogPort {
44 fn list_nodes(&self) -> OpcUaResult<Vec<NodeDescriptor>>;
45}
46
47#[async_trait]
49pub trait NodeValueControlPort {
50 async fn read(&self, target: &NodeTarget) -> OpcUaResult<DataPoint>;
51 async fn write(&self, target: &NodeTarget, value: Value) -> OpcUaResult<()>;
52}
53
54#[derive(Debug, Clone, Serialize)]
56pub struct SessionStatus {
57 pub session_name: String,
58 pub services: usize,
59 pub devices: usize,
60 pub nodes: usize,
61 pub namespaces: usize,
62 pub allow_raw_node_access: bool,
63 pub durability_mode: String,
64 pub restore_on_start: bool,
65 pub persisted_state_present: bool,
66 pub restored_subscriptions: usize,
67 pub detached_restored_subscriptions: usize,
68 pub last_durable_flush_at: Option<String>,
69 pub last_durable_flush_result: String,
70 pub diagnostics_summary: String,
71 pub security_profile: String,
72 pub audit_sink: String,
73 pub allow_trust_reload: bool,
74 pub allow_certificate_rotation: bool,
75 pub audit_status: SecurityAuditStatus,
76 pub generated_type_entries: usize,
77 pub generated_type_module: String,
78}
79
80#[derive(Debug, Clone, Serialize)]
82pub struct SessionSnapshot {
83 pub status: SessionStatus,
84 pub services: Vec<mabi_runtime::ServiceSnapshot>,
85}
86
87#[derive(Debug, Clone, Serialize)]
89pub struct SecurityControlStatus {
90 pub profile_name: String,
91 pub allow_trust_reload: bool,
92 pub allow_certificate_rotation: bool,
93 pub status: SecurityStatus,
94}
95
96#[derive(Debug, Clone, Serialize)]
98pub struct NodeDescriptor {
99 pub device_id: String,
100 pub point_id: String,
101 pub node_id: String,
102 pub browse_name: String,
103 pub display_name: String,
104 pub node_class: String,
105 pub writable: bool,
106 pub historizing: bool,
107 pub sampling_interval_ms: Option<u32>,
108}
109
110#[derive(Debug, Clone, Default)]
112pub struct NodeTarget {
113 pub device_id: Option<String>,
114 pub point_id: Option<String>,
115 pub node_id: Option<String>,
116}
117
118pub struct OpcUaControlSession {
120 registry: ProtocolDriverRegistry,
121 compiled: CompiledOpcUaSession,
122 fallback_readiness_timeout: Duration,
123 runtime_session: RuntimeSession,
124 security_manager: SecurityManager,
125}
126
127impl OpcUaControlSession {
128 pub async fn new(
129 registry: ProtocolDriverRegistry,
130 compiled: CompiledOpcUaSession,
131 fallback_readiness_timeout: Duration,
132 ) -> OpcUaResult<Self> {
133 let runtime_session =
134 Self::start_runtime(®istry, &compiled, fallback_readiness_timeout).await?;
135 let security_manager = Self::build_security_manager(&compiled)?;
136 Ok(Self {
137 registry,
138 compiled,
139 fallback_readiness_timeout,
140 runtime_session,
141 security_manager,
142 })
143 }
144
145 fn build_security_manager(compiled: &CompiledOpcUaSession) -> OpcUaResult<SecurityManager> {
146 let manager = SecurityManager::new(compiled.security.manager_config.clone());
147 manager
148 .initialize()
149 .map_err(|error| OpcUaError::Server(error.to_string()))?;
150 Ok(manager)
151 }
152
153 async fn start_runtime(
154 registry: &ProtocolDriverRegistry,
155 compiled: &CompiledOpcUaSession,
156 fallback_readiness_timeout: Duration,
157 ) -> OpcUaResult<RuntimeSession> {
158 let session = RuntimeSession::new(
159 RuntimeSessionSpec {
160 services: vec![compiled.launch.clone()],
161 readiness_timeout: compiled.readiness_timeout_ms,
162 },
163 registry,
164 compiled.runtime_extensions(),
165 )
166 .await
167 .map_err(|error| OpcUaError::Server(error.to_string()))?;
168 session
169 .start(fallback_readiness_timeout)
170 .await
171 .map_err(|error| OpcUaError::Server(error.to_string()))?;
172 Ok(session)
173 }
174
175 async fn rebuild(&mut self, compiled: CompiledOpcUaSession) -> OpcUaResult<()> {
176 self.runtime_session
177 .stop()
178 .await
179 .map_err(|error| OpcUaError::Server(error.to_string()))?;
180 self.runtime_session =
181 Self::start_runtime(&self.registry, &compiled, self.fallback_readiness_timeout).await?;
182 self.security_manager = Self::build_security_manager(&compiled)?;
183 self.compiled = compiled;
184 Ok(())
185 }
186
187 fn durable_store(&self) -> Option<DurableSubscriptionStore> {
188 DurableSubscriptionStore::new(self.compiled.runtime.durability.clone())
189 }
190
191 fn durable_state_file(&self) -> Option<PathBuf> {
192 if self.compiled.runtime.durability.mode != SubscriptionDurabilityMode::Persisted {
193 return None;
194 }
195 let state_dir = self
196 .compiled
197 .runtime
198 .durability
199 .state_dir
200 .clone()
201 .unwrap_or_else(|| std::env::temp_dir().join("mabi-opcua-subscriptions"));
202 Some(state_dir.join("subscriptions.json"))
203 }
204
205 fn durable_status(&self) -> DurableSubscriptionStatus {
206 self.durable_store()
207 .and_then(|store| store.load_status().ok())
208 .unwrap_or_else(|| DurableSubscriptionStatus {
209 persisted_state_present: self
210 .durable_state_file()
211 .is_some_and(|path| path.exists()),
212 restored_subscription_count: 0,
213 detached_subscription_count: 0,
214 last_flush_at: None,
215 last_flush_result: "never_flushed".to_string(),
216 })
217 }
218
219 fn diagnostics_summary(&self, durability: &DurableSubscriptionStatus) -> String {
220 format!(
221 "namespaces={} profile={} durability={:?} restored={} detached={}",
222 self.compiled.catalog.namespace_table.len(),
223 self.compiled.security.name,
224 self.compiled.runtime.durability.mode,
225 durability.restored_subscription_count,
226 durability.detached_subscription_count,
227 )
228 }
229
230 fn ensure_trust_reload_allowed(&self) -> OpcUaResult<()> {
231 if self.compiled.security.allow_trust_reload {
232 Ok(())
233 } else {
234 Err(OpcUaError::Config(format!(
235 "security profile '{}' does not allow trust reload operations",
236 self.compiled.security.name
237 )))
238 }
239 }
240
241 fn ensure_certificate_rotation_allowed(&self) -> OpcUaResult<()> {
242 if self.compiled.security.allow_certificate_rotation {
243 Ok(())
244 } else {
245 Err(OpcUaError::Config(format!(
246 "security profile '{}' does not allow certificate rotation operations",
247 self.compiled.security.name
248 )))
249 }
250 }
251
252 fn security_control_status(&self) -> SecurityControlStatus {
253 SecurityControlStatus {
254 profile_name: self.compiled.security.name.clone(),
255 allow_trust_reload: self.compiled.security.allow_trust_reload,
256 allow_certificate_rotation: self.compiled.security.allow_certificate_rotation,
257 status: self.security_manager.security_status(),
258 }
259 }
260
261 fn resolve_device_id(&self, target: &NodeTarget) -> OpcUaResult<String> {
262 if let Some(device_id) = &target.device_id {
263 return Ok(device_id.clone());
264 }
265 self.runtime_session
266 .devices()
267 .device_ids()
268 .into_iter()
269 .next()
270 .ok_or_else(|| OpcUaError::Server("session has no registered OPC UA devices".into()))
271 }
272
273 fn resolve_point_id(&self, target: &NodeTarget) -> OpcUaResult<String> {
274 if let Some(point_id) = &target.point_id {
275 return Ok(point_id.clone());
276 }
277 if let Some(node_id) = &target.node_id {
278 return Ok(node_id.clone());
279 }
280 Err(OpcUaError::Config(
281 "node selection requires either --point or --node-id".into(),
282 ))
283 }
284
285 pub async fn stop(&self) -> OpcUaResult<()> {
286 self.runtime_session
287 .stop()
288 .await
289 .map_err(|error| OpcUaError::Server(error.to_string()))
290 }
291}
292
293#[async_trait]
294impl SessionControlPort for OpcUaControlSession {
295 async fn status(&self) -> OpcUaResult<SessionStatus> {
296 let durability = self.durable_status();
297 let audit_status = self.security_manager.audit_status();
298 let diagnostics_summary = self.diagnostics_summary(&durability);
299 Ok(SessionStatus {
300 session_name: self.compiled.session_name.clone(),
301 services: self.runtime_session.handles().len(),
302 devices: self.runtime_session.devices().len(),
303 nodes: self.compiled.catalog.nodes.len(),
304 namespaces: self.compiled.catalog.namespace_table.len(),
305 allow_raw_node_access: self.compiled.control.allow_raw_node_access,
306 durability_mode: format!("{:?}", self.compiled.runtime.durability.mode),
307 restore_on_start: self.compiled.runtime.durability.restore_on_start,
308 persisted_state_present: durability.persisted_state_present,
309 restored_subscriptions: durability.restored_subscription_count,
310 detached_restored_subscriptions: durability.detached_subscription_count,
311 last_durable_flush_at: durability.last_flush_at.map(|value| value.to_rfc3339()),
312 last_durable_flush_result: durability.last_flush_result,
313 diagnostics_summary,
314 security_profile: self.compiled.security.name.clone(),
315 audit_sink: format!("{:?}", self.compiled.security.audit_sink.kind),
316 allow_trust_reload: self.compiled.security.allow_trust_reload,
317 allow_certificate_rotation: self.compiled.security.allow_certificate_rotation,
318 audit_status,
319 generated_type_entries: self.compiled.generated_types.entries.len(),
320 generated_type_module: self.compiled.generated_types.module_name.clone(),
321 })
322 }
323
324 async fn snapshot(&self) -> OpcUaResult<SessionSnapshot> {
325 Ok(SessionSnapshot {
326 status: self.status().await?,
327 services: self
328 .runtime_session
329 .snapshots()
330 .await
331 .map_err(|error| OpcUaError::Server(error.to_string()))?,
332 })
333 }
334
335 async fn reset(&mut self) -> OpcUaResult<SessionSnapshot> {
336 if self.compiled.control.clear_persisted_subscriptions_on_reset {
337 if let Some(state_file) = self.durable_state_file() {
338 match fs::remove_file(&state_file) {
339 Ok(()) => {}
340 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
341 Err(error) => {
342 return Err(OpcUaError::Server(format!(
343 "failed to clear persisted subscription state '{}': {}",
344 state_file.display(),
345 error
346 )));
347 }
348 }
349 }
350 }
351 let compiled = self.compiled.clone();
352 self.rebuild(compiled).await?;
353 self.snapshot().await
354 }
355}
356
357impl NodeCatalogPort for OpcUaControlSession {
358 fn list_nodes(&self) -> OpcUaResult<Vec<NodeDescriptor>> {
359 let mut nodes = Vec::new();
360 for (device_id, port) in self.runtime_session.devices().entries() {
361 for point in port.point_definitions() {
362 let node_id = match point.address.as_ref() {
363 Some(Address::OpcUa { node_id }) => node_id.clone(),
364 _ => point.id.clone(),
365 };
366 let compiled = self
367 .compiled
368 .devices
369 .iter()
370 .find(|device| device.device_id == device_id)
371 .and_then(|device| {
372 device
373 .points
374 .iter()
375 .find(|binding| binding.point_id == point.id)
376 });
377 nodes.push(NodeDescriptor {
378 device_id: device_id.clone(),
379 point_id: point.id.clone(),
380 node_id,
381 browse_name: compiled
382 .map(|binding| binding.browse_name.clone())
383 .unwrap_or_else(|| point.name.clone()),
384 display_name: compiled
385 .map(|binding| binding.display_name.clone())
386 .unwrap_or_else(|| point.name.clone()),
387 node_class: compiled
388 .map(|binding| binding.node_class.clone())
389 .unwrap_or_else(|| "variable".into()),
390 writable: compiled.map(|binding| binding.writable).unwrap_or(false),
391 historizing: compiled.map(|binding| binding.historizing).unwrap_or(false),
392 sampling_interval_ms: compiled.and_then(|binding| binding.sampling_interval_ms),
393 });
394 }
395 }
396 nodes.sort_by(|left, right| {
397 left.device_id
398 .cmp(&right.device_id)
399 .then(left.point_id.cmp(&right.point_id))
400 });
401 Ok(nodes)
402 }
403}
404
405#[async_trait]
406impl SecurityControlPort for OpcUaControlSession {
407 async fn security_status(&self) -> OpcUaResult<SecurityControlStatus> {
408 Ok(self.security_control_status())
409 }
410
411 async fn trust_reload(&self) -> OpcUaResult<SecurityControlStatus> {
412 self.ensure_trust_reload_allowed()?;
413 self.security_manager
414 .reload_trust_store()
415 .map_err(|error| OpcUaError::Server(error.to_string()))?;
416 Ok(self.security_control_status())
417 }
418
419 async fn rotate_server_certificate(
420 &self,
421 certificate_path: PathBuf,
422 private_key_path: PathBuf,
423 ) -> OpcUaResult<SecurityControlStatus> {
424 self.ensure_certificate_rotation_allowed()?;
425 self.security_manager
426 .rotate_server_certificate(&certificate_path, &private_key_path)
427 .map_err(|error| OpcUaError::Server(error.to_string()))?;
428 Ok(self.security_control_status())
429 }
430
431 async fn audit_summary(&self) -> OpcUaResult<SecurityAuditStatus> {
432 Ok(self.security_manager.audit_status())
433 }
434}
435
436#[async_trait]
437impl NodeValueControlPort for OpcUaControlSession {
438 async fn read(&self, target: &NodeTarget) -> OpcUaResult<DataPoint> {
439 let device_id = self.resolve_device_id(target)?;
440 let point_id = self.resolve_point_id(target)?;
441 let port = self
442 .runtime_session
443 .devices()
444 .get(&device_id)
445 .ok_or_else(|| OpcUaError::NodeNotFound { node_id: device_id })?;
446 port.read(&point_id).await.map_err(OpcUaError::from)
447 }
448
449 async fn write(&self, target: &NodeTarget, value: Value) -> OpcUaResult<()> {
450 let device_id = self.resolve_device_id(target)?;
451 let point_id = self.resolve_point_id(target)?;
452 let port = self
453 .runtime_session
454 .devices()
455 .get(&device_id)
456 .ok_or_else(|| OpcUaError::NodeNotFound { node_id: device_id })?;
457 port.write(&point_id, value).await.map_err(OpcUaError::from)
458 }
459}
460
461#[cfg(test)]
462mod tests {
463 use std::collections::BTreeMap;
464
465 use super::*;
466 use crate::modeling::{
467 PresetDefinition, SessionDefinition, SimulatorDefaults, TransportDefinition,
468 };
469
470 fn compiled_session() -> CompiledOpcUaSession {
471 let config = crate::modeling::OpcUaSimulatorConfig {
472 defaults: SimulatorDefaults::default(),
473 transports: BTreeMap::from([(
474 "main".into(),
475 TransportDefinition {
476 port: 0,
477 ..TransportDefinition::default()
478 },
479 )]),
480 presets: BTreeMap::from([("generated".into(), PresetDefinition::default())]),
481 sessions: BTreeMap::from([(
482 "demo".into(),
483 SessionDefinition {
484 transport: "main".into(),
485 preset: Some("generated".into()),
486 service_name: Some("opcua-control".into()),
487 ..Default::default()
488 },
489 )]),
490 ..Default::default()
491 };
492 config.compile_session("demo", None).unwrap()
493 }
494
495 fn registry() -> ProtocolDriverRegistry {
496 let mut registry = ProtocolDriverRegistry::new();
497 registry.register(crate::runtime::driver());
498 registry
499 }
500
501 #[tokio::test]
502 async fn control_session_lists_nodes() {
503 let session =
504 OpcUaControlSession::new(registry(), compiled_session(), Duration::from_secs(1))
505 .await
506 .unwrap();
507 let nodes = session.list_nodes().unwrap();
508 assert!(!nodes.is_empty());
509 }
510}