1mod monitored_item;
2mod notify;
3mod session_subscriptions;
4mod subscription;
5
6use std::{hash::Hash, sync::Arc, time::Instant};
7
8use chrono::Utc;
9use hashbrown::{Equivalent, HashMap};
10pub use monitored_item::{CreateMonitoredItem, MonitoredItem};
11use opcua_core::{trace_read_lock, trace_write_lock, ResponseMessage};
12use opcua_nodes::{Event, TypeTree};
13pub use session_subscriptions::SessionSubscriptions;
14use subscription::TickReason;
15pub use subscription::{MonitoredItemHandle, Subscription, SubscriptionState};
16use tracing::error;
17
18pub use notify::{
19 SubscriptionDataNotifier, SubscriptionDataNotifierBatch, SubscriptionEventNotifier,
20 SubscriptionEventNotifierBatch,
21};
22
23use opcua_core::sync::{Mutex, RwLock};
24
25use opcua_types::{
26 node_id::{IdentifierRef, NodeIdRef},
27 AttributeId, CreateSubscriptionRequest, CreateSubscriptionResponse, DataEncoding, DataValue,
28 DateTimeUtc, MessageSecurityMode, ModifySubscriptionRequest, ModifySubscriptionResponse,
29 MonitoredItemCreateResult, MonitoredItemModifyRequest, MonitoringMode, NodeId,
30 NotificationMessage, NumericRange, PublishRequest, RepublishRequest, RepublishResponse,
31 ResponseHeader, SetPublishingModeRequest, SetPublishingModeResponse, StatusCode,
32 TimestampsToReturn, TransferResult, TransferSubscriptionsRequest,
33 TransferSubscriptionsResponse,
34};
35
36use super::{
37 authenticator::UserToken,
38 info::ServerInfo,
39 node_manager::{MonitoredItemRef, MonitoredItemUpdateRef, RequestContext, ServerContext},
40 session::instance::Session,
41 SubscriptionLimits,
42};
43
/// Owned key of the monitored item index: a node together with the
/// attribute that is monitored on it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct MonitoredItemKey {
    /// ID of the monitored node.
    id: NodeId,
    /// Attribute of the node that is being monitored.
    attribute_id: AttributeId,
}
49
/// Borrowed counterpart of [`MonitoredItemKey`].
///
/// It is used for lookups in the monitored item index (through the
/// [`Equivalent`] implementation) when only a node ID reference is at hand.
#[derive(Debug, Clone, PartialEq, Eq)]
struct MonitoredItemKeyRef<T: IdentifierRef> {
    /// Reference form of the monitored node's ID.
    id: NodeIdRef<T>,
    /// Attribute of the node that is being monitored.
    attribute_id: AttributeId,
}
55
56impl<T> Hash for MonitoredItemKeyRef<T>
57where
58 T: IdentifierRef,
59{
60 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
61 self.id.hash(state);
62 self.attribute_id.hash(state);
63 }
64}
65
66impl<T: IdentifierRef> Equivalent<MonitoredItemKey> for MonitoredItemKeyRef<T> {
67 fn equivalent(&self, key: &MonitoredItemKey) -> bool {
68 self.id == key.id && self.attribute_id == key.attribute_id
69 }
70}
71
/// Per-monitored-item data kept in the cache's index of monitored items.
pub struct MonitoredItemEntry {
    /// `false` when the item's monitoring mode is `MonitoringMode::Disabled`,
    /// `true` for any other mode.
    pub enabled: bool,
    /// Data encoding requested for the monitored item when it was created.
    pub data_encoding: DataEncoding,
    /// Index range requested for the monitored item when it was created.
    pub index_range: NumericRange,
}
82
/// Lock-protected state of [`SubscriptionCache`].
struct SubscriptionCacheInner {
    /// Subscriptions of each session, keyed by numeric session ID.
    session_subscriptions: HashMap<u32, Arc<Mutex<SessionSubscriptions>>>,
    /// Maps a subscription ID to the ID of the session that currently owns it.
    subscription_to_session: HashMap<u32, u32>,
    /// Index of monitored items by node and attribute, then by item handle.
    monitored_items: HashMap<MonitoredItemKey, HashMap<MonitoredItemHandle, MonitoredItemEntry>>,
}
91
/// Server-wide store of subscriptions and monitored items.
///
/// All state lives behind a single read-write lock; the subscriptions of an
/// individual session are additionally guarded by their own mutex.
pub struct SubscriptionCache {
    /// Shared state: session subscriptions, ownership map and item index.
    inner: RwLock<SubscriptionCacheInner>,
    /// Limits handed to every `SessionSubscriptions` created by this cache.
    limits: SubscriptionLimits,
}
103
104impl SubscriptionCache {
105 pub(crate) fn new(limits: SubscriptionLimits) -> Self {
106 Self {
107 inner: RwLock::new(SubscriptionCacheInner {
108 session_subscriptions: HashMap::new(),
109 subscription_to_session: HashMap::new(),
110 monitored_items: HashMap::new(),
111 }),
112 limits,
113 }
114 }
115
116 pub fn get_session_subscriptions(
118 &self,
119 session_id: u32,
120 ) -> Option<Arc<Mutex<SessionSubscriptions>>> {
121 let inner = trace_read_lock!(self.inner);
122 inner.session_subscriptions.get(&session_id).cloned()
123 }
124
125 pub(crate) async fn periodic_tick(&self, context: &ServerContext) {
129 let mut to_delete = Vec::new();
133 let mut items_to_delete = Vec::new();
134 {
135 let now = Utc::now();
136 let now_instant = Instant::now();
137 let lck = trace_read_lock!(self.inner);
138 for (session_id, sub) in lck.session_subscriptions.iter() {
139 let mut sub_lck = sub.lock();
140 items_to_delete.push((
141 sub_lck.session().clone(),
142 sub_lck.tick(&now, now_instant, TickReason::TickTimerFired),
143 ));
144 if sub_lck.is_ready_to_delete() {
145 to_delete.push(*session_id);
146 }
147 }
148 }
149 if !to_delete.is_empty() {
150 let mut lck = trace_write_lock!(self.inner);
151 for id in to_delete {
152 lck.session_subscriptions.remove(&id);
153 }
154 context
155 .info
156 .diagnostics
157 .set_current_subscription_count(lck.subscription_to_session.len() as u32);
158 }
159 if !items_to_delete.is_empty() {
160 Self::delete_expired_monitored_items(context, items_to_delete).await;
161 }
162 }
163
164 async fn delete_expired_monitored_items(
165 context: &ServerContext,
166 items_to_delete: Vec<(Arc<RwLock<Session>>, Vec<MonitoredItemRef>)>,
167 ) {
168 for (session, items) in items_to_delete {
169 let (id, token) = {
172 let lck = session.read();
173 let Some(token) = lck.user_token() else {
174 error!("Active session missing user token, this should be impossible");
175 continue;
176 };
177
178 (lck.session_id_numeric(), token.clone())
179 };
180 let ctx = RequestContext {
181 session,
182 session_id: id,
183 authenticator: context.authenticator.clone(),
184 token,
185 current_node_manager_index: 0,
186 type_tree: context.type_tree.clone(),
187 subscriptions: context.subscriptions.clone(),
188 info: context.info.clone(),
189 type_tree_getter: context.type_tree_getter.clone(),
190 };
191
192 for mgr in context.node_managers.iter() {
193 let owned: Vec<_> = items
194 .iter()
195 .filter(|n| mgr.owns_node(n.node_id()))
196 .collect();
197
198 if owned.is_empty() {
199 continue;
200 }
201
202 mgr.delete_monitored_items(&ctx, &owned).await;
203 }
204 }
205 }
206
207 pub(crate) fn get_monitored_item_count(
208 &self,
209 session_id: u32,
210 subscription_id: u32,
211 ) -> Option<usize> {
212 let cache = ({
213 let lck = trace_read_lock!(self.inner);
214 lck.session_subscriptions.get(&session_id).cloned()
215 })?;
216 let cache_lck = cache.lock();
217 cache_lck.get_monitored_item_count(subscription_id)
218 }
219
220 pub(crate) fn create_subscription(
221 &self,
222 session_id: u32,
223 request: &CreateSubscriptionRequest,
224 context: &RequestContext,
225 ) -> Result<CreateSubscriptionResponse, StatusCode> {
226 let mut lck = trace_write_lock!(self.inner);
227 let cache = lck
228 .session_subscriptions
229 .entry(session_id)
230 .or_insert_with(|| {
231 Arc::new(Mutex::new(SessionSubscriptions::new(
232 self.limits,
233 Self::get_key(&context.session),
234 context.session.clone(),
235 context.info.type_tree_getter.get_type_tree_static(context),
236 )))
237 })
238 .clone();
239 let mut cache_lck = cache.lock();
240 let res = cache_lck.create_subscription(request, &context.info)?;
241 lck.subscription_to_session
242 .insert(res.subscription_id, session_id);
243 context
244 .info
245 .diagnostics
246 .set_current_subscription_count(lck.subscription_to_session.len() as u32);
247 context.info.diagnostics.inc_subscription_count();
248 Ok(res)
249 }
250
251 pub(crate) fn modify_subscription(
252 &self,
253 session_id: u32,
254 request: &ModifySubscriptionRequest,
255 info: &ServerInfo,
256 ) -> Result<ModifySubscriptionResponse, StatusCode> {
257 let Some(cache) = ({
258 let lck = trace_read_lock!(self.inner);
259 lck.session_subscriptions.get(&session_id).cloned()
260 }) else {
261 return Err(StatusCode::BadNoSubscription);
262 };
263 let mut cache_lck = cache.lock();
264 cache_lck.modify_subscription(request, info)
265 }
266
267 pub(crate) fn set_publishing_mode(
268 &self,
269 session_id: u32,
270 request: &SetPublishingModeRequest,
271 ) -> Result<SetPublishingModeResponse, StatusCode> {
272 let Some(cache) = ({
273 let lck = trace_read_lock!(self.inner);
274 lck.session_subscriptions.get(&session_id).cloned()
275 }) else {
276 return Err(StatusCode::BadNoSubscription);
277 };
278 let mut cache_lck = cache.lock();
279 cache_lck.set_publishing_mode(request)
280 }
281
282 pub(crate) fn republish(
283 &self,
284 session_id: u32,
285 request: &RepublishRequest,
286 ) -> Result<RepublishResponse, StatusCode> {
287 let Some(cache) = ({
288 let lck = trace_read_lock!(self.inner);
289 lck.session_subscriptions.get(&session_id).cloned()
290 }) else {
291 return Err(StatusCode::BadNoSubscription);
292 };
293 let cache_lck = cache.lock();
294 cache_lck.republish(request)
295 }
296
297 pub(crate) fn enqueue_publish_request(
298 &self,
299 session_id: u32,
300 now: &DateTimeUtc,
301 now_instant: Instant,
302 request: PendingPublish,
303 ) -> Result<(), StatusCode> {
304 let Some(cache) = ({
305 let lck = trace_read_lock!(self.inner);
306 lck.session_subscriptions.get(&session_id).cloned()
307 }) else {
308 return Err(StatusCode::BadNoSubscription);
309 };
310
311 let mut cache_lck = cache.lock();
312 cache_lck.enqueue_publish_request(now, now_instant, request);
313 Ok(())
314 }
315
316 pub fn data_notifier<'a>(&'a self) -> SubscriptionDataNotifier<'a> {
332 SubscriptionDataNotifier::new(trace_read_lock!(self.inner))
333 }
334
335 pub fn event_notifier<'a, 'b>(&'a self) -> SubscriptionEventNotifier<'a, 'b> {
351 SubscriptionEventNotifier::new(trace_read_lock!(self.inner))
352 }
353
354 pub fn notify_data_change<'a>(
359 &self,
360 items: impl Iterator<Item = (DataValue, &'a NodeId, AttributeId)>,
361 ) {
362 let mut notif = self.data_notifier();
363 for (dv, node_id, attribute_id) in items {
364 notif.notify(node_id, attribute_id, dv);
365 }
366 }
367
368 pub fn maybe_notify<'a>(
373 &self,
374 items: impl Iterator<Item = (&'a NodeId, AttributeId)>,
375 sample: impl Fn(&NodeId, AttributeId, &NumericRange, &DataEncoding) -> Option<DataValue>,
376 ) {
377 let mut notif = self.data_notifier();
378 for (id, attribute_id) in items {
379 if let Some(mut batch) = notif.notify_for(id, attribute_id) {
380 for (handle, entry) in batch.entries() {
381 if let Some(value) =
382 sample(id, attribute_id, &entry.index_range, &entry.data_encoding)
383 {
384 batch.data_value_to_item(value, handle);
385 }
386 }
387 }
388 }
389 }
390
391 pub fn notify_events<'a>(&self, items: impl Iterator<Item = (&'a dyn Event, &'a NodeId)>) {
394 let mut notif = self.event_notifier();
395 for (evt, id) in items {
396 notif.notify(id, evt);
397 }
398 }
399
400 pub(crate) fn create_monitored_items(
401 &self,
402 session_id: u32,
403 subscription_id: u32,
404 requests: &[CreateMonitoredItem],
405 ) -> Result<Vec<MonitoredItemCreateResult>, StatusCode> {
406 let mut lck = trace_write_lock!(self.inner);
407 let Some(cache) = lck.session_subscriptions.get(&session_id).cloned() else {
408 return Err(StatusCode::BadNoSubscription);
409 };
410
411 let mut cache_lck = cache.lock();
412 let result = cache_lck.create_monitored_items(subscription_id, requests);
413 if let Ok(res) = &result {
414 for (create, res) in requests.iter().zip(res.iter()) {
415 if res.status_code.is_good() {
416 let key = MonitoredItemKey {
417 id: create.item_to_monitor().node_id.clone(),
418 attribute_id: create.item_to_monitor().attribute_id,
419 };
420
421 let index_range = create.item_to_monitor().index_range.clone();
422
423 lck.monitored_items.entry(key).or_default().insert(
424 create.handle(),
425 MonitoredItemEntry {
426 enabled: !matches!(create.monitoring_mode(), MonitoringMode::Disabled),
427 index_range,
428 data_encoding: create.item_to_monitor().data_encoding.clone(),
429 },
430 );
431 }
432 }
433 }
434
435 result
436 }
437
438 pub(crate) fn modify_monitored_items(
439 &self,
440 session_id: u32,
441 subscription_id: u32,
442 info: &ServerInfo,
443 timestamps_to_return: TimestampsToReturn,
444 requests: Vec<MonitoredItemModifyRequest>,
445 type_tree: &dyn TypeTree,
446 ) -> Result<Vec<MonitoredItemUpdateRef>, StatusCode> {
447 let Some(cache) = ({
448 let lck = trace_read_lock!(self.inner);
449 lck.session_subscriptions.get(&session_id).cloned()
450 }) else {
451 return Err(StatusCode::BadNoSubscription);
452 };
453
454 let mut cache_lck = cache.lock();
455 cache_lck.modify_monitored_items(
456 subscription_id,
457 info,
458 timestamps_to_return,
459 requests,
460 type_tree,
461 )
462 }
463
464 fn get_key(session: &RwLock<Session>) -> PersistentSessionKey {
465 let lck = trace_read_lock!(session);
466 PersistentSessionKey::new(
467 lck.user_token().unwrap(),
468 lck.message_security_mode(),
469 lck.application_description().application_uri.as_ref(),
470 )
471 }
472
473 pub(crate) fn set_monitoring_mode(
474 &self,
475 session_id: u32,
476 subscription_id: u32,
477 monitoring_mode: MonitoringMode,
478 items: Vec<u32>,
479 ) -> Result<Vec<(StatusCode, MonitoredItemRef)>, StatusCode> {
480 let mut lck = trace_write_lock!(self.inner);
481 let Some(cache) = lck.session_subscriptions.get(&session_id).cloned() else {
482 return Err(StatusCode::BadNoSubscription);
483 };
484
485 let mut cache_lck = cache.lock();
486 let result = cache_lck.set_monitoring_mode(subscription_id, monitoring_mode, items);
487
488 if let Ok(res) = &result {
489 for (status, rf) in res {
490 if status.is_good() {
491 let key = MonitoredItemKeyRef {
492 id: rf.node_id().into(),
493 attribute_id: rf.attribute(),
494 };
495 if let Some(it) = lck
496 .monitored_items
497 .get_mut(&key)
498 .and_then(|it| it.get_mut(&rf.handle()))
499 {
500 it.enabled = !matches!(monitoring_mode, MonitoringMode::Disabled);
501 }
502 }
503 }
504 }
505 result
506 }
507
508 pub(crate) fn set_triggering(
509 &self,
510 session_id: u32,
511 subscription_id: u32,
512 triggering_item_id: u32,
513 links_to_add: Vec<u32>,
514 links_to_remove: Vec<u32>,
515 ) -> Result<(Vec<StatusCode>, Vec<StatusCode>), StatusCode> {
516 let Some(cache) = ({
517 let lck = trace_read_lock!(self.inner);
518 lck.session_subscriptions.get(&session_id).cloned()
519 }) else {
520 return Err(StatusCode::BadNoSubscription);
521 };
522
523 let mut cache_lck = cache.lock();
524 cache_lck.set_triggering(
525 subscription_id,
526 triggering_item_id,
527 links_to_add,
528 links_to_remove,
529 )
530 }
531
532 pub(crate) fn delete_monitored_items(
533 &self,
534 session_id: u32,
535 subscription_id: u32,
536 items: &[u32],
537 ) -> Result<Vec<(StatusCode, MonitoredItemRef)>, StatusCode> {
538 let mut lck = trace_write_lock!(self.inner);
539 let Some(cache) = lck.session_subscriptions.get(&session_id).cloned() else {
540 return Err(StatusCode::BadNoSubscription);
541 };
542
543 let mut cache_lck = cache.lock();
544 let result = cache_lck.delete_monitored_items(subscription_id, items);
545 if let Ok(res) = &result {
546 for (status, rf) in res {
547 if status.is_good() {
548 let key = MonitoredItemKeyRef {
549 id: rf.node_id().into(),
550 attribute_id: rf.attribute(),
551 };
552 if let Some(it) = lck.monitored_items.get_mut(&key) {
553 it.remove(&rf.handle());
554 }
555 }
556 }
557 }
558 result
559 }
560
561 pub(crate) fn delete_subscriptions(
562 &self,
563 session_id: u32,
564 ids: &[u32],
565 info: &ServerInfo,
566 ) -> Result<Vec<(StatusCode, Vec<MonitoredItemRef>)>, StatusCode> {
567 let mut lck = trace_write_lock!(self.inner);
568 let Some(cache) = lck.session_subscriptions.get(&session_id).cloned() else {
569 return Err(StatusCode::BadNoSubscription);
570 };
571 let mut cache_lck = cache.lock();
572 for id in ids {
573 if cache_lck.contains(*id) {
574 lck.subscription_to_session.remove(id);
575 }
576 }
577 info.diagnostics
578 .set_current_subscription_count(lck.subscription_to_session.len() as u32);
579 let result = cache_lck.delete_subscriptions(ids);
580
581 for (status, item_res) in &result {
582 if !status.is_good() {
583 continue;
584 }
585
586 for rf in item_res {
587 if rf.attribute() == AttributeId::EventNotifier {
588 let key = MonitoredItemKeyRef {
589 id: rf.node_id().into(),
590 attribute_id: rf.attribute(),
591 };
592 if let Some(it) = lck.monitored_items.get_mut(&key) {
593 it.remove(&rf.handle());
594 }
595 }
596 }
597 }
598
599 Ok(result)
600 }
601
602 pub(crate) fn get_session_subscription_ids(&self, session_id: u32) -> Vec<u32> {
603 let Some(cache) = ({
604 let lck = trace_read_lock!(self.inner);
605 lck.session_subscriptions.get(&session_id).cloned()
606 }) else {
607 return Vec::new();
608 };
609
610 let cache_lck = cache.lock();
611 cache_lck.subscription_ids()
612 }
613
614 pub(crate) fn transfer(
615 &self,
616 req: &TransferSubscriptionsRequest,
617 context: &RequestContext,
618 ) -> TransferSubscriptionsResponse {
619 let mut results: Vec<_> = req
620 .subscription_ids
621 .iter()
622 .flatten()
623 .map(|id| {
624 (
625 *id,
626 TransferResult {
627 status_code: StatusCode::BadSubscriptionIdInvalid,
628 available_sequence_numbers: None,
629 },
630 )
631 })
632 .collect();
633
634 let key = Self::get_key(&context.session);
635 {
636 let mut lck = trace_write_lock!(self.inner);
637 let session_subs = lck
638 .session_subscriptions
639 .entry(context.session_id)
640 .or_insert_with(|| {
641 Arc::new(Mutex::new(SessionSubscriptions::new(
642 self.limits,
643 key.clone(),
644 context.session.clone(),
645 context.info.type_tree_getter.get_type_tree_static(context),
646 )))
647 })
648 .clone();
649 let mut session_subs_lck = session_subs.lock();
650
651 for (sub_id, res) in &mut results {
652 let Some(current_owner_session_id) = lck.subscription_to_session.get(sub_id) else {
653 continue;
654 };
655 if context.session_id == *current_owner_session_id {
656 res.status_code = StatusCode::Good;
657 res.available_sequence_numbers =
658 session_subs_lck.available_sequence_numbers(*sub_id);
659 continue;
660 }
661
662 let Some(session_cache) = lck
663 .session_subscriptions
664 .get(current_owner_session_id)
665 .cloned()
666 else {
667 continue;
669 };
670
671 let mut session_lck = session_cache.lock();
672
673 if !session_lck.user_token().is_equivalent_for_transfer(&key) {
674 res.status_code = StatusCode::BadUserAccessDenied;
675 continue;
676 }
677
678 if let (Some(sub), notifs) = session_lck.remove(*sub_id) {
679 tracing::debug!(
680 "Transfer subscription {} to session {}",
681 sub.id(),
682 context.session_id
683 );
684 res.status_code = StatusCode::Good;
685 res.available_sequence_numbers =
686 Some(notifs.iter().map(|n| n.message.sequence_number).collect());
687
688 if let Err((e, sub, notifs)) = session_subs_lck.insert(sub, notifs) {
689 res.status_code = e;
690 let _ = session_lck.insert(sub, notifs);
691 } else {
692 if req.send_initial_values {
693 if let Some(sub) = session_subs_lck.get_mut(*sub_id) {
694 sub.set_resend_data();
695 }
696 }
697 lck.subscription_to_session
698 .insert(*sub_id, context.session_id);
699 }
700 }
701 }
702 }
703
704 TransferSubscriptionsResponse {
705 response_header: ResponseHeader::new_good(&req.request_header),
706 results: Some(results.into_iter().map(|r| r.1).collect()),
707 diagnostic_infos: None,
708 }
709 }
710}
711
/// A publish request waiting to be answered by a subscription.
pub(crate) struct PendingPublish {
    /// One-shot channel on which the response message is sent.
    pub response: tokio::sync::oneshot::Sender<ResponseMessage>,
    /// The publish request itself.
    pub request: Box<PublishRequest>,
    /// Status codes for acknowledgements, if any have been produced;
    /// presumably one per acknowledgement in the request (TODO confirm).
    pub ack_results: Option<Vec<StatusCode>>,
    /// Point in time associated with the request; presumably the instant
    /// after which it counts as timed out (TODO confirm).
    pub deadline: Instant,
}
718
/// A notification message retained together with the subscription it
/// belongs to; going by the name, one the client has not yet acknowledged.
struct NonAckedPublish {
    /// The retained notification message.
    message: NotificationMessage,
    /// ID of the subscription the message belongs to.
    subscription_id: u32,
}
723
/// Identity of a session as far as subscription ownership is concerned.
///
/// It is compared with `is_equivalent_for_transfer` to decide whether a
/// subscription may move from one session to another.
#[derive(Debug, Clone)]
struct PersistentSessionKey {
    /// User token of the session.
    token: UserToken,
    /// Message security mode of the session.
    security_mode: MessageSecurityMode,
    /// Application URI from the session's application description.
    application_uri: String,
}
730
731impl PersistentSessionKey {
732 fn new(token: &UserToken, security_mode: MessageSecurityMode, application_uri: &str) -> Self {
733 Self {
734 token: token.clone(),
735 security_mode,
736 application_uri: application_uri.to_owned(),
737 }
738 }
739
740 fn is_equivalent_for_transfer(&self, other: &PersistentSessionKey) -> bool {
741 if self.token.is_anonymous() {
742 other.token.is_anonymous()
743 && matches!(
744 other.security_mode,
745 MessageSecurityMode::Sign | MessageSecurityMode::SignAndEncrypt
746 )
747 && self.application_uri == other.application_uri
748 } else {
749 other.token == self.token
750 }
751 }
752}