up_rust/
utransport.rs

1/********************************************************************************
2 * Copyright (c) 2023 Contributors to the Eclipse Foundation
3 *
4 * See the NOTICE file(s) distributed with this work for additional
5 * information regarding copyright ownership.
6 *
7 * This program and the accompanying materials are made available under the
8 * terms of the Apache License Version 2.0 which is available at
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * SPDX-License-Identifier: Apache-2.0
12 ********************************************************************************/
13
14use std::fmt::{Debug, Formatter};
15use std::hash::{Hash, Hasher};
16use std::num::TryFromIntError;
17use std::ops::Deref;
18use std::sync::Arc;
19
20use async_trait::async_trait;
21
22use crate::{UCode, UMessage, UStatus, UUri};
23
24/// A factory for URIs representing this uEntity's resources.
25///
26/// Implementations may use arbitrary mechanisms to determine the information that
27/// is necessary for creating URIs, e.g. environment variables, configuration files etc.
28// [impl->req~up-language-transport-api~1]
29#[cfg_attr(any(test, feature = "test-util"), mockall::automock)]
30pub trait LocalUriProvider: Send + Sync {
31    /// Gets the _authority_ used for URIs representing this uEntity's resources.
32    fn get_authority(&self) -> String;
33    /// Gets a URI that represents a given resource of this uEntity.
34    fn get_resource_uri(&self, resource_id: u16) -> UUri;
35    /// Gets the URI that represents the resource that this uEntity expects
36    /// RPC responses and notifications to be sent to.
37    fn get_source_uri(&self) -> UUri;
38}
39
40/// A URI provider that is statically configured with the uEntity's authority, entity ID and version.
41pub struct StaticUriProvider {
42    local_uri: UUri,
43}
44
45impl StaticUriProvider {
46    /// Creates a new URI provider from static information.
47    ///
48    /// # Arguments
49    ///
50    /// * `authority` - The uEntity's authority name.
51    /// * `entity_id` - The entity identifier.
52    /// * `major_version` - The uEntity's major version.
53    ///
54    /// # Examples
55    ///
56    /// ```rust
57    /// use up_rust::{LocalUriProvider, StaticUriProvider};
58    ///
59    /// let provider = StaticUriProvider::new("my-vehicle", 0x4210, 0x05);
60    /// assert_eq!(provider.get_authority(), "my-vehicle");
61    /// ```
62    pub fn new(authority: impl Into<String>, entity_id: u32, major_version: u8) -> Self {
63        let local_uri = UUri {
64            authority_name: authority.into(),
65            ue_id: entity_id,
66            ue_version_major: major_version as u32,
67            resource_id: 0x0000,
68            ..Default::default()
69        };
70        StaticUriProvider { local_uri }
71    }
72}
73
74impl LocalUriProvider for StaticUriProvider {
75    fn get_authority(&self) -> String {
76        self.local_uri.authority_name.clone()
77    }
78
79    fn get_resource_uri(&self, resource_id: u16) -> UUri {
80        let mut uri = self.local_uri.clone();
81        uri.resource_id = resource_id as u32;
82        uri
83    }
84
85    fn get_source_uri(&self) -> UUri {
86        self.local_uri.clone()
87    }
88}
89
90impl TryFrom<UUri> for StaticUriProvider {
91    type Error = TryFromIntError;
92    fn try_from(value: UUri) -> Result<Self, Self::Error> {
93        Self::try_from(&value)
94    }
95}
96
97impl TryFrom<&UUri> for StaticUriProvider {
98    type Error = TryFromIntError;
99    /// Creates a URI provider from a UUri.
100    ///
101    /// # Arguments
102    ///
103    /// * `source_uri` - The UUri to take the entity's authority, entity ID and version information from.
104    ///                  The UUri's resource ID is ignored.
105    ///
106    /// # Errors
107    ///
108    /// Returns an error if the given UUri's major version property is not a `u8`.
109    ///
110    /// # Examples
111    ///
112    /// ```rust
113    /// use up_rust::{LocalUriProvider, StaticUriProvider, UUri};
114    ///
115    /// let source_uri = UUri::try_from("//my-vehicle/4210/5/0").unwrap();
116    /// assert!(StaticUriProvider::try_from(&source_uri).is_ok());
117    /// ```
118    ///
119    /// ## Invalid Major Version
120    ///
121    /// ```rust
122    /// use up_rust::{LocalUriProvider, StaticUriProvider, UUri};
123    ///
124    /// let uuri_with_invalid_version = UUri {
125    ///   authority_name: "".to_string(),
126    ///   ue_id: 0x5430,
127    ///   ue_version_major: 0x1234, // not a u8
128    ///   resource_id: 0x0000,
129    ///   ..Default::default()
130    /// };
131    /// assert!(StaticUriProvider::try_from(uuri_with_invalid_version).is_err());
132    /// ```
133    fn try_from(source_uri: &UUri) -> Result<Self, Self::Error> {
134        let major_version = u8::try_from(source_uri.ue_version_major)?;
135        Ok(StaticUriProvider::new(
136            &source_uri.authority_name,
137            source_uri.ue_id,
138            major_version,
139        ))
140    }
141}
142
143/// A handler for processing uProtocol messages.
144///
145/// Implementations contain the details for what should occur when a message is received.
146///
147/// Please refer to the [uProtocol Transport Layer specification](https://github.com/eclipse-uprotocol/up-spec/blob/v1.6.0-alpha.4/up-l1/README.adoc)
148/// for details.
149// [impl->req~up-language-transport-api~1]
150#[cfg_attr(any(test, feature = "test-util"), mockall::automock)]
151#[async_trait]
152pub trait UListener: Send + Sync {
153    /// Performs some action on receipt of a message.
154    ///
155    /// # Parameters
156    ///
157    /// * `msg` - The message to process.
158    ///
159    /// # Implementation hints
160    ///
161    /// This function is expected to return almost immediately. If it does not, it could potentially
162    /// block processing of succeeding messages. Long-running operations for processing a message should
163    /// therefore be run on a separate thread.
164    async fn on_receive(&self, msg: UMessage);
165}
166
167/// The uProtocol Transport Layer interface that provides a common API for uEntity developers to send and
168/// receive messages.
169///
170/// Implementations contain the details for connecting to the underlying transport technology and
171/// sending [`UMessage`]s using the configured technology.
172///
173/// Please refer to the [uProtocol Transport Layer specification](https://github.com/eclipse-uprotocol/up-spec/blob/v1.6.0-alpha.4/up-l1/README.adoc)
174/// for details.
175// [impl->req~up-language-transport-api~1]
176#[async_trait]
177pub trait UTransport: Send + Sync {
178    /// Sends a message using this transport's message exchange mechanism.
179    ///
180    /// # Arguments
181    ///
182    /// * `message` - The message to send. The `type`, `source` and `sink` properties of the
183    ///   [UAttributes](https://github.com/eclipse-uprotocol/up-spec/blob/v1.6.0-alpha.4/basics/uattributes.adoc) contained
184    ///   in the message determine the addressing semantics.
185    ///
186    /// # Errors
187    ///
188    /// Returns an error if the message could not be sent.
189    async fn send(&self, message: UMessage) -> Result<(), UStatus>;
190
191    /// Receives a message from the transport.
192    ///
193    /// This default implementation returns an error with [`UCode::UNIMPLEMENTED`].
194    ///
195    /// # Arguments
196    ///
197    /// * `source_filter` - The _source_ address pattern that the message to receive needs to match.
198    /// * `sink_filter` - The _sink_ address pattern that the message to receive needs to match,
199    ///                   or `None` to indicate that the message must not contain any sink address.
200    ///
201    /// # Errors
202    ///
203    /// Returns an error if no message could be received, e.g. because no message matches the given addresses.
204    async fn receive(
205        &self,
206        _source_filter: &UUri,
207        _sink_filter: Option<&UUri>,
208    ) -> Result<UMessage, UStatus> {
209        Err(UStatus::fail_with_code(
210            UCode::UNIMPLEMENTED,
211            "not implemented",
212        ))
213    }
214
215    /// Registers a listener to be called for messages.
216    ///
217    /// The listener will be invoked for each message that matches the given source and sink filter patterns
218    /// according to the rules defined by the [UUri specification](https://github.com/eclipse-uprotocol/up-spec/blob/v1.6.0-alpha.4/basics/uri.adoc).
219    ///
220    /// This default implementation returns an error with [`UCode::UNIMPLEMENTED`].
221    ///
222    /// # Arguments
223    ///
224    /// * `source_filter` - The _source_ address pattern that messages need to match.
225    /// * `sink_filter` - The _sink_ address pattern that messages need to match,
226    ///                   or `None` to match messages that do not contain any sink address.
227    /// * `listener` - The listener to invoke.
228    ///                The listener can be unregistered again using [`UTransport::unregister_listener`].
229    ///
230    /// # Errors
231    ///
232    /// Returns an error if the listener could not be registered.
233    async fn register_listener(
234        &self,
235        _source_filter: &UUri,
236        _sink_filter: Option<&UUri>,
237        _listener: Arc<dyn UListener>,
238    ) -> Result<(), UStatus> {
239        Err(UStatus::fail_with_code(
240            UCode::UNIMPLEMENTED,
241            "not implemented",
242        ))
243    }
244
245    /// Deregisters a message listener.
246    ///
247    /// The listener will no longer be called for any (matching) messages after this function has
248    /// returned successfully.
249    ///
250    /// This default implementation returns an error with [`UCode::UNIMPLEMENTED`].
251    ///
252    /// # Arguments
253    ///
254    /// * `source_filter` - The _source_ address pattern that the listener had been registered for.
255    /// * `sink_filter` - The _sink_ address pattern that the listener had been registered for.
256    /// * `listener` - The listener to unregister.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if the listener could not be unregistered, for example if the given listener does not exist.
261    async fn unregister_listener(
262        &self,
263        _source_filter: &UUri,
264        _sink_filter: Option<&UUri>,
265        _listener: Arc<dyn UListener>,
266    ) -> Result<(), UStatus> {
267        Err(UStatus::fail_with_code(
268            UCode::UNIMPLEMENTED,
269            "not implemented",
270        ))
271    }
272}
273
274#[cfg(not(tarpaulin_include))]
275#[cfg(any(test, feature = "test-util"))]
276mockall::mock! {
277    /// This extra struct is necessary in order to comply with mockall's requirements regarding the parameter lifetimes
278    /// see <https://github.com/asomers/mockall/issues/571>
279    pub Transport {
280        pub async fn do_send(&self, message: UMessage) -> Result<(), UStatus>;
281        pub async fn do_register_listener<'a>(&'a self, source_filter: &'a UUri, sink_filter: Option<&'a UUri>, listener: Arc<dyn UListener>) -> Result<(), UStatus>;
282        pub async fn do_unregister_listener<'a>(&'a self, source_filter: &'a UUri, sink_filter: Option<&'a UUri>, listener: Arc<dyn UListener>) -> Result<(), UStatus>;
283    }
284}
285
286#[cfg(not(tarpaulin_include))]
287#[cfg(any(test, feature = "test-util"))]
288#[async_trait]
289/// This delegates the invocation of the UTransport functions to the mocked functions of the Transport struct.
290/// see <https://github.com/asomers/mockall/issues/571>
291impl UTransport for MockTransport {
292    async fn send(&self, message: UMessage) -> Result<(), UStatus> {
293        self.do_send(message).await
294    }
295    async fn register_listener(
296        &self,
297        source_filter: &UUri,
298        sink_filter: Option<&UUri>,
299        listener: Arc<dyn UListener>,
300    ) -> Result<(), UStatus> {
301        self.do_register_listener(source_filter, sink_filter, listener)
302            .await
303    }
304    async fn unregister_listener(
305        &self,
306        source_filter: &UUri,
307        sink_filter: Option<&UUri>,
308        listener: Arc<dyn UListener>,
309    ) -> Result<(), UStatus> {
310        self.do_unregister_listener(source_filter, sink_filter, listener)
311            .await
312    }
313}
314
315/// A wrapper type that allows comparing [`UListener`]s to each other.
316///
317/// # Note
318///
319/// Not necessary for end-user uEs to use. Primarily intended for `up-client-foo-rust` UPClient libraries
320/// when implementing [`UTransport`].
321///
322/// # Rationale
323///
324/// The wrapper type is implemented such that it can be used in any location you may wish to
325/// hold a type implementing [`UListener`].
326///
327/// Implements necessary traits to allow hashing, so that you may hold the wrapper type in
328/// collections which require that, such as a `HashMap` or `HashSet`
329#[derive(Clone)]
330pub struct ComparableListener {
331    listener: Arc<dyn UListener>,
332}
333
334impl ComparableListener {
335    pub fn new(listener: Arc<dyn UListener>) -> Self {
336        Self { listener }
337    }
338    /// Gets a clone of the wrapped reference to the listener.
339    pub fn into_inner(&self) -> Arc<dyn UListener> {
340        self.listener.clone()
341    }
342
343    /// Allows us to get the pointer address of this `ComparableListener` on the heap
344    fn pointer_address(&self) -> usize {
345        // Obtain the raw pointer from the Arc
346        let ptr = Arc::as_ptr(&self.listener);
347        // Cast the fat pointer to a raw thin pointer to ()
348        let thin_ptr = ptr as *const ();
349        // Convert the thin pointer to a usize
350        thin_ptr as usize
351    }
352}
353
354impl Deref for ComparableListener {
355    type Target = dyn UListener;
356
357    fn deref(&self) -> &Self::Target {
358        &*self.listener
359    }
360}
361
362impl Hash for ComparableListener {
363    /// Feeds the pointer to the listener held by `self` into the given [`Hasher`].
364    ///
365    /// This is consistent with the implementation of [`ComparableListener::eq`].
366    fn hash<H: Hasher>(&self, state: &mut H) {
367        Arc::as_ptr(&self.listener).hash(state);
368    }
369}
370
371impl PartialEq for ComparableListener {
372    /// Compares this listener to another listener.
373    ///
374    /// # Returns
375    ///
376    /// `true` if the pointer to the listener held by `self` is equal to the pointer held by `other`.
377    /// This is consistent with the implementation of [`ComparableListener::hash`].
378    fn eq(&self, other: &Self) -> bool {
379        Arc::ptr_eq(&self.listener, &other.listener)
380    }
381}
382
383impl Eq for ComparableListener {}
384
385impl Debug for ComparableListener {
386    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
387        write!(f, "ComparableListener: {}", self.pointer_address())
388    }
389}
390
391#[cfg(test)]
392mod tests {
393    use crate::{ComparableListener, UListener, UMessage};
394    use std::{
395        hash::{DefaultHasher, Hash, Hasher},
396        ops::Deref,
397        sync::Arc,
398    };
399
400    use super::*;
401
402    #[test]
403    fn test_static_uri_provider_get_source() {
404        let provider = StaticUriProvider::new("my-vehicle", 0x4210, 0x05);
405        let source_uri = provider.get_source_uri();
406        assert_eq!(source_uri.authority_name, "my-vehicle");
407        assert_eq!(source_uri.ue_id, 0x4210);
408        assert_eq!(source_uri.ue_version_major, 0x05);
409        assert_eq!(source_uri.resource_id, 0x0000);
410    }
411
412    #[test]
413    fn test_static_uri_provider_get_resource() {
414        let provider = StaticUriProvider::new("my-vehicle", 0x4210, 0x05);
415        let resource_uri = provider.get_resource_uri(0x1234);
416        assert_eq!(resource_uri.authority_name, "my-vehicle");
417        assert_eq!(resource_uri.ue_id, 0x4210);
418        assert_eq!(resource_uri.ue_version_major, 0x05);
419        assert_eq!(resource_uri.resource_id, 0x1234);
420    }
421
422    #[tokio::test]
423    async fn test_deref_returns_wrapped_listener() {
424        let mut mock_listener = MockUListener::new();
425        mock_listener.expect_on_receive().once().return_const(());
426        let listener_one = Arc::new(mock_listener);
427        let comparable_listener_one = ComparableListener::new(listener_one);
428        comparable_listener_one
429            .deref()
430            .on_receive(UMessage::default())
431            .await;
432    }
433
434    #[tokio::test]
435    async fn test_to_inner_returns_reference_to_wrapped_listener() {
436        let mut mock_listener = MockUListener::new();
437        mock_listener.expect_on_receive().once().return_const(());
438        let listener_one = Arc::new(mock_listener);
439        let comparable_listener_one = ComparableListener::new(listener_one);
440        comparable_listener_one
441            .into_inner()
442            .on_receive(UMessage::default())
443            .await;
444    }
445
446    #[tokio::test]
447    async fn test_eq_and_hash_are_consistent_for_comparable_listeners_wrapping_same_listener() {
448        let mut mock_listener = MockUListener::new();
449        mock_listener.expect_on_receive().times(2).return_const(());
450        let listener_one = Arc::new(mock_listener);
451        let listener_two = listener_one.clone();
452        listener_one.on_receive(UMessage::default()).await;
453        listener_two.on_receive(UMessage::default()).await;
454        let comparable_listener_one = ComparableListener::new(listener_one);
455        let comparable_listener_two = ComparableListener::new(listener_two);
456        assert!(&comparable_listener_one.eq(&comparable_listener_two));
457
458        let mut hasher = DefaultHasher::new();
459        comparable_listener_one.hash(&mut hasher);
460        let hash_one = hasher.finish();
461        let mut hasher = DefaultHasher::new();
462        comparable_listener_two.hash(&mut hasher);
463        let hash_two = hasher.finish();
464        assert_eq!(hash_one, hash_two);
465    }
466
467    #[tokio::test]
468    async fn test_eq_and_hash_are_consistent_for_comparable_listeners_wrapping_different_listeners()
469    {
470        let mut mock_listener_one = MockUListener::new();
471        mock_listener_one
472            .expect_on_receive()
473            .once()
474            .return_const(());
475        let listener_one = Arc::new(mock_listener_one);
476        let mut mock_listener_two = MockUListener::new();
477        mock_listener_two
478            .expect_on_receive()
479            .once()
480            .return_const(());
481        let listener_two = Arc::new(mock_listener_two);
482        listener_one.on_receive(UMessage::default()).await;
483        listener_two.on_receive(UMessage::default()).await;
484        let comparable_listener_one = ComparableListener::new(listener_one);
485        let comparable_listener_two = ComparableListener::new(listener_two);
486        assert!(!&comparable_listener_one.eq(&comparable_listener_two));
487
488        let mut hasher = DefaultHasher::new();
489        comparable_listener_one.hash(&mut hasher);
490        let hash_one = hasher.finish();
491        let mut hasher = DefaultHasher::new();
492        comparable_listener_two.hash(&mut hasher);
493        let hash_two = hasher.finish();
494        assert_ne!(hash_one, hash_two);
495    }
496
497    #[tokio::test]
498    async fn test_utransport_default_implementations() {
499        struct EmptyTransport {}
500        #[async_trait::async_trait]
501        impl UTransport for EmptyTransport {
502            async fn send(&self, _message: UMessage) -> Result<(), UStatus> {
503                todo!()
504            }
505        }
506
507        let transport = EmptyTransport {};
508        let listener = Arc::new(MockUListener::new());
509
510        assert!(transport
511            .receive(&UUri::any(), None)
512            .await
513            .is_err_and(|e| e.get_code() == UCode::UNIMPLEMENTED));
514        assert!(transport
515            .register_listener(&UUri::any(), None, listener.clone())
516            .await
517            .is_err_and(|e| e.get_code() == UCode::UNIMPLEMENTED));
518        assert!(transport
519            .unregister_listener(&UUri::any(), None, listener)
520            .await
521            .is_err_and(|e| e.get_code() == UCode::UNIMPLEMENTED));
522    }
523
524    #[test]
525    fn test_comparable_listener_pointer_address() {
526        let bar = Arc::new(MockUListener::new());
527        let comp_listener = ComparableListener::new(bar);
528
529        let comp_listener_thread = comp_listener.clone();
530        let handle = std::thread::spawn(move || comp_listener_thread.pointer_address());
531
532        let comp_listener_address_other_thread = handle.join().unwrap();
533        let comp_listener_address_this_thread = comp_listener.pointer_address();
534
535        assert_eq!(
536            comp_listener_address_this_thread,
537            comp_listener_address_other_thread
538        );
539    }
540
541    #[test]
542    fn test_comparable_listener_debug_outputs() {
543        let bar = Arc::new(MockUListener::new());
544        let comp_listener = ComparableListener::new(bar);
545        let debug_output = format!("{comp_listener:?}");
546        assert!(!debug_output.is_empty());
547    }
548}