use std::time::Duration;
use crate::{
session::{
process_service_result, process_unexpected_response,
request_builder::{builder_base, builder_debug, builder_error, RequestHeaderBuilder},
UARequest,
},
AsyncSecureChannel, Session,
};
use opcua_core::ResponseMessage;
use opcua_types::{
DataValue, DeleteAtTimeDetails, DeleteEventDetails, DeleteRawModifiedDetails, ExtensionObject,
HistoryReadRequest, HistoryReadResponse, HistoryReadResult, HistoryReadValueId,
HistoryUpdateRequest, HistoryUpdateResponse, HistoryUpdateResult, IntegerId, NodeId,
ReadAtTimeDetails, ReadEventDetails, ReadProcessedDetails, ReadRawModifiedDetails, ReadRequest,
ReadResponse, ReadValueId, StatusCode, TimestampsToReturn, UpdateDataDetails,
UpdateEventDetails, UpdateStructureDataDetails, WriteRequest, WriteResponse, WriteValue,
};
#[derive(Debug, Clone)]
pub enum HistoryReadAction {
ReadEventDetails(ReadEventDetails),
ReadRawModifiedDetails(ReadRawModifiedDetails),
ReadProcessedDetails(ReadProcessedDetails),
ReadAtTimeDetails(ReadAtTimeDetails),
}
impl From<HistoryReadAction> for ExtensionObject {
fn from(action: HistoryReadAction) -> Self {
match action {
HistoryReadAction::ReadEventDetails(v) => Self::from_message(v),
HistoryReadAction::ReadRawModifiedDetails(v) => Self::from_message(v),
HistoryReadAction::ReadProcessedDetails(v) => Self::from_message(v),
HistoryReadAction::ReadAtTimeDetails(v) => Self::from_message(v),
}
}
}
#[derive(Debug, Clone)]
pub enum HistoryUpdateAction {
UpdateDataDetails(UpdateDataDetails),
UpdateStructureDataDetails(UpdateStructureDataDetails),
UpdateEventDetails(UpdateEventDetails),
DeleteRawModifiedDetails(DeleteRawModifiedDetails),
DeleteAtTimeDetails(DeleteAtTimeDetails),
DeleteEventDetails(DeleteEventDetails),
}
impl From<UpdateDataDetails> for HistoryUpdateAction {
fn from(value: UpdateDataDetails) -> Self {
Self::UpdateDataDetails(value)
}
}
impl From<UpdateStructureDataDetails> for HistoryUpdateAction {
fn from(value: UpdateStructureDataDetails) -> Self {
Self::UpdateStructureDataDetails(value)
}
}
impl From<UpdateEventDetails> for HistoryUpdateAction {
fn from(value: UpdateEventDetails) -> Self {
Self::UpdateEventDetails(value)
}
}
impl From<DeleteRawModifiedDetails> for HistoryUpdateAction {
fn from(value: DeleteRawModifiedDetails) -> Self {
Self::DeleteRawModifiedDetails(value)
}
}
impl From<DeleteAtTimeDetails> for HistoryUpdateAction {
fn from(value: DeleteAtTimeDetails) -> Self {
Self::DeleteAtTimeDetails(value)
}
}
impl From<DeleteEventDetails> for HistoryUpdateAction {
fn from(value: DeleteEventDetails) -> Self {
Self::DeleteEventDetails(value)
}
}
impl From<HistoryUpdateAction> for ExtensionObject {
fn from(action: HistoryUpdateAction) -> Self {
match action {
HistoryUpdateAction::UpdateDataDetails(v) => Self::from_message(v),
HistoryUpdateAction::UpdateStructureDataDetails(v) => Self::from_message(v),
HistoryUpdateAction::UpdateEventDetails(v) => Self::from_message(v),
HistoryUpdateAction::DeleteRawModifiedDetails(v) => Self::from_message(v),
HistoryUpdateAction::DeleteAtTimeDetails(v) => Self::from_message(v),
HistoryUpdateAction::DeleteEventDetails(v) => Self::from_message(v),
}
}
}
#[derive(Debug, Clone)]
pub struct Read {
nodes_to_read: Vec<ReadValueId>,
timestamps_to_return: TimestampsToReturn,
max_age: f64,
header: RequestHeaderBuilder,
}
impl Read {
pub fn new(session: &Session) -> Self {
Self {
nodes_to_read: Vec::new(),
timestamps_to_return: TimestampsToReturn::Neither,
max_age: 0.0,
header: RequestHeaderBuilder::new_from_session(session),
}
}
pub fn new_manual(
session_id: u32,
timeout: Duration,
auth_token: NodeId,
request_handle: IntegerId,
) -> Self {
Self {
nodes_to_read: Vec::new(),
timestamps_to_return: TimestampsToReturn::Neither,
max_age: 0.0,
header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
}
}
pub fn timestamps_to_return(mut self, timestamps: TimestampsToReturn) -> Self {
self.timestamps_to_return = timestamps;
self
}
pub fn max_age(mut self, max_age: f64) -> Self {
self.max_age = max_age;
self
}
pub fn nodes_to_read(mut self, nodes_to_read: Vec<ReadValueId>) -> Self {
self.nodes_to_read = nodes_to_read;
self
}
pub fn node(mut self, node: ReadValueId) -> Self {
self.nodes_to_read.push(node);
self
}
}
builder_base!(Read);
impl UARequest for Read {
type Out = ReadResponse;
async fn send<'b>(self, channel: &'b AsyncSecureChannel) -> Result<Self::Out, StatusCode>
where
Self: 'b,
{
if self.nodes_to_read.is_empty() {
builder_error!(self, "read(), was not supplied with any nodes to read");
return Err(StatusCode::BadNothingToDo);
}
let request = ReadRequest {
request_header: self.header.header,
max_age: self.max_age,
timestamps_to_return: self.timestamps_to_return,
nodes_to_read: Some(self.nodes_to_read),
};
let response = channel.send(request, self.header.timeout).await?;
if let ResponseMessage::Read(response) = response {
builder_debug!(self, "read(), success");
process_service_result(&response.response_header)?;
Ok(*response)
} else {
builder_error!(self, "read() value failed");
Err(process_unexpected_response(response))
}
}
}
#[derive(Debug, Clone)]
pub struct HistoryRead {
details: HistoryReadAction,
timestamps_to_return: TimestampsToReturn,
release_continuation_points: bool,
nodes_to_read: Vec<HistoryReadValueId>,
header: RequestHeaderBuilder,
}
builder_base!(HistoryRead);
impl HistoryRead {
pub fn new(details: HistoryReadAction, session: &Session) -> Self {
Self {
details,
timestamps_to_return: TimestampsToReturn::Neither,
release_continuation_points: false,
nodes_to_read: Vec::new(),
header: RequestHeaderBuilder::new_from_session(session),
}
}
pub fn new_manual(
details: HistoryReadAction,
session_id: u32,
timeout: Duration,
auth_token: NodeId,
request_handle: IntegerId,
) -> Self {
Self {
details,
timestamps_to_return: TimestampsToReturn::Neither,
release_continuation_points: false,
nodes_to_read: Vec::new(),
header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
}
}
pub fn timestamps_to_return(mut self, timestamps: TimestampsToReturn) -> Self {
self.timestamps_to_return = timestamps;
self
}
pub fn release_continuation_points(mut self, release_continuation_points: bool) -> Self {
self.release_continuation_points = release_continuation_points;
self
}
pub fn nodes_to_read(mut self, nodes_to_read: Vec<HistoryReadValueId>) -> Self {
self.nodes_to_read = nodes_to_read;
self
}
pub fn node(mut self, node: HistoryReadValueId) -> Self {
self.nodes_to_read.push(node);
self
}
}
impl UARequest for HistoryRead {
type Out = HistoryReadResponse;
async fn send<'b>(self, channel: &'b AsyncSecureChannel) -> Result<Self::Out, StatusCode>
where
Self: 'b,
{
let history_read_details = ExtensionObject::from(self.details);
builder_debug!(
self,
"history_read() requested to read nodes {:?}",
self.nodes_to_read
);
let request = HistoryReadRequest {
request_header: self.header.header,
history_read_details,
timestamps_to_return: self.timestamps_to_return,
release_continuation_points: self.release_continuation_points,
nodes_to_read: if self.nodes_to_read.is_empty() {
None
} else {
Some(self.nodes_to_read)
},
};
let response = channel.send(request, self.header.timeout).await?;
if let ResponseMessage::HistoryRead(response) = response {
builder_debug!(self, "history_read(), success");
process_service_result(&response.response_header)?;
Ok(*response)
} else {
builder_error!(self, "history_read() value failed");
Err(process_unexpected_response(response))
}
}
}
#[derive(Debug, Clone)]
pub struct Write {
nodes_to_write: Vec<WriteValue>,
header: RequestHeaderBuilder,
}
builder_base!(Write);
impl Write {
pub fn new(session: &Session) -> Self {
Self {
nodes_to_write: Vec::new(),
header: RequestHeaderBuilder::new_from_session(session),
}
}
pub fn new_manual(
session_id: u32,
timeout: Duration,
auth_token: NodeId,
request_handle: IntegerId,
) -> Self {
Self {
nodes_to_write: Vec::new(),
header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
}
}
pub fn nodes_to_write(mut self, nodes_to_write: Vec<WriteValue>) -> Self {
self.nodes_to_write = nodes_to_write;
self
}
pub fn node(mut self, node: impl Into<WriteValue>) -> Self {
self.nodes_to_write.push(node.into());
self
}
}
impl UARequest for Write {
type Out = WriteResponse;
async fn send<'a>(self, channel: &'a AsyncSecureChannel) -> Result<Self::Out, StatusCode>
where
Self: 'a,
{
if self.nodes_to_write.is_empty() {
builder_error!(self, "write() was not supplied with any nodes to write");
return Err(StatusCode::BadNothingToDo);
}
let request = WriteRequest {
request_header: self.header.header,
nodes_to_write: Some(self.nodes_to_write.to_vec()),
};
let response = channel.send(request, self.header.timeout).await?;
if let ResponseMessage::Write(response) = response {
builder_debug!(self, "write(), success");
process_service_result(&response.response_header)?;
Ok(*response)
} else {
builder_error!(self, "write() failed {:?}", response);
Err(process_unexpected_response(response))
}
}
}
#[derive(Debug, Clone)]
pub struct HistoryUpdate {
details: Vec<HistoryUpdateAction>,
header: RequestHeaderBuilder,
}
builder_base!(HistoryUpdate);
impl HistoryUpdate {
pub fn new(session: &Session) -> Self {
Self {
details: Vec::new(),
header: RequestHeaderBuilder::new_from_session(session),
}
}
pub fn new_manual(
session_id: u32,
timeout: Duration,
auth_token: NodeId,
request_handle: IntegerId,
) -> Self {
Self {
details: Vec::new(),
header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
}
}
pub fn details(mut self, details: Vec<HistoryUpdateAction>) -> Self {
self.details = details;
self
}
pub fn action(mut self, action: impl Into<HistoryUpdateAction>) -> Self {
self.details.push(action.into());
self
}
}
impl UARequest for HistoryUpdate {
type Out = HistoryUpdateResponse;
async fn send<'a>(self, channel: &'a AsyncSecureChannel) -> Result<Self::Out, StatusCode>
where
Self: 'a,
{
if self.details.is_empty() {
builder_error!(
self,
"history_update(), was not supplied with any detail to update"
);
return Err(StatusCode::BadNothingToDo);
}
let details = self
.details
.into_iter()
.map(ExtensionObject::from)
.collect();
let request = HistoryUpdateRequest {
request_header: self.header.header,
history_update_details: Some(details),
};
let response = channel.send(request, self.header.timeout).await?;
if let ResponseMessage::HistoryUpdate(response) = response {
builder_error!(self, "history_update(), success");
process_service_result(&response.response_header)?;
Ok(*response)
} else {
builder_error!(self, "history_update() failed {:?}", response);
Err(process_unexpected_response(response))
}
}
}
impl Session {
pub async fn read(
&self,
nodes_to_read: &[ReadValueId],
timestamps_to_return: TimestampsToReturn,
max_age: f64,
) -> Result<Vec<DataValue>, StatusCode> {
Ok(Read::new(self)
.nodes_to_read(nodes_to_read.to_vec())
.timestamps_to_return(timestamps_to_return)
.max_age(max_age)
.send(&self.channel)
.await?
.results
.unwrap_or_default())
}
pub async fn history_read(
&self,
history_read_details: HistoryReadAction,
timestamps_to_return: TimestampsToReturn,
release_continuation_points: bool,
nodes_to_read: &[HistoryReadValueId],
) -> Result<Vec<HistoryReadResult>, StatusCode> {
Ok(HistoryRead::new(history_read_details, self)
.timestamps_to_return(timestamps_to_return)
.release_continuation_points(release_continuation_points)
.nodes_to_read(nodes_to_read.to_vec())
.send(&self.channel)
.await?
.results
.unwrap_or_default())
}
pub async fn write(
&self,
nodes_to_write: &[WriteValue],
) -> Result<Vec<StatusCode>, StatusCode> {
Ok(Write::new(self)
.nodes_to_write(nodes_to_write.to_vec())
.send(&self.channel)
.await?
.results
.unwrap_or_default())
}
pub async fn history_update(
&self,
history_update_details: &[HistoryUpdateAction],
) -> Result<Vec<HistoryUpdateResult>, StatusCode> {
Ok(HistoryUpdate::new(self)
.details(history_update_details.to_vec())
.send(&self.channel)
.await?
.results
.unwrap_or_default())
}
}