1mod monitored_item;
2mod notify;
3mod session_subscriptions;
4mod subscription;
5
6use std::{hash::Hash, sync::Arc, time::Instant};
7
8use chrono::Utc;
9use hashbrown::{Equivalent, HashMap};
10pub use monitored_item::{CreateMonitoredItem, MonitoredItem};
11use opcua_core::{trace_read_lock, trace_write_lock, ResponseMessage};
12use opcua_nodes::{Event, TypeTree};
13pub use session_subscriptions::SessionSubscriptions;
14use subscription::TickReason;
15pub use subscription::{MonitoredItemHandle, Subscription, SubscriptionState};
16use tracing::error;
17
18pub use notify::{
19 SubscriptionDataNotifier, SubscriptionDataNotifierBatch, SubscriptionEventNotifier,
20 SubscriptionEventNotifierBatch,
21};
22
23use opcua_core::sync::{Mutex, RwLock};
24
25use opcua_types::{
26 node_id::{IdentifierRef, NodeIdRef},
27 AttributeId, CreateSubscriptionRequest, CreateSubscriptionResponse, DataEncoding, DataValue,
28 DateTimeUtc, MessageSecurityMode, ModifySubscriptionRequest, ModifySubscriptionResponse,
29 MonitoredItemCreateResult, MonitoredItemModifyRequest, MonitoringMode, NodeId,
30 NotificationMessage, NumericRange, PublishRequest, RepublishRequest, RepublishResponse,
31 ResponseHeader, SetPublishingModeRequest, SetPublishingModeResponse, StatusCode,
32 TimestampsToReturn, TransferResult, TransferSubscriptionsRequest,
33 TransferSubscriptionsResponse,
34};
35
36use crate::node_manager::RequestContextInner;
37
38use super::{
39 authenticator::UserToken,
40 info::ServerInfo,
41 node_manager::{MonitoredItemRef, MonitoredItemUpdateRef, RequestContext, ServerContext},
42 session::instance::Session,
43 SubscriptionLimits,
44};
45
46#[derive(Debug, Clone, PartialEq, Eq, Hash)]
47struct MonitoredItemKey {
48 id: NodeId,
49 attribute_id: AttributeId,
50}
51
52#[derive(Debug, Clone, PartialEq, Eq)]
53struct MonitoredItemKeyRef<T: IdentifierRef> {
54 id: NodeIdRef<T>,
55 attribute_id: AttributeId,
56}
57
58impl<T> Hash for MonitoredItemKeyRef<T>
59where
60 T: IdentifierRef,
61{
62 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
63 self.id.hash(state);
64 self.attribute_id.hash(state);
65 }
66}
67
68impl<T: IdentifierRef> Equivalent<MonitoredItemKey> for MonitoredItemKeyRef<T> {
69 fn equivalent(&self, key: &MonitoredItemKey) -> bool {
70 self.id == key.id && self.attribute_id == key.attribute_id
71 }
72}
73
74pub struct MonitoredItemEntry {
77 pub enabled: bool,
79 pub data_encoding: DataEncoding,
81 pub index_range: NumericRange,
83}
84
85struct SubscriptionCacheInner {
86 session_subscriptions: HashMap<u32, Arc<Mutex<SessionSubscriptions>>>,
88 subscription_to_session: HashMap<u32, u32>,
90 monitored_items: HashMap<MonitoredItemKey, HashMap<MonitoredItemHandle, MonitoredItemEntry>>,
92}
93
94pub struct SubscriptionCache {
101 inner: RwLock<SubscriptionCacheInner>,
102 limits: SubscriptionLimits,
104}
105
106impl SubscriptionCache {
107 pub(crate) fn new(limits: SubscriptionLimits) -> Self {
108 Self {
109 inner: RwLock::new(SubscriptionCacheInner {
110 session_subscriptions: HashMap::new(),
111 subscription_to_session: HashMap::new(),
112 monitored_items: HashMap::new(),
113 }),
114 limits,
115 }
116 }
117
118 pub fn get_session_subscriptions(
120 &self,
121 session_id: u32,
122 ) -> Option<Arc<Mutex<SessionSubscriptions>>> {
123 let inner = trace_read_lock!(self.inner);
124 inner.session_subscriptions.get(&session_id).cloned()
125 }
126
127 pub(crate) async fn periodic_tick(&self, context: &ServerContext) {
131 let mut to_delete = Vec::new();
135 let mut items_to_delete = Vec::new();
136 {
137 let now = Utc::now();
138 let now_instant = Instant::now();
139 let lck = trace_read_lock!(self.inner);
140 for (session_id, sub) in lck.session_subscriptions.iter() {
141 let mut sub_lck = sub.lock();
142 items_to_delete.push((
143 sub_lck.session().clone(),
144 sub_lck.tick(&now, now_instant, TickReason::TickTimerFired),
145 ));
146 if sub_lck.is_ready_to_delete() {
147 to_delete.push(*session_id);
148 }
149 }
150 }
151 if !to_delete.is_empty() {
152 let mut lck = trace_write_lock!(self.inner);
153 for id in to_delete {
154 lck.session_subscriptions.remove(&id);
155 }
156 context
157 .info
158 .diagnostics
159 .set_current_subscription_count(lck.subscription_to_session.len() as u32);
160 }
161 if !items_to_delete.is_empty() {
162 Self::delete_expired_monitored_items(context, items_to_delete).await;
163 }
164 }
165
166 async fn delete_expired_monitored_items(
167 context: &ServerContext,
168 items_to_delete: Vec<(Arc<RwLock<Session>>, Vec<MonitoredItemRef>)>,
169 ) {
170 for (session, items) in items_to_delete {
171 let (id, token) = {
174 let lck = session.read();
175 let Some(token) = lck.user_token() else {
176 error!("Active session missing user token, this should be impossible");
177 continue;
178 };
179
180 (lck.session_id_numeric(), token.clone())
181 };
182 let ctx = RequestContext {
183 current_node_manager_index: 0,
184 inner: Arc::new(RequestContextInner {
185 session,
186 session_id: id,
187 authenticator: context.authenticator.clone(),
188 token,
189 type_tree: context.type_tree.clone(),
190 subscriptions: context.subscriptions.clone(),
191 info: context.info.clone(),
192 type_tree_getter: context.type_tree_getter.clone(),
193 }),
194 };
195
196 for mgr in context.node_managers.iter() {
197 let owned: Vec<_> = items
198 .iter()
199 .filter(|n| mgr.owns_node(n.node_id()))
200 .collect();
201
202 if owned.is_empty() {
203 continue;
204 }
205
206 mgr.delete_monitored_items(&ctx, &owned).await;
207 }
208 }
209 }
210
211 pub(crate) fn get_monitored_item_count(
212 &self,
213 session_id: u32,
214 subscription_id: u32,
215 ) -> Option<usize> {
216 let cache = ({
217 let lck = trace_read_lock!(self.inner);
218 lck.session_subscriptions.get(&session_id).cloned()
219 })?;
220 let cache_lck = cache.lock();
221 cache_lck.get_monitored_item_count(subscription_id)
222 }
223
224 pub(crate) fn create_subscription(
225 &self,
226 session_id: u32,
227 request: &CreateSubscriptionRequest,
228 context: &RequestContext,
229 ) -> Result<CreateSubscriptionResponse, StatusCode> {
230 let mut lck = trace_write_lock!(self.inner);
231 let cache = lck
232 .session_subscriptions
233 .entry(session_id)
234 .or_insert_with(|| {
235 Arc::new(Mutex::new(SessionSubscriptions::new(
236 self.limits,
237 Self::get_key(&context.session),
238 context.session.clone(),
239 context.info.type_tree_getter.get_type_tree_static(context),
240 )))
241 })
242 .clone();
243 let mut cache_lck = cache.lock();
244 let res = cache_lck.create_subscription(request, &context.info)?;
245 lck.subscription_to_session
246 .insert(res.subscription_id, session_id);
247 context
248 .info
249 .diagnostics
250 .set_current_subscription_count(lck.subscription_to_session.len() as u32);
251 context.info.diagnostics.inc_subscription_count();
252 Ok(res)
253 }
254
255 pub(crate) fn modify_subscription(
256 &self,
257 session_id: u32,
258 request: &ModifySubscriptionRequest,
259 info: &ServerInfo,
260 ) -> Result<ModifySubscriptionResponse, StatusCode> {
261 let Some(cache) = ({
262 let lck = trace_read_lock!(self.inner);
263 lck.session_subscriptions.get(&session_id).cloned()
264 }) else {
265 return Err(StatusCode::BadNoSubscription);
266 };
267 let mut cache_lck = cache.lock();
268 cache_lck.modify_subscription(request, info)
269 }
270
271 pub(crate) fn set_publishing_mode(
272 &self,
273 session_id: u32,
274 request: &SetPublishingModeRequest,
275 ) -> Result<SetPublishingModeResponse, StatusCode> {
276 let Some(cache) = ({
277 let lck = trace_read_lock!(self.inner);
278 lck.session_subscriptions.get(&session_id).cloned()
279 }) else {
280 return Err(StatusCode::BadNoSubscription);
281 };
282 let mut cache_lck = cache.lock();
283 cache_lck.set_publishing_mode(request)
284 }
285
286 pub(crate) fn republish(
287 &self,
288 session_id: u32,
289 request: &RepublishRequest,
290 ) -> Result<RepublishResponse, StatusCode> {
291 let Some(cache) = ({
292 let lck = trace_read_lock!(self.inner);
293 lck.session_subscriptions.get(&session_id).cloned()
294 }) else {
295 return Err(StatusCode::BadNoSubscription);
296 };
297 let cache_lck = cache.lock();
298 cache_lck.republish(request)
299 }
300
301 pub(crate) fn enqueue_publish_request(
302 &self,
303 session_id: u32,
304 now: &DateTimeUtc,
305 now_instant: Instant,
306 request: PendingPublish,
307 ) -> Result<(), StatusCode> {
308 let Some(cache) = ({
309 let lck = trace_read_lock!(self.inner);
310 lck.session_subscriptions.get(&session_id).cloned()
311 }) else {
312 return Err(StatusCode::BadNoSubscription);
313 };
314
315 let mut cache_lck = cache.lock();
316 cache_lck.enqueue_publish_request(now, now_instant, request);
317 Ok(())
318 }
319
320 pub fn data_notifier<'a>(&'a self) -> SubscriptionDataNotifier<'a> {
336 SubscriptionDataNotifier::new(trace_read_lock!(self.inner))
337 }
338
339 pub fn event_notifier<'a, 'b>(&'a self) -> SubscriptionEventNotifier<'a, 'b> {
355 SubscriptionEventNotifier::new(trace_read_lock!(self.inner))
356 }
357
358 pub fn notify_data_change<'a>(
363 &self,
364 items: impl Iterator<Item = (DataValue, &'a NodeId, AttributeId)>,
365 ) {
366 let mut notif = self.data_notifier();
367 for (dv, node_id, attribute_id) in items {
368 notif.notify(node_id, attribute_id, dv);
369 }
370 }
371
372 pub fn maybe_notify<'a>(
377 &self,
378 items: impl Iterator<Item = (&'a NodeId, AttributeId)>,
379 sample: impl Fn(&NodeId, AttributeId, &NumericRange, &DataEncoding) -> Option<DataValue>,
380 ) {
381 let mut notif = self.data_notifier();
382 for (id, attribute_id) in items {
383 if let Some(mut batch) = notif.notify_for(id, attribute_id) {
384 for (handle, entry) in batch.entries() {
385 if let Some(value) =
386 sample(id, attribute_id, &entry.index_range, &entry.data_encoding)
387 {
388 batch.data_value_to_item(value, handle);
389 }
390 }
391 }
392 }
393 }
394
395 pub fn notify_events<'a>(&self, items: impl Iterator<Item = (&'a dyn Event, &'a NodeId)>) {
398 let mut notif = self.event_notifier();
399 for (evt, id) in items {
400 notif.notify(id, evt);
401 }
402 }
403
404 pub(crate) fn create_monitored_items(
405 &self,
406 session_id: u32,
407 subscription_id: u32,
408 requests: &[CreateMonitoredItem],
409 ) -> Result<Vec<MonitoredItemCreateResult>, StatusCode> {
410 let mut lck = trace_write_lock!(self.inner);
411 let Some(cache) = lck.session_subscriptions.get(&session_id).cloned() else {
412 return Err(StatusCode::BadNoSubscription);
413 };
414
415 let mut cache_lck = cache.lock();
416 let result = cache_lck.create_monitored_items(subscription_id, requests);
417 if let Ok(res) = &result {
418 for (create, res) in requests.iter().zip(res.iter()) {
419 if res.status_code.is_good() {
420 let key = MonitoredItemKey {
421 id: create.item_to_monitor().node_id.clone(),
422 attribute_id: create.item_to_monitor().attribute_id,
423 };
424
425 let index_range = create.item_to_monitor().index_range.clone();
426
427 lck.monitored_items.entry(key).or_default().insert(
428 create.handle(),
429 MonitoredItemEntry {
430 enabled: !matches!(create.monitoring_mode(), MonitoringMode::Disabled),
431 index_range,
432 data_encoding: create.item_to_monitor().data_encoding.clone(),
433 },
434 );
435 }
436 }
437 }
438
439 result
440 }
441
442 pub(crate) fn modify_monitored_items(
443 &self,
444 session_id: u32,
445 subscription_id: u32,
446 info: &ServerInfo,
447 timestamps_to_return: TimestampsToReturn,
448 requests: Vec<MonitoredItemModifyRequest>,
449 type_tree: &dyn TypeTree,
450 ) -> Result<Vec<MonitoredItemUpdateRef>, StatusCode> {
451 let Some(cache) = ({
452 let lck = trace_read_lock!(self.inner);
453 lck.session_subscriptions.get(&session_id).cloned()
454 }) else {
455 return Err(StatusCode::BadNoSubscription);
456 };
457
458 let mut cache_lck = cache.lock();
459 cache_lck.modify_monitored_items(
460 subscription_id,
461 info,
462 timestamps_to_return,
463 requests,
464 type_tree,
465 )
466 }
467
468 fn get_key(session: &RwLock<Session>) -> PersistentSessionKey {
469 let lck = trace_read_lock!(session);
470 PersistentSessionKey::new(
471 lck.user_token().unwrap(),
472 lck.message_security_mode(),
473 lck.application_description().application_uri.as_ref(),
474 )
475 }
476
477 pub(crate) fn set_monitoring_mode(
478 &self,
479 session_id: u32,
480 subscription_id: u32,
481 monitoring_mode: MonitoringMode,
482 items: Vec<u32>,
483 ) -> Result<Vec<(StatusCode, MonitoredItemRef)>, StatusCode> {
484 let mut lck = trace_write_lock!(self.inner);
485 let Some(cache) = lck.session_subscriptions.get(&session_id).cloned() else {
486 return Err(StatusCode::BadNoSubscription);
487 };
488
489 let mut cache_lck = cache.lock();
490 let result = cache_lck.set_monitoring_mode(subscription_id, monitoring_mode, items);
491
492 if let Ok(res) = &result {
493 for (status, rf) in res {
494 if status.is_good() {
495 let key = MonitoredItemKeyRef {
496 id: rf.node_id().into(),
497 attribute_id: rf.attribute(),
498 };
499 if let Some(it) = lck
500 .monitored_items
501 .get_mut(&key)
502 .and_then(|it| it.get_mut(&rf.handle()))
503 {
504 it.enabled = !matches!(monitoring_mode, MonitoringMode::Disabled);
505 }
506 }
507 }
508 }
509 result
510 }
511
512 pub(crate) fn set_triggering(
513 &self,
514 session_id: u32,
515 subscription_id: u32,
516 triggering_item_id: u32,
517 links_to_add: Vec<u32>,
518 links_to_remove: Vec<u32>,
519 ) -> Result<(Vec<StatusCode>, Vec<StatusCode>), StatusCode> {
520 let Some(cache) = ({
521 let lck = trace_read_lock!(self.inner);
522 lck.session_subscriptions.get(&session_id).cloned()
523 }) else {
524 return Err(StatusCode::BadNoSubscription);
525 };
526
527 let mut cache_lck = cache.lock();
528 cache_lck.set_triggering(
529 subscription_id,
530 triggering_item_id,
531 links_to_add,
532 links_to_remove,
533 )
534 }
535
536 pub(crate) fn delete_monitored_items(
537 &self,
538 session_id: u32,
539 subscription_id: u32,
540 items: &[u32],
541 ) -> Result<Vec<(StatusCode, MonitoredItemRef)>, StatusCode> {
542 let mut lck = trace_write_lock!(self.inner);
543 let Some(cache) = lck.session_subscriptions.get(&session_id).cloned() else {
544 return Err(StatusCode::BadNoSubscription);
545 };
546
547 let mut cache_lck = cache.lock();
548 let result = cache_lck.delete_monitored_items(subscription_id, items);
549 if let Ok(res) = &result {
550 for (status, rf) in res {
551 if status.is_good() {
552 let key = MonitoredItemKeyRef {
553 id: rf.node_id().into(),
554 attribute_id: rf.attribute(),
555 };
556 if let Some(it) = lck.monitored_items.get_mut(&key) {
557 it.remove(&rf.handle());
558 }
559 }
560 }
561 }
562 result
563 }
564
565 pub(crate) fn delete_subscriptions(
566 &self,
567 session_id: u32,
568 ids: &[u32],
569 info: &ServerInfo,
570 ) -> Result<Vec<(StatusCode, Vec<MonitoredItemRef>)>, StatusCode> {
571 let mut lck = trace_write_lock!(self.inner);
572 let Some(cache) = lck.session_subscriptions.get(&session_id).cloned() else {
573 return Err(StatusCode::BadNoSubscription);
574 };
575 let mut cache_lck = cache.lock();
576 for id in ids {
577 if cache_lck.contains(*id) {
578 lck.subscription_to_session.remove(id);
579 }
580 }
581 info.diagnostics
582 .set_current_subscription_count(lck.subscription_to_session.len() as u32);
583 let result = cache_lck.delete_subscriptions(ids);
584
585 for (status, item_res) in &result {
586 if !status.is_good() {
587 continue;
588 }
589
590 for rf in item_res {
591 if rf.attribute() == AttributeId::EventNotifier {
592 let key = MonitoredItemKeyRef {
593 id: rf.node_id().into(),
594 attribute_id: rf.attribute(),
595 };
596 if let Some(it) = lck.monitored_items.get_mut(&key) {
597 it.remove(&rf.handle());
598 }
599 }
600 }
601 }
602
603 Ok(result)
604 }
605
606 pub(crate) fn get_session_subscription_ids(&self, session_id: u32) -> Vec<u32> {
607 let Some(cache) = ({
608 let lck = trace_read_lock!(self.inner);
609 lck.session_subscriptions.get(&session_id).cloned()
610 }) else {
611 return Vec::new();
612 };
613
614 let cache_lck = cache.lock();
615 cache_lck.subscription_ids()
616 }
617
618 pub(crate) fn transfer(
619 &self,
620 req: &TransferSubscriptionsRequest,
621 context: &RequestContext,
622 ) -> TransferSubscriptionsResponse {
623 let mut results: Vec<_> = req
624 .subscription_ids
625 .iter()
626 .flatten()
627 .map(|id| {
628 (
629 *id,
630 TransferResult {
631 status_code: StatusCode::BadSubscriptionIdInvalid,
632 available_sequence_numbers: None,
633 },
634 )
635 })
636 .collect();
637
638 let key = Self::get_key(&context.session);
639 {
640 let mut lck = trace_write_lock!(self.inner);
641 let session_subs = lck
642 .session_subscriptions
643 .entry(context.session_id)
644 .or_insert_with(|| {
645 Arc::new(Mutex::new(SessionSubscriptions::new(
646 self.limits,
647 key.clone(),
648 context.session.clone(),
649 context.info.type_tree_getter.get_type_tree_static(context),
650 )))
651 })
652 .clone();
653 let mut session_subs_lck = session_subs.lock();
654
655 for (sub_id, res) in &mut results {
656 let Some(current_owner_session_id) = lck.subscription_to_session.get(sub_id) else {
657 continue;
658 };
659 if context.session_id == *current_owner_session_id {
660 res.status_code = StatusCode::Good;
661 res.available_sequence_numbers =
662 session_subs_lck.available_sequence_numbers(*sub_id);
663 continue;
664 }
665
666 let Some(session_cache) = lck
667 .session_subscriptions
668 .get(current_owner_session_id)
669 .cloned()
670 else {
671 continue;
673 };
674
675 let mut session_lck = session_cache.lock();
676
677 if !session_lck.user_token().is_equivalent_for_transfer(&key) {
678 res.status_code = StatusCode::BadUserAccessDenied;
679 continue;
680 }
681
682 if let (Some(sub), notifs) = session_lck.remove(*sub_id) {
683 tracing::debug!(
684 "Transfer subscription {} to session {}",
685 sub.id(),
686 context.session_id
687 );
688 res.status_code = StatusCode::Good;
689 res.available_sequence_numbers =
690 Some(notifs.iter().map(|n| n.message.sequence_number).collect());
691
692 if let Err((e, sub, notifs)) = session_subs_lck.insert(sub, notifs) {
693 res.status_code = e;
694 let _ = session_lck.insert(sub, notifs);
695 } else {
696 if req.send_initial_values {
697 if let Some(sub) = session_subs_lck.get_mut(*sub_id) {
698 sub.set_resend_data();
699 }
700 }
701 lck.subscription_to_session
702 .insert(*sub_id, context.session_id);
703 }
704 }
705 }
706 }
707
708 TransferSubscriptionsResponse {
709 response_header: ResponseHeader::new_good(&req.request_header),
710 results: Some(results.into_iter().map(|r| r.1).collect()),
711 diagnostic_infos: None,
712 }
713 }
714}
715
716pub(crate) struct PendingPublish {
717 pub response: tokio::sync::oneshot::Sender<ResponseMessage>,
718 pub request: Box<PublishRequest>,
719 pub ack_results: Option<Vec<StatusCode>>,
720 pub deadline: Instant,
721}
722
723struct NonAckedPublish {
724 message: NotificationMessage,
725 subscription_id: u32,
726}
727
728#[derive(Debug, Clone)]
729struct PersistentSessionKey {
730 token: UserToken,
731 security_mode: MessageSecurityMode,
732 application_uri: String,
733}
734
735impl PersistentSessionKey {
736 fn new(token: &UserToken, security_mode: MessageSecurityMode, application_uri: &str) -> Self {
737 Self {
738 token: token.clone(),
739 security_mode,
740 application_uri: application_uri.to_owned(),
741 }
742 }
743
744 fn is_equivalent_for_transfer(&self, other: &PersistentSessionKey) -> bool {
745 if self.token.is_anonymous() {
746 other.token.is_anonymous()
747 && matches!(
748 other.security_mode,
749 MessageSecurityMode::Sign | MessageSecurityMode::SignAndEncrypt
750 )
751 && self.application_uri == other.application_uri
752 } else {
753 other.token == self.token
754 }
755 }
756}