opcua_client/browser/
mod.rs

1//! This module contains a utility for recursively browsing the node hierarchy
2//! in a flexible, efficient, and reliable manner.
3//!
4//! # Notes on usage.
5//!
6//! The browser does not spawn any internal tasks or threads, instead
7//! it is all driven by a `Stream`, that needs to be consumed for the
8//! browser to make progress.
9//!
10//! The browser is generic over two parameters:
11//!
12//! The first is a [BrowserPolicy], which dictates the recursive behavior of
13//! the browser. It accepts a result, and returns a set of nodes to browse.
14//! For simple usage, it is implemented for [BrowseFilter], which just creates
15//! a new [BrowseDescription] for each returned reference.
16//!
17//! Note that the browser will only ever browse a node in a given direction
18//! (forward or inverse) once, so if you return the same [BrowseDescription] multiple
19//! times, even with different filters, it will be ignored.
20//!
21//! The second parameter is a [RequestRetryPolicy], this dictates how
22//! requests should be retried. It defaults to an instance of
23//! [crate::DefaultRetryPolicy] with reasonable defaults.
24//!
25//! # Cancellation
26//!
27//! You _can_ just stop listening to the stream. The pending requests
28//! will still complete, but the browser will not send any more without
29//! anyone polling the stream. The problem with this is that browsing
30//! in OPC-UA produces `ContinuationPoints` on the server that need to be
31//! freed.
32//!
33//! If you instead set a `CancellationToken` when creating the browser,
34//! cancel it, then wait for the stream to terminate, the browser will attempt
35//! to clean up any pending continuation points after all requests finish.
36//!
37//! It will also attempt to do this on an error, but without retries.
38//!
39//! If you are closing the session anyway, continuation points are
40//! probable freed by the server then, so you can ignore this and just
41//! drop the stream.
42
43use opcua_types::{
44    BrowseDescription, BrowseDirection, BrowseResultMaskFlags, ByteString, NodeClassMask, NodeId,
45    ReferenceDescription, ReferenceTypeId, StatusCode,
46};
47use tokio_util::sync::CancellationToken;
48
49mod browse;
50mod result;
51
52pub use result::{BrowserResult, NodeDescription};
53
54use crate::{RequestRetryPolicy, Session};
55
56/// Configuration for the browser
57#[derive(Debug, Clone)]
58pub struct BrowserConfig {
59    max_nodes_per_request: usize,
60    max_references_per_node: u32,
61    max_concurrent_requests: usize,
62    max_continuation_point_retries: usize,
63}
64
65impl BrowserConfig {
66    /// Create a new default browser config.
67    pub fn new() -> Self {
68        Self::default()
69    }
70
71    /// Set the maximum number of nodes per request sent to the server.
72    ///
73    /// Note that the browser makes no guarantee that all requests sent
74    /// will be as large as possible.
75    pub fn max_nodes_per_request(mut self, max_nodes_per_request: usize) -> Self {
76        self.max_nodes_per_request = max_nodes_per_request;
77        self
78    }
79
80    /// Set the maximum number of references requested per node.
81    /// Can be 0 to let the server decide.
82    pub fn max_references_per_node(mut self, max_references_per_node: u32) -> Self {
83        self.max_references_per_node = max_references_per_node;
84        self
85    }
86
87    /// Set the maximum number of concurrent requests. Defaults to 1.
88    pub fn max_concurrent_requests(mut self, max_concurrent_requests: usize) -> Self {
89        self.max_concurrent_requests = max_concurrent_requests;
90        self
91    }
92
93    /// Set the maximum number of times a browse will be retried if the
94    /// continuation point becomes invalid while the browser is running.
95    ///
96    /// This will start the browse process over from zero for the affected node,
97    /// meaning that the same references will be returned multiple times.
98    pub fn max_continuation_point_retries(mut self, max_continuation_point_retries: usize) -> Self {
99        self.max_continuation_point_retries = max_continuation_point_retries;
100        self
101    }
102}
103
104impl Default for BrowserConfig {
105    fn default() -> Self {
106        Self {
107            max_nodes_per_request: 100,
108            max_references_per_node: 1000,
109            max_concurrent_requests: 1,
110            max_continuation_point_retries: 0,
111        }
112    }
113}
114
115#[derive(Debug, Default, Clone)]
116struct RequestWithRetries {
117    pub(self) request: BrowseDescription,
118    pub(self) num_outer_retries: usize,
119    pub(self) depth: usize,
120}
121
122#[derive(Debug, Default)]
123/// Result of a browse iteration.
124pub struct BrowseResultItem {
125    pub(self) request: RequestWithRetries,
126    pub(self) references: Vec<ReferenceDescription>,
127    pub(self) status: StatusCode,
128    pub(self) request_continuation_point: Option<ByteString>,
129}
130
131impl BrowseResultItem {
132    /// Get the parent ID for this browse iteration.
133    pub fn parent_id(&self) -> &NodeId {
134        &self.request.request.node_id
135    }
136
137    /// Consume this turning it into a list of references
138    /// and the parent ID.
139    /// Potentially more efficient than cloning from `references`.
140    pub fn into_results(self) -> (NodeId, Vec<ReferenceDescription>) {
141        (self.request.request.node_id, self.references)
142    }
143
144    /// Get the list of references.
145    ///
146    /// Use `into_results` if you need owned copies of the results.
147    pub fn references(&self) -> &[ReferenceDescription] {
148        &self.references
149    }
150
151    /// Get the status code. This may be an error if the
152    /// request failed. `BadContinuationPointInvalid` has special handling.
153    pub fn status(&self) -> StatusCode {
154        self.status
155    }
156
157    /// Get the browse request that produced this result item.
158    pub fn request(&self) -> &BrowseDescription {
159        &self.request.request
160    }
161
162    /// Get whether this was the result of a `BrowseNext` oepration.
163    pub fn is_browse_next(&self) -> bool {
164        self.request_continuation_point.is_some()
165    }
166
167    /// Depth of this reference in the recursive browse.
168    /// Depth 1 was returned from the list of root nodes.
169    pub fn depth(&self) -> usize {
170        self.request.depth + 1
171    }
172}
173
174/// Trait for deciding which nodes to browse next in a recursive browse.
175pub trait BrowserPolicy {
176    /// Given a parent node, and a list of references from that node,
177    /// return a list of nodes to browse next.
178    fn get_next(&self, results: &BrowseResultItem) -> Vec<BrowseDescription>;
179}
180
181impl<T> BrowserPolicy for T
182where
183    T: for<'a> Fn(&BrowseResultItem) -> Vec<BrowseDescription> + Send + Sync,
184{
185    fn get_next(&self, results: &BrowseResultItem) -> Vec<BrowseDescription> {
186        self(results)
187    }
188}
189
190/// Browse policy that browses nothing except the root nodes.
191#[derive(Debug, Clone, Copy)]
192pub struct NoneBrowserPolicy;
193
194impl BrowserPolicy for NoneBrowserPolicy {
195    fn get_next(&self, _results: &BrowseResultItem) -> Vec<BrowseDescription> {
196        Vec::new()
197    }
198}
199
200/// Simple filter for the [Browser]. All discovered nodes
201/// will be recursively browsed using the stored configuration.
202#[derive(Debug, Clone)]
203pub struct BrowseFilter {
204    direction: BrowseDirection,
205    include_subtypes: bool,
206    result_mask: BrowseResultMaskFlags,
207    node_class_mask: NodeClassMask,
208    reference_type_id: NodeId,
209    max_depth: usize,
210}
211
212impl BrowserPolicy for BrowseFilter {
213    fn get_next(&self, results: &BrowseResultItem) -> Vec<BrowseDescription> {
214        if self.max_depth > 0 && results.depth() >= self.max_depth {
215            return Vec::new();
216        }
217
218        results
219            .references
220            .iter()
221            .filter(|r| r.node_id.server_index == 0)
222            .map(|r| self.new_description_from_node(r.node_id.node_id.clone()))
223            .collect()
224    }
225}
226
227impl BrowseFilter {
228    /// Create a new browse filter for browsing references of
229    /// `reference_type_id` (optionally including subtypes) in the
230    /// given `direction`.
231    pub fn new(
232        direction: BrowseDirection,
233        reference_type_id: impl Into<NodeId>,
234        include_subtypes: bool,
235    ) -> Self {
236        Self {
237            direction,
238            reference_type_id: reference_type_id.into(),
239            include_subtypes,
240            result_mask: BrowseResultMaskFlags::all(),
241            node_class_mask: NodeClassMask::all(),
242            max_depth: 0,
243        }
244    }
245
246    /// Create a new browse description from this filter and a node ID to browse.
247    pub fn new_description_from_node(&self, node_id: NodeId) -> BrowseDescription {
248        BrowseDescription {
249            node_id,
250            browse_direction: self.direction,
251            reference_type_id: self.reference_type_id.clone(),
252            include_subtypes: self.include_subtypes,
253            node_class_mask: self.node_class_mask.bits(),
254            result_mask: self.result_mask.bits(),
255        }
256    }
257
258    /// Create a new browse filter for browsing hierarchical references.
259    pub fn new_hierarchical() -> Self {
260        Self::new(
261            BrowseDirection::Forward,
262            ReferenceTypeId::HierarchicalReferences,
263            true,
264        )
265    }
266
267    /// Set the node class mask, the filter for allowed node classes
268    /// in the returned references. Defaults to `all`.
269    pub fn node_class_mask(mut self, mask: NodeClassMask) -> Self {
270        self.node_class_mask = mask;
271        self
272    }
273
274    /// Set the result mask, indicating which values should be returned
275    /// for each reference. Defaults to `all`.
276    pub fn result_mask(mut self, mask: BrowseResultMaskFlags) -> Self {
277        self.result_mask = mask;
278        self
279    }
280
281    /// Set the maximum browse depth. If this is 1 only the root nodes will be browsed,
282    /// if it is 0, there is no upper limit.
283    pub fn max_depth(mut self, depth: usize) -> Self {
284        self.max_depth = depth;
285        self
286    }
287}
288
289/// A utility for recursively discovering nodes on an OPC-UA server.
290pub struct Browser<'a, T, R> {
291    pub(self) handler: T,
292    pub(self) retry_policy: R,
293    pub(self) config: BrowserConfig,
294    pub(self) session: &'a Session,
295    pub(self) token: CancellationToken,
296}
297
298impl<'a, T, R> Browser<'a, T, R> {
299    /// Create a new browser with the given handler and retry policy.
300    pub fn new(session: &'a Session, handler: T, retry_policy: R) -> Self {
301        Self {
302            session,
303            handler,
304            retry_policy,
305            config: BrowserConfig::default(),
306            token: CancellationToken::new(),
307        }
308    }
309
310    /// Set a new browser policy. This is used to generate new browses after each
311    /// visited node. Note that no matter what, a node will only be browsed _once_
312    /// in each direction. (Or once in both at the same time).
313    pub fn handler<T2: BrowserPolicy + 'a>(self, new_handler: T2) -> Browser<'a, T2, R> {
314        Browser {
315            handler: new_handler,
316            retry_policy: self.retry_policy,
317            config: self.config,
318            session: self.session,
319            token: self.token,
320        }
321    }
322
323    /// Set a new request retry policy.
324    pub fn retry_policy<R2: RequestRetryPolicy + Clone + 'a>(
325        self,
326        new_retry_policy: R2,
327    ) -> Browser<'a, T, R2> {
328        Browser {
329            handler: self.handler,
330            retry_policy: new_retry_policy,
331            config: self.config,
332            session: self.session,
333            token: self.token,
334        }
335    }
336
337    /// Set a new cancellation token. Once this is cancelled, the
338    /// browser will try to shut down gracefully, which means waiting for
339    /// any pending requests and then releasing continuation points.
340    ///
341    /// If you don't care about that (for example if you are shutting down
342    /// the session soon), then you can just stop polling to the stream.
343    pub fn token(mut self, token: CancellationToken) -> Self {
344        self.token = token;
345        self
346    }
347
348    /// Set the maximum number of nodes per request sent to the server.
349    ///
350    /// Note that the browser makes no guarantee that all requests sent
351    /// will be as large as possible.
352    pub fn max_nodes_per_request(mut self, max_nodes_per_request: usize) -> Self {
353        self.config.max_nodes_per_request = max_nodes_per_request;
354        self
355    }
356
357    /// Set the maximum number of references requested per node.
358    /// Can be 0 to let the server decide.
359    pub fn max_references_per_node(mut self, max_references_per_node: u32) -> Self {
360        self.config.max_references_per_node = max_references_per_node;
361        self
362    }
363
364    /// Set the maximum number of concurrent requests. Defaults to 1.
365    pub fn max_concurrent_requests(mut self, max_concurrent_requests: usize) -> Self {
366        self.config.max_concurrent_requests = max_concurrent_requests;
367        self
368    }
369
370    /// Set the maximum number of times a browse will be retried if the
371    /// continuation point becomes invalid while the browser is running.
372    ///
373    /// This will start the browse process over from zero for the affected node,
374    /// meaning that the same references will be returned multiple times.
375    pub fn max_continuation_point_retries(mut self, max_continuation_point_retries: usize) -> Self {
376        self.config.max_continuation_point_retries = max_continuation_point_retries;
377        self
378    }
379
380    /// Set the browse configuration.
381    pub fn config(mut self, config: BrowserConfig) -> Self {
382        self.config = config;
383        self
384    }
385}