use std::{ffi::c_void, mem, ptr};
use futures_channel::oneshot;
use open62541_sys::{
UA_Client, UA_Client_DataChangeNotificationCallback, UA_Client_DeleteMonitoredItemCallback,
UA_Client_MonitoredItems_createDataChanges_async, UA_CreateMonitoredItemsResponse,
UA_DataValue, UA_UInt32, UA_Variant,
};
use crate::{CallbackMut, CallbackOnce, DataType as _, Error, MonitoredItemKind, Result, ua};
/// Callback type for the outcome of the create request: the response on success, or the status
/// code otherwise. Prepared in `call` and executed from `response_callback_c`.
type CbResponse =
CallbackOnce<std::result::Result<ua::CreateMonitoredItemsResponse, ua::StatusCode>>;
/// Callback type for data change notifications. Executed from
/// `data_change_notification_callback_c` and released from
/// `delete_data_change_notification_callback_c`.
type CbDataChange = CallbackMut<ua::DataValue>;
/// Callback type for event notifications, receiving the array of event fields. Executed from
/// `event_notification_callback_c` and released from `delete_event_notification_callback_c`.
type CbEvent = CallbackMut<ua::Array<ua::Variant>>;
/// Per-item context pointer that is handed to `open62541` for each monitored item.
///
/// `#[repr(transparent)]` keeps the layout identical to `*mut c_void`; `call` relies on this when
/// it casts the buffer of a `Vec<Context>` to a pointer to `*mut c_void` elements.
#[repr(transparent)]
struct Context(*mut c_void);
// The wrapped pointer is produced by `CallbackMut::prepare()` in `NotificationType::to_context`.
// NOTE(review): the impl is restricted to the case where both callback types are `Send + Sync`;
// presumably it exists so that the future returned by `call` can hold its `Vec<Context>` across
// the `.await` and still be `Send` — confirm.
unsafe impl Send for Context
where
CbDataChange: Send + Sync,
CbEvent: Send + Sync,
{
}
pub(super) async fn call<K: MonitoredItemKind, F>(
client: &ua::Client,
request: &ua::CreateMonitoredItemsRequest,
mut create_value_callback_fn: impl FnMut(usize) -> F,
) -> Result<ua::CreateMonitoredItemsResponse>
where
F: FnMut(K::Value) + 'static,
{
let (tx, rx) = oneshot::channel::<Result<ua::CreateMonitoredItemsResponse>>();
let response_callback =
move |result: std::result::Result<ua::CreateMonitoredItemsResponse, _>| {
let _unused = tx.send(result.map_err(Error::new));
};
let items_to_create = request.items_to_create().unwrap_or_default();
let mut notification_callbacks: Vec<UA_Client_DataChangeNotificationCallback> =
Vec::with_capacity(items_to_create.len());
let mut delete_notification_callbacks: Vec<UA_Client_DeleteMonitoredItemCallback> =
Vec::with_capacity(items_to_create.len());
let mut contexts = Vec::with_capacity(items_to_create.len());
for (item_index, item_to_create) in items_to_create.iter().enumerate() {
let notification_type = NotificationType::for_request(item_to_create);
let notification_callback = unsafe { notification_type.to_callback() };
let delete_notification_callback = notification_type.to_delete_callback();
let value_callback_fn = create_value_callback_fn(item_index);
let context = notification_type.to_context::<K>(value_callback_fn);
notification_callbacks.push(Some(notification_callback));
delete_notification_callbacks.push(Some(delete_notification_callback));
contexts.push(context);
}
let status_code = ua::StatusCode::new({
log::debug!(
"Calling MonitoredItems_createDataChanges(), count={}",
contexts.len()
);
let request = unsafe { ua::CreateMonitoredItemsRequest::to_raw_copy(request) };
unsafe {
UA_Client_MonitoredItems_createDataChanges_async(
client.as_ptr().cast_mut(),
request,
contexts.as_mut_ptr().cast::<*mut c_void>(),
notification_callbacks.as_mut_ptr(),
delete_notification_callbacks.as_mut_ptr(),
Some(response_callback_c),
CbResponse::prepare(response_callback),
ptr::null_mut(),
)
}
});
Error::verify_good(&status_code)?;
rx.await
.unwrap_or(Err(Error::internal("callback should send result")))
}
/// Kind of notification delivered for a monitored item. It selects the C notification callback,
/// the C delete callback, and the callback context type used for the item.
enum NotificationType {
/// Data change notifications: chosen for every attribute other than the event notifier.
DataChange,
/// Event notifications: chosen when the monitored attribute is the event notifier.
Event,
}
impl NotificationType {
    /// Determines the notification type for a single monitored item create request.
    ///
    /// Items monitoring the `EVENTNOTIFIER` attribute yield [`Self::Event`], all other items yield
    /// [`Self::DataChange`].
    fn for_request(request: &ua::MonitoredItemCreateRequest) -> Self {
        let monitors_events = request.attribute_id() == ua::AttributeId::EVENTNOTIFIER;
        if monitors_events {
            Self::Event
        } else {
            Self::DataChange
        }
    }

    /// Returns the C notification callback for this notification type, typed as a data change
    /// callback.
    ///
    /// # Safety
    ///
    /// For [`Self::Event`], the returned pointer is the event callback transmuted to the data
    /// change callback signature.
    /// NOTE(review): presumably it must only ever be invoked with the event callback's real
    /// signature, i.e. by `open62541` for an event-monitoring item — confirm.
    unsafe fn to_callback(&self) -> DataChangeCallbackC {
        match self {
            Self::Event => {
                let event_callback: EventNotificationCallbackC = event_notification_callback_c;
                // The callback array passed to `open62541` has a single element type, so the
                // event callback is stored under the data change callback's signature.
                unsafe {
                    mem::transmute::<EventNotificationCallbackC, DataChangeCallbackC>(
                        event_callback,
                    )
                }
            }
            Self::DataChange => data_change_notification_callback_c,
        }
    }

    /// Returns the C delete callback that releases the context created by [`Self::to_context`]
    /// for this notification type.
    fn to_delete_callback(&self) -> DeleteNotificationCallbackC {
        match self {
            Self::Event => delete_event_notification_callback_c,
            Self::DataChange => delete_data_change_notification_callback_c,
        }
    }

    /// Wraps `value_callback_fn` into a callback context for this notification type.
    ///
    /// The incoming notification payload is first mapped to `K::Value` (via
    /// `K::map_data_change()` or `K::map_event()`) and then passed to `value_callback_fn`.
    fn to_context<K: MonitoredItemKind>(
        &self,
        mut value_callback_fn: impl FnMut(K::Value) + 'static,
    ) -> Context {
        let raw_context = match self {
            Self::Event => {
                let on_event = move |fields| value_callback_fn(K::map_event(fields));
                CallbackMut::prepare(on_event)
            }
            Self::DataChange => {
                let on_data_change = move |value| value_callback_fn(K::map_data_change(value));
                CallbackMut::prepare(on_data_change)
            }
        };
        Context(raw_context)
    }
}
/// Signature of the C data change notification callback, as implemented by
/// `data_change_notification_callback_c` and returned from `NotificationType::to_callback`.
type DataChangeCallbackC = unsafe extern "C" fn(
client: *mut UA_Client,
sub_id: UA_UInt32,
sub_context: *mut c_void,
mon_id: UA_UInt32,
mon_context: *mut c_void,
value: *mut UA_DataValue,
);
/// C entry point for data change notifications of a monitored item.
///
/// Copies the received value with `ua::DataValue::clone_raw()` and passes the copy to the
/// `CbDataChange` callback stored behind `mon_context`.
///
/// # Panics
///
/// Panics when `value` is a null pointer.
unsafe extern "C" fn data_change_notification_callback_c(
    _client: *mut UA_Client,
    _sub_id: UA_UInt32,
    _sub_context: *mut c_void,
    _mon_id: UA_UInt32,
    mon_context: *mut c_void,
    value: *mut UA_DataValue,
) {
    log::debug!("DataChangeNotificationCallback() was called");

    let borrowed: &UA_DataValue = unsafe { value.as_ref() }.expect("value should be set");
    let owned = ua::DataValue::clone_raw(borrowed);

    unsafe { CbDataChange::execute(mon_context, owned) };
}
/// Signature of the C event notification callback, as implemented by
/// `event_notification_callback_c`. Unlike the data change callback, it receives the event fields
/// as a length and a pointer to the first `UA_Variant`.
type EventNotificationCallbackC = unsafe extern "C" fn(
client: *mut UA_Client,
sub_id: UA_UInt32,
sub_context: *mut c_void,
mon_id: UA_UInt32,
mon_context: *mut c_void,
n_event_fields: usize,
event_fields: *mut UA_Variant,
);
/// C entry point for event notifications of a monitored item.
///
/// Builds a `ua::Array` from the received event fields and passes it to the `CbEvent` callback
/// stored behind `mon_context`.
///
/// # Panics
///
/// Panics when no array can be built from `n_event_fields` and `event_fields`.
unsafe extern "C" fn event_notification_callback_c(
    _client: *mut UA_Client,
    _sub_id: UA_UInt32,
    _sub_context: *mut c_void,
    _mon_id: UA_UInt32,
    mon_context: *mut c_void,
    n_event_fields: usize,
    event_fields: *mut UA_Variant,
) {
    log::debug!("EventNotificationCallback() was called");

    let maybe_fields = ua::Array::from_raw_parts(n_event_fields, event_fields);
    let owned_fields = maybe_fields.expect("event fields should be set");

    unsafe { CbEvent::execute(mon_context, owned_fields) };
}
/// Signature of the C callback that releases a monitored item's context, as implemented by
/// `delete_data_change_notification_callback_c` and `delete_event_notification_callback_c`.
type DeleteNotificationCallbackC = unsafe extern "C" fn(
client: *mut UA_Client,
sub_id: UA_UInt32,
sub_context: *mut c_void,
mon_id: UA_UInt32,
mon_context: *mut c_void,
);
/// C entry point called when a data change monitored item is deleted.
///
/// Releases the `CbDataChange` callback stored behind `mon_context`.
unsafe extern "C" fn delete_data_change_notification_callback_c(
    _client: *mut UA_Client,
    _sub_id: UA_UInt32,
    _sub_context: *mut c_void,
    _mon_id: UA_UInt32,
    mon_context: *mut c_void,
) {
    log::debug!("DeleteMonitoredItemCallback() for data change was called");

    // The context of a data change item was prepared as a data change callback, so it is
    // released through the same callback type.
    unsafe { CbDataChange::delete(mon_context) };
}
/// C entry point called when an event monitored item is deleted.
///
/// Releases the `CbEvent` callback stored behind `mon_context`.
unsafe extern "C" fn delete_event_notification_callback_c(
    _client: *mut UA_Client,
    _sub_id: UA_UInt32,
    _sub_context: *mut c_void,
    _mon_id: UA_UInt32,
    mon_context: *mut c_void,
) {
    log::debug!("DeleteMonitoredItemCallback() for event was called");

    // The context of an event item was prepared as an event callback, so it is released through
    // the same callback type.
    unsafe { CbEvent::delete(mon_context) };
}
/// C entry point called when the create-monitored-items request has completed.
///
/// Interprets `response` as a `UA_CreateMonitoredItemsResponse` and passes the outcome to the
/// `CbResponse` callback stored behind `userdata`: a copy of the response when the service result
/// is good, the service result's status code otherwise.
///
/// # Panics
///
/// Panics when `response` is a null pointer.
unsafe extern "C" fn response_callback_c(
    _client: *mut UA_Client,
    userdata: *mut c_void,
    _request_id: UA_UInt32,
    response: *mut c_void,
) {
    log::debug!("MonitoredItems_createDataChanges() completed");

    let raw_response = unsafe { response.cast::<UA_CreateMonitoredItemsResponse>().as_ref() }
        .expect("response should be set");

    let service_result = ua::StatusCode::new(raw_response.responseHeader.serviceResult);
    let outcome = if !service_result.is_good() {
        Err(service_result)
    } else {
        Ok(ua::CreateMonitoredItemsResponse::clone_raw(raw_response))
    };

    unsafe { CbResponse::execute(userdata, outcome) };
}