1use alloc::string::String;
11use alloc::vec::Vec;
12
13use zerodds_opcua_gateway::data_value::{DataValue, Variant};
14use zerodds_opcua_gateway::node_id::NodeId;
15use zerodds_opcua_pubsub::uadp::datatypes::{
16 ApplicationDescription, ApplicationType, EndpointDescription, MessageSecurityMode,
17};
18use zerodds_opcua_pubsub::{DecodeError, EncodeError, UaDecode, UaReader};
19use zerodds_opcua_uacp::connection::{HelloMessage, MessageHeader, MessageType};
20use zerodds_opcua_uacp::securechannel::{
21 OpenSecureChannelRequest, OpenSecureChannelResponse, RequestHeader, SecureChannel,
22 SecurityTokenRequestType, null_extension_object, parse_chunk,
23};
24
25use crate::services::{
26 ATTRIBUTE_VALUE, ActivateSessionRequest, BrowseDescription, BrowseRequest, CallMethodRequest,
27 CallMethodResult, CallRequest, CreateMonitoredItemsRequest, CreateSessionRequest,
28 CreateSubscriptionRequest, DataChangeNotification, DeleteSubscriptionsRequest,
29 FindServersRequest, GetEndpointsRequest, MonitoredItemCreateRequest, MonitoredItemCreateResult,
30 MonitoredItemNotification, MonitoringParameters, PublishRequest, ReadRequest, ReadValueId,
31 ReferenceDescription, ServiceRequest, ServiceResponse, SignatureData, ViewDescription,
32 WriteRequest, WriteValue, null_filter,
33};
34use zerodds_opcua_uacp::securechannel::node_ids;
35
36#[cfg(feature = "crypto")]
37use alloc::boxed::Box;
38#[cfg(feature = "crypto")]
39use zerodds_opcua_uacp::crypto::{
40 AsymmetricContext, CryptoRngCore, RsaPrivateKey, RsaPublicKey, SecuredMode, SecurityPolicy,
41 build_asymmetric_chunk, derive_keys, open_asymmetric_chunk,
42};
43#[cfg(feature = "crypto")]
44use zerodds_opcua_uacp::securechannel::SecuritySession;
45
/// Key material and settings the client uses to open a secured channel.
///
/// Installed on a [`Client`] through `Client::set_security`; only compiled with the
/// `crypto` feature.
#[cfg(feature = "crypto")]
pub struct ClientSecurity {
    /// Security policy used for the asymmetric OpenSecureChannel exchange and key derivation.
    pub policy: SecurityPolicy,
    /// Secured mode (`Sign` or `SignAndEncrypt`) requested for the channel.
    pub mode: SecuredMode,
    /// Client private key: used as the sender key when building the request chunk and
    /// again when opening the server's response chunk.
    pub private_key: RsaPrivateKey,
    /// Client certificate bytes, passed as the sender certificate (presumably DER — confirm).
    pub certificate: Vec<u8>,
    /// Server certificate bytes, passed as the receiver certificate (presumably DER — confirm).
    pub server_certificate: Vec<u8>,
    /// Server public key: receiver key for the request and verification key for the response.
    pub server_public_key: RsaPublicKey,
    /// Random source used to fill the client nonce and handed to the chunk builder.
    pub rng: Box<dyn CryptoRngCore + Send>,
}
67
/// Errors reported by the client and its transports.
#[derive(Debug, Clone, PartialEq)]
pub enum ClientError {
    /// A decoding step failed; wraps the codec's error.
    Decode(DecodeError),
    /// An encoding step failed; wraps the codec's error.
    Encode(EncodeError),
    /// The exchange did not follow the expected flow (wrong response type, call made in
    /// the wrong client state, ...); carries a static description.
    Protocol(&'static str),
    /// Transport-level I/O failure; carries the underlying error rendered as text.
    Io(String),
}
80
81impl From<DecodeError> for ClientError {
82 fn from(e: DecodeError) -> Self {
83 Self::Decode(e)
84 }
85}
86
87impl From<EncodeError> for ClientError {
88 fn from(e: EncodeError) -> Self {
89 Self::Encode(e)
90 }
91}
92
93impl core::fmt::Display for ClientError {
94 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
95 match self {
96 Self::Decode(e) => write!(f, "decode error: {e}"),
97 Self::Encode(e) => write!(f, "encode error: {e}"),
98 Self::Protocol(m) => write!(f, "protocol error: {m}"),
99 Self::Io(m) => write!(f, "transport I/O error: {m}"),
100 }
101 }
102}
103
// `std::error::Error` is only implemented when the `std` feature is enabled; the
// trait's default methods are used unchanged (no `source()` override).
#[cfg(feature = "std")]
impl std::error::Error for ClientError {}
106
/// Byte-level message exchange used by [`Client`].
///
/// Implementations in this file: `LoopbackTransport` (in-process server) and, with the
/// `std` feature, `TcpTransport`.
pub trait Transport {
    /// Sends `message` and returns the bytes of the peer's reply.
    ///
    /// # Errors
    /// Returns a [`ClientError`] when the message cannot be delivered or no reply is obtained.
    fn request(&mut self, message: &[u8]) -> Result<Vec<u8>, ClientError>;

    /// Sends `message` without returning a reply to the caller.
    ///
    /// # Errors
    /// Returns a [`ClientError`] when the message cannot be delivered.
    fn send(&mut self, message: &[u8]) -> Result<(), ClientError>;
}
122
/// [`Transport`] that hands every message straight to an in-process
/// [`crate::server::Server`], without any socket in between.
pub struct LoopbackTransport<'a> {
    /// Server that processes each message; borrowed mutably for the transport's lifetime.
    server: &'a mut crate::server::Server,
}
128
129impl<'a> LoopbackTransport<'a> {
130 pub fn new(server: &'a mut crate::server::Server) -> Self {
132 Self { server }
133 }
134}
135
136impl Transport for LoopbackTransport<'_> {
137 fn request(&mut self, message: &[u8]) -> Result<Vec<u8>, ClientError> {
138 self.server
139 .process(message)
140 .map_err(|_| ClientError::Protocol("server rejected the request"))?
141 .ok_or(ClientError::Protocol("server produced no response"))
142 }
143
144 fn send(&mut self, message: &[u8]) -> Result<(), ClientError> {
145 self.server
146 .process(message)
147 .map(|_| ())
148 .map_err(|_| ClientError::Protocol("server rejected the message"))
149 }
150}
151
/// OPC UA client state: secure channel, session identifiers and request counters.
///
/// The client owns no connection; every operation takes the [`Transport`] to use.
pub struct Client {
    /// Secure channel used to frame outgoing chunks and open incoming ones; replaced
    /// once the server has issued a security token.
    channel: SecureChannel,
    /// Authentication token placed in every request header; set from the
    /// CreateSession response.
    auth_token: NodeId,
    /// Session id returned by the CreateSession response.
    session_id: NodeId,
    /// Most recently issued request handle (incremented before each use).
    request_handle: u32,
    /// Counter behind the request ids; it is incremented before each use, so it holds
    /// the most recently issued id.
    next_request_id: u32,
    /// Set once `open_channel` has completed successfully.
    channel_open: bool,
    /// Set once `connect` has created and activated a session.
    connected: bool,
    /// Optional security configuration; when present the channel is opened secured.
    #[cfg(feature = "crypto")]
    security: Option<ClientSecurity>,
}
166
167impl core::fmt::Debug for Client {
168 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
169 f.debug_struct("Client")
170 .field("session_id", &self.session_id)
171 .field("connected", &self.connected)
172 .finish_non_exhaustive()
173 }
174}
175
176impl Default for Client {
177 fn default() -> Self {
178 Self::new()
179 }
180}
181
182impl Client {
183 #[must_use]
185 pub fn new() -> Self {
186 Self {
187 channel: SecureChannel::new(0, 0),
188 auth_token: NodeId::numeric(0, 0),
189 session_id: NodeId::numeric(0, 0),
190 request_handle: 0,
191 next_request_id: 0,
192 channel_open: false,
193 connected: false,
194 #[cfg(feature = "crypto")]
195 security: None,
196 }
197 }
198
199 #[cfg(feature = "crypto")]
201 pub fn set_security(&mut self, security: ClientSecurity) {
202 self.security = Some(security);
203 }
204
205 #[must_use]
207 pub fn session_id(&self) -> &NodeId {
208 &self.session_id
209 }
210
211 fn next_handle(&mut self) -> u32 {
212 self.request_handle = self.request_handle.wrapping_add(1);
213 self.request_handle
214 }
215
216 fn next_request_id(&mut self) -> u32 {
217 self.next_request_id = self.next_request_id.wrapping_add(1);
218 self.next_request_id
219 }
220
221 fn header(&mut self) -> RequestHeader {
222 let token = self.auth_token.clone();
223 let handle = self.next_handle();
224 RequestHeader::new(token, handle)
225 }
226
227 pub fn open_channel<T: Transport>(
234 &mut self,
235 transport: &mut T,
236 endpoint_url: &str,
237 ) -> Result<(), ClientError> {
238 let hello = HelloMessage {
240 protocol_version: 0,
241 receive_buffer_size: 65_535,
242 send_buffer_size: 65_535,
243 max_message_size: 0,
244 max_chunk_count: 0,
245 endpoint_url: String::from(endpoint_url),
246 };
247 let ack_bytes = transport.request(&hello.encode()?)?;
248 let mut ar = UaReader::new(&ack_bytes);
249 let ah = MessageHeader::decode(&mut ar)?;
250 if ah.message_type != MessageType::Acknowledge {
251 return Err(ClientError::Protocol("expected Acknowledge"));
252 }
253
254 let secured = {
256 #[cfg(feature = "crypto")]
257 {
258 if self.security.is_some() {
259 self.open_secure_channel_secured(transport)?;
260 true
261 } else {
262 false
263 }
264 }
265 #[cfg(not(feature = "crypto"))]
266 {
267 false
268 }
269 };
270 if !secured {
271 self.open_secure_channel_plain(transport)?;
272 }
273 self.channel_open = true;
274 Ok(())
275 }
276
277 pub fn connect<T: Transport>(
283 &mut self,
284 transport: &mut T,
285 endpoint_url: &str,
286 ) -> Result<(), ClientError> {
287 self.open_channel(transport, endpoint_url)?;
288
289 let create = ServiceRequest::CreateSession(CreateSessionRequest {
291 request_header: self.header(),
292 client_description: client_description(),
293 server_uri: String::new(),
294 endpoint_url: String::from(endpoint_url),
295 session_name: String::from("zerodds-client"),
296 client_nonce: Vec::new(),
297 client_certificate: Vec::new(),
298 requested_session_timeout: 1_200_000.0,
299 max_response_message_size: 0,
300 });
301 let resp = self.call_service(transport, create)?;
302 let ServiceResponse::CreateSession(cs) = resp else {
303 return Err(ClientError::Protocol("expected CreateSessionResponse"));
304 };
305 self.session_id = cs.session_id;
306 self.auth_token = cs.authentication_token;
307
308 let activate = ServiceRequest::ActivateSession(ActivateSessionRequest {
310 request_header: self.header(),
311 client_signature: SignatureData::default(),
312 locale_ids: Vec::new(),
313 user_identity_token: null_extension_object(),
314 user_token_signature: SignatureData::default(),
315 });
316 let resp = self.call_service(transport, activate)?;
317 if !matches!(resp, ServiceResponse::ActivateSession(_)) {
318 return Err(ClientError::Protocol("expected ActivateSessionResponse"));
319 }
320 self.connected = true;
321 Ok(())
322 }
323
324 fn open_secure_channel_plain<T: Transport>(
326 &mut self,
327 transport: &mut T,
328 ) -> Result<(), ClientError> {
329 let open = OpenSecureChannelRequest {
330 request_header: RequestHeader::new(NodeId::numeric(0, 0), self.next_handle()),
331 client_protocol_version: 0,
332 request_type: SecurityTokenRequestType::Issue,
333 security_mode: MessageSecurityMode::None,
334 client_nonce: Vec::new(),
335 requested_lifetime: 3_600_000,
336 };
337 let rid = self.next_request_id();
338 let open_resp_bytes = transport.request(&self.channel.open_chunk(rid, &open.encode()?)?)?;
339 let chunk = parse_chunk(&open_resp_bytes)?;
340 let mut br = UaReader::new(&chunk.body);
341 let _type_id = NodeId::decode(&mut br)?;
342 let open_resp = OpenSecureChannelResponse::decode_body(&mut br)?;
343 self.channel = SecureChannel::new(
345 open_resp.security_token.channel_id,
346 open_resp.security_token.token_id,
347 );
348 Ok(())
349 }
350
    /// Opens the secure channel with an asymmetrically protected OpenSecureChannel
    /// exchange and installs the derived symmetric keys.
    ///
    /// Steps, in order:
    /// 1. draw a random client nonce and build the request chunk with the client key
    ///    material and the server certificate/public key;
    /// 2. send it and open the response chunk;
    /// 3. derive send/receive key sets from the two nonces;
    /// 4. replace `self.channel` with a channel carrying the issued token and keys.
    ///
    /// # Errors
    /// Returns [`ClientError::Protocol`] when no security configuration is installed,
    /// when building or opening a chunk fails, or when key derivation fails; transport
    /// and decode errors are propagated.
    #[cfg(feature = "crypto")]
    fn open_secure_channel_secured<T: Transport>(
        &mut self,
        transport: &mut T,
    ) -> Result<(), ClientError> {
        // Counters are taken first, before `self.security` is borrowed mutably below.
        let handle = self.next_handle();
        let request_id = self.next_request_id();
        let seq = self.channel.next_send_sequence();

        // Declared outside the block so they outlive the mutable borrow of `self.security`.
        let policy;
        let mode;
        let client_nonce;
        let opn_bytes;
        {
            let sec = self
                .security
                .as_mut()
                .ok_or(ClientError::Protocol("no SecurityPolicy configured"))?;
            policy = sec.policy;
            mode = sec.mode;
            let msm = match mode {
                SecuredMode::Sign => MessageSecurityMode::Sign,
                SecuredMode::SignAndEncrypt => MessageSecurityMode::SignAndEncrypt,
            };
            // Nonce length: the policy's symmetric encryption key length, but at least 32 bytes.
            let nonce_len = policy.sym_enc_key_len().max(32);
            let mut nonce = alloc::vec![0u8; nonce_len];
            sec.rng.fill_bytes(&mut nonce);
            client_nonce = nonce;
            let open = OpenSecureChannelRequest {
                request_header: RequestHeader::new(NodeId::numeric(0, 0), handle),
                client_protocol_version: 0,
                request_type: SecurityTokenRequestType::Issue,
                security_mode: msm,
                // Cloned: the original is needed again for key derivation below.
                client_nonce: client_nonce.clone(),
                requested_lifetime: 3_600_000,
            };
            let body = open.encode()?;
            let ctx = AsymmetricContext {
                policy,
                sender_certificate: &sec.certificate,
                sender_private_key: &sec.private_key,
                receiver_certificate: &sec.server_certificate,
                receiver_public_key: &sec.server_public_key,
            };
            let mut rng_ref: &mut dyn CryptoRngCore = sec.rng.as_mut();
            // NOTE(review): the literal `0` is presumably the not-yet-assigned channel id —
            // confirm against `build_asymmetric_chunk`.
            opn_bytes = build_asymmetric_chunk(&mut rng_ref, &ctx, 0, seq, request_id, &body)
                .map_err(|_| ClientError::Protocol("secured OPN build failed"))?;
        }

        let resp_bytes = transport.request(&opn_bytes)?;

        let open_resp = {
            let sec = self
                .security
                .as_ref()
                .ok_or(ClientError::Protocol("no SecurityPolicy configured"))?;
            let opened = open_asymmetric_chunk(
                policy,
                &sec.private_key,
                &sec.server_public_key,
                &resp_bytes,
            )
            .map_err(|_| ClientError::Protocol("secured OPN response open/verify failed"))?;
            let mut br = UaReader::new(&opened.body);
            // The leading type id is decoded and skipped without being checked.
            let _type_id = NodeId::decode(&mut br)?;
            OpenSecureChannelResponse::decode_body(&mut br)?
        };

        // The two key sets use the nonces in opposite order: (server, client) for the
        // sending direction, (client, server) for the receiving direction.
        // NOTE(review): presumably the arguments are (secret, seed) — confirm against `derive_keys`.
        let send_keys = derive_keys(policy, &open_resp.server_nonce, &client_nonce)
            .map_err(|_| ClientError::Protocol("key derivation failed"))?;
        let recv_keys = derive_keys(policy, &client_nonce, &open_resp.server_nonce)
            .map_err(|_| ClientError::Protocol("key derivation failed"))?;
        let mut channel = SecureChannel::new(
            open_resp.security_token.channel_id,
            open_resp.security_token.token_id,
        );
        channel.install_security(SecuritySession {
            policy,
            mode,
            send_keys,
            recv_keys,
        });
        self.channel = channel;
        Ok(())
    }
439
440 pub fn read_values<T: Transport>(
445 &mut self,
446 transport: &mut T,
447 node_ids: &[NodeId],
448 ) -> Result<Vec<DataValue>, ClientError> {
449 if !self.connected {
450 return Err(ClientError::Protocol("not connected"));
451 }
452 let nodes_to_read = node_ids
453 .iter()
454 .map(|n| ReadValueId {
455 node_id: n.clone(),
456 attribute_id: ATTRIBUTE_VALUE,
457 index_range: String::new(),
458 data_encoding: zerodds_opcua_gateway::types::QualifiedName {
459 namespace_index: 0,
460 name: String::new(),
461 },
462 })
463 .collect();
464 let req = ServiceRequest::Read(ReadRequest {
465 request_header: self.header(),
466 max_age: 0.0,
467 timestamps_to_return: 0,
468 nodes_to_read,
469 });
470 let ServiceResponse::Read(rr) = self.call_service(transport, req)? else {
471 return Err(ClientError::Protocol("expected ReadResponse"));
472 };
473 Ok(rr.results)
474 }
475
476 pub fn write_values<T: Transport>(
482 &mut self,
483 transport: &mut T,
484 writes: &[(NodeId, DataValue)],
485 ) -> Result<Vec<u32>, ClientError> {
486 if !self.connected {
487 return Err(ClientError::Protocol("not connected"));
488 }
489 let nodes_to_write = writes
490 .iter()
491 .map(|(n, v)| WriteValue {
492 node_id: n.clone(),
493 attribute_id: ATTRIBUTE_VALUE,
494 index_range: String::new(),
495 value: v.clone(),
496 })
497 .collect();
498 let req = ServiceRequest::Write(WriteRequest {
499 request_header: self.header(),
500 nodes_to_write,
501 });
502 let ServiceResponse::Write(wr) = self.call_service(transport, req)? else {
503 return Err(ClientError::Protocol("expected WriteResponse"));
504 };
505 Ok(wr.results)
506 }
507
508 pub fn get_endpoints<T: Transport>(
515 &mut self,
516 transport: &mut T,
517 endpoint_url: &str,
518 ) -> Result<Vec<EndpointDescription>, ClientError> {
519 if !self.channel_open {
520 return Err(ClientError::Protocol("channel not open"));
521 }
522 let req = ServiceRequest::GetEndpoints(GetEndpointsRequest {
523 request_header: self.header(),
524 endpoint_url: String::from(endpoint_url),
525 locale_ids: Vec::new(),
526 profile_uris: Vec::new(),
527 });
528 let ServiceResponse::GetEndpoints(ge) = self.call_service(transport, req)? else {
529 return Err(ClientError::Protocol("expected GetEndpointsResponse"));
530 };
531 Ok(ge.endpoints)
532 }
533
534 pub fn find_servers<T: Transport>(
539 &mut self,
540 transport: &mut T,
541 endpoint_url: &str,
542 ) -> Result<Vec<ApplicationDescription>, ClientError> {
543 if !self.channel_open {
544 return Err(ClientError::Protocol("channel not open"));
545 }
546 let req = ServiceRequest::FindServers(FindServersRequest {
547 request_header: self.header(),
548 endpoint_url: String::from(endpoint_url),
549 locale_ids: Vec::new(),
550 server_uris: Vec::new(),
551 });
552 let ServiceResponse::FindServers(fs) = self.call_service(transport, req)? else {
553 return Err(ClientError::Protocol("expected FindServersResponse"));
554 };
555 Ok(fs.servers)
556 }
557
558 pub fn create_subscription<T: Transport>(
563 &mut self,
564 transport: &mut T,
565 ) -> Result<u32, ClientError> {
566 if !self.connected {
567 return Err(ClientError::Protocol("not connected"));
568 }
569 let req = ServiceRequest::CreateSubscription(CreateSubscriptionRequest {
570 request_header: self.header(),
571 requested_publishing_interval: 1000.0,
572 requested_lifetime_count: 10_000,
573 requested_max_keep_alive_count: 10,
574 max_notifications_per_publish: 0,
575 publishing_enabled: true,
576 priority: 0,
577 });
578 let ServiceResponse::CreateSubscription(cs) = self.call_service(transport, req)? else {
579 return Err(ClientError::Protocol("expected CreateSubscriptionResponse"));
580 };
581 Ok(cs.subscription_id)
582 }
583
584 pub fn create_monitored_items<T: Transport>(
590 &mut self,
591 transport: &mut T,
592 subscription_id: u32,
593 items: &[(NodeId, u32)],
594 ) -> Result<Vec<MonitoredItemCreateResult>, ClientError> {
595 if !self.connected {
596 return Err(ClientError::Protocol("not connected"));
597 }
598 let items_to_create = items
599 .iter()
600 .map(|(node_id, client_handle)| MonitoredItemCreateRequest {
601 item_to_monitor: ReadValueId {
602 node_id: node_id.clone(),
603 attribute_id: ATTRIBUTE_VALUE,
604 index_range: String::new(),
605 data_encoding: zerodds_opcua_gateway::types::QualifiedName {
606 namespace_index: 0,
607 name: String::new(),
608 },
609 },
610 monitoring_mode: 2, requested_parameters: MonitoringParameters {
612 client_handle: *client_handle,
613 sampling_interval: 1000.0,
614 filter: null_filter(),
615 queue_size: 1,
616 discard_oldest: true,
617 },
618 })
619 .collect();
620 let req = ServiceRequest::CreateMonitoredItems(CreateMonitoredItemsRequest {
621 request_header: self.header(),
622 subscription_id,
623 timestamps_to_return: 0,
624 items_to_create,
625 });
626 let ServiceResponse::CreateMonitoredItems(cm) = self.call_service(transport, req)? else {
627 return Err(ClientError::Protocol(
628 "expected CreateMonitoredItemsResponse",
629 ));
630 };
631 Ok(cm.results)
632 }
633
634 pub fn publish<T: Transport>(
640 &mut self,
641 transport: &mut T,
642 ) -> Result<(u32, Vec<MonitoredItemNotification>), ClientError> {
643 if !self.connected {
644 return Err(ClientError::Protocol("not connected"));
645 }
646 let req = ServiceRequest::Publish(PublishRequest {
647 request_header: self.header(),
648 subscription_acknowledgements: Vec::new(),
649 });
650 let ServiceResponse::Publish(pr) = self.call_service(transport, req)? else {
651 return Err(ClientError::Protocol("expected PublishResponse"));
652 };
653 let mut notifications = Vec::new();
654 for eo in &pr.notification_message.notification_data {
655 if eo.type_id == node_ids::DATA_CHANGE_NOTIFICATION {
656 let dcn = DataChangeNotification::from_extension_object(eo)?;
657 notifications.extend(dcn.monitored_items);
658 }
659 }
660 Ok((pr.subscription_id, notifications))
661 }
662
663 pub fn delete_subscriptions<T: Transport>(
668 &mut self,
669 transport: &mut T,
670 subscription_ids: &[u32],
671 ) -> Result<Vec<u32>, ClientError> {
672 if !self.connected {
673 return Err(ClientError::Protocol("not connected"));
674 }
675 let req = ServiceRequest::DeleteSubscriptions(DeleteSubscriptionsRequest {
676 request_header: self.header(),
677 subscription_ids: subscription_ids.to_vec(),
678 });
679 let ServiceResponse::DeleteSubscriptions(ds) = self.call_service(transport, req)? else {
680 return Err(ClientError::Protocol(
681 "expected DeleteSubscriptionsResponse",
682 ));
683 };
684 Ok(ds.results)
685 }
686
687 pub fn browse<T: Transport>(
693 &mut self,
694 transport: &mut T,
695 node_id: NodeId,
696 ) -> Result<Vec<ReferenceDescription>, ClientError> {
697 if !self.connected {
698 return Err(ClientError::Protocol("not connected"));
699 }
700 let req = ServiceRequest::Browse(BrowseRequest {
701 request_header: self.header(),
702 view: ViewDescription::default(),
703 requested_max_references_per_node: 0,
704 nodes_to_browse: alloc::vec![BrowseDescription {
705 node_id,
706 browse_direction: 0, reference_type_id: NodeId::numeric(0, 31), include_subtypes: true,
709 node_class_mask: 0,
710 result_mask: 0x3F,
711 }],
712 });
713 let ServiceResponse::Browse(br) = self.call_service(transport, req)? else {
714 return Err(ClientError::Protocol("expected BrowseResponse"));
715 };
716 let first = br
717 .results
718 .into_iter()
719 .next()
720 .ok_or(ClientError::Protocol("empty BrowseResponse"))?;
721 if first.status_code != 0 {
722 return Err(ClientError::Protocol("browse returned a bad status"));
723 }
724 Ok(first.references)
725 }
726
727 pub fn call_method<T: Transport>(
732 &mut self,
733 transport: &mut T,
734 object_id: NodeId,
735 method_id: NodeId,
736 input_arguments: Vec<Variant>,
737 ) -> Result<CallMethodResult, ClientError> {
738 if !self.connected {
739 return Err(ClientError::Protocol("not connected"));
740 }
741 let req = ServiceRequest::Call(CallRequest {
742 request_header: self.header(),
743 methods_to_call: alloc::vec![CallMethodRequest {
744 object_id,
745 method_id,
746 input_arguments,
747 }],
748 });
749 let ServiceResponse::Call(cr) = self.call_service(transport, req)? else {
750 return Err(ClientError::Protocol("expected CallResponse"));
751 };
752 cr.results
753 .into_iter()
754 .next()
755 .ok_or(ClientError::Protocol("empty CallResponse"))
756 }
757
758 fn call_service<T: Transport>(
761 &mut self,
762 transport: &mut T,
763 req: ServiceRequest,
764 ) -> Result<ServiceResponse, ClientError> {
765 let rid = self.next_request_id();
766 let chunk = self.channel.message_chunk(rid, &req.encode()?)?;
767 let resp_bytes = transport.request(&chunk)?;
768 let parsed = self.channel.open_incoming(&resp_bytes)?;
769 ServiceResponse::decode(&parsed.body).map_err(ClientError::from)
770 }
771}
772
773fn client_description() -> ApplicationDescription {
774 ApplicationDescription {
775 application_uri: String::from("urn:zerodds:opcua-client"),
776 product_uri: String::from("urn:zerodds"),
777 application_name: zerodds_opcua_gateway::types::LocalizedText {
778 locale: None,
779 text: Some(String::from("ZeroDDS OPC-UA Client")),
780 },
781 application_type: ApplicationType::Client,
782 gateway_server_uri: String::new(),
783 discovery_profile_uri: String::new(),
784 discovery_urls: Vec::new(),
785 }
786}
787
788#[cfg(feature = "std")]
793pub use tcp::{TcpTransport, serve_connection};
794
795#[cfg(feature = "std")]
796mod tcp {
797 use super::ClientError;
798 use crate::server::Server;
799 use alloc::vec;
800 use alloc::vec::Vec;
801 use std::io::{Read, Write};
802 use std::net::TcpStream;
803 use std::string::ToString;
804
805 fn read_message(stream: &mut TcpStream) -> Result<Vec<u8>, ClientError> {
806 let mut header = [0u8; 8];
807 stream
808 .read_exact(&mut header)
809 .map_err(|e| ClientError::Io(e.to_string()))?;
810 let size = u32::from_le_bytes([header[4], header[5], header[6], header[7]]) as usize;
811 if size < 8 {
812 return Err(ClientError::Protocol(
813 "UACP MessageSize below header length",
814 ));
815 }
816 let mut rest = vec![0u8; size - 8];
817 stream
818 .read_exact(&mut rest)
819 .map_err(|e| ClientError::Io(e.to_string()))?;
820 let mut msg = Vec::with_capacity(size);
821 msg.extend_from_slice(&header);
822 msg.extend_from_slice(&rest);
823 Ok(msg)
824 }
825
    /// [`super::Transport`] over a blocking TCP connection.
    #[derive(Debug)]
    pub struct TcpTransport {
        /// Connected socket; requests are written to it and replies read from it.
        stream: TcpStream,
    }
831
832 impl TcpTransport {
833 pub fn connect(addr: &str) -> Result<Self, ClientError> {
838 let stream = TcpStream::connect(addr).map_err(|e| ClientError::Io(e.to_string()))?;
839 Ok(Self { stream })
840 }
841 }
842
843 impl super::Transport for TcpTransport {
844 fn request(&mut self, message: &[u8]) -> Result<Vec<u8>, ClientError> {
845 self.stream
846 .write_all(message)
847 .map_err(|e| ClientError::Io(e.to_string()))?;
848 read_message(&mut self.stream)
849 }
850
851 fn send(&mut self, message: &[u8]) -> Result<(), ClientError> {
852 self.stream
853 .write_all(message)
854 .map_err(|e| ClientError::Io(e.to_string()))
855 }
856 }
857
858 pub fn serve_connection(server: &mut Server, mut stream: TcpStream) -> Result<(), ClientError> {
864 loop {
865 let msg = match read_message(&mut stream) {
866 Ok(m) => m,
867 Err(_) => return Ok(()), };
869 match server
870 .process(&msg)
871 .map_err(|_| ClientError::Protocol("server error"))?
872 {
873 Some(resp) => stream
874 .write_all(&resp)
875 .map_err(|e| ClientError::Io(e.to_string()))?,
876 None => return Ok(()), }
878 }
879 }
880}
881
882#[cfg(test)]
883mod tests {
884 use super::*;
885 use crate::address_space::{AddressSpace, MethodOutcome};
886 use crate::server::Server;
887 use zerodds_opcua_gateway::data_value::VariantValue;
888
    /// Builds the server shared by the tests: node `ns=1;i=1` holds Int32 4242, and
    /// method `ns=1;i=100` doubles its first Int32 argument.
    fn demo_server() -> Server {
        let mut space = AddressSpace::new();
        space.set_value(
            NodeId::numeric(1, 1),
            DataValue::new_value(Variant::scalar(VariantValue::Int32(4242)), 0, 0),
        );
        space.register_method(NodeId::numeric(1, 100), |args| {
            // Doubles the first argument when it is an Int32; anything else faults.
            if let Some(v) = args.first() {
                if let Some(VariantValue::Int32(x)) = v.value.first() {
                    return MethodOutcome::good(alloc::vec![Variant::scalar(VariantValue::Int32(
                        x * 2
                    ))]);
                }
            }
            MethodOutcome::fault(0x8000_0000)
        });
        Server::new("opc.tcp://localhost:4840", space)
    }
908
    /// Connect, read one value and call one method over the loopback transport.
    #[test]
    fn e2e_loopback_connect_read_call() {
        let mut server = demo_server();
        let mut client = Client::new();
        {
            let mut t = LoopbackTransport::new(&mut server);
            client
                .connect(&mut t, "opc.tcp://localhost:4840")
                .expect("connect");
            // A created session must carry a non-null session id.
            assert_ne!(*client.session_id(), NodeId::numeric(0, 0));

            // Read the value seeded by `demo_server`.
            let vals = client
                .read_values(&mut t, &[NodeId::numeric(1, 1)])
                .expect("read");
            assert_eq!(
                vals[0].value,
                Some(Variant::scalar(VariantValue::Int32(4242)))
            );

            // Call the doubling method: 21 -> 42.
            let res = client
                .call_method(
                    &mut t,
                    NodeId::numeric(0, 0),
                    NodeId::numeric(1, 100),
                    alloc::vec![Variant::scalar(VariantValue::Int32(21))],
                )
                .expect("call");
            assert_eq!(res.status_code, 0);
            assert_eq!(
                res.output_arguments[0],
                Variant::scalar(VariantValue::Int32(42))
            );
        }
    }
945
    /// Values written through the client are visible to a subsequent read.
    #[test]
    fn e2e_loopback_write_then_read() {
        let mut server = demo_server();
        let mut client = Client::new();
        let mut t = LoopbackTransport::new(&mut server);
        client.connect(&mut t, "opc.tcp://x").expect("connect");

        // Overwrite the seeded node (1,1) and write node (1,2), which `demo_server`
        // did not seed.
        let results = client
            .write_values(
                &mut t,
                &[
                    (
                        NodeId::numeric(1, 1),
                        DataValue::new_value(Variant::scalar(VariantValue::Int32(7)), 0, 0),
                    ),
                    (
                        NodeId::numeric(1, 2),
                        DataValue::new_value(Variant::scalar(VariantValue::Int32(9)), 0, 0),
                    ),
                ],
            )
            .expect("write");
        // Both writes must report status 0.
        assert_eq!(results, alloc::vec![0, 0]);

        // Read both nodes back in one request.
        let vals = client
            .read_values(&mut t, &[NodeId::numeric(1, 1), NodeId::numeric(1, 2)])
            .expect("read");
        assert_eq!(vals[0].value, Some(Variant::scalar(VariantValue::Int32(7))));
        assert_eq!(vals[1].value, Some(Variant::scalar(VariantValue::Int32(9))));
    }
978
    /// Reading an unknown node succeeds at the service level; the failure is reported
    /// in the per-node result (status set, no value).
    #[test]
    fn read_unknown_node_yields_bad_status() {
        let mut server = demo_server();
        let mut client = Client::new();
        let mut t = LoopbackTransport::new(&mut server);
        client.connect(&mut t, "opc.tcp://x").expect("connect");
        let vals = client
            .read_values(&mut t, &[NodeId::numeric(1, 999)])
            .expect("read");
        assert_eq!(vals[0].status, Some(crate::server::BAD_NODE_ID_UNKNOWN));
        assert_eq!(vals[0].value, None);
    }
991
    /// Full subscription life cycle: create, monitor, publish, change, publish, delete.
    #[test]
    fn e2e_loopback_subscription_publish() {
        let mut server = demo_server();
        let mut client = Client::new();
        let mut t = LoopbackTransport::new(&mut server);
        client.connect(&mut t, "opc.tcp://x").expect("connect");

        let sub = client
            .create_subscription(&mut t)
            .expect("create subscription");
        let results = client
            .create_monitored_items(&mut t, sub, &[(NodeId::numeric(1, 1), 77)])
            .expect("create monitored items");
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].status_code, 0);

        // First publish delivers the item's current value under its client handle.
        let (sid, notes) = client.publish(&mut t).expect("publish 1");
        assert_eq!(sid, sub);
        assert_eq!(notes.len(), 1);
        assert_eq!(notes[0].client_handle, 77);
        assert_eq!(
            notes[0].value.value,
            Some(Variant::scalar(VariantValue::Int32(4242)))
        );

        // Nothing changed since the last publish, so no notifications are expected.
        let (_, none) = client.publish(&mut t).expect("publish 2");
        assert!(none.is_empty());

        // Changing the monitored value must produce exactly one new notification.
        client
            .write_values(
                &mut t,
                &[(
                    NodeId::numeric(1, 1),
                    DataValue::new_value(Variant::scalar(VariantValue::Int32(123)), 0, 0),
                )],
            )
            .expect("write");
        let (_, changed) = client.publish(&mut t).expect("publish 3");
        assert_eq!(changed.len(), 1);
        assert_eq!(
            changed[0].value.value,
            Some(Variant::scalar(VariantValue::Int32(123)))
        );

        // After deletion a publish still succeeds but reports subscription id 0.
        let del = client.delete_subscriptions(&mut t, &[sub]).expect("delete");
        assert_eq!(del, alloc::vec![0]);
        let (sid2, _) = client.publish(&mut t).expect("publish 4");
        assert_eq!(sid2, 0);
    }
1046
    /// Discovery services work on an open channel without a session, and are refused
    /// before the channel is opened.
    #[test]
    fn e2e_loopback_discovery() {
        let mut server = demo_server();
        let mut client = Client::new();
        let mut t = LoopbackTransport::new(&mut server);
        // Channel only: no CreateSession/ActivateSession is performed.
        client
            .open_channel(&mut t, "opc.tcp://localhost:4840")
            .expect("open channel");

        let eps = client
            .get_endpoints(&mut t, "opc.tcp://localhost:4840")
            .expect("get endpoints");
        assert_eq!(eps.len(), 1);
        assert_eq!(eps[0].endpoint_url, "opc.tcp://localhost:4840");
        assert_eq!(
            eps[0].security_policy_uri,
            zerodds_opcua_uacp::securechannel::SECURITY_POLICY_NONE
        );

        let servers = client
            .find_servers(&mut t, "opc.tcp://localhost:4840")
            .expect("find servers");
        assert_eq!(servers.len(), 1);
        assert_eq!(
            servers[0].application_name.text.as_deref(),
            Some("ZeroDDS OPC-UA Server")
        );

        // A client that never opened a channel must be rejected locally.
        let mut fresh = Client::new();
        let mut t2 = LoopbackTransport::new(&mut server);
        assert!(fresh.get_endpoints(&mut t2, "opc.tcp://x").is_err());
    }
1081
    /// Browsing follows the references Objects -> Boiler -> Temperature and fails for
    /// an unknown node.
    #[test]
    fn e2e_loopback_browse() {
        use crate::address_space::{NodeClass, NodeMeta, reference_types};
        use zerodds_opcua_gateway::types::{LocalizedText, QualifiedName};

        // Three nodes, each using its name as both browse name and display name.
        let mut space = AddressSpace::new();
        let objects = NodeId::numeric(0, 85);
        let boiler = NodeId::numeric(1, 1);
        let temp = NodeId::numeric(1, 2);
        for (id, class, name) in [
            (objects.clone(), NodeClass::Object, "Objects"),
            (boiler.clone(), NodeClass::Object, "Boiler"),
            (temp.clone(), NodeClass::Variable, "Temperature"),
        ] {
            space.add_node(NodeMeta {
                node_id: id,
                node_class: class,
                browse_name: QualifiedName {
                    namespace_index: 1,
                    name: String::from(name),
                },
                display_name: LocalizedText {
                    locale: None,
                    text: Some(String::from(name)),
                },
                type_definition: NodeId::numeric(0, 58),
            });
        }
        space.add_reference(objects.clone(), reference_types::ORGANIZES, boiler.clone());
        space.add_reference(boiler.clone(), reference_types::HAS_COMPONENT, temp.clone());
        let mut server = Server::new("opc.tcp://localhost:4840", space);

        let mut client = Client::new();
        let mut t = LoopbackTransport::new(&mut server);
        client
            .connect(&mut t, "opc.tcp://localhost:4840")
            .expect("connect");

        // Objects has exactly one forward Organizes reference, to Boiler.
        let refs = client.browse(&mut t, objects).expect("browse objects");
        assert_eq!(refs.len(), 1);
        assert_eq!(refs[0].node_id.node_id, boiler);
        assert_eq!(refs[0].browse_name.name, "Boiler");
        assert_eq!(refs[0].reference_type_id, reference_types::ORGANIZES);
        assert!(refs[0].is_forward);

        // Boiler has exactly one child, the Temperature variable.
        let children = client.browse(&mut t, boiler).expect("browse boiler");
        assert_eq!(children.len(), 1);
        assert_eq!(children[0].node_id.node_id, temp);
        assert_eq!(children[0].node_class, NodeClass::Variable.as_i32());

        // Browsing a node that was never added must surface as an error.
        assert!(client.browse(&mut t, NodeId::numeric(1, 999)).is_err());
    }
1138
    /// Same flow as the plain loopback tests, but over a Basic256Sha256
    /// SignAndEncrypt channel.
    #[cfg(feature = "crypto")]
    #[test]
    fn e2e_secured_loopback_sign_and_encrypt() {
        use crate::server::ServerSecurity;
        use rand::rngs::OsRng;
        use zerodds_opcua_uacp::crypto::{RsaPrivateKey, SecuredMode, SecurityPolicy};

        // Fresh 2048-bit RSA keys for both sides. The "certificates" are placeholder
        // byte strings, not real certificates.
        let mut kg = OsRng;
        let client_key = RsaPrivateKey::new(&mut kg, 2048).expect("client key");
        let server_key = RsaPrivateKey::new(&mut kg, 2048).expect("server key");
        let client_cert = b"client-cert-der".to_vec();
        let server_cert = b"server-cert-der".to_vec();
        let client_pub = client_key.to_public_key();
        let server_pub = server_key.to_public_key();
        let policy = SecurityPolicy::Basic256Sha256;
        let mode = SecuredMode::SignAndEncrypt;

        // Each side holds its own private key and the other side's public key.
        let mut server = demo_server();
        server.set_security(ServerSecurity {
            policy,
            mode,
            private_key: server_key.clone(),
            certificate: server_cert.clone(),
            client_certificate: client_cert.clone(),
            client_public_key: client_pub,
            rng: alloc::boxed::Box::new(OsRng),
        });

        let mut client = Client::new();
        client.set_security(super::ClientSecurity {
            policy,
            mode,
            private_key: client_key,
            certificate: client_cert,
            server_certificate: server_cert,
            server_public_key: server_pub,
            rng: alloc::boxed::Box::new(OsRng),
        });

        let mut t = LoopbackTransport::new(&mut server);
        client
            .connect(&mut t, "opc.tcp://secure")
            .expect("secured connect");
        assert_ne!(*client.session_id(), NodeId::numeric(0, 0));

        // Read over the secured channel.
        let vals = client
            .read_values(&mut t, &[NodeId::numeric(1, 1)])
            .expect("secured read");
        assert_eq!(
            vals[0].value,
            Some(Variant::scalar(VariantValue::Int32(4242)))
        );

        // Method call over the secured channel: 21 -> 42.
        let res = client
            .call_method(
                &mut t,
                NodeId::numeric(0, 0),
                NodeId::numeric(1, 100),
                alloc::vec![Variant::scalar(VariantValue::Int32(21))],
            )
            .expect("secured call");
        assert_eq!(
            res.output_arguments[0],
            Variant::scalar(VariantValue::Int32(42))
        );

        // Write, then read the same node back.
        let w = client
            .write_values(
                &mut t,
                &[(
                    NodeId::numeric(1, 2),
                    DataValue::new_value(Variant::scalar(VariantValue::Int32(9)), 0, 0),
                )],
            )
            .expect("secured write");
        assert_eq!(w, alloc::vec![0]);
        let back = client
            .read_values(&mut t, &[NodeId::numeric(1, 2)])
            .expect("secured read back");
        assert_eq!(back[0].value, Some(Variant::scalar(VariantValue::Int32(9))));
    }
1224
    /// Connect, read and call over a real TCP socket, served by `serve_connection`
    /// on a background thread.
    #[cfg(feature = "std")]
    #[test]
    fn e2e_tcp_connect_read_call() {
        use std::net::TcpListener;
        use std::thread;

        // Port 0 lets the OS choose a free port; the actual address is read back.
        let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
        let addr = listener.local_addr().expect("addr").to_string();

        // The server thread accepts a single connection and serves it until it ends.
        let server_thread = thread::spawn(move || {
            let mut server = demo_server();
            let (stream, _) = listener.accept().expect("accept");
            serve_connection(&mut server, stream).expect("serve");
        });

        let mut transport = TcpTransport::connect(&addr).expect("connect tcp");
        let mut client = Client::new();
        let url = alloc::format!("opc.tcp://{addr}");
        client.connect(&mut transport, &url).expect("connect");
        let vals = client
            .read_values(&mut transport, &[NodeId::numeric(1, 1)])
            .expect("read");
        assert_eq!(
            vals[0].value,
            Some(Variant::scalar(VariantValue::Int32(4242)))
        );
        let res = client
            .call_method(
                &mut transport,
                NodeId::numeric(0, 0),
                NodeId::numeric(1, 100),
                alloc::vec![Variant::scalar(VariantValue::Int32(50))],
            )
            .expect("call");
        assert_eq!(
            res.output_arguments[0],
            Variant::scalar(VariantValue::Int32(100))
        );

        // Dropping the transport closes the socket, which makes the server's read fail
        // and `serve_connection` return, so the join below can complete.
        drop(transport);
        server_thread.join().expect("join");
    }
1268}