opcua_server/node_manager/utils/
sync_sampler.rs1use std::{
2 collections::HashMap,
3 sync::Arc,
4 time::{Duration, Instant},
5};
6
7use tokio_util::sync::{CancellationToken, DropGuard};
8
9use crate::{MonitoredItemHandle, SubscriptionCache};
10use opcua_core::sync::Mutex;
11use opcua_types::{AttributeId, DataValue, MonitoringMode, NodeId};
12
13struct ItemRef {
14 mode: MonitoringMode,
15 sampling_interval: Duration,
16}
17
18struct SamplerItem {
19 sampler: Box<dyn FnMut() -> Option<DataValue> + Send>,
20 sampling_interval: Duration,
21 last_sample: Instant,
22 enabled: bool,
23 items: HashMap<MonitoredItemHandle, ItemRef>,
24}
25
26impl SamplerItem {
27 fn refresh_values(&mut self) {
28 let mut interval = Duration::MAX;
29 let mut enabled = false;
30 for item in self.items.values() {
31 if item.mode != MonitoringMode::Disabled {
32 if interval > item.sampling_interval {
33 interval = item.sampling_interval;
34 }
35 enabled = true;
36 }
37 }
38 self.sampling_interval = interval;
39 self.enabled = enabled;
40 }
41}
42
43pub struct SyncSampler {
47 samplers: Arc<Mutex<HashMap<(NodeId, AttributeId), SamplerItem>>>,
48 _guard: DropGuard,
49 token: CancellationToken,
50}
51
52impl Default for SyncSampler {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58impl SyncSampler {
59 pub fn new() -> Self {
61 let token = CancellationToken::new();
62 Self {
63 samplers: Default::default(),
64 _guard: token.clone().drop_guard(),
65 token,
66 }
67 }
68
69 pub fn run(&self, interval: Duration, subscriptions: Arc<SubscriptionCache>) {
73 let token = self.token.clone();
74 let samplers = self.samplers.clone();
75 tokio::spawn(async move {
76 tokio::select! {
77 _ = Self::run_internal(samplers, interval, subscriptions) => {},
78 _ = token.cancelled() => {}
79 }
80 });
81 }
82
83 pub fn add_sampler(
88 &self,
89 node_id: NodeId,
90 attribute: AttributeId,
91 sampler: impl FnMut() -> Option<DataValue> + Send + 'static,
92 mode: MonitoringMode,
93 handle: MonitoredItemHandle,
94 sampling_interval: Duration,
95 ) {
96 let mut samplers = self.samplers.lock();
97 let id = (node_id, attribute);
98 let sampler = samplers.entry(id).or_insert(SamplerItem {
99 sampler: Box::new(sampler),
100 sampling_interval,
101 last_sample: Instant::now(),
102 items: HashMap::new(),
103 enabled: false,
104 });
105 sampler.items.insert(
106 handle,
107 ItemRef {
108 mode,
109 sampling_interval,
110 },
111 );
112 sampler.refresh_values();
113 }
114
115 pub fn update_sampler(
119 &self,
120 node_id: &NodeId,
121 attribute: AttributeId,
122 handle: MonitoredItemHandle,
123 sampling_interval: Duration,
124 ) {
125 let mut samplers = self.samplers.lock();
126 if let Some(sampler) = samplers.get_mut(&(node_id.clone(), attribute)) {
127 if let Some(item) = sampler.items.get_mut(&handle) {
128 item.sampling_interval = sampling_interval;
129 sampler.refresh_values();
130 }
131 }
132 }
133
134 pub fn set_sampler_mode(
136 &self,
137 node_id: &NodeId,
138 attribute: AttributeId,
139 handle: MonitoredItemHandle,
140 mode: MonitoringMode,
141 ) {
142 let mut samplers = self.samplers.lock();
143 if let Some(sampler) = samplers.get_mut(&(node_id.clone(), attribute)) {
144 if let Some(item) = sampler.items.get_mut(&handle) {
145 item.mode = mode;
146 sampler.refresh_values();
147 }
148 }
149 }
150
151 pub fn remove_sampler(
154 &self,
155 node_id: &NodeId,
156 attribute: AttributeId,
157 handle: MonitoredItemHandle,
158 ) {
159 let mut samplers = self.samplers.lock();
160 let id = (node_id.clone(), attribute);
161
162 let Some(sampler) = samplers.get_mut(&id) else {
163 return;
164 };
165 sampler.items.remove(&handle);
166 if sampler.items.is_empty() {
167 samplers.remove(&id);
168 }
169 }
170
171 async fn run_internal(
172 samplers: Arc<Mutex<HashMap<(NodeId, AttributeId), SamplerItem>>>,
173 interval: Duration,
174 subscriptions: Arc<SubscriptionCache>,
175 ) {
176 let mut tick = tokio::time::interval(interval);
177 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
178 loop {
179 tick.tick().await;
180 let now = Instant::now();
181 let mut samplers = samplers.lock();
182 let values = samplers
183 .iter_mut()
184 .filter_map(|((node_id, attribute), sampler)| {
185 if !sampler.enabled {
186 return None;
187 }
188 if sampler
189 .last_sample
190 .checked_add(sampler.sampling_interval)
191 .is_none_or(|v| v > now)
192 {
193 return None;
194 }
195 let value = (sampler.sampler)()?;
196 sampler.last_sample = now;
197 Some((value, node_id, *attribute))
198 });
199 subscriptions.notify_data_change(values);
200 }
201 }
202}