use std::{
ffi::c_void,
num::NonZeroU32,
ptr,
sync::{Arc, Weak},
time::Duration,
};
use futures_channel::oneshot;
use open62541_sys::{
UA_Client, UA_Client_Subscriptions_create_async, UA_Client_Subscriptions_delete_async,
UA_CreateSubscriptionResponse, UA_DeleteSubscriptionsResponse, UA_UInt32,
};
use crate::{
AsyncClient, CallbackOnce, DataType as _, Error, MonitoredItemCreateRequestBuilder,
MonitoredItemHandle, MonitoredItemKind, Result, create_monitored_items_callback, ua,
};
#[derive(Debug, Default)]
pub struct SubscriptionBuilder {
#[expect(clippy::option_option, reason = "implied default vs. unset")]
requested_publishing_interval: Option<Option<Duration>>,
requested_lifetime_count: Option<u32>,
#[expect(clippy::option_option, reason = "implied default vs. unset")]
requested_max_keep_alive_count: Option<Option<NonZeroU32>>,
#[expect(clippy::option_option, reason = "implied default vs. unset")]
max_notifications_per_publish: Option<Option<NonZeroU32>>,
publishing_enabled: Option<bool>,
priority: Option<u8>,
}
impl SubscriptionBuilder {
#[must_use]
pub const fn requested_publishing_interval(
mut self,
requested_publishing_interval: Option<Duration>,
) -> Self {
self.requested_publishing_interval = Some(requested_publishing_interval);
self
}
#[must_use]
pub const fn requested_lifetime_count(mut self, requested_lifetime_count: u32) -> Self {
self.requested_lifetime_count = Some(requested_lifetime_count);
self
}
#[must_use]
pub const fn requested_max_keep_alive_count(
mut self,
requested_max_keep_alive_count: Option<NonZeroU32>,
) -> Self {
self.requested_max_keep_alive_count = Some(requested_max_keep_alive_count);
self
}
#[must_use]
pub const fn max_notifications_per_publish(
mut self,
max_notifications_per_publish: Option<NonZeroU32>,
) -> Self {
self.max_notifications_per_publish = Some(max_notifications_per_publish);
self
}
#[must_use]
pub const fn publishing_enabled(mut self, publishing_enabled: bool) -> Self {
self.publishing_enabled = Some(publishing_enabled);
self
}
#[must_use]
pub const fn priority(mut self, priority: u8) -> Self {
self.priority = Some(priority);
self
}
pub async fn create(
self,
client: &AsyncClient,
) -> Result<(ua::CreateSubscriptionResponse, AsyncSubscription)> {
let client = client.client();
let response = create_subscription(client, &self.into_request()).await?;
let Some(subscription_id) = response.subscription_id() else {
return Err(Error::Internal("invalid subscription ID"));
};
let subscription = AsyncSubscription {
client: Arc::downgrade(client),
subscription_id,
};
Ok((response, subscription))
}
fn into_request(self) -> ua::CreateSubscriptionRequest {
let Self {
requested_publishing_interval,
requested_lifetime_count,
requested_max_keep_alive_count,
max_notifications_per_publish,
publishing_enabled,
priority,
} = self;
let mut request = ua::CreateSubscriptionRequest::default();
if let Some(requested_publishing_interval) = requested_publishing_interval {
request = request.with_requested_publishing_interval(requested_publishing_interval);
}
if let Some(requested_lifetime_count) = requested_lifetime_count {
request = request.with_requested_lifetime_count(requested_lifetime_count);
}
if let Some(requested_max_keep_alive_count) = requested_max_keep_alive_count {
request = request.with_requested_max_keep_alive_count(requested_max_keep_alive_count);
}
if let Some(max_notifications_per_publish) = max_notifications_per_publish {
request = request.with_max_notifications_per_publish(max_notifications_per_publish);
}
if let Some(publishing_enabled) = publishing_enabled {
request = request.with_publishing_enabled(publishing_enabled);
}
if let Some(priority) = priority {
request = request.with_priority(priority);
}
request
}
}
#[derive(Debug)]
pub struct AsyncSubscription {
client: Weak<ua::Client>,
subscription_id: ua::SubscriptionId,
}
impl AsyncSubscription {
#[cfg(feature = "tokio")]
pub async fn create_monitored_item(
&self,
node_id: &ua::NodeId,
) -> Result<crate::AsyncMonitoredItem> {
let request_builder = crate::MonitoredItemCreateRequestBuilder::new([node_id.clone()]);
let results = crate::AsyncMonitoredItem::create(self, request_builder).await?;
let Ok::<[_; 1], _>([result]) = results.try_into() else {
return Err(Error::internal("expected exactly one monitored item"));
};
let (_, monitored_item) = result?;
Ok(monitored_item)
}
#[must_use]
pub(crate) const fn client(&self) -> &Weak<ua::Client> {
&self.client
}
#[must_use]
pub const fn subscription_id(&self) -> ua::SubscriptionId {
self.subscription_id
}
pub async fn create_monitored_items_callback<K: MonitoredItemKind, F>(
&self,
request_builder: MonitoredItemCreateRequestBuilder<K>,
create_value_callback_fn: impl FnMut(usize) -> F,
) -> Result<Vec<Result<(ua::MonitoredItemCreateResult, MonitoredItemHandle)>>>
where
F: FnMut(K::Value) + 'static,
{
let client = AsyncClient::upgrade_weak(self.client())?;
let subscription_id = self.subscription_id();
create_monitored_items_callback(
&client,
subscription_id,
request_builder,
create_value_callback_fn,
)
.await
}
}
impl Drop for AsyncSubscription {
fn drop(&mut self) {
let Some(client) = self.client.upgrade() else {
return;
};
let request =
ua::DeleteSubscriptionsRequest::init().with_subscription_ids(&[self.subscription_id]);
delete_subscriptions(&client, &request);
}
}
async fn create_subscription(
client: &ua::Client,
request: &ua::CreateSubscriptionRequest,
) -> Result<ua::CreateSubscriptionResponse> {
type Cb = CallbackOnce<std::result::Result<ua::CreateSubscriptionResponse, ua::StatusCode>>;
unsafe extern "C" fn callback_c(
_client: *mut UA_Client,
userdata: *mut c_void,
_request_id: UA_UInt32,
response: *mut c_void,
) {
log::debug!("Subscriptions_create() completed");
let response = response.cast::<UA_CreateSubscriptionResponse>();
let response = unsafe { response.as_ref() }.expect("response should be set");
let status_code = ua::StatusCode::new(response.responseHeader.serviceResult);
let result = if status_code.is_good() {
Ok(ua::CreateSubscriptionResponse::clone_raw(response))
} else {
Err(status_code)
};
unsafe {
Cb::execute(userdata, result);
}
}
let (tx, rx) = oneshot::channel::<Result<ua::CreateSubscriptionResponse>>();
let callback = move |result: std::result::Result<ua::CreateSubscriptionResponse, _>| {
let _unused = tx.send(result.map_err(Error::new));
};
let status_code = ua::StatusCode::new({
log::debug!("Calling Subscriptions_create()");
let request = unsafe { ua::CreateSubscriptionRequest::to_raw_copy(request) };
unsafe {
UA_Client_Subscriptions_create_async(
client.as_ptr().cast_mut(),
request,
ptr::null_mut(),
None,
None,
Some(callback_c),
Cb::prepare(callback),
ptr::null_mut(),
)
}
});
Error::verify_good(&status_code)?;
rx.await
.unwrap_or(Err(Error::internal("callback should send result")))
}
fn delete_subscriptions(client: &ua::Client, request: &ua::DeleteSubscriptionsRequest) {
unsafe extern "C" fn callback_c(
_client: *mut UA_Client,
_userdata: *mut c_void,
_request_id: UA_UInt32,
response: *mut c_void,
) {
log::debug!("Subscriptions_delete() completed");
let response = response.cast::<UA_DeleteSubscriptionsResponse>();
let response = unsafe { response.as_ref() }.expect("response should be set");
let status_code = ua::StatusCode::new(response.responseHeader.serviceResult);
if let Err(error) = Error::verify_good(&status_code) {
log::warn!("Error in response when deleting subscriptions: {error}");
}
}
let status_code = ua::StatusCode::new({
log::debug!("Calling Subscriptions_delete()");
let request = unsafe { ua::DeleteSubscriptionsRequest::to_raw_copy(request) };
unsafe {
UA_Client_Subscriptions_delete_async(
client.as_ptr().cast_mut(),
request,
Some(callback_c),
ptr::null_mut(),
ptr::null_mut(),
)
}
});
if let Err(error) = Error::verify_good(&status_code) {
log::warn!("Error in request when deleting subscriptions: {error}");
}
}