Skip to main content

opc_da_client/backend/
opc_da.rs

1use crate::backend::connector::{ComConnector, ConnectedGroup, ConnectedServer, ServerConnector};
2use crate::bindings::da::{
3    OPC_BRANCH, OPC_BROWSE_DOWN, OPC_BROWSE_UP, OPC_DS_DEVICE, OPC_FLAT, OPC_LEAF, OPC_NS_FLAT,
4    tagOPCITEMDEF,
5};
6use crate::helpers::{
7    filetime_to_string, format_hresult, opc_value_to_variant, quality_to_string, variant_to_string,
8};
9use crate::provider::{OpcProvider, OpcValue, TagValue, WriteResult};
10use anyhow::{Context, Result};
11use async_trait::async_trait;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicUsize, Ordering};
14
15/// Concrete [`OpcProvider`] implementation for Windows OPC DA.
16///
17/// Heavy-weight implementation that uses the `opc_da` crate for
18/// native COM interop.
19pub struct OpcDaWrapper<C: ServerConnector = ComConnector> {
20    connector: Arc<C>,
21}
22
23impl Default for OpcDaWrapper<ComConnector> {
24    fn default() -> Self {
25        Self::new(ComConnector)
26    }
27}
28
29impl<C: ServerConnector> OpcDaWrapper<C> {
30    /// Creates a new `OpcDaWrapper` with the given connector.
31    pub fn new(connector: C) -> Self {
32        Self {
33            connector: Arc::new(connector),
34        }
35    }
36}
37
38fn browse_recursive<S: ConnectedServer>(
39    server: &S,
40    tags: &mut Vec<String>,
41    max_tags: usize,
42    progress: &Arc<AtomicUsize>,
43    tags_sink: &Arc<std::sync::Mutex<Vec<String>>>,
44    depth: usize,
45) -> Result<()> {
46    const MAX_DEPTH: usize = 50;
47    if depth > MAX_DEPTH || tags.len() >= max_tags {
48        if depth > MAX_DEPTH {
49            tracing::warn!(depth, "Max browse depth reached, truncating");
50        }
51        return Ok(());
52    }
53
54    let branch_enum = server
55        .browse_opc_item_ids(OPC_BRANCH.0 as u32, Some(""), 0, 0)
56        .context("Failed to browse branches at current position")?;
57
58    let branches: Vec<String> = branch_enum
59        .filter_map(|r| match r {
60            Ok(name) => Some(name),
61            Err(e) => {
62                tracing::warn!(error = ?e, "Branch iteration error, skipping");
63                None
64            }
65        })
66        .collect();
67
68    for branch in branches {
69        if tags.len() >= max_tags {
70            break;
71        }
72        if let Err(e) = server.change_browse_position(OPC_BROWSE_DOWN.0 as u32, &branch) {
73            tracing::warn!(branch = %branch, error = ?e, "Failed to enter branch, skipping");
74            continue;
75        }
76
77        let recurse_result =
78            browse_recursive(server, tags, max_tags, progress, tags_sink, depth + 1);
79
80        if let Err(e) = server.change_browse_position(OPC_BROWSE_UP.0 as u32, "") {
81            tracing::error!(branch = %branch, error = ?e, "Failed to navigate back up!");
82            return Err(anyhow::anyhow!(
83                "Browse position corrupted: failed to navigate up from branch '{branch}'"
84            ));
85        }
86        recurse_result?;
87    }
88
89    if let Ok(leaf_enum) = server.browse_opc_item_ids(OPC_LEAF.0 as u32, Some(""), 0, 0) {
90        for leaf_res in leaf_enum {
91            if tags.len() >= max_tags {
92                break;
93            }
94            let browse_name = match leaf_res {
95                Ok(name) => name,
96                Err(e) => {
97                    tracing::warn!(error = ?e, "Leaf iteration error, skipping");
98                    continue;
99                }
100            };
101
102            let tag = match server.get_item_id(&browse_name) {
103                Ok(item_id) => {
104                    tracing::debug!(
105                        browse_name = %browse_name,
106                        item_id = %item_id,
107                        "get_item_id resolved"
108                    );
109                    item_id
110                }
111                Err(e) => {
112                    tracing::warn!(
113                        browse_name = %browse_name,
114                        error = ?e,
115                        "get_item_id failed, using browse name as fallback"
116                    );
117                    browse_name
118                }
119            };
120            tags.push(tag.clone());
121            if let Ok(mut sink) = tags_sink.lock() {
122                sink.push(tag);
123            }
124            progress.fetch_add(1, Ordering::Relaxed);
125        }
126    }
127
128    Ok(())
129}
130
131#[allow(clippy::too_many_lines)]
132#[async_trait]
133impl<C: ServerConnector + 'static> OpcProvider for OpcDaWrapper<C> {
134    async fn list_servers(&self, host: &str) -> Result<Vec<String>> {
135        let host_owned = host.to_string();
136        let connector = Arc::clone(&self.connector);
137        tokio::task::spawn_blocking(move || {
138            let span = tracing::info_span!("opc.list_servers", host = %host_owned);
139            let _enter = span.enter();
140
141            let _guard = crate::ComGuard::new()?;
142            let servers = connector.enumerate_servers()?;
143            tracing::info!(count = servers.len(), "list_servers completed");
144            Ok(servers)
145        })
146        .await?
147    }
148
149    async fn browse_tags(
150        &self,
151        server: &str,
152        max_tags: usize,
153        progress: Arc<AtomicUsize>,
154        tags_sink: Arc<std::sync::Mutex<Vec<String>>>,
155    ) -> Result<Vec<String>> {
156        let server_name = server.to_string();
157        let connector = Arc::clone(&self.connector);
158        tokio::task::spawn_blocking(move || {
159            let span = tracing::info_span!("opc.browse_tags", server = %server_name, max_tags);
160            let _enter = span.enter();
161
162            let _guard = crate::ComGuard::new()?;
163            let opc_server = connector.connect(&server_name)?;
164
165            let org = opc_server
166                .query_organization()
167                .context("Failed to query namespace organization")?;
168            let mut tags = Vec::new();
169
170            if org == OPC_NS_FLAT.0 as u32 {
171                let string_iter =
172                    opc_server.browse_opc_item_ids(OPC_LEAF.0 as u32, Some(""), 0, 0)?;
173                for tag_res in string_iter {
174                    if tags.len() >= max_tags {
175                        break;
176                    }
177                    let tag = tag_res.map_err(|e| anyhow::anyhow!("Tag iteration error: {e:?}"))?;
178                    tags.push(tag.clone());
179                    if let Ok(mut sink) = tags_sink.lock() {
180                        sink.push(tag);
181                    }
182                    progress.fetch_add(1, Ordering::Relaxed);
183                }
184            } else {
185                // Try OPC_FLAT — returns fully-qualified item IDs directly,
186                // eliminating recursive traversal and per-leaf get_item_id().
187                let use_flat = match opc_server.browse_opc_item_ids(OPC_FLAT.0 as u32, Some(""), 0, 0) {
188                    Ok(mut flat_enum) => match flat_enum.next() {
189                        Some(Ok(first_tag)) => {
190                            tracing::info!("OPC_FLAT browse supported — using fast flat enumeration");
191                            tags.push(first_tag.clone());
192                            if let Ok(mut sink) = tags_sink.lock() {
193                                sink.push(first_tag);
194                            }
195                            progress.fetch_add(1, Ordering::Relaxed);
196
197                            for tag_res in flat_enum {
198                                if tags.len() >= max_tags { break; }
199                                match tag_res {
200                                    Ok(tag) => {
201                                        tags.push(tag.clone());
202                                        if let Ok(mut sink) = tags_sink.lock() {
203                                            sink.push(tag);
204                                        }
205                                        progress.fetch_add(1, Ordering::Relaxed);
206                                    }
207                                    Err(e) => {
208                                        tracing::warn!(error = ?e, "OPC_FLAT tag iteration error, skipping");
209                                    }
210                                }
211                            }
212                            true
213                        }
214                        Some(Err(e)) => {
215                            tracing::debug!(error = ?e, "OPC_FLAT first item error, falling back to recursive");
216                            false
217                        }
218                        None => {
219                            tracing::debug!("OPC_FLAT returned no items, falling back to recursive");
220                            false
221                        }
222                    },
223                    Err(e) => {
224                        tracing::debug!(error = ?e, "OPC_FLAT not supported, falling back to recursive");
225                        false
226                    }
227                };
228
229                if !use_flat {
230                    browse_recursive(&opc_server, &mut tags, max_tags, &progress, &tags_sink, 0)?;
231                }
232            }
233            tracing::info!(count = tags.len(), "browse_tags completed");
234            Ok(tags)
235        })
236        .await?
237    }
238
239    async fn read_tag_values(&self, server: &str, tag_ids: Vec<String>) -> Result<Vec<TagValue>> {
240        let server_name = server.to_string();
241        let connector = Arc::clone(&self.connector);
242        tokio::task::spawn_blocking(move || {
243            let span = tracing::info_span!(
244                "opc.read_tag_values",
245                server = %server_name,
246                tag_count = tag_ids.len()
247            );
248            let _enter = span.enter();
249
250            let _guard = crate::ComGuard::new()?;
251            let opc_server = connector.connect(&server_name)?;
252
253            let mut revised_update_rate = 0u32;
254            let mut server_handle = 0u32;
255            let group = opc_server.add_group(
256                "opc-da-client-read",
257                true,
258                1000, // update_rate
259                0,    // client_handle
260                0,    // time_bias
261                0.0,  // percent_deadband
262                0,    // locale_id
263                &mut revised_update_rate,
264                &mut server_handle,
265            )?;
266
267            // SAFETY: item_id_wides must outlive item_defs because
268            // tagOPCITEMDEF.szItemID holds a raw pointer into each Vec.
269            let item_id_wides: Vec<Vec<u16>> = tag_ids
270                .iter()
271                .map(|tag_id| tag_id.encode_utf16().chain(std::iter::once(0)).collect())
272                .collect();
273
274            let item_defs: Vec<tagOPCITEMDEF> = item_id_wides
275                .iter()
276                .enumerate()
277                .map(|(idx, wide)| tagOPCITEMDEF {
278                    szAccessPath: windows::core::PWSTR::null(),
279                    szItemID: windows::core::PWSTR(wide.as_ptr().cast_mut()),
280                    bActive: windows::Win32::Foundation::TRUE,
281                    #[allow(clippy::cast_possible_truncation)]
282                    hClient: idx as u32,
283                    dwBlobSize: 0,
284                    pBlob: std::ptr::null_mut(),
285                    vtRequestedDataType: 0,
286                    wReserved: 0,
287                })
288                .collect();
289
290            let (results, errors) = group.add_items(&item_defs)?;
291
292            // Pre-fill ALL tags with an error placeholder.
293            // We will only overwrite the ones that are successfully added and read.
294            let mut tag_values: Vec<TagValue> = tag_ids
295                .iter()
296                .map(|tag_id| TagValue {
297                    tag_id: tag_id.clone(),
298                    value: "Error".to_string(),
299                    quality: "Bad — not added to group".to_string(),
300                    timestamp: String::new(),
301                })
302                .collect();
303
304            let mut server_handles = Vec::new();
305            let mut valid_indices = Vec::new();
306
307            for (idx, (item_result, error)) in results
308                .as_slice()
309                .iter()
310                .zip(errors.as_slice().iter())
311                .enumerate()
312            {
313                if error.is_ok() {
314                    server_handles.push(item_result.hServer);
315                    valid_indices.push(idx);
316                } else {
317                    let hint = format_hresult(*error);
318                    tracing::warn!(
319                        tag = %tag_ids[idx],
320                        error = %hint,
321                        "read_tag_values: add_items rejected tag"
322                    );
323                    tag_values[idx].quality = format!("Bad — {hint}");
324                }
325            }
326
327            if server_handles.is_empty() {
328                if let Err(e) = opc_server.remove_group(server_handle, true) {
329                    tracing::warn!(error = ?e, operation = "read_tag_values", "Failed to remove OPC group during cleanup");
330                }
331                return Ok(tag_values);
332            }
333
334            let (item_states, read_errors) = group.read(OPC_DS_DEVICE, &server_handles)?;
335            let item_states_slice = item_states.as_slice();
336            let read_errors_slice = read_errors.as_slice();
337
338            for (i, idx) in valid_indices.iter().enumerate() {
339                let state = &item_states_slice[i];
340                let read_error = &read_errors_slice[i];
341
342                let (value_str, quality_str) = if read_error.is_ok() {
343                    (
344                        variant_to_string(&state.vDataValue),
345                        quality_to_string(state.wQuality),
346                    )
347                } else {
348                    let full_msg = format_hresult(*read_error);
349                    tracing::warn!(
350                        tag = %tag_ids[*idx],
351                        error = ?read_error,
352                        hint = %full_msg,
353                        "read_tag_values: per-item read error"
354                    );
355                    ("Error".to_string(), format!("Bad — {full_msg}"))
356                };
357
358                tag_values[*idx] = TagValue {
359                    tag_id: tag_ids[*idx].clone(),
360                    value: value_str,
361                    quality: quality_str,
362                    timestamp: filetime_to_string(state.ftTimeStamp),
363                };
364            }
365
366            tracing::info!(count = tag_values.len(), "read_tag_values completed");
367            if let Err(e) = opc_server.remove_group(server_handle, true) {
368                tracing::warn!(error = ?e, operation = "read_tag_values", "Failed to remove OPC group during cleanup");
369            }
370            Ok(tag_values)
371        })
372        .await?
373    }
374
375    async fn write_tag_value(
376        &self,
377        server: &str,
378        tag_id: &str,
379        value: OpcValue,
380    ) -> Result<WriteResult> {
381        let server_name = server.to_string();
382        let tag = tag_id.to_string();
383        let connector = Arc::clone(&self.connector);
384        tokio::task::spawn_blocking(move || {
385            let span = tracing::info_span!(
386                "opc.write_tag_value",
387                server = %server_name,
388                tag = %tag
389            );
390            let _enter = span.enter();
391
392            let _guard = crate::ComGuard::new()?;
393            let opc_server = connector.connect(&server_name)?;
394
395            let mut revised_update_rate = 0u32;
396            let mut server_handle = 0u32;
397            let group = opc_server.add_group(
398                "opc-da-client-write",
399                true,
400                1000,
401                0,
402                0,
403                0.0,
404                0,
405                &mut revised_update_rate,
406                &mut server_handle,
407            )?;
408
409            // SAFETY: item_id_wide must outlive item_def because
410            // tagOPCITEMDEF.szItemID holds a raw pointer into the Vec.
411            let mut item_id_wide: Vec<u16> =
412                tag.encode_utf16().chain(std::iter::once(0)).collect();
413            let item_def = tagOPCITEMDEF {
414                szAccessPath: windows::core::PWSTR::null(),
415                szItemID: windows::core::PWSTR(item_id_wide.as_mut_ptr()),
416                bActive: windows::Win32::Foundation::TRUE,
417                hClient: 0,
418                dwBlobSize: 0,
419                pBlob: std::ptr::null_mut(),
420                vtRequestedDataType: 0,
421                wReserved: 0,
422            };
423
424            let (results, errors) = group.add_items(&[item_def])?;
425            let item_res = results
426                .as_slice()
427                .first()
428                .context("Server returned empty item results")?;
429            let item_err = errors
430                .as_slice()
431                .first()
432                .context("Server returned empty item errors")?;
433
434            if let Err(e) = item_err.ok() {
435                tracing::warn!(error = ?e, "write_tag_value: failed to add tag to group");
436                if let Err(e) = opc_server.remove_group(server_handle, true) {
437                    tracing::warn!(error = ?e, operation = "write_tag_value", "Failed to remove OPC group during cleanup");
438                }
439                return Ok(WriteResult {
440                    tag_id: tag,
441                    success: false,
442                    error: Some(format!("Failed to add tag to group: {:?}", item_err)),
443                });
444            }
445
446            let variant = opc_value_to_variant(&value);
447            let write_errors = group.write(&[item_res.hServer], &[variant])?;
448            let write_error = write_errors
449                .as_slice()
450                .first()
451                .context("Server returned empty write errors")?;
452
453            let write_result = if write_error.is_ok() {
454                tracing::info!("write_tag_value: write completed successfully");
455                WriteResult {
456                    tag_id: tag,
457                    success: true,
458                    error: None,
459                }
460            } else {
461                let hint = format_hresult(*write_error);
462                tracing::warn!(error = ?write_error, hint = %hint, "write_tag_value: server rejected write");
463                WriteResult {
464                    tag_id: tag,
465                    success: false,
466                    error: Some(hint),
467                }
468            };
469
470            if let Err(e) = opc_server.remove_group(server_handle, true) {
471                tracing::warn!(error = ?e, operation = "write_tag_value", "Failed to remove OPC group during cleanup");
472            }
473            Ok(write_result)
474        })
475        .await?
476    }
477}
478
479#[cfg(test)]
480#[allow(
481    clippy::undocumented_unsafe_blocks,
482    clippy::ptr_as_ptr,
483    clippy::cast_possible_truncation,
484    clippy::cast_possible_wrap,
485    clippy::ref_as_ptr,
486    clippy::inline_always
487)]
488mod tests {
489    use super::*;
490    use crate::backend::connector::{ConnectedGroup, ConnectedServer, ServerConnector};
491    use crate::bindings::da::{
492        OPC_NS_HIERARCHIAL, tagOPCITEMDEF, tagOPCITEMRESULT, tagOPCITEMSTATE,
493    };
494    use crate::opc_da::client::StringIterator;
495    use crate::opc_da::utils::RemoteArray;
496    use windows::Win32::System::Com::{IEnumString, IEnumString_Impl};
497    use windows::Win32::System::Variant::VARIANT;
498    use windows::core::{PWSTR, implement};
499
500    #[allow(clippy::ref_as_ptr, clippy::inline_always)]
501    #[implement(IEnumString)]
502    struct MockEnumString {
503        items: Vec<String>,
504        index: std::sync::atomic::AtomicUsize,
505    }
506
507    impl IEnumString_Impl for MockEnumString_Impl {
508        fn Next(
509            &self,
510            celt: u32,
511            rgelt: *mut PWSTR,
512            pceltfetched: *mut u32,
513        ) -> windows::core::HRESULT {
514            let mut fetched = 0;
515            let index = self.index.load(std::sync::atomic::Ordering::Relaxed);
516            let rgelt = unsafe { std::slice::from_raw_parts_mut(rgelt, celt as usize) };
517
518            for (i, elem) in rgelt.iter_mut().enumerate().take(celt as usize) {
519                if index + i < self.items.len() {
520                    let s = &self.items[index + i];
521                    let w: Vec<u16> = s.encode_utf16().chain(std::iter::once(0)).collect();
522                    let ptr = unsafe { windows::Win32::System::Com::CoTaskMemAlloc(w.len() * 2) };
523                    unsafe { std::ptr::copy_nonoverlapping(w.as_ptr(), ptr as *mut u16, w.len()) };
524                    *elem = PWSTR(ptr as *mut u16);
525                    fetched += 1;
526                } else {
527                    break;
528                }
529            }
530
531            self.index
532                .store(index + fetched, std::sync::atomic::Ordering::Relaxed);
533
534            if !pceltfetched.is_null() {
535                unsafe { *pceltfetched = fetched as u32 };
536            }
537
538            if fetched == celt as usize {
539                windows::Win32::Foundation::S_OK
540            } else {
541                windows::Win32::Foundation::S_FALSE
542            }
543        }
544        fn Skip(&self, _celt: u32) -> windows::core::HRESULT {
545            windows::Win32::Foundation::E_NOTIMPL
546        }
547        fn Reset(&self) -> windows::core::Result<()> {
548            self.index.store(0, std::sync::atomic::Ordering::Relaxed);
549            Ok(())
550        }
551        fn Clone(&self) -> windows::core::Result<IEnumString> {
552            Err(windows::core::Error::from_hresult(
553                windows::Win32::Foundation::E_NOTIMPL,
554            ))
555        }
556    }
557
558    struct MockGroup;
559
560    impl ConnectedGroup for MockGroup {
561        fn add_items(
562            &self,
563            items: &[tagOPCITEMDEF],
564        ) -> anyhow::Result<(
565            RemoteArray<tagOPCITEMRESULT>,
566            RemoteArray<windows::core::HRESULT>,
567        )> {
568            let mut results = Vec::new();
569            let mut errors = Vec::new();
570
571            for (i, item) in items.iter().enumerate() {
572                // Read the wide string
573                let mut name_w = Vec::new();
574                let mut ptr = item.szItemID.0;
575                unsafe {
576                    while !ptr.is_null() && *ptr != 0 {
577                        name_w.push(*ptr);
578                        ptr = ptr.add(1);
579                    }
580                }
581                let name = String::from_utf16_lossy(&name_w);
582
583                let res = tagOPCITEMRESULT {
584                    hServer: (i + 1) as u32,
585                    ..Default::default()
586                };
587
588                if name == "RejectMe" {
589                    errors.push(windows::core::HRESULT(0xC004_0007_u32 as i32)); // OPC_E_UNKNOWNITEMID
590                } else if name == "RejectAll" {
591                    return Err(anyhow::anyhow!("Total failure"));
592                } else {
593                    errors.push(windows::core::HRESULT(0)); // S_OK
594                }
595                results.push(res);
596            }
597
598            unsafe {
599                let p_res = windows::Win32::System::Com::CoTaskMemAlloc(
600                    results.len() * std::mem::size_of::<tagOPCITEMRESULT>(),
601                ) as *mut tagOPCITEMRESULT;
602                std::ptr::copy_nonoverlapping(results.as_ptr(), p_res, results.len());
603                let p_err = windows::Win32::System::Com::CoTaskMemAlloc(
604                    errors.len() * std::mem::size_of::<windows::core::HRESULT>(),
605                ) as *mut windows::core::HRESULT;
606                std::ptr::copy_nonoverlapping(errors.as_ptr(), p_err, errors.len());
607
608                Ok((
609                    RemoteArray::from_mut_ptr(p_res, results.len() as u32),
610                    RemoteArray::from_mut_ptr(p_err, errors.len() as u32),
611                ))
612            }
613        }
614
615        fn read(
616            &self,
617            _source: crate::bindings::da::tagOPCDATASOURCE,
618            server_handles: &[u32],
619        ) -> anyhow::Result<(
620            RemoteArray<tagOPCITEMSTATE>,
621            RemoteArray<windows::core::HRESULT>,
622        )> {
623            let mut states = Vec::new();
624            let mut errors = Vec::new();
625
626            for &handle in server_handles {
627                let mut state = tagOPCITEMSTATE {
628                    hClient: handle, // mock echoing the handle as client handle for verification
629                    wQuality: crate::bindings::da::OPC_QUALITY_GOOD,
630                    ..Default::default()
631                };
632                // mock value VT_I4 = 42
633                use windows::Win32::System::Variant::{
634                    VARENUM, VARIANT, VARIANT_0, VARIANT_0_0, VARIANT_0_0_0,
635                };
636                let variant = VARIANT_0_0 {
637                    vt: VARENUM(3), // VT_I4
638                    Anonymous: VARIANT_0_0_0 { lVal: 42 },
639                    ..Default::default()
640                };
641                state.vDataValue = VARIANT {
642                    Anonymous: VARIANT_0 {
643                        Anonymous: std::mem::ManuallyDrop::new(variant),
644                    },
645                };
646
647                states.push(state);
648                errors.push(windows::core::HRESULT(0)); // S_OK
649            }
650
651            unsafe {
652                let p_states = windows::Win32::System::Com::CoTaskMemAlloc(
653                    states.len() * std::mem::size_of::<tagOPCITEMSTATE>(),
654                ) as *mut tagOPCITEMSTATE;
655                std::ptr::copy_nonoverlapping(states.as_ptr(), p_states, states.len());
656                let p_err = windows::Win32::System::Com::CoTaskMemAlloc(
657                    errors.len() * std::mem::size_of::<windows::core::HRESULT>(),
658                ) as *mut windows::core::HRESULT;
659                std::ptr::copy_nonoverlapping(errors.as_ptr(), p_err, errors.len());
660
661                Ok((
662                    RemoteArray::from_mut_ptr(p_states, states.len() as u32),
663                    RemoteArray::from_mut_ptr(p_err, errors.len() as u32),
664                ))
665            }
666        }
667
668        fn write(
669            &self,
670            server_handles: &[u32],
671            _values: &[VARIANT],
672        ) -> anyhow::Result<RemoteArray<windows::core::HRESULT>> {
673            let mut errors = Vec::new();
674            for _ in server_handles {
675                errors.push(windows::core::HRESULT(0)); // S_OK
676            }
677
678            unsafe {
679                let p_err = windows::Win32::System::Com::CoTaskMemAlloc(
680                    errors.len() * std::mem::size_of::<windows::core::HRESULT>(),
681                ) as *mut windows::core::HRESULT;
682                std::ptr::copy_nonoverlapping(errors.as_ptr(), p_err, errors.len());
683                Ok(RemoteArray::from_mut_ptr(p_err, errors.len() as u32))
684            }
685        }
686    }
687
688    struct MockServer;
689
690    impl ConnectedServer for MockServer {
691        type Group = MockGroup;
692
693        fn query_organization(&self) -> anyhow::Result<u32> {
694            Ok(crate::bindings::da::OPC_NS_FLAT.0 as u32)
695        }
696
697        fn browse_opc_item_ids(
698            &self,
699            _browse_type: u32,
700            _filter: Option<&str>,
701            _data_type: u16,
702            _access_rights: u32,
703        ) -> anyhow::Result<StringIterator> {
704            let mock_enum: IEnumString = MockEnumString {
705                items: vec!["MockTag.1".to_string(), "MockTag.2".to_string()],
706                index: std::sync::atomic::AtomicUsize::new(0),
707            }
708            .into();
709            Ok(StringIterator::new(mock_enum))
710        }
711
712        fn change_browse_position(&self, _direction: u32, _name: &str) -> anyhow::Result<()> {
713            Ok(())
714        }
715
716        fn get_item_id(&self, item_name: &str) -> anyhow::Result<String> {
717            Ok(item_name.to_string())
718        }
719
720        fn add_group(
721            &self,
722            _name: &str,
723            _active: bool,
724            _update_rate: u32,
725            _client_handle: u32,
726            _time_bias: i32,
727            _percent_deadband: f32,
728            _locale_id: u32,
729            revised_update_rate: &mut u32,
730            server_handle: &mut u32,
731        ) -> anyhow::Result<Self::Group> {
732            *revised_update_rate = 1000;
733            *server_handle = 1;
734            Ok(MockGroup)
735        }
736
737        fn remove_group(&self, _server_group: u32, _force: bool) -> anyhow::Result<()> {
738            Ok(())
739        }
740    }
741
742    #[derive(Clone)]
743    struct MockConnector;
744
745    impl ServerConnector for MockConnector {
746        type Server = MockServer;
747
748        fn enumerate_servers(&self) -> anyhow::Result<Vec<String>> {
749            Ok(vec![
750                "Mock.Server.1".to_string(),
751                "Mock.Server.2".to_string(),
752            ])
753        }
754
755        fn connect(&self, _server_name: &str) -> anyhow::Result<Self::Server> {
756            Ok(MockServer)
757        }
758    }
759
760    #[derive(Clone, Debug, PartialEq)]
761    enum OpcFlatBehavior {
762        Success(Vec<String>),
763        ReturnsError,
764        ReturnsEmpty,
765    }
766
767    struct MockHierarchicalServer {
768        opc_flat_behavior: OpcFlatBehavior,
769        position: std::cell::RefCell<Vec<String>>,
770    }
771
772    impl ConnectedServer for MockHierarchicalServer {
773        type Group = MockGroup;
774
775        fn query_organization(&self) -> anyhow::Result<u32> {
776            Ok(OPC_NS_HIERARCHIAL.0 as u32)
777        }
778
779        fn browse_opc_item_ids(
780            &self,
781            browse_type: u32,
782            _filter: Option<&str>,
783            _data_type: u16,
784            _access_rights: u32,
785        ) -> anyhow::Result<StringIterator> {
786            let pos = self.position.borrow();
787            let mut results = Vec::new();
788
789            if browse_type == OPC_FLAT.0 as u32 {
790                match &self.opc_flat_behavior {
791                    OpcFlatBehavior::Success(items) => {
792                        results = items.clone();
793                    }
794                    OpcFlatBehavior::ReturnsError => {
795                        return Err(anyhow::anyhow!("OPC_FLAT not supported mock error"));
796                    }
797                    OpcFlatBehavior::ReturnsEmpty => {
798                        // Return empty iterator
799                    }
800                }
801            } else if browse_type == OPC_BRANCH.0 as u32 {
802                if pos.is_empty() {
803                    results.push("Branch1".to_string());
804                    results.push("Branch2".to_string());
805                }
806            } else if browse_type == OPC_LEAF.0 as u32 {
807                if pos.len() == 1 && pos[0] == "Branch1" {
808                    results.push("Leaf1".to_string());
809                    results.push("Leaf2".to_string());
810                } else if pos.len() == 1 && pos[0] == "Branch2" {
811                    results.push("Leaf3".to_string());
812                }
813            }
814
815            let mock_enum: IEnumString = MockEnumString {
816                items: results,
817                index: std::sync::atomic::AtomicUsize::new(0),
818            }
819            .into();
820            Ok(StringIterator::new(mock_enum))
821        }
822
823        fn change_browse_position(&self, direction: u32, name: &str) -> anyhow::Result<()> {
824            let mut pos = self.position.borrow_mut();
825            if direction == OPC_BROWSE_DOWN.0 as u32 {
826                pos.push(name.to_string());
827            } else if direction == OPC_BROWSE_UP.0 as u32 {
828                pos.pop();
829            }
830            Ok(())
831        }
832
833        fn get_item_id(&self, item_name: &str) -> anyhow::Result<String> {
834            let pos = self.position.borrow();
835            if pos.is_empty() {
836                Ok(item_name.to_string())
837            } else {
838                Ok(format!("{}.{}", pos.join("."), item_name))
839            }
840        }
841
842        fn add_group(
843            &self,
844            _name: &str,
845            _active: bool,
846            _update_rate: u32,
847            _client_handle: u32,
848            _time_bias: i32,
849            _percent_deadband: f32,
850            _locale_id: u32,
851            _revised_update_rate: &mut u32,
852            _server_handle: &mut u32,
853        ) -> anyhow::Result<Self::Group> {
854            Ok(MockGroup)
855        }
856
857        fn remove_group(&self, _server_group: u32, _force: bool) -> anyhow::Result<()> {
858            Ok(())
859        }
860    }
861
862    struct MockHierarchicalConnector {
863        opc_flat_behavior: OpcFlatBehavior,
864    }
865
866    impl ServerConnector for MockHierarchicalConnector {
867        type Server = MockHierarchicalServer;
868
869        fn enumerate_servers(&self) -> anyhow::Result<Vec<String>> {
870            Ok(vec!["Mock.Hierarchical.1".to_string()])
871        }
872
873        fn connect(&self, _server_name: &str) -> anyhow::Result<Self::Server> {
874            Ok(MockHierarchicalServer {
875                opc_flat_behavior: self.opc_flat_behavior.clone(),
876                position: std::cell::RefCell::new(Vec::new()),
877            })
878        }
879    }
880
881    #[test]
882    fn test_browse_tags_flat_server() {
883        let rt = tokio::runtime::Builder::new_current_thread()
884            .build()
885            .unwrap();
886        rt.block_on(async {
887            let wrapper = OpcDaWrapper {
888                connector: std::sync::Arc::new(MockConnector),
889            };
890
891            let progress = Arc::new(AtomicUsize::new(0));
892            let sink = Arc::new(std::sync::Mutex::new(Vec::new()));
893
894            let tags = wrapper
895                .browse_tags("Mock.Server", 100, progress.clone(), sink.clone())
896                .await
897                .unwrap();
898
899            assert_eq!(tags.len(), 2);
900            assert_eq!(tags[0], "MockTag.1");
901            assert_eq!(tags[1], "MockTag.2");
902
903            assert_eq!(progress.load(Ordering::Relaxed), 2);
904            let sink_tags = sink.lock().unwrap().clone();
905            assert_eq!(sink_tags, tags);
906        });
907    }
908
909    #[test]
910    fn test_browse_tags_hierarchical_recursive() {
911        let rt = tokio::runtime::Builder::new_current_thread()
912            .build()
913            .unwrap();
914        rt.block_on(async {
915            let connector = MockHierarchicalConnector {
916                opc_flat_behavior: OpcFlatBehavior::ReturnsError,
917            };
918            let wrapper = OpcDaWrapper {
919                connector: std::sync::Arc::new(connector),
920            };
921
922            let progress = Arc::new(AtomicUsize::new(0));
923            let sink = Arc::new(std::sync::Mutex::new(Vec::new()));
924
925            let tags = wrapper
926                .browse_tags("Mock.Hierarchical", 100, progress.clone(), sink.clone())
927                .await
928                .unwrap();
929
930            assert_eq!(tags.len(), 3);
931            assert_eq!(tags[0], "Branch1.Leaf1");
932            assert_eq!(tags[1], "Branch1.Leaf2");
933            assert_eq!(tags[2], "Branch2.Leaf3");
934
935            assert_eq!(progress.load(Ordering::Relaxed), 3);
936            let sink_tags = sink.lock().unwrap().clone();
937            assert_eq!(sink_tags, tags);
938        });
939    }
940
941    #[test]
942    fn test_browse_tags_opc_flat_success() {
943        let rt = tokio::runtime::Builder::new_current_thread()
944            .build()
945            .unwrap();
946        rt.block_on(async {
947            let connector = MockHierarchicalConnector {
948                opc_flat_behavior: OpcFlatBehavior::Success(vec![
949                    "FQ.Branch1.Leaf1".to_string(),
950                    "FQ.Branch1.Leaf2".to_string(),
951                    "FQ.Branch2.Leaf3".to_string(),
952                ]),
953            };
954            let wrapper = OpcDaWrapper {
955                connector: std::sync::Arc::new(connector),
956            };
957
958            let progress = Arc::new(AtomicUsize::new(0));
959            let sink = Arc::new(std::sync::Mutex::new(Vec::new()));
960
961            let tags = wrapper
962                .browse_tags("Mock.Hierarchical", 100, progress.clone(), sink.clone())
963                .await
964                .unwrap();
965
966            assert_eq!(tags.len(), 3);
967            assert_eq!(tags[0], "FQ.Branch1.Leaf1");
968            assert_eq!(tags[1], "FQ.Branch1.Leaf2");
969            assert_eq!(tags[2], "FQ.Branch2.Leaf3");
970
971            assert_eq!(progress.load(Ordering::Relaxed), 3);
972            let sink_tags = sink.lock().unwrap().clone();
973            assert_eq!(sink_tags, tags);
974        });
975    }
976
977    #[test]
978    fn test_browse_tags_opc_flat_error_fallback() {
979        let rt = tokio::runtime::Builder::new_current_thread()
980            .build()
981            .unwrap();
982        rt.block_on(async {
983            let connector = MockHierarchicalConnector {
984                opc_flat_behavior: OpcFlatBehavior::ReturnsError,
985            };
986            let wrapper = OpcDaWrapper {
987                connector: std::sync::Arc::new(connector),
988            };
989
990            let progress = Arc::new(AtomicUsize::new(0));
991            let sink = Arc::new(std::sync::Mutex::new(Vec::new()));
992
993            let tags = wrapper
994                .browse_tags("Mock.Hierarchical", 100, progress.clone(), sink.clone())
995                .await
996                .unwrap();
997
998            assert_eq!(tags.len(), 3);
999            assert_eq!(tags[0], "Branch1.Leaf1");
1000            assert_eq!(tags[1], "Branch1.Leaf2");
1001            assert_eq!(tags[2], "Branch2.Leaf3");
1002        });
1003    }
1004
1005    #[test]
1006    fn test_browse_tags_opc_flat_empty_fallback() {
1007        let rt = tokio::runtime::Builder::new_current_thread()
1008            .build()
1009            .unwrap();
1010        rt.block_on(async {
1011            let connector = MockHierarchicalConnector {
1012                opc_flat_behavior: OpcFlatBehavior::ReturnsEmpty,
1013            };
1014            let wrapper = OpcDaWrapper {
1015                connector: std::sync::Arc::new(connector),
1016            };
1017
1018            let progress = Arc::new(AtomicUsize::new(0));
1019            let sink = Arc::new(std::sync::Mutex::new(Vec::new()));
1020
1021            let tags = wrapper
1022                .browse_tags("Mock.Hierarchical", 100, progress.clone(), sink.clone())
1023                .await
1024                .unwrap();
1025
1026            assert_eq!(tags.len(), 3);
1027            assert_eq!(tags[0], "Branch1.Leaf1");
1028            assert_eq!(tags[1], "Branch1.Leaf2");
1029            assert_eq!(tags[2], "Branch2.Leaf3");
1030        });
1031    }
1032
1033    #[test]
1034    fn test_browse_tags_max_tags_limit() {
1035        let rt = tokio::runtime::Builder::new_current_thread()
1036            .build()
1037            .unwrap();
1038        rt.block_on(async {
1039            let wrapper = OpcDaWrapper {
1040                connector: std::sync::Arc::new(MockConnector),
1041            };
1042
1043            let progress = Arc::new(AtomicUsize::new(0));
1044            let sink = Arc::new(std::sync::Mutex::new(Vec::new()));
1045
1046            // Limit to 2 tags
1047            let tags = wrapper
1048                .browse_tags("Mock.Server", 2, progress.clone(), sink.clone())
1049                .await
1050                .unwrap();
1051
1052            assert_eq!(tags.len(), 2);
1053            assert_eq!(progress.load(Ordering::Relaxed), 2);
1054        });
1055    }
1056
1057    #[test]
1058    fn test_mock_list_servers() {
1059        let rt = tokio::runtime::Builder::new_current_thread()
1060            .build()
1061            .unwrap();
1062        rt.block_on(async {
1063            let wrapper = OpcDaWrapper {
1064                connector: std::sync::Arc::new(MockConnector),
1065            };
1066
1067            let servers = wrapper.list_servers("localhost").await.unwrap();
1068            assert_eq!(servers, vec!["Mock.Server.1", "Mock.Server.2"]);
1069        });
1070    }
1071
1072    #[test]
1073    fn test_mock_read_tags_happy() {
1074        let rt = tokio::runtime::Builder::new_current_thread()
1075            .build()
1076            .unwrap();
1077        rt.block_on(async {
1078            let wrapper = OpcDaWrapper {
1079                connector: std::sync::Arc::new(MockConnector),
1080            };
1081
1082            let tags = vec!["Tag1".to_string(), "Tag2".to_string()];
1083            let results = wrapper
1084                .read_tag_values("Mock.Server.1", tags)
1085                .await
1086                .unwrap();
1087            assert_eq!(results.len(), 2);
1088            assert_eq!(results[0].tag_id, "Tag1");
1089            assert_eq!(results[0].value, "42");
1090            assert_eq!(results[1].tag_id, "Tag2");
1091            assert_eq!(results[1].value, "42");
1092        });
1093    }
1094
1095    #[test]
1096    fn test_mock_read_tags_partial_reject() {
1097        let rt = tokio::runtime::Builder::new_current_thread()
1098            .build()
1099            .unwrap();
1100        rt.block_on(async {
1101            let wrapper = OpcDaWrapper {
1102                connector: std::sync::Arc::new(MockConnector),
1103            };
1104
1105            let tags = vec![
1106                "Tag1".to_string(),
1107                "RejectMe".to_string(),
1108                "Tag3".to_string(),
1109            ];
1110            let results = wrapper
1111                .read_tag_values("Mock.Server.1", tags)
1112                .await
1113                .unwrap();
1114            assert_eq!(results.len(), 3);
1115
1116            assert_eq!(results[0].value, "42");
1117            assert_eq!(results[1].value, "Error");
1118            assert!(results[1].quality.starts_with("Bad"));
1119            assert_eq!(results[2].value, "42");
1120        });
1121    }
1122
1123    #[test]
1124    fn test_mock_read_tags_all_reject() {
1125        let rt = tokio::runtime::Builder::new_current_thread()
1126            .build()
1127            .unwrap();
1128        rt.block_on(async {
1129            let wrapper = OpcDaWrapper {
1130                connector: std::sync::Arc::new(MockConnector),
1131            };
1132
1133            // Before our fixes, this returns Err("No valid items to read").
1134            // After our fixes, it should return Ok(Vec) where all are Errors.
1135            let tags = vec!["RejectAll".to_string()];
1136            let res = wrapper.read_tag_values("Mock.Server.1", tags).await;
1137            assert!(res.is_err()); // The mock `if name == "RejectAll"` returns `return Err(anyhow::anyhow!("Total failure"));`
1138
1139            let tags2 = vec!["RejectMe".to_string(), "RejectMe".to_string()];
1140            let results2 = wrapper
1141                .read_tag_values("Mock.Server.1", tags2)
1142                .await
1143                .unwrap();
1144            assert_eq!(results2.len(), 2);
1145            assert_eq!(results2[0].value, "Error");
1146            assert_eq!(results2[1].value, "Error");
1147        });
1148    }
1149
1150    #[test]
1151    fn test_mock_write_tag_happy() {
1152        let rt = tokio::runtime::Builder::new_current_thread()
1153            .build()
1154            .unwrap();
1155        rt.block_on(async {
1156            let wrapper = OpcDaWrapper {
1157                connector: std::sync::Arc::new(MockConnector),
1158            };
1159
1160            use crate::provider::OpcValue;
1161            let res = wrapper
1162                .write_tag_value("Mock.Server.1", "Tag1", OpcValue::Int(42))
1163                .await
1164                .unwrap();
1165            assert!(res.success);
1166            assert!(res.error.is_none());
1167        });
1168    }
1169
1170    #[test]
1171    fn test_mock_write_tag_add_fail() {
1172        let rt = tokio::runtime::Builder::new_current_thread()
1173            .build()
1174            .unwrap();
1175        rt.block_on(async {
1176            let wrapper = OpcDaWrapper {
1177                connector: std::sync::Arc::new(MockConnector),
1178            };
1179
1180            use crate::provider::OpcValue;
1181            let res = wrapper
1182                .write_tag_value("Mock.Server.1", "RejectMe", OpcValue::Int(42))
1183                .await
1184                .unwrap();
1185            assert!(!res.success);
1186            assert!(res.error.is_some());
1187        });
1188    }
1189}