up_rust/
utransport.rs

1/********************************************************************************
2 * Copyright (c) 2023 Contributors to the Eclipse Foundation
3 *
4 * See the NOTICE file(s) distributed with this work for additional
5 * information regarding copyright ownership.
6 *
7 * This program and the accompanying materials are made available under the
8 * terms of the Apache License Version 2.0 which is available at
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * SPDX-License-Identifier: Apache-2.0
12 ********************************************************************************/
13
14use std::fmt::{Debug, Formatter};
15use std::hash::{Hash, Hasher};
16use std::num::TryFromIntError;
17use std::ops::Deref;
18use std::sync::Arc;
19
20use async_trait::async_trait;
21
22use crate::{UCode, UMessage, UStatus, UUri};
23
24/// Verifies that given UUris can be used as source and sink filter UUris
25/// for registering listeners.
26///
27/// This function is helpful for implementing [`UTransport`] in accordance with the
28/// uProtocol Transport Layer specification.
29///
30/// # Errors
31///
32/// Returns a [`UStatus`] with a [`UCode::INVALID_ARGUMENT`] and a corresponding detail
33/// message, if any of the given UUris cannot be used as filter criteria.
34///
35pub fn verify_filter_criteria(
36    source_filter: &UUri,
37    sink_filter: Option<&UUri>,
38) -> Result<(), UStatus> {
39    UUri::check_validity(source_filter).map_err(|err| {
40        UStatus::fail_with_code(
41            UCode::INVALID_ARGUMENT,
42            format!("invalid source filter URI: {err}"),
43        )
44    })?;
45    if let Some(sink_filter_uuri) = sink_filter {
46        UUri::check_validity(sink_filter_uuri).map_err(|err| {
47            UStatus::fail_with_code(
48                UCode::INVALID_ARGUMENT,
49                format!("invalid sink filter URI: {err}"),
50            )
51        })?;
52
53        if sink_filter_uuri.is_notification_destination()
54            && source_filter.is_notification_destination()
55        {
56            return Err(UStatus::fail_with_code(
57                UCode::INVALID_ARGUMENT,
58                "source and sink filters must not both have resource ID 0",
59            ));
60        }
61        if sink_filter_uuri.is_rpc_method()
62            && !source_filter.has_wildcard_resource_id()
63            && !source_filter.is_notification_destination()
64        {
65            return Err(UStatus::fail_with_code(
66                UCode::INVALID_ARGUMENT,
67                "source filter must either have the wildcard resource ID or resource ID 0, if sink filter matches RPC method resource ID"));
68        }
69    } else if !source_filter.has_wildcard_resource_id() && !source_filter.is_event() {
70        return Err(UStatus::fail_with_code(
71            UCode::INVALID_ARGUMENT,
72            "source filter must either have the wildcard resource ID or a resource ID from topic range, if sink filter is empty"));
73    }
74    // everything else might match valid messages
75    Ok(())
76}
77
78/// A factory for URIs representing this uEntity's resources.
79///
80/// Implementations may use arbitrary mechanisms to determine the information that
81/// is necessary for creating URIs, e.g. environment variables, configuration files etc.
82// [impl->dsn~localuriprovider-declaration~1]
83#[cfg_attr(any(test, feature = "test-util"), mockall::automock)]
84pub trait LocalUriProvider: Send + Sync {
85    /// Gets the _authority_ used for URIs representing this uEntity's resources.
86    fn get_authority(&self) -> String;
87    /// Gets a URI that represents a given resource of this uEntity.
88    fn get_resource_uri(&self, resource_id: u16) -> UUri;
89    /// Gets the URI that represents the resource that this uEntity expects
90    /// RPC responses and notifications to be sent to.
91    fn get_source_uri(&self) -> UUri;
92}
93
94/// A URI provider that is statically configured with the uEntity's authority, entity ID and version.
95pub struct StaticUriProvider {
96    local_uri: UUri,
97}
98
99impl StaticUriProvider {
100    /// Creates a new URI provider from static information.
101    ///
102    /// # Arguments
103    ///
104    /// * `authority` - The uEntity's authority name.
105    /// * `entity_id` - The entity identifier.
106    /// * `major_version` - The uEntity's major version.
107    ///
108    /// # Examples
109    ///
110    /// ```rust
111    /// use up_rust::{LocalUriProvider, StaticUriProvider};
112    ///
113    /// let provider = StaticUriProvider::new("my-vehicle", 0x4210, 0x05);
114    /// assert_eq!(provider.get_authority(), "my-vehicle");
115    /// ```
116    pub fn new(authority: impl Into<String>, entity_id: u32, major_version: u8) -> Self {
117        let local_uri = UUri {
118            authority_name: authority.into(),
119            ue_id: entity_id,
120            ue_version_major: major_version as u32,
121            resource_id: 0x0000,
122            ..Default::default()
123        };
124        StaticUriProvider { local_uri }
125    }
126}
127
128impl LocalUriProvider for StaticUriProvider {
129    fn get_authority(&self) -> String {
130        self.local_uri.authority_name.clone()
131    }
132
133    fn get_resource_uri(&self, resource_id: u16) -> UUri {
134        let mut uri = self.local_uri.clone();
135        uri.resource_id = resource_id as u32;
136        uri
137    }
138
139    fn get_source_uri(&self) -> UUri {
140        self.local_uri.clone()
141    }
142}
143
144impl TryFrom<UUri> for StaticUriProvider {
145    type Error = TryFromIntError;
146    fn try_from(value: UUri) -> Result<Self, Self::Error> {
147        Self::try_from(&value)
148    }
149}
150
151impl TryFrom<&UUri> for StaticUriProvider {
152    type Error = TryFromIntError;
153    /// Creates a URI provider from a UUri.
154    ///
155    /// # Arguments
156    ///
157    /// * `source_uri` - The UUri to take the entity's authority, entity ID and version information from.
158    ///   The UUri's resource ID is ignored.
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if the given UUri's major version property is not a `u8`.
163    ///
164    /// # Examples
165    ///
166    /// ```rust
167    /// use up_rust::{LocalUriProvider, StaticUriProvider, UUri};
168    ///
169    /// let source_uri = UUri::try_from("//my-vehicle/4210/5/0").unwrap();
170    /// assert!(StaticUriProvider::try_from(&source_uri).is_ok());
171    /// ```
172    ///
173    /// ## Invalid Major Version
174    ///
175    /// ```rust
176    /// use up_rust::{LocalUriProvider, StaticUriProvider, UUri};
177    ///
178    /// let uuri_with_invalid_version = UUri {
179    ///   authority_name: "".to_string(),
180    ///   ue_id: 0x5430,
181    ///   ue_version_major: 0x1234, // not a u8
182    ///   resource_id: 0x0000,
183    ///   ..Default::default()
184    /// };
185    /// assert!(StaticUriProvider::try_from(uuri_with_invalid_version).is_err());
186    /// ```
187    fn try_from(source_uri: &UUri) -> Result<Self, Self::Error> {
188        let major_version = u8::try_from(source_uri.ue_version_major)?;
189        Ok(StaticUriProvider::new(
190            &source_uri.authority_name,
191            source_uri.ue_id,
192            major_version,
193        ))
194    }
195}
196
197/// A handler for processing uProtocol messages.
198///
199/// Implementations contain the details for what should occur when a message is received.
200///
201/// Please refer to the [uProtocol Transport Layer specification](https://github.com/eclipse-uprotocol/up-spec/blob/v1.6.0-alpha.7/up-l1/README.adoc)
202/// for details.
203// [impl->dsn~ulistener-declaration~1]
204#[cfg_attr(any(test, feature = "test-util"), mockall::automock)]
205#[async_trait]
206pub trait UListener: Send + Sync {
207    /// Performs some action on receipt of a message.
208    ///
209    /// # Parameters
210    ///
211    /// * `msg` - The message to process.
212    ///
213    /// # Implementation hints
214    ///
215    /// This function is expected to return almost immediately. If it does not, it could potentially
216    /// block processing of succeeding messages. Long-running operations for processing a message should
217    /// therefore be run on a separate thread.
218    async fn on_receive(&self, msg: UMessage);
219}
220
221/// The uProtocol Transport Layer interface that provides a common API for uEntity developers to send and
222/// receive messages.
223///
224/// Implementations contain the details for connecting to the underlying transport technology and
225/// sending [`UMessage`]s using the configured technology.
226///
227/// Please refer to the [uProtocol Transport Layer specification](https://github.com/eclipse-uprotocol/up-spec/blob/v1.6.0-alpha.7/up-l1/README.adoc)
228/// for details.
229// [impl->dsn~utransport-declaration~1]
230#[async_trait]
231pub trait UTransport: Send + Sync {
232    /// Sends a message using this transport's message exchange mechanism.
233    ///
234    /// # Arguments
235    ///
236    /// * `message` - The message to send. The `type`, `source` and `sink` properties of the
237    ///   [UAttributes](https://github.com/eclipse-uprotocol/up-spec/blob/v1.6.0-alpha.7/basics/uattributes.adoc) contained
238    ///   in the message determine the addressing semantics.
239    ///
240    /// # Errors
241    ///
242    /// Returns an error if the message could not be sent.
243    async fn send(&self, message: UMessage) -> Result<(), UStatus>;
244
245    /// Receives a message from the transport.
246    ///
247    /// This default implementation returns an error with [`UCode::UNIMPLEMENTED`].
248    ///
249    /// # Arguments
250    ///
251    /// * `source_filter` - The _source_ address pattern that the message to receive needs to match.
252    /// * `sink_filter` - The _sink_ address pattern that the message to receive needs to match,
253    ///                   or `None` to indicate that the message must not contain any sink address.
254    ///
255    /// # Errors
256    ///
257    /// Returns an error if no message could be received, e.g. because no message matches the given addresses.
258    async fn receive(
259        &self,
260        _source_filter: &UUri,
261        _sink_filter: Option<&UUri>,
262    ) -> Result<UMessage, UStatus> {
263        Err(UStatus::fail_with_code(
264            UCode::UNIMPLEMENTED,
265            "not implemented",
266        ))
267    }
268
269    /// Registers a listener to be called for messages.
270    ///
271    /// The listener will be invoked for each message that matches the given source and sink filter patterns
272    /// according to the rules defined by the [UUri specification](https://github.com/eclipse-uprotocol/up-spec/blob/v1.6.0-alpha.7/basics/uri.adoc).
273    ///
274    /// This default implementation returns an error with [`UCode::UNIMPLEMENTED`].
275    ///
276    /// # Arguments
277    ///
278    /// * `source_filter` - The _source_ address pattern that messages need to match.
279    /// * `sink_filter` - The _sink_ address pattern that messages need to match,
280    ///                   or `None` to match messages that do not contain any sink address.
281    /// * `listener` - The listener to invoke.
282    ///                The listener can be unregistered again using [`UTransport::unregister_listener`].
283    ///
284    /// # Errors
285    ///
286    /// Returns an error if the listener could not be registered.
287    async fn register_listener(
288        &self,
289        _source_filter: &UUri,
290        _sink_filter: Option<&UUri>,
291        _listener: Arc<dyn UListener>,
292    ) -> Result<(), UStatus> {
293        Err(UStatus::fail_with_code(
294            UCode::UNIMPLEMENTED,
295            "not implemented",
296        ))
297    }
298
299    /// Deregisters a message listener.
300    ///
301    /// The listener will no longer be called for any (matching) messages after this function has
302    /// returned successfully.
303    ///
304    /// This default implementation returns an error with [`UCode::UNIMPLEMENTED`].
305    ///
306    /// # Arguments
307    ///
308    /// * `source_filter` - The _source_ address pattern that the listener had been registered for.
309    /// * `sink_filter` - The _sink_ address pattern that the listener had been registered for.
310    /// * `listener` - The listener to unregister.
311    ///
312    /// # Errors
313    ///
314    /// Returns an error if the listener could not be unregistered, for example if the given listener does not exist.
315    async fn unregister_listener(
316        &self,
317        _source_filter: &UUri,
318        _sink_filter: Option<&UUri>,
319        _listener: Arc<dyn UListener>,
320    ) -> Result<(), UStatus> {
321        Err(UStatus::fail_with_code(
322            UCode::UNIMPLEMENTED,
323            "not implemented",
324        ))
325    }
326}
327
328#[cfg(not(tarpaulin_include))]
329#[cfg(any(test, feature = "test-util"))]
330mockall::mock! {
331    /// This extra struct is necessary in order to comply with mockall's requirements regarding the parameter lifetimes
332    /// see <https://github.com/asomers/mockall/issues/571>
333    pub Transport {
334        pub async fn do_send(&self, message: UMessage) -> Result<(), UStatus>;
335        pub async fn do_register_listener<'a>(&'a self, source_filter: &'a UUri, sink_filter: Option<&'a UUri>, listener: Arc<dyn UListener>) -> Result<(), UStatus>;
336        pub async fn do_unregister_listener<'a>(&'a self, source_filter: &'a UUri, sink_filter: Option<&'a UUri>, listener: Arc<dyn UListener>) -> Result<(), UStatus>;
337    }
338}
339
340#[cfg(not(tarpaulin_include))]
341#[cfg(any(test, feature = "test-util"))]
342#[async_trait]
343/// This delegates the invocation of the UTransport functions to the mocked functions of the Transport struct.
344/// see <https://github.com/asomers/mockall/issues/571>
345impl UTransport for MockTransport {
346    async fn send(&self, message: UMessage) -> Result<(), UStatus> {
347        self.do_send(message).await
348    }
349    async fn register_listener(
350        &self,
351        source_filter: &UUri,
352        sink_filter: Option<&UUri>,
353        listener: Arc<dyn UListener>,
354    ) -> Result<(), UStatus> {
355        self.do_register_listener(source_filter, sink_filter, listener)
356            .await
357    }
358    async fn unregister_listener(
359        &self,
360        source_filter: &UUri,
361        sink_filter: Option<&UUri>,
362        listener: Arc<dyn UListener>,
363    ) -> Result<(), UStatus> {
364        self.do_unregister_listener(source_filter, sink_filter, listener)
365            .await
366    }
367}
368
369/// A wrapper type that allows comparing [`UListener`]s to each other.
370///
371/// # Note
372///
373/// Not necessary for end-user uEs to use. Primarily intended for `up-client-foo-rust` UPClient libraries
374/// when implementing [`UTransport`].
375///
376/// # Rationale
377///
378/// The wrapper type is implemented such that it can be used in any location you may wish to
379/// hold a type implementing [`UListener`].
380///
381/// Implements necessary traits to allow hashing, so that you may hold the wrapper type in
382/// collections which require that, such as a `HashMap` or `HashSet`
383#[derive(Clone)]
384pub struct ComparableListener {
385    listener: Arc<dyn UListener>,
386}
387
388impl ComparableListener {
389    pub fn new(listener: Arc<dyn UListener>) -> Self {
390        Self { listener }
391    }
392    /// Gets a clone of the wrapped reference to the listener.
393    pub fn into_inner(&self) -> Arc<dyn UListener> {
394        self.listener.clone()
395    }
396
397    /// Allows us to get the pointer address of this `ComparableListener` on the heap
398    fn pointer_address(&self) -> usize {
399        // Obtain the raw pointer from the Arc
400        let ptr = Arc::as_ptr(&self.listener);
401        // Cast the fat pointer to a raw thin pointer to ()
402        let thin_ptr = ptr as *const ();
403        // Convert the thin pointer to a usize
404        thin_ptr as usize
405    }
406}
407
408impl Deref for ComparableListener {
409    type Target = dyn UListener;
410
411    fn deref(&self) -> &Self::Target {
412        &*self.listener
413    }
414}
415
416impl Hash for ComparableListener {
417    /// Feeds the pointer to the listener held by `self` into the given [`Hasher`].
418    ///
419    /// This is consistent with the implementation of [`ComparableListener::eq`].
420    fn hash<H: Hasher>(&self, state: &mut H) {
421        Arc::as_ptr(&self.listener).hash(state);
422    }
423}
424
425impl PartialEq for ComparableListener {
426    /// Compares this listener to another listener.
427    ///
428    /// # Returns
429    ///
430    /// `true` if the pointer to the listener held by `self` is equal to the pointer held by `other`.
431    /// This is consistent with the implementation of [`ComparableListener::hash`].
432    fn eq(&self, other: &Self) -> bool {
433        Arc::ptr_eq(&self.listener, &other.listener)
434    }
435}
436
437impl Eq for ComparableListener {}
438
439impl Debug for ComparableListener {
440    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
441        write!(f, "ComparableListener: {}", self.pointer_address())
442    }
443}
444
445#[cfg(test)]
446mod tests {
447    use crate::{ComparableListener, UListener, UMessage};
448    use std::{
449        hash::{DefaultHasher, Hash, Hasher},
450        ops::Deref,
451        str::FromStr,
452        sync::Arc,
453    };
454
455    use super::*;
456
457    #[test]
458    fn test_static_uri_provider_get_source() {
459        let provider = StaticUriProvider::new("my-vehicle", 0x4210, 0x05);
460        let source_uri = provider.get_source_uri();
461        assert_eq!(source_uri.authority_name, "my-vehicle");
462        assert_eq!(source_uri.ue_id, 0x4210);
463        assert_eq!(source_uri.ue_version_major, 0x05);
464        assert_eq!(source_uri.resource_id, 0x0000);
465    }
466
467    #[test]
468    fn test_static_uri_provider_get_resource() {
469        let provider = StaticUriProvider::new("my-vehicle", 0x4210, 0x05);
470        let resource_uri = provider.get_resource_uri(0x1234);
471        assert_eq!(resource_uri.authority_name, "my-vehicle");
472        assert_eq!(resource_uri.ue_id, 0x4210);
473        assert_eq!(resource_uri.ue_version_major, 0x05);
474        assert_eq!(resource_uri.resource_id, 0x1234);
475    }
476
477    #[tokio::test]
478    async fn test_deref_returns_wrapped_listener() {
479        let mut mock_listener = MockUListener::new();
480        mock_listener.expect_on_receive().once().return_const(());
481        let listener_one = Arc::new(mock_listener);
482        let comparable_listener_one = ComparableListener::new(listener_one);
483        comparable_listener_one
484            .deref()
485            .on_receive(UMessage::default())
486            .await;
487    }
488
489    #[tokio::test]
490    async fn test_to_inner_returns_reference_to_wrapped_listener() {
491        let mut mock_listener = MockUListener::new();
492        mock_listener.expect_on_receive().once().return_const(());
493        let listener_one = Arc::new(mock_listener);
494        let comparable_listener_one = ComparableListener::new(listener_one);
495        comparable_listener_one
496            .into_inner()
497            .on_receive(UMessage::default())
498            .await;
499    }
500
501    #[tokio::test]
502    async fn test_eq_and_hash_are_consistent_for_comparable_listeners_wrapping_same_listener() {
503        let mut mock_listener = MockUListener::new();
504        mock_listener.expect_on_receive().times(2).return_const(());
505        let listener_one = Arc::new(mock_listener);
506        let listener_two = listener_one.clone();
507        listener_one.on_receive(UMessage::default()).await;
508        listener_two.on_receive(UMessage::default()).await;
509        let comparable_listener_one = ComparableListener::new(listener_one);
510        let comparable_listener_two = ComparableListener::new(listener_two);
511        assert!(&comparable_listener_one.eq(&comparable_listener_two));
512
513        let mut hasher = DefaultHasher::new();
514        comparable_listener_one.hash(&mut hasher);
515        let hash_one = hasher.finish();
516        let mut hasher = DefaultHasher::new();
517        comparable_listener_two.hash(&mut hasher);
518        let hash_two = hasher.finish();
519        assert_eq!(hash_one, hash_two);
520    }
521
522    #[tokio::test]
523    async fn test_eq_and_hash_are_consistent_for_comparable_listeners_wrapping_different_listeners()
524    {
525        let mut mock_listener_one = MockUListener::new();
526        mock_listener_one
527            .expect_on_receive()
528            .once()
529            .return_const(());
530        let listener_one = Arc::new(mock_listener_one);
531        let mut mock_listener_two = MockUListener::new();
532        mock_listener_two
533            .expect_on_receive()
534            .once()
535            .return_const(());
536        let listener_two = Arc::new(mock_listener_two);
537        listener_one.on_receive(UMessage::default()).await;
538        listener_two.on_receive(UMessage::default()).await;
539        let comparable_listener_one = ComparableListener::new(listener_one);
540        let comparable_listener_two = ComparableListener::new(listener_two);
541        assert!(!&comparable_listener_one.eq(&comparable_listener_two));
542
543        let mut hasher = DefaultHasher::new();
544        comparable_listener_one.hash(&mut hasher);
545        let hash_one = hasher.finish();
546        let mut hasher = DefaultHasher::new();
547        comparable_listener_two.hash(&mut hasher);
548        let hash_two = hasher.finish();
549        assert_ne!(hash_one, hash_two);
550    }
551
552    #[tokio::test]
553    async fn test_utransport_default_implementations() {
554        struct EmptyTransport {}
555        #[async_trait::async_trait]
556        impl UTransport for EmptyTransport {
557            async fn send(&self, _message: UMessage) -> Result<(), UStatus> {
558                todo!()
559            }
560        }
561
562        let transport = EmptyTransport {};
563        let listener = Arc::new(MockUListener::new());
564
565        assert!(transport
566            .receive(&UUri::any(), None)
567            .await
568            .is_err_and(|e| e.get_code() == UCode::UNIMPLEMENTED));
569        assert!(transport
570            .register_listener(&UUri::any(), None, listener.clone())
571            .await
572            .is_err_and(|e| e.get_code() == UCode::UNIMPLEMENTED));
573        assert!(transport
574            .unregister_listener(&UUri::any(), None, listener)
575            .await
576            .is_err_and(|e| e.get_code() == UCode::UNIMPLEMENTED));
577    }
578
579    #[test]
580    fn test_comparable_listener_pointer_address() {
581        let bar = Arc::new(MockUListener::new());
582        let comp_listener = ComparableListener::new(bar);
583
584        let comp_listener_thread = comp_listener.clone();
585        let handle = std::thread::spawn(move || comp_listener_thread.pointer_address());
586
587        let comp_listener_address_other_thread = handle.join().unwrap();
588        let comp_listener_address_this_thread = comp_listener.pointer_address();
589
590        assert_eq!(
591            comp_listener_address_this_thread,
592            comp_listener_address_other_thread
593        );
594    }
595
596    #[test]
597    fn test_comparable_listener_debug_outputs() {
598        let bar = Arc::new(MockUListener::new());
599        let comp_listener = ComparableListener::new(bar);
600        let debug_output = format!("{comp_listener:?}");
601        assert!(!debug_output.is_empty());
602    }
603
604    #[test_case::test_case(
605        "//vehicle1/AA/1/FFFF",
606        Some("//vehicle2/BB/1/FFFF");
607        "source and sink both having wildcard resource ID")]
608    #[test_case::test_case(
609        "//vehicle1/AA/1/9000",
610        Some("//vehicle2/BB/1/0");
611        "sending notification")]
612    #[test_case::test_case(
613        "//vehicle1/AA/1/0",
614        Some("//vehicle2/BB/1/1");
615        "RPC method invocation")]
616    #[test_case::test_case(
617        "//vehicle1/AA/1/FFFF",
618        Some("//vehicle2/BB/1/1");
619        "receiving RPC requests using wildcard resource ID")]
620    #[test_case::test_case(
621        "//vehicle1/AA/1/0",
622        Some("//vehicle2/BB/1/1");
623        "receiving RPC requests using default resource ID")]
624    #[test_case::test_case(
625        "//vehicle1/AA/1/9000",
626        None;
627        "receiving events published to specific topic")]
628    #[test_case::test_case(
629        "//vehicle1/AA/1/FFFF",
630        None;
631        "receiving events published to any topic")]
632    fn test_verify_filter_criteria_succeeds_for(source: &str, sink: Option<&str>) {
633        let source_filter = UUri::from_str(source).expect("invalid source URI");
634        let sink_filter = sink.map(|s| UUri::from_str(s).expect("invalid sink URI"));
635        assert!(verify_filter_criteria(&source_filter, sink_filter.as_ref()).is_ok());
636    }
637
638    #[test_case::test_case(
639        UUri::from_str("//vehicle1/AA/1/0").unwrap(),
640        Some(UUri::from_str("//vehicle2/BB/1/0").unwrap());
641        "source and sink both having resource ID 0")]
642    #[test_case::test_case(
643        UUri::from_str("//vehicle1/AA/1/CC").unwrap(),
644        Some(UUri::from_str("//vehicle2/BB/1/1A").unwrap());
645        "sink is RPC but source has invalid resource ID")]
646    #[test_case::test_case(
647        UUri::from_str("//vehicle1/AA/1/CC").unwrap(),
648        None;
649        "sink is empty but source has non-topic resource ID")]
650    #[test_case::test_case(
651        UUri {
652            authority_name: "VEHICLE1".to_string(),
653            ue_id: 0x00AA,
654            ue_version_major: 0x01,
655            resource_id: 0x9000,
656            ..Default::default()
657        },
658        None;
659        "source has upper-case authority")]
660    #[test_case::test_case(
661        UUri::from_str("//vehicle1/AA/1/9000").unwrap(),
662        Some(UUri {
663            authority_name: "VEHICLE2".to_string(),
664            ue_id: 0x00BB,
665            ue_version_major: 0x01,
666            resource_id: 0x0000,
667            ..Default::default()
668        });
669        "sink has upper-case authority")]
670    fn test_verify_filter_criteria_fails_for(source_filter: UUri, sink_filter: Option<UUri>) {
671        assert!(verify_filter_criteria(&source_filter, sink_filter.as_ref())
672            .is_err_and(|err| matches!(err.get_code(), UCode::INVALID_ARGUMENT)));
673    }
674}