async-opcua-server 0.18.0

OPC UA server API
Documentation
use std::sync::Arc;

use crate::{
    node_manager::{consume_results, DynNodeManager, MethodCall, NodeManagers, RequestContext},
    session::{
        controller::Response,
        message_handler::Request,
        services::{invoke_service_concurrently_mut, ServiceCb},
    },
};
use opcua_types::{CallRequest, CallResponse, ResponseHeader, StatusCode};
use tracing::debug_span;
use tracing_futures::Instrument;

pub(crate) async fn call(node_managers: NodeManagers, request: Request<CallRequest>) -> Response {
    let context = request.context();
    let method_calls = take_service_items!(
        request,
        request.request.methods_to_call,
        request.info.operational_limits.max_nodes_per_method_call
    );

    let mut calls: Vec<_> = method_calls
        .into_iter()
        .map(|c| MethodCall::new(c, request.request.request_header.return_diagnostics))
        .collect();

    struct MethodServiceCb;

    impl ServiceCb<MethodCall> for MethodServiceCb {
        async fn call(
            &self,
            items: &mut [&mut MethodCall],
            node_manager: &Arc<DynNodeManager>,
            context: RequestContext,
        ) {
            if let Err(e) = node_manager
                .call(&context, &mut *items)
                .instrument(debug_span!("Call", node_manager = %node_manager.name()))
                .await
            {
                for call in items {
                    call.set_status(e);
                }
            }
        }
    }

    invoke_service_concurrently_mut(
        context,
        &mut calls,
        &node_managers,
        MethodServiceCb,
        |call, node_manager| {
            node_manager.owns_node(call.method_id())
                && call.status() == StatusCode::BadMethodInvalid
        },
    )
    .await;

    let (results, diagnostic_infos) =
        consume_results(calls, request.request.request_header.return_diagnostics);

    Response {
        message: CallResponse {
            response_header: ResponseHeader::new_good(request.request_handle),
            results,
            diagnostic_infos,
        }
        .into(),
        request_id: request.request_id,
    }
}