1use crate::backend::connector::{ComConnector, ConnectedGroup, ConnectedServer, ServerConnector};
2use crate::bindings::da::{
3 OPC_BRANCH, OPC_BROWSE_DOWN, OPC_BROWSE_UP, OPC_DS_DEVICE, OPC_FLAT, OPC_LEAF, OPC_NS_FLAT,
4 tagOPCITEMDEF,
5};
6use crate::helpers::{
7 filetime_to_string, format_hresult, opc_value_to_variant, quality_to_string, variant_to_string,
8};
9use crate::provider::{OpcProvider, OpcValue, TagValue, WriteResult};
10use anyhow::{Context, Result};
11use async_trait::async_trait;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicUsize, Ordering};
14
15pub struct OpcDaWrapper<C: ServerConnector = ComConnector> {
20 connector: Arc<C>,
21}
22
23impl Default for OpcDaWrapper<ComConnector> {
24 fn default() -> Self {
25 Self::new(ComConnector)
26 }
27}
28
29impl<C: ServerConnector> OpcDaWrapper<C> {
30 pub fn new(connector: C) -> Self {
32 Self {
33 connector: Arc::new(connector),
34 }
35 }
36}
37
38fn browse_recursive<S: ConnectedServer>(
39 server: &S,
40 tags: &mut Vec<String>,
41 max_tags: usize,
42 progress: &Arc<AtomicUsize>,
43 tags_sink: &Arc<std::sync::Mutex<Vec<String>>>,
44 depth: usize,
45) -> Result<()> {
46 const MAX_DEPTH: usize = 50;
47 if depth > MAX_DEPTH || tags.len() >= max_tags {
48 if depth > MAX_DEPTH {
49 tracing::warn!(depth, "Max browse depth reached, truncating");
50 }
51 return Ok(());
52 }
53
54 let branch_enum = server
55 .browse_opc_item_ids(OPC_BRANCH.0 as u32, Some(""), 0, 0)
56 .context("Failed to browse branches at current position")?;
57
58 let branches: Vec<String> = branch_enum
59 .filter_map(|r| match r {
60 Ok(name) => Some(name),
61 Err(e) => {
62 tracing::warn!(error = ?e, "Branch iteration error, skipping");
63 None
64 }
65 })
66 .collect();
67
68 for branch in branches {
69 if tags.len() >= max_tags {
70 break;
71 }
72 if let Err(e) = server.change_browse_position(OPC_BROWSE_DOWN.0 as u32, &branch) {
73 tracing::warn!(branch = %branch, error = ?e, "Failed to enter branch, skipping");
74 continue;
75 }
76
77 let recurse_result =
78 browse_recursive(server, tags, max_tags, progress, tags_sink, depth + 1);
79
80 if let Err(e) = server.change_browse_position(OPC_BROWSE_UP.0 as u32, "") {
81 tracing::error!(branch = %branch, error = ?e, "Failed to navigate back up!");
82 return Err(anyhow::anyhow!(
83 "Browse position corrupted: failed to navigate up from branch '{branch}'"
84 ));
85 }
86 recurse_result?;
87 }
88
89 if let Ok(leaf_enum) = server.browse_opc_item_ids(OPC_LEAF.0 as u32, Some(""), 0, 0) {
90 for leaf_res in leaf_enum {
91 if tags.len() >= max_tags {
92 break;
93 }
94 let browse_name = match leaf_res {
95 Ok(name) => name,
96 Err(e) => {
97 tracing::warn!(error = ?e, "Leaf iteration error, skipping");
98 continue;
99 }
100 };
101
102 let tag = match server.get_item_id(&browse_name) {
103 Ok(item_id) => {
104 tracing::debug!(
105 browse_name = %browse_name,
106 item_id = %item_id,
107 "get_item_id resolved"
108 );
109 item_id
110 }
111 Err(e) => {
112 tracing::warn!(
113 browse_name = %browse_name,
114 error = ?e,
115 "get_item_id failed, using browse name as fallback"
116 );
117 browse_name
118 }
119 };
120 tags.push(tag.clone());
121 if let Ok(mut sink) = tags_sink.lock() {
122 sink.push(tag);
123 }
124 progress.fetch_add(1, Ordering::Relaxed);
125 }
126 }
127
128 Ok(())
129}
130
131#[allow(clippy::too_many_lines)]
132#[async_trait]
133impl<C: ServerConnector + 'static> OpcProvider for OpcDaWrapper<C> {
134 async fn list_servers(&self, host: &str) -> Result<Vec<String>> {
135 let host_owned = host.to_string();
136 let connector = Arc::clone(&self.connector);
137 tokio::task::spawn_blocking(move || {
138 let span = tracing::info_span!("opc.list_servers", host = %host_owned);
139 let _enter = span.enter();
140
141 let _guard = crate::ComGuard::new()?;
142 let servers = connector.enumerate_servers()?;
143 tracing::info!(count = servers.len(), "list_servers completed");
144 Ok(servers)
145 })
146 .await?
147 }
148
149 async fn browse_tags(
150 &self,
151 server: &str,
152 max_tags: usize,
153 progress: Arc<AtomicUsize>,
154 tags_sink: Arc<std::sync::Mutex<Vec<String>>>,
155 ) -> Result<Vec<String>> {
156 let server_name = server.to_string();
157 let connector = Arc::clone(&self.connector);
158 tokio::task::spawn_blocking(move || {
159 let span = tracing::info_span!("opc.browse_tags", server = %server_name, max_tags);
160 let _enter = span.enter();
161
162 let _guard = crate::ComGuard::new()?;
163 let opc_server = connector.connect(&server_name)?;
164
165 let org = opc_server
166 .query_organization()
167 .context("Failed to query namespace organization")?;
168 let mut tags = Vec::new();
169
170 if org == OPC_NS_FLAT.0 as u32 {
171 let string_iter =
172 opc_server.browse_opc_item_ids(OPC_LEAF.0 as u32, Some(""), 0, 0)?;
173 for tag_res in string_iter {
174 if tags.len() >= max_tags {
175 break;
176 }
177 let tag = tag_res.map_err(|e| anyhow::anyhow!("Tag iteration error: {e:?}"))?;
178 tags.push(tag.clone());
179 if let Ok(mut sink) = tags_sink.lock() {
180 sink.push(tag);
181 }
182 progress.fetch_add(1, Ordering::Relaxed);
183 }
184 } else {
185 let use_flat = match opc_server.browse_opc_item_ids(OPC_FLAT.0 as u32, Some(""), 0, 0) {
188 Ok(mut flat_enum) => match flat_enum.next() {
189 Some(Ok(first_tag)) => {
190 tracing::info!("OPC_FLAT browse supported — using fast flat enumeration");
191 tags.push(first_tag.clone());
192 if let Ok(mut sink) = tags_sink.lock() {
193 sink.push(first_tag);
194 }
195 progress.fetch_add(1, Ordering::Relaxed);
196
197 for tag_res in flat_enum {
198 if tags.len() >= max_tags { break; }
199 match tag_res {
200 Ok(tag) => {
201 tags.push(tag.clone());
202 if let Ok(mut sink) = tags_sink.lock() {
203 sink.push(tag);
204 }
205 progress.fetch_add(1, Ordering::Relaxed);
206 }
207 Err(e) => {
208 tracing::warn!(error = ?e, "OPC_FLAT tag iteration error, skipping");
209 }
210 }
211 }
212 true
213 }
214 Some(Err(e)) => {
215 tracing::debug!(error = ?e, "OPC_FLAT first item error, falling back to recursive");
216 false
217 }
218 None => {
219 tracing::debug!("OPC_FLAT returned no items, falling back to recursive");
220 false
221 }
222 },
223 Err(e) => {
224 tracing::debug!(error = ?e, "OPC_FLAT not supported, falling back to recursive");
225 false
226 }
227 };
228
229 if !use_flat {
230 browse_recursive(&opc_server, &mut tags, max_tags, &progress, &tags_sink, 0)?;
231 }
232 }
233 tracing::info!(count = tags.len(), "browse_tags completed");
234 Ok(tags)
235 })
236 .await?
237 }
238
239 async fn read_tag_values(&self, server: &str, tag_ids: Vec<String>) -> Result<Vec<TagValue>> {
240 let server_name = server.to_string();
241 let connector = Arc::clone(&self.connector);
242 tokio::task::spawn_blocking(move || {
243 let span = tracing::info_span!(
244 "opc.read_tag_values",
245 server = %server_name,
246 tag_count = tag_ids.len()
247 );
248 let _enter = span.enter();
249
250 let _guard = crate::ComGuard::new()?;
251 let opc_server = connector.connect(&server_name)?;
252
253 let mut revised_update_rate = 0u32;
254 let mut server_handle = 0u32;
255 let group = opc_server.add_group(
256 "opc-da-client-read",
257 true,
258 1000, 0, 0, 0.0, 0, &mut revised_update_rate,
264 &mut server_handle,
265 )?;
266
267 let item_id_wides: Vec<Vec<u16>> = tag_ids
270 .iter()
271 .map(|tag_id| tag_id.encode_utf16().chain(std::iter::once(0)).collect())
272 .collect();
273
274 let item_defs: Vec<tagOPCITEMDEF> = item_id_wides
275 .iter()
276 .enumerate()
277 .map(|(idx, wide)| tagOPCITEMDEF {
278 szAccessPath: windows::core::PWSTR::null(),
279 szItemID: windows::core::PWSTR(wide.as_ptr().cast_mut()),
280 bActive: windows::Win32::Foundation::TRUE,
281 #[allow(clippy::cast_possible_truncation)]
282 hClient: idx as u32,
283 dwBlobSize: 0,
284 pBlob: std::ptr::null_mut(),
285 vtRequestedDataType: 0,
286 wReserved: 0,
287 })
288 .collect();
289
290 let (results, errors) = group.add_items(&item_defs)?;
291
292 let mut tag_values: Vec<TagValue> = tag_ids
295 .iter()
296 .map(|tag_id| TagValue {
297 tag_id: tag_id.clone(),
298 value: "Error".to_string(),
299 quality: "Bad — not added to group".to_string(),
300 timestamp: String::new(),
301 })
302 .collect();
303
304 let mut server_handles = Vec::new();
305 let mut valid_indices = Vec::new();
306
307 for (idx, (item_result, error)) in results
308 .as_slice()
309 .iter()
310 .zip(errors.as_slice().iter())
311 .enumerate()
312 {
313 if error.is_ok() {
314 server_handles.push(item_result.hServer);
315 valid_indices.push(idx);
316 } else {
317 let hint = format_hresult(*error);
318 tracing::warn!(
319 tag = %tag_ids[idx],
320 error = %hint,
321 "read_tag_values: add_items rejected tag"
322 );
323 tag_values[idx].quality = format!("Bad — {hint}");
324 }
325 }
326
327 if server_handles.is_empty() {
328 if let Err(e) = opc_server.remove_group(server_handle, true) {
329 tracing::warn!(error = ?e, operation = "read_tag_values", "Failed to remove OPC group during cleanup");
330 }
331 return Ok(tag_values);
332 }
333
334 let (item_states, read_errors) = group.read(OPC_DS_DEVICE, &server_handles)?;
335 let item_states_slice = item_states.as_slice();
336 let read_errors_slice = read_errors.as_slice();
337
338 for (i, idx) in valid_indices.iter().enumerate() {
339 let state = &item_states_slice[i];
340 let read_error = &read_errors_slice[i];
341
342 let (value_str, quality_str) = if read_error.is_ok() {
343 (
344 variant_to_string(&state.vDataValue),
345 quality_to_string(state.wQuality),
346 )
347 } else {
348 let full_msg = format_hresult(*read_error);
349 tracing::warn!(
350 tag = %tag_ids[*idx],
351 error = ?read_error,
352 hint = %full_msg,
353 "read_tag_values: per-item read error"
354 );
355 ("Error".to_string(), format!("Bad — {full_msg}"))
356 };
357
358 tag_values[*idx] = TagValue {
359 tag_id: tag_ids[*idx].clone(),
360 value: value_str,
361 quality: quality_str,
362 timestamp: filetime_to_string(state.ftTimeStamp),
363 };
364 }
365
366 tracing::info!(count = tag_values.len(), "read_tag_values completed");
367 if let Err(e) = opc_server.remove_group(server_handle, true) {
368 tracing::warn!(error = ?e, operation = "read_tag_values", "Failed to remove OPC group during cleanup");
369 }
370 Ok(tag_values)
371 })
372 .await?
373 }
374
375 async fn write_tag_value(
376 &self,
377 server: &str,
378 tag_id: &str,
379 value: OpcValue,
380 ) -> Result<WriteResult> {
381 let server_name = server.to_string();
382 let tag = tag_id.to_string();
383 let connector = Arc::clone(&self.connector);
384 tokio::task::spawn_blocking(move || {
385 let span = tracing::info_span!(
386 "opc.write_tag_value",
387 server = %server_name,
388 tag = %tag
389 );
390 let _enter = span.enter();
391
392 let _guard = crate::ComGuard::new()?;
393 let opc_server = connector.connect(&server_name)?;
394
395 let mut revised_update_rate = 0u32;
396 let mut server_handle = 0u32;
397 let group = opc_server.add_group(
398 "opc-da-client-write",
399 true,
400 1000,
401 0,
402 0,
403 0.0,
404 0,
405 &mut revised_update_rate,
406 &mut server_handle,
407 )?;
408
409 let mut item_id_wide: Vec<u16> =
412 tag.encode_utf16().chain(std::iter::once(0)).collect();
413 let item_def = tagOPCITEMDEF {
414 szAccessPath: windows::core::PWSTR::null(),
415 szItemID: windows::core::PWSTR(item_id_wide.as_mut_ptr()),
416 bActive: windows::Win32::Foundation::TRUE,
417 hClient: 0,
418 dwBlobSize: 0,
419 pBlob: std::ptr::null_mut(),
420 vtRequestedDataType: 0,
421 wReserved: 0,
422 };
423
424 let (results, errors) = group.add_items(&[item_def])?;
425 let item_res = results
426 .as_slice()
427 .first()
428 .context("Server returned empty item results")?;
429 let item_err = errors
430 .as_slice()
431 .first()
432 .context("Server returned empty item errors")?;
433
434 if let Err(e) = item_err.ok() {
435 tracing::warn!(error = ?e, "write_tag_value: failed to add tag to group");
436 if let Err(e) = opc_server.remove_group(server_handle, true) {
437 tracing::warn!(error = ?e, operation = "write_tag_value", "Failed to remove OPC group during cleanup");
438 }
439 return Ok(WriteResult {
440 tag_id: tag,
441 success: false,
442 error: Some(format!("Failed to add tag to group: {:?}", item_err)),
443 });
444 }
445
446 let variant = opc_value_to_variant(&value);
447 let write_errors = group.write(&[item_res.hServer], &[variant])?;
448 let write_error = write_errors
449 .as_slice()
450 .first()
451 .context("Server returned empty write errors")?;
452
453 let write_result = if write_error.is_ok() {
454 tracing::info!("write_tag_value: write completed successfully");
455 WriteResult {
456 tag_id: tag,
457 success: true,
458 error: None,
459 }
460 } else {
461 let hint = format_hresult(*write_error);
462 tracing::warn!(error = ?write_error, hint = %hint, "write_tag_value: server rejected write");
463 WriteResult {
464 tag_id: tag,
465 success: false,
466 error: Some(hint),
467 }
468 };
469
470 if let Err(e) = opc_server.remove_group(server_handle, true) {
471 tracing::warn!(error = ?e, operation = "write_tag_value", "Failed to remove OPC group during cleanup");
472 }
473 Ok(write_result)
474 })
475 .await?
476 }
477}
478
479#[cfg(test)]
480#[allow(
481 clippy::undocumented_unsafe_blocks,
482 clippy::ptr_as_ptr,
483 clippy::cast_possible_truncation,
484 clippy::cast_possible_wrap,
485 clippy::ref_as_ptr,
486 clippy::inline_always
487)]
488mod tests {
489 use super::*;
490 use crate::backend::connector::{ConnectedGroup, ConnectedServer, ServerConnector};
491 use crate::bindings::da::{
492 OPC_NS_HIERARCHIAL, tagOPCITEMDEF, tagOPCITEMRESULT, tagOPCITEMSTATE,
493 };
494 use crate::opc_da::client::StringIterator;
495 use crate::opc_da::utils::RemoteArray;
496 use windows::Win32::System::Com::{IEnumString, IEnumString_Impl};
497 use windows::Win32::System::Variant::VARIANT;
498 use windows::core::{PWSTR, implement};
499
500 #[allow(clippy::ref_as_ptr, clippy::inline_always)]
501 #[implement(IEnumString)]
502 struct MockEnumString {
503 items: Vec<String>,
504 index: std::sync::atomic::AtomicUsize,
505 }
506
507 impl IEnumString_Impl for MockEnumString_Impl {
508 fn Next(
509 &self,
510 celt: u32,
511 rgelt: *mut PWSTR,
512 pceltfetched: *mut u32,
513 ) -> windows::core::HRESULT {
514 let mut fetched = 0;
515 let index = self.index.load(std::sync::atomic::Ordering::Relaxed);
516 let rgelt = unsafe { std::slice::from_raw_parts_mut(rgelt, celt as usize) };
517
518 for (i, elem) in rgelt.iter_mut().enumerate().take(celt as usize) {
519 if index + i < self.items.len() {
520 let s = &self.items[index + i];
521 let w: Vec<u16> = s.encode_utf16().chain(std::iter::once(0)).collect();
522 let ptr = unsafe { windows::Win32::System::Com::CoTaskMemAlloc(w.len() * 2) };
523 unsafe { std::ptr::copy_nonoverlapping(w.as_ptr(), ptr as *mut u16, w.len()) };
524 *elem = PWSTR(ptr as *mut u16);
525 fetched += 1;
526 } else {
527 break;
528 }
529 }
530
531 self.index
532 .store(index + fetched, std::sync::atomic::Ordering::Relaxed);
533
534 if !pceltfetched.is_null() {
535 unsafe { *pceltfetched = fetched as u32 };
536 }
537
538 if fetched == celt as usize {
539 windows::Win32::Foundation::S_OK
540 } else {
541 windows::Win32::Foundation::S_FALSE
542 }
543 }
544 fn Skip(&self, _celt: u32) -> windows::core::HRESULT {
545 windows::Win32::Foundation::E_NOTIMPL
546 }
547 fn Reset(&self) -> windows::core::Result<()> {
548 self.index.store(0, std::sync::atomic::Ordering::Relaxed);
549 Ok(())
550 }
551 fn Clone(&self) -> windows::core::Result<IEnumString> {
552 Err(windows::core::Error::from_hresult(
553 windows::Win32::Foundation::E_NOTIMPL,
554 ))
555 }
556 }
557
558 struct MockGroup;
559
560 impl ConnectedGroup for MockGroup {
561 fn add_items(
562 &self,
563 items: &[tagOPCITEMDEF],
564 ) -> anyhow::Result<(
565 RemoteArray<tagOPCITEMRESULT>,
566 RemoteArray<windows::core::HRESULT>,
567 )> {
568 let mut results = Vec::new();
569 let mut errors = Vec::new();
570
571 for (i, item) in items.iter().enumerate() {
572 let mut name_w = Vec::new();
574 let mut ptr = item.szItemID.0;
575 unsafe {
576 while !ptr.is_null() && *ptr != 0 {
577 name_w.push(*ptr);
578 ptr = ptr.add(1);
579 }
580 }
581 let name = String::from_utf16_lossy(&name_w);
582
583 let res = tagOPCITEMRESULT {
584 hServer: (i + 1) as u32,
585 ..Default::default()
586 };
587
588 if name == "RejectMe" {
589 errors.push(windows::core::HRESULT(0xC004_0007_u32 as i32)); } else if name == "RejectAll" {
591 return Err(anyhow::anyhow!("Total failure"));
592 } else {
593 errors.push(windows::core::HRESULT(0)); }
595 results.push(res);
596 }
597
598 unsafe {
599 let p_res = windows::Win32::System::Com::CoTaskMemAlloc(
600 results.len() * std::mem::size_of::<tagOPCITEMRESULT>(),
601 ) as *mut tagOPCITEMRESULT;
602 std::ptr::copy_nonoverlapping(results.as_ptr(), p_res, results.len());
603 let p_err = windows::Win32::System::Com::CoTaskMemAlloc(
604 errors.len() * std::mem::size_of::<windows::core::HRESULT>(),
605 ) as *mut windows::core::HRESULT;
606 std::ptr::copy_nonoverlapping(errors.as_ptr(), p_err, errors.len());
607
608 Ok((
609 RemoteArray::from_mut_ptr(p_res, results.len() as u32),
610 RemoteArray::from_mut_ptr(p_err, errors.len() as u32),
611 ))
612 }
613 }
614
615 fn read(
616 &self,
617 _source: crate::bindings::da::tagOPCDATASOURCE,
618 server_handles: &[u32],
619 ) -> anyhow::Result<(
620 RemoteArray<tagOPCITEMSTATE>,
621 RemoteArray<windows::core::HRESULT>,
622 )> {
623 let mut states = Vec::new();
624 let mut errors = Vec::new();
625
626 for &handle in server_handles {
627 let mut state = tagOPCITEMSTATE {
628 hClient: handle, wQuality: crate::bindings::da::OPC_QUALITY_GOOD,
630 ..Default::default()
631 };
632 use windows::Win32::System::Variant::{
634 VARENUM, VARIANT, VARIANT_0, VARIANT_0_0, VARIANT_0_0_0,
635 };
636 let variant = VARIANT_0_0 {
637 vt: VARENUM(3), Anonymous: VARIANT_0_0_0 { lVal: 42 },
639 ..Default::default()
640 };
641 state.vDataValue = VARIANT {
642 Anonymous: VARIANT_0 {
643 Anonymous: std::mem::ManuallyDrop::new(variant),
644 },
645 };
646
647 states.push(state);
648 errors.push(windows::core::HRESULT(0)); }
650
651 unsafe {
652 let p_states = windows::Win32::System::Com::CoTaskMemAlloc(
653 states.len() * std::mem::size_of::<tagOPCITEMSTATE>(),
654 ) as *mut tagOPCITEMSTATE;
655 std::ptr::copy_nonoverlapping(states.as_ptr(), p_states, states.len());
656 let p_err = windows::Win32::System::Com::CoTaskMemAlloc(
657 errors.len() * std::mem::size_of::<windows::core::HRESULT>(),
658 ) as *mut windows::core::HRESULT;
659 std::ptr::copy_nonoverlapping(errors.as_ptr(), p_err, errors.len());
660
661 Ok((
662 RemoteArray::from_mut_ptr(p_states, states.len() as u32),
663 RemoteArray::from_mut_ptr(p_err, errors.len() as u32),
664 ))
665 }
666 }
667
668 fn write(
669 &self,
670 server_handles: &[u32],
671 _values: &[VARIANT],
672 ) -> anyhow::Result<RemoteArray<windows::core::HRESULT>> {
673 let mut errors = Vec::new();
674 for _ in server_handles {
675 errors.push(windows::core::HRESULT(0)); }
677
678 unsafe {
679 let p_err = windows::Win32::System::Com::CoTaskMemAlloc(
680 errors.len() * std::mem::size_of::<windows::core::HRESULT>(),
681 ) as *mut windows::core::HRESULT;
682 std::ptr::copy_nonoverlapping(errors.as_ptr(), p_err, errors.len());
683 Ok(RemoteArray::from_mut_ptr(p_err, errors.len() as u32))
684 }
685 }
686 }
687
688 struct MockServer;
689
690 impl ConnectedServer for MockServer {
691 type Group = MockGroup;
692
693 fn query_organization(&self) -> anyhow::Result<u32> {
694 Ok(crate::bindings::da::OPC_NS_FLAT.0 as u32)
695 }
696
697 fn browse_opc_item_ids(
698 &self,
699 _browse_type: u32,
700 _filter: Option<&str>,
701 _data_type: u16,
702 _access_rights: u32,
703 ) -> anyhow::Result<StringIterator> {
704 let mock_enum: IEnumString = MockEnumString {
705 items: vec!["MockTag.1".to_string(), "MockTag.2".to_string()],
706 index: std::sync::atomic::AtomicUsize::new(0),
707 }
708 .into();
709 Ok(StringIterator::new(mock_enum))
710 }
711
712 fn change_browse_position(&self, _direction: u32, _name: &str) -> anyhow::Result<()> {
713 Ok(())
714 }
715
716 fn get_item_id(&self, item_name: &str) -> anyhow::Result<String> {
717 Ok(item_name.to_string())
718 }
719
720 fn add_group(
721 &self,
722 _name: &str,
723 _active: bool,
724 _update_rate: u32,
725 _client_handle: u32,
726 _time_bias: i32,
727 _percent_deadband: f32,
728 _locale_id: u32,
729 revised_update_rate: &mut u32,
730 server_handle: &mut u32,
731 ) -> anyhow::Result<Self::Group> {
732 *revised_update_rate = 1000;
733 *server_handle = 1;
734 Ok(MockGroup)
735 }
736
737 fn remove_group(&self, _server_group: u32, _force: bool) -> anyhow::Result<()> {
738 Ok(())
739 }
740 }
741
742 #[derive(Clone)]
743 struct MockConnector;
744
745 impl ServerConnector for MockConnector {
746 type Server = MockServer;
747
748 fn enumerate_servers(&self) -> anyhow::Result<Vec<String>> {
749 Ok(vec![
750 "Mock.Server.1".to_string(),
751 "Mock.Server.2".to_string(),
752 ])
753 }
754
755 fn connect(&self, _server_name: &str) -> anyhow::Result<Self::Server> {
756 Ok(MockServer)
757 }
758 }
759
760 #[derive(Clone, Debug, PartialEq)]
761 enum OpcFlatBehavior {
762 Success(Vec<String>),
763 ReturnsError,
764 ReturnsEmpty,
765 }
766
767 struct MockHierarchicalServer {
768 opc_flat_behavior: OpcFlatBehavior,
769 position: std::cell::RefCell<Vec<String>>,
770 }
771
772 impl ConnectedServer for MockHierarchicalServer {
773 type Group = MockGroup;
774
775 fn query_organization(&self) -> anyhow::Result<u32> {
776 Ok(OPC_NS_HIERARCHIAL.0 as u32)
777 }
778
779 fn browse_opc_item_ids(
780 &self,
781 browse_type: u32,
782 _filter: Option<&str>,
783 _data_type: u16,
784 _access_rights: u32,
785 ) -> anyhow::Result<StringIterator> {
786 let pos = self.position.borrow();
787 let mut results = Vec::new();
788
789 if browse_type == OPC_FLAT.0 as u32 {
790 match &self.opc_flat_behavior {
791 OpcFlatBehavior::Success(items) => {
792 results = items.clone();
793 }
794 OpcFlatBehavior::ReturnsError => {
795 return Err(anyhow::anyhow!("OPC_FLAT not supported mock error"));
796 }
797 OpcFlatBehavior::ReturnsEmpty => {
798 }
800 }
801 } else if browse_type == OPC_BRANCH.0 as u32 {
802 if pos.is_empty() {
803 results.push("Branch1".to_string());
804 results.push("Branch2".to_string());
805 }
806 } else if browse_type == OPC_LEAF.0 as u32 {
807 if pos.len() == 1 && pos[0] == "Branch1" {
808 results.push("Leaf1".to_string());
809 results.push("Leaf2".to_string());
810 } else if pos.len() == 1 && pos[0] == "Branch2" {
811 results.push("Leaf3".to_string());
812 }
813 }
814
815 let mock_enum: IEnumString = MockEnumString {
816 items: results,
817 index: std::sync::atomic::AtomicUsize::new(0),
818 }
819 .into();
820 Ok(StringIterator::new(mock_enum))
821 }
822
823 fn change_browse_position(&self, direction: u32, name: &str) -> anyhow::Result<()> {
824 let mut pos = self.position.borrow_mut();
825 if direction == OPC_BROWSE_DOWN.0 as u32 {
826 pos.push(name.to_string());
827 } else if direction == OPC_BROWSE_UP.0 as u32 {
828 pos.pop();
829 }
830 Ok(())
831 }
832
833 fn get_item_id(&self, item_name: &str) -> anyhow::Result<String> {
834 let pos = self.position.borrow();
835 if pos.is_empty() {
836 Ok(item_name.to_string())
837 } else {
838 Ok(format!("{}.{}", pos.join("."), item_name))
839 }
840 }
841
842 fn add_group(
843 &self,
844 _name: &str,
845 _active: bool,
846 _update_rate: u32,
847 _client_handle: u32,
848 _time_bias: i32,
849 _percent_deadband: f32,
850 _locale_id: u32,
851 _revised_update_rate: &mut u32,
852 _server_handle: &mut u32,
853 ) -> anyhow::Result<Self::Group> {
854 Ok(MockGroup)
855 }
856
857 fn remove_group(&self, _server_group: u32, _force: bool) -> anyhow::Result<()> {
858 Ok(())
859 }
860 }
861
862 struct MockHierarchicalConnector {
863 opc_flat_behavior: OpcFlatBehavior,
864 }
865
866 impl ServerConnector for MockHierarchicalConnector {
867 type Server = MockHierarchicalServer;
868
869 fn enumerate_servers(&self) -> anyhow::Result<Vec<String>> {
870 Ok(vec!["Mock.Hierarchical.1".to_string()])
871 }
872
873 fn connect(&self, _server_name: &str) -> anyhow::Result<Self::Server> {
874 Ok(MockHierarchicalServer {
875 opc_flat_behavior: self.opc_flat_behavior.clone(),
876 position: std::cell::RefCell::new(Vec::new()),
877 })
878 }
879 }
880
881 #[test]
882 fn test_browse_tags_flat_server() {
883 let rt = tokio::runtime::Builder::new_current_thread()
884 .build()
885 .unwrap();
886 rt.block_on(async {
887 let wrapper = OpcDaWrapper {
888 connector: std::sync::Arc::new(MockConnector),
889 };
890
891 let progress = Arc::new(AtomicUsize::new(0));
892 let sink = Arc::new(std::sync::Mutex::new(Vec::new()));
893
894 let tags = wrapper
895 .browse_tags("Mock.Server", 100, progress.clone(), sink.clone())
896 .await
897 .unwrap();
898
899 assert_eq!(tags.len(), 2);
900 assert_eq!(tags[0], "MockTag.1");
901 assert_eq!(tags[1], "MockTag.2");
902
903 assert_eq!(progress.load(Ordering::Relaxed), 2);
904 let sink_tags = sink.lock().unwrap().clone();
905 assert_eq!(sink_tags, tags);
906 });
907 }
908
909 #[test]
910 fn test_browse_tags_hierarchical_recursive() {
911 let rt = tokio::runtime::Builder::new_current_thread()
912 .build()
913 .unwrap();
914 rt.block_on(async {
915 let connector = MockHierarchicalConnector {
916 opc_flat_behavior: OpcFlatBehavior::ReturnsError,
917 };
918 let wrapper = OpcDaWrapper {
919 connector: std::sync::Arc::new(connector),
920 };
921
922 let progress = Arc::new(AtomicUsize::new(0));
923 let sink = Arc::new(std::sync::Mutex::new(Vec::new()));
924
925 let tags = wrapper
926 .browse_tags("Mock.Hierarchical", 100, progress.clone(), sink.clone())
927 .await
928 .unwrap();
929
930 assert_eq!(tags.len(), 3);
931 assert_eq!(tags[0], "Branch1.Leaf1");
932 assert_eq!(tags[1], "Branch1.Leaf2");
933 assert_eq!(tags[2], "Branch2.Leaf3");
934
935 assert_eq!(progress.load(Ordering::Relaxed), 3);
936 let sink_tags = sink.lock().unwrap().clone();
937 assert_eq!(sink_tags, tags);
938 });
939 }
940
941 #[test]
942 fn test_browse_tags_opc_flat_success() {
943 let rt = tokio::runtime::Builder::new_current_thread()
944 .build()
945 .unwrap();
946 rt.block_on(async {
947 let connector = MockHierarchicalConnector {
948 opc_flat_behavior: OpcFlatBehavior::Success(vec![
949 "FQ.Branch1.Leaf1".to_string(),
950 "FQ.Branch1.Leaf2".to_string(),
951 "FQ.Branch2.Leaf3".to_string(),
952 ]),
953 };
954 let wrapper = OpcDaWrapper {
955 connector: std::sync::Arc::new(connector),
956 };
957
958 let progress = Arc::new(AtomicUsize::new(0));
959 let sink = Arc::new(std::sync::Mutex::new(Vec::new()));
960
961 let tags = wrapper
962 .browse_tags("Mock.Hierarchical", 100, progress.clone(), sink.clone())
963 .await
964 .unwrap();
965
966 assert_eq!(tags.len(), 3);
967 assert_eq!(tags[0], "FQ.Branch1.Leaf1");
968 assert_eq!(tags[1], "FQ.Branch1.Leaf2");
969 assert_eq!(tags[2], "FQ.Branch2.Leaf3");
970
971 assert_eq!(progress.load(Ordering::Relaxed), 3);
972 let sink_tags = sink.lock().unwrap().clone();
973 assert_eq!(sink_tags, tags);
974 });
975 }
976
977 #[test]
978 fn test_browse_tags_opc_flat_error_fallback() {
979 let rt = tokio::runtime::Builder::new_current_thread()
980 .build()
981 .unwrap();
982 rt.block_on(async {
983 let connector = MockHierarchicalConnector {
984 opc_flat_behavior: OpcFlatBehavior::ReturnsError,
985 };
986 let wrapper = OpcDaWrapper {
987 connector: std::sync::Arc::new(connector),
988 };
989
990 let progress = Arc::new(AtomicUsize::new(0));
991 let sink = Arc::new(std::sync::Mutex::new(Vec::new()));
992
993 let tags = wrapper
994 .browse_tags("Mock.Hierarchical", 100, progress.clone(), sink.clone())
995 .await
996 .unwrap();
997
998 assert_eq!(tags.len(), 3);
999 assert_eq!(tags[0], "Branch1.Leaf1");
1000 assert_eq!(tags[1], "Branch1.Leaf2");
1001 assert_eq!(tags[2], "Branch2.Leaf3");
1002 });
1003 }
1004
1005 #[test]
1006 fn test_browse_tags_opc_flat_empty_fallback() {
1007 let rt = tokio::runtime::Builder::new_current_thread()
1008 .build()
1009 .unwrap();
1010 rt.block_on(async {
1011 let connector = MockHierarchicalConnector {
1012 opc_flat_behavior: OpcFlatBehavior::ReturnsEmpty,
1013 };
1014 let wrapper = OpcDaWrapper {
1015 connector: std::sync::Arc::new(connector),
1016 };
1017
1018 let progress = Arc::new(AtomicUsize::new(0));
1019 let sink = Arc::new(std::sync::Mutex::new(Vec::new()));
1020
1021 let tags = wrapper
1022 .browse_tags("Mock.Hierarchical", 100, progress.clone(), sink.clone())
1023 .await
1024 .unwrap();
1025
1026 assert_eq!(tags.len(), 3);
1027 assert_eq!(tags[0], "Branch1.Leaf1");
1028 assert_eq!(tags[1], "Branch1.Leaf2");
1029 assert_eq!(tags[2], "Branch2.Leaf3");
1030 });
1031 }
1032
1033 #[test]
1034 fn test_browse_tags_max_tags_limit() {
1035 let rt = tokio::runtime::Builder::new_current_thread()
1036 .build()
1037 .unwrap();
1038 rt.block_on(async {
1039 let wrapper = OpcDaWrapper {
1040 connector: std::sync::Arc::new(MockConnector),
1041 };
1042
1043 let progress = Arc::new(AtomicUsize::new(0));
1044 let sink = Arc::new(std::sync::Mutex::new(Vec::new()));
1045
1046 let tags = wrapper
1048 .browse_tags("Mock.Server", 2, progress.clone(), sink.clone())
1049 .await
1050 .unwrap();
1051
1052 assert_eq!(tags.len(), 2);
1053 assert_eq!(progress.load(Ordering::Relaxed), 2);
1054 });
1055 }
1056
1057 #[test]
1058 fn test_mock_list_servers() {
1059 let rt = tokio::runtime::Builder::new_current_thread()
1060 .build()
1061 .unwrap();
1062 rt.block_on(async {
1063 let wrapper = OpcDaWrapper {
1064 connector: std::sync::Arc::new(MockConnector),
1065 };
1066
1067 let servers = wrapper.list_servers("localhost").await.unwrap();
1068 assert_eq!(servers, vec!["Mock.Server.1", "Mock.Server.2"]);
1069 });
1070 }
1071
1072 #[test]
1073 fn test_mock_read_tags_happy() {
1074 let rt = tokio::runtime::Builder::new_current_thread()
1075 .build()
1076 .unwrap();
1077 rt.block_on(async {
1078 let wrapper = OpcDaWrapper {
1079 connector: std::sync::Arc::new(MockConnector),
1080 };
1081
1082 let tags = vec!["Tag1".to_string(), "Tag2".to_string()];
1083 let results = wrapper
1084 .read_tag_values("Mock.Server.1", tags)
1085 .await
1086 .unwrap();
1087 assert_eq!(results.len(), 2);
1088 assert_eq!(results[0].tag_id, "Tag1");
1089 assert_eq!(results[0].value, "42");
1090 assert_eq!(results[1].tag_id, "Tag2");
1091 assert_eq!(results[1].value, "42");
1092 });
1093 }
1094
1095 #[test]
1096 fn test_mock_read_tags_partial_reject() {
1097 let rt = tokio::runtime::Builder::new_current_thread()
1098 .build()
1099 .unwrap();
1100 rt.block_on(async {
1101 let wrapper = OpcDaWrapper {
1102 connector: std::sync::Arc::new(MockConnector),
1103 };
1104
1105 let tags = vec![
1106 "Tag1".to_string(),
1107 "RejectMe".to_string(),
1108 "Tag3".to_string(),
1109 ];
1110 let results = wrapper
1111 .read_tag_values("Mock.Server.1", tags)
1112 .await
1113 .unwrap();
1114 assert_eq!(results.len(), 3);
1115
1116 assert_eq!(results[0].value, "42");
1117 assert_eq!(results[1].value, "Error");
1118 assert!(results[1].quality.starts_with("Bad"));
1119 assert_eq!(results[2].value, "42");
1120 });
1121 }
1122
1123 #[test]
1124 fn test_mock_read_tags_all_reject() {
1125 let rt = tokio::runtime::Builder::new_current_thread()
1126 .build()
1127 .unwrap();
1128 rt.block_on(async {
1129 let wrapper = OpcDaWrapper {
1130 connector: std::sync::Arc::new(MockConnector),
1131 };
1132
1133 let tags = vec!["RejectAll".to_string()];
1136 let res = wrapper.read_tag_values("Mock.Server.1", tags).await;
1137 assert!(res.is_err()); let tags2 = vec!["RejectMe".to_string(), "RejectMe".to_string()];
1140 let results2 = wrapper
1141 .read_tag_values("Mock.Server.1", tags2)
1142 .await
1143 .unwrap();
1144 assert_eq!(results2.len(), 2);
1145 assert_eq!(results2[0].value, "Error");
1146 assert_eq!(results2[1].value, "Error");
1147 });
1148 }
1149
1150 #[test]
1151 fn test_mock_write_tag_happy() {
1152 let rt = tokio::runtime::Builder::new_current_thread()
1153 .build()
1154 .unwrap();
1155 rt.block_on(async {
1156 let wrapper = OpcDaWrapper {
1157 connector: std::sync::Arc::new(MockConnector),
1158 };
1159
1160 use crate::provider::OpcValue;
1161 let res = wrapper
1162 .write_tag_value("Mock.Server.1", "Tag1", OpcValue::Int(42))
1163 .await
1164 .unwrap();
1165 assert!(res.success);
1166 assert!(res.error.is_none());
1167 });
1168 }
1169
1170 #[test]
1171 fn test_mock_write_tag_add_fail() {
1172 let rt = tokio::runtime::Builder::new_current_thread()
1173 .build()
1174 .unwrap();
1175 rt.block_on(async {
1176 let wrapper = OpcDaWrapper {
1177 connector: std::sync::Arc::new(MockConnector),
1178 };
1179
1180 use crate::provider::OpcValue;
1181 let res = wrapper
1182 .write_tag_value("Mock.Server.1", "RejectMe", OpcValue::Int(42))
1183 .await
1184 .unwrap();
1185 assert!(!res.success);
1186 assert!(res.error.is_some());
1187 });
1188 }
1189}