1use std::{
17 collections::{hash_map::Entry, HashMap},
18 ops::Deref,
19 sync::{Arc, RwLock},
20};
21
22use async_trait::async_trait;
23use tracing::{debug, info};
24
25use crate::{
26 core::usubscription::{
27 self, State, SubscriptionRequest, USubscription, UnsubscribeRequest, Update,
28 },
29 LocalUriProvider, UListener, UMessage, UMessageBuilder, UStatus, UTransport, UUri,
30};
31
32use super::{
33 apply_common_options, build_message, pubsub::SubscriptionChangeHandler, CallOptions,
34 InMemoryRpcClient, Notifier, PubSubError, Publisher, RegistrationError, RpcClientUSubscription,
35 SimpleNotifier, Subscriber, UPayload,
36};
37
/// Wrapper around a shared [`SubscriptionChangeHandler`].
///
/// The wrapper exists so that an equality relation can be defined for handlers,
/// which allows a handler that is being registered to be compared with the one
/// that is already stored for a topic.
#[derive(Clone)]
struct ComparableSubscriptionChangeHandler {
    /// The wrapped handler.
    inner: Arc<dyn SubscriptionChangeHandler>,
}
42
43impl ComparableSubscriptionChangeHandler {
44 fn new(handler: Arc<dyn SubscriptionChangeHandler>) -> Self {
45 ComparableSubscriptionChangeHandler {
46 inner: handler.clone(),
47 }
48 }
49}
50
51impl Deref for ComparableSubscriptionChangeHandler {
52 type Target = dyn SubscriptionChangeHandler;
53
54 fn deref(&self) -> &Self::Target {
55 &*self.inner
56 }
57}
58
59impl PartialEq for ComparableSubscriptionChangeHandler {
60 fn eq(&self, other: &Self) -> bool {
67 Arc::ptr_eq(&self.inner, &other.inner)
68 }
69}
70
71impl Eq for ComparableSubscriptionChangeHandler {}
72
/// Keeps at most one subscription change handler per topic and passes the
/// subscription updates it receives on to the handler that has been registered
/// for the update's topic.
#[derive(Default)]
struct SubscriptionChangeListener {
    /// The registered handlers, keyed by the topic they have been registered for.
    subscription_change_handlers: RwLock<HashMap<UUri, ComparableSubscriptionChangeHandler>>,
}
77
78impl SubscriptionChangeListener {
79 fn add_handler(
86 &self,
87 topic: UUri,
88 subscription_change_handler: Arc<dyn SubscriptionChangeHandler>,
89 ) -> Result<(), RegistrationError> {
90 let Ok(mut handlers) = self.subscription_change_handlers.write() else {
91 return Err(RegistrationError::Unknown(UStatus::fail_with_code(
92 crate::UCode::INTERNAL,
93 "failed to acquire write lock for handler map",
94 )));
95 };
96 let handler_to_add = ComparableSubscriptionChangeHandler::new(subscription_change_handler);
97 match handlers.entry(topic) {
98 Entry::Vacant(entry) => {
99 entry.insert(handler_to_add);
100 Ok(())
101 }
102 Entry::Occupied(entry) => {
103 if entry.get() == &handler_to_add {
104 Ok(())
105 } else {
106 Err(RegistrationError::AlreadyExists)
107 }
108 }
109 }
110 }
111
112 fn remove_handler(&self, topic: &UUri) -> Result<(), RegistrationError> {
120 self.subscription_change_handlers
121 .write()
122 .map_err(|_e| {
123 RegistrationError::Unknown(UStatus::fail_with_code(
124 crate::UCode::INTERNAL,
125 "failed to acquire write lock for handler map",
126 ))
127 })
128 .map(|mut handlers| {
129 handlers.remove(topic);
130 })
131 }
132
133 fn clear(&self) -> Result<(), RegistrationError> {
139 self.subscription_change_handlers
140 .write()
141 .map_err(|_e| {
142 RegistrationError::Unknown(UStatus::fail_with_code(
143 crate::UCode::INTERNAL,
144 "failed to acquire write lock for handler map",
145 ))
146 })
147 .map(|mut handlers| {
148 handlers.clear();
149 })
150 }
151
152 #[cfg(test)]
153 fn has_handler(&self, topic: &UUri) -> bool {
154 self.subscription_change_handlers
155 .read()
156 .is_ok_and(|handlers| handlers.contains_key(topic))
157 }
158}
159
160#[async_trait]
161impl UListener for SubscriptionChangeListener {
162 async fn on_receive(&self, msg: UMessage) {
163 if !msg.is_notification() {
164 return;
165 }
166 let Ok(subscription_update) = msg.extract_protobuf::<Update>() else {
167 debug!("ignoring notification that does not contain subscription update");
168 return;
169 };
170 let Some(topic) = subscription_update.topic.as_ref() else {
171 return;
172 };
173 let Some(status) = subscription_update.status.as_ref() else {
174 return;
175 };
176
177 let Ok(handlers) = self.subscription_change_handlers.read() else {
178 return;
179 };
180 if let Some(handler) = handlers.get(topic) {
181 handler.on_subscription_change(topic.to_owned(), status.to_owned());
182 }
183 }
184}
185
/// A [`Publisher`] that sends its messages using a [`UTransport`] and derives
/// the topic of each message from a [`LocalUriProvider`].
pub struct SimplePublisher {
    /// The transport used for sending the messages.
    transport: Arc<dyn UTransport>,
    /// Provides the URI of the resource that a message is published to.
    uri_provider: Arc<dyn LocalUriProvider>,
}
191
192impl SimplePublisher {
193 pub fn new(transport: Arc<dyn UTransport>, uri_provider: Arc<dyn LocalUriProvider>) -> Self {
200 SimplePublisher {
201 transport,
202 uri_provider,
203 }
204 }
205}
206
207#[async_trait]
208impl Publisher for SimplePublisher {
209 async fn publish(
210 &self,
211 resource_id: u16,
212 call_options: CallOptions,
213 payload: Option<UPayload>,
214 ) -> Result<(), PubSubError> {
215 let mut builder = UMessageBuilder::publish(self.uri_provider.get_resource_uri(resource_id));
216 apply_common_options(call_options, &mut builder);
217 match build_message(&mut builder, payload) {
218 Ok(publish_message) => self
219 .transport
220 .send(publish_message)
221 .await
222 .map_err(PubSubError::PublishError),
223 Err(e) => Err(PubSubError::InvalidArgument(format!(
224 "failed to create Publish message from parameters: {e}"
225 ))),
226 }
227 }
228}
229
/// A [`Subscriber`] that registers its subscriptions with a [`USubscription`]
/// service and keeps the handlers for subscription change notifications in memory.
pub struct InMemorySubscriber {
    /// The transport on which listeners for subscribed topics are (un)registered.
    transport: Arc<dyn UTransport>,
    /// Stored when the subscriber is created; not read by the code in this section.
    _uri_provider: Arc<dyn LocalUriProvider>,
    /// The client used for invoking the subscribe and unsubscribe operations.
    usubscription: Arc<dyn USubscription>,
    /// Used for starting and stopping to listen for subscription change notifications.
    notifier: Arc<dyn Notifier>,
    /// Dispatches received subscription updates to the registered handlers.
    subscription_change_listener: Arc<SubscriptionChangeListener>,
}
252
253impl InMemorySubscriber {
254 pub async fn new(
264 transport: Arc<dyn UTransport>,
265 uri_provider: Arc<dyn LocalUriProvider>,
266 ) -> Result<Self, RegistrationError> {
267 let rpc_client = InMemoryRpcClient::new(transport.clone(), uri_provider.clone())
268 .await
269 .map(Arc::new)?;
270 let usubscription_client = Arc::new(RpcClientUSubscription::new(rpc_client));
271 let notifier = Arc::new(SimpleNotifier::new(transport.clone(), uri_provider.clone()));
272 Self::for_clients(transport, uri_provider, usubscription_client, notifier).await
273 }
274
275 pub async fn for_clients(
288 transport: Arc<dyn UTransport>,
289 uri_provider: Arc<dyn LocalUriProvider>,
290 usubscription: Arc<dyn USubscription>,
291 notifier: Arc<dyn Notifier>,
292 ) -> Result<Self, RegistrationError> {
293 let subscription_change_listener = Arc::new(SubscriptionChangeListener {
297 subscription_change_handlers: RwLock::new(HashMap::new()),
298 });
299 notifier
300 .start_listening(
301 &usubscription::usubscription_uri(usubscription::RESOURCE_ID_SUBSCRIPTION_CHANGE),
302 subscription_change_listener.clone(),
303 )
304 .await?;
305 Ok(InMemorySubscriber {
306 transport,
307 _uri_provider: uri_provider,
308 usubscription,
309 notifier,
310 subscription_change_listener,
311 })
312 }
313
314 pub async fn stop(&self) -> Result<(), RegistrationError> {
322 self.notifier
323 .stop_listening(
324 &usubscription::usubscription_uri(usubscription::RESOURCE_ID_SUBSCRIPTION_CHANGE),
325 self.subscription_change_listener.clone(),
326 )
327 .await
328 .and_then(|_ok| self.subscription_change_listener.clear())
329 }
330
331 async fn invoke_subscribe(
332 &self,
333 topic: &UUri,
334 subscription_change_handler: Option<Arc<dyn SubscriptionChangeHandler>>,
335 ) -> Result<State, RegistrationError> {
336 let subscription_request = SubscriptionRequest {
337 topic: Some(topic.to_owned()).into(),
338 ..Default::default()
339 };
340 match self.usubscription.subscribe(subscription_request).await {
341 Ok(response) => match response.status.state.enum_value() {
342 Ok(state) if state == State::SUBSCRIBED || state == State::SUBSCRIBE_PENDING => {
343 if let Some(handler) = subscription_change_handler.clone() {
344 self.subscription_change_listener
345 .add_handler(topic.to_owned(), handler)?;
346 }
347 Ok(state)
348 }
349 _ => {
350 debug!(topic = %topic, "failed to subscribe to topic: {}", response.status.message);
351 Err(RegistrationError::Unknown(UStatus::fail_with_code(
352 crate::UCode::FAILED_PRECONDITION,
353 response.status.message.to_owned(),
354 )))
355 }
356 },
357 Err(e) => {
358 info!(topic = %topic, "error invoking USubscription service: {}", e);
359 Err(RegistrationError::Unknown(UStatus::fail_with_code(
360 crate::UCode::INTERNAL,
361 "failed to invoke USubscription service",
362 )))
363 }
364 }
365 }
366
367 async fn invoke_unsubscribe(&self, topic: &UUri) -> Result<(), RegistrationError> {
368 let request = UnsubscribeRequest {
369 ..Default::default()
370 };
371 self.usubscription
372 .unsubscribe(request)
373 .await
374 .map(|_| {
375 let _ = self.subscription_change_listener.remove_handler(topic);
376 })
377 .map_err(|e| {
378 info!(topic = %topic, "error invoking USubscription service: {}", e);
379 RegistrationError::Unknown(UStatus::fail_with_code(
380 crate::UCode::INTERNAL,
381 "failed to invoke USubscription service",
382 ))
383 })
384 }
385
386 #[cfg(test)]
387 fn add_subscription_change_handler(
388 &self,
389 topic: &UUri,
390 subscription_change_handler: Arc<dyn SubscriptionChangeHandler>,
391 ) -> Result<(), RegistrationError> {
392 self.subscription_change_listener
393 .add_handler(topic.to_owned(), subscription_change_handler)
394 }
395
396 #[cfg(test)]
397 fn has_subscription_change_handler(&self, topic: &UUri) -> bool {
398 self.subscription_change_listener.has_handler(topic)
399 }
400}
401
402#[async_trait]
403impl Subscriber for InMemorySubscriber {
404 async fn subscribe(
405 &self,
406 topic_filter: &UUri,
407 handler: Arc<dyn UListener>,
408 subscription_change_handler: Option<Arc<dyn SubscriptionChangeHandler>>,
409 ) -> Result<(), RegistrationError> {
410 self.invoke_subscribe(topic_filter, subscription_change_handler)
411 .await?;
412 self.transport
413 .register_listener(topic_filter, None, handler.clone())
414 .await
415 .map_err(RegistrationError::from)
423 }
424
425 async fn unsubscribe(
426 &self,
427 topic: &UUri,
428 listener: Arc<dyn UListener>,
429 ) -> Result<(), RegistrationError> {
430 self.invoke_unsubscribe(topic).await?;
431 self.transport
432 .unregister_listener(topic, None, listener)
433 .await
434 .map_err(RegistrationError::from)
442 }
443}
444
445#[cfg(test)]
446mod tests {
447
448 use super::*;
451
452 use mockall::Sequence;
453 use protobuf::well_known_types::wrappers::StringValue;
454 use usubscription::{MockUSubscription, SubscriptionResponse, SubscriptionStatus};
455
456 use crate::{
457 communication::{notification::MockNotifier, pubsub::MockSubscriptionChangeHandler},
458 utransport::{MockTransport, MockUListener},
459 StaticUriProvider, UAttributes, UCode, UMessageType, UPriority, UStatus, UUri, UUID,
460 };
461
462 fn new_uri_provider() -> Arc<dyn LocalUriProvider> {
463 Arc::new(StaticUriProvider::new("", 0x0005, 0x02))
464 }
465
466 fn succeeding_notifier() -> Arc<dyn Notifier> {
467 let mut notifier = MockNotifier::new();
468 notifier
469 .expect_start_listening()
470 .once()
471 .return_const(Ok(()));
472 Arc::new(notifier)
473 }
474
475 #[tokio::test]
476 async fn test_publish_fails_for_invalid_topic() {
477 let uri_provider = new_uri_provider();
479 let mut transport = MockTransport::new();
480
481 transport.expect_do_send().never();
482 let publisher = SimplePublisher::new(Arc::new(transport), uri_provider);
483
484 let options = CallOptions::for_publish(None, None, None);
486 let publish_result = publisher
487 .publish(0x1000, options, None)
489 .await;
490
491 assert!(publish_result.is_err_and(|e| matches!(e, PubSubError::InvalidArgument(_msg))));
493 }
494
495 #[tokio::test]
496 async fn test_publish_fails_with_transport_error() {
497 let message_id = UUID::build();
498 let uri_provider = new_uri_provider();
500 let mut transport = MockTransport::new();
501 let expected_message_id = message_id.clone();
503 transport
504 .expect_do_send()
505 .once()
506 .withf(move |msg| msg.id_unchecked() == &expected_message_id)
507 .returning(|_msg| {
508 Err(UStatus::fail_with_code(
509 UCode::UNAVAILABLE,
510 "transport not available",
511 ))
512 });
513 let publisher = SimplePublisher::new(Arc::new(transport), uri_provider);
514
515 let options = CallOptions::for_publish(None, Some(message_id), None);
517 let publish_result = publisher.publish(0x9A00, options, None).await;
518
519 assert!(publish_result.is_err_and(|e| matches!(e, PubSubError::PublishError(_status))));
521 }
522
523 #[tokio::test]
524 async fn test_publish_succeeds() {
525 let uri_provider = new_uri_provider();
527 let mut transport = MockTransport::new();
528 let message_id = UUID::build();
529 let expected_message_id = message_id.clone();
530
531 transport
532 .expect_do_send()
533 .once()
534 .withf(move |message| {
535 let Ok(payload) = message.extract_protobuf::<StringValue>() else {
536 return false;
537 };
538 payload.value == *"Hello"
539 && message.is_publish()
540 && message.id_unchecked() == &expected_message_id
541 && message.priority_unchecked() == UPriority::UPRIORITY_CS3
542 && message.ttl_unchecked() == 5_000
543 })
544 .returning(|_msg| Ok(()));
545
546 let publisher = SimplePublisher::new(Arc::new(transport), uri_provider);
547
548 let call_options = CallOptions::for_publish(
550 Some(5_000),
551 Some(message_id.clone()),
552 Some(crate::UPriority::UPRIORITY_CS3),
553 );
554 let payload = StringValue {
555 value: "Hello".to_string(),
556 ..Default::default()
557 };
558 let publish_result = publisher
559 .publish(
560 0x9A00,
561 call_options,
562 Some(
563 UPayload::try_from_protobuf(payload)
564 .expect("should have been able to create message payload"),
565 ),
566 )
567 .await;
568
569 assert!(publish_result.is_ok());
571 }
572
573 #[tokio::test]
574 async fn test_subscriber_creation_fails_when_notifier_fails_to_register_listener() {
575 let mut notifier = MockNotifier::new();
577 notifier
579 .expect_start_listening()
580 .once()
581 .return_const(Err(RegistrationError::Unknown(UStatus::fail_with_code(
582 UCode::UNAVAILABLE,
583 "not available",
584 ))));
585
586 let creation_attempt = InMemorySubscriber::for_clients(
588 Arc::new(MockTransport::new()),
589 new_uri_provider(),
590 Arc::new(MockUSubscription::new()),
591 Arc::new(notifier),
592 )
593 .await;
594 assert!(creation_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
596 }
597
598 #[tokio::test]
599 async fn test_subscriber_stop_succeeds() {
600 let mut notifier = MockNotifier::new();
602 notifier.expect_stop_listening().once().return_const(Ok(()));
604
605 let subscription_change_listener = Arc::new(SubscriptionChangeListener::default());
607 let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
608 let handler = Arc::new(MockSubscriptionChangeHandler::new());
609 subscription_change_listener
610 .add_handler(topic.clone(), handler)
611 .expect("adding a handler should have succeeded");
612
613 let subscriber = InMemorySubscriber {
614 transport: Arc::new(MockTransport::new()),
615 _uri_provider: new_uri_provider(),
616 usubscription: Arc::new(MockUSubscription::new()),
617 notifier: Arc::new(notifier),
618 subscription_change_listener,
619 };
620
621 let stop_attempt = subscriber.stop().await;
623
624 assert!(stop_attempt.is_ok_and(|_| {
626 !subscriber.has_subscription_change_handler(&topic)
628 }));
629 }
630
631 #[tokio::test]
632 async fn test_subscribe_fails_when_usubscription_invocation_fails() {
633 let mut seq = Sequence::new();
635 let mut usubscription_client = MockUSubscription::new();
636 usubscription_client
639 .expect_subscribe()
640 .once()
641 .in_sequence(&mut seq)
642 .return_const(Err(UStatus::fail_with_code(
643 UCode::UNAVAILABLE,
644 "not connected",
645 )));
646 usubscription_client
647 .expect_subscribe()
648 .once()
649 .in_sequence(&mut seq)
650 .return_const({
651 let response = SubscriptionResponse {
652 status: Some(SubscriptionStatus {
653 state: State::UNSUBSCRIBED.into(),
654 message: "unsupported topic".to_string(),
655 ..Default::default()
656 })
657 .into(),
658 ..Default::default()
659 };
660 Ok(response)
661 });
662 usubscription_client
663 .expect_subscribe()
664 .once()
665 .in_sequence(&mut seq)
666 .return_const({
667 let response = SubscriptionResponse {
668 status: Some(SubscriptionStatus {
669 message: "unknown state".to_string(),
670 ..Default::default()
671 })
672 .into(),
673 ..Default::default()
674 };
675 Ok(response)
676 });
677
678 let mut transport = MockTransport::new();
680 transport.expect_do_register_listener().never();
681
682 let subscriber = InMemorySubscriber::for_clients(
684 Arc::new(transport),
685 new_uri_provider(),
686 Arc::new(usubscription_client),
687 succeeding_notifier(),
688 )
689 .await
690 .unwrap();
691
692 let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
694 let mut listener = MockUListener::new();
695 listener.expect_on_receive().never();
696 let listener_ref = Arc::new(listener);
697
698 let subscribe_attempt = subscriber
699 .subscribe(&topic, listener_ref.clone(), None)
700 .await;
701
702 assert!(subscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
704
705 let subscribe_attempt = subscriber
706 .subscribe(&topic, listener_ref.clone(), None)
707 .await;
708
709 assert!(subscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
711
712 let subscribe_attempt = subscriber
713 .subscribe(&topic, listener_ref.clone(), None)
714 .await;
715
716 assert!(subscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
718 }
719
720 #[tokio::test]
721 async fn test_repeated_subscribe_fails_for_different_subscription_change_handlers() {
722 let mut usubscription_client = MockUSubscription::new();
724 usubscription_client
726 .expect_subscribe()
727 .times(2)
728 .returning(|request| {
729 let response = SubscriptionResponse {
730 topic: request.topic.clone(),
731 status: Some(SubscriptionStatus {
732 state: State::SUBSCRIBED.into(),
733 ..Default::default()
734 })
735 .into(),
736 ..Default::default()
737 };
738 Ok(response)
739 });
740
741 let mut transport = MockTransport::new();
743 transport
745 .expect_do_register_listener()
746 .once()
747 .return_const(Err(UStatus::fail_with_code(
748 UCode::UNAVAILABLE,
749 "not connected",
750 )));
751
752 let subscriber = InMemorySubscriber::for_clients(
754 Arc::new(transport),
755 new_uri_provider(),
756 Arc::new(usubscription_client),
757 succeeding_notifier(),
758 )
759 .await
760 .unwrap();
761
762 let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
764 let listener = Arc::new(MockUListener::new());
765 let subscribe_attempt = subscriber
766 .subscribe(
767 &topic,
768 listener.clone(),
769 Some(Arc::new(MockSubscriptionChangeHandler::new())),
770 )
771 .await;
772
773 assert!(subscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
775
776 let subscribe_attempt = subscriber
778 .subscribe(
779 &topic,
780 listener.clone(),
781 Some(Arc::new(MockSubscriptionChangeHandler::new())),
782 )
783 .await;
784 assert!(subscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::AlreadyExists)));
786 }
787
788 #[tokio::test]
789 async fn test_subscribe_succeeds_on_second_attempt() {
790 let (captured_listener_tx, captured_listener_rx) = std::sync::mpsc::channel();
791
792 let mut usubscription_client = MockUSubscription::new();
794 usubscription_client
796 .expect_subscribe()
797 .times(2)
798 .returning(|request| {
799 let response = SubscriptionResponse {
800 topic: request.topic.clone(),
801 status: Some(SubscriptionStatus {
802 state: State::SUBSCRIBED.into(),
803 ..Default::default()
804 })
805 .into(),
806 ..Default::default()
807 };
808 Ok(response)
809 });
810
811 let mut transport = MockTransport::new();
813 let mut seq = Sequence::new();
814 transport
816 .expect_do_register_listener()
817 .once()
818 .in_sequence(&mut seq)
819 .return_const(Err(UStatus::fail_with_code(
820 UCode::UNAVAILABLE,
821 "not connected",
822 )));
823 transport
825 .expect_do_register_listener()
826 .once()
827 .in_sequence(&mut seq)
828 .returning(move |_source_filter, _sink_filter, listener| {
829 captured_listener_tx
830 .send(listener)
831 .map_err(|_e| UStatus::fail("cannot capture listener"))
832 });
833
834 let subscriber = InMemorySubscriber::for_clients(
836 Arc::new(transport),
837 new_uri_provider(),
838 Arc::new(usubscription_client),
839 succeeding_notifier(),
840 )
841 .await
842 .unwrap();
843
844 let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
846 let mut mock_listener = MockUListener::new();
847 mock_listener.expect_on_receive().once().return_const(());
848 let listener = Arc::new(mock_listener);
849 let handler = Arc::new(MockSubscriptionChangeHandler::new());
850 let subscribe_attempt = subscriber
851 .subscribe(&topic, listener.clone(), Some(handler.clone()))
852 .await;
853
854 assert!(subscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
856
857 let subscribe_attempt = subscriber
858 .subscribe(&topic, listener.clone(), Some(handler.clone()))
859 .await;
860
861 assert!(subscribe_attempt.is_ok());
863
864 let event = UMessageBuilder::publish(topic).build().unwrap();
866 let captured_listener = captured_listener_rx.recv().unwrap().to_owned();
867 captured_listener.on_receive(event).await;
868 }
869
870 #[tokio::test]
871 async fn test_unsubscribe_fails_for_unknown_listener() {
872 let mut usubscription_client = MockUSubscription::new();
874 usubscription_client
876 .expect_unsubscribe()
877 .once()
878 .return_const(Ok(()));
879
880 let mut transport = MockTransport::new();
882 transport
884 .expect_do_unregister_listener()
885 .once()
886 .return_const(Err(UStatus::fail_with_code(
887 UCode::NOT_FOUND,
888 "no such listener",
889 )));
890
891 let subscriber = InMemorySubscriber::for_clients(
893 Arc::new(transport),
894 new_uri_provider(),
895 Arc::new(usubscription_client),
896 succeeding_notifier(),
897 )
898 .await
899 .unwrap();
900
901 let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
903 let listener = Arc::new(MockUListener::new());
904 let unsubscribe_attempt = subscriber.unsubscribe(&topic, listener.clone()).await;
905
906 assert!(unsubscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::NoSuchListener)));
908 }
909
910 #[tokio::test]
911 async fn test_unsubscribe_fails_if_usubscription_invocation_fails() {
912 let mut usubscription_client = MockUSubscription::new();
914 usubscription_client
916 .expect_unsubscribe()
917 .once()
918 .return_const(Err(UStatus::fail_with_code(UCode::UNAVAILABLE, "unknown")));
919
920 let mut transport = MockTransport::new();
922 transport
924 .expect_do_unregister_listener()
925 .never()
926 .return_const(Ok(()));
927
928 let subscriber = InMemorySubscriber::for_clients(
930 Arc::new(transport),
931 new_uri_provider(),
932 Arc::new(usubscription_client),
933 succeeding_notifier(),
934 )
935 .await
936 .unwrap();
937 let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
939 let handler = MockSubscriptionChangeHandler::new();
940 subscriber
941 .add_subscription_change_handler(&topic, Arc::new(handler))
942 .expect("should be able to add handler");
943 assert!(subscriber.has_subscription_change_handler(&topic));
944
945 let listener = Arc::new(MockUListener::new());
947 let unsubscribe_attempt = subscriber.unsubscribe(&topic, listener).await;
948
949 assert!(unsubscribe_attempt.is_err_and(|e| {
951 matches!(e, RegistrationError::Unknown(_))
952 && subscriber.has_subscription_change_handler(&topic)
954 }));
955 }
956
957 #[tokio::test]
958 async fn test_unsubscribe_succeeds() {
959 let mut usubscription_client = MockUSubscription::new();
961 usubscription_client
963 .expect_unsubscribe()
964 .once()
965 .return_const(Ok(()));
966
967 let mut transport = MockTransport::new();
969 transport
971 .expect_do_unregister_listener()
972 .once()
973 .return_const(Ok(()));
974
975 let subscriber = InMemorySubscriber::for_clients(
977 Arc::new(transport),
978 new_uri_provider(),
979 Arc::new(usubscription_client),
980 succeeding_notifier(),
981 )
982 .await
983 .unwrap();
984 let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
986 let handler = MockSubscriptionChangeHandler::new();
987 subscriber
988 .add_subscription_change_handler(&topic, Arc::new(handler))
989 .expect("should be able to add handler");
990 assert!(subscriber.has_subscription_change_handler(&topic));
991
992 let listener = Arc::new(MockUListener::new());
994 let unsubscribe_attempt = subscriber.unsubscribe(&topic, listener.clone()).await;
995
996 assert!(
998 unsubscribe_attempt.is_ok_and(|_| !subscriber.has_subscription_change_handler(&topic))
999 );
1000 }
1001
1002 #[tokio::test]
1003 async fn test_unsubscribe_succeeds_on_second_attempt() {
1004 let mut usubscription_client = MockUSubscription::new();
1006 usubscription_client
1008 .expect_unsubscribe()
1009 .times(2)
1010 .return_const(Ok(()));
1011
1012 let mut transport = MockTransport::new();
1014 let mut seq = Sequence::new();
1015 transport
1017 .expect_do_unregister_listener()
1018 .once()
1019 .in_sequence(&mut seq)
1020 .return_const(Err(UStatus::fail_with_code(
1021 UCode::UNAVAILABLE,
1022 "not connected",
1023 )));
1024 transport
1026 .expect_do_unregister_listener()
1027 .once()
1028 .in_sequence(&mut seq)
1029 .return_const(Ok(()));
1030
1031 let subscriber = InMemorySubscriber::for_clients(
1033 Arc::new(transport),
1034 new_uri_provider(),
1035 Arc::new(usubscription_client),
1036 succeeding_notifier(),
1037 )
1038 .await
1039 .unwrap();
1040 let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
1042 let handler = MockSubscriptionChangeHandler::new();
1043 subscriber
1044 .add_subscription_change_handler(&topic, Arc::new(handler))
1045 .expect("should be able to add handler");
1046 assert!(subscriber.has_subscription_change_handler(&topic));
1047
1048 let listener = Arc::new(MockUListener::new());
1050 let unsubscribe_attempt = subscriber.unsubscribe(&topic, listener.clone()).await;
1051
1052 assert!(unsubscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
1054
1055 let unsubscribe_attempt = subscriber.unsubscribe(&topic, listener).await;
1056
1057 assert!(unsubscribe_attempt.is_ok_and(|_| {
1059 !subscriber.has_subscription_change_handler(&topic)
1061 }));
1062 }
1063
1064 fn message_with_wrong_type(msg_type: UMessageType) -> UMessage {
1065 let attributes = UAttributes {
1066 type_: msg_type.into(),
1067 ..Default::default()
1068 };
1069 UMessage {
1070 attributes: Some(attributes).into(),
1071 ..Default::default()
1072 }
1073 }
1074
1075 fn notification_with_wrong_payload() -> UMessage {
1076 let payload = UPayload::try_from_protobuf(StringValue::new())
1077 .expect("should have been able to create protobuf");
1078 let attributes = UAttributes {
1079 type_: UMessageType::UMESSAGE_TYPE_NOTIFICATION.into(),
1080 payload_format: payload.payload_format().into(),
1081 ..Default::default()
1082 };
1083 UMessage {
1084 attributes: Some(attributes).into(),
1085 payload: Some(payload.payload()),
1086 ..Default::default()
1087 }
1088 }
1089
1090 fn status_update_without_topic() -> UMessage {
1091 let status = SubscriptionStatus {
1092 state: State::SUBSCRIBED.into(),
1093 ..Default::default()
1094 };
1095 let update = Update {
1096 status: Some(status).into(),
1097 ..Default::default()
1098 };
1099 let payload =
1100 UPayload::try_from_protobuf(update).expect("should have been able to create protobuf");
1101 let attributes = UAttributes {
1102 type_: UMessageType::UMESSAGE_TYPE_NOTIFICATION.into(),
1103 payload_format: payload.payload_format().into(),
1104 ..Default::default()
1105 };
1106
1107 UMessage {
1108 attributes: Some(attributes).into(),
1109 payload: Some(payload.payload()),
1110 ..Default::default()
1111 }
1112 }
1113
1114 fn status_update_without_status() -> UMessage {
1115 let update = Update {
1116 topic: Some(UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap()).into(),
1117 ..Default::default()
1118 };
1119 let payload =
1120 UPayload::try_from_protobuf(update).expect("should have been able to create protobuf");
1121 let attributes = UAttributes {
1122 type_: UMessageType::UMESSAGE_TYPE_NOTIFICATION.into(),
1123 payload_format: payload.payload_format().into(),
1124 ..Default::default()
1125 };
1126
1127 UMessage {
1128 attributes: Some(attributes).into(),
1129 payload: Some(payload.payload()),
1130 ..Default::default()
1131 }
1132 }
1133
1134 #[test_case::test_case(message_with_wrong_type(UMessageType::UMESSAGE_TYPE_PUBLISH); "Publish messages")]
1135 #[test_case::test_case(message_with_wrong_type(UMessageType::UMESSAGE_TYPE_REQUEST); "Request messages")]
1136 #[test_case::test_case(message_with_wrong_type(UMessageType::UMESSAGE_TYPE_RESPONSE); "Response messages")]
1137 #[test_case::test_case(notification_with_wrong_payload(); "wrong payload")]
1138 #[test_case::test_case(status_update_without_topic(); "status without topic")]
1139 #[test_case::test_case(status_update_without_status(); "update without status")]
1140 #[tokio::test]
1141 async fn test_subscription_change_listener_ignores(notification: UMessage) {
1142 let listener = SubscriptionChangeListener::default();
1143
1144 let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
1145 let mut handler = MockSubscriptionChangeHandler::new();
1146 handler.expect_on_subscription_change().never();
1147
1148 listener
1149 .add_handler(topic.clone(), Arc::new(handler))
1150 .expect("should have been able to register listener");
1151 listener.on_receive(notification).await;
1152 }
1153
1154 #[tokio::test]
1155 async fn test_subscription_change_listener_invokes_handler_for_subscribed_topic() {
1156 let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
1157 let status = SubscriptionStatus {
1158 state: State::SUBSCRIBED.into(),
1159 ..Default::default()
1160 };
1161 let update = Update {
1162 topic: Some(topic.clone()).into(),
1163 status: Some(status.clone()).into(),
1164 ..Default::default()
1165 };
1166 let payload =
1167 UPayload::try_from_protobuf(update).expect("should have been able to create protobuf");
1168 let attributes = UAttributes {
1169 type_: UMessageType::UMESSAGE_TYPE_NOTIFICATION.into(),
1170 payload_format: payload.payload_format().into(),
1171 ..Default::default()
1172 };
1173
1174 let notification = UMessage {
1175 attributes: Some(attributes).into(),
1176 payload: Some(payload.payload()),
1177 ..Default::default()
1178 };
1179
1180 let expected_topic = topic.clone();
1181 let mut handler = MockSubscriptionChangeHandler::new();
1182 handler
1183 .expect_on_subscription_change()
1184 .once()
1185 .withf(move |topic, updated_status| {
1186 topic == &expected_topic && updated_status == &status
1187 })
1188 .return_const(());
1189
1190 let listener = SubscriptionChangeListener::default();
1191 listener
1192 .add_handler(topic, Arc::new(handler))
1193 .expect("should have been able to register listener");
1194
1195 listener.on_receive(notification).await;
1196 }
1197}