Skip to main content

opcua_client/session/services/
attributes.rs

1use std::time::Duration;
2
3use crate::{
4    session::{
5        process_service_result, process_unexpected_response,
6        request_builder::{builder_base, builder_debug, builder_error, RequestHeaderBuilder},
7        UARequest,
8    },
9    AsyncSecureChannel, Session,
10};
11use opcua_core::ResponseMessage;
12use opcua_types::{
13    DataValue, DeleteAtTimeDetails, DeleteEventDetails, DeleteRawModifiedDetails, ExtensionObject,
14    HistoryReadRequest, HistoryReadResponse, HistoryReadResult, HistoryReadValueId,
15    HistoryUpdateRequest, HistoryUpdateResponse, HistoryUpdateResult, IntegerId, NodeId,
16    ReadAtTimeDetails, ReadEventDetails, ReadProcessedDetails, ReadRawModifiedDetails, ReadRequest,
17    ReadResponse, ReadValueId, StatusCode, TimestampsToReturn, UpdateDataDetails,
18    UpdateEventDetails, UpdateStructureDataDetails, WriteRequest, WriteResponse, WriteValue,
19};
20
21/// Enumeration used with Session::history_read()
22#[derive(Debug, Clone)]
23pub enum HistoryReadAction {
24    /// Read historical events.
25    ReadEventDetails(ReadEventDetails),
26    /// Read raw data values.
27    ReadRawModifiedDetails(ReadRawModifiedDetails),
28    /// Read data values with processing.
29    ReadProcessedDetails(ReadProcessedDetails),
30    /// Read data values at specific timestamps.
31    ReadAtTimeDetails(ReadAtTimeDetails),
32}
33
34impl From<HistoryReadAction> for ExtensionObject {
35    fn from(action: HistoryReadAction) -> Self {
36        match action {
37            HistoryReadAction::ReadEventDetails(v) => Self::from_message(v),
38            HistoryReadAction::ReadRawModifiedDetails(v) => Self::from_message(v),
39            HistoryReadAction::ReadProcessedDetails(v) => Self::from_message(v),
40            HistoryReadAction::ReadAtTimeDetails(v) => Self::from_message(v),
41        }
42    }
43}
44
45/// Enumeration used with Session::history_update()
46#[derive(Debug, Clone)]
47pub enum HistoryUpdateAction {
48    /// Update historical data values.
49    UpdateDataDetails(UpdateDataDetails),
50    /// Update historical structures.
51    UpdateStructureDataDetails(UpdateStructureDataDetails),
52    /// Update historical events.
53    UpdateEventDetails(UpdateEventDetails),
54    /// Delete raw data values.
55    DeleteRawModifiedDetails(DeleteRawModifiedDetails),
56    /// Delete data values at specific timestamps.
57    DeleteAtTimeDetails(DeleteAtTimeDetails),
58    /// Delete historical events.
59    DeleteEventDetails(DeleteEventDetails),
60}
61
62impl From<UpdateDataDetails> for HistoryUpdateAction {
63    fn from(value: UpdateDataDetails) -> Self {
64        Self::UpdateDataDetails(value)
65    }
66}
67impl From<UpdateStructureDataDetails> for HistoryUpdateAction {
68    fn from(value: UpdateStructureDataDetails) -> Self {
69        Self::UpdateStructureDataDetails(value)
70    }
71}
72impl From<UpdateEventDetails> for HistoryUpdateAction {
73    fn from(value: UpdateEventDetails) -> Self {
74        Self::UpdateEventDetails(value)
75    }
76}
77impl From<DeleteRawModifiedDetails> for HistoryUpdateAction {
78    fn from(value: DeleteRawModifiedDetails) -> Self {
79        Self::DeleteRawModifiedDetails(value)
80    }
81}
82impl From<DeleteAtTimeDetails> for HistoryUpdateAction {
83    fn from(value: DeleteAtTimeDetails) -> Self {
84        Self::DeleteAtTimeDetails(value)
85    }
86}
87impl From<DeleteEventDetails> for HistoryUpdateAction {
88    fn from(value: DeleteEventDetails) -> Self {
89        Self::DeleteEventDetails(value)
90    }
91}
92
93impl From<HistoryUpdateAction> for ExtensionObject {
94    fn from(action: HistoryUpdateAction) -> Self {
95        match action {
96            HistoryUpdateAction::UpdateDataDetails(v) => Self::from_message(v),
97            HistoryUpdateAction::UpdateStructureDataDetails(v) => Self::from_message(v),
98            HistoryUpdateAction::UpdateEventDetails(v) => Self::from_message(v),
99            HistoryUpdateAction::DeleteRawModifiedDetails(v) => Self::from_message(v),
100            HistoryUpdateAction::DeleteAtTimeDetails(v) => Self::from_message(v),
101            HistoryUpdateAction::DeleteEventDetails(v) => Self::from_message(v),
102        }
103    }
104}
105
106/// Builder for a call to the `Read` service.
107///
108/// See OPC UA Part 4 - Services 5.10.2 for complete description of the service and error responses.
109#[derive(Debug, Clone)]
110pub struct Read {
111    nodes_to_read: Vec<ReadValueId>,
112    timestamps_to_return: TimestampsToReturn,
113    max_age: f64,
114
115    header: RequestHeaderBuilder,
116}
117
118impl Read {
119    /// Construct a new call to the `Read` service.
120    pub fn new(session: &Session) -> Self {
121        Self {
122            nodes_to_read: Vec::new(),
123            timestamps_to_return: TimestampsToReturn::Neither,
124            max_age: 0.0,
125            header: RequestHeaderBuilder::new_from_session(session),
126        }
127    }
128
129    /// Construct a new call to the `Read` service, setting header parameters manually.
130    pub fn new_manual(
131        session_id: u32,
132        timeout: Duration,
133        auth_token: NodeId,
134        request_handle: IntegerId,
135    ) -> Self {
136        Self {
137            nodes_to_read: Vec::new(),
138            timestamps_to_return: TimestampsToReturn::Neither,
139            max_age: 0.0,
140            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
141        }
142    }
143
144    /// Set timestamps to return.
145    pub fn timestamps_to_return(mut self, timestamps: TimestampsToReturn) -> Self {
146        self.timestamps_to_return = timestamps;
147        self
148    }
149
150    /// Set max age.
151    pub fn max_age(mut self, max_age: f64) -> Self {
152        self.max_age = max_age;
153        self
154    }
155
156    /// Set nodes to read, overwriting any that were set previously.
157    pub fn nodes_to_read(mut self, nodes_to_read: Vec<ReadValueId>) -> Self {
158        self.nodes_to_read = nodes_to_read;
159        self
160    }
161
162    /// Add a node to read.
163    pub fn node(mut self, node: ReadValueId) -> Self {
164        self.nodes_to_read.push(node);
165        self
166    }
167}
168
169builder_base!(Read);
170
171impl UARequest for Read {
172    type Out = ReadResponse;
173
174    async fn send<'b>(self, channel: &'b AsyncSecureChannel) -> Result<Self::Out, StatusCode>
175    where
176        Self: 'b,
177    {
178        if self.nodes_to_read.is_empty() {
179            builder_error!(self, "read(), was not supplied with any nodes to read");
180            return Err(StatusCode::BadNothingToDo);
181        }
182        let request = ReadRequest {
183            request_header: self.header.header,
184            max_age: self.max_age,
185            timestamps_to_return: self.timestamps_to_return,
186            nodes_to_read: Some(self.nodes_to_read),
187        };
188        let response = channel.send(request, self.header.timeout).await?;
189        if let ResponseMessage::Read(response) = response {
190            builder_debug!(self, "read(), success");
191            process_service_result(&response.response_header)?;
192            Ok(*response)
193        } else {
194            builder_error!(self, "read() value failed");
195            Err(process_unexpected_response(response))
196        }
197    }
198}
199
200#[derive(Debug, Clone)]
201/// Reads historical values or events of one or more nodes. The caller is expected to provide
202/// a HistoryReadAction enum which must be one of the following:
203///
204/// * [`ReadEventDetails`]
205/// * [`ReadRawModifiedDetails`]
206/// * [`ReadProcessedDetails`]
207/// * [`ReadAtTimeDetails`]
208///
209/// See OPC UA Part 4 - Services 5.10.3 for complete description of the service and error responses.
210pub struct HistoryRead {
211    details: HistoryReadAction,
212    timestamps_to_return: TimestampsToReturn,
213    release_continuation_points: bool,
214    nodes_to_read: Vec<HistoryReadValueId>,
215
216    header: RequestHeaderBuilder,
217}
218
219builder_base!(HistoryRead);
220
221impl HistoryRead {
222    /// Create a new `HistoryRead` request.
223    pub fn new(details: HistoryReadAction, session: &Session) -> Self {
224        Self {
225            details,
226            timestamps_to_return: TimestampsToReturn::Neither,
227            release_continuation_points: false,
228            nodes_to_read: Vec::new(),
229
230            header: RequestHeaderBuilder::new_from_session(session),
231        }
232    }
233
234    /// Construct a new call to the `HistoryRead` service, setting header parameters manually.
235    pub fn new_manual(
236        details: HistoryReadAction,
237        session_id: u32,
238        timeout: Duration,
239        auth_token: NodeId,
240        request_handle: IntegerId,
241    ) -> Self {
242        Self {
243            details,
244            timestamps_to_return: TimestampsToReturn::Neither,
245            release_continuation_points: false,
246            nodes_to_read: Vec::new(),
247            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
248        }
249    }
250
251    /// Set timestamps to return.
252    pub fn timestamps_to_return(mut self, timestamps: TimestampsToReturn) -> Self {
253        self.timestamps_to_return = timestamps;
254        self
255    }
256
257    /// Set release continuation points. Default is false, if this is true,
258    /// continuation points will be freed and the request will return without reading
259    /// any history.
260    pub fn release_continuation_points(mut self, release_continuation_points: bool) -> Self {
261        self.release_continuation_points = release_continuation_points;
262        self
263    }
264
265    /// Set nodes to read, overwriting any that were set previously.
266    pub fn nodes_to_read(mut self, nodes_to_read: Vec<HistoryReadValueId>) -> Self {
267        self.nodes_to_read = nodes_to_read;
268        self
269    }
270
271    /// Add a node to read.
272    pub fn node(mut self, node: HistoryReadValueId) -> Self {
273        self.nodes_to_read.push(node);
274        self
275    }
276}
277
278impl UARequest for HistoryRead {
279    type Out = HistoryReadResponse;
280
281    async fn send<'b>(self, channel: &'b AsyncSecureChannel) -> Result<Self::Out, StatusCode>
282    where
283        Self: 'b,
284    {
285        let history_read_details = ExtensionObject::from(self.details);
286        builder_debug!(
287            self,
288            "history_read() requested to read nodes {:?}",
289            self.nodes_to_read
290        );
291        let request = HistoryReadRequest {
292            request_header: self.header.header,
293            history_read_details,
294            timestamps_to_return: self.timestamps_to_return,
295            release_continuation_points: self.release_continuation_points,
296            nodes_to_read: if self.nodes_to_read.is_empty() {
297                None
298            } else {
299                Some(self.nodes_to_read)
300            },
301        };
302
303        let response = channel.send(request, self.header.timeout).await?;
304        if let ResponseMessage::HistoryRead(response) = response {
305            builder_debug!(self, "history_read(), success");
306            process_service_result(&response.response_header)?;
307            Ok(*response)
308        } else {
309            builder_error!(self, "history_read() value failed");
310            Err(process_unexpected_response(response))
311        }
312    }
313}
314
315#[derive(Debug, Clone)]
316/// Writes values to nodes by sending a [`WriteRequest`] to the server. Note that some servers may reject DataValues
317/// containing source or server timestamps.
318///
319/// See OPC UA Part 4 - Services 5.10.4 for complete description of the service and error responses.
320pub struct Write {
321    nodes_to_write: Vec<WriteValue>,
322
323    header: RequestHeaderBuilder,
324}
325
326builder_base!(Write);
327
328impl Write {
329    /// Construct a new call to the `Write` service.
330    pub fn new(session: &Session) -> Self {
331        Self {
332            nodes_to_write: Vec::new(),
333            header: RequestHeaderBuilder::new_from_session(session),
334        }
335    }
336
337    /// Construct a new call to the `Write` service, setting header parameters manually.
338    pub fn new_manual(
339        session_id: u32,
340        timeout: Duration,
341        auth_token: NodeId,
342        request_handle: IntegerId,
343    ) -> Self {
344        Self {
345            nodes_to_write: Vec::new(),
346            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
347        }
348    }
349
350    /// Set nodes to write, overwriting any that were set previously.
351    pub fn nodes_to_write(mut self, nodes_to_write: Vec<WriteValue>) -> Self {
352        self.nodes_to_write = nodes_to_write;
353        self
354    }
355
356    /// Add a write value.
357    pub fn node(mut self, node: impl Into<WriteValue>) -> Self {
358        self.nodes_to_write.push(node.into());
359        self
360    }
361}
362
363impl UARequest for Write {
364    type Out = WriteResponse;
365
366    async fn send<'a>(self, channel: &'a AsyncSecureChannel) -> Result<Self::Out, StatusCode>
367    where
368        Self: 'a,
369    {
370        if self.nodes_to_write.is_empty() {
371            builder_error!(self, "write() was not supplied with any nodes to write");
372            return Err(StatusCode::BadNothingToDo);
373        }
374
375        let request = WriteRequest {
376            request_header: self.header.header,
377            nodes_to_write: Some(self.nodes_to_write.to_vec()),
378        };
379        let response = channel.send(request, self.header.timeout).await?;
380        if let ResponseMessage::Write(response) = response {
381            builder_debug!(self, "write(), success");
382            process_service_result(&response.response_header)?;
383            Ok(*response)
384        } else {
385            builder_error!(self, "write() failed {:?}", response);
386            Err(process_unexpected_response(response))
387        }
388    }
389}
390
391#[derive(Debug, Clone)]
392/// Updates historical values. The caller is expected to provide one or more history update operations
393/// in a slice of HistoryUpdateAction enums which are one of the following:
394///
395/// * [`UpdateDataDetails`]
396/// * [`UpdateStructureDataDetails`]
397/// * [`UpdateEventDetails`]
398/// * [`DeleteRawModifiedDetails`]
399/// * [`DeleteAtTimeDetails`]
400/// * [`DeleteEventDetails`]
401///
402/// See OPC UA Part 4 - Services 5.10.5 for complete description of the service and error responses.
403pub struct HistoryUpdate {
404    details: Vec<HistoryUpdateAction>,
405
406    header: RequestHeaderBuilder,
407}
408
409builder_base!(HistoryUpdate);
410
411impl HistoryUpdate {
412    /// Construct a new call to the `HistoryUpdate` service.
413    pub fn new(session: &Session) -> Self {
414        Self {
415            details: Vec::new(),
416
417            header: RequestHeaderBuilder::new_from_session(session),
418        }
419    }
420
421    /// Construct a new call to the `HistoryUpdate` service, setting header parameters manually.
422    pub fn new_manual(
423        session_id: u32,
424        timeout: Duration,
425        auth_token: NodeId,
426        request_handle: IntegerId,
427    ) -> Self {
428        Self {
429            details: Vec::new(),
430            header: RequestHeaderBuilder::new(session_id, timeout, auth_token, request_handle),
431        }
432    }
433
434    /// Set the history update actions to perform.
435    pub fn details(mut self, details: Vec<HistoryUpdateAction>) -> Self {
436        self.details = details;
437        self
438    }
439
440    /// Add a history update action to the list.
441    pub fn action(mut self, action: impl Into<HistoryUpdateAction>) -> Self {
442        self.details.push(action.into());
443        self
444    }
445}
446
447impl UARequest for HistoryUpdate {
448    type Out = HistoryUpdateResponse;
449
450    async fn send<'a>(self, channel: &'a AsyncSecureChannel) -> Result<Self::Out, StatusCode>
451    where
452        Self: 'a,
453    {
454        if self.details.is_empty() {
455            builder_error!(
456                self,
457                "history_update(), was not supplied with any detail to update"
458            );
459            return Err(StatusCode::BadNothingToDo);
460        }
461        let details = self
462            .details
463            .into_iter()
464            .map(ExtensionObject::from)
465            .collect();
466        let request = HistoryUpdateRequest {
467            request_header: self.header.header,
468            history_update_details: Some(details),
469        };
470        let response = channel.send(request, self.header.timeout).await?;
471        if let ResponseMessage::HistoryUpdate(response) = response {
472            builder_error!(self, "history_update(), success");
473            process_service_result(&response.response_header)?;
474            Ok(*response)
475        } else {
476            builder_error!(self, "history_update() failed {:?}", response);
477            Err(process_unexpected_response(response))
478        }
479    }
480}
481
482impl Session {
483    /// Reads the value of nodes by sending a [`ReadRequest`] to the server.
484    ///
485    /// See OPC UA Part 4 - Services 5.10.2 for complete description of the service and error responses.
486    ///
487    /// # Arguments
488    ///
489    /// * `nodes_to_read` - A list of [`ReadValueId`] to be read by the server.
490    /// * `timestamps_to_return` - The [`TimestampsToReturn`] for each node, Both, Server, Source or None
491    /// * `max_age` - The maximum age of value to read in milliseconds. Read the service description
492    ///   for details. Basically it will attempt to read a value within the age range or
493    ///   attempt to read a new value. If 0 the server will attempt to read a new value from the datasource.
494    ///   If set to `i32::MAX` or greater, the server shall attempt to get a cached value.
495    ///
496    /// # Returns
497    ///
498    /// * `Ok(Vec<DataValue>)` - A list of [`DataValue`] corresponding to each read operation.
499    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
500    ///
501    pub async fn read(
502        &self,
503        nodes_to_read: &[ReadValueId],
504        timestamps_to_return: TimestampsToReturn,
505        max_age: f64,
506    ) -> Result<Vec<DataValue>, StatusCode> {
507        Ok(Read::new(self)
508            .nodes_to_read(nodes_to_read.to_vec())
509            .timestamps_to_return(timestamps_to_return)
510            .max_age(max_age)
511            .send(&self.channel)
512            .await?
513            .results
514            .unwrap_or_default())
515    }
516
517    /// Reads historical values or events of one or more nodes. The caller is expected to provide
518    /// a HistoryReadAction enum which must be one of the following:
519    ///
520    /// * [`ReadEventDetails`]
521    /// * [`ReadRawModifiedDetails`]
522    /// * [`ReadProcessedDetails`]
523    /// * [`ReadAtTimeDetails`]
524    ///
525    /// See OPC UA Part 4 - Services 5.10.3 for complete description of the service and error responses.
526    ///
527    /// # Arguments
528    ///
529    /// * `history_read_details` - A history read operation.
530    /// * `timestamps_to_return` - Enumeration of which timestamps to return.
531    /// * `release_continuation_points` - Flag indicating whether to release the continuation point for the operation.
532    /// * `nodes_to_read` - The list of [`HistoryReadValueId`] of the nodes to apply the history read operation to.
533    ///
534    /// # Returns
535    ///
536    /// * `Ok(Vec<HistoryReadResult>)` - A list of [`HistoryReadResult`] results corresponding to history read operation.
537    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
538    ///
539    pub async fn history_read(
540        &self,
541        history_read_details: HistoryReadAction,
542        timestamps_to_return: TimestampsToReturn,
543        release_continuation_points: bool,
544        nodes_to_read: &[HistoryReadValueId],
545    ) -> Result<Vec<HistoryReadResult>, StatusCode> {
546        Ok(HistoryRead::new(history_read_details, self)
547            .timestamps_to_return(timestamps_to_return)
548            .release_continuation_points(release_continuation_points)
549            .nodes_to_read(nodes_to_read.to_vec())
550            .send(&self.channel)
551            .await?
552            .results
553            .unwrap_or_default())
554    }
555
556    /// Writes values to nodes by sending a [`WriteRequest`] to the server. Note that some servers may reject DataValues
557    /// containing source or server timestamps.
558    ///
559    /// See OPC UA Part 4 - Services 5.10.4 for complete description of the service and error responses.
560    ///
561    /// # Arguments
562    ///
563    /// * `nodes_to_write` - A list of [`WriteValue`] to be sent to the server.
564    ///
565    /// # Returns
566    ///
567    /// * `Ok(Vec<StatusCode>)` - A list of [`StatusCode`] results corresponding to each write operation.
568    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
569    ///
570    pub async fn write(
571        &self,
572        nodes_to_write: &[WriteValue],
573    ) -> Result<Vec<StatusCode>, StatusCode> {
574        Ok(Write::new(self)
575            .nodes_to_write(nodes_to_write.to_vec())
576            .send(&self.channel)
577            .await?
578            .results
579            .unwrap_or_default())
580    }
581
582    /// Updates historical values. The caller is expected to provide one or more history update operations
583    /// in a slice of HistoryUpdateAction enums which are one of the following:
584    ///
585    /// * [`UpdateDataDetails`]
586    /// * [`UpdateStructureDataDetails`]
587    /// * [`UpdateEventDetails`]
588    /// * [`DeleteRawModifiedDetails`]
589    /// * [`DeleteAtTimeDetails`]
590    /// * [`DeleteEventDetails`]
591    ///
592    /// See OPC UA Part 4 - Services 5.10.5 for complete description of the service and error responses.
593    ///
594    /// # Arguments
595    ///
596    /// * `history_update_details` - A list of history update operations.
597    ///
598    /// # Returns
599    ///
600    /// * `Ok(Vec<HistoryUpdateResult>)` - A list of [`HistoryUpdateResult`] results corresponding to history update operation.
601    /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
602    ///
603    pub async fn history_update(
604        &self,
605        history_update_details: &[HistoryUpdateAction],
606    ) -> Result<Vec<HistoryUpdateResult>, StatusCode> {
607        Ok(HistoryUpdate::new(self)
608            .details(history_update_details.to_vec())
609            .send(&self.channel)
610            .await?
611            .results
612            .unwrap_or_default())
613    }
614}