open62541/
async_subscription.rs1use std::{
2 ffi::c_void,
3 num::NonZeroU32,
4 ptr,
5 sync::{Arc, Weak},
6 time::Duration,
7};
8
9use futures_channel::oneshot;
10use open62541_sys::{
11 UA_Client, UA_Client_Subscriptions_create_async, UA_Client_Subscriptions_delete_async,
12 UA_CreateSubscriptionResponse, UA_DeleteSubscriptionsResponse, UA_UInt32,
13};
14
15use crate::{
16 ua, AsyncClient, AsyncMonitoredItem, CallbackOnce, DataType as _, Error, MonitoredItemBuilder,
17 Result,
18};
19
20#[derive(Debug, Default)]
21pub struct SubscriptionBuilder {
22 #[allow(clippy::option_option)]
23 requested_publishing_interval: Option<Option<Duration>>,
24 requested_lifetime_count: Option<u32>,
25 #[allow(clippy::option_option)]
26 requested_max_keep_alive_count: Option<Option<NonZeroU32>>,
27 #[allow(clippy::option_option)]
28 max_notifications_per_publish: Option<Option<NonZeroU32>>,
29 publishing_enabled: Option<bool>,
30 priority: Option<u8>,
31}
32
33impl SubscriptionBuilder {
35 #[must_use]
41 pub const fn requested_publishing_interval(
42 mut self,
43 requested_publishing_interval: Option<Duration>,
44 ) -> Self {
45 self.requested_publishing_interval = Some(requested_publishing_interval);
46 self
47 }
48
49 #[must_use]
55 pub const fn requested_lifetime_count(mut self, requested_lifetime_count: u32) -> Self {
56 self.requested_lifetime_count = Some(requested_lifetime_count);
57 self
58 }
59
60 #[must_use]
66 pub const fn requested_max_keep_alive_count(
67 mut self,
68 requested_max_keep_alive_count: Option<NonZeroU32>,
69 ) -> Self {
70 self.requested_max_keep_alive_count = Some(requested_max_keep_alive_count);
71 self
72 }
73
74 #[must_use]
81 pub const fn max_notifications_per_publish(
82 mut self,
83 max_notifications_per_publish: Option<NonZeroU32>,
84 ) -> Self {
85 self.max_notifications_per_publish = Some(max_notifications_per_publish);
86 self
87 }
88
89 #[must_use]
95 pub const fn publishing_enabled(mut self, publishing_enabled: bool) -> Self {
96 self.publishing_enabled = Some(publishing_enabled);
97 self
98 }
99
100 #[must_use]
106 pub const fn priority(mut self, priority: u8) -> Self {
107 self.priority = Some(priority);
108 self
109 }
110
111 pub async fn create(
117 self,
118 client: &AsyncClient,
119 ) -> Result<(ua::CreateSubscriptionResponse, AsyncSubscription)> {
120 let client = client.client();
121
122 let response = create_subscription(client, &self.into_request()).await?;
123
124 let subscription = AsyncSubscription {
125 client: Arc::downgrade(client),
126 subscription_id: response.subscription_id(),
127 };
128
129 Ok((response, subscription))
130 }
131
132 fn into_request(self) -> ua::CreateSubscriptionRequest {
133 let Self {
134 requested_publishing_interval,
135 requested_lifetime_count,
136 requested_max_keep_alive_count,
137 max_notifications_per_publish,
138 publishing_enabled,
139 priority,
140 } = self;
141
142 let mut request = ua::CreateSubscriptionRequest::default();
143
144 if let Some(requested_publishing_interval) = requested_publishing_interval {
145 request = request.with_requested_publishing_interval(requested_publishing_interval);
146 }
147 if let Some(requested_lifetime_count) = requested_lifetime_count {
148 request = request.with_requested_lifetime_count(requested_lifetime_count);
149 }
150 if let Some(requested_max_keep_alive_count) = requested_max_keep_alive_count {
151 request = request.with_requested_max_keep_alive_count(requested_max_keep_alive_count);
152 }
153 if let Some(max_notifications_per_publish) = max_notifications_per_publish {
154 request = request.with_max_notifications_per_publish(max_notifications_per_publish);
155 }
156 if let Some(publishing_enabled) = publishing_enabled {
157 request = request.with_publishing_enabled(publishing_enabled);
158 }
159 if let Some(priority) = priority {
160 request = request.with_priority(priority);
161 }
162
163 request
164 }
165}
166
167#[derive(Debug)]
169pub struct AsyncSubscription {
170 client: Weak<ua::Client>,
171 subscription_id: ua::SubscriptionId,
172}
173
174impl AsyncSubscription {
175 pub async fn create_monitored_item(&self, node_id: &ua::NodeId) -> Result<AsyncMonitoredItem> {
183 let results = MonitoredItemBuilder::new([node_id.clone()])
184 .create(self)
185 .await?;
186
187 let Ok::<[_; 1], _>([result]) = results.try_into() else {
189 return Err(Error::internal("expected exactly one monitored item"));
190 };
191
192 let (_, monitored_item) = result?;
194
195 Ok(monitored_item)
196 }
197
198 #[must_use]
199 pub(crate) const fn client(&self) -> &Weak<ua::Client> {
200 &self.client
201 }
202
203 #[must_use]
204 pub(crate) const fn subscription_id(&self) -> ua::SubscriptionId {
205 self.subscription_id
206 }
207}
208
209impl Drop for AsyncSubscription {
210 fn drop(&mut self) {
211 let Some(client) = self.client.upgrade() else {
212 return;
213 };
214
215 let request =
216 ua::DeleteSubscriptionsRequest::init().with_subscription_ids(&[self.subscription_id]);
217
218 delete_subscriptions(&client, &request);
219 }
220}
221
222async fn create_subscription(
223 client: &ua::Client,
224 request: &ua::CreateSubscriptionRequest,
225) -> Result<ua::CreateSubscriptionResponse> {
226 type Cb = CallbackOnce<std::result::Result<ua::CreateSubscriptionResponse, ua::StatusCode>>;
227
228 unsafe extern "C" fn callback_c(
229 _client: *mut UA_Client,
230 userdata: *mut c_void,
231 _request_id: UA_UInt32,
232 response: *mut c_void,
233 ) {
234 log::debug!("Subscriptions_create() completed");
235
236 let response = response.cast::<UA_CreateSubscriptionResponse>();
237 let response = unsafe { response.as_ref() }.expect("response should be set");
240 let status_code = ua::StatusCode::new(response.responseHeader.serviceResult);
241
242 let result = if status_code.is_good() {
243 Ok(ua::CreateSubscriptionResponse::clone_raw(response))
244 } else {
245 Err(status_code)
246 };
247
248 unsafe {
250 Cb::execute(userdata, result);
251 }
252 }
253
254 let (tx, rx) = oneshot::channel::<Result<ua::CreateSubscriptionResponse>>();
255
256 let callback = |result: std::result::Result<ua::CreateSubscriptionResponse, _>| {
257 let _unused = tx.send(result.map_err(Error::new));
261 };
262
263 let status_code = ua::StatusCode::new({
264 log::debug!("Calling Subscriptions_create()");
265
266 let request = unsafe { ua::CreateSubscriptionRequest::to_raw_copy(request) };
269
270 unsafe {
271 UA_Client_Subscriptions_create_async(
272 client.as_ptr().cast_mut(),
274 request,
275 ptr::null_mut(),
276 None,
277 None,
278 Some(callback_c),
279 Cb::prepare(callback),
280 ptr::null_mut(),
281 )
282 }
283 });
284 Error::verify_good(&status_code)?;
285
286 rx.await
290 .unwrap_or(Err(Error::internal("callback should send result")))
291}
292
293fn delete_subscriptions(client: &ua::Client, request: &ua::DeleteSubscriptionsRequest) {
294 unsafe extern "C" fn callback_c(
295 _client: *mut UA_Client,
296 _userdata: *mut c_void,
297 _request_id: UA_UInt32,
298 response: *mut c_void,
299 ) {
300 log::debug!("Subscriptions_delete() completed");
301
302 let response = response.cast::<UA_DeleteSubscriptionsResponse>();
303 let response = unsafe { response.as_ref() }.expect("response should be set");
306 let status_code = ua::StatusCode::new(response.responseHeader.serviceResult);
307
308 if let Err(error) = Error::verify_good(&status_code) {
309 log::warn!("Error in response when deleting subscriptions: {error}");
310 }
311 }
312
313 let status_code = ua::StatusCode::new({
314 log::debug!("Calling Subscriptions_delete()");
315
316 let request = unsafe { ua::DeleteSubscriptionsRequest::to_raw_copy(request) };
319
320 unsafe {
321 UA_Client_Subscriptions_delete_async(
322 client.as_ptr().cast_mut(),
324 request,
325 Some(callback_c),
326 ptr::null_mut(),
327 ptr::null_mut(),
328 )
329 }
330 });
331 if let Err(error) = Error::verify_good(&status_code) {
332 log::warn!("Error in request when deleting subscriptions: {error}");
333 }
334}