open62541/
async_subscription.rs

1use std::{
2    ffi::c_void,
3    num::NonZeroU32,
4    ptr,
5    sync::{Arc, Weak},
6    time::Duration,
7};
8
9use futures_channel::oneshot;
10use open62541_sys::{
11    UA_Client, UA_Client_Subscriptions_create_async, UA_Client_Subscriptions_delete_async,
12    UA_CreateSubscriptionResponse, UA_DeleteSubscriptionsResponse, UA_UInt32,
13};
14
15use crate::{
16    ua, AsyncClient, AsyncMonitoredItem, CallbackOnce, DataType as _, Error, MonitoredItemBuilder,
17    Result,
18};
19
20#[derive(Debug, Default)]
21pub struct SubscriptionBuilder {
22    #[allow(clippy::option_option)]
23    requested_publishing_interval: Option<Option<Duration>>,
24    requested_lifetime_count: Option<u32>,
25    #[allow(clippy::option_option)]
26    requested_max_keep_alive_count: Option<Option<NonZeroU32>>,
27    #[allow(clippy::option_option)]
28    max_notifications_per_publish: Option<Option<NonZeroU32>>,
29    publishing_enabled: Option<bool>,
30    priority: Option<u8>,
31}
32
33// Note: The default values in the docs below come from `UA_CreateSubscriptionRequest_default()`.
34impl SubscriptionBuilder {
35    /// Sets requested publishing interval.
36    ///
37    /// Default value is 500.0 ms.
38    ///
39    /// See [`ua::CreateSubscriptionRequest::with_requested_publishing_interval()`].
40    #[must_use]
41    pub const fn requested_publishing_interval(
42        mut self,
43        requested_publishing_interval: Option<Duration>,
44    ) -> Self {
45        self.requested_publishing_interval = Some(requested_publishing_interval);
46        self
47    }
48
49    /// Sets requested lifetime count.
50    ///
51    /// Default value is 10000.
52    ///
53    /// See [`ua::CreateSubscriptionRequest::with_requested_lifetime_count()`].
54    #[must_use]
55    pub const fn requested_lifetime_count(mut self, requested_lifetime_count: u32) -> Self {
56        self.requested_lifetime_count = Some(requested_lifetime_count);
57        self
58    }
59
60    /// Sets requested maximum keep-alive count.
61    ///
62    /// Default value is 10.
63    ///
64    /// See [`ua::CreateSubscriptionRequest::with_requested_max_keep_alive_count()`].
65    #[must_use]
66    pub const fn requested_max_keep_alive_count(
67        mut self,
68        requested_max_keep_alive_count: Option<NonZeroU32>,
69    ) -> Self {
70        self.requested_max_keep_alive_count = Some(requested_max_keep_alive_count);
71        self
72    }
73
74    /// Sets maximum number of notifications that the client wishes to receive in a single publish
75    /// response.
76    ///
77    /// Default value is `None` (unlimited).
78    ///
79    /// See [`ua::CreateSubscriptionRequest::with_max_notifications_per_publish()`].
80    #[must_use]
81    pub const fn max_notifications_per_publish(
82        mut self,
83        max_notifications_per_publish: Option<NonZeroU32>,
84    ) -> Self {
85        self.max_notifications_per_publish = Some(max_notifications_per_publish);
86        self
87    }
88
89    /// Enables or disables publishing.
90    ///
91    /// Default value is `true`.
92    ///
93    /// See [`ua::CreateSubscriptionRequest::with_publishing_enabled()`].
94    #[must_use]
95    pub const fn publishing_enabled(mut self, publishing_enabled: bool) -> Self {
96        self.publishing_enabled = Some(publishing_enabled);
97        self
98    }
99
100    /// Sets relative priority of the subscription.
101    ///
102    /// Default value is 0.
103    ///
104    /// See [`ua::CreateSubscriptionRequest::with_priority()`].
105    #[must_use]
106    pub const fn priority(mut self, priority: u8) -> Self {
107        self.priority = Some(priority);
108        self
109    }
110
111    /// Creates subscription.
112    ///
113    /// # Errors
114    ///
115    /// This fails when the client is not connected.
116    pub async fn create(
117        self,
118        client: &AsyncClient,
119    ) -> Result<(ua::CreateSubscriptionResponse, AsyncSubscription)> {
120        let client = client.client();
121
122        let response = create_subscription(client, &self.into_request()).await?;
123
124        let subscription = AsyncSubscription {
125            client: Arc::downgrade(client),
126            subscription_id: response.subscription_id(),
127        };
128
129        Ok((response, subscription))
130    }
131
132    fn into_request(self) -> ua::CreateSubscriptionRequest {
133        let Self {
134            requested_publishing_interval,
135            requested_lifetime_count,
136            requested_max_keep_alive_count,
137            max_notifications_per_publish,
138            publishing_enabled,
139            priority,
140        } = self;
141
142        let mut request = ua::CreateSubscriptionRequest::default();
143
144        if let Some(requested_publishing_interval) = requested_publishing_interval {
145            request = request.with_requested_publishing_interval(requested_publishing_interval);
146        }
147        if let Some(requested_lifetime_count) = requested_lifetime_count {
148            request = request.with_requested_lifetime_count(requested_lifetime_count);
149        }
150        if let Some(requested_max_keep_alive_count) = requested_max_keep_alive_count {
151            request = request.with_requested_max_keep_alive_count(requested_max_keep_alive_count);
152        }
153        if let Some(max_notifications_per_publish) = max_notifications_per_publish {
154            request = request.with_max_notifications_per_publish(max_notifications_per_publish);
155        }
156        if let Some(publishing_enabled) = publishing_enabled {
157            request = request.with_publishing_enabled(publishing_enabled);
158        }
159        if let Some(priority) = priority {
160            request = request.with_priority(priority);
161        }
162
163        request
164    }
165}
166
167/// Subscription (with asynchronous API).
168#[derive(Debug)]
169pub struct AsyncSubscription {
170    client: Weak<ua::Client>,
171    subscription_id: ua::SubscriptionId,
172}
173
174impl AsyncSubscription {
175    /// Creates [monitored item](AsyncMonitoredItem).
176    ///
177    /// This creates a new monitored item for the given node.
178    ///
179    /// # Errors
180    ///
181    /// This fails when the node does not exist.
182    pub async fn create_monitored_item(&self, node_id: &ua::NodeId) -> Result<AsyncMonitoredItem> {
183        let results = MonitoredItemBuilder::new([node_id.clone()])
184            .create(self)
185            .await?;
186
187        // We expect exactly one result for the single monitored item we requested above.
188        let Ok::<[_; 1], _>([result]) = results.try_into() else {
189            return Err(Error::internal("expected exactly one monitored item"));
190        };
191
192        // Verify single item's status code and return as error.
193        let (_, monitored_item) = result?;
194
195        Ok(monitored_item)
196    }
197
198    #[must_use]
199    pub(crate) const fn client(&self) -> &Weak<ua::Client> {
200        &self.client
201    }
202
203    #[must_use]
204    pub(crate) const fn subscription_id(&self) -> ua::SubscriptionId {
205        self.subscription_id
206    }
207}
208
209impl Drop for AsyncSubscription {
210    fn drop(&mut self) {
211        let Some(client) = self.client.upgrade() else {
212            return;
213        };
214
215        let request =
216            ua::DeleteSubscriptionsRequest::init().with_subscription_ids(&[self.subscription_id]);
217
218        delete_subscriptions(&client, &request);
219    }
220}
221
222async fn create_subscription(
223    client: &ua::Client,
224    request: &ua::CreateSubscriptionRequest,
225) -> Result<ua::CreateSubscriptionResponse> {
226    type Cb = CallbackOnce<std::result::Result<ua::CreateSubscriptionResponse, ua::StatusCode>>;
227
228    unsafe extern "C" fn callback_c(
229        _client: *mut UA_Client,
230        userdata: *mut c_void,
231        _request_id: UA_UInt32,
232        response: *mut c_void,
233    ) {
234        log::debug!("Subscriptions_create() completed");
235
236        let response = response.cast::<UA_CreateSubscriptionResponse>();
237        // SAFETY: Incoming pointer is valid for access.
238        // PANIC: We expect pointer to be valid when good.
239        let response = unsafe { response.as_ref() }.expect("response should be set");
240        let status_code = ua::StatusCode::new(response.responseHeader.serviceResult);
241
242        let result = if status_code.is_good() {
243            Ok(ua::CreateSubscriptionResponse::clone_raw(response))
244        } else {
245            Err(status_code)
246        };
247
248        // SAFETY: `userdata` is the result of `Cb::prepare()` and is used only once.
249        unsafe {
250            Cb::execute(userdata, result);
251        }
252    }
253
254    let (tx, rx) = oneshot::channel::<Result<ua::CreateSubscriptionResponse>>();
255
256    let callback = |result: std::result::Result<ua::CreateSubscriptionResponse, _>| {
257        // We always send a result back via `tx` (in fact, `rx.await` below expects this). We do not
258        // care if that succeeds though: the receiver might already have gone out of scope (when its
259        // future has been cancelled) and we must not panic in FFI callbacks.
260        let _unused = tx.send(result.map_err(Error::new));
261    };
262
263    let status_code = ua::StatusCode::new({
264        log::debug!("Calling Subscriptions_create()");
265
266        // SAFETY: `UA_Client_Subscriptions_create_async()` expects the request passed by value but
267        // does not take ownership.
268        let request = unsafe { ua::CreateSubscriptionRequest::to_raw_copy(request) };
269
270        unsafe {
271            UA_Client_Subscriptions_create_async(
272                // SAFETY: Cast to `mut` pointer, function is marked `UA_THREADSAFE`.
273                client.as_ptr().cast_mut(),
274                request,
275                ptr::null_mut(),
276                None,
277                None,
278                Some(callback_c),
279                Cb::prepare(callback),
280                ptr::null_mut(),
281            )
282        }
283    });
284    Error::verify_good(&status_code)?;
285
286    // PANIC: When `callback` is called (which owns `tx`), we always call `tx.send()`. So the sender
287    // is only dropped after placing a value into the channel and `rx.await` always finds this value
288    // there.
289    rx.await
290        .unwrap_or(Err(Error::internal("callback should send result")))
291}
292
293fn delete_subscriptions(client: &ua::Client, request: &ua::DeleteSubscriptionsRequest) {
294    unsafe extern "C" fn callback_c(
295        _client: *mut UA_Client,
296        _userdata: *mut c_void,
297        _request_id: UA_UInt32,
298        response: *mut c_void,
299    ) {
300        log::debug!("Subscriptions_delete() completed");
301
302        let response = response.cast::<UA_DeleteSubscriptionsResponse>();
303        // SAFETY: Incoming pointer is valid for access.
304        // PANIC: We expect pointer to be valid when good.
305        let response = unsafe { response.as_ref() }.expect("response should be set");
306        let status_code = ua::StatusCode::new(response.responseHeader.serviceResult);
307
308        if let Err(error) = Error::verify_good(&status_code) {
309            log::warn!("Error in response when deleting subscriptions: {error}");
310        }
311    }
312
313    let status_code = ua::StatusCode::new({
314        log::debug!("Calling Subscriptions_delete()");
315
316        // SAFETY: `UA_Client_Subscriptions_delete_async()` expects the request passed by value but
317        // does not take ownership.
318        let request = unsafe { ua::DeleteSubscriptionsRequest::to_raw_copy(request) };
319
320        unsafe {
321            UA_Client_Subscriptions_delete_async(
322                // SAFETY: Cast to `mut` pointer, function is marked `UA_THREADSAFE`.
323                client.as_ptr().cast_mut(),
324                request,
325                Some(callback_c),
326                ptr::null_mut(),
327                ptr::null_mut(),
328            )
329        }
330    });
331    if let Err(error) = Error::verify_good(&status_code) {
332        log::warn!("Error in request when deleting subscriptions: {error}");
333    }
334}