1use async_trait::async_trait;
15use log::*;
16use std::str::FromStr;
17use std::sync::Arc;
18use tokio::{
19 sync::{
20 mpsc::{self, Sender},
21 oneshot, Notify,
22 },
23 task::JoinHandle,
24};
25
26use crate::{
27 helpers, listeners,
28 notification_manager::{self, NotificationEvent},
29 subscription_manager::{self, SubscriptionEvent},
30 USubscriptionConfiguration,
31};
32
33use up_rust::core::usubscription::{
34 FetchSubscribersRequest, FetchSubscribersResponse, FetchSubscriptionsRequest,
35 FetchSubscriptionsResponse, NotificationsRequest, Request, SubscriptionRequest,
36 SubscriptionResponse, SubscriptionStatus, USubscription, UnsubscribeRequest,
37 RESOURCE_ID_FETCH_SUBSCRIBERS, RESOURCE_ID_FETCH_SUBSCRIPTIONS,
38 RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS, RESOURCE_ID_SUBSCRIBE,
39 RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS, RESOURCE_ID_UNSUBSCRIBE, USUBSCRIPTION_TYPE_ID,
40};
41use up_rust::{communication::RpcClient, LocalUriProvider, UCode, UStatus, UTransport, UUri};
42
/// Flag handed to `UUri::to_uri` wherever this module renders a URI for a log message.
///
/// NOTE(review): judging by the name, `false` presumably leaves the scheme/schema prefix
/// out of the rendered string — confirm against the `to_uri` documentation.
pub const INCLUDE_SCHEMA: bool = false;

/// Crate-visible time-to-live constant; it is not referenced anywhere in this file.
///
/// NOTE(review): presumably milliseconds (i.e. five minutes) applied to requests sent to
/// remote services — confirm at the use sites.
pub(crate) const UP_REMOTE_TTL: u32 = 300_000;
48
49impl LocalUriProvider for USubscriptionService {
50 fn get_authority(&self) -> String {
51 self.config.authority_name.clone()
52 }
53 fn get_resource_uri(&self, resource_id: u16) -> UUri {
54 self.config.get_resource_uri(resource_id)
55 }
56 fn get_source_uri(&self) -> UUri {
57 self.get_resource_uri(0x0000)
58 }
59}
60
61pub trait USubscriptionServiceAbstract:
64 USubscription + LocalUriProvider + UTransportHolder
65{
66}
67
68pub trait UTransportHolder {
71 fn get_transport(&self) -> Arc<dyn UTransport>;
72}
73
74impl UTransportHolder for USubscriptionService {
75 fn get_transport(&self) -> Arc<dyn UTransport> {
76 self.up_transport.clone()
77 }
78}
79
80pub struct USubscriptionStopper {
85 shutdown_notification: Arc<Notify>,
86 subscription_joiner: Option<JoinHandle<()>>,
87 notification_joiner: Option<JoinHandle<()>>,
88}
89
90impl USubscriptionStopper {
91 pub async fn stop(&mut self) {
92 self.shutdown_notification.notify_waiters();
93
94 self.subscription_joiner
95 .take()
96 .expect("Has this USubscription instance already been stopped?")
97 .await
98 .expect("Error shutting down subscription manager");
99 self.notification_joiner
100 .take()
101 .expect("Has this USubscription instance already been stopped?")
102 .await
103 .expect("Error shutting down notification manager");
104 }
105}
106
107#[derive(Clone)]
114pub struct USubscriptionService {
115 config: Arc<USubscriptionConfiguration>,
116
117 pub(crate) up_transport: Arc<dyn UTransport>,
118
119 subscription_sender: Sender<SubscriptionEvent>,
120 notification_sender: Sender<notification_manager::NotificationEvent>,
121}
122
123impl USubscriptionServiceAbstract for USubscriptionService {}
124
125impl USubscriptionService {
127 pub fn run(
142 config: Arc<USubscriptionConfiguration>,
143 up_transport: Arc<dyn UTransport>,
144 up_client: Arc<dyn RpcClient>,
145 ) -> Result<(Arc<dyn USubscriptionServiceAbstract>, USubscriptionStopper), UStatus> {
146 helpers::init_once();
147
148 let shutdown_notification = Arc::new(Notify::new());
149
150 let up_client_cloned = up_client.clone();
152 let own_uri_cloned = config.get_source_uri().clone();
153 let shutdown_notification_cloned = shutdown_notification.clone();
154 let (subscription_sender, subscription_receiver) =
155 mpsc::channel::<SubscriptionEvent>(config.subscription_command_buffer);
156 let subscription_joiner = helpers::spawn_and_log_error(async move {
157 subscription_manager::handle_message(
158 own_uri_cloned,
159 up_client_cloned,
160 subscription_receiver,
161 shutdown_notification_cloned,
162 )
163 .await;
164 Ok(())
165 });
166
167 let up_transport_cloned = up_transport.clone();
169 let shutdown_notification_cloned = shutdown_notification.clone();
170 let (notification_sender, notification_receiver) =
171 mpsc::channel::<notification_manager::NotificationEvent>(
172 config.notification_command_buffer,
173 );
174 let notification_joiner = helpers::spawn_and_log_error(async move {
175 notification_manager::notification_engine(
176 up_transport_cloned,
177 notification_receiver,
178 shutdown_notification_cloned,
179 )
180 .await;
181 Ok(())
182 });
183
184 Ok((
185 Arc::new(USubscriptionService {
186 config,
187 up_transport,
188 subscription_sender,
189 notification_sender,
190 }),
191 USubscriptionStopper {
192 subscription_joiner: Some(subscription_joiner),
193 notification_joiner: Some(notification_joiner),
194 shutdown_notification,
195 },
196 ))
197 }
198
199 pub async fn now_listen(
209 usubscription_service: Arc<dyn USubscriptionServiceAbstract>,
210 ) -> Result<(), UStatus> {
211 let any_request_uri = UUri::from_str("//*/FFFF/FF/0").unwrap();
212
213 let mut any_local_uri = any_request_uri.clone();
214 any_local_uri.authority_name = usubscription_service.get_authority();
215
216 let mut any_usubscription_uri = any_request_uri.clone();
217 any_usubscription_uri.ue_id = USUBSCRIPTION_TYPE_ID;
218
219 let listener = Arc::new(listeners::SubscribeListener::new(
221 usubscription_service.clone(),
222 ));
223 usubscription_service
224 .get_transport()
225 .register_listener(
226 &any_local_uri,
227 Some(&usubscription_service.get_resource_uri(RESOURCE_ID_SUBSCRIBE)),
228 listener,
229 )
230 .await?;
231
232 let listener = Arc::new(listeners::UnsubscribeListener::new(
233 usubscription_service.clone(),
234 ));
235 usubscription_service
236 .get_transport()
237 .register_listener(
238 &any_local_uri,
239 Some(&usubscription_service.get_resource_uri(RESOURCE_ID_UNSUBSCRIBE)),
240 listener,
241 )
242 .await?;
243
244 let listener = Arc::new(listeners::RegisterForNotificationsListener::new(
245 usubscription_service.clone(),
246 ));
247 usubscription_service
248 .get_transport()
249 .register_listener(
250 &any_local_uri,
251 Some(
252 &usubscription_service.get_resource_uri(RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS),
253 ),
254 listener,
255 )
256 .await?;
257
258 let listener = Arc::new(listeners::UnregisterForNotificationsListener::new(
259 usubscription_service.clone(),
260 ));
261 usubscription_service
262 .get_transport()
263 .register_listener(
264 &any_local_uri,
265 Some(
266 &usubscription_service
267 .get_resource_uri(RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS),
268 ),
269 listener,
270 )
271 .await?;
272
273 let listener = Arc::new(listeners::FetchSubscribersListener::new(
274 usubscription_service.clone(),
275 ));
276 usubscription_service
277 .get_transport()
278 .register_listener(
279 &any_local_uri,
280 Some(&usubscription_service.get_resource_uri(RESOURCE_ID_FETCH_SUBSCRIBERS)),
281 listener,
282 )
283 .await?;
284
285 let listener = Arc::new(listeners::FetchSubscriptionsListener::new(
286 usubscription_service.clone(),
287 ));
288 usubscription_service
289 .get_transport()
290 .register_listener(
291 &any_local_uri,
292 Some(&usubscription_service.get_resource_uri(RESOURCE_ID_FETCH_SUBSCRIPTIONS)),
293 listener,
294 )
295 .await?;
296
297 let listener = Arc::new(listeners::SubscribeListener::new(
299 usubscription_service.clone(),
300 ));
301 usubscription_service
302 .get_transport()
303 .register_listener(
304 &any_usubscription_uri,
305 Some(&usubscription_service.get_resource_uri(RESOURCE_ID_SUBSCRIBE)),
306 listener,
307 )
308 .await?;
309
310 let listener = Arc::new(listeners::UnsubscribeListener::new(
311 usubscription_service.clone(),
312 ));
313 usubscription_service
314 .get_transport()
315 .register_listener(
316 &any_usubscription_uri,
317 Some(&usubscription_service.get_resource_uri(RESOURCE_ID_UNSUBSCRIBE)),
318 listener,
319 )
320 .await?;
321
322 Ok(())
323 }
324}
325
326#[async_trait]
328impl USubscription for USubscriptionService {
329 async fn subscribe(
331 &self,
332 subscription_request: SubscriptionRequest,
333 ) -> Result<SubscriptionResponse, UStatus> {
334 let SubscriptionRequest {
335 subscriber, topic, ..
336 } = subscription_request;
337
338 let Some(topic) = topic.into_option() else {
340 return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "No topic"));
341 };
342 if topic.is_empty() {
343 return Err(UStatus::fail_with_code(
344 UCode::INVALID_ARGUMENT,
345 "Empty topic UUri",
346 ));
347 }
348
349 let Some(subscriber) = subscriber.into_option() else {
350 return Err(UStatus::fail_with_code(
351 UCode::INVALID_ARGUMENT,
352 "No SubscriberInfo",
353 ));
354 };
355 if subscriber.is_empty() || subscriber.uri.is_empty() {
356 return Err(UStatus::fail_with_code(
357 UCode::INVALID_ARGUMENT,
358 "Empty SubscriberInfo or subscriber UUri",
359 ));
360 }
361
362 debug!(
363 "Got SubscriptionRequest for topic {}, from subscriber {}",
364 topic.to_uri(INCLUDE_SCHEMA),
365 subscriber.uri.to_uri(INCLUDE_SCHEMA)
366 );
367
368 let (respond_to, receive_from) = oneshot::channel::<SubscriptionStatus>();
370 let se = SubscriptionEvent::AddSubscription {
371 subscriber: subscriber.clone(),
372 topic: topic.clone(),
373 respond_to,
374 };
375 if let Err(e) = self.subscription_sender.send(se).await {
376 return Err(UStatus::fail_with_code(
377 UCode::INTERNAL,
378 format!("Error communicating with subscription management: {e}"),
379 ));
380 }
381 let Ok(status) = receive_from.await else {
382 return Err(UStatus::fail_with_code(
383 UCode::INTERNAL,
384 "Error communicating with subscription management",
385 ));
386 };
387
388 let (respond_to, receive_from) = oneshot::channel::<()>();
390 if let Err(e) = self
391 .notification_sender
392 .send(NotificationEvent::StateChange {
393 subscriber,
394 topic: topic.clone(),
395 status: status.clone(),
396 respond_to,
397 })
398 .await
399 {
400 error!("Error initiating subscription-change update notification: {e}");
401 }
402 if let Err(e) = receive_from.await {
403 error!("Error sending subscription-change update notification: {e}");
405 };
406
407 Ok(SubscriptionResponse {
409 topic: Some(topic).into(),
410 status: Some(status).into(),
411 ..Default::default()
412 })
413 }
414
415 async fn unsubscribe(&self, unsubscribe_request: UnsubscribeRequest) -> Result<(), UStatus> {
417 let UnsubscribeRequest {
418 subscriber, topic, ..
419 } = unsubscribe_request;
420
421 let Some(topic) = topic.into_option() else {
423 return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "No topic"));
424 };
425 if topic.is_empty() {
426 return Err(UStatus::fail_with_code(
427 UCode::INVALID_ARGUMENT,
428 "Empty topic UUri",
429 ));
430 }
431
432 let Some(subscriber) = subscriber.into_option() else {
433 return Err(UStatus::fail_with_code(
434 UCode::INVALID_ARGUMENT,
435 "No SubscriberInfo",
436 ));
437 };
438 if subscriber.is_empty() || subscriber.uri.is_empty() {
439 return Err(UStatus::fail_with_code(
440 UCode::INVALID_ARGUMENT,
441 "Empty SubscriberInfo or subscriber UUri",
442 ));
443 }
444
445 debug!(
446 "Got UnsubscribeRequest for topic {}, from subscriber {}",
447 topic.to_uri(INCLUDE_SCHEMA),
448 subscriber.uri.to_uri(INCLUDE_SCHEMA)
449 );
450
451 let (respond_to, receive_from) = oneshot::channel::<SubscriptionStatus>();
453 let se = SubscriptionEvent::RemoveSubscription {
454 subscriber: subscriber.clone(),
455 topic: topic.clone(),
456 respond_to,
457 };
458 if let Err(e) = self.subscription_sender.send(se).await {
459 return Err(UStatus::fail_with_code(
460 UCode::INTERNAL,
461 format!("Error communicating with subscription management: {e}"),
462 ));
463 }
464 let Ok(status) = receive_from.await else {
465 return Err(UStatus::fail_with_code(
466 UCode::INTERNAL,
467 "Error communicating with subscription management",
468 ));
469 };
470
471 let (respond_to, receive_from) = oneshot::channel::<()>();
473 if let Err(e) = self
474 .notification_sender
475 .send(NotificationEvent::StateChange {
476 subscriber,
477 topic: topic.clone(),
478 status: status.clone(),
479 respond_to,
480 })
481 .await
482 {
483 error!("Error initiating subscription-change update notification: {e}");
485 }
486 if let Err(e) = receive_from.await {
487 error!("Error sending subscription-change update notification: {e}");
489 };
490
491 Ok(())
493 }
494
495 async fn register_for_notifications(
496 &self,
497 notifications_register_request: NotificationsRequest,
498 ) -> Result<(), UStatus> {
499 let NotificationsRequest {
500 subscriber, topic, ..
501 } = notifications_register_request;
502
503 let Some(topic) = topic.into_option() else {
505 return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "No topic"));
506 };
507 if topic.is_empty() {
508 return Err(UStatus::fail_with_code(
509 UCode::INVALID_ARGUMENT,
510 "Empty notification UUri",
511 ));
512 }
513 if !topic.is_event() {
514 return Err(UStatus::fail_with_code(
515 UCode::INVALID_ARGUMENT,
516 "UUri not a valid event destination",
517 ));
518 }
519 if self.get_source_uri().is_remote_authority(&topic) {
520 return Err(UStatus::fail_with_code(
521 UCode::INVALID_ARGUMENT,
522 "Cannot use remote topic for notifications",
523 ));
524 }
525
526 let Some(subscriber) = subscriber.into_option() else {
527 return Err(UStatus::fail_with_code(
528 UCode::INVALID_ARGUMENT,
529 "No SubscriberInfo",
530 ));
531 };
532 if subscriber.is_empty() {
533 return Err(UStatus::fail_with_code(
534 UCode::INVALID_ARGUMENT,
535 "Empty SubscriberInfo",
536 ));
537 }
538 let Some(subscriber_uri) = subscriber.uri.into_option() else {
539 return Err(UStatus::fail_with_code(
540 UCode::INVALID_ARGUMENT,
541 "No subscriber UUri",
542 ));
543 };
544 if subscriber_uri.is_empty() {
545 return Err(UStatus::fail_with_code(
546 UCode::INVALID_ARGUMENT,
547 "Empty subscriber UUri",
548 ));
549 }
550
551 debug!(
552 "Got RegisterForNotifications for notification topic {}, from subscriber {}",
553 topic.to_uri(INCLUDE_SCHEMA),
554 subscriber_uri.to_uri(INCLUDE_SCHEMA)
555 );
556
557 if let Err(e) = self
559 .notification_sender
560 .send(NotificationEvent::AddNotifyee {
561 subscriber: subscriber_uri,
562 topic,
563 })
564 .await
565 {
566 return Err(UStatus::fail_with_code(
567 UCode::INTERNAL,
568 format!("Failed to update notification settings: {e}"),
569 ));
570 }
571
572 Ok(())
574 }
575
576 async fn unregister_for_notifications(
577 &self,
578 notifications_unregister_request: NotificationsRequest,
579 ) -> Result<(), UStatus> {
580 let NotificationsRequest { subscriber, .. } = notifications_unregister_request;
582
583 let Some(subscriber) = subscriber.into_option() else {
585 return Err(UStatus::fail_with_code(
586 UCode::INVALID_ARGUMENT,
587 "No SubscriberInfo",
588 ));
589 };
590 if subscriber.is_empty() {
591 return Err(UStatus::fail_with_code(
592 UCode::INVALID_ARGUMENT,
593 "Empty SubscriberInfo",
594 ));
595 }
596 let Some(subscriber_uri) = subscriber.uri.into_option() else {
597 return Err(UStatus::fail_with_code(
598 UCode::INVALID_ARGUMENT,
599 "No subscriber UUri",
600 ));
601 };
602 if subscriber_uri.is_empty() {
603 return Err(UStatus::fail_with_code(
604 UCode::INVALID_ARGUMENT,
605 "Empty subscriber UUri",
606 ));
607 }
608
609 debug!(
610 "Got UnregisterForNotifications for notification from subscriber {}",
611 subscriber_uri.to_uri(INCLUDE_SCHEMA)
612 );
613
614 if let Err(e) = self
616 .notification_sender
617 .send(NotificationEvent::RemoveNotifyee {
618 subscriber: subscriber_uri,
619 })
620 .await
621 {
622 return Err(UStatus::fail_with_code(
623 UCode::INTERNAL,
624 format!("Failed to update notification settings: {e}"),
625 ));
626 }
627
628 Ok(())
630 }
631
632 async fn fetch_subscribers(
633 &self,
634 fetch_subscribers_request: FetchSubscribersRequest,
635 ) -> Result<FetchSubscribersResponse, UStatus> {
636 let Some(topic) = fetch_subscribers_request.topic.as_ref() else {
638 return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "No topic"));
639 };
640 if topic.is_empty() {
641 return Err(UStatus::fail_with_code(
642 UCode::INVALID_ARGUMENT,
643 "Empty topic UUri",
644 ));
645 }
646
647 debug!(
648 "Got FetchSubscribersRequest for topic {}",
649 topic.to_uri(INCLUDE_SCHEMA)
650 );
651
652 let (respond_to, receive_from) = oneshot::channel::<FetchSubscribersResponse>();
654 let se = SubscriptionEvent::FetchSubscribers {
655 request: fetch_subscribers_request,
656 respond_to,
657 };
658 if let Err(e) = self.subscription_sender.send(se).await {
659 return Err(UStatus::fail_with_code(
660 UCode::INTERNAL,
661 format!("Error communicating with subscription management: {e}"),
662 ));
663 }
664 let Ok(response) = receive_from.await else {
665 return Err(UStatus::fail_with_code(
666 UCode::INTERNAL,
667 "Error receiving response from subscription management",
668 ));
669 };
670
671 debug!(
673 "Returning {} subscriber entries",
674 response.subscribers.len()
675 );
676 Ok(response)
677 }
678
679 async fn fetch_subscriptions(
680 &self,
681 fetch_subscriptions_request: FetchSubscriptionsRequest,
682 ) -> Result<FetchSubscriptionsResponse, UStatus> {
683 let Some(request) = fetch_subscriptions_request.request.as_ref() else {
685 return Err(UStatus::fail_with_code(
686 UCode::INVALID_ARGUMENT,
687 "Missing Request property",
688 ));
689 };
690 match request {
691 Request::Topic(topic) => {
692 if topic.is_empty() {
693 return Err(UStatus::fail_with_code(
694 UCode::INVALID_ARGUMENT,
695 "Empty request topic UUri",
696 ));
697 }
698 }
699 Request::Subscriber(subscriber) => {
700 if subscriber.is_empty() {
701 return Err(UStatus::fail_with_code(
702 UCode::INVALID_ARGUMENT,
703 "Empty request SubscriberInfo",
704 ));
705 }
706 let Some(subscriber_uri) = subscriber.uri.as_ref() else {
707 return Err(UStatus::fail_with_code(
708 UCode::INVALID_ARGUMENT,
709 "No request subscriber UUri",
710 ));
711 };
712 if subscriber_uri.is_empty() {
713 return Err(UStatus::fail_with_code(
714 UCode::INVALID_ARGUMENT,
715 "Empty request subscriber UUri",
716 ));
717 }
718 }
719 _ => {
720 return Err(UStatus::fail_with_code(
721 UCode::INVALID_ARGUMENT,
722 "Invalid/unknown Request variant",
723 ));
724 }
725 }
726
727 debug!("Got FetchSubscriptionsRequest");
728
729 let (respond_to, receive_from) = oneshot::channel::<FetchSubscriptionsResponse>();
731 let se = SubscriptionEvent::FetchSubscriptions {
732 request: fetch_subscriptions_request,
733 respond_to,
734 };
735 if let Err(e) = self.subscription_sender.send(se).await {
736 return Err(UStatus::fail_with_code(
737 UCode::INTERNAL,
738 format!("Error communicating with subscription management: {e}"),
739 ));
740 }
741 let Ok(response) = receive_from.await else {
742 return Err(UStatus::fail_with_code(
743 UCode::INTERNAL,
744 "Error receiving response from subscription management",
745 ));
746 };
747
748 debug!(
750 "Returning {} Subscription entries",
751 response.subscriptions.len()
752 );
753 Ok(response)
754 }
755}