opcua_client/browser/mod.rs
1//! This module contains a utility for recursively browsing the node hierarchy
2//! in a flexible, efficient, and reliable manner.
3//!
4//! # Notes on usage.
5//!
6//! The browser does not spawn any internal tasks or threads, instead
7//! it is all driven by a `Stream`, that needs to be consumed for the
8//! browser to make progress.
9//!
10//! The browser is generic over two parameters:
11//!
12//! The first is a [BrowserPolicy], which dictates the recursive behavior of
13//! the browser. It accepts a result, and returns a set of nodes to browse.
14//! For simple usage, it is implemented for [BrowseFilter], which just creates
15//! a new [BrowseDescription] for each returned reference.
16//!
17//! Note that the browser will only ever browse a node in a given direction
18//! (forward or inverse) once, so if you return the same [BrowseDescription] multiple
19//! times, even with different filters, it will be ignored.
20//!
21//! The second parameter is a [RequestRetryPolicy], this dictates how
22//! requests should be retried. It defaults to an instance of
23//! [crate::DefaultRetryPolicy] with reasonable defaults.
24//!
25//! # Cancellation
26//!
27//! You _can_ just stop listening to the stream. The pending requests
28//! will still complete, but the browser will not send any more without
29//! anyone polling the stream. The problem with this is that browsing
30//! in OPC-UA produces `ContinuationPoints` on the server that need to be
31//! freed.
32//!
33//! If you instead set a `CancellationToken` when creating the browser,
34//! cancel it, then wait for the stream to terminate, the browser will attempt
35//! to clean up any pending continuation points after all requests finish.
36//!
37//! It will also attempt to do this on an error, but without retries.
38//!
39//! If you are closing the session anyway, continuation points are
40//! probable freed by the server then, so you can ignore this and just
41//! drop the stream.
42
43use opcua_types::{
44 BrowseDescription, BrowseDirection, BrowseResultMaskFlags, ByteString, NodeClassMask, NodeId,
45 ReferenceDescription, ReferenceTypeId, StatusCode,
46};
47use tokio_util::sync::CancellationToken;
48
49mod browse;
50mod result;
51
52pub use result::{BrowserResult, NodeDescription};
53
54use crate::{RequestRetryPolicy, Session};
55
56/// Configuration for the browser
57#[derive(Debug, Clone)]
58pub struct BrowserConfig {
59 max_nodes_per_request: usize,
60 max_references_per_node: u32,
61 max_concurrent_requests: usize,
62 max_continuation_point_retries: usize,
63}
64
65impl BrowserConfig {
66 /// Create a new default browser config.
67 pub fn new() -> Self {
68 Self::default()
69 }
70
71 /// Set the maximum number of nodes per request sent to the server.
72 ///
73 /// Note that the browser makes no guarantee that all requests sent
74 /// will be as large as possible.
75 pub fn max_nodes_per_request(mut self, max_nodes_per_request: usize) -> Self {
76 self.max_nodes_per_request = max_nodes_per_request;
77 self
78 }
79
80 /// Set the maximum number of references requested per node.
81 /// Can be 0 to let the server decide.
82 pub fn max_references_per_node(mut self, max_references_per_node: u32) -> Self {
83 self.max_references_per_node = max_references_per_node;
84 self
85 }
86
87 /// Set the maximum number of concurrent requests. Defaults to 1.
88 pub fn max_concurrent_requests(mut self, max_concurrent_requests: usize) -> Self {
89 self.max_concurrent_requests = max_concurrent_requests;
90 self
91 }
92
93 /// Set the maximum number of times a browse will be retried if the
94 /// continuation point becomes invalid while the browser is running.
95 ///
96 /// This will start the browse process over from zero for the affected node,
97 /// meaning that the same references will be returned multiple times.
98 pub fn max_continuation_point_retries(mut self, max_continuation_point_retries: usize) -> Self {
99 self.max_continuation_point_retries = max_continuation_point_retries;
100 self
101 }
102}
103
104impl Default for BrowserConfig {
105 fn default() -> Self {
106 Self {
107 max_nodes_per_request: 100,
108 max_references_per_node: 1000,
109 max_concurrent_requests: 1,
110 max_continuation_point_retries: 0,
111 }
112 }
113}
114
115#[derive(Debug, Default, Clone)]
116struct RequestWithRetries {
117 pub(self) request: BrowseDescription,
118 pub(self) num_outer_retries: usize,
119 pub(self) depth: usize,
120}
121
122#[derive(Debug, Default)]
123/// Result of a browse iteration.
124pub struct BrowseResultItem {
125 pub(self) request: RequestWithRetries,
126 pub(self) references: Vec<ReferenceDescription>,
127 pub(self) status: StatusCode,
128 pub(self) request_continuation_point: Option<ByteString>,
129}
130
131impl BrowseResultItem {
132 /// Get the parent ID for this browse iteration.
133 pub fn parent_id(&self) -> &NodeId {
134 &self.request.request.node_id
135 }
136
137 /// Consume this turning it into a list of references
138 /// and the parent ID.
139 /// Potentially more efficient than cloning from `references`.
140 pub fn into_results(self) -> (NodeId, Vec<ReferenceDescription>) {
141 (self.request.request.node_id, self.references)
142 }
143
144 /// Get the list of references.
145 ///
146 /// Use `into_results` if you need owned copies of the results.
147 pub fn references(&self) -> &[ReferenceDescription] {
148 &self.references
149 }
150
151 /// Get the status code. This may be an error if the
152 /// request failed. `BadContinuationPointInvalid` has special handling.
153 pub fn status(&self) -> StatusCode {
154 self.status
155 }
156
157 /// Get the browse request that produced this result item.
158 pub fn request(&self) -> &BrowseDescription {
159 &self.request.request
160 }
161
162 /// Get whether this was the result of a `BrowseNext` oepration.
163 pub fn is_browse_next(&self) -> bool {
164 self.request_continuation_point.is_some()
165 }
166
167 /// Depth of this reference in the recursive browse.
168 /// Depth 1 was returned from the list of root nodes.
169 pub fn depth(&self) -> usize {
170 self.request.depth + 1
171 }
172}
173
174/// Trait for deciding which nodes to browse next in a recursive browse.
175pub trait BrowserPolicy {
176 /// Given a parent node, and a list of references from that node,
177 /// return a list of nodes to browse next.
178 fn get_next(&self, results: &BrowseResultItem) -> Vec<BrowseDescription>;
179}
180
181impl<T> BrowserPolicy for T
182where
183 T: for<'a> Fn(&BrowseResultItem) -> Vec<BrowseDescription> + Send + Sync,
184{
185 fn get_next(&self, results: &BrowseResultItem) -> Vec<BrowseDescription> {
186 self(results)
187 }
188}
189
190/// Browse policy that browses nothing except the root nodes.
191#[derive(Debug, Clone, Copy)]
192pub struct NoneBrowserPolicy;
193
194impl BrowserPolicy for NoneBrowserPolicy {
195 fn get_next(&self, _results: &BrowseResultItem) -> Vec<BrowseDescription> {
196 Vec::new()
197 }
198}
199
200/// Simple filter for the [Browser]. All discovered nodes
201/// will be recursively browsed using the stored configuration.
202#[derive(Debug, Clone)]
203pub struct BrowseFilter {
204 direction: BrowseDirection,
205 include_subtypes: bool,
206 result_mask: BrowseResultMaskFlags,
207 node_class_mask: NodeClassMask,
208 reference_type_id: NodeId,
209 max_depth: usize,
210}
211
212impl BrowserPolicy for BrowseFilter {
213 fn get_next(&self, results: &BrowseResultItem) -> Vec<BrowseDescription> {
214 if self.max_depth > 0 && results.depth() >= self.max_depth {
215 return Vec::new();
216 }
217
218 results
219 .references
220 .iter()
221 .filter(|r| r.node_id.server_index == 0)
222 .map(|r| self.new_description_from_node(r.node_id.node_id.clone()))
223 .collect()
224 }
225}
226
227impl BrowseFilter {
228 /// Create a new browse filter for browsing references of
229 /// `reference_type_id` (optionally including subtypes) in the
230 /// given `direction`.
231 pub fn new(
232 direction: BrowseDirection,
233 reference_type_id: impl Into<NodeId>,
234 include_subtypes: bool,
235 ) -> Self {
236 Self {
237 direction,
238 reference_type_id: reference_type_id.into(),
239 include_subtypes,
240 result_mask: BrowseResultMaskFlags::all(),
241 node_class_mask: NodeClassMask::all(),
242 max_depth: 0,
243 }
244 }
245
246 /// Create a new browse description from this filter and a node ID to browse.
247 pub fn new_description_from_node(&self, node_id: NodeId) -> BrowseDescription {
248 BrowseDescription {
249 node_id,
250 browse_direction: self.direction,
251 reference_type_id: self.reference_type_id.clone(),
252 include_subtypes: self.include_subtypes,
253 node_class_mask: self.node_class_mask.bits(),
254 result_mask: self.result_mask.bits(),
255 }
256 }
257
258 /// Create a new browse filter for browsing hierarchical references.
259 pub fn new_hierarchical() -> Self {
260 Self::new(
261 BrowseDirection::Forward,
262 ReferenceTypeId::HierarchicalReferences,
263 true,
264 )
265 }
266
267 /// Set the node class mask, the filter for allowed node classes
268 /// in the returned references. Defaults to `all`.
269 pub fn node_class_mask(mut self, mask: NodeClassMask) -> Self {
270 self.node_class_mask = mask;
271 self
272 }
273
274 /// Set the result mask, indicating which values should be returned
275 /// for each reference. Defaults to `all`.
276 pub fn result_mask(mut self, mask: BrowseResultMaskFlags) -> Self {
277 self.result_mask = mask;
278 self
279 }
280
281 /// Set the maximum browse depth. If this is 1 only the root nodes will be browsed,
282 /// if it is 0, there is no upper limit.
283 pub fn max_depth(mut self, depth: usize) -> Self {
284 self.max_depth = depth;
285 self
286 }
287}
288
289/// A utility for recursively discovering nodes on an OPC-UA server.
290pub struct Browser<'a, T, R> {
291 pub(self) handler: T,
292 pub(self) retry_policy: R,
293 pub(self) config: BrowserConfig,
294 pub(self) session: &'a Session,
295 pub(self) token: CancellationToken,
296}
297
298impl<'a, T, R> Browser<'a, T, R> {
299 /// Create a new browser with the given handler and retry policy.
300 pub fn new(session: &'a Session, handler: T, retry_policy: R) -> Self {
301 Self {
302 session,
303 handler,
304 retry_policy,
305 config: BrowserConfig::default(),
306 token: CancellationToken::new(),
307 }
308 }
309
310 /// Set a new browser policy. This is used to generate new browses after each
311 /// visited node. Note that no matter what, a node will only be browsed _once_
312 /// in each direction. (Or once in both at the same time).
313 pub fn handler<T2: BrowserPolicy + 'a>(self, new_handler: T2) -> Browser<'a, T2, R> {
314 Browser {
315 handler: new_handler,
316 retry_policy: self.retry_policy,
317 config: self.config,
318 session: self.session,
319 token: self.token,
320 }
321 }
322
323 /// Set a new request retry policy.
324 pub fn retry_policy<R2: RequestRetryPolicy + Clone + 'a>(
325 self,
326 new_retry_policy: R2,
327 ) -> Browser<'a, T, R2> {
328 Browser {
329 handler: self.handler,
330 retry_policy: new_retry_policy,
331 config: self.config,
332 session: self.session,
333 token: self.token,
334 }
335 }
336
337 /// Set a new cancellation token. Once this is cancelled, the
338 /// browser will try to shut down gracefully, which means waiting for
339 /// any pending requests and then releasing continuation points.
340 ///
341 /// If you don't care about that (for example if you are shutting down
342 /// the session soon), then you can just stop polling to the stream.
343 pub fn token(mut self, token: CancellationToken) -> Self {
344 self.token = token;
345 self
346 }
347
348 /// Set the maximum number of nodes per request sent to the server.
349 ///
350 /// Note that the browser makes no guarantee that all requests sent
351 /// will be as large as possible.
352 pub fn max_nodes_per_request(mut self, max_nodes_per_request: usize) -> Self {
353 self.config.max_nodes_per_request = max_nodes_per_request;
354 self
355 }
356
357 /// Set the maximum number of references requested per node.
358 /// Can be 0 to let the server decide.
359 pub fn max_references_per_node(mut self, max_references_per_node: u32) -> Self {
360 self.config.max_references_per_node = max_references_per_node;
361 self
362 }
363
364 /// Set the maximum number of concurrent requests. Defaults to 1.
365 pub fn max_concurrent_requests(mut self, max_concurrent_requests: usize) -> Self {
366 self.config.max_concurrent_requests = max_concurrent_requests;
367 self
368 }
369
370 /// Set the maximum number of times a browse will be retried if the
371 /// continuation point becomes invalid while the browser is running.
372 ///
373 /// This will start the browse process over from zero for the affected node,
374 /// meaning that the same references will be returned multiple times.
375 pub fn max_continuation_point_retries(mut self, max_continuation_point_retries: usize) -> Self {
376 self.config.max_continuation_point_retries = max_continuation_point_retries;
377 self
378 }
379
380 /// Set the browse configuration.
381 pub fn config(mut self, config: BrowserConfig) -> Self {
382 self.config = config;
383 self
384 }
385}