up_rust/
utransport.rs

1/********************************************************************************
2 * Copyright (c) 2023 Contributors to the Eclipse Foundation
3 *
4 * See the NOTICE file(s) distributed with this work for additional
5 * information regarding copyright ownership.
6 *
7 * This program and the accompanying materials are made available under the
8 * terms of the Apache License Version 2.0 which is available at
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * SPDX-License-Identifier: Apache-2.0
12 ********************************************************************************/
13
14use std::fmt::{Debug, Formatter};
15use std::hash::{Hash, Hasher};
16use std::num::TryFromIntError;
17use std::ops::Deref;
18use std::sync::Arc;
19
20use async_trait::async_trait;
21
22use crate::{UCode, UMessage, UStatus, UUri};
23
24/// Verifies that given UUris can be used as source and sink filter UUris
25/// for registering listeners.
26///
27/// This function is helpful for implementing [`UTransport`] in accordance with the
28/// uProtocol Transport Layer specification.
29///
30/// # Errors
31///
32/// Returns a [`UStatus`] with a [`UCode::INVALID_ARGUMENT`] and a corresponding detail
33/// message, if any of the given UUris cannot be used as filter criteria.
34///
35pub fn verify_filter_criteria(
36    source_filter: &UUri,
37    sink_filter: Option<&UUri>,
38) -> Result<(), UStatus> {
39    if let Some(sink_filter_uuri) = sink_filter {
40        if sink_filter_uuri.is_notification_destination()
41            && source_filter.is_notification_destination()
42        {
43            return Err(UStatus::fail_with_code(
44                UCode::INVALID_ARGUMENT,
45                "source and sink filters must not both have resource ID 0",
46            ));
47        }
48        if sink_filter_uuri.is_rpc_method()
49            && !source_filter.has_wildcard_resource_id()
50            && !source_filter.is_notification_destination()
51        {
52            return Err(UStatus::fail_with_code(
53                UCode::INVALID_ARGUMENT,
54                "source filter must either have the wildcard resource ID or resource ID 0, if sink filter matches RPC method resource ID"));
55        }
56    } else if !source_filter.has_wildcard_resource_id() && !source_filter.is_event() {
57        return Err(UStatus::fail_with_code(
58            UCode::INVALID_ARGUMENT,
59            "source filter must either have the wildcard resource ID or a resource ID from topic range, if sink filter is empty"));
60    }
61    // everything else might match valid messages
62    Ok(())
63}
64
65/// A factory for URIs representing this uEntity's resources.
66///
67/// Implementations may use arbitrary mechanisms to determine the information that
68/// is necessary for creating URIs, e.g. environment variables, configuration files etc.
69// [impl->dsn~localuriprovider-declaration~1]
70#[cfg_attr(any(test, feature = "test-util"), mockall::automock)]
71pub trait LocalUriProvider: Send + Sync {
72    /// Gets the _authority_ used for URIs representing this uEntity's resources.
73    fn get_authority(&self) -> String;
74    /// Gets a URI that represents a given resource of this uEntity.
75    fn get_resource_uri(&self, resource_id: u16) -> UUri;
76    /// Gets the URI that represents the resource that this uEntity expects
77    /// RPC responses and notifications to be sent to.
78    fn get_source_uri(&self) -> UUri;
79}
80
81/// A URI provider that is statically configured with the uEntity's authority, entity ID and version.
82pub struct StaticUriProvider {
83    local_uri: UUri,
84}
85
86impl StaticUriProvider {
87    /// Creates a new URI provider from static information.
88    ///
89    /// # Arguments
90    ///
91    /// * `authority` - The uEntity's authority name.
92    /// * `entity_id` - The entity identifier.
93    /// * `major_version` - The uEntity's major version.
94    ///
95    /// # Examples
96    ///
97    /// ```rust
98    /// use up_rust::{LocalUriProvider, StaticUriProvider};
99    ///
100    /// let provider = StaticUriProvider::new("my-vehicle", 0x4210, 0x05);
101    /// assert_eq!(provider.get_authority(), "my-vehicle");
102    /// ```
103    pub fn new(authority: impl Into<String>, entity_id: u32, major_version: u8) -> Self {
104        let local_uri = UUri {
105            authority_name: authority.into(),
106            ue_id: entity_id,
107            ue_version_major: major_version as u32,
108            resource_id: 0x0000,
109            ..Default::default()
110        };
111        StaticUriProvider { local_uri }
112    }
113}
114
115impl LocalUriProvider for StaticUriProvider {
116    fn get_authority(&self) -> String {
117        self.local_uri.authority_name.clone()
118    }
119
120    fn get_resource_uri(&self, resource_id: u16) -> UUri {
121        let mut uri = self.local_uri.clone();
122        uri.resource_id = resource_id as u32;
123        uri
124    }
125
126    fn get_source_uri(&self) -> UUri {
127        self.local_uri.clone()
128    }
129}
130
131impl TryFrom<UUri> for StaticUriProvider {
132    type Error = TryFromIntError;
133    fn try_from(value: UUri) -> Result<Self, Self::Error> {
134        Self::try_from(&value)
135    }
136}
137
138impl TryFrom<&UUri> for StaticUriProvider {
139    type Error = TryFromIntError;
140    /// Creates a URI provider from a UUri.
141    ///
142    /// # Arguments
143    ///
144    /// * `source_uri` - The UUri to take the entity's authority, entity ID and version information from.
145    ///   The UUri's resource ID is ignored.
146    ///
147    /// # Errors
148    ///
149    /// Returns an error if the given UUri's major version property is not a `u8`.
150    ///
151    /// # Examples
152    ///
153    /// ```rust
154    /// use up_rust::{LocalUriProvider, StaticUriProvider, UUri};
155    ///
156    /// let source_uri = UUri::try_from("//my-vehicle/4210/5/0").unwrap();
157    /// assert!(StaticUriProvider::try_from(&source_uri).is_ok());
158    /// ```
159    ///
160    /// ## Invalid Major Version
161    ///
162    /// ```rust
163    /// use up_rust::{LocalUriProvider, StaticUriProvider, UUri};
164    ///
165    /// let uuri_with_invalid_version = UUri {
166    ///   authority_name: "".to_string(),
167    ///   ue_id: 0x5430,
168    ///   ue_version_major: 0x1234, // not a u8
169    ///   resource_id: 0x0000,
170    ///   ..Default::default()
171    /// };
172    /// assert!(StaticUriProvider::try_from(uuri_with_invalid_version).is_err());
173    /// ```
174    fn try_from(source_uri: &UUri) -> Result<Self, Self::Error> {
175        let major_version = u8::try_from(source_uri.ue_version_major)?;
176        Ok(StaticUriProvider::new(
177            &source_uri.authority_name,
178            source_uri.ue_id,
179            major_version,
180        ))
181    }
182}
183
184/// A handler for processing uProtocol messages.
185///
186/// Implementations contain the details for what should occur when a message is received.
187///
188/// Please refer to the [uProtocol Transport Layer specification](https://github.com/eclipse-uprotocol/up-spec/blob/v1.6.0-alpha.6/up-l1/README.adoc)
189/// for details.
190// [impl->dsn~ulistener-declaration~1]
191#[cfg_attr(any(test, feature = "test-util"), mockall::automock)]
192#[async_trait]
193pub trait UListener: Send + Sync {
194    /// Performs some action on receipt of a message.
195    ///
196    /// # Parameters
197    ///
198    /// * `msg` - The message to process.
199    ///
200    /// # Implementation hints
201    ///
202    /// This function is expected to return almost immediately. If it does not, it could potentially
203    /// block processing of succeeding messages. Long-running operations for processing a message should
204    /// therefore be run on a separate thread.
205    async fn on_receive(&self, msg: UMessage);
206}
207
208/// The uProtocol Transport Layer interface that provides a common API for uEntity developers to send and
209/// receive messages.
210///
211/// Implementations contain the details for connecting to the underlying transport technology and
212/// sending [`UMessage`]s using the configured technology.
213///
214/// Please refer to the [uProtocol Transport Layer specification](https://github.com/eclipse-uprotocol/up-spec/blob/v1.6.0-alpha.6/up-l1/README.adoc)
215/// for details.
216// [impl->dsn~utransport-declaration~1]
217#[async_trait]
218pub trait UTransport: Send + Sync {
219    /// Sends a message using this transport's message exchange mechanism.
220    ///
221    /// # Arguments
222    ///
223    /// * `message` - The message to send. The `type`, `source` and `sink` properties of the
224    ///   [UAttributes](https://github.com/eclipse-uprotocol/up-spec/blob/v1.6.0-alpha.6/basics/uattributes.adoc) contained
225    ///   in the message determine the addressing semantics.
226    ///
227    /// # Errors
228    ///
229    /// Returns an error if the message could not be sent.
230    async fn send(&self, message: UMessage) -> Result<(), UStatus>;
231
232    /// Receives a message from the transport.
233    ///
234    /// This default implementation returns an error with [`UCode::UNIMPLEMENTED`].
235    ///
236    /// # Arguments
237    ///
238    /// * `source_filter` - The _source_ address pattern that the message to receive needs to match.
239    /// * `sink_filter` - The _sink_ address pattern that the message to receive needs to match,
240    ///                   or `None` to indicate that the message must not contain any sink address.
241    ///
242    /// # Errors
243    ///
244    /// Returns an error if no message could be received, e.g. because no message matches the given addresses.
245    async fn receive(
246        &self,
247        _source_filter: &UUri,
248        _sink_filter: Option<&UUri>,
249    ) -> Result<UMessage, UStatus> {
250        Err(UStatus::fail_with_code(
251            UCode::UNIMPLEMENTED,
252            "not implemented",
253        ))
254    }
255
256    /// Registers a listener to be called for messages.
257    ///
258    /// The listener will be invoked for each message that matches the given source and sink filter patterns
259    /// according to the rules defined by the [UUri specification](https://github.com/eclipse-uprotocol/up-spec/blob/v1.6.0-alpha.6/basics/uri.adoc).
260    ///
261    /// This default implementation returns an error with [`UCode::UNIMPLEMENTED`].
262    ///
263    /// # Arguments
264    ///
265    /// * `source_filter` - The _source_ address pattern that messages need to match.
266    /// * `sink_filter` - The _sink_ address pattern that messages need to match,
267    ///                   or `None` to match messages that do not contain any sink address.
268    /// * `listener` - The listener to invoke.
269    ///                The listener can be unregistered again using [`UTransport::unregister_listener`].
270    ///
271    /// # Errors
272    ///
273    /// Returns an error if the listener could not be registered.
274    async fn register_listener(
275        &self,
276        _source_filter: &UUri,
277        _sink_filter: Option<&UUri>,
278        _listener: Arc<dyn UListener>,
279    ) -> Result<(), UStatus> {
280        Err(UStatus::fail_with_code(
281            UCode::UNIMPLEMENTED,
282            "not implemented",
283        ))
284    }
285
286    /// Deregisters a message listener.
287    ///
288    /// The listener will no longer be called for any (matching) messages after this function has
289    /// returned successfully.
290    ///
291    /// This default implementation returns an error with [`UCode::UNIMPLEMENTED`].
292    ///
293    /// # Arguments
294    ///
295    /// * `source_filter` - The _source_ address pattern that the listener had been registered for.
296    /// * `sink_filter` - The _sink_ address pattern that the listener had been registered for.
297    /// * `listener` - The listener to unregister.
298    ///
299    /// # Errors
300    ///
301    /// Returns an error if the listener could not be unregistered, for example if the given listener does not exist.
302    async fn unregister_listener(
303        &self,
304        _source_filter: &UUri,
305        _sink_filter: Option<&UUri>,
306        _listener: Arc<dyn UListener>,
307    ) -> Result<(), UStatus> {
308        Err(UStatus::fail_with_code(
309            UCode::UNIMPLEMENTED,
310            "not implemented",
311        ))
312    }
313}
314
315#[cfg(not(tarpaulin_include))]
316#[cfg(any(test, feature = "test-util"))]
317mockall::mock! {
318    /// This extra struct is necessary in order to comply with mockall's requirements regarding the parameter lifetimes
319    /// see <https://github.com/asomers/mockall/issues/571>
320    pub Transport {
321        pub async fn do_send(&self, message: UMessage) -> Result<(), UStatus>;
322        pub async fn do_register_listener<'a>(&'a self, source_filter: &'a UUri, sink_filter: Option<&'a UUri>, listener: Arc<dyn UListener>) -> Result<(), UStatus>;
323        pub async fn do_unregister_listener<'a>(&'a self, source_filter: &'a UUri, sink_filter: Option<&'a UUri>, listener: Arc<dyn UListener>) -> Result<(), UStatus>;
324    }
325}
326
327#[cfg(not(tarpaulin_include))]
328#[cfg(any(test, feature = "test-util"))]
329#[async_trait]
330/// This delegates the invocation of the UTransport functions to the mocked functions of the Transport struct.
331/// see <https://github.com/asomers/mockall/issues/571>
332impl UTransport for MockTransport {
333    async fn send(&self, message: UMessage) -> Result<(), UStatus> {
334        self.do_send(message).await
335    }
336    async fn register_listener(
337        &self,
338        source_filter: &UUri,
339        sink_filter: Option<&UUri>,
340        listener: Arc<dyn UListener>,
341    ) -> Result<(), UStatus> {
342        self.do_register_listener(source_filter, sink_filter, listener)
343            .await
344    }
345    async fn unregister_listener(
346        &self,
347        source_filter: &UUri,
348        sink_filter: Option<&UUri>,
349        listener: Arc<dyn UListener>,
350    ) -> Result<(), UStatus> {
351        self.do_unregister_listener(source_filter, sink_filter, listener)
352            .await
353    }
354}
355
356/// A wrapper type that allows comparing [`UListener`]s to each other.
357///
358/// # Note
359///
360/// Not necessary for end-user uEs to use. Primarily intended for `up-client-foo-rust` UPClient libraries
361/// when implementing [`UTransport`].
362///
363/// # Rationale
364///
365/// The wrapper type is implemented such that it can be used in any location you may wish to
366/// hold a type implementing [`UListener`].
367///
368/// Implements necessary traits to allow hashing, so that you may hold the wrapper type in
369/// collections which require that, such as a `HashMap` or `HashSet`
370#[derive(Clone)]
371pub struct ComparableListener {
372    listener: Arc<dyn UListener>,
373}
374
375impl ComparableListener {
376    pub fn new(listener: Arc<dyn UListener>) -> Self {
377        Self { listener }
378    }
379    /// Gets a clone of the wrapped reference to the listener.
380    pub fn into_inner(&self) -> Arc<dyn UListener> {
381        self.listener.clone()
382    }
383
384    /// Allows us to get the pointer address of this `ComparableListener` on the heap
385    fn pointer_address(&self) -> usize {
386        // Obtain the raw pointer from the Arc
387        let ptr = Arc::as_ptr(&self.listener);
388        // Cast the fat pointer to a raw thin pointer to ()
389        let thin_ptr = ptr as *const ();
390        // Convert the thin pointer to a usize
391        thin_ptr as usize
392    }
393}
394
395impl Deref for ComparableListener {
396    type Target = dyn UListener;
397
398    fn deref(&self) -> &Self::Target {
399        &*self.listener
400    }
401}
402
403impl Hash for ComparableListener {
404    /// Feeds the pointer to the listener held by `self` into the given [`Hasher`].
405    ///
406    /// This is consistent with the implementation of [`ComparableListener::eq`].
407    fn hash<H: Hasher>(&self, state: &mut H) {
408        Arc::as_ptr(&self.listener).hash(state);
409    }
410}
411
412impl PartialEq for ComparableListener {
413    /// Compares this listener to another listener.
414    ///
415    /// # Returns
416    ///
417    /// `true` if the pointer to the listener held by `self` is equal to the pointer held by `other`.
418    /// This is consistent with the implementation of [`ComparableListener::hash`].
419    fn eq(&self, other: &Self) -> bool {
420        Arc::ptr_eq(&self.listener, &other.listener)
421    }
422}
423
424impl Eq for ComparableListener {}
425
426impl Debug for ComparableListener {
427    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
428        write!(f, "ComparableListener: {}", self.pointer_address())
429    }
430}
431
432#[cfg(test)]
433mod tests {
434    use crate::{ComparableListener, UListener, UMessage};
435    use std::{
436        hash::{DefaultHasher, Hash, Hasher},
437        ops::Deref,
438        str::FromStr,
439        sync::Arc,
440    };
441
442    use super::*;
443
444    #[test]
445    fn test_static_uri_provider_get_source() {
446        let provider = StaticUriProvider::new("my-vehicle", 0x4210, 0x05);
447        let source_uri = provider.get_source_uri();
448        assert_eq!(source_uri.authority_name, "my-vehicle");
449        assert_eq!(source_uri.ue_id, 0x4210);
450        assert_eq!(source_uri.ue_version_major, 0x05);
451        assert_eq!(source_uri.resource_id, 0x0000);
452    }
453
454    #[test]
455    fn test_static_uri_provider_get_resource() {
456        let provider = StaticUriProvider::new("my-vehicle", 0x4210, 0x05);
457        let resource_uri = provider.get_resource_uri(0x1234);
458        assert_eq!(resource_uri.authority_name, "my-vehicle");
459        assert_eq!(resource_uri.ue_id, 0x4210);
460        assert_eq!(resource_uri.ue_version_major, 0x05);
461        assert_eq!(resource_uri.resource_id, 0x1234);
462    }
463
464    #[tokio::test]
465    async fn test_deref_returns_wrapped_listener() {
466        let mut mock_listener = MockUListener::new();
467        mock_listener.expect_on_receive().once().return_const(());
468        let listener_one = Arc::new(mock_listener);
469        let comparable_listener_one = ComparableListener::new(listener_one);
470        comparable_listener_one
471            .deref()
472            .on_receive(UMessage::default())
473            .await;
474    }
475
476    #[tokio::test]
477    async fn test_to_inner_returns_reference_to_wrapped_listener() {
478        let mut mock_listener = MockUListener::new();
479        mock_listener.expect_on_receive().once().return_const(());
480        let listener_one = Arc::new(mock_listener);
481        let comparable_listener_one = ComparableListener::new(listener_one);
482        comparable_listener_one
483            .into_inner()
484            .on_receive(UMessage::default())
485            .await;
486    }
487
488    #[tokio::test]
489    async fn test_eq_and_hash_are_consistent_for_comparable_listeners_wrapping_same_listener() {
490        let mut mock_listener = MockUListener::new();
491        mock_listener.expect_on_receive().times(2).return_const(());
492        let listener_one = Arc::new(mock_listener);
493        let listener_two = listener_one.clone();
494        listener_one.on_receive(UMessage::default()).await;
495        listener_two.on_receive(UMessage::default()).await;
496        let comparable_listener_one = ComparableListener::new(listener_one);
497        let comparable_listener_two = ComparableListener::new(listener_two);
498        assert!(&comparable_listener_one.eq(&comparable_listener_two));
499
500        let mut hasher = DefaultHasher::new();
501        comparable_listener_one.hash(&mut hasher);
502        let hash_one = hasher.finish();
503        let mut hasher = DefaultHasher::new();
504        comparable_listener_two.hash(&mut hasher);
505        let hash_two = hasher.finish();
506        assert_eq!(hash_one, hash_two);
507    }
508
509    #[tokio::test]
510    async fn test_eq_and_hash_are_consistent_for_comparable_listeners_wrapping_different_listeners()
511    {
512        let mut mock_listener_one = MockUListener::new();
513        mock_listener_one
514            .expect_on_receive()
515            .once()
516            .return_const(());
517        let listener_one = Arc::new(mock_listener_one);
518        let mut mock_listener_two = MockUListener::new();
519        mock_listener_two
520            .expect_on_receive()
521            .once()
522            .return_const(());
523        let listener_two = Arc::new(mock_listener_two);
524        listener_one.on_receive(UMessage::default()).await;
525        listener_two.on_receive(UMessage::default()).await;
526        let comparable_listener_one = ComparableListener::new(listener_one);
527        let comparable_listener_two = ComparableListener::new(listener_two);
528        assert!(!&comparable_listener_one.eq(&comparable_listener_two));
529
530        let mut hasher = DefaultHasher::new();
531        comparable_listener_one.hash(&mut hasher);
532        let hash_one = hasher.finish();
533        let mut hasher = DefaultHasher::new();
534        comparable_listener_two.hash(&mut hasher);
535        let hash_two = hasher.finish();
536        assert_ne!(hash_one, hash_two);
537    }
538
539    #[tokio::test]
540    async fn test_utransport_default_implementations() {
541        struct EmptyTransport {}
542        #[async_trait::async_trait]
543        impl UTransport for EmptyTransport {
544            async fn send(&self, _message: UMessage) -> Result<(), UStatus> {
545                todo!()
546            }
547        }
548
549        let transport = EmptyTransport {};
550        let listener = Arc::new(MockUListener::new());
551
552        assert!(transport
553            .receive(&UUri::any(), None)
554            .await
555            .is_err_and(|e| e.get_code() == UCode::UNIMPLEMENTED));
556        assert!(transport
557            .register_listener(&UUri::any(), None, listener.clone())
558            .await
559            .is_err_and(|e| e.get_code() == UCode::UNIMPLEMENTED));
560        assert!(transport
561            .unregister_listener(&UUri::any(), None, listener)
562            .await
563            .is_err_and(|e| e.get_code() == UCode::UNIMPLEMENTED));
564    }
565
566    #[test]
567    fn test_comparable_listener_pointer_address() {
568        let bar = Arc::new(MockUListener::new());
569        let comp_listener = ComparableListener::new(bar);
570
571        let comp_listener_thread = comp_listener.clone();
572        let handle = std::thread::spawn(move || comp_listener_thread.pointer_address());
573
574        let comp_listener_address_other_thread = handle.join().unwrap();
575        let comp_listener_address_this_thread = comp_listener.pointer_address();
576
577        assert_eq!(
578            comp_listener_address_this_thread,
579            comp_listener_address_other_thread
580        );
581    }
582
583    #[test]
584    fn test_comparable_listener_debug_outputs() {
585        let bar = Arc::new(MockUListener::new());
586        let comp_listener = ComparableListener::new(bar);
587        let debug_output = format!("{comp_listener:?}");
588        assert!(!debug_output.is_empty());
589    }
590
591    #[test_case::test_case(
592        "//vehicle1/AA/1/0",
593        Some("//vehicle2/BB/1/0");
594        "source and sink both having resource ID 0")]
595    #[test_case::test_case(
596        "//vehicle1/AA/1/CC",
597        Some("//vehicle2/BB/1/1A");
598        "sink is RPC but source has invalid resource ID")]
599    #[test_case::test_case(
600        "//vehicle1/AA/1/CC",
601        None;
602        "sink is empty but source has non-topic resource ID")]
603    fn test_verify_filter_criteria_fails_for(source: &str, sink: Option<&str>) {
604        let source_filter = UUri::from_str(source).expect("invalid source URI");
605        let sink_filter = sink.map(|s| UUri::from_str(s).expect("invalid sink URI"));
606        assert!(verify_filter_criteria(&source_filter, sink_filter.as_ref())
607            .is_err_and(|err| matches!(err.get_code(), UCode::INVALID_ARGUMENT)));
608    }
609}