1use std::sync::Arc;
15
16use async_trait::async_trait;
17
18use crate::{
19 core::usubscription::{
20 usubscription_uri, FetchSubscribersRequest, FetchSubscribersResponse,
21 FetchSubscriptionsRequest, FetchSubscriptionsResponse, NotificationsRequest,
22 NotificationsResponse, ResetRequest, ResetResponse, SubscriptionRequest,
23 SubscriptionResponse, USubscription, UnsubscribeRequest, UnsubscribeResponse,
24 RESOURCE_ID_FETCH_SUBSCRIBERS, RESOURCE_ID_FETCH_SUBSCRIPTIONS,
25 RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS, RESOURCE_ID_RESET, RESOURCE_ID_SUBSCRIBE,
26 RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS, RESOURCE_ID_UNSUBSCRIBE,
27 },
28 UStatus,
29};
30
31use super::{CallOptions, RpcClient};
32
/// A [`USubscription`] implementation that delegates every operation to an [`RpcClient`].
pub struct RpcClientUSubscription {
    /// The client used for invoking the USubscription service's methods.
    rpc_client: Arc<dyn RpcClient>,
}
39
40impl RpcClientUSubscription {
41 pub fn new(rpc_client: Arc<dyn RpcClient>) -> Self {
47 RpcClientUSubscription { rpc_client }
48 }
49
50 fn default_call_options() -> CallOptions {
51 CallOptions::for_rpc_request(5_000, None, None, None)
52 }
53}
54
55#[async_trait]
56impl USubscription for RpcClientUSubscription {
57 async fn subscribe(
58 &self,
59 subscription_request: SubscriptionRequest,
60 ) -> Result<SubscriptionResponse, UStatus> {
61 self.rpc_client
62 .invoke_proto_method::<_, SubscriptionResponse>(
63 usubscription_uri(RESOURCE_ID_SUBSCRIBE),
64 Self::default_call_options(),
65 subscription_request,
66 )
67 .await
68 .map_err(UStatus::from)
69 }
70
71 async fn unsubscribe(&self, unsubscribe_request: UnsubscribeRequest) -> Result<(), UStatus> {
72 self.rpc_client
73 .invoke_proto_method::<_, UnsubscribeResponse>(
74 usubscription_uri(RESOURCE_ID_UNSUBSCRIBE),
75 Self::default_call_options(),
76 unsubscribe_request,
77 )
78 .await
79 .map(|_response| ())
80 .map_err(UStatus::from)
81 }
82
83 async fn fetch_subscriptions(
84 &self,
85 fetch_subscriptions_request: FetchSubscriptionsRequest,
86 ) -> Result<FetchSubscriptionsResponse, UStatus> {
87 self.rpc_client
88 .invoke_proto_method::<_, FetchSubscriptionsResponse>(
89 usubscription_uri(RESOURCE_ID_FETCH_SUBSCRIPTIONS),
90 Self::default_call_options(),
91 fetch_subscriptions_request,
92 )
93 .await
94 .map_err(UStatus::from)
95 }
96
97 async fn register_for_notifications(
98 &self,
99 notifications_register_request: NotificationsRequest,
100 ) -> Result<(), UStatus> {
101 self.rpc_client
102 .invoke_proto_method::<_, NotificationsResponse>(
103 usubscription_uri(RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS),
104 Self::default_call_options(),
105 notifications_register_request,
106 )
107 .await
108 .map(|_response| ())
109 .map_err(UStatus::from)
110 }
111
112 async fn unregister_for_notifications(
113 &self,
114 notifications_unregister_request: NotificationsRequest,
115 ) -> Result<(), UStatus> {
116 self.rpc_client
117 .invoke_proto_method::<_, NotificationsResponse>(
118 usubscription_uri(RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS),
119 Self::default_call_options(),
120 notifications_unregister_request,
121 )
122 .await
123 .map(|_response| ())
124 .map_err(UStatus::from)
125 }
126
127 async fn fetch_subscribers(
128 &self,
129 fetch_subscribers_request: FetchSubscribersRequest,
130 ) -> Result<FetchSubscribersResponse, UStatus> {
131 self.rpc_client
132 .invoke_proto_method::<_, FetchSubscribersResponse>(
133 usubscription_uri(RESOURCE_ID_FETCH_SUBSCRIBERS),
134 Self::default_call_options(),
135 fetch_subscribers_request,
136 )
137 .await
138 .map_err(UStatus::from)
139 }
140
141 async fn reset(&self, reset_request: ResetRequest) -> Result<ResetResponse, UStatus> {
142 self.rpc_client
143 .invoke_proto_method::<_, ResetResponse>(
144 usubscription_uri(RESOURCE_ID_RESET),
145 Self::default_call_options(),
146 reset_request,
147 )
148 .await
149 .map_err(UStatus::from)
150 }
151}
152
#[cfg(test)]
mod tests {
    use mockall::Sequence;

    use super::*;
    use crate::{
        communication::{rpc::MockRpcClient, ServiceInvocationError, UPayload},
        core::usubscription::{Request, SubscriptionResponse},
        UCode, UUri,
    };
    use std::sync::Arc;

    /// Creates the topic that the requests of the tests in this module refer to.
    fn test_topic() -> UUri {
        UUri::try_from_parts("other", 0xd5a3, 0x01, 0xd3fe).unwrap()
    }

    /// Creates the error that the mocked RPC client returns for the first invocation
    /// in each test.
    fn internal_error() -> ServiceInvocationError {
        ServiceInvocationError::Internal("internal error".to_string())
    }

    /// Verifies that `subscribe` reports `UCode::INTERNAL` when the RPC client fails and
    /// succeeds when the RPC client returns a response for the expected method and request.
    #[tokio::test]
    async fn test_subscribe_invokes_rpc_client() {
        let request = SubscriptionRequest {
            topic: Some(test_topic()).into(),
            ..Default::default()
        };
        let expected_request = request.clone();

        let mut seq = Sequence::new();
        let mut rpc_client = MockRpcClient::new();
        // first invocation: the RPC client fails
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                payload.is_some() && method == &usubscription_uri(RESOURCE_ID_SUBSCRIBE)
            })
            .return_const(Err(internal_error()));
        // second invocation: the RPC client returns an (empty) response message
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let actual_request = payload
                    .clone()
                    .unwrap()
                    .extract_protobuf::<SubscriptionRequest>()
                    .unwrap();
                method == &usubscription_uri(RESOURCE_ID_SUBSCRIBE)
                    && actual_request == expected_request
            })
            .returning(|_method, _options, _payload| {
                let payload = UPayload::try_from_protobuf(SubscriptionResponse::default());
                Ok(Some(payload.unwrap()))
            });

        let client = RpcClientUSubscription::new(Arc::new(rpc_client));

        let first_attempt = client.subscribe(request.clone()).await;
        assert!(first_attempt.is_err_and(|e| e.get_code() == UCode::INTERNAL));
        let second_attempt = client.subscribe(request).await;
        assert!(second_attempt.is_ok());
    }

    /// Verifies that `unsubscribe` reports `UCode::INTERNAL` when the RPC client fails and
    /// succeeds when the RPC client returns a response for the expected method and request.
    #[tokio::test]
    async fn test_unsubscribe_invokes_rpc_client() {
        let request = UnsubscribeRequest {
            topic: Some(test_topic()).into(),
            ..Default::default()
        };
        let expected_request = request.clone();

        let mut seq = Sequence::new();
        let mut rpc_client = MockRpcClient::new();
        // first invocation: the RPC client fails
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                payload.is_some() && method == &usubscription_uri(RESOURCE_ID_UNSUBSCRIBE)
            })
            .return_const(Err(internal_error()));
        // second invocation: the RPC client returns an (empty) response message
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let actual_request = payload
                    .clone()
                    .unwrap()
                    .extract_protobuf::<UnsubscribeRequest>()
                    .unwrap();
                method == &usubscription_uri(RESOURCE_ID_UNSUBSCRIBE)
                    && actual_request == expected_request
            })
            .returning(|_method, _options, _payload| {
                let payload = UPayload::try_from_protobuf(UnsubscribeResponse::default());
                Ok(Some(payload.unwrap()))
            });

        let client = RpcClientUSubscription::new(Arc::new(rpc_client));

        let first_attempt = client.unsubscribe(request.clone()).await;
        assert!(first_attempt.is_err_and(|e| e.get_code() == UCode::INTERNAL));
        let second_attempt = client.unsubscribe(request).await;
        assert!(second_attempt.is_ok());
    }

    /// Verifies that `fetch_subscriptions` reports `UCode::INTERNAL` when the RPC client
    /// fails and succeeds when the RPC client returns a response for the expected method
    /// and request.
    #[tokio::test]
    async fn test_fetch_subscriptions_invokes_rpc_client() {
        let request = FetchSubscriptionsRequest {
            request: Some(Request::Topic(test_topic())),
            ..Default::default()
        };
        let expected_request = request.clone();

        let mut seq = Sequence::new();
        let mut rpc_client = MockRpcClient::new();
        // first invocation: the RPC client fails
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                payload.is_some() && method == &usubscription_uri(RESOURCE_ID_FETCH_SUBSCRIPTIONS)
            })
            .return_const(Err(internal_error()));
        // second invocation: the RPC client returns an (empty) response message
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let actual_request = payload
                    .clone()
                    .unwrap()
                    .extract_protobuf::<FetchSubscriptionsRequest>()
                    .unwrap();
                method == &usubscription_uri(RESOURCE_ID_FETCH_SUBSCRIPTIONS)
                    && actual_request == expected_request
            })
            .returning(|_method, _options, _payload| {
                let payload = UPayload::try_from_protobuf(FetchSubscriptionsResponse::default());
                Ok(Some(payload.unwrap()))
            });

        let client = RpcClientUSubscription::new(Arc::new(rpc_client));

        let first_attempt = client.fetch_subscriptions(request.clone()).await;
        assert!(first_attempt.is_err_and(|e| e.get_code() == UCode::INTERNAL));
        let second_attempt = client.fetch_subscriptions(request).await;
        assert!(second_attempt.is_ok());
    }

    /// Verifies that `fetch_subscribers` reports `UCode::INTERNAL` when the RPC client
    /// fails and succeeds when the RPC client returns a response for the expected method
    /// and request.
    #[tokio::test]
    async fn test_fetch_subscribers_invokes_rpc_client() {
        let request = FetchSubscribersRequest {
            topic: Some(test_topic()).into(),
            ..Default::default()
        };
        let expected_request = request.clone();

        let mut seq = Sequence::new();
        let mut rpc_client = MockRpcClient::new();
        // first invocation: the RPC client fails
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                payload.is_some() && method == &usubscription_uri(RESOURCE_ID_FETCH_SUBSCRIBERS)
            })
            .return_const(Err(internal_error()));
        // second invocation: the RPC client returns an (empty) response message
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let actual_request = payload
                    .clone()
                    .unwrap()
                    .extract_protobuf::<FetchSubscribersRequest>()
                    .unwrap();
                method == &usubscription_uri(RESOURCE_ID_FETCH_SUBSCRIBERS)
                    && actual_request == expected_request
            })
            .returning(|_method, _options, _payload| {
                let payload = UPayload::try_from_protobuf(FetchSubscribersResponse::default());
                Ok(Some(payload.unwrap()))
            });

        let client = RpcClientUSubscription::new(Arc::new(rpc_client));

        let first_attempt = client.fetch_subscribers(request.clone()).await;
        assert!(first_attempt.is_err_and(|e| e.get_code() == UCode::INTERNAL));
        let second_attempt = client.fetch_subscribers(request).await;
        assert!(second_attempt.is_ok());
    }

    /// Verifies that `register_for_notifications` reports `UCode::INTERNAL` when the RPC
    /// client fails and succeeds when the RPC client returns a response for the expected
    /// method and request.
    #[tokio::test]
    async fn test_register_for_notifications_invokes_rpc_client() {
        let request = NotificationsRequest {
            topic: Some(test_topic()).into(),
            ..Default::default()
        };
        let expected_request = request.clone();

        let mut seq = Sequence::new();
        let mut rpc_client = MockRpcClient::new();
        // first invocation: the RPC client fails
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                payload.is_some()
                    && method == &usubscription_uri(RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS)
            })
            .return_const(Err(internal_error()));
        // second invocation: the RPC client returns an (empty) response message
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let actual_request = payload
                    .clone()
                    .unwrap()
                    .extract_protobuf::<NotificationsRequest>()
                    .unwrap();
                method == &usubscription_uri(RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS)
                    && actual_request == expected_request
            })
            .returning(|_method, _options, _payload| {
                let payload = UPayload::try_from_protobuf(NotificationsResponse::default());
                Ok(Some(payload.unwrap()))
            });

        let client = RpcClientUSubscription::new(Arc::new(rpc_client));

        let first_attempt = client.register_for_notifications(request.clone()).await;
        assert!(first_attempt.is_err_and(|e| e.get_code() == UCode::INTERNAL));
        let second_attempt = client.register_for_notifications(request).await;
        assert!(second_attempt.is_ok());
    }

    /// Verifies that `unregister_for_notifications` reports `UCode::INTERNAL` when the RPC
    /// client fails and succeeds when the RPC client returns a response for the expected
    /// method and request.
    #[tokio::test]
    async fn test_unregister_for_notifications_invokes_rpc_client() {
        let request = NotificationsRequest {
            topic: Some(test_topic()).into(),
            ..Default::default()
        };
        let expected_request = request.clone();

        let mut seq = Sequence::new();
        let mut rpc_client = MockRpcClient::new();
        // first invocation: the RPC client fails
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                payload.is_some()
                    && method == &usubscription_uri(RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS)
            })
            .return_const(Err(internal_error()));
        // second invocation: the RPC client returns an (empty) response message
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let actual_request = payload
                    .clone()
                    .unwrap()
                    .extract_protobuf::<NotificationsRequest>()
                    .unwrap();
                method == &usubscription_uri(RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS)
                    && actual_request == expected_request
            })
            .returning(|_method, _options, _payload| {
                let payload = UPayload::try_from_protobuf(NotificationsResponse::default());
                Ok(Some(payload.unwrap()))
            });

        let client = RpcClientUSubscription::new(Arc::new(rpc_client));

        let first_attempt = client.unregister_for_notifications(request.clone()).await;
        assert!(first_attempt.is_err_and(|e| e.get_code() == UCode::INTERNAL));
        let second_attempt = client.unregister_for_notifications(request).await;
        assert!(second_attempt.is_ok());
    }

    /// Verifies that `reset` reports `UCode::INTERNAL` when the RPC client fails and
    /// succeeds when the RPC client returns a response for the expected method and request.
    #[tokio::test]
    async fn test_reset_invokes_rpc_client() {
        let request = ResetRequest::default();
        let expected_request = request.clone();

        let mut seq = Sequence::new();
        let mut rpc_client = MockRpcClient::new();
        // first invocation: the RPC client fails
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                payload.is_some() && method == &usubscription_uri(RESOURCE_ID_RESET)
            })
            .return_const(Err(internal_error()));
        // second invocation: the RPC client returns an (empty) response message
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let actual_request = payload
                    .clone()
                    .unwrap()
                    .extract_protobuf::<ResetRequest>()
                    .unwrap();
                method == &usubscription_uri(RESOURCE_ID_RESET)
                    && actual_request == expected_request
            })
            .returning(|_method, _options, _payload| {
                let payload = UPayload::try_from_protobuf(ResetResponse::default());
                Ok(Some(payload.unwrap()))
            });

        let client = RpcClientUSubscription::new(Arc::new(rpc_client));

        let first_attempt = client.reset(request.clone()).await;
        assert!(first_attempt.is_err_and(|e| e.get_code() == UCode::INTERNAL));
        let second_attempt = client.reset(request).await;
        assert!(second_attempt.is_ok());
    }
}