up_rust/communication/
default_notifier.rs

1/********************************************************************************
2 * Copyright (c) 2024 Contributors to the Eclipse Foundation
3 *
4 * See the NOTICE file(s) distributed with this work for additional
5 * information regarding copyright ownership.
6 *
7 * This program and the accompanying materials are made available under the
8 * terms of the Apache License Version 2.0 which is available at
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * SPDX-License-Identifier: Apache-2.0
12 ********************************************************************************/
13
14// [impl->req~up-language-comm-api-default-impl~1]
15
16use std::sync::Arc;
17
18use async_trait::async_trait;
19
20use crate::{LocalUriProvider, UListener, UMessageBuilder, UTransport, UUri};
21
22use super::{
23    apply_common_options, build_message, CallOptions, NotificationError, Notifier,
24    RegistrationError, UPayload,
25};
26
27/// A [`Notifier`] that uses the uProtocol Transport Layer API to send and receive
28/// notifications to/from (other) uEntities.
29pub struct SimpleNotifier {
30    transport: Arc<dyn UTransport>,
31    uri_provider: Arc<dyn LocalUriProvider>,
32}
33
34impl SimpleNotifier {
35    /// Creates a new Notifier for a given transport.
36    ///
37    /// # Arguments
38    ///
39    /// * `transport` - The uProtocol Transport Layer implementation to use for sending and receiving notification messages.
40    /// * `uri_provider` - The helper for creating URIs that represent local resources.
41    pub fn new(transport: Arc<dyn UTransport>, uri_provider: Arc<dyn LocalUriProvider>) -> Self {
42        SimpleNotifier {
43            transport,
44            uri_provider,
45        }
46    }
47}
48
49#[async_trait]
50impl Notifier for SimpleNotifier {
51    async fn notify(
52        &self,
53        resource_id: u16,
54        destination: &UUri,
55        call_options: CallOptions,
56        payload: Option<UPayload>,
57    ) -> Result<(), NotificationError> {
58        let mut builder = UMessageBuilder::notification(
59            self.uri_provider.get_resource_uri(resource_id),
60            destination.to_owned(),
61        );
62        apply_common_options(call_options, &mut builder);
63        let msg = build_message(&mut builder, payload)
64            .map_err(|e| NotificationError::InvalidArgument(e.to_string()))?;
65        self.transport
66            .send(msg)
67            .await
68            .map_err(NotificationError::NotifyError)
69    }
70
71    async fn start_listening(
72        &self,
73        topic: &UUri,
74        listener: Arc<dyn UListener>,
75    ) -> Result<(), RegistrationError> {
76        topic
77            .verify_no_wildcards()
78            .map_err(|e| RegistrationError::InvalidFilter(e.to_string()))?;
79        self.transport
80            .register_listener(topic, Some(&self.uri_provider.get_source_uri()), listener)
81            .await
82            .map_err(RegistrationError::from)
83    }
84
85    async fn stop_listening(
86        &self,
87        topic: &UUri,
88        listener: Arc<dyn UListener>,
89    ) -> Result<(), RegistrationError> {
90        topic
91            .verify_no_wildcards()
92            .map_err(|e| RegistrationError::InvalidFilter(e.to_string()))?;
93        self.transport
94            .unregister_listener(topic, Some(&self.uri_provider.get_source_uri()), listener)
95            .await
96            .map_err(RegistrationError::from)
97    }
98}
99
100#[cfg(test)]
101mod tests {
102
103    // [utest->req~up-language-comm-api-default-impl~1]
104
105    use super::*;
106
107    use protobuf::well_known_types::wrappers::StringValue;
108
109    use crate::{
110        utransport::{MockTransport, MockUListener},
111        StaticUriProvider, UCode, UPriority, UStatus, UUri, UUID,
112    };
113
114    fn new_uri_provider() -> Arc<dyn LocalUriProvider> {
115        Arc::new(StaticUriProvider::new("", 0x0005, 0x02))
116    }
117
118    #[tokio::test]
119    async fn test_start_stop_listening_rejects_wildcard_topic() {
120        let mut transport = MockTransport::new();
121        transport.expect_do_register_listener().never();
122        let uri_provider = new_uri_provider();
123        let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);
124
125        let invalid_topic = UUri::try_from("up://my-vin/A15B/1/FFFF").unwrap();
126        let mut listener = MockUListener::new();
127        listener.expect_on_receive().never();
128        let wrapped_listener = Arc::new(listener);
129
130        let result = notifier
131            .start_listening(&invalid_topic, wrapped_listener.clone())
132            .await;
133        assert!(result.is_err_and(|e| matches!(e, RegistrationError::InvalidFilter(_))));
134
135        let result = notifier
136            .stop_listening(&invalid_topic, wrapped_listener)
137            .await;
138        assert!(result.is_err_and(|e| matches!(e, RegistrationError::InvalidFilter(_))));
139    }
140
141    #[tokio::test]
142    async fn test_start_listening_succeeds() {
143        let uri_provider = new_uri_provider();
144        let topic = UUri::try_from("up://my-vin/A15B/1/B10F").unwrap();
145        let expected_source_filter = topic.clone();
146        let expected_sink_filter = uri_provider.get_source_uri();
147        let mut transport = MockTransport::new();
148        transport
149            .expect_do_register_listener()
150            .once()
151            .withf(move |source_filter, sink_filter, _listener| {
152                source_filter == &expected_source_filter
153                    && *sink_filter == Some(&expected_sink_filter)
154            })
155            .return_const(Ok(()));
156        let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);
157
158        let mut listener = MockUListener::new();
159        listener.expect_on_receive().never();
160        let result = notifier.start_listening(&topic, Arc::new(listener)).await;
161        assert!(result.is_ok());
162    }
163
164    #[tokio::test]
165    async fn test_stop_listening_succeeds() {
166        let uri_provider = new_uri_provider();
167        let topic = UUri::try_from("up://my-vin/A15B/1/B10F").unwrap();
168        let expected_source_filter = topic.clone();
169        let expected_sink_filter = uri_provider.get_source_uri();
170        let mut transport = MockTransport::new();
171        transport
172            .expect_do_unregister_listener()
173            .once()
174            .withf(move |source_filter, sink_filter, _listener| {
175                source_filter == &expected_source_filter
176                    && *sink_filter == Some(&expected_sink_filter)
177            })
178            .return_const(Ok(()));
179        let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);
180
181        let mut listener = MockUListener::new();
182        listener.expect_on_receive().never();
183        let result = notifier.stop_listening(&topic, Arc::new(listener)).await;
184        assert!(result.is_ok());
185    }
186
187    #[tokio::test]
188    async fn test_publish_succeeds() {
189        let message_id = UUID::build();
190        let uri_provider = new_uri_provider();
191        let destination = UUri::try_from("up://other-vin/A15B/1/0").unwrap();
192        let expected_message_id = message_id.clone();
193        let expected_sink = destination.clone();
194        let expected_source = uri_provider.get_resource_uri(0xB10F);
195        let mut transport = MockTransport::new();
196        transport
197            .expect_do_send()
198            .once()
199            .withf(move |message| {
200                let Ok(payload) = message.extract_protobuf::<StringValue>() else {
201                    return false;
202                };
203                let Some(attribs) = message.attributes.as_ref() else {
204                    return false;
205                };
206                attribs.is_notification()
207                    && attribs.id.get_or_default() == &expected_message_id
208                    && attribs.source.get_or_default() == &expected_source
209                    && attribs.sink.get_or_default() == &expected_sink
210                    && attribs.ttl == Some(10_000)
211                    && attribs.priority.enum_value_or_default() == UPriority::UPRIORITY_CS2
212                    && payload.value == *"Hello"
213            })
214            .return_const(Ok(()));
215        let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);
216
217        let mut v = StringValue::new();
218        v.value = "Hello".to_string();
219        let payload = UPayload::try_from_protobuf(v).unwrap();
220        let options = CallOptions::for_notification(
221            Some(10_000),
222            Some(message_id),
223            Some(UPriority::UPRIORITY_CS2),
224        );
225        let result = notifier
226            .notify(0xB10F, &destination, options, Some(payload))
227            .await;
228        assert!(result.is_ok());
229    }
230
231    #[tokio::test]
232    async fn test_publish_fails_for_transport_error() {
233        let uri_provider = new_uri_provider();
234        let destination = UUri::try_from("up://other-vin/A15B/1/0").unwrap();
235        let mut transport = MockTransport::new();
236        transport
237            .expect_do_send()
238            .once()
239            .return_const(Err(UStatus::fail_with_code(
240                crate::UCode::UNAVAILABLE,
241                "connection lost",
242            )));
243        let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);
244
245        let options = CallOptions::for_notification(None, None, None);
246        let result = notifier.notify(0xB10F, &destination, options, None).await;
247        assert!(result.is_err_and(|e| match e {
248            NotificationError::NotifyError(status) => status.get_code() == UCode::UNAVAILABLE,
249            _ => false,
250        }));
251    }
252
253    #[tokio::test]
254    async fn test_publish_fails_for_invalid_destination() {
255        let uri_provider = new_uri_provider();
256        // destination has resource ID != 0
257        let destination = UUri::try_from("up://other-vin/A15B/1/10").unwrap();
258        let mut transport = MockTransport::new();
259        transport.expect_do_send().never();
260        let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);
261
262        let options = CallOptions::for_notification(None, None, None);
263        let result = notifier.notify(0xB10F, &destination, options, None).await;
264        assert!(result.is_err_and(|e| matches!(e, NotificationError::InvalidArgument(_))));
265    }
266
267    #[tokio::test]
268    async fn test_publish_fails_for_invalid_resource_id() {
269        let uri_provider = new_uri_provider();
270        let destination = UUri::try_from("up://other-vin/A15B/1/0").unwrap();
271        let mut transport = MockTransport::new();
272        transport.expect_do_send().never();
273        let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);
274
275        let options = CallOptions::for_notification(None, None, None);
276        // resource ID of origin address must not be 0
277        let result = notifier.notify(0x0000, &destination, options, None).await;
278        assert!(result.is_err_and(|e| matches!(e, NotificationError::InvalidArgument(_))));
279    }
280}