1use std::{collections::VecDeque, future::Future};
2
3use futures::{stream::FuturesUnordered, Stream, StreamExt};
4use hashbrown::{Equivalent, HashSet};
5use opcua_types::{BrowseDescription, BrowseDirection, ByteString, Error, NodeId, StatusCode};
6
7use crate::{
8 session::{Browse, BrowseNext},
9 RequestRetryPolicy, Session,
10};
11
12use super::{result::BrowserResult, BrowseResultItem, Browser, BrowserPolicy, RequestWithRetries};
13
14impl<'a, T: BrowserPolicy + 'a, R: RequestRetryPolicy + Clone + 'a> Browser<'a, T, R> {
15 pub fn run(
19 self,
20 initial: Vec<BrowseDescription>,
21 ) -> impl Stream<Item = Result<BrowseResultItem, Error>> + 'a {
22 let initial_exec = BrowserExecution {
29 browser: self,
30 running_browses: FuturesUnordered::new(),
31 pending: initial
32 .into_iter()
33 .map(|r| RequestWithRetries {
34 request: r,
35 num_outer_retries: 0,
36 depth: 0,
37 })
38 .collect(),
39 pending_out: VecDeque::new(),
40 browsed_nodes: HashSet::new(),
41 pending_continuation_points: HashSet::new(),
42 };
43 futures::stream::try_unfold(initial_exec, |mut s| async move {
44 loop {
45 if s.browser.token.is_cancelled() {
47 s.cleanup().await;
48 return Ok(None);
49 }
50
51 if let Some(n) = s.pending_out.pop_front() {
53 return Ok(Some((n, s)));
54 }
55
56 while s.running_browses.len() < s.browser.config.max_concurrent_requests
59 || s.browser.config.max_concurrent_requests == 0
60 {
61 let mut chunk = Vec::new();
62 while chunk.len() < s.browser.config.max_nodes_per_request
63 || s.browser.config.max_nodes_per_request == 0
64 {
65 let Some(it) = s.pending.pop_front() else {
66 break;
67 };
68 chunk.push(it);
69 }
70 if !chunk.is_empty() {
71 s.running_browses.push(run_browse(
72 s.browser.session,
73 BrowseBatch::Browse(chunk),
74 s.browser.retry_policy.clone(),
75 s.browser.config.max_references_per_node,
76 ));
77 } else {
78 break;
79 }
80 }
81
82 let Some(next) = s.running_browses.next().await else {
84 return Ok(None);
85 };
86
87 let browse_next = match next.and_then(|m| s.process_result(m)) {
89 Ok(next) => next,
90 Err(e) => {
91 s.cleanup().await;
92 return Err(e);
93 }
94 };
95
96 if !browse_next.is_empty() {
100 s.running_browses.push(run_browse(
101 s.browser.session,
102 BrowseBatch::Next(browse_next),
103 s.browser.retry_policy.clone(),
104 s.browser.config.max_references_per_node,
105 ));
106 }
107 }
108 })
109 }
110
111 pub async fn run_into_result(
113 self,
114 initial: Vec<BrowseDescription>,
115 ) -> Result<BrowserResult, Error> {
116 BrowserResult::build_from_browser(self.run(initial)).await
117 }
118}
119
120struct BrowserExecution<'a, T, R, TFut> {
125 browser: Browser<'a, T, R>,
126 running_browses: FuturesUnordered<TFut>,
127 pending: VecDeque<RequestWithRetries>,
128 pending_out: VecDeque<BrowseResultItem>,
129 browsed_nodes: HashSet<BrowsedNode>,
130 pending_continuation_points: HashSet<ByteString>,
131}
132
/// Direction component of a visited-node key.
#[derive(Hash, Eq, PartialEq)]
enum Direction {
    /// References were followed in the forward direction.
    Forward,
    /// References were followed in the inverse direction.
    Inverse,
    /// References were followed in both directions.
    Both,
}
139
140#[derive(PartialEq, Eq, Hash)]
141struct BrowsedNode {
142 id: NodeId,
143 direction: Direction,
144}
145
146impl From<&BrowseDescription> for BrowsedNode {
147 fn from(value: &BrowseDescription) -> Self {
148 Self {
149 id: value.node_id.clone(),
150 direction: match value.browse_direction {
151 BrowseDirection::Forward => Direction::Forward,
152 BrowseDirection::Inverse => Direction::Inverse,
153 BrowseDirection::Both => Direction::Both,
154 BrowseDirection::Invalid => Direction::Both,
155 },
156 }
157 }
158}
159
160#[derive(PartialEq, Eq, Hash)]
161struct BrowsedNodeRef<'a> {
162 id: &'a NodeId,
163 direction: Direction,
164}
165
166impl Equivalent<BrowsedNode> for BrowsedNodeRef<'_> {
167 fn equivalent(&self, key: &BrowsedNode) -> bool {
168 self.id == &key.id && self.direction == key.direction
169 }
170}
171
172struct InnerResultItem {
173 it: BrowseResultItem,
174 cp: Option<ByteString>,
175}
176
177struct BrowseNextItem {
178 original: RequestWithRetries,
179 cp: ByteString,
180}
181
182enum BrowseBatch {
183 Browse(Vec<RequestWithRetries>),
184 Next(Vec<BrowseNextItem>),
185}
186
187async fn run_browse<R: RequestRetryPolicy>(
188 session: &Session,
189 batch: BrowseBatch,
190 policy: R,
191 max_references_per_node: u32,
192) -> Result<Vec<InnerResultItem>, Error> {
193 match batch {
194 BrowseBatch::Browse(items) => {
195 let r = session
196 .send_with_retry(
197 Browse::new(session)
198 .max_references_per_node(max_references_per_node)
199 .nodes_to_browse(items.iter().map(|r| r.request.clone()).collect()),
200 policy,
201 )
202 .await
203 .map_err(|e| Error::new(e, "Browse failed"))?;
204
205 let res = r.results.unwrap_or_default();
206 if res.len() != items.len() {
207 return Err(Error::new(
208 StatusCode::BadUnexpectedError,
209 format!(
210 "Incorrect number of results returned from Browse, expected {}, got {}",
211 items.len(),
212 res.len()
213 ),
214 ));
215 }
216 Ok(res
217 .into_iter()
218 .zip(items)
219 .map(|(res, it)| InnerResultItem {
220 it: BrowseResultItem {
221 status: res.status_code,
222 request: it,
223 references: res.references.unwrap_or_default(),
224 request_continuation_point: None,
225 },
226 cp: if res.continuation_point.is_null() {
227 None
228 } else {
229 Some(res.continuation_point)
230 },
231 })
232 .collect())
233 }
234 BrowseBatch::Next(items) => {
235 let r = session
236 .send_with_retry(
237 BrowseNext::new(session)
238 .continuation_points(items.iter().map(|r| r.cp.clone()).collect()),
239 policy,
240 )
241 .await
242 .map_err(|e| Error::new(e, "BrowseNext failed"))?;
243
244 let res = r.results.unwrap_or_default();
245 if res.len() != items.len() {
246 return Err(Error::new(
247 StatusCode::BadUnexpectedError,
248 format!(
249 "Incorrect number of results returned from BrowseNext, expected {}, got {}",
250 items.len(),
251 res.len()
252 ),
253 ));
254 }
255 Ok(res
256 .into_iter()
257 .zip(items)
258 .map(|(res, it)| InnerResultItem {
259 it: BrowseResultItem {
260 status: res.status_code,
261 request: it.original,
262 references: res.references.unwrap_or_default(),
263 request_continuation_point: Some(it.cp),
264 },
265 cp: if res.continuation_point.is_null() {
266 None
267 } else {
268 Some(res.continuation_point)
269 },
270 })
271 .collect())
272 }
273 }
274}
275
276impl<
277 'a,
278 T: BrowserPolicy,
279 R: RequestRetryPolicy + Clone + 'a,
280 TFut: Future<Output = Result<Vec<InnerResultItem>, Error>> + 'a,
281 > BrowserExecution<'a, T, R, TFut>
282{
283 fn visited(&self, dir: Direction, node_id: &NodeId) -> bool {
284 self.browsed_nodes.contains(&BrowsedNodeRef {
285 id: node_id,
286 direction: dir,
287 })
288 }
289
290 async fn cleanup(&mut self) {
291 while let Some(r) = self.running_browses.next().await {
293 let Ok(r) = r else {
294 continue;
295 };
296 for res in r {
297 if let Some(old_cp) = res.it.request_continuation_point.as_ref() {
298 self.pending_continuation_points.remove(old_cp);
299 }
300 if let Some(cp) = res.cp {
301 self.pending_continuation_points.insert(cp.clone());
302 }
303 }
304 }
305
306 let to_consume: Vec<_> = self.pending_continuation_points.drain().collect();
308 let mut futures = Vec::new();
309 for chunk in to_consume.chunks(self.browser.config.max_nodes_per_request) {
310 let session = self.browser.session;
311 futures.push(async move {
312 let _ = session.browse_next(true, chunk).await;
314 });
315 }
316
317 let mut it = FuturesUnordered::new();
319 loop {
320 while it.len() < self.browser.config.max_concurrent_requests
321 || self.browser.config.max_concurrent_requests == 0
322 {
323 let Some(fut) = futures.pop() else {
324 break;
325 };
326 it.push(fut);
327 }
328
329 if it.next().await.is_none() {
330 break;
331 }
332 }
333 }
334
335 pub(crate) fn process_result(
336 &mut self,
337 next: Vec<InnerResultItem>,
338 ) -> Result<Vec<BrowseNextItem>, Error> {
339 let mut browse_next = Vec::new();
340 for res in next {
341 let to_enqueue = self.browser.handler.get_next(&res.it);
343 for mut it in to_enqueue {
344 match it.browse_direction {
346 BrowseDirection::Forward => {
347 if self.visited(Direction::Forward, &it.node_id)
348 || self.visited(Direction::Both, &it.node_id)
349 {
350 continue;
351 }
352 }
353 BrowseDirection::Inverse => {
354 if self.visited(Direction::Inverse, &it.node_id)
355 || self.visited(Direction::Both, &it.node_id)
356 {
357 continue;
358 }
359 }
360 BrowseDirection::Both => {
361 if self.visited(Direction::Both, &it.node_id) {
362 continue;
363 }
364
365 let visited_inv = self.visited(Direction::Inverse, &it.node_id);
366 let visited_for = self.visited(Direction::Forward, &it.node_id);
367 if visited_for && visited_inv {
368 continue;
369 } else if visited_for {
370 it.browse_direction = BrowseDirection::Inverse;
371 } else if visited_inv {
372 it.browse_direction = BrowseDirection::Forward;
373 }
374 }
375 BrowseDirection::Invalid => {
376 return Err(Error::new(
377 StatusCode::BadBrowseDirectionInvalid,
378 "Produced an invalid browse direction",
379 ))
380 }
381 }
382
383 self.browsed_nodes.insert(BrowsedNode::from(&it));
385 self.pending.push_back(RequestWithRetries {
386 request: it,
387 num_outer_retries: 0,
388 depth: res.it.request.depth + 1,
389 });
390 }
391 if let Some(old_cp) = res.it.request_continuation_point.as_ref() {
393 self.pending_continuation_points.remove(old_cp);
394 }
395 if let Some(cp) = res.cp {
396 self.pending_continuation_points.insert(cp.clone());
399 browse_next.push(BrowseNextItem {
400 original: res.it.request.clone(),
401 cp,
402 });
403 } else if matches!(res.it.status, StatusCode::BadContinuationPointInvalid)
404 && self.browser.config.max_continuation_point_retries
405 > res.it.request.num_outer_retries
406 {
407 self.pending.push_back(RequestWithRetries {
410 request: res.it.request.request.clone(),
411 num_outer_retries: res.it.request.num_outer_retries + 1,
412 depth: res.it.request.depth,
413 });
414 }
415
416 self.pending_out.push_back(res.it);
417 }
418 Ok(browse_next)
419 }
420}