open62541 0.10.1

High-level, safe bindings for the C99 library open62541, an open source and free implementation of OPC UA (OPC Unified Architecture).
Documentation
use std::{ffi::c_void, mem, ptr};

use futures_channel::oneshot;
use open62541_sys::{
    UA_Client, UA_Client_DataChangeNotificationCallback, UA_Client_DeleteMonitoredItemCallback,
    UA_Client_MonitoredItems_createDataChanges_async, UA_CreateMonitoredItemsResponse,
    UA_DataValue, UA_UInt32, UA_Variant,
};

use crate::{CallbackMut, CallbackOnce, DataType as _, Error, MonitoredItemKind, Result, ua};

type CbResponse =
    CallbackOnce<std::result::Result<ua::CreateMonitoredItemsResponse, ua::StatusCode>>;
type CbDataChange = CallbackMut<ua::DataValue>;
type CbEvent = CallbackMut<ua::Array<ua::Variant>>;

// Wrapper type so that we can mark `*mut c_void` for callbacks as safe to send.
#[repr(transparent)]
struct Context(*mut c_void);

// SAFETY: As long as payload is `Send`, wrapper is `Send`.
unsafe impl Send for Context
where
    CbDataChange: Send + Sync,
    CbEvent: Send + Sync,
{
}

/// Creates monitored items.
///
/// Notifications for monitored are handled by callback closures. The callback
/// closures are created by invoking `create_value_callback_fn` repeatedly while
/// preparing the items to create for the request. The argument of `create_value_callback_fn`
/// is an `index` that starts at 0 and is incremented by 1 consecutively, i.e. by enumerating
/// the items to be created.
//
// TODO: How to pass `&ua::MonitoredItemCreateRequest` as a second argument to
// `create_value_callback_fn`? `impl for<'a> FnMut(usize, &'a ua::MonitoredItemCreateRequest) -> F`
// doesn't work.
// See also: <https://rust-lang.github.io/rfcs/3216-closure-lifetime-binder.html>
pub(super) async fn call<K: MonitoredItemKind, F>(
    client: &ua::Client,
    request: &ua::CreateMonitoredItemsRequest,
    mut create_value_callback_fn: impl FnMut(usize) -> F,
) -> Result<ua::CreateMonitoredItemsResponse>
where
    F: FnMut(K::Value) + 'static,
{
    let (tx, rx) = oneshot::channel::<Result<ua::CreateMonitoredItemsResponse>>();

    let response_callback =
        move |result: std::result::Result<ua::CreateMonitoredItemsResponse, _>| {
            // We always send a result back via `tx` (in fact, `rx.await` below expects this). We do not
            // care if that succeeds though: the receiver might already have gone out of scope (when its
            // future has been cancelled) and we must not panic in FFI callbacks.
            let _unused = tx.send(result.map_err(Error::new));
        };

    let items_to_create = request.items_to_create().unwrap_or_default();

    let mut notification_callbacks: Vec<UA_Client_DataChangeNotificationCallback> =
        Vec::with_capacity(items_to_create.len());
    let mut delete_notification_callbacks: Vec<UA_Client_DeleteMonitoredItemCallback> =
        Vec::with_capacity(items_to_create.len());
    let mut contexts = Vec::with_capacity(items_to_create.len());

    for (item_index, item_to_create) in items_to_create.iter().enumerate() {
        // `open62541` requires one set of notification/delete callback and context per monitored
        // item in the request.
        let notification_type = NotificationType::for_request(item_to_create);

        let notification_callback = unsafe { notification_type.to_callback() };
        let delete_notification_callback = notification_type.to_delete_callback();

        // TODO: let value_callback = create_value_callback_fn(item_index, item_to_create);
        let value_callback_fn = create_value_callback_fn(item_index);
        let context = notification_type.to_context::<K>(value_callback_fn);

        // SAFETY: This cast is possible because `UA_Client_MonitoredItems_createDataChanges_async`
        // internally casts the function pointer back to the appropriate type before calling (union
        // type of attribute `handler` in `UA_Client_MonitoredItem`).
        notification_callbacks.push(Some(notification_callback));
        delete_notification_callbacks.push(Some(delete_notification_callback));
        contexts.push(context);
    }

    let status_code = ua::StatusCode::new({
        log::debug!(
            "Calling MonitoredItems_createDataChanges(), count={}",
            contexts.len()
        );

        // SAFETY: `UA_Client_MonitoredItems_createDataChanges_async()` expects the request passed
        // by value but does not take ownership.
        let request = unsafe { ua::CreateMonitoredItemsRequest::to_raw_copy(request) };

        unsafe {
            // This handles both data change and event notifications as determined by the monitored
            // attribute ID, delegating to `createDataChanges_async()` in both cases. We must still
            // make sure to pass the expected callback function in `notification_callbacks` above.
            UA_Client_MonitoredItems_createDataChanges_async(
                // SAFETY: Cast to `mut` pointer, function is marked `UA_THREADSAFE`.
                client.as_ptr().cast_mut(),
                request,
                contexts.as_mut_ptr().cast::<*mut c_void>(),
                notification_callbacks.as_mut_ptr(),
                delete_notification_callbacks.as_mut_ptr(),
                Some(response_callback_c),
                CbResponse::prepare(response_callback),
                ptr::null_mut(),
            )
        }
    });
    Error::verify_good(&status_code)?;

    // PANIC: When `callback` is called (which owns `tx`), we always call `tx.send()`. So the sender
    // is only dropped after placing a value into the channel and `rx.await` always finds this value
    // there.
    rx.await
        .unwrap_or(Err(Error::internal("callback should send result")))
}

enum NotificationType {
    DataChange,
    Event,
}

impl NotificationType {
    fn for_request(request: &ua::MonitoredItemCreateRequest) -> Self {
        if request.attribute_id() == ua::AttributeId::EVENTNOTIFIER {
            Self::Event
        } else {
            Self::DataChange
        }
    }

    /// Provides callback function for C call.
    ///
    /// # Safety
    ///
    /// This always returns a function pointer for [`UA_Client_DataChangeNotificationCallback`], for
    /// both data change _and_ event callbacks. Care must be taken to only pass the expected handler
    /// to the corresponding [`ua::MonitoredItemCreateRequest`], depending on the attribute ID.
    unsafe fn to_callback(&self) -> DataChangeCallbackC {
        match self {
            Self::DataChange => data_change_notification_callback_c,

            // This is rather unfortunate. Since we cannot call `createDataChanges_async()` directly
            // (it is not exported by open62541), we must use one of the two wrapper functions, i.e.
            // `UA_Client_MonitoredItems_create[DataChanges|Events]_async()`, instead. These wrapper
            // functions only adjust the types in the function signature and add a mutex lock. Thus,
            // apart from the fact that open62541 does some `void` pointer magic, the transmute here
            // is safe (at least not more unsafe/unportable than the underlying C code already is).
            Self::Event => unsafe {
                mem::transmute::<EventNotificationCallbackC, DataChangeCallbackC>(
                    event_notification_callback_c,
                )
            },
        }
    }

    fn to_delete_callback(&self) -> DeleteNotificationCallbackC {
        match self {
            Self::DataChange => delete_data_change_notification_callback_c,
            Self::Event => delete_event_notification_callback_c,
        }
    }

    fn to_context<K: MonitoredItemKind>(
        &self,
        mut value_callback_fn: impl FnMut(K::Value) + 'static,
    ) -> Context {
        match self {
            NotificationType::DataChange => {
                let data_change_callback = move |value| {
                    value_callback_fn(K::map_data_change(value));
                };
                Context(CallbackMut::prepare(data_change_callback))
            }
            NotificationType::Event => {
                let event_callback = move |value| {
                    value_callback_fn(K::map_event(value));
                };
                Context(CallbackMut::prepare(event_callback))
            }
        }
    }
}

type DataChangeCallbackC = unsafe extern "C" fn(
    client: *mut UA_Client,
    sub_id: UA_UInt32,
    sub_context: *mut c_void,
    mon_id: UA_UInt32,
    mon_context: *mut c_void,
    value: *mut UA_DataValue,
);

unsafe extern "C" fn data_change_notification_callback_c(
    _client: *mut UA_Client,
    _sub_id: UA_UInt32,
    _sub_context: *mut c_void,
    _mon_id: UA_UInt32,
    mon_context: *mut c_void,
    value: *mut UA_DataValue,
) {
    log::debug!("DataChangeNotificationCallback() was called");

    // SAFETY: Incoming pointer is valid for access.
    // PANIC: We expect pointer to be valid when called.
    let value = unsafe { value.as_ref() }.expect("value should be set");
    let value = ua::DataValue::clone_raw(value);

    // SAFETY: `mon_context` is result of `CbDataChange::prepare()` and is used only before `delete()`.
    unsafe {
        CbDataChange::execute(mon_context, value);
    }
}

type EventNotificationCallbackC = unsafe extern "C" fn(
    client: *mut UA_Client,
    sub_id: UA_UInt32,
    sub_context: *mut c_void,
    mon_id: UA_UInt32,
    mon_context: *mut c_void,
    n_event_fields: usize,
    event_fields: *mut UA_Variant,
);

unsafe extern "C" fn event_notification_callback_c(
    _client: *mut UA_Client,
    _sub_id: UA_UInt32,
    _sub_context: *mut c_void,
    _mon_id: UA_UInt32,
    mon_context: *mut c_void,
    n_event_fields: usize,
    event_fields: *mut UA_Variant,
) {
    log::debug!("EventNotificationCallback() was called");

    // PANIC: We expect pointer to be valid when called.
    let fields = ua::Array::from_raw_parts(n_event_fields, event_fields)
        .expect("event fields should be set");

    // SAFETY: `mon_context` is result of `CbEvent::prepare()` and is used only before `delete()`.
    unsafe {
        CbEvent::execute(mon_context, fields);
    }
}

type DeleteNotificationCallbackC = unsafe extern "C" fn(
    client: *mut UA_Client,
    sub_id: UA_UInt32,
    sub_context: *mut c_void,
    mon_id: UA_UInt32,
    mon_context: *mut c_void,
);

unsafe extern "C" fn delete_data_change_notification_callback_c(
    _client: *mut UA_Client,
    _sub_id: UA_UInt32,
    _sub_context: *mut c_void,
    _mon_id: UA_UInt32,
    mon_context: *mut c_void,
) {
    log::debug!("DeleteMonitoredItemCallback() for data change was called");

    // SAFETY: `mon_context` is result of `CbDataChange::prepare()` and is used only before `delete()`.
    unsafe {
        CbDataChange::delete(mon_context);
    }
}

unsafe extern "C" fn delete_event_notification_callback_c(
    _client: *mut UA_Client,
    _sub_id: UA_UInt32,
    _sub_context: *mut c_void,
    _mon_id: UA_UInt32,
    mon_context: *mut c_void,
) {
    log::debug!("DeleteMonitoredItemCallback() for event was called");

    // SAFETY: `mon_context` is result of `CbEvent::prepare()` and is used only before `delete()`.
    unsafe {
        CbEvent::delete(mon_context);
    }
}

unsafe extern "C" fn response_callback_c(
    _client: *mut UA_Client,
    userdata: *mut c_void,
    _request_id: UA_UInt32,
    response: *mut c_void,
) {
    log::debug!("MonitoredItems_createDataChanges() completed");

    let response = response.cast::<UA_CreateMonitoredItemsResponse>();
    // SAFETY: Incoming pointer is valid for access.
    // PANIC: We expect pointer to be valid when good.
    let response = unsafe { response.as_ref() }.expect("response should be set");
    let status_code = ua::StatusCode::new(response.responseHeader.serviceResult);

    let result = if status_code.is_good() {
        Ok(ua::CreateMonitoredItemsResponse::clone_raw(response))
    } else {
        Err(status_code)
    };

    // SAFETY: `userdata` is the result of `CbResponse::prepare()` and is used only once.
    unsafe {
        CbResponse::execute(userdata, result);
    }
}