1use std::{
2 collections::HashSet,
3 time::{Duration, Instant},
4};
5
6use crate::{
7 session::{
8 process_service_result, process_unexpected_response,
9 request_builder::{builder_base, builder_debug, builder_error, RequestHeaderBuilder},
10 services::subscriptions::{
11 callbacks::OnSubscriptionNotificationCore, ModifyMonitoredItem,
12 PreInsertMonitoredItems, Subscription,
13 },
14 session_debug, session_error, session_warn,
15 },
16 Session, UARequest,
17};
18use opcua_core::{handle::AtomicHandle, sync::Mutex, trace_lock, ResponseMessage};
19use opcua_types::{
20 AttributeId, CreateMonitoredItemsRequest, CreateSubscriptionRequest,
21 CreateSubscriptionResponse, DeleteMonitoredItemsRequest, DeleteMonitoredItemsResponse,
22 DeleteSubscriptionsRequest, DeleteSubscriptionsResponse, DiagnosticInfo, IntegerId,
23 ModifyMonitoredItemsRequest, ModifyMonitoredItemsResponse, ModifySubscriptionRequest,
24 ModifySubscriptionResponse, MonitoredItemCreateRequest, MonitoredItemCreateResult,
25 MonitoredItemModifyRequest, MonitoredItemModifyResult, MonitoringMode, MonitoringParameters,
26 NodeId, NotificationMessage, PublishRequest, PublishResponse, ReadValueId, RepublishRequest,
27 RepublishResponse, ResponseHeader, SetMonitoringModeRequest, SetMonitoringModeResponse,
28 SetPublishingModeRequest, SetPublishingModeResponse, SetTriggeringRequest,
29 SetTriggeringResponse, StatusCode, SubscriptionAcknowledgement, TimestampsToReturn,
30 TransferResult, TransferSubscriptionsRequest, TransferSubscriptionsResponse,
31};
32use tracing::enabled;
33
34use super::state::SubscriptionState;
35
36pub struct CreateSubscription {
40 publishing_interval: Duration,
41 lifetime_count: u32,
42 keep_alive_count: u32,
43 max_notifications_per_publish: u32,
44 publishing_enabled: bool,
45 priority: u8,
46
47 header: RequestHeaderBuilder,
48}
49
50builder_base!(CreateSubscription);
51
52impl CreateSubscription {
53 pub fn new(session: &Session) -> Self {
55 Self {
56 publishing_interval: Duration::from_millis(500),
57 lifetime_count: 60,
58 keep_alive_count: 20,
59 max_notifications_per_publish: 0,
60 publishing_enabled: true,
61 priority: 0,
62 header: RequestHeaderBuilder::new_from_session(session),
63 }
64 }
65
66 pub fn new_manual(
68 session_id: u32,
69 timeout: Duration,
70 auth_token: NodeId,
71 request_handle: IntegerId,
72 ) -> Self {
73 Self {
74 publishing_interval: Duration::from_millis(500),
75 lifetime_count: 60,
76 keep_alive_count: 20,
77 max_notifications_per_publish: 0,
78 publishing_enabled: true,
79 priority: 0,
80 header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
81 }
82 }
83
84 pub fn publishing_interval(mut self, interval: Duration) -> Self {
92 self.publishing_interval = interval;
93 self
94 }
95
96 pub fn max_lifetime_count(mut self, lifetime_count: u32) -> Self {
101 self.lifetime_count = lifetime_count;
102 self
103 }
104
105 pub fn max_keep_alive_count(mut self, keep_alive_count: u32) -> Self {
111 self.keep_alive_count = keep_alive_count;
112 self
113 }
114
115 pub fn max_notifications_per_publish(mut self, max_notifications_per_publish: u32) -> Self {
120 self.max_notifications_per_publish = max_notifications_per_publish;
121 self
122 }
123
124 pub fn priority(mut self, priority: u8) -> Self {
129 self.priority = priority;
130 self
131 }
132
133 pub fn publishing_enabled(mut self, publishing_enabled: bool) -> Self {
138 self.publishing_enabled = publishing_enabled;
139 self
140 }
141}
142
143impl UARequest for CreateSubscription {
144 type Out = CreateSubscriptionResponse;
145
146 async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
147 where
148 Self: 'a,
149 {
150 let request = CreateSubscriptionRequest {
151 request_header: self.header.header,
152 requested_publishing_interval: self.publishing_interval.as_millis() as f64,
153 requested_lifetime_count: self.lifetime_count,
154 requested_max_keep_alive_count: self.keep_alive_count,
155 max_notifications_per_publish: self.max_notifications_per_publish,
156 publishing_enabled: self.publishing_enabled,
157 priority: self.priority,
158 };
159
160 let response = channel.send(request, self.header.timeout).await?;
161
162 if let ResponseMessage::CreateSubscription(response) = response {
163 process_service_result(&response.response_header)?;
164 builder_debug!(
165 self,
166 "create_subscription, created a subscription with id {}",
167 response.subscription_id
168 );
169 Ok(*response)
170 } else {
171 builder_error!(self, "create_subscription failed {:?}", response);
172 Err(process_unexpected_response(response))
173 }
174 }
175}
176
#[derive(Clone)]
/// Builder for a [`ModifySubscriptionRequest`], sent with [`UARequest::send`].
pub struct ModifySubscription {
    /// ID of the subscription to modify; `send` rejects 0.
    subscription_id: u32,
    /// Requested publishing interval; converted to milliseconds when sent.
    publishing_interval: Duration,
    /// Value sent as `requested_lifetime_count`.
    lifetime_count: u32,
    /// Value sent as `requested_max_keep_alive_count`.
    keep_alive_count: u32,
    /// Value sent as `max_notifications_per_publish`.
    max_notifications_per_publish: u32,
    /// Value sent as `priority`.
    priority: u8,

    /// Request header and timeout used when sending the request.
    header: RequestHeaderBuilder,
}

builder_base!(ModifySubscription);
193
194impl ModifySubscription {
195 pub fn new(subscription_id: u32, session: &Session) -> Self {
197 Self {
198 subscription_id,
199 publishing_interval: Duration::from_millis(500),
200 lifetime_count: 60,
201 keep_alive_count: 20,
202 max_notifications_per_publish: 0,
203 priority: 0,
204 header: RequestHeaderBuilder::new_from_session(session),
205 }
206 }
207
208 pub fn new_manual(
210 subscription_id: u32,
211 session_id: u32,
212 timeout: Duration,
213 auth_token: NodeId,
214 request_handle: IntegerId,
215 ) -> Self {
216 Self {
217 subscription_id,
218 publishing_interval: Duration::from_millis(500),
219 lifetime_count: 60,
220 keep_alive_count: 20,
221 max_notifications_per_publish: 0,
222 priority: 0,
223 header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
224 }
225 }
226
227 pub fn publishing_interval(mut self, interval: Duration) -> Self {
235 self.publishing_interval = interval;
236 self
237 }
238
239 pub fn max_lifetime_count(mut self, lifetime_count: u32) -> Self {
244 self.lifetime_count = lifetime_count;
245 self
246 }
247
248 pub fn max_keep_alive_count(mut self, keep_alive_count: u32) -> Self {
254 self.keep_alive_count = keep_alive_count;
255 self
256 }
257
258 pub fn max_notifications_per_publish(mut self, max_notifications_per_publish: u32) -> Self {
263 self.max_notifications_per_publish = max_notifications_per_publish;
264 self
265 }
266
267 pub fn priority(mut self, priority: u8) -> Self {
272 self.priority = priority;
273 self
274 }
275}
276
277impl UARequest for ModifySubscription {
278 type Out = ModifySubscriptionResponse;
279
280 async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
281 where
282 Self: 'a,
283 {
284 if self.subscription_id == 0 {
285 builder_error!(
286 self,
287 "modify_subscription, subscription id must be non-zero"
288 );
289 return Err(StatusCode::BadInvalidArgument);
290 }
291
292 let request = ModifySubscriptionRequest {
293 request_header: self.header.header,
294 subscription_id: self.subscription_id,
295 requested_publishing_interval: self.publishing_interval.as_millis() as f64,
296 requested_lifetime_count: self.lifetime_count,
297 requested_max_keep_alive_count: self.keep_alive_count,
298 max_notifications_per_publish: self.max_notifications_per_publish,
299 priority: self.priority,
300 };
301
302 let response = channel.send(request, self.header.timeout).await?;
303
304 if let ResponseMessage::ModifySubscription(response) = response {
305 process_service_result(&response.response_header)?;
306 builder_debug!(
307 self,
308 "modify_subscription success for {}",
309 self.subscription_id
310 );
311 Ok(*response)
312 } else {
313 builder_debug!(self, "modify_subscription failed");
314 Err(process_unexpected_response(response))
315 }
316 }
317}
318
#[derive(Clone)]
/// Builder for a [`SetPublishingModeRequest`], sent with [`UARequest::send`].
pub struct SetPublishingMode {
    /// IDs of the subscriptions to change; `send` rejects an empty list.
    subscription_ids: Vec<u32>,
    /// Value sent as `publishing_enabled`.
    publishing_enabled: bool,

    /// Request header and timeout used when sending the request.
    header: RequestHeaderBuilder,
}

builder_base!(SetPublishingMode);
331
332impl SetPublishingMode {
333 pub fn new(publishing_enabled: bool, session: &Session) -> Self {
335 Self {
336 subscription_ids: Vec::new(),
337 publishing_enabled,
338 header: RequestHeaderBuilder::new_from_session(session),
339 }
340 }
341
342 pub fn new_manual(
344 publishing_enabled: bool,
345 session_id: u32,
346 timeout: Duration,
347 auth_token: NodeId,
348 request_handle: IntegerId,
349 ) -> Self {
350 Self {
351 subscription_ids: Vec::new(),
352 publishing_enabled,
353 header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
354 }
355 }
356
357 pub fn subscription_ids(mut self, subscription_ids: Vec<u32>) -> Self {
359 self.subscription_ids = subscription_ids;
360 self
361 }
362
363 pub fn subscription(mut self, subscription_id: u32) -> Self {
365 self.subscription_ids.push(subscription_id);
366 self
367 }
368}
369
370impl UARequest for SetPublishingMode {
371 type Out = SetPublishingModeResponse;
372
373 async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
374 where
375 Self: 'a,
376 {
377 builder_debug!(
378 self,
379 "set_publishing_mode, for subscriptions {:?}, publishing enabled {}",
380 self.subscription_ids,
381 self.publishing_enabled
382 );
383 if self.subscription_ids.is_empty() {
384 builder_error!(
385 self,
386 "set_publishing_mode, no subscription ids were provided"
387 );
388 return Err(StatusCode::BadNothingToDo);
389 }
390
391 let request = SetPublishingModeRequest {
392 request_header: self.header.header,
393 publishing_enabled: self.publishing_enabled,
394 subscription_ids: Some(self.subscription_ids.clone()),
395 };
396 let response = channel.send(request, self.header.timeout).await?;
397 if let ResponseMessage::SetPublishingMode(response) = response {
398 process_service_result(&response.response_header)?;
399 let num_results = response
400 .results
401 .as_ref()
402 .map(|l| l.len())
403 .unwrap_or_default();
404
405 if num_results != self.subscription_ids.len() {
406 builder_error!(
407 self,
408 "set_publishing_mode returned an incorrect number of results. Expected {}, got {}",
409 self.subscription_ids.len(),
410 num_results
411 );
412 return Err(StatusCode::BadUnexpectedError);
413 }
414
415 builder_debug!(self, "set_publishing_mode success");
416 Ok(*response)
417 } else {
418 builder_error!(self, "set_publishing_mode failed {:?}", response);
419 Err(process_unexpected_response(response))
420 }
421 }
422}
423
#[derive(Clone)]
/// Builder for a [`PublishRequest`], sent with [`UARequest::send`].
pub struct Publish {
    /// Request header and timeout used when sending the request.
    header: RequestHeaderBuilder,
    /// Acknowledgements to include; an empty list is sent as `None`.
    acks: Vec<SubscriptionAcknowledgement>,
}

builder_base!(Publish);
434
435impl Publish {
436 pub fn new(session: &Session) -> Self {
438 Self {
439 header: RequestHeaderBuilder::new_from_session(session),
440 acks: Vec::new(),
441 }
442 .timeout(session.publish_timeout)
443 }
444
445 pub fn new_manual(
447 session_id: u32,
448 timeout: Duration,
449 auth_token: NodeId,
450 request_handle: IntegerId,
451 ) -> Self {
452 Self {
453 header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
454 acks: Vec::new(),
455 }
456 }
457
458 pub fn acks(mut self, acks: Vec<SubscriptionAcknowledgement>) -> Self {
460 self.acks = acks;
461 self
462 }
463
464 pub fn ack(mut self, subscription_id: u32, sequence_number: u32) -> Self {
466 self.acks.push(SubscriptionAcknowledgement {
467 subscription_id,
468 sequence_number,
469 });
470 self
471 }
472}
473
474impl UARequest for Publish {
475 type Out = PublishResponse;
476
477 async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
478 where
479 Self: 'a,
480 {
481 if enabled!(tracing::Level::DEBUG) {
482 let sequence_nrs: Vec<u32> = self.acks.iter().map(|ack| ack.sequence_number).collect();
483 builder_debug!(
484 self,
485 "publish, acknowledging subscription acknowledgements with sequence nrs {:?}",
486 sequence_nrs
487 );
488 }
489 let request = PublishRequest {
490 request_header: self.header.header,
491 subscription_acknowledgements: if self.acks.is_empty() {
492 None
493 } else {
494 Some(self.acks)
495 },
496 };
497 let response = channel.send(request, self.header.timeout).await?;
498 if let ResponseMessage::Publish(response) = response {
499 process_service_result(&response.response_header)?;
500 builder_debug!(self, "publish success");
501 Ok(*response)
502 } else {
503 builder_error!(self, "publish failed {:?}", response);
504 Err(process_unexpected_response(response))
505 }
506 }
507}
508
509pub struct Republish {
513 subscription_id: u32,
514 retransmit_sequence_number: u32,
515
516 header: RequestHeaderBuilder,
517}
518
519builder_base!(Republish);
520
521impl Republish {
522 pub fn new(subscription_id: u32, retransmit_sequence_number: u32, session: &Session) -> Self {
524 Self {
525 subscription_id,
526 retransmit_sequence_number,
527 header: RequestHeaderBuilder::new_from_session(session),
528 }
529 }
530
531 pub fn new_manual(
533 subscription_id: u32,
534 retransmit_sequence_number: u32,
535 session_id: u32,
536 timeout: Duration,
537 auth_token: NodeId,
538 request_handle: IntegerId,
539 ) -> Self {
540 Self {
541 subscription_id,
542 retransmit_sequence_number,
543 header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
544 }
545 }
546}
547
548impl UARequest for Republish {
549 type Out = RepublishResponse;
550 async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
551 where
552 Self: 'a,
553 {
554 let request = RepublishRequest {
555 request_header: self.header.header,
556 subscription_id: self.subscription_id,
557 retransmit_sequence_number: self.retransmit_sequence_number,
558 };
559
560 let response = channel.send(request, self.header.timeout).await?;
561
562 if let ResponseMessage::Republish(response) = response {
563 process_service_result(&response.response_header)?;
564 builder_debug!(self, "republish success");
565 Ok(*response)
566 } else {
567 builder_error!(self, "republish failed {:?}", response);
568 Err(process_unexpected_response(response))
569 }
570 }
571}
572
#[derive(Clone)]
/// Builder for a [`TransferSubscriptionsRequest`], sent with [`UARequest::send`].
pub struct TransferSubscriptions {
    /// IDs of the subscriptions to transfer; `send` rejects an empty list.
    subscription_ids: Vec<u32>,
    /// Value sent as `send_initial_values`.
    send_initial_values: bool,

    /// Request header and timeout used when sending the request.
    header: RequestHeaderBuilder,
}

builder_base!(TransferSubscriptions);
593
594impl TransferSubscriptions {
595 pub fn new(session: &Session) -> Self {
597 Self {
598 subscription_ids: Vec::new(),
599 send_initial_values: false,
600 header: RequestHeaderBuilder::new_from_session(session),
601 }
602 }
603
604 pub fn new_manual(
606 session_id: u32,
607 timeout: Duration,
608 auth_token: NodeId,
609 request_handle: IntegerId,
610 ) -> Self {
611 Self {
612 subscription_ids: Vec::new(),
613 send_initial_values: false,
614 header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
615 }
616 }
617 pub fn send_initial_values(mut self, send_initial_values: bool) -> Self {
622 self.send_initial_values = send_initial_values;
623 self
624 }
625
626 pub fn subscription_ids(mut self, subscription_ids: Vec<u32>) -> Self {
628 self.subscription_ids = subscription_ids;
629 self
630 }
631
632 pub fn subscription(mut self, subscription_id: u32) -> Self {
634 self.subscription_ids.push(subscription_id);
635 self
636 }
637}
638
639impl UARequest for TransferSubscriptions {
640 type Out = TransferSubscriptionsResponse;
641
642 async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
643 where
644 Self: 'a,
645 {
646 if self.subscription_ids.is_empty() {
647 builder_error!(
648 self,
649 "transfer_subscriptions, no subscription ids were provided"
650 );
651 return Err(StatusCode::BadNothingToDo);
652 }
653 let request = TransferSubscriptionsRequest {
654 request_header: self.header.header,
655 subscription_ids: Some(self.subscription_ids),
656 send_initial_values: self.send_initial_values,
657 };
658 let response = channel.send(request, self.header.timeout).await?;
659 if let ResponseMessage::TransferSubscriptions(response) = response {
660 process_service_result(&response.response_header)?;
661 builder_debug!(self, "transfer_subscriptions success");
662 Ok(*response)
663 } else {
664 builder_error!(self, "transfer_subscriptions failed {:?}", response);
665 Err(process_unexpected_response(response))
666 }
667 }
668}
669
#[derive(Clone)]
/// Builder for a [`DeleteSubscriptionsRequest`], sent with [`UARequest::send`].
pub struct DeleteSubscriptions {
    /// IDs of the subscriptions to delete; `send` rejects an empty list.
    subscription_ids: Vec<u32>,

    /// Request header and timeout used when sending the request.
    header: RequestHeaderBuilder,
}

builder_base!(DeleteSubscriptions);
682
683impl DeleteSubscriptions {
684 pub fn new(session: &Session) -> Self {
686 Self {
687 subscription_ids: Vec::new(),
688 header: RequestHeaderBuilder::new_from_session(session),
689 }
690 }
691
692 pub fn new_manual(
694 session_id: u32,
695 timeout: Duration,
696 auth_token: NodeId,
697 request_handle: IntegerId,
698 ) -> Self {
699 Self {
700 subscription_ids: Vec::new(),
701 header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
702 }
703 }
704
705 pub fn subscription_ids(mut self, subscription_ids: Vec<u32>) -> Self {
707 self.subscription_ids = subscription_ids;
708 self
709 }
710
711 pub fn subscription(mut self, subscription_id: u32) -> Self {
713 self.subscription_ids.push(subscription_id);
714 self
715 }
716}
717
718impl UARequest for DeleteSubscriptions {
719 type Out = DeleteSubscriptionsResponse;
720
721 async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
722 where
723 Self: 'a,
724 {
725 if self.subscription_ids.is_empty() {
726 builder_error!(self, "delete_subscriptions called with no subscription IDs");
727 return Err(StatusCode::BadNothingToDo);
728 }
729 let request = DeleteSubscriptionsRequest {
730 request_header: self.header.header,
731 subscription_ids: Some(self.subscription_ids.clone()),
732 };
733 let response = channel.send(request, self.header.timeout).await?;
734 if let ResponseMessage::DeleteSubscriptions(response) = response {
735 process_service_result(&response.response_header)?;
736
737 builder_debug!(self, "delete_subscriptions success");
738 Ok(*response)
739 } else {
740 builder_error!(self, "delete_subscriptions failed {:?}", response);
741 Err(process_unexpected_response(response))
742 }
743 }
744}
745
#[derive(Clone)]
/// Builder for a [`CreateMonitoredItemsRequest`], sent with [`UARequest::send`].
pub struct CreateMonitoredItems<'a> {
    /// ID of the subscription to add items to; `send` rejects 0.
    subscription_id: u32,
    /// Value sent as `timestamps_to_return`.
    timestamps_to_return: TimestampsToReturn,
    /// Items to create; `send` rejects an empty list.
    items_to_create: Vec<MonitoredItemCreateRequest>,
    /// Source of client handles for items whose client handle is 0.
    handle: &'a AtomicHandle,

    /// Request header and timeout used when sending the request.
    header: RequestHeaderBuilder,
}

builder_base!(CreateMonitoredItems<'a>);
760
761impl<'a> CreateMonitoredItems<'a> {
762 pub fn new(subscription_id: u32, session: &'a Session) -> Self {
764 Self {
765 subscription_id,
766 timestamps_to_return: TimestampsToReturn::Neither,
767 items_to_create: Vec::new(),
768 handle: &session.monitored_item_handle,
769 header: RequestHeaderBuilder::new_from_session(session),
770 }
771 }
772
773 pub fn new_manual(
775 subscription_id: u32,
776 monitored_item_handle: &'a AtomicHandle,
777 session_id: u32,
778 timeout: Duration,
779 auth_token: NodeId,
780 request_handle: IntegerId,
781 ) -> Self {
782 Self {
783 subscription_id,
784 timestamps_to_return: TimestampsToReturn::Neither,
785 items_to_create: Vec::new(),
786 handle: monitored_item_handle,
787 header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
788 }
789 }
790
791 pub fn timestamps_to_return(mut self, timestamps_to_return: TimestampsToReturn) -> Self {
793 self.timestamps_to_return = timestamps_to_return;
794 self
795 }
796
797 pub fn items_to_create(mut self, items_to_create: Vec<MonitoredItemCreateRequest>) -> Self {
799 self.items_to_create = items_to_create;
800 self
801 }
802
803 pub fn item(mut self, item: MonitoredItemCreateRequest) -> Self {
805 self.items_to_create.push(item);
806 self
807 }
808
809 pub fn value(mut self, node_id: NodeId, sampling_interval: f64, queue_size: u32) -> Self {
812 self.items_to_create.push(MonitoredItemCreateRequest {
813 item_to_monitor: ReadValueId {
814 node_id,
815 attribute_id: AttributeId::Value as u32,
816 ..Default::default()
817 },
818 monitoring_mode: MonitoringMode::Reporting,
819 requested_parameters: MonitoringParameters {
820 client_handle: self.handle.next(),
821 sampling_interval,
822 queue_size,
823 discard_oldest: true,
824 ..Default::default()
825 },
826 });
827 self
828 }
829}
830
#[derive(Debug, Clone)]
/// One result of a create monitored items call, paired with the request item
/// that produced it.
pub struct CreatedMonitoredItem {
    /// The result returned by the server for this item.
    pub result: MonitoredItemCreateResult,
    /// The monitoring parameters that were sent in the request for this item.
    pub requested_parameters: MonitoringParameters,
    /// The monitoring mode that was sent in the request for this item.
    pub monitoring_mode: MonitoringMode,
    /// The item to monitor that was sent in the request.
    pub item_to_monitor: ReadValueId,
}
844
#[derive(Debug, Clone)]
/// Output of sending a [`CreateMonitoredItems`] request.
pub struct CreateMonitoredItemsResult {
    /// Response header taken from the server's response.
    pub response_header: ResponseHeader,
    /// Diagnostic infos taken from the server's response.
    pub diagnostic_infos: Option<Vec<DiagnosticInfo>>,
    /// One entry per requested item, in request order.
    pub results: Vec<CreatedMonitoredItem>,
}
856
857impl UARequest for CreateMonitoredItems<'_> {
858 type Out = CreateMonitoredItemsResult;
859
860 async fn send<'a>(
861 mut self,
862 channel: &'a crate::AsyncSecureChannel,
863 ) -> Result<Self::Out, StatusCode>
864 where
865 Self: 'a,
866 {
867 builder_debug!(
868 self,
869 "create_monitored_items, for subscription {}, {} items",
870 self.subscription_id,
871 self.items_to_create.len()
872 );
873 if self.subscription_id == 0 {
874 builder_error!(self, "create_monitored_items, subscription id 0 is invalid");
875 return Err(StatusCode::BadSubscriptionIdInvalid);
876 }
877
878 if self.items_to_create.is_empty() {
879 builder_error!(
880 self,
881 "create_monitored_items, called with no items to create"
882 );
883 return Err(StatusCode::BadNothingToDo);
884 }
885 for item in &mut self.items_to_create {
886 if item.requested_parameters.client_handle == 0 {
887 item.requested_parameters.client_handle = self.handle.next();
888 }
889 }
890
891 let num_items = self.items_to_create.len();
892 let request = CreateMonitoredItemsRequest {
893 request_header: self.header.header,
894 subscription_id: self.subscription_id,
895 timestamps_to_return: self.timestamps_to_return,
896 items_to_create: Some(self.items_to_create.clone()),
897 };
898
899 let response = channel.send(request, self.header.timeout).await?;
900
901 if let ResponseMessage::CreateMonitoredItems(response) = response {
902 process_service_result(&response.response_header)?;
903 if let Some(ref results) = response.results {
904 if results.len() != num_items {
905 builder_error!(
906 self,
907 "create_monitored_items, unexpected number of results. Got {}, expected {}",
908 results.len(),
909 num_items
910 );
911 return Err(StatusCode::BadUnexpectedError);
912 }
913 builder_debug!(self, "create_monitored_items, {} items created", num_items);
914 } else {
915 builder_error!(
916 self,
917 "create_monitored_items, success but no monitored items were created"
918 );
919 return Err(StatusCode::BadUnexpectedError);
920 }
921
922 let created = response
923 .results
924 .unwrap_or_default()
925 .into_iter()
926 .zip(self.items_to_create)
927 .map(|(result, item)| CreatedMonitoredItem {
928 result,
929 requested_parameters: item.requested_parameters,
930 monitoring_mode: item.monitoring_mode,
931 item_to_monitor: item.item_to_monitor,
932 })
933 .collect();
934
935 Ok(CreateMonitoredItemsResult {
936 response_header: response.response_header,
937 diagnostic_infos: response.diagnostic_infos,
938 results: created,
939 })
940 } else {
941 builder_error!(self, "create_monitored_items failed {:?}", response);
942 Err(process_unexpected_response(response))
943 }
944 }
945}
946
#[derive(Clone)]
/// Builder for a [`ModifyMonitoredItemsRequest`], sent with [`UARequest::send`].
pub struct ModifyMonitoredItems {
    /// ID of the subscription owning the items; `send` rejects 0.
    subscription_id: u32,
    /// Value sent as `timestamps_to_return`.
    timestamps_to_return: TimestampsToReturn,
    /// Items to modify; `send` rejects an empty list.
    items_to_modify: Vec<MonitoredItemModifyRequest>,

    /// Request header and timeout used when sending the request.
    header: RequestHeaderBuilder,
}

builder_base!(ModifyMonitoredItems);
960
961impl ModifyMonitoredItems {
962 pub fn new(subscription_id: u32, session: &Session) -> Self {
964 Self {
965 subscription_id,
966 timestamps_to_return: TimestampsToReturn::Neither,
967 items_to_modify: Vec::new(),
968 header: RequestHeaderBuilder::new_from_session(session),
969 }
970 }
971
972 pub fn new_manual(
974 subscription_id: u32,
975 session_id: u32,
976 timeout: Duration,
977 auth_token: NodeId,
978 request_handle: IntegerId,
979 ) -> Self {
980 Self {
981 subscription_id,
982 timestamps_to_return: TimestampsToReturn::Neither,
983 items_to_modify: Vec::new(),
984 header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
985 }
986 }
987
988 pub fn timestamps_to_return(mut self, timestamps_to_return: TimestampsToReturn) -> Self {
990 self.timestamps_to_return = timestamps_to_return;
991 self
992 }
993
994 pub fn items_to_modify(mut self, items_to_modify: Vec<MonitoredItemModifyRequest>) -> Self {
996 self.items_to_modify = items_to_modify;
997 self
998 }
999
1000 pub fn item(mut self, item: MonitoredItemModifyRequest) -> Self {
1002 self.items_to_modify.push(item);
1003 self
1004 }
1005}
1006
1007impl UARequest for ModifyMonitoredItems {
1008 type Out = ModifyMonitoredItemsResponse;
1009
1010 async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
1011 where
1012 Self: 'a,
1013 {
1014 builder_debug!(
1015 self,
1016 "modify_monitored_items, for subscription {}, {} items",
1017 self.subscription_id,
1018 self.items_to_modify.len()
1019 );
1020 if self.subscription_id == 0 {
1021 builder_error!(self, "modify_monitored_items, subscription id 0 is invalid");
1022 return Err(StatusCode::BadInvalidArgument);
1023 }
1024 if self.items_to_modify.is_empty() {
1025 builder_error!(
1026 self,
1027 "modify_monitored_items, called with no items to modify"
1028 );
1029 return Err(StatusCode::BadNothingToDo);
1030 }
1031 let num_items = self.items_to_modify.len();
1032 let request = ModifyMonitoredItemsRequest {
1033 request_header: self.header.header,
1034 subscription_id: self.subscription_id,
1035 timestamps_to_return: self.timestamps_to_return,
1036 items_to_modify: Some(self.items_to_modify),
1037 };
1038 let response = channel.send(request, self.header.timeout).await?;
1039 if let ResponseMessage::ModifyMonitoredItems(response) = response {
1040 process_service_result(&response.response_header)?;
1041 let Some(results) = &response.results else {
1042 builder_error!(self, "modify_monitored_items, got empty response");
1043 return Err(StatusCode::BadUnexpectedError);
1044 };
1045 if results.len() != num_items {
1046 builder_error!(
1047 self,
1048 "modify_monitored_items, unexpected number of results. Expected {}, got {}",
1049 num_items,
1050 results.len()
1051 );
1052 return Err(StatusCode::BadUnexpectedError);
1053 }
1054
1055 builder_debug!(self, "modify_monitored_items, success");
1056 Ok(*response)
1057 } else {
1058 builder_error!(self, "modify_monitored_items failed {:?}", response);
1059 Err(process_unexpected_response(response))
1060 }
1061 }
1062}
1063
#[derive(Clone)]
/// Builder for a [`SetMonitoringModeRequest`], sent with [`UARequest::send`].
pub struct SetMonitoringMode {
    /// ID of the subscription owning the items; `send` rejects 0.
    subscription_id: u32,
    /// Value sent as `monitoring_mode`.
    monitoring_mode: MonitoringMode,
    /// IDs of the monitored items to change; `send` rejects an empty list.
    monitored_item_ids: Vec<u32>,

    /// Request header and timeout used when sending the request.
    header: RequestHeaderBuilder,
}

builder_base!(SetMonitoringMode);
1078
1079impl SetMonitoringMode {
1080 pub fn new(subscription_id: u32, monitoring_mode: MonitoringMode, session: &Session) -> Self {
1082 Self {
1083 subscription_id,
1084 monitored_item_ids: Vec::new(),
1085 monitoring_mode,
1086 header: RequestHeaderBuilder::new_from_session(session),
1087 }
1088 }
1089
1090 pub fn new_manual(
1092 subscription_id: u32,
1093 monitoring_mode: MonitoringMode,
1094 session_id: u32,
1095 timeout: Duration,
1096 auth_token: NodeId,
1097 request_handle: IntegerId,
1098 ) -> Self {
1099 Self {
1100 subscription_id,
1101 monitored_item_ids: Vec::new(),
1102 monitoring_mode,
1103 header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
1104 }
1105 }
1106
1107 pub fn monitored_item_ids(mut self, monitored_item_ids: Vec<u32>) -> Self {
1109 self.monitored_item_ids = monitored_item_ids;
1110 self
1111 }
1112
1113 pub fn item(mut self, item: u32) -> Self {
1115 self.monitored_item_ids.push(item);
1116 self
1117 }
1118}
1119
1120impl UARequest for SetMonitoringMode {
1121 type Out = SetMonitoringModeResponse;
1122
1123 async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
1124 where
1125 Self: 'a,
1126 {
1127 builder_debug!(
1128 self,
1129 "set_monitoring_mode, for subscription {}, {} items",
1130 self.subscription_id,
1131 self.monitored_item_ids.len()
1132 );
1133 if self.subscription_id == 0 {
1134 builder_error!(self, "set_monitoring_mode, subscription id 0 is invalid");
1135 return Err(StatusCode::BadInvalidArgument);
1136 }
1137 if self.monitored_item_ids.is_empty() {
1138 builder_error!(self, "set_monitoring_mode, called with no items to modify");
1139 return Err(StatusCode::BadNothingToDo);
1140 }
1141
1142 let num_items = self.monitored_item_ids.len();
1143 let request = SetMonitoringModeRequest {
1144 request_header: self.header.header,
1145 subscription_id: self.subscription_id,
1146 monitoring_mode: self.monitoring_mode,
1147 monitored_item_ids: Some(self.monitored_item_ids),
1148 };
1149 let response = channel.send(request, self.header.timeout).await?;
1150 if let ResponseMessage::SetMonitoringMode(response) = response {
1151 let Some(results) = &response.results else {
1152 builder_error!(self, "set_monitoring_mode, got empty response");
1153 return Err(StatusCode::BadUnexpectedError);
1154 };
1155 if results.len() != num_items {
1156 builder_error!(
1157 self,
1158 "set_monitoring_mode, unexpected number of results. Expected {}, got {}",
1159 num_items,
1160 results.len()
1161 );
1162 return Err(StatusCode::BadUnexpectedError);
1163 }
1164
1165 Ok(*response)
1166 } else {
1167 builder_error!(self, "set_monitoring_mode failed {:?}", response);
1168 Err(process_unexpected_response(response))
1169 }
1170 }
1171}
1172
#[derive(Clone)]
/// Builder for a [`SetTriggeringRequest`], sent with [`UARequest::send`].
pub struct SetTriggering {
    /// ID of the subscription owning the items; `send` rejects 0.
    subscription_id: u32,
    /// Value sent as `triggering_item_id`.
    triggering_item_id: u32,
    /// Links to add; an empty list is sent as `None`.
    links_to_add: Vec<u32>,
    /// Links to remove; an empty list is sent as `None`.
    links_to_remove: Vec<u32>,

    /// Request header and timeout used when sending the request.
    header: RequestHeaderBuilder,
}

builder_base!(SetTriggering);
1189
1190impl SetTriggering {
1191 pub fn new(subscription_id: u32, triggering_item_id: u32, session: &Session) -> Self {
1193 Self {
1194 subscription_id,
1195 triggering_item_id,
1196 links_to_add: Vec::new(),
1197 links_to_remove: Vec::new(),
1198 header: RequestHeaderBuilder::new_from_session(session),
1199 }
1200 }
1201
1202 pub fn new_manual(
1204 subscription_id: u32,
1205 triggering_item_id: u32,
1206 session_id: u32,
1207 timeout: Duration,
1208 auth_token: NodeId,
1209 request_handle: IntegerId,
1210 ) -> Self {
1211 Self {
1212 subscription_id,
1213 triggering_item_id,
1214 links_to_add: Vec::new(),
1215 links_to_remove: Vec::new(),
1216 header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
1217 }
1218 }
1219
1220 pub fn links_to_add(mut self, links_to_add: Vec<u32>) -> Self {
1222 self.links_to_add = links_to_add;
1223 self
1224 }
1225
1226 pub fn add_link(mut self, item: u32) -> Self {
1228 self.links_to_add.push(item);
1229 self
1230 }
1231
1232 pub fn links_to_remove(mut self, links_to_remove: Vec<u32>) -> Self {
1234 self.links_to_remove = links_to_remove;
1235 self
1236 }
1237
1238 pub fn remove_link(mut self, item: u32) -> Self {
1240 self.links_to_remove.push(item);
1241 self
1242 }
1243}
1244
1245impl UARequest for SetTriggering {
1246 type Out = SetTriggeringResponse;
1247
1248 async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
1249 where
1250 Self: 'a,
1251 {
1252 builder_debug!(
1253 self,
1254 "set_triggering, for subscription {}, {} links to add, {} links to remove",
1255 self.subscription_id,
1256 self.links_to_add.len(),
1257 self.links_to_remove.len()
1258 );
1259 if self.subscription_id == 0 {
1260 builder_error!(self, "set_triggering, subscription id 0 is invalid");
1261 return Err(StatusCode::BadInvalidArgument);
1262 }
1263
1264 if self.links_to_add.is_empty() && self.links_to_remove.is_empty() {
1265 builder_error!(self, "set_triggering, called with nothing to add or remove");
1266 return Err(StatusCode::BadNothingToDo);
1267 }
1268 let request = SetTriggeringRequest {
1269 request_header: self.header.header,
1270 subscription_id: self.subscription_id,
1271 triggering_item_id: self.triggering_item_id,
1272 links_to_add: if self.links_to_add.is_empty() {
1273 None
1274 } else {
1275 Some(self.links_to_add.clone())
1276 },
1277 links_to_remove: if self.links_to_remove.is_empty() {
1278 None
1279 } else {
1280 Some(self.links_to_remove.clone())
1281 },
1282 };
1283
1284 let response = channel.send(request, self.header.timeout).await?;
1285 if let ResponseMessage::SetTriggering(response) = response {
1286 let to_add_res = response.add_results.as_deref().unwrap_or(&[]);
1287 let to_remove_res = response.remove_results.as_deref().unwrap_or(&[]);
1288 if to_add_res.len() != self.links_to_add.len() {
1289 builder_error!(
1290 self,
1291 "set_triggering, got unexpected number of add results: {}, expected {}",
1292 to_add_res.len(),
1293 self.links_to_add.len()
1294 );
1295 return Err(StatusCode::BadUnexpectedError);
1296 }
1297 if to_remove_res.len() != self.links_to_remove.len() {
1298 builder_error!(
1299 self,
1300 "set_triggering, got unexpected number of remove results: {}, expected {}",
1301 to_remove_res.len(),
1302 self.links_to_add.len()
1303 );
1304 return Err(StatusCode::BadUnexpectedError);
1305 }
1306
1307 Ok(*response)
1308 } else {
1309 builder_error!(self, "set_triggering failed {:?}", response);
1310 Err(process_unexpected_response(response))
1311 }
1312 }
1313}
1314
/// Builder for a `DeleteMonitoredItems` request: the subscription to operate
/// on, the monitored items to delete from it, and the request header settings.
#[derive(Clone)]
pub struct DeleteMonitoredItems {
    /// Id of the subscription the items belong to; 0 is rejected when sending.
    subscription_id: u32,
    /// Ids of the monitored items to delete; must be non-empty when sending.
    items_to_delete: Vec<u32>,

    /// Request header and timeout used when the request is sent.
    header: RequestHeaderBuilder,
}
1325
1326builder_base!(DeleteMonitoredItems);
1327
1328impl DeleteMonitoredItems {
1329 pub fn new(subscription_id: u32, session: &Session) -> Self {
1331 Self {
1332 subscription_id,
1333 items_to_delete: Vec::new(),
1334 header: RequestHeaderBuilder::new_from_session(session),
1335 }
1336 }
1337
1338 pub fn new_manual(
1340 subscription_id: u32,
1341 session_id: u32,
1342 timeout: Duration,
1343 auth_token: NodeId,
1344 request_handle: IntegerId,
1345 ) -> Self {
1346 Self {
1347 subscription_id,
1348 items_to_delete: Vec::new(),
1349 header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
1350 }
1351 }
1352
1353 pub fn items_to_delete(mut self, items_to_delete: Vec<u32>) -> Self {
1355 self.items_to_delete = items_to_delete;
1356 self
1357 }
1358
1359 pub fn item(mut self, item: u32) -> Self {
1361 self.items_to_delete.push(item);
1362 self
1363 }
1364}
1365
1366impl UARequest for DeleteMonitoredItems {
1367 type Out = DeleteMonitoredItemsResponse;
1368
1369 async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
1370 where
1371 Self: 'a,
1372 {
1373 builder_debug!(
1374 self,
1375 "delete_monitored_items, subscription {} for {} items",
1376 self.subscription_id,
1377 self.items_to_delete.len(),
1378 );
1379 if self.subscription_id == 0 {
1380 builder_error!(self, "delete_monitored_items, subscription id 0 is invalid");
1381 return Err(StatusCode::BadInvalidArgument);
1382 }
1383 if self.items_to_delete.is_empty() {
1384 builder_error!(
1385 self,
1386 "delete_monitored_items, called with no items to delete"
1387 );
1388 return Err(StatusCode::BadNothingToDo);
1389 }
1390
1391 let request = DeleteMonitoredItemsRequest {
1392 request_header: self.header.header,
1393 subscription_id: self.subscription_id,
1394 monitored_item_ids: Some(self.items_to_delete.clone()),
1395 };
1396 let response = channel.send(request, self.header.timeout).await?;
1397 if let ResponseMessage::DeleteMonitoredItems(response) = response {
1398 process_service_result(&response.response_header)?;
1399 builder_debug!(self, "delete_monitored_items, success");
1400 Ok(*response)
1401 } else {
1402 builder_error!(self, "delete_monitored_items failed {:?}", response);
1403 Err(process_unexpected_response(response))
1404 }
1405 }
1406}
1407
1408impl Session {
/// Borrow the mutex-protected subscription state held by this session.
///
/// The caller is responsible for locking the returned mutex.
pub fn subscription_state(&self) -> &Mutex<SubscriptionState> {
    &self.subscription_state
}
1413
/// Send the current instant on `trigger_publish_tx`.
///
/// NOTE(review): presumably this wakes the publish loop so it publishes
/// immediately — confirm against the receiver side.
pub fn trigger_publish_now(&self) {
    // The send result is deliberately discarded.
    let _ = self.trigger_publish_tx.send(Instant::now());
}
1418
1419 #[allow(clippy::too_many_arguments)]
1420 async fn create_subscription_inner(
1421 &self,
1422 publishing_interval: Duration,
1423 lifetime_count: u32,
1424 max_keep_alive_count: u32,
1425 max_notifications_per_publish: u32,
1426 publishing_enabled: bool,
1427 priority: u8,
1428 callback: Box<dyn OnSubscriptionNotificationCore>,
1429 ) -> Result<u32, StatusCode> {
1430 let response = CreateSubscription::new(self)
1431 .publishing_interval(publishing_interval)
1432 .max_lifetime_count(lifetime_count)
1433 .max_keep_alive_count(max_keep_alive_count)
1434 .max_notifications_per_publish(max_notifications_per_publish)
1435 .publishing_enabled(publishing_enabled)
1436 .priority(priority)
1437 .send(&self.channel)
1438 .await?;
1439
1440 let subscription = Subscription::new(
1441 response.subscription_id,
1442 Duration::from_millis(response.revised_publishing_interval.max(0.0).floor() as u64),
1443 response.revised_lifetime_count,
1444 response.revised_max_keep_alive_count,
1445 max_notifications_per_publish,
1446 priority,
1447 publishing_enabled,
1448 callback,
1449 );
1450 {
1451 let mut subscription_state = trace_lock!(self.subscription_state);
1452 subscription_state.add_subscription(subscription);
1453 }
1454
1455 self.trigger_publish_now();
1456
1457 Ok(response.subscription_id)
1458 }
1459
1460 #[allow(clippy::too_many_arguments)]
1502 pub async fn create_subscription(
1503 &self,
1504 publishing_interval: Duration,
1505 lifetime_count: u32,
1506 max_keep_alive_count: u32,
1507 max_notifications_per_publish: u32,
1508 priority: u8,
1509 publishing_enabled: bool,
1510 callback: impl OnSubscriptionNotificationCore + 'static,
1511 ) -> Result<u32, StatusCode> {
1512 self.create_subscription_inner(
1513 publishing_interval,
1514 lifetime_count,
1515 max_keep_alive_count,
1516 max_notifications_per_publish,
1517 publishing_enabled,
1518 priority,
1519 Box::new(callback),
1520 )
1521 .await
1522 }
1523
1524 fn subscription_exists(&self, subscription_id: u32) -> bool {
1525 let subscription_state = trace_lock!(self.subscription_state);
1526 subscription_state.subscription_exists(subscription_id)
1527 }
1528
1529 pub async fn modify_subscription(
1568 &self,
1569 subscription_id: u32,
1570 publishing_interval: Duration,
1571 lifetime_count: u32,
1572 max_keep_alive_count: u32,
1573 max_notifications_per_publish: u32,
1574 priority: u8,
1575 ) -> Result<(), StatusCode> {
1576 if !self.subscription_exists(subscription_id) {
1577 session_error!(self, "modify_subscription, subscription id does not exist");
1578 return Err(StatusCode::BadInvalidArgument);
1579 }
1580
1581 let response = ModifySubscription::new(subscription_id, self)
1582 .publishing_interval(publishing_interval)
1583 .max_lifetime_count(lifetime_count)
1584 .max_keep_alive_count(max_keep_alive_count)
1585 .max_notifications_per_publish(max_notifications_per_publish)
1586 .priority(priority)
1587 .send(&self.channel)
1588 .await?;
1589
1590 {
1591 let mut subscription_state = trace_lock!(self.subscription_state);
1592 subscription_state.modify_subscription(
1593 subscription_id,
1594 Duration::from_millis(response.revised_publishing_interval.max(0.0).floor() as u64),
1595 response.revised_lifetime_count,
1596 response.revised_max_keep_alive_count,
1597 max_notifications_per_publish,
1598 priority,
1599 );
1600 }
1601
1602 Ok(())
1603 }
1604
1605 pub async fn set_publishing_mode(
1621 &self,
1622 subscription_ids: &[u32],
1623 publishing_enabled: bool,
1624 ) -> Result<Vec<StatusCode>, StatusCode> {
1625 let results = SetPublishingMode::new(publishing_enabled, self)
1626 .subscription_ids(subscription_ids.to_vec())
1627 .send(&self.channel)
1628 .await?
1629 .results
1630 .unwrap_or_default();
1631
1632 {
1633 let mut subscription_state = trace_lock!(self.subscription_state);
1635 let ids = subscription_ids
1636 .iter()
1637 .zip(results.iter())
1638 .filter(|(_, s)| s.is_good())
1639 .map(|(v, _)| *v)
1640 .collect::<Vec<_>>();
1641 subscription_state.set_publishing_mode(&ids, publishing_enabled);
1642 }
1643
1644 if publishing_enabled {
1645 self.trigger_publish_now();
1646 }
1647 Ok(results)
1648 }
1649
1650 pub async fn transfer_subscriptions(
1673 &self,
1674 subscription_ids: &[u32],
1675 send_initial_values: bool,
1676 ) -> Result<Vec<TransferResult>, StatusCode> {
1677 let r = TransferSubscriptions::new(self)
1678 .send_initial_values(send_initial_values)
1679 .subscription_ids(subscription_ids.to_vec())
1680 .send(&self.channel)
1681 .await?
1682 .results
1683 .unwrap_or_default();
1684
1685 self.trigger_publish_now();
1686
1687 Ok(r)
1688 }
1689
1690 pub async fn delete_subscription(
1704 &self,
1705 subscription_id: u32,
1706 ) -> Result<StatusCode, StatusCode> {
1707 if subscription_id == 0 {
1708 session_error!(self, "delete_subscription, subscription id 0 is invalid");
1709 Err(StatusCode::BadInvalidArgument)
1710 } else if !self.subscription_exists(subscription_id) {
1711 session_error!(
1712 self,
1713 "delete_subscription, subscription id {} does not exist",
1714 subscription_id
1715 );
1716 Err(StatusCode::BadInvalidArgument)
1717 } else {
1718 let result = self.delete_subscriptions(&[subscription_id]).await?;
1719 Ok(result[0])
1720 }
1721 }
1722
1723 pub async fn delete_subscriptions(
1739 &self,
1740 subscription_ids: &[u32],
1741 ) -> Result<Vec<StatusCode>, StatusCode> {
1742 let result = DeleteSubscriptions::new(self)
1743 .subscription_ids(subscription_ids.to_vec())
1744 .send(&self.channel)
1745 .await?
1746 .results
1747 .unwrap_or_default();
1748 {
1749 let mut subscription_state = trace_lock!(self.subscription_state);
1751 for id in subscription_ids {
1752 subscription_state.delete_subscription(*id);
1753 }
1754 }
1755
1756 Ok(result)
1757 }
1758
1759 pub async fn create_monitored_items(
1776 &self,
1777 subscription_id: u32,
1778 timestamps_to_return: TimestampsToReturn,
1779 mut items_to_create: Vec<MonitoredItemCreateRequest>,
1780 ) -> Result<Vec<CreatedMonitoredItem>, StatusCode> {
1781 {
1782 let state = trace_lock!(self.subscription_state);
1783 if !state.subscription_exists(subscription_id) {
1784 session_error!(
1785 self,
1786 "create_monitored_items, subscription id {} does not exist",
1787 subscription_id
1788 );
1789 return Err(StatusCode::BadSubscriptionIdInvalid);
1790 }
1791 }
1792
1793 for item in &mut items_to_create {
1794 if item.requested_parameters.client_handle == 0 {
1795 item.requested_parameters.client_handle = self.monitored_item_handle.next();
1796 }
1797 }
1798
1799 let pre_insert = PreInsertMonitoredItems::new(
1803 &self.subscription_state,
1804 subscription_id,
1805 &items_to_create,
1806 );
1807
1808 let result = CreateMonitoredItems::new(subscription_id, self)
1809 .items_to_create(items_to_create)
1810 .timestamps_to_return(timestamps_to_return)
1811 .send(&self.channel)
1812 .await?;
1813
1814 pre_insert.finish(&result.results);
1816
1817 Ok(result.results)
1818 }
1819
1820 pub async fn modify_monitored_items(
1837 &self,
1838 subscription_id: u32,
1839 timestamps_to_return: TimestampsToReturn,
1840 items_to_modify: &[MonitoredItemModifyRequest],
1841 ) -> Result<Vec<MonitoredItemModifyResult>, StatusCode> {
1842 {
1843 let state = trace_lock!(self.subscription_state);
1844 if !state.subscription_exists(subscription_id) {
1845 session_error!(
1846 self,
1847 "modify_monitored_items, subscription id {} does not exist",
1848 subscription_id
1849 );
1850 return Err(StatusCode::BadSubscriptionIdInvalid);
1851 }
1852 }
1853 let ids = items_to_modify
1854 .iter()
1855 .map(|i| i.monitored_item_id)
1856 .collect::<Vec<_>>();
1857 let results = ModifyMonitoredItems::new(subscription_id, self)
1858 .timestamps_to_return(timestamps_to_return)
1859 .items_to_modify(items_to_modify.to_vec())
1860 .send(&self.channel)
1861 .await?
1862 .results
1863 .unwrap_or_default();
1864
1865 let items_to_modify = ids
1866 .iter()
1867 .zip(results.iter())
1868 .map(|(id, r)| ModifyMonitoredItem {
1869 id: *id,
1870 queue_size: r.revised_queue_size,
1871 sampling_interval: r.revised_sampling_interval,
1872 })
1873 .collect::<Vec<ModifyMonitoredItem>>();
1874 {
1875 let mut subscription_state = trace_lock!(self.subscription_state);
1876 subscription_state.modify_monitored_items(subscription_id, &items_to_modify);
1877 }
1878
1879 Ok(results)
1880 }
1881
1882 pub async fn set_monitoring_mode(
1899 &self,
1900 subscription_id: u32,
1901 monitoring_mode: MonitoringMode,
1902 monitored_item_ids: &[u32],
1903 ) -> Result<Vec<StatusCode>, StatusCode> {
1904 {
1905 let state = trace_lock!(self.subscription_state);
1906 if !state.subscription_exists(subscription_id) {
1907 session_error!(
1908 self,
1909 "set_monitoring_mode, subscription id {} does not exist",
1910 subscription_id
1911 );
1912 return Err(StatusCode::BadSubscriptionIdInvalid);
1913 }
1914 }
1915 let results = SetMonitoringMode::new(subscription_id, monitoring_mode, self)
1916 .monitored_item_ids(monitored_item_ids.to_vec())
1917 .send(&self.channel)
1918 .await?
1919 .results
1920 .unwrap_or_default();
1921
1922 let ok_ids: Vec<_> = monitored_item_ids
1923 .iter()
1924 .zip(results.iter())
1925 .filter(|(_, s)| s.is_good())
1926 .map(|(v, _)| *v)
1927 .collect();
1928 {
1929 let mut subscription_state = trace_lock!(self.subscription_state);
1930 subscription_state.set_monitoring_mode(subscription_id, &ok_ids, monitoring_mode);
1931 }
1932
1933 Ok(results)
1934 }
1935
1936 pub async fn set_triggering(
1955 &self,
1956 subscription_id: u32,
1957 triggering_item_id: u32,
1958 links_to_add: &[u32],
1959 links_to_remove: &[u32],
1960 ) -> Result<(Option<Vec<StatusCode>>, Option<Vec<StatusCode>>), StatusCode> {
1961 {
1962 let state = trace_lock!(self.subscription_state);
1963 if !state.subscription_exists(subscription_id) {
1964 session_error!(
1965 self,
1966 "set_triggering, subscription id {} does not exist",
1967 subscription_id
1968 );
1969 return Err(StatusCode::BadSubscriptionIdInvalid);
1970 }
1971 }
1972 let response = SetTriggering::new(subscription_id, triggering_item_id, self)
1973 .links_to_add(links_to_add.to_vec())
1974 .links_to_remove(links_to_remove.to_vec())
1975 .send(&self.channel)
1976 .await?;
1977
1978 let to_add_res = response.add_results.as_deref().unwrap_or(&[]);
1979 let to_remove_res = response.remove_results.as_deref().unwrap_or(&[]);
1980
1981 let ok_adds = to_add_res
1982 .iter()
1983 .zip(links_to_add)
1984 .filter(|(s, _)| s.is_good())
1985 .map(|(_, v)| v)
1986 .copied()
1987 .collect::<Vec<_>>();
1988 let ok_removes = to_remove_res
1989 .iter()
1990 .zip(links_to_remove)
1991 .filter(|(s, _)| s.is_good())
1992 .map(|(_, v)| v)
1993 .copied()
1994 .collect::<Vec<_>>();
1995
1996 let mut subscription_state = trace_lock!(self.subscription_state);
1998 subscription_state.set_triggering(
1999 subscription_id,
2000 triggering_item_id,
2001 &ok_adds,
2002 &ok_removes,
2003 );
2004 Ok((response.add_results, response.remove_results))
2005 }
2006
2007 pub async fn delete_monitored_items(
2023 &self,
2024 subscription_id: u32,
2025 items_to_delete: &[u32],
2026 ) -> Result<Vec<StatusCode>, StatusCode> {
2027 {
2028 let state = trace_lock!(self.subscription_state);
2029 if !state.subscription_exists(subscription_id) {
2030 session_error!(
2031 self,
2032 "delete_monitored_items, subscription id {} does not exist",
2033 subscription_id
2034 );
2035 return Err(StatusCode::BadSubscriptionIdInvalid);
2036 }
2037 }
2038 let response = DeleteMonitoredItems::new(subscription_id, self)
2039 .items_to_delete(items_to_delete.to_vec())
2040 .send(&self.channel)
2041 .await?
2042 .results
2043 .unwrap_or_default();
2044 let mut subscription_state = trace_lock!(self.subscription_state);
2045 subscription_state.delete_monitored_items(subscription_id, items_to_delete);
2046 Ok(response)
2047 }
2048
2049 pub(crate) fn next_publish_time(&self, set_last_publish: bool) -> Option<Instant> {
2050 let mut subscription_state = trace_lock!(self.subscription_state);
2051 if set_last_publish {
2052 subscription_state.set_last_publish();
2053 }
2054 subscription_state.next_publish_time()
2055 }
2056
2057 pub(crate) async fn publish(&self) -> Result<bool, StatusCode> {
2060 let acks = {
2061 let mut subscription_state = trace_lock!(self.subscription_state);
2062 let acks = subscription_state.take_acknowledgements();
2063 if !acks.is_empty() {
2064 Some(acks)
2065 } else {
2066 None
2067 }
2068 };
2069
2070 match Publish::new(self)
2071 .acks(acks.clone().unwrap_or_default())
2072 .send(&self.channel)
2073 .await
2074 {
2075 Ok(r) => {
2076 let mut subscription_state = trace_lock!(self.subscription_state);
2077 subscription_state.handle_notification(r.subscription_id, r.notification_message);
2078 Ok(r.more_notifications)
2079 }
2080 Err(e) => {
2081 if let Some(acks) = acks {
2082 let mut subscription_state = trace_lock!(self.subscription_state);
2083 subscription_state.re_queue_acknowledgements(acks);
2084 }
2085 Err(e)
2086 }
2087 }
2088 }
2089
2090 pub async fn republish(
2105 &self,
2106 subscription_id: u32,
2107 sequence_number: u32,
2108 ) -> Result<NotificationMessage, StatusCode> {
2109 let res = Republish::new(subscription_id, sequence_number, self)
2110 .send(&self.channel)
2111 .await?;
2112
2113 {
2114 let mut lck = trace_lock!(self.subscription_state);
2115 lck.add_acknowledgement(subscription_id, sequence_number);
2116 }
2117
2118 Ok(res.notification_message)
2119 }
2120
2121 pub(crate) async fn transfer_subscriptions_from_old_session(&self) {
2124 let subscription_ids = {
2125 let subscription_state = trace_lock!(self.subscription_state);
2126 subscription_state.subscription_ids()
2127 };
2128
2129 let Some(subscription_ids) = subscription_ids else {
2130 return;
2131 };
2132
2133 let mut subscription_ids_to_recreate =
2137 subscription_ids.iter().copied().collect::<HashSet<u32>>();
2138 if let Ok(transfer_results) = self.transfer_subscriptions(&subscription_ids, true).await {
2139 session_debug!(self, "transfer_results = {:?}", transfer_results);
2140 transfer_results.iter().enumerate().for_each(|(i, r)| {
2141 if r.status_code.is_good() {
2142 subscription_ids_to_recreate.remove(&subscription_ids[i]);
2144 }
2145 });
2146 }
2147
2148 if !subscription_ids_to_recreate.is_empty() {
2150 session_warn!(self, "Some or all of the existing subscriptions could not be transferred and must be created manually");
2151 }
2152
2153 for subscription_id in subscription_ids_to_recreate {
2154 session_debug!(self, "Recreating subscription {}", subscription_id);
2155
2156 let deleted_subscription = {
2157 let mut subscription_state = trace_lock!(self.subscription_state);
2158 subscription_state.delete_subscription(subscription_id)
2159 };
2160
2161 let Some(subscription) = deleted_subscription else {
2162 session_warn!(
2163 self,
2164 "Subscription removed from session while transfer in progress"
2165 );
2166 continue;
2167 };
2168
2169 let Ok(subscription_id) = self
2170 .create_subscription_inner(
2171 subscription.publishing_interval,
2172 subscription.lifetime_count,
2173 subscription.max_keep_alive_count,
2174 subscription.max_notifications_per_publish,
2175 subscription.publishing_enabled,
2176 subscription.priority,
2177 subscription.callback,
2178 )
2179 .await
2180 else {
2181 session_warn!(
2182 self,
2183 "Could not create a subscription from the existing subscription {}",
2184 subscription_id
2185 );
2186 continue;
2187 };
2188
2189 let items_to_create = subscription
2190 .monitored_items
2191 .values()
2192 .map(|item| MonitoredItemCreateRequest {
2193 item_to_monitor: item.item_to_monitor().clone(),
2194 monitoring_mode: item.monitoring_mode,
2195 requested_parameters: MonitoringParameters {
2196 client_handle: item.client_handle(),
2197 sampling_interval: item.sampling_interval(),
2198 filter: item.filter.clone(),
2199 queue_size: item.queue_size() as u32,
2200 discard_oldest: item.discard_oldest(),
2201 },
2202 })
2203 .collect::<Vec<MonitoredItemCreateRequest>>();
2204
2205 let mut iter = items_to_create.into_iter();
2206
2207 loop {
2208 let chunk = (&mut iter)
2209 .take(self.recreate_monitored_items_chunk)
2210 .collect::<Vec<_>>();
2211
2212 if chunk.is_empty() {
2213 break;
2214 }
2215
2216 let _ = self
2217 .create_monitored_items(subscription_id, TimestampsToReturn::Both, chunk)
2218 .await;
2219 }
2220
2221 for item in subscription.monitored_items.values() {
2222 let triggered_items = item.triggered_items();
2223 if !triggered_items.is_empty() {
2224 let links_to_add = triggered_items.iter().copied().collect::<Vec<u32>>();
2225 let _ = self
2226 .set_triggering(subscription_id, item.id(), links_to_add.as_slice(), &[])
2227 .await;
2228 }
2229 }
2230 }
2231 }
2232}