up_rust/communication/
default_notifier.rs

1/********************************************************************************
2 * Copyright (c) 2024 Contributors to the Eclipse Foundation
3 *
4 * See the NOTICE file(s) distributed with this work for additional
5 * information regarding copyright ownership.
6 *
7 * This program and the accompanying materials are made available under the
8 * terms of the Apache License Version 2.0 which is available at
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * SPDX-License-Identifier: Apache-2.0
12 ********************************************************************************/
13
14// [impl->dsn~communication-layer-impl-default~1]
15
16use std::sync::Arc;
17
18use async_trait::async_trait;
19
20use crate::{LocalUriProvider, UListener, UMessageBuilder, UTransport, UUri};
21
22use super::{
23    apply_common_options, build_message, CallOptions, NotificationError, Notifier,
24    RegistrationError, UPayload,
25};
26
27/// A [`Notifier`] that uses the uProtocol Transport Layer API to send and receive
28/// notifications to/from (other) uEntities.
29pub struct SimpleNotifier {
30    transport: Arc<dyn UTransport>,
31    uri_provider: Arc<dyn LocalUriProvider>,
32}
33
34impl SimpleNotifier {
35    /// Creates a new Notifier for a given transport.
36    ///
37    /// # Arguments
38    ///
39    /// * `transport` - The uProtocol Transport Layer implementation to use for sending and receiving notification messages.
40    /// * `uri_provider` - The helper for creating URIs that represent local resources.
41    pub fn new(transport: Arc<dyn UTransport>, uri_provider: Arc<dyn LocalUriProvider>) -> Self {
42        SimpleNotifier {
43            transport,
44            uri_provider,
45        }
46    }
47}
48
49#[async_trait]
50impl Notifier for SimpleNotifier {
51    async fn notify(
52        &self,
53        resource_id: u16,
54        destination: &UUri,
55        call_options: CallOptions,
56        payload: Option<UPayload>,
57    ) -> Result<(), NotificationError> {
58        let mut builder = UMessageBuilder::notification(
59            self.uri_provider.get_resource_uri(resource_id),
60            destination.to_owned(),
61        );
62        apply_common_options(call_options, &mut builder);
63        let msg = build_message(&mut builder, payload)
64            .map_err(|e| NotificationError::InvalidArgument(e.to_string()))?;
65        self.transport
66            .send(msg)
67            .await
68            .map_err(NotificationError::NotifyError)
69    }
70
71    async fn start_listening(
72        &self,
73        topic: &UUri,
74        listener: Arc<dyn UListener>,
75    ) -> Result<(), RegistrationError> {
76        topic
77            .verify_no_wildcards()
78            .map_err(|e| RegistrationError::InvalidFilter(e.to_string()))?;
79        self.transport
80            .register_listener(topic, Some(&self.uri_provider.get_source_uri()), listener)
81            .await
82            .map_err(RegistrationError::from)
83    }
84
85    async fn stop_listening(
86        &self,
87        topic: &UUri,
88        listener: Arc<dyn UListener>,
89    ) -> Result<(), RegistrationError> {
90        topic
91            .verify_no_wildcards()
92            .map_err(|e| RegistrationError::InvalidFilter(e.to_string()))?;
93        self.transport
94            .unregister_listener(topic, Some(&self.uri_provider.get_source_uri()), listener)
95            .await
96            .map_err(RegistrationError::from)
97    }
98}
99
#[cfg(test)]
mod tests {

    // [utest->dsn~communication-layer-impl-default~1]

    use super::*;

    use protobuf::well_known_types::wrappers::StringValue;

    use crate::{
        utransport::{MockTransport, MockUListener},
        StaticUriProvider, UCode, UPriority, UStatus, UUri, UUID,
    };

    /// Creates the URI provider representing the local uEntity used by all tests.
    fn new_uri_provider() -> Arc<dyn LocalUriProvider> {
        Arc::new(StaticUriProvider::new("", 0x0005, 0x02))
    }

    /// Verifies that a topic containing a wildcard resource ID is rejected by both
    /// `start_listening` and `stop_listening` before the transport is touched.
    #[tokio::test]
    async fn test_start_stop_listening_rejects_wildcard_topic() {
        let mut transport = MockTransport::new();
        transport.expect_do_register_listener().never();
        // stop_listening is exercised as well, so the unregister path must stay untouched, too
        transport.expect_do_unregister_listener().never();
        let uri_provider = new_uri_provider();
        let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);

        let invalid_topic = UUri::try_from("up://my-vin/A15B/1/FFFF").unwrap();
        let mut listener = MockUListener::new();
        listener.expect_on_receive().never();
        let wrapped_listener = Arc::new(listener);

        let result = notifier
            .start_listening(&invalid_topic, wrapped_listener.clone())
            .await;
        assert!(result.is_err_and(|e| matches!(e, RegistrationError::InvalidFilter(_))));

        let result = notifier
            .stop_listening(&invalid_topic, wrapped_listener)
            .await;
        assert!(result.is_err_and(|e| matches!(e, RegistrationError::InvalidFilter(_))));
    }

    /// Verifies that `start_listening` registers the listener with the topic as source
    /// filter and the local uEntity's source URI as sink filter.
    #[tokio::test]
    async fn test_start_listening_succeeds() {
        let uri_provider = new_uri_provider();
        let topic = UUri::try_from("up://my-vin/A15B/1/B10F").unwrap();
        let expected_source_filter = topic.clone();
        let expected_sink_filter = uri_provider.get_source_uri();
        let mut transport = MockTransport::new();
        transport
            .expect_do_register_listener()
            .once()
            .withf(move |source_filter, sink_filter, _listener| {
                source_filter == &expected_source_filter
                    && *sink_filter == Some(&expected_sink_filter)
            })
            .return_const(Ok(()));
        let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);

        let mut listener = MockUListener::new();
        listener.expect_on_receive().never();
        let result = notifier.start_listening(&topic, Arc::new(listener)).await;
        assert!(result.is_ok());
    }

    /// Verifies that `stop_listening` unregisters the listener using the topic as source
    /// filter and the local uEntity's source URI as sink filter.
    #[tokio::test]
    async fn test_stop_listening_succeeds() {
        let uri_provider = new_uri_provider();
        let topic = UUri::try_from("up://my-vin/A15B/1/B10F").unwrap();
        let expected_source_filter = topic.clone();
        let expected_sink_filter = uri_provider.get_source_uri();
        let mut transport = MockTransport::new();
        transport
            .expect_do_unregister_listener()
            .once()
            .withf(move |source_filter, sink_filter, _listener| {
                source_filter == &expected_source_filter
                    && *sink_filter == Some(&expected_sink_filter)
            })
            .return_const(Ok(()));
        let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);

        let mut listener = MockUListener::new();
        listener.expect_on_receive().never();
        let result = notifier.stop_listening(&topic, Arc::new(listener)).await;
        assert!(result.is_ok());
    }

    /// Verifies that `notify` hands a notification message to the transport which
    /// carries the given call options, addresses and payload.
    #[tokio::test]
    async fn test_notify_succeeds() {
        let message_id = UUID::build();
        let uri_provider = new_uri_provider();
        let destination = UUri::try_from("up://other-vin/A15B/1/0").unwrap();
        let expected_message_id = message_id.clone();
        let expected_sink = destination.clone();
        let expected_source = uri_provider.get_resource_uri(0xB10F);
        let mut transport = MockTransport::new();
        transport
            .expect_do_send()
            .once()
            .withf(move |message| {
                let Ok(payload) = message.extract_protobuf::<StringValue>() else {
                    return false;
                };
                message.is_notification()
                    && message.id_unchecked() == &expected_message_id
                    && message.source_unchecked() == &expected_source
                    && message.sink_unchecked() == &expected_sink
                    && message.ttl_unchecked() == 10_000
                    && message.priority_unchecked() == UPriority::UPRIORITY_CS2
                    && payload.value == *"Hello"
            })
            .return_const(Ok(()));
        let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);

        let mut v = StringValue::new();
        v.value = "Hello".to_string();
        let payload = UPayload::try_from_protobuf(v).unwrap();
        let options = CallOptions::for_notification(
            Some(10_000),
            Some(message_id),
            Some(UPriority::UPRIORITY_CS2),
        );
        let result = notifier
            .notify(0xB10F, &destination, options, Some(payload))
            .await;
        assert!(result.is_ok());
    }

    /// Verifies that an error reported by the transport while sending is passed on
    /// to the caller as a `NotificationError::NotifyError`.
    #[tokio::test]
    async fn test_notify_fails_for_transport_error() {
        let uri_provider = new_uri_provider();
        let destination = UUri::try_from("up://other-vin/A15B/1/0").unwrap();
        let mut transport = MockTransport::new();
        transport
            .expect_do_send()
            .once()
            .return_const(Err(UStatus::fail_with_code(
                UCode::UNAVAILABLE,
                "connection lost",
            )));
        let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);

        let options = CallOptions::for_notification(None, None, None);
        let result = notifier.notify(0xB10F, &destination, options, None).await;
        assert!(result.is_err_and(|e| match e {
            NotificationError::NotifyError(status) => status.get_code() == UCode::UNAVAILABLE,
            _ => false,
        }));
    }

    /// Verifies that `notify` rejects a destination with a non-zero resource ID
    /// without sending anything via the transport.
    #[tokio::test]
    async fn test_notify_fails_for_invalid_destination() {
        let uri_provider = new_uri_provider();
        // destination has resource ID != 0
        let destination = UUri::try_from("up://other-vin/A15B/1/10").unwrap();
        let mut transport = MockTransport::new();
        transport.expect_do_send().never();
        let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);

        let options = CallOptions::for_notification(None, None, None);
        let result = notifier.notify(0xB10F, &destination, options, None).await;
        assert!(result.is_err_and(|e| matches!(e, NotificationError::InvalidArgument(_))));
    }

    /// Verifies that `notify` rejects an origin resource ID of 0 without sending
    /// anything via the transport.
    #[tokio::test]
    async fn test_notify_fails_for_invalid_resource_id() {
        let uri_provider = new_uri_provider();
        let destination = UUri::try_from("up://other-vin/A15B/1/0").unwrap();
        let mut transport = MockTransport::new();
        transport.expect_do_send().never();
        let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);

        let options = CallOptions::for_notification(None, None, None);
        // resource ID of origin address must not be 0
        let result = notifier.notify(0x0000, &destination, options, None).await;
        assert!(result.is_err_and(|e| matches!(e, NotificationError::InvalidArgument(_))));
    }
}