up_subscription/
usubscription.rs

1/********************************************************************************
2 * Copyright (c) 2024 Contributors to the Eclipse Foundation
3 *
4 * See the NOTICE file(s) distributed with this work for additional
5 * information regarding copyright ownership.
6 *
7 * This program and the accompanying materials are made available under the
8 * terms of the Apache License Version 2.0 which is available at
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * SPDX-License-Identifier: Apache-2.0
12 ********************************************************************************/
13
14use async_trait::async_trait;
15use log::*;
16use std::str::FromStr;
17use std::sync::Arc;
18use tokio::{
19    sync::{
20        mpsc::{self, Sender},
21        oneshot, Notify,
22    },
23    task::JoinHandle,
24};
25
26use crate::{
27    helpers, listeners,
28    notification_manager::{self, NotificationEvent},
29    subscription_manager::{self, SubscriptionEvent},
30    USubscriptionConfiguration,
31};
32
33use up_rust::core::usubscription::{
34    FetchSubscribersRequest, FetchSubscribersResponse, FetchSubscriptionsRequest,
35    FetchSubscriptionsResponse, NotificationsRequest, Request, SubscriptionRequest,
36    SubscriptionResponse, SubscriptionStatus, USubscription, UnsubscribeRequest,
37    RESOURCE_ID_FETCH_SUBSCRIBERS, RESOURCE_ID_FETCH_SUBSCRIPTIONS,
38    RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS, RESOURCE_ID_SUBSCRIBE,
39    RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS, RESOURCE_ID_UNSUBSCRIBE, USUBSCRIPTION_TYPE_ID,
40};
41use up_rust::{communication::RpcClient, LocalUriProvider, UCode, UStatus, UTransport, UUri};
42
/// Whether to include the 'up:' uProtocol scheme prefix in URIs that are written to log and error messages.
/// Passed as the argument to `UUri::to_uri()` wherever this file renders a URI for logging.
pub const INCLUDE_SCHEMA: bool = false;

// Remote-subscribe operation TTL: 5 minutes, expressed in milliseconds, as per
// https://github.com/eclipse-uprotocol/up-spec/tree/main/up-l3/usubscription/v3#6-timeout--retry-logic
pub(crate) const UP_REMOTE_TTL: u32 = 300000;
48
49impl LocalUriProvider for USubscriptionService {
50    fn get_authority(&self) -> String {
51        self.config.authority_name.clone()
52    }
53    fn get_resource_uri(&self, resource_id: u16) -> UUri {
54        self.config.get_resource_uri(resource_id)
55    }
56    fn get_source_uri(&self) -> UUri {
57        self.get_resource_uri(0x0000)
58    }
59}
60
/// This trait (and the comprised `UTransportHolder` trait) is simply there to have a generic type that
/// usubscription listeners deal with, so that `USubscriptionService` can be properly mocked.
///
/// It declares no methods of its own; it only bundles the `USubscription`, `LocalUriProvider` and
/// `UTransportHolder` supertraits into a single trait that can be used as `dyn USubscriptionServiceAbstract`.
pub trait USubscriptionServiceAbstract:
    USubscription + LocalUriProvider + UTransportHolder
{
}
67
/// This trait primarily serves to provide a hook-point for using the mockall crate, for mocking `USubscriptionService` objects
/// where we also need/want to inject custom/mock `UTransport` implementations that subsequently get used in test cases.
pub trait UTransportHolder {
    /// Returns a shared handle to the transport used by the implementing object.
    fn get_transport(&self) -> Arc<dyn UTransport>;
}
73
74impl UTransportHolder for USubscriptionService {
75    fn get_transport(&self) -> Arc<dyn UTransport> {
76        self.up_transport.clone()
77    }
78}
79
/// This object holds all mutable content associated with a running `USubscriptionService`, and is populated and returned when
/// calling `USubscriptionService::run()`. It exists for two reasons: a) allow `USubscriptionService` to remain usable as an immutable
/// object that can be put into `Arc`s and passed around freely, while b) offering a well-defined way to stop a running `USubscriptionService`
/// by simply calling `USubscriptionStopper::stop()`.
pub struct USubscriptionStopper {
    // Shared with both manager tasks spawned by `run()`; `stop()` signals shutdown through it
    shutdown_notification: Arc<Notify>,
    // Join handle of the subscription manager task; wrapped in an `Option` so that `stop()` can take ownership of it
    subscription_joiner: Option<JoinHandle<()>>,
    // Join handle of the notification manager task; wrapped in an `Option` so that `stop()` can take ownership of it
    notification_joiner: Option<JoinHandle<()>>,
}
89
90impl USubscriptionStopper {
91    pub async fn stop(&mut self) {
92        self.shutdown_notification.notify_waiters();
93
94        self.subscription_joiner
95            .take()
96            .expect("Has this USubscription instance already been stopped?")
97            .await
98            .expect("Error shutting down subscription manager");
99        self.notification_joiner
100            .take()
101            .expect("Has this USubscription instance already been stopped?")
102            .await
103            .expect("Error shutting down notification manager");
104    }
105}
106
/// Core landing point and coordination of business logic of the uProtocol USubscription service. This implementation usually would be
/// front-ended by the various `listeners` to connect with corresponding uProtocol RPC server endpoints.
///
/// Functionally, the code in this context primarily cares about:
/// - input validation
/// - interaction with / orchestration of the backends for managing subscriptions (`subscription_manager`) and dealing with notifications (`notification_manager`)
///
/// Instances are created via `USubscriptionService::run()`, which also spawns the two backend tasks that the
/// channel senders held here talk to.
#[derive(Clone)]
pub struct USubscriptionService {
    // Service configuration; source of the authority name and of all resource URIs
    config: Arc<USubscriptionConfiguration>,

    // Transport handed out through `UTransportHolder::get_transport()`
    pub(crate) up_transport: Arc<dyn UTransport>,

    // Command channel towards the subscription manager task
    subscription_sender: Sender<SubscriptionEvent>,
    // Command channel towards the notification manager task
    notification_sender: Sender<notification_manager::NotificationEvent>,
}
122
// Marker implementation: the trait has no methods, all functionality comes from its supertraits
impl USubscriptionServiceAbstract for USubscriptionService {}
124
125/// Implementation of uProtocol L3 USubscription service
126impl USubscriptionService {
127    /// Start a new USubscriptionService
128    /// Atm this will directly spin up two tasks which deal with subscription and notification management, with no further action
129    /// required, but also no explicit shutdown operation yet - that's a TODO.
130    ///
131    /// # Arguments
132    ///
133    /// * `config` - The configuration details for this USUbscription service
134    /// * `up_transport` - Implementation of UTransport to be used by this USUbscription instance, for sending Listener-responses and Notifications
135    /// * `up_client` - Implementation of RpcClient to be used by this USUbscription instance, for performing remote-subscribe operations
136    ///
137    /// # Returns
138    ///
139    /// * the immutable parts of the USubscription service inside an Arc
140    /// * a `USubscriptionStopper` object which can be used to explicitly shut down the USubscription service
141    pub fn run(
142        config: Arc<USubscriptionConfiguration>,
143        up_transport: Arc<dyn UTransport>,
144        up_client: Arc<dyn RpcClient>,
145    ) -> Result<(Arc<dyn USubscriptionServiceAbstract>, USubscriptionStopper), UStatus> {
146        helpers::init_once();
147
148        let shutdown_notification = Arc::new(Notify::new());
149
150        // Set up subscription manager actor
151        let up_client_cloned = up_client.clone();
152        let own_uri_cloned = config.get_source_uri().clone();
153        let shutdown_notification_cloned = shutdown_notification.clone();
154        let (subscription_sender, subscription_receiver) =
155            mpsc::channel::<SubscriptionEvent>(config.subscription_command_buffer);
156        let subscription_joiner = helpers::spawn_and_log_error(async move {
157            subscription_manager::handle_message(
158                own_uri_cloned,
159                up_client_cloned,
160                subscription_receiver,
161                shutdown_notification_cloned,
162            )
163            .await;
164            Ok(())
165        });
166
167        // Set up notification service actor
168        let up_transport_cloned = up_transport.clone();
169        let shutdown_notification_cloned = shutdown_notification.clone();
170        let (notification_sender, notification_receiver) =
171            mpsc::channel::<notification_manager::NotificationEvent>(
172                config.notification_command_buffer,
173            );
174        let notification_joiner = helpers::spawn_and_log_error(async move {
175            notification_manager::notification_engine(
176                up_transport_cloned,
177                notification_receiver,
178                shutdown_notification_cloned,
179            )
180            .await;
181            Ok(())
182        });
183
184        Ok((
185            Arc::new(USubscriptionService {
186                config,
187                up_transport,
188                subscription_sender,
189                notification_sender,
190            }),
191            USubscriptionStopper {
192                subscription_joiner: Some(subscription_joiner),
193                notification_joiner: Some(notification_joiner),
194                shutdown_notification,
195            },
196        ))
197    }
198
199    /// This sets up all applicable listeners to connect the USubscription service with it's transport implementation.
200    /// The following rules apply:
201    ///
202    /// * complete usubscription functionality is only available for local uEntities (TODO verify how this works with empty authority_names)
203    /// * subscribe() and unsubscribe() are also available for remote callers, but only those with UEntity ID type USUSBSCRIPTION (other USubscription services)
204    ///
205    /// # Arguments:
206    ///
207    /// * the `USubscriptionServiceAbstract` object to set up listeners for
208    pub async fn now_listen(
209        usubscription_service: Arc<dyn USubscriptionServiceAbstract>,
210    ) -> Result<(), UStatus> {
211        let any_request_uri = UUri::from_str("//*/FFFF/FF/0").unwrap();
212
213        let mut any_local_uri = any_request_uri.clone();
214        any_local_uri.authority_name = usubscription_service.get_authority();
215
216        let mut any_usubscription_uri = any_request_uri.clone();
217        any_usubscription_uri.ue_id = USUBSCRIPTION_TYPE_ID;
218
219        // The following listeners are for serving any/all *local* uEntity clients
220        let listener = Arc::new(listeners::SubscribeListener::new(
221            usubscription_service.clone(),
222        ));
223        usubscription_service
224            .get_transport()
225            .register_listener(
226                &any_local_uri,
227                Some(&usubscription_service.get_resource_uri(RESOURCE_ID_SUBSCRIBE)),
228                listener,
229            )
230            .await?;
231
232        let listener = Arc::new(listeners::UnsubscribeListener::new(
233            usubscription_service.clone(),
234        ));
235        usubscription_service
236            .get_transport()
237            .register_listener(
238                &any_local_uri,
239                Some(&usubscription_service.get_resource_uri(RESOURCE_ID_UNSUBSCRIBE)),
240                listener,
241            )
242            .await?;
243
244        let listener = Arc::new(listeners::RegisterForNotificationsListener::new(
245            usubscription_service.clone(),
246        ));
247        usubscription_service
248            .get_transport()
249            .register_listener(
250                &any_local_uri,
251                Some(
252                    &usubscription_service.get_resource_uri(RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS),
253                ),
254                listener,
255            )
256            .await?;
257
258        let listener = Arc::new(listeners::UnregisterForNotificationsListener::new(
259            usubscription_service.clone(),
260        ));
261        usubscription_service
262            .get_transport()
263            .register_listener(
264                &any_local_uri,
265                Some(
266                    &usubscription_service
267                        .get_resource_uri(RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS),
268                ),
269                listener,
270            )
271            .await?;
272
273        let listener = Arc::new(listeners::FetchSubscribersListener::new(
274            usubscription_service.clone(),
275        ));
276        usubscription_service
277            .get_transport()
278            .register_listener(
279                &any_local_uri,
280                Some(&usubscription_service.get_resource_uri(RESOURCE_ID_FETCH_SUBSCRIBERS)),
281                listener,
282            )
283            .await?;
284
285        let listener = Arc::new(listeners::FetchSubscriptionsListener::new(
286            usubscription_service.clone(),
287        ));
288        usubscription_service
289            .get_transport()
290            .register_listener(
291                &any_local_uri,
292                Some(&usubscription_service.get_resource_uri(RESOURCE_ID_FETCH_SUBSCRIPTIONS)),
293                listener,
294            )
295            .await?;
296
297        // The following listeners are for serving remote usubscription services only (for remote subscribe and unsubscribe calls)
298        let listener = Arc::new(listeners::SubscribeListener::new(
299            usubscription_service.clone(),
300        ));
301        usubscription_service
302            .get_transport()
303            .register_listener(
304                &any_usubscription_uri,
305                Some(&usubscription_service.get_resource_uri(RESOURCE_ID_SUBSCRIBE)),
306                listener,
307            )
308            .await?;
309
310        let listener = Arc::new(listeners::UnsubscribeListener::new(
311            usubscription_service.clone(),
312        ));
313        usubscription_service
314            .get_transport()
315            .register_listener(
316                &any_usubscription_uri,
317                Some(&usubscription_service.get_resource_uri(RESOURCE_ID_UNSUBSCRIBE)),
318                listener,
319            )
320            .await?;
321
322        Ok(())
323    }
324}
325
326/// Implementation of <https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l3/usubscription/v3/README.adoc#usubscription>
327#[async_trait]
328impl USubscription for USubscriptionService {
329    /// Implementation of <https://github.com/eclipse-uprotocol/up-spec/tree/main/up-l3/usubscription/v3#51-subscription>
330    async fn subscribe(
331        &self,
332        subscription_request: SubscriptionRequest,
333    ) -> Result<SubscriptionResponse, UStatus> {
334        let SubscriptionRequest {
335            subscriber, topic, ..
336        } = subscription_request;
337
338        // Basic input validation
339        let Some(topic) = topic.into_option() else {
340            return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "No topic"));
341        };
342        if topic.is_empty() {
343            return Err(UStatus::fail_with_code(
344                UCode::INVALID_ARGUMENT,
345                "Empty topic UUri",
346            ));
347        }
348
349        let Some(subscriber) = subscriber.into_option() else {
350            return Err(UStatus::fail_with_code(
351                UCode::INVALID_ARGUMENT,
352                "No SubscriberInfo",
353            ));
354        };
355        if subscriber.is_empty() || subscriber.uri.is_empty() {
356            return Err(UStatus::fail_with_code(
357                UCode::INVALID_ARGUMENT,
358                "Empty SubscriberInfo or subscriber UUri",
359            ));
360        }
361
362        debug!(
363            "Got SubscriptionRequest for topic {}, from subscriber {}",
364            topic.to_uri(INCLUDE_SCHEMA),
365            subscriber.uri.to_uri(INCLUDE_SCHEMA)
366        );
367
368        // Communicate with subscription manager
369        let (respond_to, receive_from) = oneshot::channel::<SubscriptionStatus>();
370        let se = SubscriptionEvent::AddSubscription {
371            subscriber: subscriber.clone(),
372            topic: topic.clone(),
373            respond_to,
374        };
375        if let Err(e) = self.subscription_sender.send(se).await {
376            return Err(UStatus::fail_with_code(
377                UCode::INTERNAL,
378                format!("Error communicating with subscription management: {e}"),
379            ));
380        }
381        let Ok(status) = receive_from.await else {
382            return Err(UStatus::fail_with_code(
383                UCode::INTERNAL,
384                "Error communicating with subscription management",
385            ));
386        };
387
388        // Notify update channel
389        let (respond_to, receive_from) = oneshot::channel::<()>();
390        if let Err(e) = self
391            .notification_sender
392            .send(NotificationEvent::StateChange {
393                subscriber,
394                topic: topic.clone(),
395                status: status.clone(),
396                respond_to,
397            })
398            .await
399        {
400            error!("Error initiating subscription-change update notification: {e}");
401        }
402        if let Err(e) = receive_from.await {
403            // Not returning an error here, as update notification is not a core concern wrt the actual subscription management
404            error!("Error sending subscription-change update notification: {e}");
405        };
406
407        // Build and return result
408        Ok(SubscriptionResponse {
409            topic: Some(topic).into(),
410            status: Some(status).into(),
411            ..Default::default()
412        })
413    }
414
415    /// Implementation of <https://github.com/eclipse-uprotocol/up-spec/tree/main/up-l3/usubscription/v3#52-unsubscribe>
416    async fn unsubscribe(&self, unsubscribe_request: UnsubscribeRequest) -> Result<(), UStatus> {
417        let UnsubscribeRequest {
418            subscriber, topic, ..
419        } = unsubscribe_request;
420
421        // Basic input validation
422        let Some(topic) = topic.into_option() else {
423            return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "No topic"));
424        };
425        if topic.is_empty() {
426            return Err(UStatus::fail_with_code(
427                UCode::INVALID_ARGUMENT,
428                "Empty topic UUri",
429            ));
430        }
431
432        let Some(subscriber) = subscriber.into_option() else {
433            return Err(UStatus::fail_with_code(
434                UCode::INVALID_ARGUMENT,
435                "No SubscriberInfo",
436            ));
437        };
438        if subscriber.is_empty() || subscriber.uri.is_empty() {
439            return Err(UStatus::fail_with_code(
440                UCode::INVALID_ARGUMENT,
441                "Empty SubscriberInfo or subscriber UUri",
442            ));
443        }
444
445        debug!(
446            "Got UnsubscribeRequest for topic {}, from subscriber {}",
447            topic.to_uri(INCLUDE_SCHEMA),
448            subscriber.uri.to_uri(INCLUDE_SCHEMA)
449        );
450
451        // Communicate with subscription manager
452        let (respond_to, receive_from) = oneshot::channel::<SubscriptionStatus>();
453        let se = SubscriptionEvent::RemoveSubscription {
454            subscriber: subscriber.clone(),
455            topic: topic.clone(),
456            respond_to,
457        };
458        if let Err(e) = self.subscription_sender.send(se).await {
459            return Err(UStatus::fail_with_code(
460                UCode::INTERNAL,
461                format!("Error communicating with subscription management: {e}"),
462            ));
463        }
464        let Ok(status) = receive_from.await else {
465            return Err(UStatus::fail_with_code(
466                UCode::INTERNAL,
467                "Error communicating with subscription management",
468            ));
469        };
470
471        // Notify update channel
472        let (respond_to, receive_from) = oneshot::channel::<()>();
473        if let Err(e) = self
474            .notification_sender
475            .send(NotificationEvent::StateChange {
476                subscriber,
477                topic: topic.clone(),
478                status: status.clone(),
479                respond_to,
480            })
481            .await
482        {
483            // Not returning an error here, as update notification is not a core concern wrt the actual subscription management
484            error!("Error initiating subscription-change update notification: {e}");
485        }
486        if let Err(e) = receive_from.await {
487            // Not returning an error here, as update notification is not a core concern wrt the actual subscription management
488            error!("Error sending subscription-change update notification: {e}");
489        };
490
491        // Return result
492        Ok(())
493    }
494
495    async fn register_for_notifications(
496        &self,
497        notifications_register_request: NotificationsRequest,
498    ) -> Result<(), UStatus> {
499        let NotificationsRequest {
500            subscriber, topic, ..
501        } = notifications_register_request;
502
503        // Basic input validation
504        let Some(topic) = topic.into_option() else {
505            return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "No topic"));
506        };
507        if topic.is_empty() {
508            return Err(UStatus::fail_with_code(
509                UCode::INVALID_ARGUMENT,
510                "Empty notification UUri",
511            ));
512        }
513        if !topic.is_event() {
514            return Err(UStatus::fail_with_code(
515                UCode::INVALID_ARGUMENT,
516                "UUri not a valid event destination",
517            ));
518        }
519        if self.get_source_uri().is_remote_authority(&topic) {
520            return Err(UStatus::fail_with_code(
521                UCode::INVALID_ARGUMENT,
522                "Cannot use remote topic for notifications",
523            ));
524        }
525
526        let Some(subscriber) = subscriber.into_option() else {
527            return Err(UStatus::fail_with_code(
528                UCode::INVALID_ARGUMENT,
529                "No SubscriberInfo",
530            ));
531        };
532        if subscriber.is_empty() {
533            return Err(UStatus::fail_with_code(
534                UCode::INVALID_ARGUMENT,
535                "Empty SubscriberInfo",
536            ));
537        }
538        let Some(subscriber_uri) = subscriber.uri.into_option() else {
539            return Err(UStatus::fail_with_code(
540                UCode::INVALID_ARGUMENT,
541                "No subscriber UUri",
542            ));
543        };
544        if subscriber_uri.is_empty() {
545            return Err(UStatus::fail_with_code(
546                UCode::INVALID_ARGUMENT,
547                "Empty subscriber UUri",
548            ));
549        }
550
551        debug!(
552            "Got RegisterForNotifications for notification topic {}, from subscriber {}",
553            topic.to_uri(INCLUDE_SCHEMA),
554            subscriber_uri.to_uri(INCLUDE_SCHEMA)
555        );
556
557        // Perform notification management
558        if let Err(e) = self
559            .notification_sender
560            .send(NotificationEvent::AddNotifyee {
561                subscriber: subscriber_uri,
562                topic,
563            })
564            .await
565        {
566            return Err(UStatus::fail_with_code(
567                UCode::INTERNAL,
568                format!("Failed to update notification settings: {e}"),
569            ));
570        }
571
572        // Return result
573        Ok(())
574    }
575
576    async fn unregister_for_notifications(
577        &self,
578        notifications_unregister_request: NotificationsRequest,
579    ) -> Result<(), UStatus> {
580        // Current implementation, we only track one notification channel/topic per subscriber, so ignore topic here
581        let NotificationsRequest { subscriber, .. } = notifications_unregister_request;
582
583        // Basic input validation
584        let Some(subscriber) = subscriber.into_option() else {
585            return Err(UStatus::fail_with_code(
586                UCode::INVALID_ARGUMENT,
587                "No SubscriberInfo",
588            ));
589        };
590        if subscriber.is_empty() {
591            return Err(UStatus::fail_with_code(
592                UCode::INVALID_ARGUMENT,
593                "Empty SubscriberInfo",
594            ));
595        }
596        let Some(subscriber_uri) = subscriber.uri.into_option() else {
597            return Err(UStatus::fail_with_code(
598                UCode::INVALID_ARGUMENT,
599                "No subscriber UUri",
600            ));
601        };
602        if subscriber_uri.is_empty() {
603            return Err(UStatus::fail_with_code(
604                UCode::INVALID_ARGUMENT,
605                "Empty subscriber UUri",
606            ));
607        }
608
609        debug!(
610            "Got UnregisterForNotifications for notification from subscriber {}",
611            subscriber_uri.to_uri(INCLUDE_SCHEMA)
612        );
613
614        // Perform notification management
615        if let Err(e) = self
616            .notification_sender
617            .send(NotificationEvent::RemoveNotifyee {
618                subscriber: subscriber_uri,
619            })
620            .await
621        {
622            return Err(UStatus::fail_with_code(
623                UCode::INTERNAL,
624                format!("Failed to update notification settings: {e}"),
625            ));
626        }
627
628        // Return result
629        Ok(())
630    }
631
632    async fn fetch_subscribers(
633        &self,
634        fetch_subscribers_request: FetchSubscribersRequest,
635    ) -> Result<FetchSubscribersResponse, UStatus> {
636        // Basic input validation
637        let Some(topic) = fetch_subscribers_request.topic.as_ref() else {
638            return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "No topic"));
639        };
640        if topic.is_empty() {
641            return Err(UStatus::fail_with_code(
642                UCode::INVALID_ARGUMENT,
643                "Empty topic UUri",
644            ));
645        }
646
647        debug!(
648            "Got FetchSubscribersRequest for topic {}",
649            topic.to_uri(INCLUDE_SCHEMA)
650        );
651
652        // Communicate with subscription manager
653        let (respond_to, receive_from) = oneshot::channel::<FetchSubscribersResponse>();
654        let se = SubscriptionEvent::FetchSubscribers {
655            request: fetch_subscribers_request,
656            respond_to,
657        };
658        if let Err(e) = self.subscription_sender.send(se).await {
659            return Err(UStatus::fail_with_code(
660                UCode::INTERNAL,
661                format!("Error communicating with subscription management: {e}"),
662            ));
663        }
664        let Ok(response) = receive_from.await else {
665            return Err(UStatus::fail_with_code(
666                UCode::INTERNAL,
667                "Error receiving response from subscription management",
668            ));
669        };
670
671        // Return result
672        debug!(
673            "Returning {} subscriber entries",
674            response.subscribers.len()
675        );
676        Ok(response)
677    }
678
679    async fn fetch_subscriptions(
680        &self,
681        fetch_subscriptions_request: FetchSubscriptionsRequest,
682    ) -> Result<FetchSubscriptionsResponse, UStatus> {
683        // Basic input validation
684        let Some(request) = fetch_subscriptions_request.request.as_ref() else {
685            return Err(UStatus::fail_with_code(
686                UCode::INVALID_ARGUMENT,
687                "Missing Request property",
688            ));
689        };
690        match request {
691            Request::Topic(topic) => {
692                if topic.is_empty() {
693                    return Err(UStatus::fail_with_code(
694                        UCode::INVALID_ARGUMENT,
695                        "Empty request topic UUri",
696                    ));
697                }
698            }
699            Request::Subscriber(subscriber) => {
700                if subscriber.is_empty() {
701                    return Err(UStatus::fail_with_code(
702                        UCode::INVALID_ARGUMENT,
703                        "Empty request SubscriberInfo",
704                    ));
705                }
706                let Some(subscriber_uri) = subscriber.uri.as_ref() else {
707                    return Err(UStatus::fail_with_code(
708                        UCode::INVALID_ARGUMENT,
709                        "No request subscriber UUri",
710                    ));
711                };
712                if subscriber_uri.is_empty() {
713                    return Err(UStatus::fail_with_code(
714                        UCode::INVALID_ARGUMENT,
715                        "Empty request subscriber UUri",
716                    ));
717                }
718            }
719            _ => {
720                return Err(UStatus::fail_with_code(
721                    UCode::INVALID_ARGUMENT,
722                    "Invalid/unknown Request variant",
723                ));
724            }
725        }
726
727        debug!("Got FetchSubscriptionsRequest");
728
729        // Communicate with subscription manager
730        let (respond_to, receive_from) = oneshot::channel::<FetchSubscriptionsResponse>();
731        let se = SubscriptionEvent::FetchSubscriptions {
732            request: fetch_subscriptions_request,
733            respond_to,
734        };
735        if let Err(e) = self.subscription_sender.send(se).await {
736            return Err(UStatus::fail_with_code(
737                UCode::INTERNAL,
738                format!("Error communicating with subscription management: {e}"),
739            ));
740        }
741        let Ok(response) = receive_from.await else {
742            return Err(UStatus::fail_with_code(
743                UCode::INTERNAL,
744                "Error receiving response from subscription management",
745            ));
746        };
747
748        // Return result
749        debug!(
750            "Returning {} Subscription entries",
751            response.subscriptions.len()
752        );
753        Ok(response)
754    }
755}