up_rust/communication/
default_pubsub.rs

1/********************************************************************************
2 * Copyright (c) 2024 Contributors to the Eclipse Foundation
3 *
4 * See the NOTICE file(s) distributed with this work for additional
5 * information regarding copyright ownership.
6 *
7 * This program and the accompanying materials are made available under the
8 * terms of the Apache License Version 2.0 which is available at
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * SPDX-License-Identifier: Apache-2.0
12 ********************************************************************************/
13
14// [impl->dsn~communication-layer-impl-default~1]
15
16use std::{
17    collections::{hash_map::Entry, HashMap},
18    ops::Deref,
19    sync::{Arc, RwLock},
20};
21
22use async_trait::async_trait;
23use tracing::{debug, info};
24
25use crate::{
26    core::usubscription::{
27        self, State, SubscriptionRequest, USubscription, UnsubscribeRequest, Update,
28    },
29    LocalUriProvider, UListener, UMessage, UMessageBuilder, UStatus, UTransport, UUri,
30};
31
32use super::{
33    apply_common_options, build_message, pubsub::SubscriptionChangeHandler, CallOptions,
34    InMemoryRpcClient, Notifier, PubSubError, Publisher, RegistrationError, RpcClientUSubscription,
35    SimpleNotifier, Subscriber, UPayload,
36};
37
38#[derive(Clone)]
39struct ComparableSubscriptionChangeHandler {
40    inner: Arc<dyn SubscriptionChangeHandler>,
41}
42
43impl ComparableSubscriptionChangeHandler {
44    fn new(handler: Arc<dyn SubscriptionChangeHandler>) -> Self {
45        ComparableSubscriptionChangeHandler {
46            inner: handler.clone(),
47        }
48    }
49}
50
51impl Deref for ComparableSubscriptionChangeHandler {
52    type Target = dyn SubscriptionChangeHandler;
53
54    fn deref(&self) -> &Self::Target {
55        &*self.inner
56    }
57}
58
59impl PartialEq for ComparableSubscriptionChangeHandler {
60    /// Compares this handler to another handler.
61    ///
62    /// # Returns
63    ///
64    /// `true` if the pointer to the handler held by `self` is equal to the pointer held by `other`.
65    /// This is consistent with the implementation of [`ComparableSubscriptionChangeHandler::hash`].
66    fn eq(&self, other: &Self) -> bool {
67        Arc::ptr_eq(&self.inner, &other.inner)
68    }
69}
70
71impl Eq for ComparableSubscriptionChangeHandler {}
72
73#[derive(Default)]
74struct SubscriptionChangeListener {
75    subscription_change_handlers: RwLock<HashMap<UUri, ComparableSubscriptionChangeHandler>>,
76}
77
78impl SubscriptionChangeListener {
79    /// Adds a handler for a given topic.
80    ///
81    /// # Errors
82    ///
83    /// Returns a [`RegistrationError::AlreadyExists`] if another handler has already been registered for
84    /// the given topic. Returns a [`RegistrationError::Unknown`] if the internal state could not be accessed,
85    fn add_handler(
86        &self,
87        topic: UUri,
88        subscription_change_handler: Arc<dyn SubscriptionChangeHandler>,
89    ) -> Result<(), RegistrationError> {
90        let Ok(mut handlers) = self.subscription_change_handlers.write() else {
91            return Err(RegistrationError::Unknown(UStatus::fail_with_code(
92                crate::UCode::INTERNAL,
93                "failed to acquire write lock for handler map",
94            )));
95        };
96        let handler_to_add = ComparableSubscriptionChangeHandler::new(subscription_change_handler);
97        match handlers.entry(topic) {
98            Entry::Vacant(entry) => {
99                entry.insert(handler_to_add);
100                Ok(())
101            }
102            Entry::Occupied(entry) => {
103                if entry.get() == &handler_to_add {
104                    Ok(())
105                } else {
106                    Err(RegistrationError::AlreadyExists)
107                }
108            }
109        }
110    }
111
112    /// Removes the handler for a given topic.
113    ///
114    /// This function also succeeds if no handler is registered for the topic.
115    ///
116    /// # Errors
117    ///
118    /// Returns a [`RegistrationError::Unknown`] if the internal state could not be accessed,
119    fn remove_handler(&self, topic: &UUri) -> Result<(), RegistrationError> {
120        self.subscription_change_handlers
121            .write()
122            .map_err(|_e| {
123                RegistrationError::Unknown(UStatus::fail_with_code(
124                    crate::UCode::INTERNAL,
125                    "failed to acquire write lock for handler map",
126                ))
127            })
128            .map(|mut handlers| {
129                handlers.remove(topic);
130            })
131    }
132
133    /// Removes all handlers for all topic.
134    ///
135    /// # Errors
136    ///
137    /// Returns a [`RegistrationError::Unknown`] if the internal state could not be accessed,
138    fn clear(&self) -> Result<(), RegistrationError> {
139        self.subscription_change_handlers
140            .write()
141            .map_err(|_e| {
142                RegistrationError::Unknown(UStatus::fail_with_code(
143                    crate::UCode::INTERNAL,
144                    "failed to acquire write lock for handler map",
145                ))
146            })
147            .map(|mut handlers| {
148                handlers.clear();
149            })
150    }
151
152    #[cfg(test)]
153    fn has_handler(&self, topic: &UUri) -> bool {
154        self.subscription_change_handlers
155            .read()
156            .is_ok_and(|handlers| handlers.contains_key(topic))
157    }
158}
159
160#[async_trait]
161impl UListener for SubscriptionChangeListener {
162    async fn on_receive(&self, msg: UMessage) {
163        if !msg.is_notification() {
164            return;
165        }
166        let Ok(subscription_update) = msg.extract_protobuf::<Update>() else {
167            debug!("ignoring notification that does not contain subscription update");
168            return;
169        };
170        let Some(topic) = subscription_update.topic.as_ref() else {
171            return;
172        };
173        let Some(status) = subscription_update.status.as_ref() else {
174            return;
175        };
176
177        let Ok(handlers) = self.subscription_change_handlers.read() else {
178            return;
179        };
180        if let Some(handler) = handlers.get(topic) {
181            handler.on_subscription_change(topic.to_owned(), status.to_owned());
182        }
183    }
184}
185
186/// A [`Publisher`] that uses the uProtocol Transport Layer API for publishing events to topics.
187pub struct SimplePublisher {
188    transport: Arc<dyn UTransport>,
189    uri_provider: Arc<dyn LocalUriProvider>,
190}
191
192impl SimplePublisher {
193    /// Creates a new client.
194    ///
195    /// # Arguments
196    ///
197    /// * `transport` - The transport to use for sending messages.
198    /// * `uri_provider` - The service to use for creating the event messages' _sink_ address.
199    pub fn new(transport: Arc<dyn UTransport>, uri_provider: Arc<dyn LocalUriProvider>) -> Self {
200        SimplePublisher {
201            transport,
202            uri_provider,
203        }
204    }
205}
206
207#[async_trait]
208impl Publisher for SimplePublisher {
209    async fn publish(
210        &self,
211        resource_id: u16,
212        call_options: CallOptions,
213        payload: Option<UPayload>,
214    ) -> Result<(), PubSubError> {
215        let mut builder = UMessageBuilder::publish(self.uri_provider.get_resource_uri(resource_id));
216        apply_common_options(call_options, &mut builder);
217        match build_message(&mut builder, payload) {
218            Ok(publish_message) => self
219                .transport
220                .send(publish_message)
221                .await
222                .map_err(PubSubError::PublishError),
223            Err(e) => Err(PubSubError::InvalidArgument(format!(
224                "failed to create Publish message from parameters: {e}"
225            ))),
226        }
227    }
228}
229
230/// A [`Subscriber`] which keeps all information about registered subscription change handlers in memory.
231///
232/// The subscriber requires a (client) implementation of [`USubscription`] in order to inform the local
233/// USubscription service about newly subscribed and unsubscribed topics. It also needs a [`Notifier`]
234/// for receiving notifications about subscription status updates from the local USubscription service.
235/// Finally, it needs a [`UTransport`] for receiving events that have been published to subscribed topics.
236///
237/// During [startup](`Self::for_clients`) the subscriber uses the Notifier to register a generic [`UListener`]
238/// for receiving notifications from the USubscription service. The listener maintains an in-memory mapping
239/// of subscribed topics to corresponding subscription change handlers.
240///
241/// When a client [`subscribes to a topic`](Self::subscribe), the local USubscription service is informed
242/// about the new subscription and a (client provided) subscription change handler is registered with the
243/// listener. When a subscription change notification arrives from the USubscription service, the corresponding
244/// handler is being looked up and invoked.
245pub struct InMemorySubscriber {
246    transport: Arc<dyn UTransport>,
247    _uri_provider: Arc<dyn LocalUriProvider>,
248    usubscription: Arc<dyn USubscription>,
249    notifier: Arc<dyn Notifier>,
250    subscription_change_listener: Arc<SubscriptionChangeListener>,
251}
252
253impl InMemorySubscriber {
254    /// Creates a new Subscriber for a given transport.
255    ///
256    /// The subscriber keeps track of subscription change handlers in memory only.
257    /// This function uses the given transport to create an [`RpcClientUSubscription`] and a [`SimpleNotifier`]
258    /// and then delegate to [`Self::for_clients`] to create the Subscriber.
259    ///
260    /// # Errors
261    ///
262    /// Returns an error if the Notifier cannot register a listener for notifications from the USubscription service.
263    pub async fn new(
264        transport: Arc<dyn UTransport>,
265        uri_provider: Arc<dyn LocalUriProvider>,
266    ) -> Result<Self, RegistrationError> {
267        let rpc_client = InMemoryRpcClient::new(transport.clone(), uri_provider.clone())
268            .await
269            .map(Arc::new)?;
270        let usubscription_client = Arc::new(RpcClientUSubscription::new(rpc_client));
271        let notifier = Arc::new(SimpleNotifier::new(transport.clone(), uri_provider.clone()));
272        Self::for_clients(transport, uri_provider, usubscription_client, notifier).await
273    }
274
275    /// Creates a new Subscriber for given clients.
276    ///
277    /// # Arguments
278    ///
279    /// * `transport` - The transport to use for registering the event listeners for subscribed topics.
280    /// * `uri-provider` - The service to use for creating topic addresses.
281    /// * `usubscription` - The client to use for interacting with the (local) USubscription service.
282    /// * `notifier` - The client to use for registering the listener for subscription updates from USubscription.
283    ///
284    /// # Errors
285    ///
286    /// Returns an error if the Notifier cannot register a listener for notifications from the USubscription service.
287    pub async fn for_clients(
288        transport: Arc<dyn UTransport>,
289        uri_provider: Arc<dyn LocalUriProvider>,
290        usubscription: Arc<dyn USubscription>,
291        notifier: Arc<dyn Notifier>,
292    ) -> Result<Self, RegistrationError> {
293        // register a generic listener for subscription updates
294        // whenever a uE later tries to subscribe to a topic, it can provide an optional callback for
295        // handling subscription updates for the topic it tries to subscribe to
296        let subscription_change_listener = Arc::new(SubscriptionChangeListener {
297            subscription_change_handlers: RwLock::new(HashMap::new()),
298        });
299        notifier
300            .start_listening(
301                &usubscription::usubscription_uri(usubscription::RESOURCE_ID_SUBSCRIPTION_CHANGE),
302                subscription_change_listener.clone(),
303            )
304            .await?;
305        Ok(InMemorySubscriber {
306            transport,
307            _uri_provider: uri_provider,
308            usubscription,
309            notifier,
310            subscription_change_listener,
311        })
312    }
313
314    /// Stops this client.
315    ///
316    /// Clears all internal state and deregisters the listener for subscription updates from the USubscription service.
317    ///
318    /// # Errors
319    ///
320    /// Returns an error if the listener could not be unregistered. In this case the internal state remains intact.
321    pub async fn stop(&self) -> Result<(), RegistrationError> {
322        self.notifier
323            .stop_listening(
324                &usubscription::usubscription_uri(usubscription::RESOURCE_ID_SUBSCRIPTION_CHANGE),
325                self.subscription_change_listener.clone(),
326            )
327            .await
328            .and_then(|_ok| self.subscription_change_listener.clear())
329    }
330
331    async fn invoke_subscribe(
332        &self,
333        topic: &UUri,
334        subscription_change_handler: Option<Arc<dyn SubscriptionChangeHandler>>,
335    ) -> Result<State, RegistrationError> {
336        let subscription_request = SubscriptionRequest {
337            topic: Some(topic.to_owned()).into(),
338            ..Default::default()
339        };
340        match self.usubscription.subscribe(subscription_request).await {
341            Ok(response) => match response.status.state.enum_value() {
342                Ok(state) if state == State::SUBSCRIBED || state == State::SUBSCRIBE_PENDING => {
343                    if let Some(handler) = subscription_change_handler.clone() {
344                        self.subscription_change_listener
345                            .add_handler(topic.to_owned(), handler)?;
346                    }
347                    Ok(state)
348                }
349                _ => {
350                    debug!(topic = %topic, "failed to subscribe to topic: {}", response.status.message);
351                    Err(RegistrationError::Unknown(UStatus::fail_with_code(
352                        crate::UCode::FAILED_PRECONDITION,
353                        response.status.message.to_owned(),
354                    )))
355                }
356            },
357            Err(e) => {
358                info!(topic = %topic, "error invoking USubscription service: {}", e);
359                Err(RegistrationError::Unknown(UStatus::fail_with_code(
360                    crate::UCode::INTERNAL,
361                    "failed to invoke USubscription service",
362                )))
363            }
364        }
365    }
366
367    async fn invoke_unsubscribe(&self, topic: &UUri) -> Result<(), RegistrationError> {
368        let request = UnsubscribeRequest {
369            ..Default::default()
370        };
371        self.usubscription
372            .unsubscribe(request)
373            .await
374            .map(|_| {
375                let _ = self.subscription_change_listener.remove_handler(topic);
376            })
377            .map_err(|e| {
378                info!(topic = %topic, "error invoking USubscription service: {}", e);
379                RegistrationError::Unknown(UStatus::fail_with_code(
380                    crate::UCode::INTERNAL,
381                    "failed to invoke USubscription service",
382                ))
383            })
384    }
385
386    #[cfg(test)]
387    fn add_subscription_change_handler(
388        &self,
389        topic: &UUri,
390        subscription_change_handler: Arc<dyn SubscriptionChangeHandler>,
391    ) -> Result<(), RegistrationError> {
392        self.subscription_change_listener
393            .add_handler(topic.to_owned(), subscription_change_handler)
394    }
395
396    #[cfg(test)]
397    fn has_subscription_change_handler(&self, topic: &UUri) -> bool {
398        self.subscription_change_listener.has_handler(topic)
399    }
400}
401
402#[async_trait]
403impl Subscriber for InMemorySubscriber {
404    async fn subscribe(
405        &self,
406        topic_filter: &UUri,
407        handler: Arc<dyn UListener>,
408        subscription_change_handler: Option<Arc<dyn SubscriptionChangeHandler>>,
409    ) -> Result<(), RegistrationError> {
410        self.invoke_subscribe(topic_filter, subscription_change_handler)
411            .await?;
412        self.transport
413            .register_listener(topic_filter, None, handler.clone())
414            .await
415            // When this fails, we have ended up in a situation where we
416            // have successfully (logically) subscribed to the topic via the USubscription service
417            // but we have not been able to register the listener with the local transport.
418            // This means that events might start getting forwarded to the local authority which
419            // are not being consumed. Apart from this inefficiency, this does not pose a real
420            // problem and since we return an err, the client might be inclined to try
421            // again and (eventually) succeed in registering the listener as well.
422            .map_err(RegistrationError::from)
423    }
424
425    async fn unsubscribe(
426        &self,
427        topic: &UUri,
428        listener: Arc<dyn UListener>,
429    ) -> Result<(), RegistrationError> {
430        self.invoke_unsubscribe(topic).await?;
431        self.transport
432            .unregister_listener(topic, None, listener)
433            .await
434            // When this fails, we have ended up in a situation where we
435            // have successfully (logically) unsubscribed from the topic via the USubscription service
436            // but we have not been able to unregister the listener from the local transport.
437            // This means that events originating from entities connected to a different transport
438            // may no longer get forwarded to the local transport, resulting in the (still registered)
439            // listener not being invoked for these events. We therefore return an error which should
440            // trigger the client to try again and (eventually) succeed in unregistering the listener as well.
441            .map_err(RegistrationError::from)
442    }
443}
444
445#[cfg(test)]
446mod tests {
447
448    // [utest->dsn~communication-layer-impl-default~1]
449
450    use super::*;
451
452    use mockall::Sequence;
453    use protobuf::well_known_types::wrappers::StringValue;
454    use usubscription::{MockUSubscription, SubscriptionResponse, SubscriptionStatus};
455
456    use crate::{
457        communication::{notification::MockNotifier, pubsub::MockSubscriptionChangeHandler},
458        utransport::{MockTransport, MockUListener},
459        StaticUriProvider, UAttributes, UCode, UMessageType, UPriority, UStatus, UUri, UUID,
460    };
461
462    fn new_uri_provider() -> Arc<dyn LocalUriProvider> {
463        Arc::new(StaticUriProvider::new("", 0x0005, 0x02))
464    }
465
466    fn succeeding_notifier() -> Arc<dyn Notifier> {
467        let mut notifier = MockNotifier::new();
468        notifier
469            .expect_start_listening()
470            .once()
471            .return_const(Ok(()));
472        Arc::new(notifier)
473    }
474
475    #[tokio::test]
476    async fn test_publish_fails_for_invalid_topic() {
477        // GIVEN a publisher
478        let uri_provider = new_uri_provider();
479        let mut transport = MockTransport::new();
480
481        transport.expect_do_send().never();
482        let publisher = SimplePublisher::new(Arc::new(transport), uri_provider);
483
484        // WHEN publishing to an invalid topic
485        let options = CallOptions::for_publish(None, None, None);
486        let publish_result = publisher
487            // resource ID for topic must be >= 0x8000
488            .publish(0x1000, options, None)
489            .await;
490
491        // THEN publishing fails with an InvalidArgument error
492        assert!(publish_result.is_err_and(|e| matches!(e, PubSubError::InvalidArgument(_msg))));
493    }
494
495    #[tokio::test]
496    async fn test_publish_fails_with_transport_error() {
497        let message_id = UUID::build();
498        // GIVEN a publisher
499        let uri_provider = new_uri_provider();
500        let mut transport = MockTransport::new();
501        // that is not connected to the underlying messaging infrastructure
502        let expected_message_id = message_id.clone();
503        transport
504            .expect_do_send()
505            .once()
506            .withf(move |msg| msg.id_unchecked() == &expected_message_id)
507            .returning(|_msg| {
508                Err(UStatus::fail_with_code(
509                    UCode::UNAVAILABLE,
510                    "transport not available",
511                ))
512            });
513        let publisher = SimplePublisher::new(Arc::new(transport), uri_provider);
514
515        // WHEN publishing to a valid topic
516        let options = CallOptions::for_publish(None, Some(message_id), None);
517        let publish_result = publisher.publish(0x9A00, options, None).await;
518
519        // THEN publishing fails with a PublishError
520        assert!(publish_result.is_err_and(|e| matches!(e, PubSubError::PublishError(_status))));
521    }
522
523    #[tokio::test]
524    async fn test_publish_succeeds() {
525        // GIVEN a publisher
526        let uri_provider = new_uri_provider();
527        let mut transport = MockTransport::new();
528        let message_id = UUID::build();
529        let expected_message_id = message_id.clone();
530
531        transport
532            .expect_do_send()
533            .once()
534            .withf(move |message| {
535                let Ok(payload) = message.extract_protobuf::<StringValue>() else {
536                    return false;
537                };
538                payload.value == *"Hello"
539                    && message.is_publish()
540                    && message.id_unchecked() == &expected_message_id
541                    && message.priority_unchecked() == UPriority::UPRIORITY_CS3
542                    && message.ttl_unchecked() == 5_000
543            })
544            .returning(|_msg| Ok(()));
545
546        let publisher = SimplePublisher::new(Arc::new(transport), uri_provider);
547
548        // WHEN publishing some data to a valid topic
549        let call_options = CallOptions::for_publish(
550            Some(5_000),
551            Some(message_id.clone()),
552            Some(crate::UPriority::UPRIORITY_CS3),
553        );
554        let payload = StringValue {
555            value: "Hello".to_string(),
556            ..Default::default()
557        };
558        let publish_result = publisher
559            .publish(
560                0x9A00,
561                call_options,
562                Some(
563                    UPayload::try_from_protobuf(payload)
564                        .expect("should have been able to create message payload"),
565                ),
566            )
567            .await;
568
569        // THEN a corresponding Publish message has been sent via the transport
570        assert!(publish_result.is_ok());
571    }
572
573    #[tokio::test]
574    async fn test_subscriber_creation_fails_when_notifier_fails_to_register_listener() {
575        // GIVEN a Notifier
576        let mut notifier = MockNotifier::new();
577        // that is not connected to its transport
578        notifier
579            .expect_start_listening()
580            .once()
581            .return_const(Err(RegistrationError::Unknown(UStatus::fail_with_code(
582                UCode::UNAVAILABLE,
583                "not available",
584            ))));
585
586        // WHEN trying to create a Subscriber for this Notifier
587        let creation_attempt = InMemorySubscriber::for_clients(
588            Arc::new(MockTransport::new()),
589            new_uri_provider(),
590            Arc::new(MockUSubscription::new()),
591            Arc::new(notifier),
592        )
593        .await;
594        // THEN creation fails
595        assert!(creation_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
596    }
597
598    #[tokio::test]
599    async fn test_subscriber_stop_succeeds() {
600        // GIVEN a Notifier
601        let mut notifier = MockNotifier::new();
602        // that succeeds to stop listening to notifications
603        notifier.expect_stop_listening().once().return_const(Ok(()));
604
605        // and a Subscriber using this Notifier
606        let subscription_change_listener = Arc::new(SubscriptionChangeListener::default());
607        let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
608        let handler = Arc::new(MockSubscriptionChangeHandler::new());
609        subscription_change_listener
610            .add_handler(topic.clone(), handler)
611            .expect("adding a handler should have succeeded");
612
613        let subscriber = InMemorySubscriber {
614            transport: Arc::new(MockTransport::new()),
615            _uri_provider: new_uri_provider(),
616            usubscription: Arc::new(MockUSubscription::new()),
617            notifier: Arc::new(notifier),
618            subscription_change_listener,
619        };
620
621        // WHEN trying to stop the Subscriber
622        let stop_attempt = subscriber.stop().await;
623
624        // THEN the attempt succeeds
625        assert!(stop_attempt.is_ok_and(|_| {
626            // and the subscription change handlers have been cleared
627            !subscriber.has_subscription_change_handler(&topic)
628        }));
629    }
630
631    #[tokio::test]
632    async fn test_subscribe_fails_when_usubscription_invocation_fails() {
633        // GIVEN a USubscription client
634        let mut seq = Sequence::new();
635        let mut usubscription_client = MockUSubscription::new();
636        // that fails to perform subscription
637        // due to different reasons
638        usubscription_client
639            .expect_subscribe()
640            .once()
641            .in_sequence(&mut seq)
642            .return_const(Err(UStatus::fail_with_code(
643                UCode::UNAVAILABLE,
644                "not connected",
645            )));
646        usubscription_client
647            .expect_subscribe()
648            .once()
649            .in_sequence(&mut seq)
650            .return_const({
651                let response = SubscriptionResponse {
652                    status: Some(SubscriptionStatus {
653                        state: State::UNSUBSCRIBED.into(),
654                        message: "unsupported topic".to_string(),
655                        ..Default::default()
656                    })
657                    .into(),
658                    ..Default::default()
659                };
660                Ok(response)
661            });
662        usubscription_client
663            .expect_subscribe()
664            .once()
665            .in_sequence(&mut seq)
666            .return_const({
667                let response = SubscriptionResponse {
668                    status: Some(SubscriptionStatus {
669                        message: "unknown state".to_string(),
670                        ..Default::default()
671                    })
672                    .into(),
673                    ..Default::default()
674                };
675                Ok(response)
676            });
677
678        // and a transport
679        let mut transport = MockTransport::new();
680        transport.expect_do_register_listener().never();
681
682        // and a Subscriber using that USubscription client
683        let subscriber = InMemorySubscriber::for_clients(
684            Arc::new(transport),
685            new_uri_provider(),
686            Arc::new(usubscription_client),
687            succeeding_notifier(),
688        )
689        .await
690        .unwrap();
691
692        // WHEN subscribing to a topic
693        let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
694        let mut listener = MockUListener::new();
695        listener.expect_on_receive().never();
696        let listener_ref = Arc::new(listener);
697
698        let subscribe_attempt = subscriber
699            .subscribe(&topic, listener_ref.clone(), None)
700            .await;
701
702        // THEN the first attempt fails
703        assert!(subscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
704
705        let subscribe_attempt = subscriber
706            .subscribe(&topic, listener_ref.clone(), None)
707            .await;
708
709        // and the second attempt fails as well
710        assert!(subscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
711
712        let subscribe_attempt = subscriber
713            .subscribe(&topic, listener_ref.clone(), None)
714            .await;
715
716        // and the third attempt fails as well
717        assert!(subscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
718    }
719
720    #[tokio::test]
721    async fn test_repeated_subscribe_fails_for_different_subscription_change_handlers() {
722        // GIVEN  a USubscription client
723        let mut usubscription_client = MockUSubscription::new();
724        // that succeeds to subscribe to topics
725        usubscription_client
726            .expect_subscribe()
727            .times(2)
728            .returning(|request| {
729                let response = SubscriptionResponse {
730                    topic: request.topic.clone(),
731                    status: Some(SubscriptionStatus {
732                        state: State::SUBSCRIBED.into(),
733                        ..Default::default()
734                    })
735                    .into(),
736                    ..Default::default()
737                };
738                Ok(response)
739            });
740
741        // and a transport
742        let mut transport = MockTransport::new();
743        // that fails to register a listener
744        transport
745            .expect_do_register_listener()
746            .once()
747            .return_const(Err(UStatus::fail_with_code(
748                UCode::UNAVAILABLE,
749                "not connected",
750            )));
751
752        // and a Subscriber using that USubscription client, Notifier and transport
753        let subscriber = InMemorySubscriber::for_clients(
754            Arc::new(transport),
755            new_uri_provider(),
756            Arc::new(usubscription_client),
757            succeeding_notifier(),
758        )
759        .await
760        .unwrap();
761
762        // WHEN subscribing to a topic
763        let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
764        let listener = Arc::new(MockUListener::new());
765        let subscribe_attempt = subscriber
766            .subscribe(
767                &topic,
768                listener.clone(),
769                Some(Arc::new(MockSubscriptionChangeHandler::new())),
770            )
771            .await;
772
773        // THEN the first attempt fails due to the transport having failed
774        assert!(subscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
775
776        // and a second attempt using a different subscription change handler
777        let subscribe_attempt = subscriber
778            .subscribe(
779                &topic,
780                listener.clone(),
781                Some(Arc::new(MockSubscriptionChangeHandler::new())),
782            )
783            .await;
784        // fails with an ALREADY_EXISTS error
785        assert!(subscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::AlreadyExists)));
786    }
787
788    #[tokio::test]
789    async fn test_subscribe_succeeds_on_second_attempt() {
790        let (captured_listener_tx, captured_listener_rx) = std::sync::mpsc::channel();
791
792        // GIVEN a USubscription client
793        let mut usubscription_client = MockUSubscription::new();
794        // that succeeds to subscribe to topics
795        usubscription_client
796            .expect_subscribe()
797            .times(2)
798            .returning(|request| {
799                let response = SubscriptionResponse {
800                    topic: request.topic.clone(),
801                    status: Some(SubscriptionStatus {
802                        state: State::SUBSCRIBED.into(),
803                        ..Default::default()
804                    })
805                    .into(),
806                    ..Default::default()
807                };
808                Ok(response)
809            });
810
811        // and a transport
812        let mut transport = MockTransport::new();
813        let mut seq = Sequence::new();
814        // that first fails to register a listener
815        transport
816            .expect_do_register_listener()
817            .once()
818            .in_sequence(&mut seq)
819            .return_const(Err(UStatus::fail_with_code(
820                UCode::UNAVAILABLE,
821                "not connected",
822            )));
823        // but succeeds on the second attempt
824        transport
825            .expect_do_register_listener()
826            .once()
827            .in_sequence(&mut seq)
828            .returning(move |_source_filter, _sink_filter, listener| {
829                captured_listener_tx
830                    .send(listener)
831                    .map_err(|_e| UStatus::fail("cannot capture listener"))
832            });
833
834        // and a Subscriber using that USubscription client, Notifier and transport
835        let subscriber = InMemorySubscriber::for_clients(
836            Arc::new(transport),
837            new_uri_provider(),
838            Arc::new(usubscription_client),
839            succeeding_notifier(),
840        )
841        .await
842        .unwrap();
843
844        // WHEN subscribing to a topic
845        let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
846        let mut mock_listener = MockUListener::new();
847        mock_listener.expect_on_receive().once().return_const(());
848        let listener = Arc::new(mock_listener);
849        let handler = Arc::new(MockSubscriptionChangeHandler::new());
850        let subscribe_attempt = subscriber
851            .subscribe(&topic, listener.clone(), Some(handler.clone()))
852            .await;
853
854        // THEN the first attempt fails
855        assert!(subscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
856
857        let subscribe_attempt = subscriber
858            .subscribe(&topic, listener.clone(), Some(handler.clone()))
859            .await;
860
861        // but the second attempt succeeds
862        assert!(subscribe_attempt.is_ok());
863
864        // and the registered listener receives events that are published to the topic
865        let event = UMessageBuilder::publish(topic).build().unwrap();
866        let captured_listener = captured_listener_rx.recv().unwrap().to_owned();
867        captured_listener.on_receive(event).await;
868    }
869
870    #[tokio::test]
871    async fn test_unsubscribe_fails_for_unknown_listener() {
872        // GIVEN a USubscription client
873        let mut usubscription_client = MockUSubscription::new();
874        // that succeeds to unsubscribe from topics
875        usubscription_client
876            .expect_unsubscribe()
877            .once()
878            .return_const(Ok(()));
879
880        // and a transport
881        let mut transport = MockTransport::new();
882        // which fails to unregister an unknown listener
883        transport
884            .expect_do_unregister_listener()
885            .once()
886            .return_const(Err(UStatus::fail_with_code(
887                UCode::NOT_FOUND,
888                "no such listener",
889            )));
890
891        // and a Subscriber using that USubscription client, Notifier and transport
892        let subscriber = InMemorySubscriber::for_clients(
893            Arc::new(transport),
894            new_uri_provider(),
895            Arc::new(usubscription_client),
896            succeeding_notifier(),
897        )
898        .await
899        .unwrap();
900
901        // WHEN unsubscribing from a topic for which no listener had been registered
902        let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
903        let listener = Arc::new(MockUListener::new());
904        let unsubscribe_attempt = subscriber.unsubscribe(&topic, listener.clone()).await;
905
906        // THEN the the attempt fails
907        assert!(unsubscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::NoSuchListener)));
908    }
909
910    #[tokio::test]
911    async fn test_unsubscribe_fails_if_usubscription_invocation_fails() {
912        // GIVEN a USubscription client
913        let mut usubscription_client = MockUSubscription::new();
914        // that fails to unsubscribe from topics
915        usubscription_client
916            .expect_unsubscribe()
917            .once()
918            .return_const(Err(UStatus::fail_with_code(UCode::UNAVAILABLE, "unknown")));
919
920        // and a transport
921        let mut transport = MockTransport::new();
922        // which succeeds to unregister listeners
923        transport
924            .expect_do_unregister_listener()
925            .never()
926            .return_const(Ok(()));
927
928        // and a Subscriber using that USubscription client, Notifier and transport
929        let subscriber = InMemorySubscriber::for_clients(
930            Arc::new(transport),
931            new_uri_provider(),
932            Arc::new(usubscription_client),
933            succeeding_notifier(),
934        )
935        .await
936        .unwrap();
937        // which already has a listener registered
938        let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
939        let handler = MockSubscriptionChangeHandler::new();
940        subscriber
941            .add_subscription_change_handler(&topic, Arc::new(handler))
942            .expect("should be able to add handler");
943        assert!(subscriber.has_subscription_change_handler(&topic));
944
945        // WHEN unsubscribing from the topic
946        let listener = Arc::new(MockUListener::new());
947        let unsubscribe_attempt = subscriber.unsubscribe(&topic, listener).await;
948
949        // THEN the the attempt fails
950        assert!(unsubscribe_attempt.is_err_and(|e| {
951            matches!(e, RegistrationError::Unknown(_))
952                // and the handler is still registered
953                && subscriber.has_subscription_change_handler(&topic)
954        }));
955    }
956
957    #[tokio::test]
958    async fn test_unsubscribe_succeeds() {
959        // GIVEN a USubscription client
960        let mut usubscription_client = MockUSubscription::new();
961        // that succeeds to unsubscribe from topics
962        usubscription_client
963            .expect_unsubscribe()
964            .once()
965            .return_const(Ok(()));
966
967        // and a transport
968        let mut transport = MockTransport::new();
969        // which succeeds to unregister listeners
970        transport
971            .expect_do_unregister_listener()
972            .once()
973            .return_const(Ok(()));
974
975        // and a Subscriber using that USubscription client, Notifier and transport
976        let subscriber = InMemorySubscriber::for_clients(
977            Arc::new(transport),
978            new_uri_provider(),
979            Arc::new(usubscription_client),
980            succeeding_notifier(),
981        )
982        .await
983        .unwrap();
984        // which already has a listener registered
985        let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
986        let handler = MockSubscriptionChangeHandler::new();
987        subscriber
988            .add_subscription_change_handler(&topic, Arc::new(handler))
989            .expect("should be able to add handler");
990        assert!(subscriber.has_subscription_change_handler(&topic));
991
992        // WHEN unsubscribing from a topic for which no listener had been registered
993        let listener = Arc::new(MockUListener::new());
994        let unsubscribe_attempt = subscriber.unsubscribe(&topic, listener.clone()).await;
995
996        // THEN the the attempt succeeds
997        assert!(
998            unsubscribe_attempt.is_ok_and(|_| !subscriber.has_subscription_change_handler(&topic))
999        );
1000    }
1001
1002    #[tokio::test]
1003    async fn test_unsubscribe_succeeds_on_second_attempt() {
1004        // GIVEN a USubscription client
1005        let mut usubscription_client = MockUSubscription::new();
1006        // that succeeds to unsubscribe from topics
1007        usubscription_client
1008            .expect_unsubscribe()
1009            .times(2)
1010            .return_const(Ok(()));
1011
1012        // and a transport
1013        let mut transport = MockTransport::new();
1014        let mut seq = Sequence::new();
1015        // that first fails to unregister a listener
1016        transport
1017            .expect_do_unregister_listener()
1018            .once()
1019            .in_sequence(&mut seq)
1020            .return_const(Err(UStatus::fail_with_code(
1021                UCode::UNAVAILABLE,
1022                "not connected",
1023            )));
1024        // but succeeds on the second attempt
1025        transport
1026            .expect_do_unregister_listener()
1027            .once()
1028            .in_sequence(&mut seq)
1029            .return_const(Ok(()));
1030
1031        // and a Subscriber using that USubscription client, Notifier and transport
1032        let subscriber = InMemorySubscriber::for_clients(
1033            Arc::new(transport),
1034            new_uri_provider(),
1035            Arc::new(usubscription_client),
1036            succeeding_notifier(),
1037        )
1038        .await
1039        .unwrap();
1040        // which already has a listener registered
1041        let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
1042        let handler = MockSubscriptionChangeHandler::new();
1043        subscriber
1044            .add_subscription_change_handler(&topic, Arc::new(handler))
1045            .expect("should be able to add handler");
1046        assert!(subscriber.has_subscription_change_handler(&topic));
1047
1048        // WHEN unsubscribing from a topic for which a listener had been registered before
1049        let listener = Arc::new(MockUListener::new());
1050        let unsubscribe_attempt = subscriber.unsubscribe(&topic, listener.clone()).await;
1051
1052        // THEN the first attempt fails
1053        assert!(unsubscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
1054
1055        let unsubscribe_attempt = subscriber.unsubscribe(&topic, listener).await;
1056
1057        // but the second attempt succeeds
1058        assert!(unsubscribe_attempt.is_ok_and(|_| {
1059            // and the handler has been removed
1060            !subscriber.has_subscription_change_handler(&topic)
1061        }));
1062    }
1063
1064    fn message_with_wrong_type(msg_type: UMessageType) -> UMessage {
1065        let attributes = UAttributes {
1066            type_: msg_type.into(),
1067            ..Default::default()
1068        };
1069        UMessage {
1070            attributes: Some(attributes).into(),
1071            ..Default::default()
1072        }
1073    }
1074
1075    fn notification_with_wrong_payload() -> UMessage {
1076        let payload = UPayload::try_from_protobuf(StringValue::new())
1077            .expect("should have been able to create protobuf");
1078        let attributes = UAttributes {
1079            type_: UMessageType::UMESSAGE_TYPE_NOTIFICATION.into(),
1080            payload_format: payload.payload_format().into(),
1081            ..Default::default()
1082        };
1083        UMessage {
1084            attributes: Some(attributes).into(),
1085            payload: Some(payload.payload()),
1086            ..Default::default()
1087        }
1088    }
1089
1090    fn status_update_without_topic() -> UMessage {
1091        let status = SubscriptionStatus {
1092            state: State::SUBSCRIBED.into(),
1093            ..Default::default()
1094        };
1095        let update = Update {
1096            status: Some(status).into(),
1097            ..Default::default()
1098        };
1099        let payload =
1100            UPayload::try_from_protobuf(update).expect("should have been able to create protobuf");
1101        let attributes = UAttributes {
1102            type_: UMessageType::UMESSAGE_TYPE_NOTIFICATION.into(),
1103            payload_format: payload.payload_format().into(),
1104            ..Default::default()
1105        };
1106
1107        UMessage {
1108            attributes: Some(attributes).into(),
1109            payload: Some(payload.payload()),
1110            ..Default::default()
1111        }
1112    }
1113
1114    fn status_update_without_status() -> UMessage {
1115        let update = Update {
1116            topic: Some(UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap()).into(),
1117            ..Default::default()
1118        };
1119        let payload =
1120            UPayload::try_from_protobuf(update).expect("should have been able to create protobuf");
1121        let attributes = UAttributes {
1122            type_: UMessageType::UMESSAGE_TYPE_NOTIFICATION.into(),
1123            payload_format: payload.payload_format().into(),
1124            ..Default::default()
1125        };
1126
1127        UMessage {
1128            attributes: Some(attributes).into(),
1129            payload: Some(payload.payload()),
1130            ..Default::default()
1131        }
1132    }
1133
1134    #[test_case::test_case(message_with_wrong_type(UMessageType::UMESSAGE_TYPE_PUBLISH); "Publish messages")]
1135    #[test_case::test_case(message_with_wrong_type(UMessageType::UMESSAGE_TYPE_REQUEST); "Request messages")]
1136    #[test_case::test_case(message_with_wrong_type(UMessageType::UMESSAGE_TYPE_RESPONSE); "Response messages")]
1137    #[test_case::test_case(notification_with_wrong_payload(); "wrong payload")]
1138    #[test_case::test_case(status_update_without_topic(); "status without topic")]
1139    #[test_case::test_case(status_update_without_status(); "update without status")]
1140    #[tokio::test]
1141    async fn test_subscription_change_listener_ignores(notification: UMessage) {
1142        let listener = SubscriptionChangeListener::default();
1143
1144        let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
1145        let mut handler = MockSubscriptionChangeHandler::new();
1146        handler.expect_on_subscription_change().never();
1147
1148        listener
1149            .add_handler(topic.clone(), Arc::new(handler))
1150            .expect("should have been able to register listener");
1151        listener.on_receive(notification).await;
1152    }
1153
1154    #[tokio::test]
1155    async fn test_subscription_change_listener_invokes_handler_for_subscribed_topic() {
1156        let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
1157        let status = SubscriptionStatus {
1158            state: State::SUBSCRIBED.into(),
1159            ..Default::default()
1160        };
1161        let update = Update {
1162            topic: Some(topic.clone()).into(),
1163            status: Some(status.clone()).into(),
1164            ..Default::default()
1165        };
1166        let payload =
1167            UPayload::try_from_protobuf(update).expect("should have been able to create protobuf");
1168        let attributes = UAttributes {
1169            type_: UMessageType::UMESSAGE_TYPE_NOTIFICATION.into(),
1170            payload_format: payload.payload_format().into(),
1171            ..Default::default()
1172        };
1173
1174        let notification = UMessage {
1175            attributes: Some(attributes).into(),
1176            payload: Some(payload.payload()),
1177            ..Default::default()
1178        };
1179
1180        let expected_topic = topic.clone();
1181        let mut handler = MockSubscriptionChangeHandler::new();
1182        handler
1183            .expect_on_subscription_change()
1184            .once()
1185            .withf(move |topic, updated_status| {
1186                topic == &expected_topic && updated_status == &status
1187            })
1188            .return_const(());
1189
1190        let listener = SubscriptionChangeListener::default();
1191        listener
1192            .add_handler(topic, Arc::new(handler))
1193            .expect("should have been able to register listener");
1194
1195        listener.on_receive(notification).await;
1196    }
1197}