Skip to main content

zerodds_opcua_server/
client.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
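//! The OPC-UA [`Client`]: drives the Hello → OpenSecureChannel → CreateSession
//! → ActivateSession handshake and calls the Discovery, Read / Write, Browse,
//! Call and Subscription services over a [`Transport`]. SecurityMode `None` is
//! the default; a secured OpenSecureChannel is available with feature `crypto`.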
6//!
7//! [`LoopbackTransport`] talks to an in-process [`crate::server::Server`];
8//! [`TcpTransport`] (feature `std`) talks to a real `opc.tcp` peer.
9
10use alloc::string::String;
11use alloc::vec::Vec;
12
13use zerodds_opcua_gateway::data_value::{DataValue, Variant};
14use zerodds_opcua_gateway::node_id::NodeId;
15use zerodds_opcua_pubsub::uadp::datatypes::{
16    ApplicationDescription, ApplicationType, EndpointDescription, MessageSecurityMode,
17};
18use zerodds_opcua_pubsub::{DecodeError, EncodeError, UaDecode, UaReader};
19use zerodds_opcua_uacp::connection::{HelloMessage, MessageHeader, MessageType};
20use zerodds_opcua_uacp::securechannel::{
21    OpenSecureChannelRequest, OpenSecureChannelResponse, RequestHeader, SecureChannel,
22    SecurityTokenRequestType, null_extension_object, parse_chunk,
23};
24
25use crate::services::{
26    ATTRIBUTE_VALUE, ActivateSessionRequest, BrowseDescription, BrowseRequest, CallMethodRequest,
27    CallMethodResult, CallRequest, CreateMonitoredItemsRequest, CreateSessionRequest,
28    CreateSubscriptionRequest, DataChangeNotification, DeleteSubscriptionsRequest,
29    FindServersRequest, GetEndpointsRequest, MonitoredItemCreateRequest, MonitoredItemCreateResult,
30    MonitoredItemNotification, MonitoringParameters, PublishRequest, ReadRequest, ReadValueId,
31    ReferenceDescription, ServiceRequest, ServiceResponse, SignatureData, ViewDescription,
32    WriteRequest, WriteValue, null_filter,
33};
34use zerodds_opcua_uacp::securechannel::node_ids;
35
36#[cfg(feature = "crypto")]
37use alloc::boxed::Box;
38#[cfg(feature = "crypto")]
39use zerodds_opcua_uacp::crypto::{
40    AsymmetricContext, CryptoRngCore, RsaPrivateKey, RsaPublicKey, SecuredMode, SecurityPolicy,
41    build_asymmetric_chunk, derive_keys, open_asymmetric_chunk,
42};
43#[cfg(feature = "crypto")]
44use zerodds_opcua_uacp::securechannel::SecuritySession;
45
/// Everything the client needs to open a secured SecureChannel (feature
/// `crypto`): its own RSA key pair material and certificate, the certificate
/// and public key of the server it trusts, and a random source supplied by the
/// caller. Once installed via [`Client::set_security`], [`Client::connect`]
/// performs the secured OpenSecureChannel exchange instead of the plain one.
#[cfg(feature = "crypto")]
pub struct ClientSecurity {
    /// The SecurityPolicy to use for the channel.
    pub policy: SecurityPolicy,
    /// Message protection level: Sign or SignAndEncrypt.
    pub mode: SecuredMode,
    /// RSA private key belonging to the client.
    pub private_key: RsaPrivateKey,
    /// The client's application certificate, DER-encoded.
    pub certificate: Vec<u8>,
    /// The trusted server's certificate, DER-encoded.
    pub server_certificate: Vec<u8>,
    /// The trusted server's RSA public key; the server's OPN response is
    /// verified against it.
    pub server_public_key: RsaPublicKey,
    /// A cryptographically secure random generator (the OS RNG under `std`).
    /// It supplies the client nonce and is handed to the asymmetric chunk
    /// builder.
    pub rng: Box<dyn CryptoRngCore + Send>,
}
67
68/// An error from the client / a transport.
69#[derive(Debug, Clone, PartialEq)]
70pub enum ClientError {
71    /// A message could not be decoded.
72    Decode(DecodeError),
73    /// A request could not be encoded.
74    Encode(EncodeError),
75    /// An unexpected message / wrong response type.
76    Protocol(&'static str),
77    /// A transport I/O failure.
78    Io(String),
79}
80
81impl From<DecodeError> for ClientError {
82    fn from(e: DecodeError) -> Self {
83        Self::Decode(e)
84    }
85}
86
87impl From<EncodeError> for ClientError {
88    fn from(e: EncodeError) -> Self {
89        Self::Encode(e)
90    }
91}
92
93impl core::fmt::Display for ClientError {
94    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
95        match self {
96            Self::Decode(e) => write!(f, "decode error: {e}"),
97            Self::Encode(e) => write!(f, "encode error: {e}"),
98            Self::Protocol(m) => write!(f, "protocol error: {m}"),
99            Self::Io(m) => write!(f, "transport I/O error: {m}"),
100        }
101    }
102}
103
// With `std` available, `ClientError` plugs into the standard error machinery;
// all trait methods keep their defaults.
#[cfg(feature = "std")]
impl std::error::Error for ClientError {}
106
107/// A synchronous request/response byte transport for OPC-UA TCP.
108pub trait Transport {
109    /// Sends one UACP message and returns the single response message.
110    ///
111    /// # Errors
112    /// [`ClientError`] on an I/O or protocol failure.
113    fn request(&mut self, message: &[u8]) -> Result<Vec<u8>, ClientError>;
114
115    /// Sends one UACP message without awaiting a response (e.g.
116    /// CloseSecureChannel).
117    ///
118    /// # Errors
119    /// [`ClientError`] on an I/O failure.
120    fn send(&mut self, message: &[u8]) -> Result<(), ClientError>;
121}
122
123/// An in-process [`Transport`] that drives a [`crate::server::Server`]
124/// directly (no sockets) — used for tests and embedding.
125pub struct LoopbackTransport<'a> {
126    server: &'a mut crate::server::Server,
127}
128
129impl<'a> LoopbackTransport<'a> {
130    /// Wraps a server.
131    pub fn new(server: &'a mut crate::server::Server) -> Self {
132        Self { server }
133    }
134}
135
136impl Transport for LoopbackTransport<'_> {
137    fn request(&mut self, message: &[u8]) -> Result<Vec<u8>, ClientError> {
138        self.server
139            .process(message)
140            .map_err(|_| ClientError::Protocol("server rejected the request"))?
141            .ok_or(ClientError::Protocol("server produced no response"))
142    }
143
144    fn send(&mut self, message: &[u8]) -> Result<(), ClientError> {
145        self.server
146            .process(message)
147            .map(|_| ())
148            .map_err(|_| ClientError::Protocol("server rejected the message"))
149    }
150}
151
152/// An OPC-UA client. SecurityMode `None` by default; call
153/// [`set_security`](Self::set_security) (feature `crypto`) for a secured
154/// SecurityPolicy.
155pub struct Client {
156    channel: SecureChannel,
157    auth_token: NodeId,
158    session_id: NodeId,
159    request_handle: u32,
160    next_request_id: u32,
161    channel_open: bool,
162    connected: bool,
163    #[cfg(feature = "crypto")]
164    security: Option<ClientSecurity>,
165}
166
167impl core::fmt::Debug for Client {
168    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
169        f.debug_struct("Client")
170            .field("session_id", &self.session_id)
171            .field("connected", &self.connected)
172            .finish_non_exhaustive()
173    }
174}
175
176impl Default for Client {
177    fn default() -> Self {
178        Self::new()
179    }
180}
181
182impl Client {
183    /// Creates a fresh, unconnected client.
184    #[must_use]
185    pub fn new() -> Self {
186        Self {
187            channel: SecureChannel::new(0, 0),
188            auth_token: NodeId::numeric(0, 0),
189            session_id: NodeId::numeric(0, 0),
190            request_handle: 0,
191            next_request_id: 0,
192            channel_open: false,
193            connected: false,
194            #[cfg(feature = "crypto")]
195            security: None,
196        }
197    }
198
199    /// Enables a secured SecurityPolicy for the next [`connect`](Self::connect).
200    #[cfg(feature = "crypto")]
201    pub fn set_security(&mut self, security: ClientSecurity) {
202        self.security = Some(security);
203    }
204
205    /// The activated session id (null until connected).
206    #[must_use]
207    pub fn session_id(&self) -> &NodeId {
208        &self.session_id
209    }
210
211    fn next_handle(&mut self) -> u32 {
212        self.request_handle = self.request_handle.wrapping_add(1);
213        self.request_handle
214    }
215
216    fn next_request_id(&mut self) -> u32 {
217        self.next_request_id = self.next_request_id.wrapping_add(1);
218        self.next_request_id
219    }
220
221    fn header(&mut self) -> RequestHeader {
222        let token = self.auth_token.clone();
223        let handle = self.next_handle();
224        RequestHeader::new(token, handle)
225    }
226
227    /// Opens just the transport + SecureChannel (Hello + OpenSecureChannel),
228    /// without a session — enough for the session-less Discovery services
229    /// ([`get_endpoints`](Self::get_endpoints) / [`find_servers`](Self::find_servers)).
230    ///
231    /// # Errors
232    /// [`ClientError`] on a Hello/OpenSecureChannel failure.
233    pub fn open_channel<T: Transport>(
234        &mut self,
235        transport: &mut T,
236        endpoint_url: &str,
237    ) -> Result<(), ClientError> {
238        // 1. Hello / Acknowledge.
239        let hello = HelloMessage {
240            protocol_version: 0,
241            receive_buffer_size: 65_535,
242            send_buffer_size: 65_535,
243            max_message_size: 0,
244            max_chunk_count: 0,
245            endpoint_url: String::from(endpoint_url),
246        };
247        let ack_bytes = transport.request(&hello.encode()?)?;
248        let mut ar = UaReader::new(&ack_bytes);
249        let ah = MessageHeader::decode(&mut ar)?;
250        if ah.message_type != MessageType::Acknowledge {
251            return Err(ClientError::Protocol("expected Acknowledge"));
252        }
253
254        // 2. OpenSecureChannel — secured if a SecurityPolicy is configured.
255        let secured = {
256            #[cfg(feature = "crypto")]
257            {
258                if self.security.is_some() {
259                    self.open_secure_channel_secured(transport)?;
260                    true
261                } else {
262                    false
263                }
264            }
265            #[cfg(not(feature = "crypto"))]
266            {
267                false
268            }
269        };
270        if !secured {
271            self.open_secure_channel_plain(transport)?;
272        }
273        self.channel_open = true;
274        Ok(())
275    }
276
277    /// Runs the full connect handshake: Hello, OpenSecureChannel,
278    /// CreateSession, ActivateSession.
279    ///
280    /// # Errors
281    /// [`ClientError`] on any handshake step failing.
282    pub fn connect<T: Transport>(
283        &mut self,
284        transport: &mut T,
285        endpoint_url: &str,
286    ) -> Result<(), ClientError> {
287        self.open_channel(transport, endpoint_url)?;
288
289        // 3. CreateSession.
290        let create = ServiceRequest::CreateSession(CreateSessionRequest {
291            request_header: self.header(),
292            client_description: client_description(),
293            server_uri: String::new(),
294            endpoint_url: String::from(endpoint_url),
295            session_name: String::from("zerodds-client"),
296            client_nonce: Vec::new(),
297            client_certificate: Vec::new(),
298            requested_session_timeout: 1_200_000.0,
299            max_response_message_size: 0,
300        });
301        let resp = self.call_service(transport, create)?;
302        let ServiceResponse::CreateSession(cs) = resp else {
303            return Err(ClientError::Protocol("expected CreateSessionResponse"));
304        };
305        self.session_id = cs.session_id;
306        self.auth_token = cs.authentication_token;
307
308        // 4. ActivateSession.
309        let activate = ServiceRequest::ActivateSession(ActivateSessionRequest {
310            request_header: self.header(),
311            client_signature: SignatureData::default(),
312            locale_ids: Vec::new(),
313            user_identity_token: null_extension_object(),
314            user_token_signature: SignatureData::default(),
315        });
316        let resp = self.call_service(transport, activate)?;
317        if !matches!(resp, ServiceResponse::ActivateSession(_)) {
318            return Err(ClientError::Protocol("expected ActivateSessionResponse"));
319        }
320        self.connected = true;
321        Ok(())
322    }
323
324    /// The plaintext (SecurityMode `None`) OpenSecureChannel exchange.
325    fn open_secure_channel_plain<T: Transport>(
326        &mut self,
327        transport: &mut T,
328    ) -> Result<(), ClientError> {
329        let open = OpenSecureChannelRequest {
330            request_header: RequestHeader::new(NodeId::numeric(0, 0), self.next_handle()),
331            client_protocol_version: 0,
332            request_type: SecurityTokenRequestType::Issue,
333            security_mode: MessageSecurityMode::None,
334            client_nonce: Vec::new(),
335            requested_lifetime: 3_600_000,
336        };
337        let rid = self.next_request_id();
338        let open_resp_bytes = transport.request(&self.channel.open_chunk(rid, &open.encode()?)?)?;
339        let chunk = parse_chunk(&open_resp_bytes)?;
340        let mut br = UaReader::new(&chunk.body);
341        let _type_id = NodeId::decode(&mut br)?;
342        let open_resp = OpenSecureChannelResponse::decode_body(&mut br)?;
343        // Adopt the server-issued channel + token for symmetric messages.
344        self.channel = SecureChannel::new(
345            open_resp.security_token.channel_id,
346            open_resp.security_token.token_id,
347        );
348        Ok(())
349    }
350
351    /// The secured OpenSecureChannel exchange: builds the asymmetric OPN with
352    /// the configured SecurityPolicy, opens the server's secured response,
353    /// derives the §6.7.6 keys, and installs the symmetric session.
354    #[cfg(feature = "crypto")]
355    fn open_secure_channel_secured<T: Transport>(
356        &mut self,
357        transport: &mut T,
358    ) -> Result<(), ClientError> {
359        let handle = self.next_handle();
360        let request_id = self.next_request_id();
361        let seq = self.channel.next_send_sequence();
362
363        let policy;
364        let mode;
365        let client_nonce;
366        let opn_bytes;
367        {
368            let sec = self
369                .security
370                .as_mut()
371                .ok_or(ClientError::Protocol("no SecurityPolicy configured"))?;
372            policy = sec.policy;
373            mode = sec.mode;
374            let msm = match mode {
375                SecuredMode::Sign => MessageSecurityMode::Sign,
376                SecuredMode::SignAndEncrypt => MessageSecurityMode::SignAndEncrypt,
377            };
378            let nonce_len = policy.sym_enc_key_len().max(32);
379            let mut nonce = alloc::vec![0u8; nonce_len];
380            sec.rng.fill_bytes(&mut nonce);
381            client_nonce = nonce;
382            let open = OpenSecureChannelRequest {
383                request_header: RequestHeader::new(NodeId::numeric(0, 0), handle),
384                client_protocol_version: 0,
385                request_type: SecurityTokenRequestType::Issue,
386                security_mode: msm,
387                client_nonce: client_nonce.clone(),
388                requested_lifetime: 3_600_000,
389            };
390            let body = open.encode()?;
391            let ctx = AsymmetricContext {
392                policy,
393                sender_certificate: &sec.certificate,
394                sender_private_key: &sec.private_key,
395                receiver_certificate: &sec.server_certificate,
396                receiver_public_key: &sec.server_public_key,
397            };
398            let mut rng_ref: &mut dyn CryptoRngCore = sec.rng.as_mut();
399            opn_bytes = build_asymmetric_chunk(&mut rng_ref, &ctx, 0, seq, request_id, &body)
400                .map_err(|_| ClientError::Protocol("secured OPN build failed"))?;
401        }
402
403        let resp_bytes = transport.request(&opn_bytes)?;
404
405        let open_resp = {
406            let sec = self
407                .security
408                .as_ref()
409                .ok_or(ClientError::Protocol("no SecurityPolicy configured"))?;
410            let opened = open_asymmetric_chunk(
411                policy,
412                &sec.private_key,
413                &sec.server_public_key,
414                &resp_bytes,
415            )
416            .map_err(|_| ClientError::Protocol("secured OPN response open/verify failed"))?;
417            let mut br = UaReader::new(&opened.body);
418            let _type_id = NodeId::decode(&mut br)?;
419            OpenSecureChannelResponse::decode_body(&mut br)?
420        };
421
422        let send_keys = derive_keys(policy, &open_resp.server_nonce, &client_nonce)
423            .map_err(|_| ClientError::Protocol("key derivation failed"))?;
424        let recv_keys = derive_keys(policy, &client_nonce, &open_resp.server_nonce)
425            .map_err(|_| ClientError::Protocol("key derivation failed"))?;
426        let mut channel = SecureChannel::new(
427            open_resp.security_token.channel_id,
428            open_resp.security_token.token_id,
429        );
430        channel.install_security(SecuritySession {
431            policy,
432            mode,
433            send_keys,
434            recv_keys,
435        });
436        self.channel = channel;
437        Ok(())
438    }
439
440    /// Reads the `Value` attribute of each node.
441    ///
442    /// # Errors
443    /// [`ClientError`] if not connected or the service fails.
444    pub fn read_values<T: Transport>(
445        &mut self,
446        transport: &mut T,
447        node_ids: &[NodeId],
448    ) -> Result<Vec<DataValue>, ClientError> {
449        if !self.connected {
450            return Err(ClientError::Protocol("not connected"));
451        }
452        let nodes_to_read = node_ids
453            .iter()
454            .map(|n| ReadValueId {
455                node_id: n.clone(),
456                attribute_id: ATTRIBUTE_VALUE,
457                index_range: String::new(),
458                data_encoding: zerodds_opcua_gateway::types::QualifiedName {
459                    namespace_index: 0,
460                    name: String::new(),
461                },
462            })
463            .collect();
464        let req = ServiceRequest::Read(ReadRequest {
465            request_header: self.header(),
466            max_age: 0.0,
467            timestamps_to_return: 0,
468            nodes_to_read,
469        });
470        let ServiceResponse::Read(rr) = self.call_service(transport, req)? else {
471            return Err(ClientError::Protocol("expected ReadResponse"));
472        };
473        Ok(rr.results)
474    }
475
476    /// Writes the `Value` attribute of each `(node, value)` pair, returning one
477    /// StatusCode per write.
478    ///
479    /// # Errors
480    /// [`ClientError`] if not connected or the service fails.
481    pub fn write_values<T: Transport>(
482        &mut self,
483        transport: &mut T,
484        writes: &[(NodeId, DataValue)],
485    ) -> Result<Vec<u32>, ClientError> {
486        if !self.connected {
487            return Err(ClientError::Protocol("not connected"));
488        }
489        let nodes_to_write = writes
490            .iter()
491            .map(|(n, v)| WriteValue {
492                node_id: n.clone(),
493                attribute_id: ATTRIBUTE_VALUE,
494                index_range: String::new(),
495                value: v.clone(),
496            })
497            .collect();
498        let req = ServiceRequest::Write(WriteRequest {
499            request_header: self.header(),
500            nodes_to_write,
501        });
502        let ServiceResponse::Write(wr) = self.call_service(transport, req)? else {
503            return Err(ClientError::Protocol("expected WriteResponse"));
504        };
505        Ok(wr.results)
506    }
507
508    /// Calls the session-less Discovery service `GetEndpoints` (Part 4 §5.5.4).
509    /// Requires only an open channel ([`open_channel`](Self::open_channel) or a
510    /// full [`connect`](Self::connect)).
511    ///
512    /// # Errors
513    /// [`ClientError`] if the channel is not open or the service fails.
514    pub fn get_endpoints<T: Transport>(
515        &mut self,
516        transport: &mut T,
517        endpoint_url: &str,
518    ) -> Result<Vec<EndpointDescription>, ClientError> {
519        if !self.channel_open {
520            return Err(ClientError::Protocol("channel not open"));
521        }
522        let req = ServiceRequest::GetEndpoints(GetEndpointsRequest {
523            request_header: self.header(),
524            endpoint_url: String::from(endpoint_url),
525            locale_ids: Vec::new(),
526            profile_uris: Vec::new(),
527        });
528        let ServiceResponse::GetEndpoints(ge) = self.call_service(transport, req)? else {
529            return Err(ClientError::Protocol("expected GetEndpointsResponse"));
530        };
531        Ok(ge.endpoints)
532    }
533
534    /// Calls the session-less Discovery service `FindServers` (Part 4 §5.5.2).
535    ///
536    /// # Errors
537    /// [`ClientError`] if the channel is not open or the service fails.
538    pub fn find_servers<T: Transport>(
539        &mut self,
540        transport: &mut T,
541        endpoint_url: &str,
542    ) -> Result<Vec<ApplicationDescription>, ClientError> {
543        if !self.channel_open {
544            return Err(ClientError::Protocol("channel not open"));
545        }
546        let req = ServiceRequest::FindServers(FindServersRequest {
547            request_header: self.header(),
548            endpoint_url: String::from(endpoint_url),
549            locale_ids: Vec::new(),
550            server_uris: Vec::new(),
551        });
552        let ServiceResponse::FindServers(fs) = self.call_service(transport, req)? else {
553            return Err(ClientError::Protocol("expected FindServersResponse"));
554        };
555        Ok(fs.servers)
556    }
557
558    /// Creates a subscription with default parameters and returns its id.
559    ///
560    /// # Errors
561    /// [`ClientError`] if not connected or the service fails.
562    pub fn create_subscription<T: Transport>(
563        &mut self,
564        transport: &mut T,
565    ) -> Result<u32, ClientError> {
566        if !self.connected {
567            return Err(ClientError::Protocol("not connected"));
568        }
569        let req = ServiceRequest::CreateSubscription(CreateSubscriptionRequest {
570            request_header: self.header(),
571            requested_publishing_interval: 1000.0,
572            requested_lifetime_count: 10_000,
573            requested_max_keep_alive_count: 10,
574            max_notifications_per_publish: 0,
575            publishing_enabled: true,
576            priority: 0,
577        });
578        let ServiceResponse::CreateSubscription(cs) = self.call_service(transport, req)? else {
579            return Err(ClientError::Protocol("expected CreateSubscriptionResponse"));
580        };
581        Ok(cs.subscription_id)
582    }
583
584    /// Creates monitored items (one per `(node, client_handle)`) on a
585    /// subscription and returns the per-item results.
586    ///
587    /// # Errors
588    /// [`ClientError`] if not connected or the service fails.
589    pub fn create_monitored_items<T: Transport>(
590        &mut self,
591        transport: &mut T,
592        subscription_id: u32,
593        items: &[(NodeId, u32)],
594    ) -> Result<Vec<MonitoredItemCreateResult>, ClientError> {
595        if !self.connected {
596            return Err(ClientError::Protocol("not connected"));
597        }
598        let items_to_create = items
599            .iter()
600            .map(|(node_id, client_handle)| MonitoredItemCreateRequest {
601                item_to_monitor: ReadValueId {
602                    node_id: node_id.clone(),
603                    attribute_id: ATTRIBUTE_VALUE,
604                    index_range: String::new(),
605                    data_encoding: zerodds_opcua_gateway::types::QualifiedName {
606                        namespace_index: 0,
607                        name: String::new(),
608                    },
609                },
610                monitoring_mode: 2, // Reporting
611                requested_parameters: MonitoringParameters {
612                    client_handle: *client_handle,
613                    sampling_interval: 1000.0,
614                    filter: null_filter(),
615                    queue_size: 1,
616                    discard_oldest: true,
617                },
618            })
619            .collect();
620        let req = ServiceRequest::CreateMonitoredItems(CreateMonitoredItemsRequest {
621            request_header: self.header(),
622            subscription_id,
623            timestamps_to_return: 0,
624            items_to_create,
625        });
626        let ServiceResponse::CreateMonitoredItems(cm) = self.call_service(transport, req)? else {
627            return Err(ClientError::Protocol(
628                "expected CreateMonitoredItemsResponse",
629            ));
630        };
631        Ok(cm.results)
632    }
633
634    /// Sends a Publish and returns `(subscriptionId, changed monitored items)`
635    /// decoded from the response's `DataChangeNotification`(s).
636    ///
637    /// # Errors
638    /// [`ClientError`] if not connected or the service fails.
639    pub fn publish<T: Transport>(
640        &mut self,
641        transport: &mut T,
642    ) -> Result<(u32, Vec<MonitoredItemNotification>), ClientError> {
643        if !self.connected {
644            return Err(ClientError::Protocol("not connected"));
645        }
646        let req = ServiceRequest::Publish(PublishRequest {
647            request_header: self.header(),
648            subscription_acknowledgements: Vec::new(),
649        });
650        let ServiceResponse::Publish(pr) = self.call_service(transport, req)? else {
651            return Err(ClientError::Protocol("expected PublishResponse"));
652        };
653        let mut notifications = Vec::new();
654        for eo in &pr.notification_message.notification_data {
655            if eo.type_id == node_ids::DATA_CHANGE_NOTIFICATION {
656                let dcn = DataChangeNotification::from_extension_object(eo)?;
657                notifications.extend(dcn.monitored_items);
658            }
659        }
660        Ok((pr.subscription_id, notifications))
661    }
662
663    /// Deletes subscriptions and returns the per-subscription StatusCodes.
664    ///
665    /// # Errors
666    /// [`ClientError`] if not connected or the service fails.
667    pub fn delete_subscriptions<T: Transport>(
668        &mut self,
669        transport: &mut T,
670        subscription_ids: &[u32],
671    ) -> Result<Vec<u32>, ClientError> {
672        if !self.connected {
673            return Err(ClientError::Protocol("not connected"));
674        }
675        let req = ServiceRequest::DeleteSubscriptions(DeleteSubscriptionsRequest {
676            request_header: self.header(),
677            subscription_ids: subscription_ids.to_vec(),
678        });
679        let ServiceResponse::DeleteSubscriptions(ds) = self.call_service(transport, req)? else {
680            return Err(ClientError::Protocol(
681                "expected DeleteSubscriptionsResponse",
682            ));
683        };
684        Ok(ds.results)
685    }
686
687    /// Browses a node's forward hierarchical references (all reference types,
688    /// all node classes, full result mask) and returns the references found.
689    ///
690    /// # Errors
691    /// [`ClientError`] if not connected or the service fails.
692    pub fn browse<T: Transport>(
693        &mut self,
694        transport: &mut T,
695        node_id: NodeId,
696    ) -> Result<Vec<ReferenceDescription>, ClientError> {
697        if !self.connected {
698            return Err(ClientError::Protocol("not connected"));
699        }
700        let req = ServiceRequest::Browse(BrowseRequest {
701            request_header: self.header(),
702            view: ViewDescription::default(),
703            requested_max_references_per_node: 0,
704            nodes_to_browse: alloc::vec![BrowseDescription {
705                node_id,
706                browse_direction: 0,                       // Forward
707                reference_type_id: NodeId::numeric(0, 31), // References (all)
708                include_subtypes: true,
709                node_class_mask: 0,
710                result_mask: 0x3F,
711            }],
712        });
713        let ServiceResponse::Browse(br) = self.call_service(transport, req)? else {
714            return Err(ClientError::Protocol("expected BrowseResponse"));
715        };
716        let first = br
717            .results
718            .into_iter()
719            .next()
720            .ok_or(ClientError::Protocol("empty BrowseResponse"))?;
721        if first.status_code != 0 {
722            return Err(ClientError::Protocol("browse returned a bad status"));
723        }
724        Ok(first.references)
725    }
726
727    /// Calls a method and returns its result.
728    ///
729    /// # Errors
730    /// [`ClientError`] if not connected or the service fails.
731    pub fn call_method<T: Transport>(
732        &mut self,
733        transport: &mut T,
734        object_id: NodeId,
735        method_id: NodeId,
736        input_arguments: Vec<Variant>,
737    ) -> Result<CallMethodResult, ClientError> {
738        if !self.connected {
739            return Err(ClientError::Protocol("not connected"));
740        }
741        let req = ServiceRequest::Call(CallRequest {
742            request_header: self.header(),
743            methods_to_call: alloc::vec![CallMethodRequest {
744                object_id,
745                method_id,
746                input_arguments,
747            }],
748        });
749        let ServiceResponse::Call(cr) = self.call_service(transport, req)? else {
750            return Err(ClientError::Protocol("expected CallResponse"));
751        };
752        cr.results
753            .into_iter()
754            .next()
755            .ok_or(ClientError::Protocol("empty CallResponse"))
756    }
757
758    /// Encodes a service request into a MSG chunk, sends it, and decodes the
759    /// service response from the MSG response.
760    fn call_service<T: Transport>(
761        &mut self,
762        transport: &mut T,
763        req: ServiceRequest,
764    ) -> Result<ServiceResponse, ClientError> {
765        let rid = self.next_request_id();
766        let chunk = self.channel.message_chunk(rid, &req.encode()?)?;
767        let resp_bytes = transport.request(&chunk)?;
768        let parsed = self.channel.open_incoming(&resp_bytes)?;
769        ServiceResponse::decode(&parsed.body).map_err(ClientError::from)
770    }
771}
772
773fn client_description() -> ApplicationDescription {
774    ApplicationDescription {
775        application_uri: String::from("urn:zerodds:opcua-client"),
776        product_uri: String::from("urn:zerodds"),
777        application_name: zerodds_opcua_gateway::types::LocalizedText {
778            locale: None,
779            text: Some(String::from("ZeroDDS OPC-UA Client")),
780        },
781        application_type: ApplicationType::Client,
782        gateway_server_uri: String::new(),
783        discovery_profile_uri: String::new(),
784        discovery_urls: Vec::new(),
785    }
786}
787
788// ---------------------------------------------------------------------------
789// TCP transport + server accept loop (feature `std`).
790// ---------------------------------------------------------------------------
791
792#[cfg(feature = "std")]
793pub use tcp::{TcpTransport, serve_connection};
794
#[cfg(feature = "std")]
mod tcp {
    use super::ClientError;
    use crate::server::Server;
    use alloc::vec::Vec;
    use std::io::{self, ErrorKind, Read, Write};
    use std::net::TcpStream;
    use std::string::ToString;

    /// Number of leading bytes read before the body. The little-endian `u32`
    /// MessageSize (total message length, header included) sits in bytes 4..8.
    const HEADER_LEN: usize = 8;

    /// Upper bound on the buffer capacity reserved before any body byte has
    /// arrived. Larger messages are still accepted; the buffer simply grows as
    /// data is actually received, so a peer cannot force a huge allocation by
    /// merely announcing a large MessageSize.
    const MAX_PREALLOC: usize = 64 * 1024;

    /// Converts an I/O error into the string-carrying [`ClientError::Io`].
    fn io_error(err: io::Error) -> ClientError {
        ClientError::Io(err.to_string())
    }

    /// The error reported when the stream ends before a message is complete.
    fn truncated() -> ClientError {
        io_error(io::Error::from(ErrorKind::UnexpectedEof))
    }

    /// Writes `bytes` to `stream` in full.
    fn write_message(stream: &mut TcpStream, bytes: &[u8]) -> Result<(), ClientError> {
        stream.write_all(bytes).map_err(io_error)
    }

    /// Reads one framed message (header + body) from `stream`.
    ///
    /// Returns `Ok(None)` when the peer closed the connection cleanly, i.e. the
    /// stream reached end-of-file before the first header byte of a new
    /// message. End-of-file anywhere else is an error.
    ///
    /// # Errors
    /// * [`ClientError::Io`] on a socket error or a stream that ends in the
    ///   middle of a message.
    /// * [`ClientError::Protocol`] if the announced MessageSize is smaller than
    ///   the header itself.
    fn read_message_or_eof(stream: &mut TcpStream) -> Result<Option<Vec<u8>>, ClientError> {
        let mut header = [0u8; HEADER_LEN];
        let mut filled = 0;
        // Manual fill loop instead of `read_exact`, so that "closed between
        // messages" can be told apart from "closed mid-header".
        while filled < HEADER_LEN {
            match stream.read(&mut header[filled..]) {
                Ok(0) if filled == 0 => return Ok(None),
                Ok(0) => return Err(truncated()),
                Ok(n) => filled += n,
                Err(e) if e.kind() == ErrorKind::Interrupted => {}
                Err(e) => return Err(io_error(e)),
            }
        }

        let size = u32::from_le_bytes([header[4], header[5], header[6], header[7]]) as usize;
        if size < HEADER_LEN {
            return Err(ClientError::Protocol(
                "UACP MessageSize below header length",
            ));
        }
        let body_len = size - HEADER_LEN;

        let mut msg = Vec::with_capacity(size.min(MAX_PREALLOC));
        msg.extend_from_slice(&header);
        // `take` stops the read at the message boundary; `read_to_end` appends
        // straight into `msg`, avoiding a zeroed scratch buffer and a copy.
        // `body_len` originates from a `u32`, so widening to `u64` is lossless.
        let received = stream
            .by_ref()
            .take(body_len as u64)
            .read_to_end(&mut msg)
            .map_err(io_error)?;
        if received != body_len {
            return Err(truncated());
        }
        Ok(Some(msg))
    }

    /// Reads one framed message, treating any end-of-file as an error (used
    /// where a response is expected).
    ///
    /// # Errors
    /// See [`read_message_or_eof`]; additionally [`ClientError::Io`] if the
    /// peer closed the connection before sending anything.
    fn read_message(stream: &mut TcpStream) -> Result<Vec<u8>, ClientError> {
        read_message_or_eof(stream)?.ok_or_else(truncated)
    }

    /// An `opc.tcp` [`Transport`](super::Transport) over a `TcpStream`.
    #[derive(Debug)]
    pub struct TcpTransport {
        stream: TcpStream,
    }

    impl TcpTransport {
        /// Connects to `addr` (e.g. `127.0.0.1:4840`).
        ///
        /// # Errors
        /// [`ClientError::Io`] if the connection fails.
        pub fn connect(addr: &str) -> Result<Self, ClientError> {
            let stream = TcpStream::connect(addr).map_err(io_error)?;
            Ok(Self { stream })
        }
    }

    impl super::Transport for TcpTransport {
        /// Sends `message` and blocks until one complete framed reply has been
        /// read.
        fn request(&mut self, message: &[u8]) -> Result<Vec<u8>, ClientError> {
            write_message(&mut self.stream, message)?;
            read_message(&mut self.stream)
        }

        /// Sends `message` without waiting for a reply.
        fn send(&mut self, message: &[u8]) -> Result<(), ClientError> {
            write_message(&mut self.stream, message)
        }
    }

    /// Serves one accepted `TcpStream` connection with `server`: reads framed
    /// UACP messages and writes the server's responses until the peer closes
    /// or the server produces no response.
    ///
    /// A close is considered clean only when it happens between messages.
    ///
    /// # Errors
    /// [`ClientError`] on an I/O or protocol failure other than a clean close:
    /// a socket error, a connection that ends in the middle of a message, a
    /// malformed MessageSize, or a failure reported by `server`.
    pub fn serve_connection(server: &mut Server, mut stream: TcpStream) -> Result<(), ClientError> {
        loop {
            let Some(msg) = read_message_or_eof(&mut stream)? else {
                return Ok(()); // peer closed
            };
            let reply = server
                .process(&msg)
                .map_err(|_| ClientError::Protocol("server error"))?;
            match reply {
                Some(resp) => write_message(&mut stream, &resp)?,
                None => return Ok(()), // CloseSecureChannel
            }
        }
    }
}
881
882#[cfg(test)]
883mod tests {
884    use super::*;
885    use crate::address_space::{AddressSpace, MethodOutcome};
886    use crate::server::Server;
887    use zerodds_opcua_gateway::data_value::VariantValue;
888
889    fn demo_server() -> Server {
890        let mut space = AddressSpace::new();
891        space.set_value(
892            NodeId::numeric(1, 1),
893            DataValue::new_value(Variant::scalar(VariantValue::Int32(4242)), 0, 0),
894        );
895        // A method that doubles its Int32 input.
896        space.register_method(NodeId::numeric(1, 100), |args| {
897            if let Some(v) = args.first() {
898                if let Some(VariantValue::Int32(x)) = v.value.first() {
899                    return MethodOutcome::good(alloc::vec![Variant::scalar(VariantValue::Int32(
900                        x * 2
901                    ))]);
902                }
903            }
904            MethodOutcome::fault(0x8000_0000)
905        });
906        Server::new("opc.tcp://localhost:4840", space)
907    }
908
909    #[test]
910    fn e2e_loopback_connect_read_call() {
911        let mut server = demo_server();
912        let mut client = Client::new();
913        {
914            let mut t = LoopbackTransport::new(&mut server);
915            client
916                .connect(&mut t, "opc.tcp://localhost:4840")
917                .expect("connect");
918            assert_ne!(*client.session_id(), NodeId::numeric(0, 0));
919
920            // Read node value.
921            let vals = client
922                .read_values(&mut t, &[NodeId::numeric(1, 1)])
923                .expect("read");
924            assert_eq!(
925                vals[0].value,
926                Some(Variant::scalar(VariantValue::Int32(4242)))
927            );
928
929            // Call method (double 21 → 42).
930            let res = client
931                .call_method(
932                    &mut t,
933                    NodeId::numeric(0, 0),
934                    NodeId::numeric(1, 100),
935                    alloc::vec![Variant::scalar(VariantValue::Int32(21))],
936                )
937                .expect("call");
938            assert_eq!(res.status_code, 0);
939            assert_eq!(
940                res.output_arguments[0],
941                Variant::scalar(VariantValue::Int32(42))
942            );
943        }
944    }
945
946    #[test]
947    fn e2e_loopback_write_then_read() {
948        let mut server = demo_server();
949        let mut client = Client::new();
950        let mut t = LoopbackTransport::new(&mut server);
951        client.connect(&mut t, "opc.tcp://x").expect("connect");
952
953        // Write a new value to node (1,1) and to a fresh node (1,2).
954        let results = client
955            .write_values(
956                &mut t,
957                &[
958                    (
959                        NodeId::numeric(1, 1),
960                        DataValue::new_value(Variant::scalar(VariantValue::Int32(7)), 0, 0),
961                    ),
962                    (
963                        NodeId::numeric(1, 2),
964                        DataValue::new_value(Variant::scalar(VariantValue::Int32(9)), 0, 0),
965                    ),
966                ],
967            )
968            .expect("write");
969        assert_eq!(results, alloc::vec![0, 0]);
970
971        // Read both back.
972        let vals = client
973            .read_values(&mut t, &[NodeId::numeric(1, 1), NodeId::numeric(1, 2)])
974            .expect("read");
975        assert_eq!(vals[0].value, Some(Variant::scalar(VariantValue::Int32(7))));
976        assert_eq!(vals[1].value, Some(Variant::scalar(VariantValue::Int32(9))));
977    }
978
979    #[test]
980    fn read_unknown_node_yields_bad_status() {
981        let mut server = demo_server();
982        let mut client = Client::new();
983        let mut t = LoopbackTransport::new(&mut server);
984        client.connect(&mut t, "opc.tcp://x").expect("connect");
985        let vals = client
986            .read_values(&mut t, &[NodeId::numeric(1, 999)])
987            .expect("read");
988        assert_eq!(vals[0].status, Some(crate::server::BAD_NODE_ID_UNKNOWN));
989        assert_eq!(vals[0].value, None);
990    }
991
992    #[test]
993    fn e2e_loopback_subscription_publish() {
994        let mut server = demo_server();
995        let mut client = Client::new();
996        let mut t = LoopbackTransport::new(&mut server);
997        client.connect(&mut t, "opc.tcp://x").expect("connect");
998
999        let sub = client
1000            .create_subscription(&mut t)
1001            .expect("create subscription");
1002        let results = client
1003            .create_monitored_items(&mut t, sub, &[(NodeId::numeric(1, 1), 77)])
1004            .expect("create monitored items");
1005        assert_eq!(results.len(), 1);
1006        assert_eq!(results[0].status_code, 0);
1007
1008        // First Publish reports the initial value (4242) for client handle 77.
1009        let (sid, notes) = client.publish(&mut t).expect("publish 1");
1010        assert_eq!(sid, sub);
1011        assert_eq!(notes.len(), 1);
1012        assert_eq!(notes[0].client_handle, 77);
1013        assert_eq!(
1014            notes[0].value.value,
1015            Some(Variant::scalar(VariantValue::Int32(4242)))
1016        );
1017
1018        // No change → next Publish reports nothing.
1019        let (_, none) = client.publish(&mut t).expect("publish 2");
1020        assert!(none.is_empty());
1021
1022        // Change the value, then Publish reports the new value.
1023        client
1024            .write_values(
1025                &mut t,
1026                &[(
1027                    NodeId::numeric(1, 1),
1028                    DataValue::new_value(Variant::scalar(VariantValue::Int32(123)), 0, 0),
1029                )],
1030            )
1031            .expect("write");
1032        let (_, changed) = client.publish(&mut t).expect("publish 3");
1033        assert_eq!(changed.len(), 1);
1034        assert_eq!(
1035            changed[0].value.value,
1036            Some(Variant::scalar(VariantValue::Int32(123)))
1037        );
1038
1039        // Delete the subscription.
1040        let del = client.delete_subscriptions(&mut t, &[sub]).expect("delete");
1041        assert_eq!(del, alloc::vec![0]);
1042        // After deletion, a Publish has no enabled subscription.
1043        let (sid2, _) = client.publish(&mut t).expect("publish 4");
1044        assert_eq!(sid2, 0);
1045    }
1046
1047    #[test]
1048    fn e2e_loopback_discovery() {
1049        let mut server = demo_server();
1050        let mut client = Client::new();
1051        let mut t = LoopbackTransport::new(&mut server);
1052        // Session-less discovery: open the channel only.
1053        client
1054            .open_channel(&mut t, "opc.tcp://localhost:4840")
1055            .expect("open channel");
1056
1057        let eps = client
1058            .get_endpoints(&mut t, "opc.tcp://localhost:4840")
1059            .expect("get endpoints");
1060        assert_eq!(eps.len(), 1);
1061        assert_eq!(eps[0].endpoint_url, "opc.tcp://localhost:4840");
1062        assert_eq!(
1063            eps[0].security_policy_uri,
1064            zerodds_opcua_uacp::securechannel::SECURITY_POLICY_NONE
1065        );
1066
1067        let servers = client
1068            .find_servers(&mut t, "opc.tcp://localhost:4840")
1069            .expect("find servers");
1070        assert_eq!(servers.len(), 1);
1071        assert_eq!(
1072            servers[0].application_name.text.as_deref(),
1073            Some("ZeroDDS OPC-UA Server")
1074        );
1075
1076        // Discovery before an open channel is rejected.
1077        let mut fresh = Client::new();
1078        let mut t2 = LoopbackTransport::new(&mut server);
1079        assert!(fresh.get_endpoints(&mut t2, "opc.tcp://x").is_err());
1080    }
1081
1082    #[test]
1083    fn e2e_loopback_browse() {
1084        use crate::address_space::{NodeClass, NodeMeta, reference_types};
1085        use zerodds_opcua_gateway::types::{LocalizedText, QualifiedName};
1086
1087        let mut space = AddressSpace::new();
1088        let objects = NodeId::numeric(0, 85);
1089        let boiler = NodeId::numeric(1, 1);
1090        let temp = NodeId::numeric(1, 2);
1091        for (id, class, name) in [
1092            (objects.clone(), NodeClass::Object, "Objects"),
1093            (boiler.clone(), NodeClass::Object, "Boiler"),
1094            (temp.clone(), NodeClass::Variable, "Temperature"),
1095        ] {
1096            space.add_node(NodeMeta {
1097                node_id: id,
1098                node_class: class,
1099                browse_name: QualifiedName {
1100                    namespace_index: 1,
1101                    name: String::from(name),
1102                },
1103                display_name: LocalizedText {
1104                    locale: None,
1105                    text: Some(String::from(name)),
1106                },
1107                type_definition: NodeId::numeric(0, 58),
1108            });
1109        }
1110        space.add_reference(objects.clone(), reference_types::ORGANIZES, boiler.clone());
1111        space.add_reference(boiler.clone(), reference_types::HAS_COMPONENT, temp.clone());
1112        let mut server = Server::new("opc.tcp://localhost:4840", space);
1113
1114        let mut client = Client::new();
1115        let mut t = LoopbackTransport::new(&mut server);
1116        client
1117            .connect(&mut t, "opc.tcp://localhost:4840")
1118            .expect("connect");
1119
1120        // Browse Objects → Boiler (Organizes).
1121        let refs = client.browse(&mut t, objects).expect("browse objects");
1122        assert_eq!(refs.len(), 1);
1123        assert_eq!(refs[0].node_id.node_id, boiler);
1124        assert_eq!(refs[0].browse_name.name, "Boiler");
1125        assert_eq!(refs[0].reference_type_id, reference_types::ORGANIZES);
1126        assert!(refs[0].is_forward);
1127
1128        // Browse Boiler → Temperature (HasComponent).
1129        let children = client.browse(&mut t, boiler).expect("browse boiler");
1130        assert_eq!(children.len(), 1);
1131        assert_eq!(children[0].node_id.node_id, temp);
1132        assert_eq!(children[0].node_class, NodeClass::Variable.as_i32());
1133
1134        // Browsing an unknown node yields Bad_NodeIdUnknown (browse() maps that
1135        // to a protocol error).
1136        assert!(client.browse(&mut t, NodeId::numeric(1, 999)).is_err());
1137    }
1138
1139    #[cfg(feature = "crypto")]
1140    #[test]
1141    fn e2e_secured_loopback_sign_and_encrypt() {
1142        use crate::server::ServerSecurity;
1143        use rand::rngs::OsRng;
1144        use zerodds_opcua_uacp::crypto::{RsaPrivateKey, SecuredMode, SecurityPolicy};
1145
1146        // Generate RSA key pairs and exchange public keys out of band (trust).
1147        let mut kg = OsRng;
1148        let client_key = RsaPrivateKey::new(&mut kg, 2048).expect("client key");
1149        let server_key = RsaPrivateKey::new(&mut kg, 2048).expect("server key");
1150        let client_cert = b"client-cert-der".to_vec();
1151        let server_cert = b"server-cert-der".to_vec();
1152        let client_pub = client_key.to_public_key();
1153        let server_pub = server_key.to_public_key();
1154        let policy = SecurityPolicy::Basic256Sha256;
1155        let mode = SecuredMode::SignAndEncrypt;
1156
1157        let mut server = demo_server();
1158        server.set_security(ServerSecurity {
1159            policy,
1160            mode,
1161            private_key: server_key.clone(),
1162            certificate: server_cert.clone(),
1163            client_certificate: client_cert.clone(),
1164            client_public_key: client_pub,
1165            rng: alloc::boxed::Box::new(OsRng),
1166        });
1167
1168        let mut client = Client::new();
1169        client.set_security(super::ClientSecurity {
1170            policy,
1171            mode,
1172            private_key: client_key,
1173            certificate: client_cert,
1174            server_certificate: server_cert,
1175            server_public_key: server_pub,
1176            rng: alloc::boxed::Box::new(OsRng),
1177        });
1178
1179        let mut t = LoopbackTransport::new(&mut server);
1180        client
1181            .connect(&mut t, "opc.tcp://secure")
1182            .expect("secured connect");
1183        assert_ne!(*client.session_id(), NodeId::numeric(0, 0));
1184
1185        // Secured Read.
1186        let vals = client
1187            .read_values(&mut t, &[NodeId::numeric(1, 1)])
1188            .expect("secured read");
1189        assert_eq!(
1190            vals[0].value,
1191            Some(Variant::scalar(VariantValue::Int32(4242)))
1192        );
1193
1194        // Secured Call (double 21 → 42).
1195        let res = client
1196            .call_method(
1197                &mut t,
1198                NodeId::numeric(0, 0),
1199                NodeId::numeric(1, 100),
1200                alloc::vec![Variant::scalar(VariantValue::Int32(21))],
1201            )
1202            .expect("secured call");
1203        assert_eq!(
1204            res.output_arguments[0],
1205            Variant::scalar(VariantValue::Int32(42))
1206        );
1207
1208        // Secured Write then read back.
1209        let w = client
1210            .write_values(
1211                &mut t,
1212                &[(
1213                    NodeId::numeric(1, 2),
1214                    DataValue::new_value(Variant::scalar(VariantValue::Int32(9)), 0, 0),
1215                )],
1216            )
1217            .expect("secured write");
1218        assert_eq!(w, alloc::vec![0]);
1219        let back = client
1220            .read_values(&mut t, &[NodeId::numeric(1, 2)])
1221            .expect("secured read back");
1222        assert_eq!(back[0].value, Some(Variant::scalar(VariantValue::Int32(9))));
1223    }
1224
1225    #[cfg(feature = "std")]
1226    #[test]
1227    fn e2e_tcp_connect_read_call() {
1228        use std::net::TcpListener;
1229        use std::thread;
1230
1231        let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
1232        let addr = listener.local_addr().expect("addr").to_string();
1233
1234        let server_thread = thread::spawn(move || {
1235            let mut server = demo_server();
1236            let (stream, _) = listener.accept().expect("accept");
1237            serve_connection(&mut server, stream).expect("serve");
1238        });
1239
1240        let mut transport = TcpTransport::connect(&addr).expect("connect tcp");
1241        let mut client = Client::new();
1242        let url = alloc::format!("opc.tcp://{addr}");
1243        client.connect(&mut transport, &url).expect("connect");
1244        let vals = client
1245            .read_values(&mut transport, &[NodeId::numeric(1, 1)])
1246            .expect("read");
1247        assert_eq!(
1248            vals[0].value,
1249            Some(Variant::scalar(VariantValue::Int32(4242)))
1250        );
1251        let res = client
1252            .call_method(
1253                &mut transport,
1254                NodeId::numeric(0, 0),
1255                NodeId::numeric(1, 100),
1256                alloc::vec![Variant::scalar(VariantValue::Int32(50))],
1257            )
1258            .expect("call");
1259        assert_eq!(
1260            res.output_arguments[0],
1261            Variant::scalar(VariantValue::Int32(100))
1262        );
1263
1264        // Trigger a clean close so the server thread's serve loop returns.
1265        drop(transport);
1266        server_thread.join().expect("join");
1267    }
1268}