Skip to main content

zerodds_opcua_server/
server.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! The OPC-UA [`Server`]: a request-driven state machine that runs the
4//! Hello → OpenSecureChannel → Session handshake and dispatches the Read and
5//! Call services against an [`AddressSpace`] (SecurityMode `None`).
6
7use alloc::vec::Vec;
8
9use zerodds_opcua_gateway::data_value::DataValue;
10use zerodds_opcua_gateway::node_id::{ExpandedNodeId, NodeId};
11use zerodds_opcua_gateway::types::{LocalizedText, QualifiedName};
12use zerodds_opcua_pubsub::uadp::datatypes::{
13    ApplicationType, EndpointDescription, MessageSecurityMode,
14};
15use zerodds_opcua_pubsub::{DecodeError, EncodeError, UaDecode, UaReader};
16use zerodds_opcua_uacp::connection::{
17    AcknowledgeMessage, HelloMessage, MessageHeader, MessageType,
18};
19use zerodds_opcua_uacp::securechannel::{
20    ChannelSecurityToken, OpenSecureChannelRequest, OpenSecureChannelResponse, ResponseHeader,
21    SECURITY_POLICY_NONE, SecureChannel, parse_chunk,
22};
23
24use crate::address_space::AddressSpace;
25use crate::services::{
26    ATTRIBUTE_VALUE, ActivateSessionResponse, BrowseDescription, BrowseResponse, BrowseResult,
27    CallMethodResult, CallResponse, CloseSessionResponse, CreateMonitoredItemsResponse,
28    CreateSessionResponse, CreateSubscriptionResponse, DataChangeNotification,
29    DeleteSubscriptionsResponse, FindServersResponse, GetEndpointsResponse,
30    MonitoredItemCreateResult, MonitoredItemNotification, NotificationMessage, PublishResponse,
31    ReadResponse, ReferenceDescription, ServiceRequest, ServiceResponse, SetPublishingModeResponse,
32    SignatureData, WriteResponse, null_filter,
33};
34
35#[cfg(feature = "crypto")]
36use alloc::boxed::Box;
37#[cfg(feature = "crypto")]
38use zerodds_opcua_uacp::crypto::{
39    AsymmetricContext, CryptoRngCore, RsaPrivateKey, RsaPublicKey, SecuredMode, SecurityPolicy,
40    build_asymmetric_chunk, derive_keys, open_asymmetric_chunk,
41};
42#[cfg(feature = "crypto")]
43use zerodds_opcua_uacp::securechannel::SecuritySession;
44
45/// The server's secured-SecurityPolicy configuration (feature `crypto`): the
46/// server's own RSA key + certificate, the trusted client certificate +
47/// public key, and a caller-supplied CSPRNG. With this set, the server runs the
48/// secured OpenSecureChannel handshake and protects `MSG`/`CLO`.
49#[cfg(feature = "crypto")]
50pub struct ServerSecurity {
51    /// Negotiated SecurityPolicy.
52    pub policy: SecurityPolicy,
53    /// Sign or SignAndEncrypt.
54    pub mode: SecuredMode,
55    /// The server's RSA private key.
56    pub private_key: RsaPrivateKey,
57    /// The server's DER application certificate.
58    pub certificate: alloc::vec::Vec<u8>,
59    /// The trusted client's DER certificate.
60    pub client_certificate: alloc::vec::Vec<u8>,
61    /// The trusted client's RSA public key (verifies the client's OPN).
62    pub client_public_key: RsaPublicKey,
63    /// A cryptographically secure RNG (OS RNG under `std`).
64    pub rng: Box<dyn CryptoRngCore + Send>,
65}
66
67/// StatusCode `Good`.
68pub const GOOD: u32 = 0;
69/// StatusCode `Bad_NodeIdUnknown`.
70pub const BAD_NODE_ID_UNKNOWN: u32 = 0x8034_0000;
71/// StatusCode `Bad_MethodInvalid`.
72pub const BAD_METHOD_INVALID: u32 = 0x8025_0000;
73/// StatusCode `Bad_AttributeIdInvalid`.
74pub const BAD_ATTRIBUTE_ID_INVALID: u32 = 0x8035_0000;
75/// StatusCode `Bad_SubscriptionIdInvalid`.
76pub const BAD_SUBSCRIPTION_ID_INVALID: u32 = 0x8028_0000;
77
78/// A monitored item the server samples on Publish. (The `monitored_item_id`
79/// returned to the client is the per-subscription counter; we don't yet expose
80/// Modify/DeleteMonitoredItems, so it isn't retained per item.)
81#[derive(Debug, Clone)]
82struct MonitoredItem {
83    client_handle: u32,
84    node_id: NodeId,
85    last_value: Option<DataValue>,
86}
87
88/// A server-side subscription: a set of monitored items + a sequence counter.
89#[derive(Debug, Clone)]
90struct Subscription {
91    id: u32,
92    publishing_enabled: bool,
93    seq: u32,
94    next_item_id: u32,
95    items: Vec<MonitoredItem>,
96}
97
98/// An error from the server state machine.
99#[derive(Debug, Clone, PartialEq)]
100pub enum ServerError {
101    /// A message could not be decoded.
102    Decode(DecodeError),
103    /// A response could not be encoded.
104    Encode(EncodeError),
105    /// A protocol-state violation.
106    Protocol(&'static str),
107}
108
109impl From<DecodeError> for ServerError {
110    fn from(e: DecodeError) -> Self {
111        Self::Decode(e)
112    }
113}
114
115impl From<EncodeError> for ServerError {
116    fn from(e: EncodeError) -> Self {
117        Self::Encode(e)
118    }
119}
120
121impl core::fmt::Display for ServerError {
122    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
123        match self {
124            Self::Decode(e) => write!(f, "decode error: {e}"),
125            Self::Encode(e) => write!(f, "encode error: {e}"),
126            Self::Protocol(m) => write!(f, "protocol error: {m}"),
127        }
128    }
129}
130
131#[cfg(feature = "std")]
132impl std::error::Error for ServerError {}
133
134/// An OPC-UA server serving an [`AddressSpace`] over the SecureChannel/UACP
135/// transport. Drive it by feeding each received UACP message to
136/// [`process`](Self::process); the returned bytes (if any) are the response to
137/// send back.
138pub struct Server {
139    address_space: AddressSpace,
140    endpoint_url: alloc::string::String,
141    channel: Option<SecureChannel>,
142    next_channel_id: u32,
143    next_token_id: u32,
144    next_session: u32,
145    subscriptions: Vec<Subscription>,
146    next_subscription_id: u32,
147    #[cfg(feature = "crypto")]
148    security: Option<ServerSecurity>,
149}
150
151impl core::fmt::Debug for Server {
152    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
153        f.debug_struct("Server")
154            .field("endpoint_url", &self.endpoint_url)
155            .field("channel", &self.channel)
156            .finish_non_exhaustive()
157    }
158}
159
160impl Server {
161    /// Creates a server serving `address_space` at `endpoint_url`.
162    #[must_use]
163    pub fn new(
164        endpoint_url: impl Into<alloc::string::String>,
165        address_space: AddressSpace,
166    ) -> Self {
167        Self {
168            address_space,
169            endpoint_url: endpoint_url.into(),
170            channel: None,
171            next_channel_id: 1,
172            next_token_id: 1,
173            next_session: 1,
174            subscriptions: Vec::new(),
175            next_subscription_id: 1,
176            #[cfg(feature = "crypto")]
177            security: None,
178        }
179    }
180
181    /// Enables secured SecurityPolicies for the OpenSecureChannel handshake.
182    #[cfg(feature = "crypto")]
183    pub fn set_security(&mut self, security: ServerSecurity) {
184        self.security = Some(security);
185    }
186
187    /// Mutable access to the served AddressSpace.
188    pub fn address_space_mut(&mut self) -> &mut AddressSpace {
189        &mut self.address_space
190    }
191
192    /// Processes one received UACP message, returning the response to send
193    /// (or `None` for messages that need no reply, e.g. CloseSecureChannel).
194    ///
195    /// # Errors
196    /// [`ServerError`] on a malformed message or protocol-state violation.
197    pub fn process(&mut self, incoming: &[u8]) -> Result<Option<Vec<u8>>, ServerError> {
198        let mut r = UaReader::new(incoming);
199        let header = MessageHeader::decode(&mut r)?;
200        match header.message_type {
201            MessageType::Hello => {
202                let hello = HelloMessage::decode_body(&mut r)?;
203                let ack = AcknowledgeMessage {
204                    protocol_version: 0,
205                    receive_buffer_size: hello.receive_buffer_size.max(8192),
206                    send_buffer_size: hello.send_buffer_size.max(8192),
207                    max_message_size: hello.max_message_size,
208                    max_chunk_count: hello.max_chunk_count,
209                };
210                Ok(Some(ack.encode()?))
211            }
212            MessageType::OpenSecureChannel => {
213                let chunk = parse_chunk(incoming)?;
214                let secured = chunk
215                    .asymmetric_header
216                    .as_ref()
217                    .is_some_and(|h| h.security_policy_uri != SECURITY_POLICY_NONE);
218                #[cfg(feature = "crypto")]
219                if secured && self.security.is_some() {
220                    return Ok(Some(self.open_secure_channel_secured(incoming)?));
221                }
222                if secured {
223                    return Err(ServerError::Protocol(
224                        "secured OpenSecureChannel but server has no SecurityPolicy configured",
225                    ));
226                }
227                let mut br = UaReader::new(&chunk.body);
228                let _type_id = NodeId::decode(&mut br)?;
229                let req = OpenSecureChannelRequest::decode_body(&mut br)?;
230                Ok(Some(self.open_secure_channel(
231                    &req,
232                    chunk.sequence_header.request_id,
233                )?))
234            }
235            MessageType::Message => {
236                let chunk = self
237                    .channel
238                    .as_ref()
239                    .ok_or(ServerError::Protocol("MSG before OpenSecureChannel"))?
240                    .open_incoming(incoming)?;
241                let req = ServiceRequest::decode(&chunk.body)?;
242                let resp = self.handle_service(req);
243                let body = resp.encode()?;
244                let channel = self
245                    .channel
246                    .as_mut()
247                    .ok_or(ServerError::Protocol("MSG before OpenSecureChannel"))?;
248                Ok(Some(
249                    channel.message_chunk(chunk.sequence_header.request_id, &body)?,
250                ))
251            }
252            MessageType::CloseSecureChannel => {
253                self.channel = None;
254                Ok(None)
255            }
256            other => {
257                let _ = other;
258                Err(ServerError::Protocol("unexpected message type"))
259            }
260        }
261    }
262
263    fn open_secure_channel(
264        &mut self,
265        req: &OpenSecureChannelRequest,
266        request_id: u32,
267    ) -> Result<Vec<u8>, ServerError> {
268        let channel_id = self.next_channel_id;
269        self.next_channel_id += 1;
270        let token_id = self.next_token_id;
271        self.next_token_id += 1;
272        let mut channel = SecureChannel::new(channel_id, token_id);
273
274        let resp = OpenSecureChannelResponse {
275            response_header: ResponseHeader::new(req.request_header.request_handle, GOOD),
276            server_protocol_version: 0,
277            security_token: ChannelSecurityToken {
278                channel_id,
279                token_id,
280                created_at: 0,
281                revised_lifetime: req.requested_lifetime.max(60_000),
282            },
283            server_nonce: Vec::new(),
284        };
285        let body = resp.encode()?;
286        let out = channel.open_chunk(request_id, &body)?;
287        channel.set_token_id(token_id);
288        self.channel = Some(channel);
289        Ok(out)
290    }
291
292    /// The secured OpenSecureChannel handshake: opens the client's asymmetric
293    /// OPN, derives the §6.7.6 keys, and returns the secured OPN response while
294    /// installing the symmetric [`SecuritySession`] on the channel.
295    #[cfg(feature = "crypto")]
296    fn open_secure_channel_secured(&mut self, incoming: &[u8]) -> Result<Vec<u8>, ServerError> {
297        let sec = self
298            .security
299            .as_mut()
300            .ok_or(ServerError::Protocol("no SecurityPolicy configured"))?;
301        let policy = sec.policy;
302        let mode = sec.mode;
303
304        // 1. Open the client's asymmetric OPN (decrypt with our key, verify with
305        //    the trusted client public key).
306        let opened =
307            open_asymmetric_chunk(policy, &sec.private_key, &sec.client_public_key, incoming)
308                .map_err(|_| ServerError::Protocol("secured OPN open/verify failed"))?;
309        let mut br = UaReader::new(&opened.body);
310        let _type_id = NodeId::decode(&mut br)?;
311        let req = OpenSecureChannelRequest::decode_body(&mut br)?;
312        let client_nonce = req.client_nonce;
313
314        // 2. Generate the server nonce.
315        let nonce_len = policy.sym_enc_key_len().max(32);
316        let mut server_nonce = alloc::vec![0u8; nonce_len];
317        sec.rng.fill_bytes(&mut server_nonce);
318
319        // 3. Derive the §6.7.6 key sets (server send / receive).
320        let send_keys = derive_keys(policy, &client_nonce, &server_nonce)
321            .map_err(|_| ServerError::Protocol("key derivation failed"))?;
322        let recv_keys = derive_keys(policy, &server_nonce, &client_nonce)
323            .map_err(|_| ServerError::Protocol("key derivation failed"))?;
324
325        // 4. Allocate channel + token ids (disjoint field borrows from `sec`).
326        let channel_id = self.next_channel_id;
327        self.next_channel_id += 1;
328        let token_id = self.next_token_id;
329        self.next_token_id += 1;
330
331        // 5. Build the OpenSecureChannelResponse and frame it as a secured OPN.
332        let resp = OpenSecureChannelResponse {
333            response_header: ResponseHeader::new(req.request_header.request_handle, GOOD),
334            server_protocol_version: 0,
335            security_token: ChannelSecurityToken {
336                channel_id,
337                token_id,
338                created_at: 0,
339                revised_lifetime: req.requested_lifetime.max(60_000),
340            },
341            server_nonce,
342        };
343        let resp_body = resp.encode()?;
344        let mut channel = SecureChannel::new(channel_id, token_id);
345        let seq = channel.next_send_sequence();
346        let ctx = AsymmetricContext {
347            policy,
348            sender_certificate: &sec.certificate,
349            sender_private_key: &sec.private_key,
350            receiver_certificate: &sec.client_certificate,
351            receiver_public_key: &sec.client_public_key,
352        };
353        // `&mut dyn` is Sized and implements CryptoRngCore, satisfying the
354        // generic `R: CryptoRngCore` bound of the (Sized) chunk builder.
355        let mut rng_ref: &mut dyn CryptoRngCore = sec.rng.as_mut();
356        let out = build_asymmetric_chunk(
357            &mut rng_ref,
358            &ctx,
359            channel_id,
360            seq,
361            opened.request_id,
362            &resp_body,
363        )
364        .map_err(|_| ServerError::Protocol("secured OPN response build failed"))?;
365
366        // 6. Install the symmetric session for subsequent MSG/CLO.
367        channel.install_security(SecuritySession {
368            policy,
369            mode,
370            send_keys,
371            recv_keys,
372        });
373        self.channel = Some(channel);
374        Ok(out)
375    }
376
377    fn handle_service(&mut self, req: ServiceRequest) -> ServiceResponse {
378        match req {
379            ServiceRequest::CreateSession(r) => {
380                let session = self.next_session;
381                self.next_session += 1;
382                ServiceResponse::CreateSession(CreateSessionResponse {
383                    response_header: ResponseHeader::new(r.request_header.request_handle, GOOD),
384                    session_id: NodeId::numeric(1, session),
385                    authentication_token: NodeId::numeric(0, 0x1000_0000 + session),
386                    revised_session_timeout: if r.requested_session_timeout > 0.0 {
387                        r.requested_session_timeout
388                    } else {
389                        1_200_000.0
390                    },
391                    server_nonce: Vec::new(),
392                    server_certificate: Vec::new(),
393                    server_endpoints: alloc::vec![self.endpoint_none()],
394                    server_signature: SignatureData::default(),
395                    max_request_message_size: 0,
396                })
397            }
398            ServiceRequest::ActivateSession(r) => {
399                ServiceResponse::ActivateSession(ActivateSessionResponse {
400                    response_header: ResponseHeader::new(r.request_header.request_handle, GOOD),
401                    server_nonce: Vec::new(),
402                    results: Vec::new(),
403                })
404            }
405            ServiceRequest::CloseSession(r) => {
406                ServiceResponse::CloseSession(CloseSessionResponse {
407                    response_header: ResponseHeader::new(r.request_header.request_handle, GOOD),
408                })
409            }
410            ServiceRequest::Read(r) => {
411                let results = r
412                    .nodes_to_read
413                    .iter()
414                    .map(|rv| match self.address_space.value(&rv.node_id) {
415                        Some(dv) => dv.clone(),
416                        None => DataValue {
417                            value: None,
418                            status: Some(BAD_NODE_ID_UNKNOWN),
419                            source_timestamp: None,
420                            server_timestamp: None,
421                            source_pico_sec: None,
422                            server_pico_sec: None,
423                        },
424                    })
425                    .collect();
426                ServiceResponse::Read(ReadResponse {
427                    response_header: ResponseHeader::new(r.request_header.request_handle, GOOD),
428                    results,
429                })
430            }
431            ServiceRequest::Write(r) => {
432                let results = r
433                    .nodes_to_write
434                    .iter()
435                    .map(|wv| {
436                        if wv.attribute_id == ATTRIBUTE_VALUE {
437                            self.address_space
438                                .set_value(wv.node_id.clone(), wv.value.clone());
439                            GOOD
440                        } else {
441                            BAD_ATTRIBUTE_ID_INVALID
442                        }
443                    })
444                    .collect();
445                ServiceResponse::Write(WriteResponse {
446                    response_header: ResponseHeader::new(r.request_header.request_handle, GOOD),
447                    results,
448                })
449            }
450            ServiceRequest::Browse(r) => {
451                let results = r
452                    .nodes_to_browse
453                    .iter()
454                    .map(|bd| self.browse_one(bd))
455                    .collect();
456                ServiceResponse::Browse(BrowseResponse {
457                    response_header: ResponseHeader::new(r.request_header.request_handle, GOOD),
458                    results,
459                })
460            }
461            ServiceRequest::GetEndpoints(r) => {
462                ServiceResponse::GetEndpoints(GetEndpointsResponse {
463                    response_header: ResponseHeader::new(r.request_header.request_handle, GOOD),
464                    endpoints: alloc::vec![self.endpoint_none()],
465                })
466            }
467            ServiceRequest::FindServers(r) => ServiceResponse::FindServers(FindServersResponse {
468                response_header: ResponseHeader::new(r.request_header.request_handle, GOOD),
469                servers: alloc::vec![self.endpoint_none().server],
470            }),
471            ServiceRequest::CreateSubscription(r) => {
472                let id = self.next_subscription_id;
473                self.next_subscription_id += 1;
474                self.subscriptions.push(Subscription {
475                    id,
476                    publishing_enabled: r.publishing_enabled,
477                    seq: 0,
478                    next_item_id: 1,
479                    items: Vec::new(),
480                });
481                let interval = if r.requested_publishing_interval > 0.0 {
482                    r.requested_publishing_interval
483                } else {
484                    1000.0
485                };
486                ServiceResponse::CreateSubscription(CreateSubscriptionResponse {
487                    response_header: ResponseHeader::new(r.request_header.request_handle, GOOD),
488                    subscription_id: id,
489                    revised_publishing_interval: interval,
490                    revised_lifetime_count: r.requested_lifetime_count.max(10_000),
491                    revised_max_keep_alive_count: r.requested_max_keep_alive_count.max(10),
492                })
493            }
494            ServiceRequest::SetPublishingMode(r) => {
495                let enabled = r.publishing_enabled;
496                let results = r
497                    .subscription_ids
498                    .iter()
499                    .map(|sid| {
500                        if let Some(s) = self.subscriptions.iter_mut().find(|s| s.id == *sid) {
501                            s.publishing_enabled = enabled;
502                            GOOD
503                        } else {
504                            BAD_SUBSCRIPTION_ID_INVALID
505                        }
506                    })
507                    .collect();
508                ServiceResponse::SetPublishingMode(SetPublishingModeResponse {
509                    response_header: ResponseHeader::new(r.request_header.request_handle, GOOD),
510                    results,
511                })
512            }
513            ServiceRequest::CreateMonitoredItems(r) => {
514                let handle = r.request_header.request_handle;
515                match self
516                    .subscriptions
517                    .iter_mut()
518                    .find(|s| s.id == r.subscription_id)
519                {
520                    None => ServiceResponse::CreateMonitoredItems(CreateMonitoredItemsResponse {
521                        response_header: ResponseHeader::new(handle, BAD_SUBSCRIPTION_ID_INVALID),
522                        results: Vec::new(),
523                    }),
524                    Some(sub) => {
525                        let results = r
526                            .items_to_create
527                            .iter()
528                            .map(|it| {
529                                let mid = sub.next_item_id;
530                                sub.next_item_id += 1;
531                                let p = &it.requested_parameters;
532                                sub.items.push(MonitoredItem {
533                                    client_handle: p.client_handle,
534                                    node_id: it.item_to_monitor.node_id.clone(),
535                                    last_value: None,
536                                });
537                                MonitoredItemCreateResult {
538                                    status_code: GOOD,
539                                    monitored_item_id: mid,
540                                    revised_sampling_interval: if p.sampling_interval > 0.0 {
541                                        p.sampling_interval
542                                    } else {
543                                        1000.0
544                                    },
545                                    revised_queue_size: p.queue_size.max(1),
546                                    filter_result: null_filter(),
547                                }
548                            })
549                            .collect();
550                        ServiceResponse::CreateMonitoredItems(CreateMonitoredItemsResponse {
551                            response_header: ResponseHeader::new(handle, GOOD),
552                            results,
553                        })
554                    }
555                }
556            }
557            ServiceRequest::Publish(r) => {
558                ServiceResponse::Publish(self.publish_one(r.request_header.request_handle))
559            }
560            ServiceRequest::DeleteSubscriptions(r) => {
561                let results = r
562                    .subscription_ids
563                    .iter()
564                    .map(|sid| {
565                        if let Some(pos) = self.subscriptions.iter().position(|s| s.id == *sid) {
566                            self.subscriptions.remove(pos);
567                            GOOD
568                        } else {
569                            BAD_SUBSCRIPTION_ID_INVALID
570                        }
571                    })
572                    .collect();
573                ServiceResponse::DeleteSubscriptions(DeleteSubscriptionsResponse {
574                    response_header: ResponseHeader::new(r.request_header.request_handle, GOOD),
575                    results,
576                })
577            }
578            ServiceRequest::Call(r) => {
579                let results = r
580                    .methods_to_call
581                    .iter()
582                    .map(
583                        |m| match self.address_space.call(&m.method_id, &m.input_arguments) {
584                            Some(outcome) => CallMethodResult {
585                                status_code: outcome.status_code,
586                                input_argument_results: m
587                                    .input_arguments
588                                    .iter()
589                                    .map(|_| GOOD)
590                                    .collect(),
591                                output_arguments: outcome.output_arguments,
592                            },
593                            None => CallMethodResult {
594                                status_code: BAD_METHOD_INVALID,
595                                input_argument_results: Vec::new(),
596                                output_arguments: Vec::new(),
597                            },
598                        },
599                    )
600                    .collect();
601                ServiceResponse::Call(CallResponse {
602                    response_header: ResponseHeader::new(r.request_header.request_handle, GOOD),
603                    results,
604                })
605            }
606        }
607    }
608
609    /// Browses one [`BrowseDescription`] against the AddressSpace, honouring the
610    /// reference-type filter, node-class mask and result mask.
611    fn browse_one(&self, bd: &BrowseDescription) -> BrowseResult {
612        // An unknown source node is reported per Part 4 §5.9.2.
613        if self.address_space.node_meta(&bd.node_id).is_none() {
614            return BrowseResult {
615                status_code: BAD_NODE_ID_UNKNOWN,
616                continuation_point: Vec::new(),
617                references: Vec::new(),
618            };
619        }
620        let null_id = NodeId::numeric(0, 0);
621        let filter = if bd.reference_type_id == null_id {
622            None
623        } else {
624            Some(&bd.reference_type_id)
625        };
626        let matches = self.address_space.browse(
627            &bd.node_id,
628            bd.browse_direction,
629            filter,
630            bd.include_subtypes,
631            bd.node_class_mask,
632        );
633        let mask = bd.result_mask;
634        let empty_qn = QualifiedName {
635            namespace_index: 0,
636            name: alloc::string::String::new(),
637        };
638        let empty_lt = LocalizedText {
639            locale: None,
640            text: None,
641        };
642        let expanded = |id: NodeId| ExpandedNodeId {
643            node_id: id,
644            namespace_uri: alloc::string::String::new(),
645            server_index: 0,
646        };
647        let references = matches
648            .into_iter()
649            .map(|m| ReferenceDescription {
650                reference_type_id: if mask & 0x01 != 0 {
651                    m.reference_type
652                } else {
653                    null_id.clone()
654                },
655                is_forward: mask & 0x02 != 0 && m.is_forward,
656                node_id: expanded(m.target.node_id),
657                browse_name: if mask & 0x08 != 0 {
658                    m.target.browse_name
659                } else {
660                    empty_qn.clone()
661                },
662                display_name: if mask & 0x10 != 0 {
663                    m.target.display_name
664                } else {
665                    empty_lt.clone()
666                },
667                node_class: if mask & 0x04 != 0 {
668                    m.target.node_class.as_i32()
669                } else {
670                    0
671                },
672                type_definition: expanded(if mask & 0x20 != 0 {
673                    m.target.type_definition
674                } else {
675                    null_id.clone()
676                }),
677            })
678            .collect();
679        BrowseResult {
680            status_code: GOOD,
681            continuation_point: Vec::new(),
682            references,
683        }
684    }
685
686    /// Handles a Publish by sampling the first publishing-enabled
687    /// subscription's monitored items against the AddressSpace and returning a
688    /// `DataChangeNotification` for the values that changed since last reported
689    /// (sample-on-publish, suited to a synchronous transport).
690    fn publish_one(&mut self, request_handle: u32) -> PublishResponse {
691        let empty = NotificationMessage {
692            sequence_number: 0,
693            publish_time: 0,
694            notification_data: Vec::new(),
695        };
696        let addr = &self.address_space;
697        let Some(sub) = self.subscriptions.iter_mut().find(|s| s.publishing_enabled) else {
698            return PublishResponse {
699                response_header: ResponseHeader::new(request_handle, GOOD),
700                subscription_id: 0,
701                available_sequence_numbers: Vec::new(),
702                more_notifications: false,
703                notification_message: empty,
704                results: Vec::new(),
705            };
706        };
707        let mut notes = Vec::new();
708        for item in &mut sub.items {
709            let current = addr.value(&item.node_id).cloned();
710            if current != item.last_value {
711                item.last_value.clone_from(&current);
712                if let Some(v) = current {
713                    notes.push(MonitoredItemNotification {
714                        client_handle: item.client_handle,
715                        value: v,
716                    });
717                }
718            }
719        }
720        sub.seq += 1;
721        let seq = sub.seq;
722        let sub_id = sub.id;
723        let notification_data = if notes.is_empty() {
724            Vec::new()
725        } else {
726            match (DataChangeNotification {
727                monitored_items: notes,
728            })
729            .to_extension_object()
730            {
731                Ok(eo) => alloc::vec![eo],
732                Err(_) => Vec::new(),
733            }
734        };
735        PublishResponse {
736            response_header: ResponseHeader::new(request_handle, GOOD),
737            subscription_id: sub_id,
738            available_sequence_numbers: alloc::vec![seq],
739            more_notifications: false,
740            notification_message: NotificationMessage {
741                sequence_number: seq,
742                publish_time: 0,
743                notification_data,
744            },
745            results: Vec::new(),
746        }
747    }
748
749    fn endpoint_none(&self) -> EndpointDescription {
750        use zerodds_opcua_pubsub::uadp::datatypes::ApplicationDescription;
751        EndpointDescription {
752            endpoint_url: self.endpoint_url.clone(),
753            server: ApplicationDescription {
754                application_uri: alloc::string::String::from("urn:zerodds:opcua-server"),
755                product_uri: alloc::string::String::from("urn:zerodds"),
756                application_name: zerodds_opcua_gateway::types::LocalizedText {
757                    locale: None,
758                    text: Some(alloc::string::String::from("ZeroDDS OPC-UA Server")),
759                },
760                application_type: ApplicationType::Server,
761                gateway_server_uri: alloc::string::String::new(),
762                discovery_profile_uri: alloc::string::String::new(),
763                discovery_urls: alloc::vec![self.endpoint_url.clone()],
764            },
765            server_certificate: Vec::new(),
766            security_mode: MessageSecurityMode::None,
767            security_policy_uri: alloc::string::String::from(SECURITY_POLICY_NONE),
768            user_identity_tokens: Vec::new(),
769            transport_profile_uri: alloc::string::String::from(
770                "http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary",
771            ),
772            security_level: 0,
773        }
774    }
775}