1pub(crate) mod event_loop;
2pub use event_loop::SubscriptionActivity;
3
4mod callbacks;
5mod event_loop_state;
6mod service;
7pub(crate) mod state;
8
9pub use callbacks::{
10 DataChangeCallback, EventCallback, OnSubscriptionNotification, OnSubscriptionNotificationCore,
11 SubscriptionCallbacks,
12};
13use opcua_core::trace_lock;
14
15use std::{
16 collections::{BTreeSet, HashMap},
17 time::Duration,
18};
19
20use opcua_types::{
21 ExtensionObject, MonitoredItemCreateRequest, MonitoringMode, NotificationMessage, ReadValueId,
22};
23
24pub use service::{
25 CreateMonitoredItems, CreateSubscription, DeleteMonitoredItems, DeleteSubscriptions,
26 ModifyMonitoredItems, ModifySubscription, Publish, Republish, SetMonitoringMode,
27 SetPublishingMode, SetTriggering, TransferSubscriptions,
28};
29
30pub use event_loop_state::{SubscriptionCache, SubscriptionEventLoopState};
31
32use crate::session::services::subscriptions::{
33 service::CreatedMonitoredItem, state::SubscriptionState,
34};
35
36pub(crate) struct CreateMonitoredItem {
37 pub id: u32,
38 pub client_handle: u32,
39 pub item_to_monitor: ReadValueId,
40 pub monitoring_mode: MonitoringMode,
41 pub queue_size: u32,
42 pub discard_oldest: bool,
43 pub sampling_interval: f64,
44 pub filter: ExtensionObject,
45}
46
/// New parameters for a monitored item that is already stored on a `Subscription`.
///
/// `Subscription::modify_monitored_items` looks the item up by
/// `MonitoredItemId::Server(id)` and overwrites its sampling interval and queue size.
pub(crate) struct ModifyMonitoredItem {
    /// Id of the monitored item to update.
    pub id: u32,
    /// Sampling interval to store on the item.
    pub sampling_interval: f64,
    /// Queue size to store on the item (widened to `usize` when applied).
    pub queue_size: u32,
}
52
53#[derive(Debug, Clone)]
54pub struct MonitoredItem {
56 id: u32,
58 client_handle: u32,
60 item_to_monitor: ReadValueId,
62 queue_size: usize,
64 monitoring_mode: MonitoringMode,
66 sampling_interval: f64,
68 triggered_items: BTreeSet<u32>,
70 discard_oldest: bool,
72 filter: ExtensionObject,
74}
75
76impl MonitoredItem {
77 pub fn new(client_handle: u32) -> MonitoredItem {
79 MonitoredItem {
80 id: 0,
81 client_handle,
82 item_to_monitor: ReadValueId::default(),
83 queue_size: 1,
84 monitoring_mode: MonitoringMode::Reporting,
85 sampling_interval: 0.0,
86 triggered_items: BTreeSet::new(),
87 discard_oldest: true,
88 filter: ExtensionObject::null(),
89 }
90 }
91
92 pub fn id(&self) -> u32 {
94 self.id
95 }
96
97 pub fn client_handle(&self) -> u32 {
99 self.client_handle
100 }
101
102 pub fn item_to_monitor(&self) -> &ReadValueId {
104 &self.item_to_monitor
105 }
106
107 pub fn sampling_interval(&self) -> f64 {
109 self.sampling_interval
110 }
111
112 pub fn queue_size(&self) -> usize {
114 self.queue_size
115 }
116
117 pub fn discard_oldest(&self) -> bool {
119 self.discard_oldest
120 }
121
122 pub(crate) fn set_sampling_interval(&mut self, value: f64) {
123 self.sampling_interval = value;
124 }
125
126 pub(crate) fn set_queue_size(&mut self, value: usize) {
127 self.queue_size = value;
128 }
129
130 pub(crate) fn set_monitoring_mode(&mut self, monitoring_mode: MonitoringMode) {
131 self.monitoring_mode = monitoring_mode;
132 }
133
134 pub(crate) fn set_triggering(&mut self, links_to_add: &[u32], links_to_remove: &[u32]) {
135 links_to_remove.iter().for_each(|i| {
136 self.triggered_items.remove(i);
137 });
138 links_to_add.iter().for_each(|i| {
139 self.triggered_items.insert(*i);
140 });
141 }
142
143 pub(crate) fn triggered_items(&self) -> &BTreeSet<u32> {
144 &self.triggered_items
145 }
146}
147
/// Key under which a `Subscription` stores a monitored item.
///
/// The two variants never compare equal to each other, even when they wrap the
/// same number, so temporary and server keys cannot collide in a map.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
enum MonitoredItemId {
    /// Key of an entry inserted through `Subscription::insert_temporary_monitored_item`.
    Temporary(u32),
    /// Key of an entry inserted through `Subscription::insert_existing_monitored_item`.
    Server(u32),
}
153
154pub struct Subscription {
156 subscription_id: u32,
158 publishing_interval: Duration,
160 lifetime_count: u32,
162 max_keep_alive_count: u32,
164 max_notifications_per_publish: u32,
166 publishing_enabled: bool,
168 priority: u8,
170
171 monitored_items: HashMap<MonitoredItemId, MonitoredItem>,
173 client_handles: HashMap<u32, MonitoredItemId>,
175
176 callback: Box<dyn OnSubscriptionNotificationCore>,
177}
178
179impl Subscription {
180 #[allow(clippy::too_many_arguments)]
182 pub fn new(
183 subscription_id: u32,
184 publishing_interval: Duration,
185 lifetime_count: u32,
186 max_keep_alive_count: u32,
187 max_notifications_per_publish: u32,
188 priority: u8,
189 publishing_enabled: bool,
190 status_change_callback: Box<dyn OnSubscriptionNotificationCore>,
191 ) -> Subscription {
192 Subscription {
193 subscription_id,
194 publishing_interval,
195 lifetime_count,
196 max_keep_alive_count,
197 max_notifications_per_publish,
198 publishing_enabled,
199 priority,
200 monitored_items: HashMap::new(),
201 client_handles: HashMap::new(),
202 callback: status_change_callback,
203 }
204 }
205
206 pub fn monitored_items(&self) -> impl Iterator<Item = &MonitoredItem> {
208 self.monitored_items.values()
209 }
210
211 pub fn subscription_id(&self) -> u32 {
213 self.subscription_id
214 }
215
216 pub fn publishing_interval(&self) -> Duration {
218 self.publishing_interval
219 }
220
221 pub fn lifetime_count(&self) -> u32 {
223 self.lifetime_count
224 }
225
226 pub fn priority(&self) -> u8 {
228 self.priority
229 }
230
231 pub fn max_keep_alive_count(&self) -> u32 {
233 self.max_keep_alive_count
234 }
235
236 pub fn max_notifications_per_publish(&self) -> u32 {
238 self.max_notifications_per_publish
239 }
240
241 pub fn publishing_enabled(&self) -> bool {
243 self.publishing_enabled
244 }
245
246 pub fn insert_existing_monitored_item(&mut self, item: MonitoredItem) {
251 let client_handle = item.client_handle();
252 let monitored_item_id = item.id();
253 tracing::debug!(
254 "Inserting monitored item {} with client handle {}",
255 monitored_item_id,
256 client_handle
257 );
258 self.monitored_items
259 .insert(MonitoredItemId::Server(monitored_item_id), item);
260 self.client_handles
261 .insert(client_handle, MonitoredItemId::Server(monitored_item_id));
262 }
263
264 pub(crate) fn set_publishing_interval(&mut self, publishing_interval: Duration) {
265 self.publishing_interval = publishing_interval;
266 }
267
268 pub(crate) fn set_lifetime_count(&mut self, lifetime_count: u32) {
269 self.lifetime_count = lifetime_count;
270 }
271
272 pub(crate) fn set_max_keep_alive_count(&mut self, max_keep_alive_count: u32) {
273 self.max_keep_alive_count = max_keep_alive_count;
274 }
275
276 pub(crate) fn set_max_notifications_per_publish(&mut self, max_notifications_per_publish: u32) {
277 self.max_notifications_per_publish = max_notifications_per_publish;
278 }
279
280 pub(crate) fn set_publishing_enabled(&mut self, publishing_enabled: bool) {
281 self.publishing_enabled = publishing_enabled;
282 }
283
284 pub(crate) fn set_priority(&mut self, priority: u8) {
285 self.priority = priority;
286 }
287
288 pub(crate) fn insert_monitored_items(&mut self, items_to_create: Vec<CreateMonitoredItem>) {
289 items_to_create.into_iter().for_each(|i| {
290 let monitored_item = MonitoredItem {
291 id: i.id,
292 client_handle: i.client_handle,
293 item_to_monitor: i.item_to_monitor,
294 queue_size: i.queue_size as usize,
295 monitoring_mode: i.monitoring_mode,
296 sampling_interval: i.sampling_interval,
297 triggered_items: BTreeSet::new(),
298 discard_oldest: i.discard_oldest,
299 filter: i.filter,
300 };
301
302 self.insert_existing_monitored_item(monitored_item);
303 });
304 }
305
306 pub(crate) fn modify_monitored_items(&mut self, items_to_modify: &[ModifyMonitoredItem]) {
307 items_to_modify.iter().for_each(|i| {
308 if let Some(ref mut monitored_item) =
309 self.monitored_items.get_mut(&MonitoredItemId::Server(i.id))
310 {
311 monitored_item.set_sampling_interval(i.sampling_interval);
312 monitored_item.set_queue_size(i.queue_size as usize);
313 }
314 });
315 }
316
317 pub(crate) fn delete_monitored_items(&mut self, items_to_delete: &[u32]) {
318 items_to_delete.iter().for_each(|id| {
319 if let Some(monitored_item) = self.monitored_items.remove(&MonitoredItemId::Server(*id))
321 {
322 let _ = self.client_handles.remove(&monitored_item.client_handle());
323 }
324 })
325 }
326
327 pub(crate) fn set_triggering(
328 &mut self,
329 triggering_item_id: u32,
330 links_to_add: &[u32],
331 links_to_remove: &[u32],
332 ) {
333 if let Some(ref mut monitored_item) = self
334 .monitored_items
335 .get_mut(&MonitoredItemId::Server(triggering_item_id))
336 {
337 monitored_item.set_triggering(links_to_add, links_to_remove);
338 }
339 }
340
341 pub(crate) fn on_notification(&mut self, notification: NotificationMessage) {
342 self.callback.on_subscription_notification(
343 notification,
344 MonitoredItemMap::new(&self.monitored_items, &self.client_handles),
345 );
346 }
347
348 fn clear_temporary_id(&mut self, temp_id: MonitoredItemId, remove_handle: bool) {
349 if let Some(monitored_item) = self.monitored_items.remove(&temp_id) {
350 if remove_handle {
351 let _ = self.client_handles.remove(&monitored_item.client_handle());
352 }
353 }
354 }
355
356 fn insert_temporary_monitored_item(&mut self, item: &TempMonitoredItem) {
357 let monitored_item = MonitoredItem {
358 id: 0,
359 client_handle: item.client_handle,
360 item_to_monitor: item.item_to_monitor.clone(),
361 queue_size: item.queue_size as usize,
362 monitoring_mode: item.monitoring_mode,
363 sampling_interval: item.sampling_interval,
364 triggered_items: BTreeSet::new(),
365 discard_oldest: item.discard_oldest,
366 filter: item.filter.clone(),
367 };
368
369 self.monitored_items
370 .insert(MonitoredItemId::Temporary(item.temp_id), monitored_item);
371 self.client_handles
372 .insert(item.client_handle, MonitoredItemId::Temporary(item.temp_id));
373 }
374}
375
376pub struct MonitoredItemMap<'a> {
378 monitored_items: &'a HashMap<MonitoredItemId, MonitoredItem>,
380 client_handles: &'a HashMap<u32, MonitoredItemId>,
382}
383
384impl<'a> MonitoredItemMap<'a> {
385 fn new(
386 monitored_items: &'a HashMap<MonitoredItemId, MonitoredItem>,
387 client_handles: &'a HashMap<u32, MonitoredItemId>,
388 ) -> Self {
389 Self {
390 monitored_items,
391 client_handles,
392 }
393 }
394
395 pub fn get(&self, client_handle: u32) -> Option<&'a MonitoredItem> {
397 self.client_handles
398 .get(&client_handle)
399 .and_then(|id| self.monitored_items.get(id))
400 }
401}
402
/// Bounds on the number of publish requests, derived from the number of
/// subscriptions, the publish interval and the message roundtrip time.
#[derive(Debug)]
pub struct PublishLimits {
    /// Last reported message roundtrip, never below `MIN_MESSAGE_ROUNDTRIP`.
    message_roundtrip: Duration,
    /// Publish interval supplied through `update_subscriptions`.
    publish_interval: Duration,
    /// Number of subscriptions supplied through `update_subscriptions`.
    subscriptions: usize,
    /// `subscriptions * REQUESTS_PER_SUBSCRIPTION` (saturating).
    min_publish_requests: usize,
    /// `ceil(message_roundtrip / publish_interval) * min_publish_requests` (saturating).
    max_publish_requests: usize,
}

impl PublishLimits {
    /// Lower clamp applied to every reported message roundtrip.
    const MIN_MESSAGE_ROUNDTRIP: Duration = Duration::from_millis(10);
    /// Multiplier applied to the subscription count to get the minimum limit.
    const REQUESTS_PER_SUBSCRIPTION: usize = 2;

    /// Creates limits for zero subscriptions: both limits are `0`, the publish
    /// interval is zero and the roundtrip is `MIN_MESSAGE_ROUNDTRIP`.
    pub(crate) fn new() -> Self {
        Self {
            message_roundtrip: Self::MIN_MESSAGE_ROUNDTRIP,
            publish_interval: Duration::ZERO,
            subscriptions: 0,
            min_publish_requests: 0,
            max_publish_requests: 0,
        }
    }

    /// Records a new message roundtrip (clamped to at least
    /// `MIN_MESSAGE_ROUNDTRIP`) and recomputes both limits.
    pub(crate) fn update_message_roundtrip(&mut self, message_roundtrip: Duration) {
        self.message_roundtrip = message_roundtrip.max(Self::MIN_MESSAGE_ROUNDTRIP);
        self.calculate_publish_limits();
    }

    /// Records a new subscription count and publish interval and recomputes
    /// both limits.
    pub(crate) fn update_subscriptions(
        &mut self,
        subscriptions: usize,
        publish_interval: Duration,
    ) {
        self.subscriptions = subscriptions;
        self.publish_interval = publish_interval;
        self.calculate_publish_limits();
    }

    /// Recomputes `min_publish_requests` and `max_publish_requests`.
    ///
    /// Both durations are truncated to whole milliseconds. A publish interval
    /// below one millisecond therefore divides by zero, which yields `+inf` in
    /// `f32` and becomes `usize::MAX` after the cast. The products saturate
    /// instead of overflowing, so in that case the maximum is `usize::MAX`
    /// when there is at least one subscription and `0` when there is none.
    fn calculate_publish_limits(&mut self) {
        self.min_publish_requests = self
            .subscriptions
            .saturating_mul(Self::REQUESTS_PER_SUBSCRIPTION);

        // Float-to-integer `as` casts saturate, so `+inf` maps to `usize::MAX`.
        let intervals_per_roundtrip = (self.message_roundtrip.as_millis() as f32
            / self.publish_interval.as_millis() as f32)
            .ceil() as usize;

        // Plain `*` would overflow here (panic in debug builds, wrap in
        // release) whenever the ratio above saturated.
        self.max_publish_requests =
            intervals_per_roundtrip.saturating_mul(self.min_publish_requests);
    }
}
451
452struct TempMonitoredItem {
453 temp_id: u32,
454 client_handle: u32,
455 item_to_monitor: ReadValueId,
456 queue_size: u32,
457 monitoring_mode: MonitoringMode,
458 sampling_interval: f64,
459 filter: ExtensionObject,
460 discard_oldest: bool,
461}
462
463pub struct PreInsertMonitoredItems<'a> {
477 temp_ids: Vec<MonitoredItemTempResult>,
478 subscription_id: u32,
479 lock: &'a opcua_core::sync::Mutex<SubscriptionState>,
480}
481
482struct MonitoredItemTempResult {
483 temp_id: MonitoredItemId,
484 created: bool,
485}
486
487impl<'a> PreInsertMonitoredItems<'a> {
488 pub fn new(
491 lock: &'a opcua_core::sync::Mutex<SubscriptionState>,
492 subscription_id: u32,
493 items: &[MonitoredItemCreateRequest],
494 ) -> Self {
495 let mut lck = trace_lock!(lock);
496
497 let to_insert: Vec<_> = items
498 .iter()
499 .map(|item| TempMonitoredItem {
500 temp_id: lck.next_temp_id(),
501 client_handle: item.requested_parameters.client_handle,
502 item_to_monitor: item.item_to_monitor.clone(),
503 queue_size: item.requested_parameters.queue_size,
504 monitoring_mode: item.monitoring_mode,
505 sampling_interval: item.requested_parameters.sampling_interval,
506 filter: item.requested_parameters.filter.clone(),
507 discard_oldest: item.requested_parameters.discard_oldest,
508 })
509 .collect();
510
511 let ids = to_insert
512 .iter()
513 .map(|i| MonitoredItemTempResult {
514 temp_id: MonitoredItemId::Temporary(i.temp_id),
515 created: false,
516 })
517 .collect();
518
519 lck.insert_temporary_monitored_items(&to_insert, subscription_id);
520 Self {
521 subscription_id,
522 temp_ids: ids,
523 lock,
524 }
525 }
526
527 pub fn finish(mut self, results: &[CreatedMonitoredItem]) {
530 let mut lck = trace_lock!(self.lock);
531 let mut items_to_create = Vec::with_capacity(results.len());
532 for (temp_id, item) in self.temp_ids.iter_mut().zip(results.iter()) {
533 if item.result.status_code.is_good() {
534 temp_id.created = true;
535 items_to_create.push(CreateMonitoredItem {
536 id: item.result.monitored_item_id,
537 client_handle: item.requested_parameters.client_handle,
538 discard_oldest: item.requested_parameters.discard_oldest,
539 item_to_monitor: item.item_to_monitor.clone(),
540 monitoring_mode: item.monitoring_mode,
541 queue_size: item.result.revised_queue_size,
542 sampling_interval: item.result.revised_sampling_interval,
543 filter: item.requested_parameters.filter.clone(),
544 });
545 }
546 }
547
548 lck.insert_monitored_items(self.subscription_id, items_to_create);
549 }
550}
551
552impl Drop for PreInsertMonitoredItems<'_> {
553 fn drop(&mut self) {
554 let mut lck = trace_lock!(self.lock);
555 lck.clear_temporary_ids(&self.temp_ids, self.subscription_id);
556 }
557}