up_rust/communication/
usubscription_client.rs

1/********************************************************************************
2 * Copyright (c) 2024 Contributors to the Eclipse Foundation
3 *
4 * See the NOTICE file(s) distributed with this work for additional
5 * information regarding copyright ownership.
6 *
7 * This program and the accompanying materials are made available under the
8 * terms of the Apache License Version 2.0 which is available at
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * SPDX-License-Identifier: Apache-2.0
12 ********************************************************************************/
13
14use std::sync::Arc;
15
16use async_trait::async_trait;
17
18use crate::{
19    core::usubscription::{
20        usubscription_uri, FetchSubscribersRequest, FetchSubscribersResponse,
21        FetchSubscriptionsRequest, FetchSubscriptionsResponse, NotificationsRequest,
22        NotificationsResponse, ResetRequest, ResetResponse, SubscriptionRequest,
23        SubscriptionResponse, USubscription, UnsubscribeRequest, UnsubscribeResponse,
24        RESOURCE_ID_FETCH_SUBSCRIBERS, RESOURCE_ID_FETCH_SUBSCRIPTIONS,
25        RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS, RESOURCE_ID_RESET, RESOURCE_ID_SUBSCRIBE,
26        RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS, RESOURCE_ID_UNSUBSCRIBE,
27    },
28    UStatus,
29};
30
31use super::{CallOptions, RpcClient};
32
33/// A [`USubscription`] client implementation for invoking operations of a local USubscription service.
34///
35/// The client requires an [`RpcClient`] for performing the remote procedure calls.
36pub struct RpcClientUSubscription {
37    rpc_client: Arc<dyn RpcClient>,
38}
39
40impl RpcClientUSubscription {
41    /// Creates a new Notifier for a given transport.
42    ///
43    /// # Arguments
44    ///
45    /// * `rpc_client` - The client to use for performing the remote procedure calls on the USubscription service.
46    pub fn new(rpc_client: Arc<dyn RpcClient>) -> Self {
47        RpcClientUSubscription { rpc_client }
48    }
49
50    fn default_call_options() -> CallOptions {
51        CallOptions::for_rpc_request(5_000, None, None, None)
52    }
53}
54
55#[async_trait]
56impl USubscription for RpcClientUSubscription {
57    async fn subscribe(
58        &self,
59        subscription_request: SubscriptionRequest,
60    ) -> Result<SubscriptionResponse, UStatus> {
61        self.rpc_client
62            .invoke_proto_method::<_, SubscriptionResponse>(
63                usubscription_uri(RESOURCE_ID_SUBSCRIBE),
64                Self::default_call_options(),
65                subscription_request,
66            )
67            .await
68            .map_err(UStatus::from)
69    }
70
71    async fn unsubscribe(&self, unsubscribe_request: UnsubscribeRequest) -> Result<(), UStatus> {
72        self.rpc_client
73            .invoke_proto_method::<_, UnsubscribeResponse>(
74                usubscription_uri(RESOURCE_ID_UNSUBSCRIBE),
75                Self::default_call_options(),
76                unsubscribe_request,
77            )
78            .await
79            .map(|_response| ())
80            .map_err(UStatus::from)
81    }
82
83    async fn fetch_subscriptions(
84        &self,
85        fetch_subscriptions_request: FetchSubscriptionsRequest,
86    ) -> Result<FetchSubscriptionsResponse, UStatus> {
87        self.rpc_client
88            .invoke_proto_method::<_, FetchSubscriptionsResponse>(
89                usubscription_uri(RESOURCE_ID_FETCH_SUBSCRIPTIONS),
90                Self::default_call_options(),
91                fetch_subscriptions_request,
92            )
93            .await
94            .map_err(UStatus::from)
95    }
96
97    async fn register_for_notifications(
98        &self,
99        notifications_register_request: NotificationsRequest,
100    ) -> Result<(), UStatus> {
101        self.rpc_client
102            .invoke_proto_method::<_, NotificationsResponse>(
103                usubscription_uri(RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS),
104                Self::default_call_options(),
105                notifications_register_request,
106            )
107            .await
108            .map(|_response| ())
109            .map_err(UStatus::from)
110    }
111
112    async fn unregister_for_notifications(
113        &self,
114        notifications_unregister_request: NotificationsRequest,
115    ) -> Result<(), UStatus> {
116        self.rpc_client
117            .invoke_proto_method::<_, NotificationsResponse>(
118                usubscription_uri(RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS),
119                Self::default_call_options(),
120                notifications_unregister_request,
121            )
122            .await
123            .map(|_response| ())
124            .map_err(UStatus::from)
125    }
126
127    async fn fetch_subscribers(
128        &self,
129        fetch_subscribers_request: FetchSubscribersRequest,
130    ) -> Result<FetchSubscribersResponse, UStatus> {
131        self.rpc_client
132            .invoke_proto_method::<_, FetchSubscribersResponse>(
133                usubscription_uri(RESOURCE_ID_FETCH_SUBSCRIBERS),
134                Self::default_call_options(),
135                fetch_subscribers_request,
136            )
137            .await
138            .map_err(UStatus::from)
139    }
140
141    async fn reset(&self, reset_request: ResetRequest) -> Result<ResetResponse, UStatus> {
142        self.rpc_client
143            .invoke_proto_method::<_, ResetResponse>(
144                usubscription_uri(RESOURCE_ID_RESET),
145                Self::default_call_options(),
146                reset_request,
147            )
148            .await
149            .map_err(UStatus::from)
150    }
151}
152
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use mockall::Sequence;

    use super::*;
    use crate::{
        communication::{rpc::MockRpcClient, ServiceInvocationError, UPayload},
        core::usubscription::{Request, SubscriptionResponse},
        UCode, UUri,
    };

    /// Builds the topic URI shared by all tests that need a topic.
    fn sample_topic() -> UUri {
        UUri::try_from_parts("other", 0xd5a3, 0x01, 0xd3fe).unwrap()
    }

    /// Builds the error that the mocked RPC client reports on the first invocation.
    fn internal_error() -> ServiceInvocationError {
        ServiceInvocationError::Internal("internal error".to_string())
    }

    // Each test below sets up two ordered expectations on the mocked RPC client:
    // the first invocation fails with an internal error, the second one verifies
    // the transmitted request and succeeds with a default response message.

    #[tokio::test]
    async fn test_subscribe_invokes_rpc_client() {
        let request = SubscriptionRequest {
            topic: Some(sample_topic()).into(),
            ..Default::default()
        };
        let expected_request = request.clone();
        let mut seq = Sequence::new();
        let mut rpc_client = MockRpcClient::new();
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                payload.is_some() && *method == usubscription_uri(RESOURCE_ID_SUBSCRIBE)
            })
            .return_const(Err(internal_error()));
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let actual_request = payload
                    .clone()
                    .unwrap()
                    .extract_protobuf::<SubscriptionRequest>()
                    .unwrap();
                *method == usubscription_uri(RESOURCE_ID_SUBSCRIBE)
                    && actual_request == expected_request
            })
            .returning(|_method, _options, _payload| {
                let response = SubscriptionResponse::default();
                Ok(Some(UPayload::try_from_protobuf(response).unwrap()))
            });

        let client = RpcClientUSubscription::new(Arc::new(rpc_client));

        let first_attempt = client.subscribe(request.clone()).await;
        assert!(first_attempt.is_err_and(|status| status.get_code() == UCode::INTERNAL));
        let second_attempt = client.subscribe(request).await;
        assert!(second_attempt.is_ok());
    }

    #[tokio::test]
    async fn test_unsubscribe_invokes_rpc_client() {
        let request = UnsubscribeRequest {
            topic: Some(sample_topic()).into(),
            ..Default::default()
        };
        let expected_request = request.clone();
        let mut seq = Sequence::new();
        let mut rpc_client = MockRpcClient::new();
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                payload.is_some() && *method == usubscription_uri(RESOURCE_ID_UNSUBSCRIBE)
            })
            .return_const(Err(internal_error()));
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let actual_request = payload
                    .clone()
                    .unwrap()
                    .extract_protobuf::<UnsubscribeRequest>()
                    .unwrap();
                *method == usubscription_uri(RESOURCE_ID_UNSUBSCRIBE)
                    && actual_request == expected_request
            })
            .returning(|_method, _options, _payload| {
                let response = UnsubscribeResponse::default();
                Ok(Some(UPayload::try_from_protobuf(response).unwrap()))
            });

        let client = RpcClientUSubscription::new(Arc::new(rpc_client));

        let first_attempt = client.unsubscribe(request.clone()).await;
        assert!(first_attempt.is_err_and(|status| status.get_code() == UCode::INTERNAL));
        let second_attempt = client.unsubscribe(request).await;
        assert!(second_attempt.is_ok());
    }

    #[tokio::test]
    async fn test_fetch_subscriptions_invokes_rpc_client() {
        let request = FetchSubscriptionsRequest {
            request: Some(Request::Topic(sample_topic())),
            ..Default::default()
        };
        let expected_request = request.clone();
        let mut seq = Sequence::new();
        let mut rpc_client = MockRpcClient::new();
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                payload.is_some() && *method == usubscription_uri(RESOURCE_ID_FETCH_SUBSCRIPTIONS)
            })
            .return_const(Err(internal_error()));
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let actual_request = payload
                    .clone()
                    .unwrap()
                    .extract_protobuf::<FetchSubscriptionsRequest>()
                    .unwrap();
                *method == usubscription_uri(RESOURCE_ID_FETCH_SUBSCRIPTIONS)
                    && actual_request == expected_request
            })
            .returning(|_method, _options, _payload| {
                let response = FetchSubscriptionsResponse::default();
                Ok(Some(UPayload::try_from_protobuf(response).unwrap()))
            });

        let client = RpcClientUSubscription::new(Arc::new(rpc_client));

        let first_attempt = client.fetch_subscriptions(request.clone()).await;
        assert!(first_attempt.is_err_and(|status| status.get_code() == UCode::INTERNAL));
        let second_attempt = client.fetch_subscriptions(request).await;
        assert!(second_attempt.is_ok());
    }

    #[tokio::test]
    async fn test_fetch_subscribers_invokes_rpc_client() {
        let request = FetchSubscribersRequest {
            topic: Some(sample_topic()).into(),
            ..Default::default()
        };
        let expected_request = request.clone();
        let mut seq = Sequence::new();
        let mut rpc_client = MockRpcClient::new();
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                payload.is_some() && *method == usubscription_uri(RESOURCE_ID_FETCH_SUBSCRIBERS)
            })
            .return_const(Err(internal_error()));
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let actual_request = payload
                    .clone()
                    .unwrap()
                    .extract_protobuf::<FetchSubscribersRequest>()
                    .unwrap();
                *method == usubscription_uri(RESOURCE_ID_FETCH_SUBSCRIBERS)
                    && actual_request == expected_request
            })
            .returning(|_method, _options, _payload| {
                let response = FetchSubscribersResponse::default();
                Ok(Some(UPayload::try_from_protobuf(response).unwrap()))
            });

        let client = RpcClientUSubscription::new(Arc::new(rpc_client));

        let first_attempt = client.fetch_subscribers(request.clone()).await;
        assert!(first_attempt.is_err_and(|status| status.get_code() == UCode::INTERNAL));
        let second_attempt = client.fetch_subscribers(request).await;
        assert!(second_attempt.is_ok());
    }

    #[tokio::test]
    async fn test_register_for_notifications_invokes_rpc_client() {
        let request = NotificationsRequest {
            topic: Some(sample_topic()).into(),
            ..Default::default()
        };
        let expected_request = request.clone();
        let mut seq = Sequence::new();
        let mut rpc_client = MockRpcClient::new();
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                payload.is_some()
                    && *method == usubscription_uri(RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS)
            })
            .return_const(Err(internal_error()));
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let actual_request = payload
                    .clone()
                    .unwrap()
                    .extract_protobuf::<NotificationsRequest>()
                    .unwrap();
                *method == usubscription_uri(RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS)
                    && actual_request == expected_request
            })
            .returning(|_method, _options, _payload| {
                let response = NotificationsResponse::default();
                Ok(Some(UPayload::try_from_protobuf(response).unwrap()))
            });

        let client = RpcClientUSubscription::new(Arc::new(rpc_client));

        let first_attempt = client.register_for_notifications(request.clone()).await;
        assert!(first_attempt.is_err_and(|status| status.get_code() == UCode::INTERNAL));
        let second_attempt = client.register_for_notifications(request).await;
        assert!(second_attempt.is_ok());
    }

    #[tokio::test]
    async fn test_unregister_for_notifications_invokes_rpc_client() {
        let request = NotificationsRequest {
            topic: Some(sample_topic()).into(),
            ..Default::default()
        };
        let expected_request = request.clone();
        let mut seq = Sequence::new();
        let mut rpc_client = MockRpcClient::new();
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                payload.is_some()
                    && *method == usubscription_uri(RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS)
            })
            .return_const(Err(internal_error()));
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let actual_request = payload
                    .clone()
                    .unwrap()
                    .extract_protobuf::<NotificationsRequest>()
                    .unwrap();
                *method == usubscription_uri(RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS)
                    && actual_request == expected_request
            })
            .returning(|_method, _options, _payload| {
                let response = NotificationsResponse::default();
                Ok(Some(UPayload::try_from_protobuf(response).unwrap()))
            });

        let client = RpcClientUSubscription::new(Arc::new(rpc_client));

        let first_attempt = client.unregister_for_notifications(request.clone()).await;
        assert!(first_attempt.is_err_and(|status| status.get_code() == UCode::INTERNAL));
        let second_attempt = client.unregister_for_notifications(request).await;
        assert!(second_attempt.is_ok());
    }

    #[tokio::test]
    async fn test_reset_invokes_rpc_client() {
        let request = ResetRequest::default();
        let expected_request = request.clone();
        let mut seq = Sequence::new();
        let mut rpc_client = MockRpcClient::new();
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                payload.is_some() && *method == usubscription_uri(RESOURCE_ID_RESET)
            })
            .return_const(Err(internal_error()));
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let actual_request = payload
                    .clone()
                    .unwrap()
                    .extract_protobuf::<ResetRequest>()
                    .unwrap();
                *method == usubscription_uri(RESOURCE_ID_RESET)
                    && actual_request == expected_request
            })
            .returning(|_method, _options, _payload| {
                let response = ResetResponse::default();
                Ok(Some(UPayload::try_from_protobuf(response).unwrap()))
            });

        let client = RpcClientUSubscription::new(Arc::new(rpc_client));

        let first_attempt = client.reset(request.clone()).await;
        assert!(first_attempt.is_err_and(|status| status.get_code() == UCode::INTERNAL));
        let second_attempt = client.reset(request).await;
        assert!(second_attempt.is_ok());
    }
}