1use std::sync::Arc;
17
18use async_trait::async_trait;
19
20use crate::{LocalUriProvider, UListener, UMessageBuilder, UTransport, UUri};
21
22use super::{
23 apply_common_options, build_message, CallOptions, NotificationError, Notifier,
24 RegistrationError, UPayload,
25};
26
27pub struct SimpleNotifier {
30 transport: Arc<dyn UTransport>,
31 uri_provider: Arc<dyn LocalUriProvider>,
32}
33
34impl SimpleNotifier {
35 pub fn new(transport: Arc<dyn UTransport>, uri_provider: Arc<dyn LocalUriProvider>) -> Self {
42 SimpleNotifier {
43 transport,
44 uri_provider,
45 }
46 }
47}
48
49#[async_trait]
50impl Notifier for SimpleNotifier {
51 async fn notify(
52 &self,
53 resource_id: u16,
54 destination: &UUri,
55 call_options: CallOptions,
56 payload: Option<UPayload>,
57 ) -> Result<(), NotificationError> {
58 let mut builder = UMessageBuilder::notification(
59 self.uri_provider.get_resource_uri(resource_id),
60 destination.to_owned(),
61 );
62 apply_common_options(call_options, &mut builder);
63 let msg = build_message(&mut builder, payload)
64 .map_err(|e| NotificationError::InvalidArgument(e.to_string()))?;
65 self.transport
66 .send(msg)
67 .await
68 .map_err(NotificationError::NotifyError)
69 }
70
71 async fn start_listening(
72 &self,
73 topic: &UUri,
74 listener: Arc<dyn UListener>,
75 ) -> Result<(), RegistrationError> {
76 topic
77 .verify_no_wildcards()
78 .map_err(|e| RegistrationError::InvalidFilter(e.to_string()))?;
79 self.transport
80 .register_listener(topic, Some(&self.uri_provider.get_source_uri()), listener)
81 .await
82 .map_err(RegistrationError::from)
83 }
84
85 async fn stop_listening(
86 &self,
87 topic: &UUri,
88 listener: Arc<dyn UListener>,
89 ) -> Result<(), RegistrationError> {
90 topic
91 .verify_no_wildcards()
92 .map_err(|e| RegistrationError::InvalidFilter(e.to_string()))?;
93 self.transport
94 .unregister_listener(topic, Some(&self.uri_provider.get_source_uri()), listener)
95 .await
96 .map_err(RegistrationError::from)
97 }
98}
99
100#[cfg(test)]
101mod tests {
102
103 use super::*;
106
107 use protobuf::well_known_types::wrappers::StringValue;
108
109 use crate::{
110 utransport::{MockTransport, MockUListener},
111 StaticUriProvider, UCode, UPriority, UStatus, UUri, UUID,
112 };
113
114 fn new_uri_provider() -> Arc<dyn LocalUriProvider> {
115 Arc::new(StaticUriProvider::new("", 0x0005, 0x02))
116 }
117
118 #[tokio::test]
119 async fn test_start_stop_listening_rejects_wildcard_topic() {
120 let mut transport = MockTransport::new();
121 transport.expect_do_register_listener().never();
122 let uri_provider = new_uri_provider();
123 let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);
124
125 let invalid_topic = UUri::try_from("up://my-vin/A15B/1/FFFF").unwrap();
126 let mut listener = MockUListener::new();
127 listener.expect_on_receive().never();
128 let wrapped_listener = Arc::new(listener);
129
130 let result = notifier
131 .start_listening(&invalid_topic, wrapped_listener.clone())
132 .await;
133 assert!(result.is_err_and(|e| matches!(e, RegistrationError::InvalidFilter(_))));
134
135 let result = notifier
136 .stop_listening(&invalid_topic, wrapped_listener)
137 .await;
138 assert!(result.is_err_and(|e| matches!(e, RegistrationError::InvalidFilter(_))));
139 }
140
141 #[tokio::test]
142 async fn test_start_listening_succeeds() {
143 let uri_provider = new_uri_provider();
144 let topic = UUri::try_from("up://my-vin/A15B/1/B10F").unwrap();
145 let expected_source_filter = topic.clone();
146 let expected_sink_filter = uri_provider.get_source_uri();
147 let mut transport = MockTransport::new();
148 transport
149 .expect_do_register_listener()
150 .once()
151 .withf(move |source_filter, sink_filter, _listener| {
152 source_filter == &expected_source_filter
153 && *sink_filter == Some(&expected_sink_filter)
154 })
155 .return_const(Ok(()));
156 let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);
157
158 let mut listener = MockUListener::new();
159 listener.expect_on_receive().never();
160 let result = notifier.start_listening(&topic, Arc::new(listener)).await;
161 assert!(result.is_ok());
162 }
163
164 #[tokio::test]
165 async fn test_stop_listening_succeeds() {
166 let uri_provider = new_uri_provider();
167 let topic = UUri::try_from("up://my-vin/A15B/1/B10F").unwrap();
168 let expected_source_filter = topic.clone();
169 let expected_sink_filter = uri_provider.get_source_uri();
170 let mut transport = MockTransport::new();
171 transport
172 .expect_do_unregister_listener()
173 .once()
174 .withf(move |source_filter, sink_filter, _listener| {
175 source_filter == &expected_source_filter
176 && *sink_filter == Some(&expected_sink_filter)
177 })
178 .return_const(Ok(()));
179 let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);
180
181 let mut listener = MockUListener::new();
182 listener.expect_on_receive().never();
183 let result = notifier.stop_listening(&topic, Arc::new(listener)).await;
184 assert!(result.is_ok());
185 }
186
187 #[tokio::test]
188 async fn test_publish_succeeds() {
189 let message_id = UUID::build();
190 let uri_provider = new_uri_provider();
191 let destination = UUri::try_from("up://other-vin/A15B/1/0").unwrap();
192 let expected_message_id = message_id.clone();
193 let expected_sink = destination.clone();
194 let expected_source = uri_provider.get_resource_uri(0xB10F);
195 let mut transport = MockTransport::new();
196 transport
197 .expect_do_send()
198 .once()
199 .withf(move |message| {
200 let Ok(payload) = message.extract_protobuf::<StringValue>() else {
201 return false;
202 };
203 message.is_notification()
204 && message.id_unchecked() == &expected_message_id
205 && message.source_unchecked() == &expected_source
206 && message.sink_unchecked() == &expected_sink
207 && message.ttl_unchecked() == 10_000
208 && message.priority_unchecked() == UPriority::UPRIORITY_CS2
209 && payload.value == *"Hello"
210 })
211 .return_const(Ok(()));
212 let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);
213
214 let mut v = StringValue::new();
215 v.value = "Hello".to_string();
216 let payload = UPayload::try_from_protobuf(v).unwrap();
217 let options = CallOptions::for_notification(
218 Some(10_000),
219 Some(message_id),
220 Some(UPriority::UPRIORITY_CS2),
221 );
222 let result = notifier
223 .notify(0xB10F, &destination, options, Some(payload))
224 .await;
225 assert!(result.is_ok());
226 }
227
228 #[tokio::test]
229 async fn test_publish_fails_for_transport_error() {
230 let uri_provider = new_uri_provider();
231 let destination = UUri::try_from("up://other-vin/A15B/1/0").unwrap();
232 let mut transport = MockTransport::new();
233 transport
234 .expect_do_send()
235 .once()
236 .return_const(Err(UStatus::fail_with_code(
237 crate::UCode::UNAVAILABLE,
238 "connection lost",
239 )));
240 let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);
241
242 let options = CallOptions::for_notification(None, None, None);
243 let result = notifier.notify(0xB10F, &destination, options, None).await;
244 assert!(result.is_err_and(|e| match e {
245 NotificationError::NotifyError(status) => status.get_code() == UCode::UNAVAILABLE,
246 _ => false,
247 }));
248 }
249
250 #[tokio::test]
251 async fn test_publish_fails_for_invalid_destination() {
252 let uri_provider = new_uri_provider();
253 let destination = UUri::try_from("up://other-vin/A15B/1/10").unwrap();
255 let mut transport = MockTransport::new();
256 transport.expect_do_send().never();
257 let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);
258
259 let options = CallOptions::for_notification(None, None, None);
260 let result = notifier.notify(0xB10F, &destination, options, None).await;
261 assert!(result.is_err_and(|e| matches!(e, NotificationError::InvalidArgument(_))));
262 }
263
264 #[tokio::test]
265 async fn test_publish_fails_for_invalid_resource_id() {
266 let uri_provider = new_uri_provider();
267 let destination = UUri::try_from("up://other-vin/A15B/1/0").unwrap();
268 let mut transport = MockTransport::new();
269 transport.expect_do_send().never();
270 let notifier = SimpleNotifier::new(Arc::new(transport), uri_provider);
271
272 let options = CallOptions::for_notification(None, None, None);
273 let result = notifier.notify(0x0000, &destination, options, None).await;
275 assert!(result.is_err_and(|e| matches!(e, NotificationError::InvalidArgument(_))));
276 }
277}