Skip to main content

opcua_client/browser/
browse.rs

1use std::{collections::VecDeque, future::Future};
2
3use futures::{stream::FuturesUnordered, Stream, StreamExt};
4use hashbrown::{Equivalent, HashSet};
5use opcua_types::{BrowseDescription, BrowseDirection, ByteString, Error, NodeId, StatusCode};
6
7use crate::{
8    session::{Browse, BrowseNext},
9    RequestRetryPolicy, Session,
10};
11
12use super::{result::BrowserResult, BrowseResultItem, Browser, BrowserPolicy, RequestWithRetries};
13
14impl<'a, T: BrowserPolicy + 'a, R: RequestRetryPolicy + Clone + 'a> Browser<'a, T, R> {
15    /// Start the browser, returning a stream of results.
16    ///
17    /// To stop browsing you can simply stop polling this stream.
18    pub fn run(
19        self,
20        initial: Vec<BrowseDescription>,
21    ) -> impl Stream<Item = Result<BrowseResultItem, Error>> + 'a {
22        // Streams are really hard.
23        // This code isn't ideal. Ideally most of this would be inside a method in
24        // `BrowserExecution`, but that isn't possible due to there being no way
25        // to name the future. We could box it, but that hits a compiler bug.
26        // This can hopefully be improved once either RTN, TAIT or ATPIT lands.
27
28        let initial_exec = BrowserExecution {
29            browser: self,
30            running_browses: FuturesUnordered::new(),
31            pending: initial
32                .into_iter()
33                .map(|r| RequestWithRetries {
34                    request: r,
35                    num_outer_retries: 0,
36                    depth: 0,
37                })
38                .collect(),
39            pending_out: VecDeque::new(),
40            browsed_nodes: HashSet::new(),
41            pending_continuation_points: HashSet::new(),
42        };
43        futures::stream::try_unfold(initial_exec, |mut s| async move {
44            loop {
45                // If we're cancelled, cleanup and stop immediately.
46                if s.browser.token.is_cancelled() {
47                    s.cleanup().await;
48                    return Ok(None);
49                }
50
51                // If there is something in the pending outputs, return that first.
52                if let Some(n) = s.pending_out.pop_front() {
53                    return Ok(Some((n, s)));
54                }
55
56                // If there is something in the queue, and there is space for more requests,
57                // make some new requests.
58                while s.running_browses.len() < s.browser.config.max_concurrent_requests
59                    || s.browser.config.max_concurrent_requests == 0
60                {
61                    let mut chunk = Vec::new();
62                    while chunk.len() < s.browser.config.max_nodes_per_request
63                        || s.browser.config.max_nodes_per_request == 0
64                    {
65                        let Some(it) = s.pending.pop_front() else {
66                            break;
67                        };
68                        chunk.push(it);
69                    }
70                    if !chunk.is_empty() {
71                        s.running_browses.push(run_browse(
72                            s.browser.session,
73                            BrowseBatch::Browse(chunk),
74                            s.browser.retry_policy.clone(),
75                            s.browser.config.max_references_per_node,
76                        ));
77                    } else {
78                        break;
79                    }
80                }
81
82                // Nothing more to do, wait until we get a response.
83                let Some(next) = s.running_browses.next().await else {
84                    return Ok(None);
85                };
86
87                // Process the result message, cancelling early if it was a fatal error.
88                let browse_next = match next.and_then(|m| s.process_result(m)) {
89                    Ok(next) => next,
90                    Err(e) => {
91                        s.cleanup().await;
92                        return Err(e);
93                    }
94                };
95
96                // If we returned a browse next, enqueue that immediately. There will be space,
97                // since we just consumed a future. This means that BrowseNext is prioritized, which is
98                // important to avoid overusing continuation points.
99                if !browse_next.is_empty() {
100                    s.running_browses.push(run_browse(
101                        s.browser.session,
102                        BrowseBatch::Next(browse_next),
103                        s.browser.retry_policy.clone(),
104                        s.browser.config.max_references_per_node,
105                    ));
106                }
107            }
108        })
109    }
110
111    /// Run the browser, collecting the results into a [BrowserResult] struct.
112    pub async fn run_into_result(
113        self,
114        initial: Vec<BrowseDescription>,
115    ) -> Result<BrowserResult, Error> {
116        BrowserResult::build_from_browser(self.run(initial)).await
117    }
118}
119
// It's tricky to store futures in a struct without boxing them, because
// they are un-nameable. In this case, `TFut` is always the future returned by `run_browse`.
// In the future, Type Alias Impl Trait (TAIT), or Return Type Notation (RTN) will make it
// possible to name these types.
/// Mutable state of a single browse run, threaded through the stream built in `Browser::run`.
struct BrowserExecution<'a, T, R, TFut> {
    /// The browser being executed; source of the session, config, policy handler,
    /// retry policy and cancellation token.
    browser: Browser<'a, T, R>,
    /// Browse / BrowseNext requests that have been sent and are awaiting a response.
    running_browses: FuturesUnordered<TFut>,
    /// Requests waiting to be sent, in FIFO order.
    pending: VecDeque<RequestWithRetries>,
    /// Processed results waiting to be yielded to the consumer of the stream.
    pending_out: VecDeque<BrowseResultItem>,
    /// Node/direction pairs that have already been enqueued, used to avoid
    /// browsing the same node in the same direction twice.
    browsed_nodes: HashSet<BrowsedNode>,
    /// Continuation points received from the server that have not yet been
    /// consumed; these are released during cleanup if the run exits early.
    pending_continuation_points: HashSet<ByteString>,
}
132
/// Browse direction used as part of the "already browsed" set key.
///
/// This mirrors the valid variants of `BrowseDirection` and deliberately has
/// no "invalid" variant. It is a plain tag, so it is cheap to copy and can be
/// printed for diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Direction {
    /// References were followed in the forward direction.
    Forward,
    /// References were followed in the inverse direction.
    Inverse,
    /// References were followed in both directions.
    Both,
}
139
/// Owned key of the "already browsed" set: a node together with the direction
/// it was browsed in.
#[derive(PartialEq, Eq, Hash)]
struct BrowsedNode {
    /// The node that was browsed.
    id: NodeId,
    /// The direction the node was browsed in.
    direction: Direction,
}
145
146impl From<&BrowseDescription> for BrowsedNode {
147    fn from(value: &BrowseDescription) -> Self {
148        Self {
149            id: value.node_id.clone(),
150            direction: match value.browse_direction {
151                BrowseDirection::Forward => Direction::Forward,
152                BrowseDirection::Inverse => Direction::Inverse,
153                BrowseDirection::Both => Direction::Both,
154                BrowseDirection::Invalid => Direction::Both,
155            },
156        }
157    }
158}
159
/// Borrowed counterpart of `BrowsedNode`, used to query the visited set
/// without cloning the node ID.
///
/// The fields are declared in the same order as in `BrowsedNode` so that the
/// derived `Hash` is intended to agree between the two types.
#[derive(PartialEq, Eq, Hash)]
struct BrowsedNodeRef<'a> {
    /// The node being looked up.
    id: &'a NodeId,
    /// The direction being looked up.
    direction: Direction,
}
165
166impl Equivalent<BrowsedNode> for BrowsedNodeRef<'_> {
167    fn equivalent(&self, key: &BrowsedNode) -> bool {
168        self.id == &key.id && self.direction == key.direction
169    }
170}
171
/// One per-node result of a Browse or BrowseNext call, as produced by `run_browse`.
struct InnerResultItem {
    /// The result that will be handed to the consumer of the browse stream.
    it: BrowseResultItem,
    /// Continuation point returned by the server for this node, or `None` if
    /// the returned continuation point was null.
    cp: Option<ByteString>,
}
176
/// A pending BrowseNext operation for a single node.
struct BrowseNextItem {
    /// The browse request that originally produced the continuation point.
    original: RequestWithRetries,
    /// The continuation point to pass to BrowseNext.
    cp: ByteString,
}
181
/// A batch of work sent to the server in a single request by `run_browse`.
enum BrowseBatch {
    /// Fresh browse requests, sent with the Browse service.
    Browse(Vec<RequestWithRetries>),
    /// Continuations of earlier requests, sent with the BrowseNext service.
    Next(Vec<BrowseNextItem>),
}
186
187async fn run_browse<R: RequestRetryPolicy>(
188    session: &Session,
189    batch: BrowseBatch,
190    policy: R,
191    max_references_per_node: u32,
192) -> Result<Vec<InnerResultItem>, Error> {
193    match batch {
194        BrowseBatch::Browse(items) => {
195            let r = session
196                .send_with_retry(
197                    Browse::new(session)
198                        .max_references_per_node(max_references_per_node)
199                        .nodes_to_browse(items.iter().map(|r| r.request.clone()).collect()),
200                    policy,
201                )
202                .await
203                .map_err(|e| Error::new(e, "Browse failed"))?;
204
205            let res = r.results.unwrap_or_default();
206            if res.len() != items.len() {
207                return Err(Error::new(
208                    StatusCode::BadUnexpectedError,
209                    format!(
210                        "Incorrect number of results returned from Browse, expected {}, got {}",
211                        items.len(),
212                        res.len()
213                    ),
214                ));
215            }
216            Ok(res
217                .into_iter()
218                .zip(items)
219                .map(|(res, it)| InnerResultItem {
220                    it: BrowseResultItem {
221                        status: res.status_code,
222                        request: it,
223                        references: res.references.unwrap_or_default(),
224                        request_continuation_point: None,
225                    },
226                    cp: if res.continuation_point.is_null() {
227                        None
228                    } else {
229                        Some(res.continuation_point)
230                    },
231                })
232                .collect())
233        }
234        BrowseBatch::Next(items) => {
235            let r = session
236                .send_with_retry(
237                    BrowseNext::new(session)
238                        .continuation_points(items.iter().map(|r| r.cp.clone()).collect()),
239                    policy,
240                )
241                .await
242                .map_err(|e| Error::new(e, "BrowseNext failed"))?;
243
244            let res = r.results.unwrap_or_default();
245            if res.len() != items.len() {
246                return Err(Error::new(
247                    StatusCode::BadUnexpectedError,
248                    format!(
249                        "Incorrect number of results returned from BrowseNext, expected {}, got {}",
250                        items.len(),
251                        res.len()
252                    ),
253                ));
254            }
255            Ok(res
256                .into_iter()
257                .zip(items)
258                .map(|(res, it)| InnerResultItem {
259                    it: BrowseResultItem {
260                        status: res.status_code,
261                        request: it.original,
262                        references: res.references.unwrap_or_default(),
263                        request_continuation_point: Some(it.cp),
264                    },
265                    cp: if res.continuation_point.is_null() {
266                        None
267                    } else {
268                        Some(res.continuation_point)
269                    },
270                })
271                .collect())
272        }
273    }
274}
275
276impl<
277        'a,
278        T: BrowserPolicy,
279        R: RequestRetryPolicy + Clone + 'a,
280        TFut: Future<Output = Result<Vec<InnerResultItem>, Error>> + 'a,
281    > BrowserExecution<'a, T, R, TFut>
282{
283    fn visited(&self, dir: Direction, node_id: &NodeId) -> bool {
284        self.browsed_nodes.contains(&BrowsedNodeRef {
285            id: node_id,
286            direction: dir,
287        })
288    }
289
290    async fn cleanup(&mut self) {
291        // First wait for any running browse operations to finish.
292        while let Some(r) = self.running_browses.next().await {
293            let Ok(r) = r else {
294                continue;
295            };
296            for res in r {
297                if let Some(old_cp) = res.it.request_continuation_point.as_ref() {
298                    self.pending_continuation_points.remove(old_cp);
299                }
300                if let Some(cp) = res.cp {
301                    self.pending_continuation_points.insert(cp.clone());
302                }
303            }
304        }
305
306        // Chunk-wise free all remaining continuation points.
307        let to_consume: Vec<_> = self.pending_continuation_points.drain().collect();
308        let mut futures = Vec::new();
309        for chunk in to_consume.chunks(self.browser.config.max_nodes_per_request) {
310            let session = self.browser.session;
311            futures.push(async move {
312                // Ignore the result, cleanup is best-effort only.
313                let _ = session.browse_next(true, chunk).await;
314            });
315        }
316
317        // We still want concurrency to make this go quickly if there are a lot of requests.
318        let mut it = FuturesUnordered::new();
319        loop {
320            while it.len() < self.browser.config.max_concurrent_requests
321                || self.browser.config.max_concurrent_requests == 0
322            {
323                let Some(fut) = futures.pop() else {
324                    break;
325                };
326                it.push(fut);
327            }
328
329            if it.next().await.is_none() {
330                break;
331            }
332        }
333    }
334
335    pub(crate) fn process_result(
336        &mut self,
337        next: Vec<InnerResultItem>,
338    ) -> Result<Vec<BrowseNextItem>, Error> {
339        let mut browse_next = Vec::new();
340        for res in next {
341            // Get the next set of requests from the user-defined policy.
342            let to_enqueue = self.browser.handler.get_next(&res.it);
343            for mut it in to_enqueue {
344                // Check if we have browsed this node before.
345                match it.browse_direction {
346                    BrowseDirection::Forward => {
347                        if self.visited(Direction::Forward, &it.node_id)
348                            || self.visited(Direction::Both, &it.node_id)
349                        {
350                            continue;
351                        }
352                    }
353                    BrowseDirection::Inverse => {
354                        if self.visited(Direction::Inverse, &it.node_id)
355                            || self.visited(Direction::Both, &it.node_id)
356                        {
357                            continue;
358                        }
359                    }
360                    BrowseDirection::Both => {
361                        if self.visited(Direction::Both, &it.node_id) {
362                            continue;
363                        }
364
365                        let visited_inv = self.visited(Direction::Inverse, &it.node_id);
366                        let visited_for = self.visited(Direction::Forward, &it.node_id);
367                        if visited_for && visited_inv {
368                            continue;
369                        } else if visited_for {
370                            it.browse_direction = BrowseDirection::Inverse;
371                        } else if visited_inv {
372                            it.browse_direction = BrowseDirection::Forward;
373                        }
374                    }
375                    BrowseDirection::Invalid => {
376                        return Err(Error::new(
377                            StatusCode::BadBrowseDirectionInvalid,
378                            "Produced an invalid browse direction",
379                        ))
380                    }
381                }
382
383                // Add the new request to the pending queue.
384                self.browsed_nodes.insert(BrowsedNode::from(&it));
385                self.pending.push_back(RequestWithRetries {
386                    request: it,
387                    num_outer_retries: 0,
388                    depth: res.it.request.depth + 1,
389                });
390            }
391            // Remove the old continuation point from the pending list, it should have been freed.
392            if let Some(old_cp) = res.it.request_continuation_point.as_ref() {
393                self.pending_continuation_points.remove(old_cp);
394            }
395            if let Some(cp) = res.cp {
396                // Store the new continuation point in the pending list so that we can
397                // free it if we exit early.
398                self.pending_continuation_points.insert(cp.clone());
399                browse_next.push(BrowseNextItem {
400                    original: res.it.request.clone(),
401                    cp,
402                });
403            } else if matches!(res.it.status, StatusCode::BadContinuationPointInvalid)
404                && self.browser.config.max_continuation_point_retries
405                    > res.it.request.num_outer_retries
406            {
407                // If we failed with `BadContinuationPointInvalid`, retry from the beginning
408                // if configured.
409                self.pending.push_back(RequestWithRetries {
410                    request: res.it.request.request.clone(),
411                    num_outer_retries: res.it.request.num_outer_retries + 1,
412                    depth: res.it.request.depth,
413                });
414            }
415
416            self.pending_out.push_back(res.it);
417        }
418        Ok(browse_next)
419    }
420}