opcua_server/node_manager/utils/
sync_sampler.rs

1use std::{
2    collections::HashMap,
3    sync::Arc,
4    time::{Duration, Instant},
5};
6
7use tokio_util::sync::{CancellationToken, DropGuard};
8
9use crate::{MonitoredItemHandle, SubscriptionCache};
10use opcua_core::sync::Mutex;
11use opcua_types::{AttributeId, DataValue, MonitoringMode, NodeId};
12
/// Settings registered by a single monitored item for a sampled attribute.
struct ItemRef {
    /// Monitoring mode of the item. Items in `MonitoringMode::Disabled` are
    /// ignored when the effective sampling rate is computed.
    mode: MonitoringMode,
    /// Sampling interval requested by this item.
    sampling_interval: Duration,
}
17
/// State for one sampled nodeId/attributeId pair, shared by every monitored
/// item registered on that pair.
struct SamplerItem {
    /// Callback producing the current value. Returning `None` means there is
    /// nothing to report for this round.
    sampler: Box<dyn FnMut() -> Option<DataValue> + Send>,
    /// Effective sampling interval: the smallest interval among the items that
    /// are not disabled, or `Duration::MAX` when there are none.
    sampling_interval: Duration,
    /// Time of the last sample that produced a value.
    last_sample: Instant,
    /// Whether at least one registered item is not disabled.
    enabled: bool,
    /// Monitored items registered on this sampler, keyed by their handle.
    items: HashMap<MonitoredItemHandle, ItemRef>,
}
25
26impl SamplerItem {
27    fn refresh_values(&mut self) {
28        let mut interval = Duration::MAX;
29        let mut enabled = false;
30        for item in self.items.values() {
31            if item.mode != MonitoringMode::Disabled {
32                if interval > item.sampling_interval {
33                    interval = item.sampling_interval;
34                }
35                enabled = true;
36            }
37        }
38        self.sampling_interval = interval;
39        self.enabled = enabled;
40    }
41}
42
/// Utility for periodically sampling a list of nodes/attributes.
///
/// When using this you should call `run` to start the sampler once you have
/// access to the server context. The background task started by `run` is
/// cancelled when the `SyncSampler` is dropped.
pub struct SyncSampler {
    /// Registered samplers keyed by nodeId/attributeId pair, shared with the
    /// background sampling task.
    samplers: Arc<Mutex<HashMap<(NodeId, AttributeId), SamplerItem>>>,
    /// Drop guard created from `token`; held only so that dropping the
    /// sampler cancels the token.
    _guard: DropGuard,
    /// Cancellation token handed to the background task spawned by `run`.
    token: CancellationToken,
}
51
52impl Default for SyncSampler {
53    fn default() -> Self {
54        Self::new()
55    }
56}
57
58impl SyncSampler {
59    /// Create a new sync sampler.
60    pub fn new() -> Self {
61        let token = CancellationToken::new();
62        Self {
63            samplers: Default::default(),
64            _guard: token.clone().drop_guard(),
65            token,
66        }
67    }
68
69    /// Start the sampler. You should avoid calling this multiple times, typically
70    /// this is called in `build_nodes` or `init`. The sampler will automatically shut down
71    /// once it is dropped.
72    pub fn run(&self, interval: Duration, subscriptions: Arc<SubscriptionCache>) {
73        let token = self.token.clone();
74        let samplers = self.samplers.clone();
75        tokio::spawn(async move {
76            tokio::select! {
77                _ = Self::run_internal(samplers, interval, subscriptions) => {},
78                _ = token.cancelled() => {}
79            }
80        });
81    }
82
83    /// Add a periodic sampler for a monitored item.
84    /// Note that if a sampler for the given nodeId/attributeId pair already exists,
85    /// no new sampler will be created. It is assumed that each nodeId/attributeId
86    /// pair has a single sampler function.
87    pub fn add_sampler(
88        &self,
89        node_id: NodeId,
90        attribute: AttributeId,
91        sampler: impl FnMut() -> Option<DataValue> + Send + 'static,
92        mode: MonitoringMode,
93        handle: MonitoredItemHandle,
94        sampling_interval: Duration,
95    ) {
96        let mut samplers = self.samplers.lock();
97        let id = (node_id, attribute);
98        let sampler = samplers.entry(id).or_insert(SamplerItem {
99            sampler: Box::new(sampler),
100            sampling_interval,
101            last_sample: Instant::now(),
102            items: HashMap::new(),
103            enabled: false,
104        });
105        sampler.items.insert(
106            handle,
107            ItemRef {
108                mode,
109                sampling_interval,
110            },
111        );
112        sampler.refresh_values();
113    }
114
115    /// Update the sample rate of a monitored item.
116    /// The smallest registered sampling interval for each nodeId/attributeId pair is
117    /// used. This is also bounded from below by the rate of the SyncSampler itself.
118    pub fn update_sampler(
119        &self,
120        node_id: &NodeId,
121        attribute: AttributeId,
122        handle: MonitoredItemHandle,
123        sampling_interval: Duration,
124    ) {
125        let mut samplers = self.samplers.lock();
126        if let Some(sampler) = samplers.get_mut(&(node_id.clone(), attribute)) {
127            if let Some(item) = sampler.items.get_mut(&handle) {
128                item.sampling_interval = sampling_interval;
129                sampler.refresh_values();
130            }
131        }
132    }
133
134    /// Set the sampler mode for a node.
135    pub fn set_sampler_mode(
136        &self,
137        node_id: &NodeId,
138        attribute: AttributeId,
139        handle: MonitoredItemHandle,
140        mode: MonitoringMode,
141    ) {
142        let mut samplers = self.samplers.lock();
143        if let Some(sampler) = samplers.get_mut(&(node_id.clone(), attribute)) {
144            if let Some(item) = sampler.items.get_mut(&handle) {
145                item.mode = mode;
146                sampler.refresh_values();
147            }
148        }
149    }
150
151    /// Remove a sampler. The actual sampler will only be fully removed once
152    /// all samplers for the attribute are gone.
153    pub fn remove_sampler(
154        &self,
155        node_id: &NodeId,
156        attribute: AttributeId,
157        handle: MonitoredItemHandle,
158    ) {
159        let mut samplers = self.samplers.lock();
160        let id = (node_id.clone(), attribute);
161
162        let Some(sampler) = samplers.get_mut(&id) else {
163            return;
164        };
165        sampler.items.remove(&handle);
166        if sampler.items.is_empty() {
167            samplers.remove(&id);
168        }
169    }
170
171    async fn run_internal(
172        samplers: Arc<Mutex<HashMap<(NodeId, AttributeId), SamplerItem>>>,
173        interval: Duration,
174        subscriptions: Arc<SubscriptionCache>,
175    ) {
176        let mut tick = tokio::time::interval(interval);
177        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
178        loop {
179            tick.tick().await;
180            let now = Instant::now();
181            let mut samplers = samplers.lock();
182            let values = samplers
183                .iter_mut()
184                .filter_map(|((node_id, attribute), sampler)| {
185                    if !sampler.enabled {
186                        return None;
187                    }
188                    if sampler
189                        .last_sample
190                        .checked_add(sampler.sampling_interval)
191                        .is_none_or(|v| v > now)
192                    {
193                        return None;
194                    }
195                    let value = (sampler.sampler)()?;
196                    sampler.last_sample = now;
197                    Some((value, node_id, *attribute))
198                });
199            subscriptions.notify_data_change(values);
200        }
201    }
202}