mod create_request_builder;
pub(crate) mod create_monitored_items;
pub(crate) mod delete_monitored_items;
use std::{
marker::PhantomData,
sync::{Arc, Weak},
};
use crate::{AsyncClient, Attribute, DataType as _, DataValue, Error, Result, attributes, ua};
pub use self::create_request_builder::MonitoredItemCreateRequestBuilder;
#[derive(Debug)]
pub struct MonitoredItemHandle {
client: Weak<ua::Client>,
subscription_id: ua::SubscriptionId,
monitored_item_id: Option<ua::MonitoredItemId>,
}
impl MonitoredItemHandle {
pub(crate) fn new(
client: &Arc<ua::Client>,
subscription_id: ua::SubscriptionId,
monitored_item_id: ua::MonitoredItemId,
) -> Self {
Self {
client: Arc::downgrade(client),
subscription_id,
monitored_item_id: Some(monitored_item_id),
}
}
#[must_use]
pub const fn subscription_id(&self) -> ua::SubscriptionId {
self.subscription_id
}
#[must_use]
pub const fn monitored_item_id(&self) -> Option<ua::MonitoredItemId> {
self.monitored_item_id
}
fn before_delete(&mut self) -> Result<(ua::DeleteMonitoredItemsRequest, ua::MonitoredItemId)> {
let Some(monitored_item_id) = self.monitored_item_id.take() else {
return Err(Error::internal("already deleted"));
};
let request = ua::DeleteMonitoredItemsRequest::init()
.with_subscription_id(self.subscription_id)
.with_monitored_item_ids(&[monitored_item_id]);
Ok((request, monitored_item_id))
}
fn after_delete_failed(&mut self, monitored_item_id: ua::MonitoredItemId) {
debug_assert!(self.monitored_item_id.is_none());
self.monitored_item_id = Some(monitored_item_id);
}
pub async fn delete(&mut self) -> Result<ua::DeleteMonitoredItemsResponse> {
let (request, monitored_item_id) = self.before_delete()?;
let client = AsyncClient::upgrade_weak(&self.client)?;
log::debug!(
"Delete monitored item {monitored_item_id} of subscription {subscription_id}",
subscription_id = self.subscription_id
);
delete_monitored_items::call(&client, &request)
.await
.inspect_err(|_| {
self.after_delete_failed(monitored_item_id);
})
}
}
impl Drop for MonitoredItemHandle {
fn drop(&mut self) {
let Ok((request, monitored_item_id)) = self.before_delete() else {
return;
};
let Some(client) = self.client.upgrade() else {
log::debug!("Cannot delete monitored_item {request:?} on drop without client");
return;
};
log::debug!(
"Delete monitored item {monitored_item_id} of subscription {subscription_id} on drop",
subscription_id = self.subscription_id
);
if let Err(err) = delete_monitored_items::send_request(&client, &request) {
log::warn!(
"Failed to sent request for deleting monitored item {request:?} on drop: {err:#}"
);
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MonitoredItemValue(MonitoredItemValueInner);
impl MonitoredItemValue {
#[must_use]
pub(crate) const fn data_change(value: ua::DataValue) -> Self {
Self(MonitoredItemValueInner::DataChange { value })
}
#[must_use]
pub(crate) const fn event(fields: ua::Array<ua::Variant>) -> Self {
Self(MonitoredItemValueInner::Event { fields })
}
#[must_use]
pub const fn value(&self) -> Option<&ua::DataValue> {
match &self.0 {
MonitoredItemValueInner::DataChange { value } => Some(value),
MonitoredItemValueInner::Event { fields: _ } => None,
}
}
#[must_use]
pub const fn fields(&self) -> Option<&[ua::Variant]> {
match &self.0 {
MonitoredItemValueInner::DataChange { value: _ } => None,
MonitoredItemValueInner::Event { fields } => Some(fields.as_slice()),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
enum MonitoredItemValueInner {
DataChange { value: ua::DataValue },
Event { fields: ua::Array<ua::Variant> },
}
pub trait MonitoredItemKind: sealed::MonitoredItemKind + Send + Sync + 'static {
type Value: Send;
fn map_data_change(value: ua::DataValue) -> Self::Value;
fn map_event(fields: ua::Array<ua::Variant>) -> Self::Value;
}
#[derive(Debug)]
pub struct DataChange<T: Attribute>(PhantomData<T>);
impl<T: DataChangeAttribute + Send + Sync + 'static> MonitoredItemKind for DataChange<T> {
type Value = DataValue<T::Value>;
fn map_data_change(value: ua::DataValue) -> Self::Value {
value.cast()
}
fn map_event(_fields: ua::Array<ua::Variant>) -> Self::Value {
unreachable!("unexpected event payload in data change notification");
}
}
#[derive(Debug)]
pub struct Event;
impl MonitoredItemKind for Event {
type Value = ua::Array<ua::Variant>;
fn map_data_change(_value: ua::DataValue) -> Self::Value {
unreachable!("unexpected data change payload in event notification");
}
fn map_event(fields: ua::Array<ua::Variant>) -> Self::Value {
fields
}
}
#[derive(Debug)]
pub struct Unknown;
impl MonitoredItemKind for Unknown {
type Value = MonitoredItemValue;
fn map_data_change(value: ua::DataValue) -> Self::Value {
Self::Value::data_change(value)
}
fn map_event(fields: ua::Array<ua::Variant>) -> Self::Value {
Self::Value::event(fields)
}
}
trait DataChangeAttribute: Attribute {}
pub trait MonitoredItemAttribute: Attribute {
type Kind: MonitoredItemKind;
}
macro_rules! data_change_impl {
($($name:ident),* $(,)?) => {
$(
impl DataChangeAttribute for $crate::attributes::$name {}
impl MonitoredItemAttribute for $crate::attributes::$name {
type Kind = DataChange<$crate::attributes::$name>;
}
)*
};
}
data_change_impl!(
NodeId,
NodeClass,
BrowseName,
DisplayName,
Description,
WriteMask,
UserWriteMask,
IsAbstract,
Symmetric,
InverseName,
ContainsNoLoops,
Value,
DataType,
ValueRank,
AccessLevel,
UserAccessLevel,
MinimumSamplingInterval,
Historizing,
Executable,
UserExecutable,
DataTypeDefinition,
AccessRestrictions,
AccessLevelEx,
);
impl MonitoredItemAttribute for attributes::EventNotifier {
type Kind = Event;
}
mod sealed {
use crate::Attribute;
pub trait MonitoredItemKind {}
impl<T: Attribute> MonitoredItemKind for super::DataChange<T> {}
impl MonitoredItemKind for super::Event {}
impl MonitoredItemKind for super::Unknown {}
}
pub(crate) async fn create_monitored_items_callback<K: MonitoredItemKind, F>(
client: &Arc<ua::Client>,
subscription_id: ua::SubscriptionId,
request_builder: MonitoredItemCreateRequestBuilder<K>,
create_value_callback_fn: impl FnMut(usize) -> F,
) -> crate::Result<Vec<crate::Result<(ua::MonitoredItemCreateResult, MonitoredItemHandle)>>>
where
F: FnMut(K::Value) + 'static,
{
let request = request_builder.build(subscription_id);
let result_count = request.items_to_create().map_or(0, <[_]>::len);
let response =
create_monitored_items::call::<K, _>(client, &request, create_value_callback_fn).await?;
let Some(mut results) = response.into_results() else {
return Err(crate::Error::internal("expected monitoring item results"));
};
if results.len() != result_count {
let monitored_item_ids = results
.iter()
.filter(|result| result.status_code().is_good())
.filter_map(ua::MonitoredItemCreateResult::monitored_item_id)
.collect::<Vec<_>>();
let request = ua::DeleteMonitoredItemsRequest::init()
.with_subscription_id(subscription_id)
.with_monitored_item_ids(&monitored_item_ids);
if let Err(err) = delete_monitored_items::call(client, &request).await {
log::warn!("Failed to delete monitored items when cleaning up: {err:#}");
}
return Err(crate::Error::internal(
"unexpected number of monitored items",
));
}
let results = results
.drain_all()
.filter_map(|result| {
if let Err(err) = crate::Error::verify_good(&result.status_code()) {
return Some(Err(err));
}
let monitored_item_id = result.monitored_item_id()?;
let handle = MonitoredItemHandle::new(client, subscription_id, monitored_item_id);
Some(Ok((result, handle)))
})
.collect();
Ok(results)
}