Skip to main content

opcua_client/session/services/subscriptions/
service.rs

1use std::{
2    collections::HashSet,
3    time::{Duration, Instant},
4};
5
6use crate::{
7    session::{
8        process_service_result, process_unexpected_response,
9        request_builder::{builder_base, builder_debug, builder_error, RequestHeaderBuilder},
10        services::subscriptions::{
11            callbacks::OnSubscriptionNotificationCore, ModifyMonitoredItem,
12            PreInsertMonitoredItems, Subscription,
13        },
14        session_debug, session_error, session_warn,
15    },
16    Session, UARequest,
17};
18use opcua_core::{handle::AtomicHandle, sync::Mutex, trace_lock, ResponseMessage};
19use opcua_types::{
20    AttributeId, CreateMonitoredItemsRequest, CreateSubscriptionRequest,
21    CreateSubscriptionResponse, DeleteMonitoredItemsRequest, DeleteMonitoredItemsResponse,
22    DeleteSubscriptionsRequest, DeleteSubscriptionsResponse, DiagnosticInfo, IntegerId,
23    ModifyMonitoredItemsRequest, ModifyMonitoredItemsResponse, ModifySubscriptionRequest,
24    ModifySubscriptionResponse, MonitoredItemCreateRequest, MonitoredItemCreateResult,
25    MonitoredItemModifyRequest, MonitoredItemModifyResult, MonitoringMode, MonitoringParameters,
26    NodeId, NotificationMessage, PublishRequest, PublishResponse, ReadValueId, RepublishRequest,
27    RepublishResponse, ResponseHeader, SetMonitoringModeRequest, SetMonitoringModeResponse,
28    SetPublishingModeRequest, SetPublishingModeResponse, SetTriggeringRequest,
29    SetTriggeringResponse, StatusCode, SubscriptionAcknowledgement, TimestampsToReturn,
30    TransferResult, TransferSubscriptionsRequest, TransferSubscriptionsResponse,
31};
32use tracing::enabled;
33
34use super::state::SubscriptionState;
35
36/// Create a subscription by sending a [`CreateSubscriptionRequest`] to the server.
37///
38/// See OPC UA Part 4 - Services 5.13.2 for complete description of the service and error responses.
39pub struct CreateSubscription {
40    publishing_interval: Duration,
41    lifetime_count: u32,
42    keep_alive_count: u32,
43    max_notifications_per_publish: u32,
44    publishing_enabled: bool,
45    priority: u8,
46
47    header: RequestHeaderBuilder,
48}
49
50builder_base!(CreateSubscription);
51
52impl CreateSubscription {
53    /// Construct a new call to the `CreateSubscription` service.
54    pub fn new(session: &Session) -> Self {
55        Self {
56            publishing_interval: Duration::from_millis(500),
57            lifetime_count: 60,
58            keep_alive_count: 20,
59            max_notifications_per_publish: 0,
60            publishing_enabled: true,
61            priority: 0,
62            header: RequestHeaderBuilder::new_from_session(session),
63        }
64    }
65
66    /// Construct a new call to the `CreateSubscription` service, setting header parameters manually.
67    pub fn new_manual(
68        session_id: u32,
69        timeout: Duration,
70        auth_token: NodeId,
71        request_handle: IntegerId,
72    ) -> Self {
73        Self {
74            publishing_interval: Duration::from_millis(500),
75            lifetime_count: 60,
76            keep_alive_count: 20,
77            max_notifications_per_publish: 0,
78            publishing_enabled: true,
79            priority: 0,
80            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
81        }
82    }
83
84    /// The requested publishing interval defines the cyclic rate that
85    /// the Subscription is being requested to return Notifications to the Client. This interval
86    /// is expressed in milliseconds. This interval is represented by the publishing timer in the
87    /// Subscription state table. The negotiated value for this parameter returned in the
88    /// response is used as the default sampling interval for MonitoredItems assigned to this
89    /// Subscription. If the requested value is 0 or negative, the server shall revise with the
90    /// fastest supported publishing interval in milliseconds.
91    pub fn publishing_interval(mut self, interval: Duration) -> Self {
92        self.publishing_interval = interval;
93        self
94    }
95
96    /// Requested lifetime count. The lifetime count shall be a minimum of
97    /// three times the keep keep-alive count. When the publishing timer has expired this
98    /// number of times without a Publish request being available to send a NotificationMessage,
99    /// then the Subscription shall be deleted by the Server.
100    pub fn max_lifetime_count(mut self, lifetime_count: u32) -> Self {
101        self.lifetime_count = lifetime_count;
102        self
103    }
104
105    /// Requested maximum keep-alive count. When the publishing timer has
106    /// expired this number of times without requiring any NotificationMessage to be sent, the
107    /// Subscription sends a keep-alive Message to the Client. The negotiated value for this
108    /// parameter is returned in the response. If the requested value is 0, the server shall
109    /// revise with the smallest supported keep-alive count.
110    pub fn max_keep_alive_count(mut self, keep_alive_count: u32) -> Self {
111        self.keep_alive_count = keep_alive_count;
112        self
113    }
114
115    /// The maximum number of notifications that the Client
116    /// wishes to receive in a single Publish response. A value of zero indicates that there is
117    /// no limit. The number of notifications per Publish is the sum of monitoredItems in
118    /// the DataChangeNotification and events in the EventNotificationList.
119    pub fn max_notifications_per_publish(mut self, max_notifications_per_publish: u32) -> Self {
120        self.max_notifications_per_publish = max_notifications_per_publish;
121        self
122    }
123
124    /// Indicates the relative priority of the Subscription. When more than one
125    /// Subscription needs to send Notifications, the Server should de-queue a Publish request
126    /// to the Subscription with the highest priority number. For Subscriptions with equal
127    /// priority the Server should de-queue Publish requests in a round-robin fashion.
128    pub fn priority(mut self, priority: u8) -> Self {
129        self.priority = priority;
130        self
131    }
132
133    /// A boolean parameter with the following values - `true` publishing
134    /// is enabled for the Subscription, `false`, publishing is disabled for the Subscription.
135    /// The value of this parameter does not affect the value of the monitoring mode Attribute of
136    /// MonitoredItems.
137    pub fn publishing_enabled(mut self, publishing_enabled: bool) -> Self {
138        self.publishing_enabled = publishing_enabled;
139        self
140    }
141}
142
143impl UARequest for CreateSubscription {
144    type Out = CreateSubscriptionResponse;
145
146    async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
147    where
148        Self: 'a,
149    {
150        let request = CreateSubscriptionRequest {
151            request_header: self.header.header,
152            requested_publishing_interval: self.publishing_interval.as_millis() as f64,
153            requested_lifetime_count: self.lifetime_count,
154            requested_max_keep_alive_count: self.keep_alive_count,
155            max_notifications_per_publish: self.max_notifications_per_publish,
156            publishing_enabled: self.publishing_enabled,
157            priority: self.priority,
158        };
159
160        let response = channel.send(request, self.header.timeout).await?;
161
162        if let ResponseMessage::CreateSubscription(response) = response {
163            process_service_result(&response.response_header)?;
164            builder_debug!(
165                self,
166                "create_subscription, created a subscription with id {}",
167                response.subscription_id
168            );
169            Ok(*response)
170        } else {
171            builder_error!(self, "create_subscription failed {:?}", response);
172            Err(process_unexpected_response(response))
173        }
174    }
175}
176
177/// Modifies a subscription by sending a [`ModifySubscriptionRequest`] to the server.
178///
179/// See OPC UA Part 4 - Services 5.13.3 for complete description of the service and error responses.
180#[derive(Clone)]
181pub struct ModifySubscription {
182    subscription_id: u32,
183    publishing_interval: Duration,
184    lifetime_count: u32,
185    keep_alive_count: u32,
186    max_notifications_per_publish: u32,
187    priority: u8,
188
189    header: RequestHeaderBuilder,
190}
191
192builder_base!(ModifySubscription);
193
194impl ModifySubscription {
195    /// Construct a new call to the `ModifySubscription` service.
196    pub fn new(subscription_id: u32, session: &Session) -> Self {
197        Self {
198            subscription_id,
199            publishing_interval: Duration::from_millis(500),
200            lifetime_count: 60,
201            keep_alive_count: 20,
202            max_notifications_per_publish: 0,
203            priority: 0,
204            header: RequestHeaderBuilder::new_from_session(session),
205        }
206    }
207
208    /// Construct a new call to the `ModifySubscription` service, setting header parameters manually.
209    pub fn new_manual(
210        subscription_id: u32,
211        session_id: u32,
212        timeout: Duration,
213        auth_token: NodeId,
214        request_handle: IntegerId,
215    ) -> Self {
216        Self {
217            subscription_id,
218            publishing_interval: Duration::from_millis(500),
219            lifetime_count: 60,
220            keep_alive_count: 20,
221            max_notifications_per_publish: 0,
222            priority: 0,
223            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
224        }
225    }
226
227    /// The requested publishing interval defines the cyclic rate that
228    /// the Subscription is being requested to return Notifications to the Client. This interval
229    /// is expressed in milliseconds. This interval is represented by the publishing timer in the
230    /// Subscription state table. The negotiated value for this parameter returned in the
231    /// response is used as the default sampling interval for MonitoredItems assigned to this
232    /// Subscription. If the requested value is 0 or negative, the server shall revise with the
233    /// fastest supported publishing interval in milliseconds.
234    pub fn publishing_interval(mut self, interval: Duration) -> Self {
235        self.publishing_interval = interval;
236        self
237    }
238
239    /// Requested lifetime count. The lifetime count shall be a minimum of
240    /// three times the keep keep-alive count. When the publishing timer has expired this
241    /// number of times without a Publish request being available to send a NotificationMessage,
242    /// then the Subscription shall be deleted by the Server.
243    pub fn max_lifetime_count(mut self, lifetime_count: u32) -> Self {
244        self.lifetime_count = lifetime_count;
245        self
246    }
247
248    /// Requested maximum keep-alive count. When the publishing timer has
249    /// expired this number of times without requiring any NotificationMessage to be sent, the
250    /// Subscription sends a keep-alive Message to the Client. The negotiated value for this
251    /// parameter is returned in the response. If the requested value is 0, the server shall
252    /// revise with the smallest supported keep-alive count.
253    pub fn max_keep_alive_count(mut self, keep_alive_count: u32) -> Self {
254        self.keep_alive_count = keep_alive_count;
255        self
256    }
257
258    /// The maximum number of notifications that the Client
259    /// wishes to receive in a single Publish response. A value of zero indicates that there is
260    /// no limit. The number of notifications per Publish is the sum of monitoredItems in
261    /// the DataChangeNotification and events in the EventNotificationList.
262    pub fn max_notifications_per_publish(mut self, max_notifications_per_publish: u32) -> Self {
263        self.max_notifications_per_publish = max_notifications_per_publish;
264        self
265    }
266
267    /// Indicates the relative priority of the Subscription. When more than one
268    /// Subscription needs to send Notifications, the Server should de-queue a Publish request
269    /// to the Subscription with the highest priority number. For Subscriptions with equal
270    /// priority the Server should de-queue Publish requests in a round-robin fashion.
271    pub fn priority(mut self, priority: u8) -> Self {
272        self.priority = priority;
273        self
274    }
275}
276
277impl UARequest for ModifySubscription {
278    type Out = ModifySubscriptionResponse;
279
280    async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
281    where
282        Self: 'a,
283    {
284        if self.subscription_id == 0 {
285            builder_error!(
286                self,
287                "modify_subscription, subscription id must be non-zero"
288            );
289            return Err(StatusCode::BadInvalidArgument);
290        }
291
292        let request = ModifySubscriptionRequest {
293            request_header: self.header.header,
294            subscription_id: self.subscription_id,
295            requested_publishing_interval: self.publishing_interval.as_millis() as f64,
296            requested_lifetime_count: self.lifetime_count,
297            requested_max_keep_alive_count: self.keep_alive_count,
298            max_notifications_per_publish: self.max_notifications_per_publish,
299            priority: self.priority,
300        };
301
302        let response = channel.send(request, self.header.timeout).await?;
303
304        if let ResponseMessage::ModifySubscription(response) = response {
305            process_service_result(&response.response_header)?;
306            builder_debug!(
307                self,
308                "modify_subscription success for {}",
309                self.subscription_id
310            );
311            Ok(*response)
312        } else {
313            builder_debug!(self, "modify_subscription failed");
314            Err(process_unexpected_response(response))
315        }
316    }
317}
318
319/// Changes the publishing mode of subscriptions by sending a [`SetPublishingModeRequest`] to the server.
320///
321/// See OPC UA Part 4 - Services 5.13.4 for complete description of the service and error responses.
322#[derive(Clone)]
323pub struct SetPublishingMode {
324    subscription_ids: Vec<u32>,
325    publishing_enabled: bool,
326
327    header: RequestHeaderBuilder,
328}
329
330builder_base!(SetPublishingMode);
331
332impl SetPublishingMode {
333    /// Construct a new call to the `SetPublishingMode` service.
334    pub fn new(publishing_enabled: bool, session: &Session) -> Self {
335        Self {
336            subscription_ids: Vec::new(),
337            publishing_enabled,
338            header: RequestHeaderBuilder::new_from_session(session),
339        }
340    }
341
342    /// Construct a new call to the `SetPublishingMode` service, setting header parameters manually.
343    pub fn new_manual(
344        publishing_enabled: bool,
345        session_id: u32,
346        timeout: Duration,
347        auth_token: NodeId,
348        request_handle: IntegerId,
349    ) -> Self {
350        Self {
351            subscription_ids: Vec::new(),
352            publishing_enabled,
353            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
354        }
355    }
356
357    /// Set the subscription IDs to update, overwriting any that were added previously.
358    pub fn subscription_ids(mut self, subscription_ids: Vec<u32>) -> Self {
359        self.subscription_ids = subscription_ids;
360        self
361    }
362
363    /// Add a subscription ID to update.
364    pub fn subscription(mut self, subscription_id: u32) -> Self {
365        self.subscription_ids.push(subscription_id);
366        self
367    }
368}
369
370impl UARequest for SetPublishingMode {
371    type Out = SetPublishingModeResponse;
372
373    async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
374    where
375        Self: 'a,
376    {
377        builder_debug!(
378            self,
379            "set_publishing_mode, for subscriptions {:?}, publishing enabled {}",
380            self.subscription_ids,
381            self.publishing_enabled
382        );
383        if self.subscription_ids.is_empty() {
384            builder_error!(
385                self,
386                "set_publishing_mode, no subscription ids were provided"
387            );
388            return Err(StatusCode::BadNothingToDo);
389        }
390
391        let request = SetPublishingModeRequest {
392            request_header: self.header.header,
393            publishing_enabled: self.publishing_enabled,
394            subscription_ids: Some(self.subscription_ids.clone()),
395        };
396        let response = channel.send(request, self.header.timeout).await?;
397        if let ResponseMessage::SetPublishingMode(response) = response {
398            process_service_result(&response.response_header)?;
399            let num_results = response
400                .results
401                .as_ref()
402                .map(|l| l.len())
403                .unwrap_or_default();
404
405            if num_results != self.subscription_ids.len() {
406                builder_error!(
407                    self,
408                    "set_publishing_mode returned an incorrect number of results. Expected {}, got {}",
409                    self.subscription_ids.len(),
410                    num_results
411                );
412                return Err(StatusCode::BadUnexpectedError);
413            }
414
415            builder_debug!(self, "set_publishing_mode success");
416            Ok(*response)
417        } else {
418            builder_error!(self, "set_publishing_mode failed {:?}", response);
419            Err(process_unexpected_response(response))
420        }
421    }
422}
423
424#[derive(Clone)]
425/// Send a [`PublishRequest`] to the server to receive notifications from subscriptions.
426///
427/// See OPC UA Part 4 - Services 5.13.5 for complete description of the service and error responses.
428pub struct Publish {
429    header: RequestHeaderBuilder,
430    acks: Vec<SubscriptionAcknowledgement>,
431}
432
433builder_base!(Publish);
434
435impl Publish {
436    /// Construct a new call to the `Publish` service.
437    pub fn new(session: &Session) -> Self {
438        Self {
439            header: RequestHeaderBuilder::new_from_session(session),
440            acks: Vec::new(),
441        }
442        .timeout(session.publish_timeout)
443    }
444
445    /// Construct a new call to the `Publish` service, setting header parameters manually.
446    pub fn new_manual(
447        session_id: u32,
448        timeout: Duration,
449        auth_token: NodeId,
450        request_handle: IntegerId,
451    ) -> Self {
452        Self {
453            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
454            acks: Vec::new(),
455        }
456    }
457
458    /// Set the subscription acknowledgements to send, overwriting any that were added previously.
459    pub fn acks(mut self, acks: Vec<SubscriptionAcknowledgement>) -> Self {
460        self.acks = acks;
461        self
462    }
463
464    /// Add a subscription acknowledgement to send.
465    pub fn ack(mut self, subscription_id: u32, sequence_number: u32) -> Self {
466        self.acks.push(SubscriptionAcknowledgement {
467            subscription_id,
468            sequence_number,
469        });
470        self
471    }
472}
473
474impl UARequest for Publish {
475    type Out = PublishResponse;
476
477    async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
478    where
479        Self: 'a,
480    {
481        if enabled!(tracing::Level::DEBUG) {
482            let sequence_nrs: Vec<u32> = self.acks.iter().map(|ack| ack.sequence_number).collect();
483            builder_debug!(
484                self,
485                "publish, acknowledging subscription acknowledgements with sequence nrs {:?}",
486                sequence_nrs
487            );
488        }
489        let request = PublishRequest {
490            request_header: self.header.header,
491            subscription_acknowledgements: if self.acks.is_empty() {
492                None
493            } else {
494                Some(self.acks)
495            },
496        };
497        let response = channel.send(request, self.header.timeout).await?;
498        if let ResponseMessage::Publish(response) = response {
499            process_service_result(&response.response_header)?;
500            builder_debug!(self, "publish success");
501            Ok(*response)
502        } else {
503            builder_error!(self, "publish failed {:?}", response);
504            Err(process_unexpected_response(response))
505        }
506    }
507}
508
509/// Republishes notifications from a subscription by sending a [`RepublishRequest`] to the server.
510///
511/// See OPC UA Part 4 - Services 5.13.6 for complete description of the service and error responses.
512pub struct Republish {
513    subscription_id: u32,
514    retransmit_sequence_number: u32,
515
516    header: RequestHeaderBuilder,
517}
518
519builder_base!(Republish);
520
521impl Republish {
522    /// Construct a new call to the `Republish` service.
523    pub fn new(subscription_id: u32, retransmit_sequence_number: u32, session: &Session) -> Self {
524        Self {
525            subscription_id,
526            retransmit_sequence_number,
527            header: RequestHeaderBuilder::new_from_session(session),
528        }
529    }
530
531    /// Construct a new call to the `Republish` service, setting header parameters manually.
532    pub fn new_manual(
533        subscription_id: u32,
534        retransmit_sequence_number: u32,
535        session_id: u32,
536        timeout: Duration,
537        auth_token: NodeId,
538        request_handle: IntegerId,
539    ) -> Self {
540        Self {
541            subscription_id,
542            retransmit_sequence_number,
543            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
544        }
545    }
546}
547
548impl UARequest for Republish {
549    type Out = RepublishResponse;
550    async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
551    where
552        Self: 'a,
553    {
554        let request = RepublishRequest {
555            request_header: self.header.header,
556            subscription_id: self.subscription_id,
557            retransmit_sequence_number: self.retransmit_sequence_number,
558        };
559
560        let response = channel.send(request, self.header.timeout).await?;
561
562        if let ResponseMessage::Republish(response) = response {
563            process_service_result(&response.response_header)?;
564            builder_debug!(self, "republish success");
565            Ok(*response)
566        } else {
567            builder_error!(self, "republish failed {:?}", response);
568            Err(process_unexpected_response(response))
569        }
570    }
571}
572
573#[derive(Clone)]
574/// Transfers Subscriptions and their MonitoredItems from one Session to another. For example,
575/// a Client may need to reopen a Session and then transfer its Subscriptions to that Session.
576/// It may also be used by one Client to take over a Subscription from another Client by
577/// transferring the Subscription to its Session.
578///
579/// Note that if you call this manually, you will need to register the
580/// subscriptions in the subscription state ([`Session::subscription_state`]) in order to
581/// receive notifications.
582///
583/// See OPC UA Part 4 - Services 5.13.7 for complete description of the service and error responses.
584///
585pub struct TransferSubscriptions {
586    subscription_ids: Vec<u32>,
587    send_initial_values: bool,
588
589    header: RequestHeaderBuilder,
590}
591
592builder_base!(TransferSubscriptions);
593
594impl TransferSubscriptions {
595    /// Construct a new call to the `TransferSubscriptions` service.
596    pub fn new(session: &Session) -> Self {
597        Self {
598            subscription_ids: Vec::new(),
599            send_initial_values: false,
600            header: RequestHeaderBuilder::new_from_session(session),
601        }
602    }
603
604    /// Construct a new call to the `TransferSubscriptions` service, setting header parameters manually.
605    pub fn new_manual(
606        session_id: u32,
607        timeout: Duration,
608        auth_token: NodeId,
609        request_handle: IntegerId,
610    ) -> Self {
611        Self {
612            subscription_ids: Vec::new(),
613            send_initial_values: false,
614            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
615        }
616    }
617    /// A boolean parameter with the following values - `true` the first
618    /// publish response shall contain the current values of all monitored items in the subscription,
619    /// `false`, the first publish response shall contain only the value changes since the last
620    /// publish response was sent.
621    pub fn send_initial_values(mut self, send_initial_values: bool) -> Self {
622        self.send_initial_values = send_initial_values;
623        self
624    }
625
626    /// Set the subscription IDs to transfer, overwriting any that were added previously.
627    pub fn subscription_ids(mut self, subscription_ids: Vec<u32>) -> Self {
628        self.subscription_ids = subscription_ids;
629        self
630    }
631
632    /// Add a subscription ID to transfer.
633    pub fn subscription(mut self, subscription_id: u32) -> Self {
634        self.subscription_ids.push(subscription_id);
635        self
636    }
637}
638
639impl UARequest for TransferSubscriptions {
640    type Out = TransferSubscriptionsResponse;
641
642    async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
643    where
644        Self: 'a,
645    {
646        if self.subscription_ids.is_empty() {
647            builder_error!(
648                self,
649                "transfer_subscriptions, no subscription ids were provided"
650            );
651            return Err(StatusCode::BadNothingToDo);
652        }
653        let request = TransferSubscriptionsRequest {
654            request_header: self.header.header,
655            subscription_ids: Some(self.subscription_ids),
656            send_initial_values: self.send_initial_values,
657        };
658        let response = channel.send(request, self.header.timeout).await?;
659        if let ResponseMessage::TransferSubscriptions(response) = response {
660            process_service_result(&response.response_header)?;
661            builder_debug!(self, "transfer_subscriptions success");
662            Ok(*response)
663        } else {
664            builder_error!(self, "transfer_subscriptions failed {:?}", response);
665            Err(process_unexpected_response(response))
666        }
667    }
668}
669
670#[derive(Clone)]
671/// Deletes subscriptions by sending a [`DeleteSubscriptionsRequest`] to the server with the list
672/// of subscriptions to delete.
673///
674/// See OPC UA Part 4 - Services 5.13.8 for complete description of the service and error responses.
675pub struct DeleteSubscriptions {
676    subscription_ids: Vec<u32>,
677
678    header: RequestHeaderBuilder,
679}
680
681builder_base!(DeleteSubscriptions);
682
683impl DeleteSubscriptions {
684    /// Construct a new call to the `DeleteSubscriptions` service.
685    pub fn new(session: &Session) -> Self {
686        Self {
687            subscription_ids: Vec::new(),
688            header: RequestHeaderBuilder::new_from_session(session),
689        }
690    }
691
692    /// Construct a new call to the `DeleteSubscriptions` service, setting header parameters manually.
693    pub fn new_manual(
694        session_id: u32,
695        timeout: Duration,
696        auth_token: NodeId,
697        request_handle: IntegerId,
698    ) -> Self {
699        Self {
700            subscription_ids: Vec::new(),
701            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
702        }
703    }
704
705    /// Set the subscription IDs to delete, overwriting any that were added previously.
706    pub fn subscription_ids(mut self, subscription_ids: Vec<u32>) -> Self {
707        self.subscription_ids = subscription_ids;
708        self
709    }
710
711    /// Add a subscription ID to delete.
712    pub fn subscription(mut self, subscription_id: u32) -> Self {
713        self.subscription_ids.push(subscription_id);
714        self
715    }
716}
717
718impl UARequest for DeleteSubscriptions {
719    type Out = DeleteSubscriptionsResponse;
720
721    async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
722    where
723        Self: 'a,
724    {
725        if self.subscription_ids.is_empty() {
726            builder_error!(self, "delete_subscriptions called with no subscription IDs");
727            return Err(StatusCode::BadNothingToDo);
728        }
729        let request = DeleteSubscriptionsRequest {
730            request_header: self.header.header,
731            subscription_ids: Some(self.subscription_ids.clone()),
732        };
733        let response = channel.send(request, self.header.timeout).await?;
734        if let ResponseMessage::DeleteSubscriptions(response) = response {
735            process_service_result(&response.response_header)?;
736
737            builder_debug!(self, "delete_subscriptions success");
738            Ok(*response)
739        } else {
740            builder_error!(self, "delete_subscriptions failed {:?}", response);
741            Err(process_unexpected_response(response))
742        }
743    }
744}
745
746#[derive(Clone)]
747/// Creates monitored items on a subscription by sending a [`CreateMonitoredItemsRequest`] to the server.
748///
749/// See OPC UA Part 4 - Services 5.12.2 for complete description of the service and error responses.
750pub struct CreateMonitoredItems<'a> {
751    subscription_id: u32,
752    timestamps_to_return: TimestampsToReturn,
753    items_to_create: Vec<MonitoredItemCreateRequest>,
754    handle: &'a AtomicHandle,
755
756    header: RequestHeaderBuilder,
757}
758
759builder_base!(CreateMonitoredItems<'a>);
760
761impl<'a> CreateMonitoredItems<'a> {
762    /// Construct a new call to the `CreateMonitoredItems` service.
763    pub fn new(subscription_id: u32, session: &'a Session) -> Self {
764        Self {
765            subscription_id,
766            timestamps_to_return: TimestampsToReturn::Neither,
767            items_to_create: Vec::new(),
768            handle: &session.monitored_item_handle,
769            header: RequestHeaderBuilder::new_from_session(session),
770        }
771    }
772
773    /// Construct a new call to the `CreateMonitoredItems` service, setting header parameters manually.
774    pub fn new_manual(
775        subscription_id: u32,
776        monitored_item_handle: &'a AtomicHandle,
777        session_id: u32,
778        timeout: Duration,
779        auth_token: NodeId,
780        request_handle: IntegerId,
781    ) -> Self {
782        Self {
783            subscription_id,
784            timestamps_to_return: TimestampsToReturn::Neither,
785            items_to_create: Vec::new(),
786            handle: monitored_item_handle,
787            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
788        }
789    }
790
791    /// An enumeration that specifies the timestamp Attributes to be transmitted for each MonitoredItem.
792    pub fn timestamps_to_return(mut self, timestamps_to_return: TimestampsToReturn) -> Self {
793        self.timestamps_to_return = timestamps_to_return;
794        self
795    }
796
797    /// Set the monitored items to create, overwriting any that were added previously.
798    pub fn items_to_create(mut self, items_to_create: Vec<MonitoredItemCreateRequest>) -> Self {
799        self.items_to_create = items_to_create;
800        self
801    }
802
803    /// Add a monitored item to create.
804    pub fn item(mut self, item: MonitoredItemCreateRequest) -> Self {
805        self.items_to_create.push(item);
806        self
807    }
808
809    /// Add a monitored item to create, subscribing to values on `node_id` with the
810    /// given `sampling_interval` and `queue_size`.
811    pub fn value(mut self, node_id: NodeId, sampling_interval: f64, queue_size: u32) -> Self {
812        self.items_to_create.push(MonitoredItemCreateRequest {
813            item_to_monitor: ReadValueId {
814                node_id,
815                attribute_id: AttributeId::Value as u32,
816                ..Default::default()
817            },
818            monitoring_mode: MonitoringMode::Reporting,
819            requested_parameters: MonitoringParameters {
820                client_handle: self.handle.next(),
821                sampling_interval,
822                queue_size,
823                discard_oldest: true,
824                ..Default::default()
825            },
826        });
827        self
828    }
829}
830
#[derive(Debug, Clone)]
/// The result of a [`CreateMonitoredItems`] request for a single item, including
/// the requested parameters.
///
/// Pairs the server's per-item result with the request entry at the same position.
pub struct CreatedMonitoredItem {
    /// The monitored item result, including revised parameters.
    pub result: MonitoredItemCreateResult,
    /// The requested parameters, as sent to the server.
    pub requested_parameters: MonitoringParameters,
    /// The requested monitoring mode.
    pub monitoring_mode: MonitoringMode,
    /// The requested item to monitor.
    pub item_to_monitor: ReadValueId,
}
844
#[derive(Debug, Clone)]
/// The result of a [`CreateMonitoredItems`] request, including
/// the requested items.
pub struct CreateMonitoredItemsResult {
    /// The original response header.
    pub response_header: ResponseHeader,
    /// Optional diagnostic information, if requested.
    pub diagnostic_infos: Option<Vec<DiagnosticInfo>>,
    /// Created monitored items, with the requested parameters. There is one entry per
    /// requested item, in the order the items were sent.
    pub results: Vec<CreatedMonitoredItem>,
}
856
857impl UARequest for CreateMonitoredItems<'_> {
858    type Out = CreateMonitoredItemsResult;
859
860    async fn send<'a>(
861        mut self,
862        channel: &'a crate::AsyncSecureChannel,
863    ) -> Result<Self::Out, StatusCode>
864    where
865        Self: 'a,
866    {
867        builder_debug!(
868            self,
869            "create_monitored_items, for subscription {}, {} items",
870            self.subscription_id,
871            self.items_to_create.len()
872        );
873        if self.subscription_id == 0 {
874            builder_error!(self, "create_monitored_items, subscription id 0 is invalid");
875            return Err(StatusCode::BadSubscriptionIdInvalid);
876        }
877
878        if self.items_to_create.is_empty() {
879            builder_error!(
880                self,
881                "create_monitored_items, called with no items to create"
882            );
883            return Err(StatusCode::BadNothingToDo);
884        }
885        for item in &mut self.items_to_create {
886            if item.requested_parameters.client_handle == 0 {
887                item.requested_parameters.client_handle = self.handle.next();
888            }
889        }
890
891        let num_items = self.items_to_create.len();
892        let request = CreateMonitoredItemsRequest {
893            request_header: self.header.header,
894            subscription_id: self.subscription_id,
895            timestamps_to_return: self.timestamps_to_return,
896            items_to_create: Some(self.items_to_create.clone()),
897        };
898
899        let response = channel.send(request, self.header.timeout).await?;
900
901        if let ResponseMessage::CreateMonitoredItems(response) = response {
902            process_service_result(&response.response_header)?;
903            if let Some(ref results) = response.results {
904                if results.len() != num_items {
905                    builder_error!(
906                        self,
907                        "create_monitored_items, unexpected number of results. Got {}, expected {}",
908                        results.len(),
909                        num_items
910                    );
911                    return Err(StatusCode::BadUnexpectedError);
912                }
913                builder_debug!(self, "create_monitored_items, {} items created", num_items);
914            } else {
915                builder_error!(
916                    self,
917                    "create_monitored_items, success but no monitored items were created"
918                );
919                return Err(StatusCode::BadUnexpectedError);
920            }
921
922            let created = response
923                .results
924                .unwrap_or_default()
925                .into_iter()
926                .zip(self.items_to_create)
927                .map(|(result, item)| CreatedMonitoredItem {
928                    result,
929                    requested_parameters: item.requested_parameters,
930                    monitoring_mode: item.monitoring_mode,
931                    item_to_monitor: item.item_to_monitor,
932                })
933                .collect();
934
935            Ok(CreateMonitoredItemsResult {
936                response_header: response.response_header,
937                diagnostic_infos: response.diagnostic_infos,
938                results: created,
939            })
940        } else {
941            builder_error!(self, "create_monitored_items failed {:?}", response);
942            Err(process_unexpected_response(response))
943        }
944    }
945}
946
#[derive(Clone)]
/// Modifies monitored items on a subscription by sending a [`ModifyMonitoredItemsRequest`] to the server.
///
/// This is a request builder: configure it with the builder methods, then call `send`.
///
/// See OPC UA Part 4 - Services 5.12.3 for complete description of the service and error responses.
pub struct ModifyMonitoredItems {
    /// ID of the subscription owning the items. `send` rejects `0`.
    subscription_id: u32,
    /// Which timestamps to request for each monitored item. The constructors start
    /// with [`TimestampsToReturn::Neither`].
    timestamps_to_return: TimestampsToReturn,
    /// The modifications that will be sent in the request.
    items_to_modify: Vec<MonitoredItemModifyRequest>,

    /// Request header and timeout used when the request is sent.
    header: RequestHeaderBuilder,
}

// NOTE(review): `builder_base!` comes from `request_builder`; presumably it adds the shared
// header-related builder methods to this type — confirm against the macro definition.
builder_base!(ModifyMonitoredItems);
960
961impl ModifyMonitoredItems {
962    /// Construct a new call to the `ModifyMonitoredItems` service.
963    pub fn new(subscription_id: u32, session: &Session) -> Self {
964        Self {
965            subscription_id,
966            timestamps_to_return: TimestampsToReturn::Neither,
967            items_to_modify: Vec::new(),
968            header: RequestHeaderBuilder::new_from_session(session),
969        }
970    }
971
972    /// Construct a new call to the `ModifyMonitoredItems` service, setting header parameters manually.
973    pub fn new_manual(
974        subscription_id: u32,
975        session_id: u32,
976        timeout: Duration,
977        auth_token: NodeId,
978        request_handle: IntegerId,
979    ) -> Self {
980        Self {
981            subscription_id,
982            timestamps_to_return: TimestampsToReturn::Neither,
983            items_to_modify: Vec::new(),
984            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
985        }
986    }
987
988    /// An enumeration that specifies the timestamp Attributes to be transmitted for each MonitoredItem.
989    pub fn timestamps_to_return(mut self, timestamps_to_return: TimestampsToReturn) -> Self {
990        self.timestamps_to_return = timestamps_to_return;
991        self
992    }
993
994    /// Set the monitored items to modify, overwriting any that were added previously.
995    pub fn items_to_modify(mut self, items_to_modify: Vec<MonitoredItemModifyRequest>) -> Self {
996        self.items_to_modify = items_to_modify;
997        self
998    }
999
1000    /// Add a monitored item to modify.
1001    pub fn item(mut self, item: MonitoredItemModifyRequest) -> Self {
1002        self.items_to_modify.push(item);
1003        self
1004    }
1005}
1006
1007impl UARequest for ModifyMonitoredItems {
1008    type Out = ModifyMonitoredItemsResponse;
1009
1010    async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
1011    where
1012        Self: 'a,
1013    {
1014        builder_debug!(
1015            self,
1016            "modify_monitored_items, for subscription {}, {} items",
1017            self.subscription_id,
1018            self.items_to_modify.len()
1019        );
1020        if self.subscription_id == 0 {
1021            builder_error!(self, "modify_monitored_items, subscription id 0 is invalid");
1022            return Err(StatusCode::BadInvalidArgument);
1023        }
1024        if self.items_to_modify.is_empty() {
1025            builder_error!(
1026                self,
1027                "modify_monitored_items, called with no items to modify"
1028            );
1029            return Err(StatusCode::BadNothingToDo);
1030        }
1031        let num_items = self.items_to_modify.len();
1032        let request = ModifyMonitoredItemsRequest {
1033            request_header: self.header.header,
1034            subscription_id: self.subscription_id,
1035            timestamps_to_return: self.timestamps_to_return,
1036            items_to_modify: Some(self.items_to_modify),
1037        };
1038        let response = channel.send(request, self.header.timeout).await?;
1039        if let ResponseMessage::ModifyMonitoredItems(response) = response {
1040            process_service_result(&response.response_header)?;
1041            let Some(results) = &response.results else {
1042                builder_error!(self, "modify_monitored_items, got empty response");
1043                return Err(StatusCode::BadUnexpectedError);
1044            };
1045            if results.len() != num_items {
1046                builder_error!(
1047                    self,
1048                    "modify_monitored_items, unexpected number of results. Expected {}, got {}",
1049                    num_items,
1050                    results.len()
1051                );
1052                return Err(StatusCode::BadUnexpectedError);
1053            }
1054
1055            builder_debug!(self, "modify_monitored_items, success");
1056            Ok(*response)
1057        } else {
1058            builder_error!(self, "modify_monitored_items failed {:?}", response);
1059            Err(process_unexpected_response(response))
1060        }
1061    }
1062}
1063
#[derive(Clone)]
/// Sets the monitoring mode on one or more monitored items by sending a [`SetMonitoringModeRequest`]
/// to the server.
///
/// This is a request builder: configure it with the builder methods, then call `send`.
///
/// See OPC UA Part 4 - Services 5.12.4 for complete description of the service and error responses.
pub struct SetMonitoringMode {
    /// ID of the subscription owning the items. `send` rejects `0`.
    subscription_id: u32,
    /// The monitoring mode to apply to every listed monitored item.
    monitoring_mode: MonitoringMode,
    /// IDs of the monitored items whose mode is changed.
    monitored_item_ids: Vec<u32>,

    /// Request header and timeout used when the request is sent.
    header: RequestHeaderBuilder,
}

// NOTE(review): `builder_base!` comes from `request_builder`; presumably it adds the shared
// header-related builder methods to this type — confirm against the macro definition.
builder_base!(SetMonitoringMode);
1078
1079impl SetMonitoringMode {
1080    /// Construct a new call to the `SetMonitoringMode` service.
1081    pub fn new(subscription_id: u32, monitoring_mode: MonitoringMode, session: &Session) -> Self {
1082        Self {
1083            subscription_id,
1084            monitored_item_ids: Vec::new(),
1085            monitoring_mode,
1086            header: RequestHeaderBuilder::new_from_session(session),
1087        }
1088    }
1089
1090    /// Construct a new call to the `SetMonitoringMode` service, setting header parameters manually.
1091    pub fn new_manual(
1092        subscription_id: u32,
1093        monitoring_mode: MonitoringMode,
1094        session_id: u32,
1095        timeout: Duration,
1096        auth_token: NodeId,
1097        request_handle: IntegerId,
1098    ) -> Self {
1099        Self {
1100            subscription_id,
1101            monitored_item_ids: Vec::new(),
1102            monitoring_mode,
1103            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
1104        }
1105    }
1106
1107    /// Set the monitored items to modify, overwriting any that were added previously.
1108    pub fn monitored_item_ids(mut self, monitored_item_ids: Vec<u32>) -> Self {
1109        self.monitored_item_ids = monitored_item_ids;
1110        self
1111    }
1112
1113    /// Add a monitored item to modify.
1114    pub fn item(mut self, item: u32) -> Self {
1115        self.monitored_item_ids.push(item);
1116        self
1117    }
1118}
1119
1120impl UARequest for SetMonitoringMode {
1121    type Out = SetMonitoringModeResponse;
1122
1123    async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
1124    where
1125        Self: 'a,
1126    {
1127        builder_debug!(
1128            self,
1129            "set_monitoring_mode, for subscription {}, {} items",
1130            self.subscription_id,
1131            self.monitored_item_ids.len()
1132        );
1133        if self.subscription_id == 0 {
1134            builder_error!(self, "set_monitoring_mode, subscription id 0 is invalid");
1135            return Err(StatusCode::BadInvalidArgument);
1136        }
1137        if self.monitored_item_ids.is_empty() {
1138            builder_error!(self, "set_monitoring_mode, called with no items to modify");
1139            return Err(StatusCode::BadNothingToDo);
1140        }
1141
1142        let num_items = self.monitored_item_ids.len();
1143        let request = SetMonitoringModeRequest {
1144            request_header: self.header.header,
1145            subscription_id: self.subscription_id,
1146            monitoring_mode: self.monitoring_mode,
1147            monitored_item_ids: Some(self.monitored_item_ids),
1148        };
1149        let response = channel.send(request, self.header.timeout).await?;
1150        if let ResponseMessage::SetMonitoringMode(response) = response {
1151            let Some(results) = &response.results else {
1152                builder_error!(self, "set_monitoring_mode, got empty response");
1153                return Err(StatusCode::BadUnexpectedError);
1154            };
1155            if results.len() != num_items {
1156                builder_error!(
1157                    self,
1158                    "set_monitoring_mode, unexpected number of results. Expected {}, got {}",
1159                    num_items,
1160                    results.len()
1161                );
1162                return Err(StatusCode::BadUnexpectedError);
1163            }
1164
1165            Ok(*response)
1166        } else {
1167            builder_error!(self, "set_monitoring_mode failed {:?}", response);
1168            Err(process_unexpected_response(response))
1169        }
1170    }
1171}
1172
#[derive(Clone)]
/// Sets a monitored item so it becomes the trigger that causes other monitored items to send
/// change events in the same update. Sends a [`SetTriggeringRequest`] to the server.
/// Note that `links_to_remove` is applied before `links_to_add`.
///
/// This is a request builder: configure it with the builder methods, then call `send`.
///
/// See OPC UA Part 4 - Services 5.12.5 for complete description of the service and error responses.
pub struct SetTriggering {
    /// ID of the subscription owning the items. `send` rejects `0`.
    subscription_id: u32,
    /// ID of the monitored item that acts as the trigger.
    triggering_item_id: u32,
    /// IDs of the monitored items to link to the triggering item.
    links_to_add: Vec<u32>,
    /// IDs of the monitored items to unlink from the triggering item.
    links_to_remove: Vec<u32>,

    /// Request header and timeout used when the request is sent.
    header: RequestHeaderBuilder,
}

// NOTE(review): `builder_base!` comes from `request_builder`; presumably it adds the shared
// header-related builder methods to this type — confirm against the macro definition.
builder_base!(SetTriggering);
1189
1190impl SetTriggering {
1191    /// Construct a new call to the `SetTriggering` service.
1192    pub fn new(subscription_id: u32, triggering_item_id: u32, session: &Session) -> Self {
1193        Self {
1194            subscription_id,
1195            triggering_item_id,
1196            links_to_add: Vec::new(),
1197            links_to_remove: Vec::new(),
1198            header: RequestHeaderBuilder::new_from_session(session),
1199        }
1200    }
1201
1202    /// Construct a new call to the `SetTriggering` service, setting header parameters manually.
1203    pub fn new_manual(
1204        subscription_id: u32,
1205        triggering_item_id: u32,
1206        session_id: u32,
1207        timeout: Duration,
1208        auth_token: NodeId,
1209        request_handle: IntegerId,
1210    ) -> Self {
1211        Self {
1212            subscription_id,
1213            triggering_item_id,
1214            links_to_add: Vec::new(),
1215            links_to_remove: Vec::new(),
1216            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
1217        }
1218    }
1219
1220    /// Set the links to add, overwriting any that were added previously.
1221    pub fn links_to_add(mut self, links_to_add: Vec<u32>) -> Self {
1222        self.links_to_add = links_to_add;
1223        self
1224    }
1225
1226    /// Add a new trigger target.
1227    pub fn add_link(mut self, item: u32) -> Self {
1228        self.links_to_add.push(item);
1229        self
1230    }
1231
1232    /// Set the links to add, overwriting any that were added previously.
1233    pub fn links_to_remove(mut self, links_to_remove: Vec<u32>) -> Self {
1234        self.links_to_remove = links_to_remove;
1235        self
1236    }
1237
1238    /// Add a new trigger to remove.
1239    pub fn remove_link(mut self, item: u32) -> Self {
1240        self.links_to_remove.push(item);
1241        self
1242    }
1243}
1244
1245impl UARequest for SetTriggering {
1246    type Out = SetTriggeringResponse;
1247
1248    async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
1249    where
1250        Self: 'a,
1251    {
1252        builder_debug!(
1253            self,
1254            "set_triggering, for subscription {}, {} links to add, {} links to remove",
1255            self.subscription_id,
1256            self.links_to_add.len(),
1257            self.links_to_remove.len()
1258        );
1259        if self.subscription_id == 0 {
1260            builder_error!(self, "set_triggering, subscription id 0 is invalid");
1261            return Err(StatusCode::BadInvalidArgument);
1262        }
1263
1264        if self.links_to_add.is_empty() && self.links_to_remove.is_empty() {
1265            builder_error!(self, "set_triggering, called with nothing to add or remove");
1266            return Err(StatusCode::BadNothingToDo);
1267        }
1268        let request = SetTriggeringRequest {
1269            request_header: self.header.header,
1270            subscription_id: self.subscription_id,
1271            triggering_item_id: self.triggering_item_id,
1272            links_to_add: if self.links_to_add.is_empty() {
1273                None
1274            } else {
1275                Some(self.links_to_add.clone())
1276            },
1277            links_to_remove: if self.links_to_remove.is_empty() {
1278                None
1279            } else {
1280                Some(self.links_to_remove.clone())
1281            },
1282        };
1283
1284        let response = channel.send(request, self.header.timeout).await?;
1285        if let ResponseMessage::SetTriggering(response) = response {
1286            let to_add_res = response.add_results.as_deref().unwrap_or(&[]);
1287            let to_remove_res = response.remove_results.as_deref().unwrap_or(&[]);
1288            if to_add_res.len() != self.links_to_add.len() {
1289                builder_error!(
1290                    self,
1291                    "set_triggering, got unexpected number of add results: {}, expected {}",
1292                    to_add_res.len(),
1293                    self.links_to_add.len()
1294                );
1295                return Err(StatusCode::BadUnexpectedError);
1296            }
1297            if to_remove_res.len() != self.links_to_remove.len() {
1298                builder_error!(
1299                    self,
1300                    "set_triggering, got unexpected number of remove results: {}, expected {}",
1301                    to_remove_res.len(),
1302                    self.links_to_add.len()
1303                );
1304                return Err(StatusCode::BadUnexpectedError);
1305            }
1306
1307            Ok(*response)
1308        } else {
1309            builder_error!(self, "set_triggering failed {:?}", response);
1310            Err(process_unexpected_response(response))
1311        }
1312    }
1313}
1314
#[derive(Clone)]
/// Deletes monitored items from a subscription by sending a [`DeleteMonitoredItemsRequest`] to the server.
///
/// This is a request builder: configure it with the builder methods, then call `send`.
///
/// See OPC UA Part 4 - Services 5.12.6 for complete description of the service and error responses.
pub struct DeleteMonitoredItems {
    /// ID of the subscription owning the items. `send` rejects `0`.
    subscription_id: u32,
    /// IDs of the monitored items to delete.
    items_to_delete: Vec<u32>,

    /// Request header and timeout used when the request is sent.
    header: RequestHeaderBuilder,
}

// NOTE(review): `builder_base!` comes from `request_builder`; presumably it adds the shared
// header-related builder methods to this type — confirm against the macro definition.
builder_base!(DeleteMonitoredItems);
1327
1328impl DeleteMonitoredItems {
1329    /// Construct a new call to the `DeleteMonitoredItems` service.
1330    pub fn new(subscription_id: u32, session: &Session) -> Self {
1331        Self {
1332            subscription_id,
1333            items_to_delete: Vec::new(),
1334            header: RequestHeaderBuilder::new_from_session(session),
1335        }
1336    }
1337
1338    /// Construct a new call to the `DeleteMonitoredItems` service, setting header parameters manually.
1339    pub fn new_manual(
1340        subscription_id: u32,
1341        session_id: u32,
1342        timeout: Duration,
1343        auth_token: NodeId,
1344        request_handle: IntegerId,
1345    ) -> Self {
1346        Self {
1347            subscription_id,
1348            items_to_delete: Vec::new(),
1349            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
1350        }
1351    }
1352
1353    /// Set the items to delete, overwriting any that were added previously.
1354    pub fn items_to_delete(mut self, items_to_delete: Vec<u32>) -> Self {
1355        self.items_to_delete = items_to_delete;
1356        self
1357    }
1358
1359    /// Add a new item to delete.
1360    pub fn item(mut self, item: u32) -> Self {
1361        self.items_to_delete.push(item);
1362        self
1363    }
1364}
1365
1366impl UARequest for DeleteMonitoredItems {
1367    type Out = DeleteMonitoredItemsResponse;
1368
1369    async fn send<'a>(self, channel: &'a crate::AsyncSecureChannel) -> Result<Self::Out, StatusCode>
1370    where
1371        Self: 'a,
1372    {
1373        builder_debug!(
1374            self,
1375            "delete_monitored_items, subscription {} for {} items",
1376            self.subscription_id,
1377            self.items_to_delete.len(),
1378        );
1379        if self.subscription_id == 0 {
1380            builder_error!(self, "delete_monitored_items, subscription id 0 is invalid");
1381            return Err(StatusCode::BadInvalidArgument);
1382        }
1383        if self.items_to_delete.is_empty() {
1384            builder_error!(
1385                self,
1386                "delete_monitored_items, called with no items to delete"
1387            );
1388            return Err(StatusCode::BadNothingToDo);
1389        }
1390
1391        let request = DeleteMonitoredItemsRequest {
1392            request_header: self.header.header,
1393            subscription_id: self.subscription_id,
1394            monitored_item_ids: Some(self.items_to_delete.clone()),
1395        };
1396        let response = channel.send(request, self.header.timeout).await?;
1397        if let ResponseMessage::DeleteMonitoredItems(response) = response {
1398            process_service_result(&response.response_header)?;
1399            builder_debug!(self, "delete_monitored_items, success");
1400            Ok(*response)
1401        } else {
1402            builder_error!(self, "delete_monitored_items failed {:?}", response);
1403            Err(process_unexpected_response(response))
1404        }
1405    }
1406}
1407
1408impl Session {
    /// Get the internal state of subscriptions registered on the session.
    ///
    /// Returns a reference to the mutex guarding the session's [`SubscriptionState`]; the
    /// caller has to lock it to read or modify the state.
    pub fn subscription_state(&self) -> &Mutex<SubscriptionState> {
        &self.subscription_state
    }
1413
    /// Trigger a publish to fire immediately.
    ///
    /// Sends the current [`Instant`] on `trigger_publish_tx`. The outcome of the send is
    /// discarded, so this method never reports a failure.
    pub fn trigger_publish_now(&self) {
        // NOTE(review): presumably the send can only fail when nothing is listening for
        // publish triggers any more, which is why the result is ignored — confirm.
        let _ = self.trigger_publish_tx.send(Instant::now());
    }
1418
1419    #[allow(clippy::too_many_arguments)]
1420    async fn create_subscription_inner(
1421        &self,
1422        publishing_interval: Duration,
1423        lifetime_count: u32,
1424        max_keep_alive_count: u32,
1425        max_notifications_per_publish: u32,
1426        publishing_enabled: bool,
1427        priority: u8,
1428        callback: Box<dyn OnSubscriptionNotificationCore>,
1429    ) -> Result<u32, StatusCode> {
1430        let response = CreateSubscription::new(self)
1431            .publishing_interval(publishing_interval)
1432            .max_lifetime_count(lifetime_count)
1433            .max_keep_alive_count(max_keep_alive_count)
1434            .max_notifications_per_publish(max_notifications_per_publish)
1435            .publishing_enabled(publishing_enabled)
1436            .priority(priority)
1437            .send(&self.channel)
1438            .await?;
1439
1440        let subscription = Subscription::new(
1441            response.subscription_id,
1442            Duration::from_millis(response.revised_publishing_interval.max(0.0).floor() as u64),
1443            response.revised_lifetime_count,
1444            response.revised_max_keep_alive_count,
1445            max_notifications_per_publish,
1446            priority,
1447            publishing_enabled,
1448            callback,
1449        );
1450        {
1451            let mut subscription_state = trace_lock!(self.subscription_state);
1452            subscription_state.add_subscription(subscription);
1453        }
1454
1455        self.trigger_publish_now();
1456
1457        Ok(response.subscription_id)
1458    }
1459
    /// Create a subscription by sending a [`CreateSubscriptionRequest`] to the server.
    ///
    /// On success the new subscription is also registered in the session's subscription state
    /// and an immediate publish is triggered.
    ///
    /// See OPC UA Part 4 - Services 5.13.2 for complete description of the service and error responses.
    ///
    /// # Arguments
    ///
    /// * `publishing_interval` - The requested publishing interval defines the cyclic rate that
    ///   the Subscription is being requested to return Notifications to the Client. This interval
    ///   is expressed in milliseconds. This interval is represented by the publishing timer in the
    ///   Subscription state table. The negotiated value for this parameter returned in the
    ///   response is used as the default sampling interval for MonitoredItems assigned to this
    ///   Subscription. If the requested value is 0 or negative, the server shall revise with the
    ///   fastest supported publishing interval in milliseconds.
    /// * `lifetime_count` - Requested lifetime count. The lifetime count shall be a minimum of
    ///   three times the keep-alive count. When the publishing timer has expired this
    ///   number of times without a Publish request being available to send a NotificationMessage,
    ///   then the Subscription shall be deleted by the Server.
    /// * `max_keep_alive_count` - Requested maximum keep-alive count. When the publishing timer has
    ///   expired this number of times without requiring any NotificationMessage to be sent, the
    ///   Subscription sends a keep-alive Message to the Client. The negotiated value for this
    ///   parameter is returned in the response. If the requested value is 0, the server shall
    ///   revise with the smallest supported keep-alive count.
    /// * `max_notifications_per_publish` - The maximum number of notifications that the Client
    ///   wishes to receive in a single Publish response. A value of zero indicates that there is
    ///   no limit. The number of notifications per Publish is the sum of monitoredItems in
    ///   the DataChangeNotification and events in the EventNotificationList.
    /// * `priority` - Indicates the relative priority of the Subscription. When more than one
    ///   Subscription needs to send Notifications, the Server should de-queue a Publish request
    ///   to the Subscription with the highest priority number. For Subscriptions with equal
    ///   priority the Server should de-queue Publish requests in a round-robin fashion.
    ///   A Client that does not require special priority settings should set this value to zero.
    /// * `publishing_enabled` - A boolean parameter with the following values - `true` publishing
    ///   is enabled for the Subscription, `false`, publishing is disabled for the Subscription.
    ///   The value of this parameter does not affect the value of the monitoring mode Attribute of
    ///   MonitoredItems.
    /// * `callback` - Notification callback stored with the created subscription; presumably
    ///   invoked for notifications received on it (see [`OnSubscriptionNotificationCore`]).
    ///
    /// # Returns
    ///
    /// * `Ok(u32)` - identifier for new subscription
    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
    ///
    #[allow(clippy::too_many_arguments)]
    pub async fn create_subscription(
        &self,
        publishing_interval: Duration,
        lifetime_count: u32,
        max_keep_alive_count: u32,
        max_notifications_per_publish: u32,
        priority: u8,
        publishing_enabled: bool,
        callback: impl OnSubscriptionNotificationCore + 'static,
    ) -> Result<u32, StatusCode> {
        // Note the argument order: the inner function takes `publishing_enabled` before
        // `priority`, and the callback is boxed into a trait object.
        self.create_subscription_inner(
            publishing_interval,
            lifetime_count,
            max_keep_alive_count,
            max_notifications_per_publish,
            publishing_enabled,
            priority,
            Box::new(callback),
        )
        .await
    }
1523
1524    fn subscription_exists(&self, subscription_id: u32) -> bool {
1525        let subscription_state = trace_lock!(self.subscription_state);
1526        subscription_state.subscription_exists(subscription_id)
1527    }
1528
1529    /// Modifies a subscription by sending a [`ModifySubscriptionRequest`] to the server.
1530    ///
1531    /// See OPC UA Part 4 - Services 5.13.3 for complete description of the service and error responses.
1532    ///
1533    /// # Arguments
1534    ///
1535    /// * `subscription_id` - subscription identifier returned from `create_subscription`.
1536    /// * `publishing_interval` - The requested publishing interval defines the cyclic rate that
1537    ///   the Subscription is being requested to return Notifications to the Client. This interval
1538    ///   is expressed in milliseconds. This interval is represented by the publishing timer in the
1539    ///   Subscription state table. The negotiated value for this parameter returned in the
1540    ///   response is used as the default sampling interval for MonitoredItems assigned to this
1541    ///   Subscription. If the requested value is 0 or negative, the server shall revise with the
1542    ///   fastest supported publishing interval in milliseconds.
1543    /// * `lifetime_count` - Requested lifetime count. The lifetime count shall be a minimum of
1544    ///   three times the keep keep-alive count. When the publishing timer has expired this
1545    ///   number of times without a Publish request being available to send a NotificationMessage,
1546    ///   then the Subscription shall be deleted by the Server.
1547    /// * `max_keep_alive_count` - Requested maximum keep-alive count. When the publishing timer has
1548    ///   expired this number of times without requiring any NotificationMessage to be sent, the
1549    ///   Subscription sends a keep-alive Message to the Client. The negotiated value for this
1550    ///   parameter is returned in the response. If the requested value is 0, the server shall
1551    ///   revise with the smallest supported keep-alive count.
1552    /// * `max_notifications_per_publish` - The maximum number of notifications that the Client
1553    ///   wishes to receive in a single Publish response. A value of zero indicates that there is
1554    ///   no limit. The number of notifications per Publish is the sum of monitoredItems in
1555    ///   the DataChangeNotification and events in the EventNotificationList.
1556    /// * `priority` - Indicates the relative priority of the Subscription. When more than one
1557    ///   Subscription needs to send Notifications, the Server should de-queue a Publish request
1558    ///   to the Subscription with the highest priority number. For Subscriptions with equal
1559    ///   priority the Server should de-queue Publish requests in a round-robin fashion.
1560    ///   A Client that does not require special priority settings should set this value to zero.
1561    ///
1562    /// # Returns
1563    ///
1564    /// * `Ok(())` - Success
1565    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
1566    ///
1567    pub async fn modify_subscription(
1568        &self,
1569        subscription_id: u32,
1570        publishing_interval: Duration,
1571        lifetime_count: u32,
1572        max_keep_alive_count: u32,
1573        max_notifications_per_publish: u32,
1574        priority: u8,
1575    ) -> Result<(), StatusCode> {
1576        if !self.subscription_exists(subscription_id) {
1577            session_error!(self, "modify_subscription, subscription id does not exist");
1578            return Err(StatusCode::BadInvalidArgument);
1579        }
1580
1581        let response = ModifySubscription::new(subscription_id, self)
1582            .publishing_interval(publishing_interval)
1583            .max_lifetime_count(lifetime_count)
1584            .max_keep_alive_count(max_keep_alive_count)
1585            .max_notifications_per_publish(max_notifications_per_publish)
1586            .priority(priority)
1587            .send(&self.channel)
1588            .await?;
1589
1590        {
1591            let mut subscription_state = trace_lock!(self.subscription_state);
1592            subscription_state.modify_subscription(
1593                subscription_id,
1594                Duration::from_millis(response.revised_publishing_interval.max(0.0).floor() as u64),
1595                response.revised_lifetime_count,
1596                response.revised_max_keep_alive_count,
1597                max_notifications_per_publish,
1598                priority,
1599            );
1600        }
1601
1602        Ok(())
1603    }
1604
1605    /// Changes the publishing mode of subscriptions by sending a [`SetPublishingModeRequest`] to the server.
1606    ///
1607    /// See OPC UA Part 4 - Services 5.13.4 for complete description of the service and error responses.
1608    ///
1609    /// # Arguments
1610    ///
1611    /// * `subscription_ids` - one or more subscription identifiers.
1612    /// * `publishing_enabled` - A boolean parameter with the following values - `true` publishing
1613    ///   is enabled for the Subscriptions, `false`, publishing is disabled for the Subscriptions.
1614    ///
1615    /// # Returns
1616    ///
1617    /// * `Ok(Vec<StatusCode>)` - Service return code for the action for each id, `Good` or `BadSubscriptionIdInvalid`
1618    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
1619    ///
1620    pub async fn set_publishing_mode(
1621        &self,
1622        subscription_ids: &[u32],
1623        publishing_enabled: bool,
1624    ) -> Result<Vec<StatusCode>, StatusCode> {
1625        let results = SetPublishingMode::new(publishing_enabled, self)
1626            .subscription_ids(subscription_ids.to_vec())
1627            .send(&self.channel)
1628            .await?
1629            .results
1630            .unwrap_or_default();
1631
1632        {
1633            // Update all subscriptions where the returned status is good.
1634            let mut subscription_state = trace_lock!(self.subscription_state);
1635            let ids = subscription_ids
1636                .iter()
1637                .zip(results.iter())
1638                .filter(|(_, s)| s.is_good())
1639                .map(|(v, _)| *v)
1640                .collect::<Vec<_>>();
1641            subscription_state.set_publishing_mode(&ids, publishing_enabled);
1642        }
1643
1644        if publishing_enabled {
1645            self.trigger_publish_now();
1646        }
1647        Ok(results)
1648    }
1649
1650    /// Transfers Subscriptions and their MonitoredItems from one Session to another. For example,
1651    /// a Client may need to reopen a Session and then transfer its Subscriptions to that Session.
1652    /// It may also be used by one Client to take over a Subscription from another Client by
1653    /// transferring the Subscription to its Session.
1654    ///
1655    /// Note that if you call this manually, you will need to register the
1656    /// subscriptions in the subscription state ([`Session::subscription_state`]) in order to
1657    /// receive notifications.
1658    ///
1659    /// See OPC UA Part 4 - Services 5.13.7 for complete description of the service and error responses.
1660    ///
1661    /// * `subscription_ids` - one or more subscription identifiers.
1662    /// * `send_initial_values` - A boolean parameter with the following values - `true` the first
1663    ///   publish response shall contain the current values of all monitored items in the subscription,
1664    ///   `false`, the first publish response shall contain only the value changes since the last
1665    ///   publish response was sent.
1666    ///
1667    /// # Returns
1668    ///
1669    /// * `Ok(Vec<TransferResult>)` - The [`TransferResult`] for each transfer subscription.
1670    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
1671    ///
1672    pub async fn transfer_subscriptions(
1673        &self,
1674        subscription_ids: &[u32],
1675        send_initial_values: bool,
1676    ) -> Result<Vec<TransferResult>, StatusCode> {
1677        let r = TransferSubscriptions::new(self)
1678            .send_initial_values(send_initial_values)
1679            .subscription_ids(subscription_ids.to_vec())
1680            .send(&self.channel)
1681            .await?
1682            .results
1683            .unwrap_or_default();
1684
1685        self.trigger_publish_now();
1686
1687        Ok(r)
1688    }
1689
1690    /// Deletes a subscription by sending a [`DeleteSubscriptionsRequest`] to the server.
1691    ///
1692    /// See OPC UA Part 4 - Services 5.13.8 for complete description of the service and error responses.
1693    ///
1694    /// # Arguments
1695    ///
1696    /// * `subscription_id` - subscription identifier returned from `create_subscription`.
1697    ///
1698    /// # Returns
1699    ///
1700    /// * `Ok(StatusCode)` - Service return code for the delete action, `Good` or `BadSubscriptionIdInvalid`
1701    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
1702    ///
1703    pub async fn delete_subscription(
1704        &self,
1705        subscription_id: u32,
1706    ) -> Result<StatusCode, StatusCode> {
1707        if subscription_id == 0 {
1708            session_error!(self, "delete_subscription, subscription id 0 is invalid");
1709            Err(StatusCode::BadInvalidArgument)
1710        } else if !self.subscription_exists(subscription_id) {
1711            session_error!(
1712                self,
1713                "delete_subscription, subscription id {} does not exist",
1714                subscription_id
1715            );
1716            Err(StatusCode::BadInvalidArgument)
1717        } else {
1718            let result = self.delete_subscriptions(&[subscription_id]).await?;
1719            Ok(result[0])
1720        }
1721    }
1722
1723    /// Deletes subscriptions by sending a [`DeleteSubscriptionsRequest`] to the server with the list
1724    /// of subscriptions to delete.
1725    ///
1726    /// See OPC UA Part 4 - Services 5.13.8 for complete description of the service and error responses.
1727    ///
1728    /// # Arguments
1729    ///
1730    /// * `subscription_ids` - List of subscription identifiers to delete.
1731    ///
1732    /// # Returns
1733    ///
1734    /// * `Ok(Vec<StatusCode>)` - List of result for delete action on each id, `Good` or `BadSubscriptionIdInvalid`
1735    ///   The size and order of the list matches the size and order of the input.
1736    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
1737    ///
1738    pub async fn delete_subscriptions(
1739        &self,
1740        subscription_ids: &[u32],
1741    ) -> Result<Vec<StatusCode>, StatusCode> {
1742        let result = DeleteSubscriptions::new(self)
1743            .subscription_ids(subscription_ids.to_vec())
1744            .send(&self.channel)
1745            .await?
1746            .results
1747            .unwrap_or_default();
1748        {
1749            // Clear out deleted subscriptions, assuming the delete worked
1750            let mut subscription_state = trace_lock!(self.subscription_state);
1751            for id in subscription_ids {
1752                subscription_state.delete_subscription(*id);
1753            }
1754        }
1755
1756        Ok(result)
1757    }
1758
1759    /// Creates monitored items on a subscription by sending a [`CreateMonitoredItemsRequest`] to the server.
1760    ///
1761    /// See OPC UA Part 4 - Services 5.12.2 for complete description of the service and error responses.
1762    ///
1763    /// # Arguments
1764    ///
1765    /// * `subscription_id` - The Server-assigned identifier for the Subscription that will report Notifications for this MonitoredItem
1766    /// * `timestamps_to_return` - An enumeration that specifies the timestamp Attributes to be transmitted for each MonitoredItem.
1767    /// * `items_to_create` - A list of [`MonitoredItemCreateRequest`] to be created and assigned to the specified Subscription.
1768    ///
1769    /// # Returns
1770    ///
1771    /// * `Ok(Vec<MonitoredItemCreateResult>)` - A list of [`MonitoredItemCreateResult`] corresponding to the items to create.
1772    ///   The size and order of the list matches the size and order of the `items_to_create` request parameter.
1773    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
1774    ///
1775    pub async fn create_monitored_items(
1776        &self,
1777        subscription_id: u32,
1778        timestamps_to_return: TimestampsToReturn,
1779        mut items_to_create: Vec<MonitoredItemCreateRequest>,
1780    ) -> Result<Vec<CreatedMonitoredItem>, StatusCode> {
1781        {
1782            let state = trace_lock!(self.subscription_state);
1783            if !state.subscription_exists(subscription_id) {
1784                session_error!(
1785                    self,
1786                    "create_monitored_items, subscription id {} does not exist",
1787                    subscription_id
1788                );
1789                return Err(StatusCode::BadSubscriptionIdInvalid);
1790            }
1791        }
1792
1793        for item in &mut items_to_create {
1794            if item.requested_parameters.client_handle == 0 {
1795                item.requested_parameters.client_handle = self.monitored_item_handle.next();
1796            }
1797        }
1798
1799        // Add the monitored items _before_ making the request, to avoid
1800        // race conditions where publish responses arrive before the monitored items
1801        // are added to the internal state.
1802        let pre_insert = PreInsertMonitoredItems::new(
1803            &self.subscription_state,
1804            subscription_id,
1805            &items_to_create,
1806        );
1807
1808        let result = CreateMonitoredItems::new(subscription_id, self)
1809            .items_to_create(items_to_create)
1810            .timestamps_to_return(timestamps_to_return)
1811            .send(&self.channel)
1812            .await?;
1813
1814        // Set the items in our internal state
1815        pre_insert.finish(&result.results);
1816
1817        Ok(result.results)
1818    }
1819
1820    /// Modifies monitored items on a subscription by sending a [`ModifyMonitoredItemsRequest`] to the server.
1821    ///
1822    /// See OPC UA Part 4 - Services 5.12.3 for complete description of the service and error responses.
1823    ///
1824    /// # Arguments
1825    ///
1826    /// * `subscription_id` - The Server-assigned identifier for the Subscription that will report Notifications for this MonitoredItem.
1827    /// * `timestamps_to_return` - An enumeration that specifies the timestamp Attributes to be transmitted for each MonitoredItem.
1828    /// * `items_to_modify` - The list of [`MonitoredItemModifyRequest`] to modify.
1829    ///
1830    /// # Returns
1831    ///
1832    /// * `Ok(Vec<MonitoredItemModifyResult>)` - A list of [`MonitoredItemModifyResult`] corresponding to the MonitoredItems to modify.
1833    ///   The size and order of the list matches the size and order of the `items_to_modify` request parameter.
1834    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
1835    ///
1836    pub async fn modify_monitored_items(
1837        &self,
1838        subscription_id: u32,
1839        timestamps_to_return: TimestampsToReturn,
1840        items_to_modify: &[MonitoredItemModifyRequest],
1841    ) -> Result<Vec<MonitoredItemModifyResult>, StatusCode> {
1842        {
1843            let state = trace_lock!(self.subscription_state);
1844            if !state.subscription_exists(subscription_id) {
1845                session_error!(
1846                    self,
1847                    "modify_monitored_items, subscription id {} does not exist",
1848                    subscription_id
1849                );
1850                return Err(StatusCode::BadSubscriptionIdInvalid);
1851            }
1852        }
1853        let ids = items_to_modify
1854            .iter()
1855            .map(|i| i.monitored_item_id)
1856            .collect::<Vec<_>>();
1857        let results = ModifyMonitoredItems::new(subscription_id, self)
1858            .timestamps_to_return(timestamps_to_return)
1859            .items_to_modify(items_to_modify.to_vec())
1860            .send(&self.channel)
1861            .await?
1862            .results
1863            .unwrap_or_default();
1864
1865        let items_to_modify = ids
1866            .iter()
1867            .zip(results.iter())
1868            .map(|(id, r)| ModifyMonitoredItem {
1869                id: *id,
1870                queue_size: r.revised_queue_size,
1871                sampling_interval: r.revised_sampling_interval,
1872            })
1873            .collect::<Vec<ModifyMonitoredItem>>();
1874        {
1875            let mut subscription_state = trace_lock!(self.subscription_state);
1876            subscription_state.modify_monitored_items(subscription_id, &items_to_modify);
1877        }
1878
1879        Ok(results)
1880    }
1881
1882    /// Sets the monitoring mode on one or more monitored items by sending a [`SetMonitoringModeRequest`]
1883    /// to the server.
1884    ///
1885    /// See OPC UA Part 4 - Services 5.12.4 for complete description of the service and error responses.
1886    ///
1887    /// # Arguments
1888    ///
1889    /// * `subscription_id` - the subscription identifier containing the monitored items to be modified.
1890    /// * `monitoring_mode` - the monitored mode to apply to the monitored items
1891    /// * `monitored_item_ids` - the monitored items to be modified
1892    ///
1893    /// # Returns
1894    ///
1895    /// * `Ok(Vec<StatusCode>)` - Individual result for each monitored item.
1896    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
1897    ///
1898    pub async fn set_monitoring_mode(
1899        &self,
1900        subscription_id: u32,
1901        monitoring_mode: MonitoringMode,
1902        monitored_item_ids: &[u32],
1903    ) -> Result<Vec<StatusCode>, StatusCode> {
1904        {
1905            let state = trace_lock!(self.subscription_state);
1906            if !state.subscription_exists(subscription_id) {
1907                session_error!(
1908                    self,
1909                    "set_monitoring_mode, subscription id {} does not exist",
1910                    subscription_id
1911                );
1912                return Err(StatusCode::BadSubscriptionIdInvalid);
1913            }
1914        }
1915        let results = SetMonitoringMode::new(subscription_id, monitoring_mode, self)
1916            .monitored_item_ids(monitored_item_ids.to_vec())
1917            .send(&self.channel)
1918            .await?
1919            .results
1920            .unwrap_or_default();
1921
1922        let ok_ids: Vec<_> = monitored_item_ids
1923            .iter()
1924            .zip(results.iter())
1925            .filter(|(_, s)| s.is_good())
1926            .map(|(v, _)| *v)
1927            .collect();
1928        {
1929            let mut subscription_state = trace_lock!(self.subscription_state);
1930            subscription_state.set_monitoring_mode(subscription_id, &ok_ids, monitoring_mode);
1931        }
1932
1933        Ok(results)
1934    }
1935
1936    /// Sets a monitored item so it becomes the trigger that causes other monitored items to send
1937    /// change events in the same update. Sends a [`SetTriggeringRequest`] to the server.
1938    /// Note that `items_to_remove` is applied before `items_to_add`.
1939    ///
1940    /// See OPC UA Part 4 - Services 5.12.5 for complete description of the service and error responses.
1941    ///
1942    /// # Arguments
1943    ///
1944    /// * `subscription_id` - the subscription identifier containing the monitored item to be used as the trigger.
1945    /// * `monitored_item_id` - the monitored item that is the trigger.
1946    /// * `links_to_add` - zero or more items to be added to the monitored item's triggering list.
1947    /// * `items_to_remove` - zero or more items to be removed from the monitored item's triggering list.
1948    ///
1949    /// # Returns
1950    ///
1951    /// * `Ok((Option<Vec<StatusCode>>, Option<Vec<StatusCode>>))` - Individual result for each item added / removed for the SetTriggering call.
1952    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
1953    ///
1954    pub async fn set_triggering(
1955        &self,
1956        subscription_id: u32,
1957        triggering_item_id: u32,
1958        links_to_add: &[u32],
1959        links_to_remove: &[u32],
1960    ) -> Result<(Option<Vec<StatusCode>>, Option<Vec<StatusCode>>), StatusCode> {
1961        {
1962            let state = trace_lock!(self.subscription_state);
1963            if !state.subscription_exists(subscription_id) {
1964                session_error!(
1965                    self,
1966                    "set_triggering, subscription id {} does not exist",
1967                    subscription_id
1968                );
1969                return Err(StatusCode::BadSubscriptionIdInvalid);
1970            }
1971        }
1972        let response = SetTriggering::new(subscription_id, triggering_item_id, self)
1973            .links_to_add(links_to_add.to_vec())
1974            .links_to_remove(links_to_remove.to_vec())
1975            .send(&self.channel)
1976            .await?;
1977
1978        let to_add_res = response.add_results.as_deref().unwrap_or(&[]);
1979        let to_remove_res = response.remove_results.as_deref().unwrap_or(&[]);
1980
1981        let ok_adds = to_add_res
1982            .iter()
1983            .zip(links_to_add)
1984            .filter(|(s, _)| s.is_good())
1985            .map(|(_, v)| v)
1986            .copied()
1987            .collect::<Vec<_>>();
1988        let ok_removes = to_remove_res
1989            .iter()
1990            .zip(links_to_remove)
1991            .filter(|(s, _)| s.is_good())
1992            .map(|(_, v)| v)
1993            .copied()
1994            .collect::<Vec<_>>();
1995
1996        // Update client side state
1997        let mut subscription_state = trace_lock!(self.subscription_state);
1998        subscription_state.set_triggering(
1999            subscription_id,
2000            triggering_item_id,
2001            &ok_adds,
2002            &ok_removes,
2003        );
2004        Ok((response.add_results, response.remove_results))
2005    }
2006
2007    /// Deletes monitored items from a subscription by sending a [`DeleteMonitoredItemsRequest`] to the server.
2008    ///
2009    /// See OPC UA Part 4 - Services 5.12.6 for complete description of the service and error responses.
2010    ///
2011    /// # Arguments
2012    ///
2013    /// * `subscription_id` - The Server-assigned identifier for the Subscription that will report Notifications for this MonitoredItem.
2014    /// * `items_to_delete` - List of Server-assigned ids for the MonitoredItems to be deleted.
2015    ///
2016    /// # Returns
2017    ///
2018    /// * `Ok(Vec<StatusCode>)` - List of StatusCodes for the MonitoredItems to delete. The size and
2019    ///   order of the list matches the size and order of the `items_to_delete` request parameter.
2020    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
2021    ///
2022    pub async fn delete_monitored_items(
2023        &self,
2024        subscription_id: u32,
2025        items_to_delete: &[u32],
2026    ) -> Result<Vec<StatusCode>, StatusCode> {
2027        {
2028            let state = trace_lock!(self.subscription_state);
2029            if !state.subscription_exists(subscription_id) {
2030                session_error!(
2031                    self,
2032                    "delete_monitored_items, subscription id {} does not exist",
2033                    subscription_id
2034                );
2035                return Err(StatusCode::BadSubscriptionIdInvalid);
2036            }
2037        }
2038        let response = DeleteMonitoredItems::new(subscription_id, self)
2039            .items_to_delete(items_to_delete.to_vec())
2040            .send(&self.channel)
2041            .await?
2042            .results
2043            .unwrap_or_default();
2044        let mut subscription_state = trace_lock!(self.subscription_state);
2045        subscription_state.delete_monitored_items(subscription_id, items_to_delete);
2046        Ok(response)
2047    }
2048
2049    pub(crate) fn next_publish_time(&self, set_last_publish: bool) -> Option<Instant> {
2050        let mut subscription_state = trace_lock!(self.subscription_state);
2051        if set_last_publish {
2052            subscription_state.set_last_publish();
2053        }
2054        subscription_state.next_publish_time()
2055    }
2056
2057    /// Send a publish request, returning `true` if the session should send a new request
2058    /// immediately.
2059    pub(crate) async fn publish(&self) -> Result<bool, StatusCode> {
2060        let acks = {
2061            let mut subscription_state = trace_lock!(self.subscription_state);
2062            let acks = subscription_state.take_acknowledgements();
2063            if !acks.is_empty() {
2064                Some(acks)
2065            } else {
2066                None
2067            }
2068        };
2069
2070        match Publish::new(self)
2071            .acks(acks.clone().unwrap_or_default())
2072            .send(&self.channel)
2073            .await
2074        {
2075            Ok(r) => {
2076                let mut subscription_state = trace_lock!(self.subscription_state);
2077                subscription_state.handle_notification(r.subscription_id, r.notification_message);
2078                Ok(r.more_notifications)
2079            }
2080            Err(e) => {
2081                if let Some(acks) = acks {
2082                    let mut subscription_state = trace_lock!(self.subscription_state);
2083                    subscription_state.re_queue_acknowledgements(acks);
2084                }
2085                Err(e)
2086            }
2087        }
2088    }
2089
2090    /// Send a request to re-publish an unacknowledged notification message from the server.
2091    ///
2092    /// If this succeeds, the session will automatically acknowledge the notification in the next publish request.
2093    ///
2094    /// # Arguments
2095    ///
2096    /// * `subscription_id` - The Server-assigned identifier for the Subscription to republish from.
2097    /// * `sequence_number` - Sequence number to re-publish.
2098    ///
2099    /// # Returns
2100    ///
2101    /// * `Ok(NotificationMessage)` - Re-published notification message.
2102    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
2103    ///
2104    pub async fn republish(
2105        &self,
2106        subscription_id: u32,
2107        sequence_number: u32,
2108    ) -> Result<NotificationMessage, StatusCode> {
2109        let res = Republish::new(subscription_id, sequence_number, self)
2110            .send(&self.channel)
2111            .await?;
2112
2113        {
2114            let mut lck = trace_lock!(self.subscription_state);
2115            lck.add_acknowledgement(subscription_id, sequence_number);
2116        }
2117
2118        Ok(res.notification_message)
2119    }
2120
2121    /// This code attempts to take the existing subscriptions created by a previous session and
2122    /// either transfer them to this session, or construct them from scratch.
2123    pub(crate) async fn transfer_subscriptions_from_old_session(&self) {
2124        let subscription_ids = {
2125            let subscription_state = trace_lock!(self.subscription_state);
2126            subscription_state.subscription_ids()
2127        };
2128
2129        let Some(subscription_ids) = subscription_ids else {
2130            return;
2131        };
2132
2133        // Start by getting the subscription ids
2134        // Try to use TransferSubscriptions to move subscriptions_ids over. If this
2135        // works then there is nothing else to do.
2136        let mut subscription_ids_to_recreate =
2137            subscription_ids.iter().copied().collect::<HashSet<u32>>();
2138        if let Ok(transfer_results) = self.transfer_subscriptions(&subscription_ids, true).await {
2139            session_debug!(self, "transfer_results = {:?}", transfer_results);
2140            transfer_results.iter().enumerate().for_each(|(i, r)| {
2141                if r.status_code.is_good() {
2142                    // Subscription was transferred so it does not need to be recreated
2143                    subscription_ids_to_recreate.remove(&subscription_ids[i]);
2144                }
2145            });
2146        }
2147
2148        // But if it didn't work, then some or all subscriptions have to be remade.
2149        if !subscription_ids_to_recreate.is_empty() {
2150            session_warn!(self, "Some or all of the existing subscriptions could not be transferred and must be created manually");
2151        }
2152
2153        for subscription_id in subscription_ids_to_recreate {
2154            session_debug!(self, "Recreating subscription {}", subscription_id);
2155
2156            let deleted_subscription = {
2157                let mut subscription_state = trace_lock!(self.subscription_state);
2158                subscription_state.delete_subscription(subscription_id)
2159            };
2160
2161            let Some(subscription) = deleted_subscription else {
2162                session_warn!(
2163                    self,
2164                    "Subscription removed from session while transfer in progress"
2165                );
2166                continue;
2167            };
2168
2169            let Ok(subscription_id) = self
2170                .create_subscription_inner(
2171                    subscription.publishing_interval,
2172                    subscription.lifetime_count,
2173                    subscription.max_keep_alive_count,
2174                    subscription.max_notifications_per_publish,
2175                    subscription.publishing_enabled,
2176                    subscription.priority,
2177                    subscription.callback,
2178                )
2179                .await
2180            else {
2181                session_warn!(
2182                    self,
2183                    "Could not create a subscription from the existing subscription {}",
2184                    subscription_id
2185                );
2186                continue;
2187            };
2188
2189            let items_to_create = subscription
2190                .monitored_items
2191                .values()
2192                .map(|item| MonitoredItemCreateRequest {
2193                    item_to_monitor: item.item_to_monitor().clone(),
2194                    monitoring_mode: item.monitoring_mode,
2195                    requested_parameters: MonitoringParameters {
2196                        client_handle: item.client_handle(),
2197                        sampling_interval: item.sampling_interval(),
2198                        filter: item.filter.clone(),
2199                        queue_size: item.queue_size() as u32,
2200                        discard_oldest: item.discard_oldest(),
2201                    },
2202                })
2203                .collect::<Vec<MonitoredItemCreateRequest>>();
2204
2205            let mut iter = items_to_create.into_iter();
2206
2207            loop {
2208                let chunk = (&mut iter)
2209                    .take(self.recreate_monitored_items_chunk)
2210                    .collect::<Vec<_>>();
2211
2212                if chunk.is_empty() {
2213                    break;
2214                }
2215
2216                let _ = self
2217                    .create_monitored_items(subscription_id, TimestampsToReturn::Both, chunk)
2218                    .await;
2219            }
2220
2221            for item in subscription.monitored_items.values() {
2222                let triggered_items = item.triggered_items();
2223                if !triggered_items.is_empty() {
2224                    let links_to_add = triggered_items.iter().copied().collect::<Vec<u32>>();
2225                    let _ = self
2226                        .set_triggering(subscription_id, item.id(), links_to_add.as_slice(), &[])
2227                        .await;
2228                }
2229            }
2230        }
2231    }
2232}