1use std::fmt::{Debug, Formatter};
15use std::hash::{Hash, Hasher};
16use std::num::TryFromIntError;
17use std::ops::Deref;
18use std::sync::Arc;
19
20use async_trait::async_trait;
21
22use crate::{UCode, UMessage, UStatus, UUri};
23
24pub fn verify_filter_criteria(
36 source_filter: &UUri,
37 sink_filter: Option<&UUri>,
38) -> Result<(), UStatus> {
39 if let Some(sink_filter_uuri) = sink_filter {
40 if sink_filter_uuri.is_notification_destination()
41 && source_filter.is_notification_destination()
42 {
43 return Err(UStatus::fail_with_code(
44 UCode::INVALID_ARGUMENT,
45 "source and sink filters must not both have resource ID 0",
46 ));
47 }
48 if sink_filter_uuri.is_rpc_method()
49 && !source_filter.has_wildcard_resource_id()
50 && !source_filter.is_notification_destination()
51 {
52 return Err(UStatus::fail_with_code(
53 UCode::INVALID_ARGUMENT,
54 "source filter must either have the wildcard resource ID or resource ID 0, if sink filter matches RPC method resource ID"));
55 }
56 } else if !source_filter.has_wildcard_resource_id() && !source_filter.is_event() {
57 return Err(UStatus::fail_with_code(
58 UCode::INVALID_ARGUMENT,
59 "source filter must either have the wildcard resource ID or a resource ID from topic range, if sink filter is empty"));
60 }
61 Ok(())
63}
64
65#[cfg_attr(any(test, feature = "test-util"), mockall::automock)]
71pub trait LocalUriProvider: Send + Sync {
72 fn get_authority(&self) -> String;
74 fn get_resource_uri(&self, resource_id: u16) -> UUri;
76 fn get_source_uri(&self) -> UUri;
79}
80
81pub struct StaticUriProvider {
83 local_uri: UUri,
84}
85
86impl StaticUriProvider {
87 pub fn new(authority: impl Into<String>, entity_id: u32, major_version: u8) -> Self {
104 let local_uri = UUri {
105 authority_name: authority.into(),
106 ue_id: entity_id,
107 ue_version_major: major_version as u32,
108 resource_id: 0x0000,
109 ..Default::default()
110 };
111 StaticUriProvider { local_uri }
112 }
113}
114
115impl LocalUriProvider for StaticUriProvider {
116 fn get_authority(&self) -> String {
117 self.local_uri.authority_name.clone()
118 }
119
120 fn get_resource_uri(&self, resource_id: u16) -> UUri {
121 let mut uri = self.local_uri.clone();
122 uri.resource_id = resource_id as u32;
123 uri
124 }
125
126 fn get_source_uri(&self) -> UUri {
127 self.local_uri.clone()
128 }
129}
130
131impl TryFrom<UUri> for StaticUriProvider {
132 type Error = TryFromIntError;
133 fn try_from(value: UUri) -> Result<Self, Self::Error> {
134 Self::try_from(&value)
135 }
136}
137
138impl TryFrom<&UUri> for StaticUriProvider {
139 type Error = TryFromIntError;
140 fn try_from(source_uri: &UUri) -> Result<Self, Self::Error> {
175 let major_version = u8::try_from(source_uri.ue_version_major)?;
176 Ok(StaticUriProvider::new(
177 &source_uri.authority_name,
178 source_uri.ue_id,
179 major_version,
180 ))
181 }
182}
183
184#[cfg_attr(any(test, feature = "test-util"), mockall::automock)]
192#[async_trait]
193pub trait UListener: Send + Sync {
194 async fn on_receive(&self, msg: UMessage);
206}
207
208#[async_trait]
218pub trait UTransport: Send + Sync {
219 async fn send(&self, message: UMessage) -> Result<(), UStatus>;
231
232 async fn receive(
246 &self,
247 _source_filter: &UUri,
248 _sink_filter: Option<&UUri>,
249 ) -> Result<UMessage, UStatus> {
250 Err(UStatus::fail_with_code(
251 UCode::UNIMPLEMENTED,
252 "not implemented",
253 ))
254 }
255
256 async fn register_listener(
275 &self,
276 _source_filter: &UUri,
277 _sink_filter: Option<&UUri>,
278 _listener: Arc<dyn UListener>,
279 ) -> Result<(), UStatus> {
280 Err(UStatus::fail_with_code(
281 UCode::UNIMPLEMENTED,
282 "not implemented",
283 ))
284 }
285
286 async fn unregister_listener(
303 &self,
304 _source_filter: &UUri,
305 _sink_filter: Option<&UUri>,
306 _listener: Arc<dyn UListener>,
307 ) -> Result<(), UStatus> {
308 Err(UStatus::fail_with_code(
309 UCode::UNIMPLEMENTED,
310 "not implemented",
311 ))
312 }
313}
314
// Generates `MockTransport`, available in test builds and with the `test-util` feature.
//
// The mocked functions are named `do_*` and are not the `UTransport` functions themselves;
// the `UTransport` implementation for `MockTransport` forwards to them. The functions taking
// filters declare an explicit lifetime `'a` shared by `&self` and the filter references.
// NOTE(review): presumably this indirection exists to satisfy mockall's handling of
// reference parameters in async functions - confirm before simplifying.
#[cfg(not(tarpaulin_include))]
#[cfg(any(test, feature = "test-util"))]
mockall::mock! {
    pub Transport {
        pub async fn do_send(&self, message: UMessage) -> Result<(), UStatus>;

        pub async fn do_register_listener<'a>(
            &'a self,
            source_filter: &'a UUri,
            sink_filter: Option<&'a UUri>,
            listener: Arc<dyn UListener>
        ) -> Result<(), UStatus>;

        pub async fn do_unregister_listener<'a>(
            &'a self,
            source_filter: &'a UUri,
            sink_filter: Option<&'a UUri>,
            listener: Arc<dyn UListener>
        ) -> Result<(), UStatus>;
    }
}
326
/// Forwards the [`UTransport`] functions to the corresponding mocked `do_*` functions.
///
/// `receive` is not overridden, so the trait's default implementation applies.
#[cfg(not(tarpaulin_include))]
#[cfg(any(test, feature = "test-util"))]
#[async_trait]
impl UTransport for MockTransport {
    /// Forwards to `MockTransport::do_send`.
    async fn send(&self, message: UMessage) -> Result<(), UStatus> {
        MockTransport::do_send(self, message).await
    }

    /// Forwards to `MockTransport::do_register_listener`.
    async fn register_listener(
        &self,
        source_filter: &UUri,
        sink_filter: Option<&UUri>,
        listener: Arc<dyn UListener>,
    ) -> Result<(), UStatus> {
        MockTransport::do_register_listener(self, source_filter, sink_filter, listener).await
    }

    /// Forwards to `MockTransport::do_unregister_listener`.
    async fn unregister_listener(
        &self,
        source_filter: &UUri,
        sink_filter: Option<&UUri>,
        listener: Arc<dyn UListener>,
    ) -> Result<(), UStatus> {
        MockTransport::do_unregister_listener(self, source_filter, sink_filter, listener).await
    }
}
355
356#[derive(Clone)]
371pub struct ComparableListener {
372 listener: Arc<dyn UListener>,
373}
374
375impl ComparableListener {
376 pub fn new(listener: Arc<dyn UListener>) -> Self {
377 Self { listener }
378 }
379 pub fn into_inner(&self) -> Arc<dyn UListener> {
381 self.listener.clone()
382 }
383
384 fn pointer_address(&self) -> usize {
386 let ptr = Arc::as_ptr(&self.listener);
388 let thin_ptr = ptr as *const ();
390 thin_ptr as usize
392 }
393}
394
395impl Deref for ComparableListener {
396 type Target = dyn UListener;
397
398 fn deref(&self) -> &Self::Target {
399 &*self.listener
400 }
401}
402
403impl Hash for ComparableListener {
404 fn hash<H: Hasher>(&self, state: &mut H) {
408 Arc::as_ptr(&self.listener).hash(state);
409 }
410}
411
412impl PartialEq for ComparableListener {
413 fn eq(&self, other: &Self) -> bool {
420 Arc::ptr_eq(&self.listener, &other.listener)
421 }
422}
423
424impl Eq for ComparableListener {}
425
426impl Debug for ComparableListener {
427 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
428 write!(f, "ComparableListener: {}", self.pointer_address())
429 }
430}
431
#[cfg(test)]
mod tests {
    use crate::{ComparableListener, UListener, UMessage};
    use std::{
        hash::{DefaultHasher, Hash, Hasher},
        ops::Deref,
        str::FromStr,
        sync::Arc,
    };

    use super::*;

    // Computes the hash of a listener using a fresh default hasher.
    fn hash_of(listener: &ComparableListener) -> u64 {
        let mut hasher = DefaultHasher::new();
        listener.hash(&mut hasher);
        hasher.finish()
    }

    // Checks that a status carries the UNIMPLEMENTED code.
    fn is_unimplemented(status: UStatus) -> bool {
        status.get_code() == UCode::UNIMPLEMENTED
    }

    #[test]
    fn test_static_uri_provider_get_source() {
        let uri = StaticUriProvider::new("my-vehicle", 0x4210, 0x05).get_source_uri();
        assert_eq!(uri.authority_name, "my-vehicle");
        assert_eq!(uri.ue_id, 0x4210);
        assert_eq!(uri.ue_version_major, 0x05);
        assert_eq!(uri.resource_id, 0x0000);
    }

    #[test]
    fn test_static_uri_provider_get_resource() {
        let uri = StaticUriProvider::new("my-vehicle", 0x4210, 0x05).get_resource_uri(0x1234);
        assert_eq!(uri.authority_name, "my-vehicle");
        assert_eq!(uri.ue_id, 0x4210);
        assert_eq!(uri.ue_version_major, 0x05);
        assert_eq!(uri.resource_id, 0x1234);
    }

    #[tokio::test]
    async fn test_deref_returns_wrapped_listener() {
        let mut mock = MockUListener::new();
        mock.expect_on_receive().once().return_const(());
        let wrapper = ComparableListener::new(Arc::new(mock));
        // the mock verifies that the call actually reaches the wrapped listener
        wrapper.deref().on_receive(UMessage::default()).await;
    }

    #[tokio::test]
    async fn test_to_inner_returns_reference_to_wrapped_listener() {
        let mut mock = MockUListener::new();
        mock.expect_on_receive().once().return_const(());
        let wrapper = ComparableListener::new(Arc::new(mock));
        let inner = wrapper.into_inner();
        inner.on_receive(UMessage::default()).await;
    }

    #[tokio::test]
    async fn test_eq_and_hash_are_consistent_for_comparable_listeners_wrapping_same_listener() {
        let mut mock = MockUListener::new();
        mock.expect_on_receive().times(2).return_const(());
        let first = Arc::new(mock);
        let second = first.clone();
        first.on_receive(UMessage::default()).await;
        second.on_receive(UMessage::default()).await;

        let lhs = ComparableListener::new(first);
        let rhs = ComparableListener::new(second);
        assert!(lhs.eq(&rhs));
        assert_eq!(hash_of(&lhs), hash_of(&rhs));
    }

    #[tokio::test]
    async fn test_eq_and_hash_are_consistent_for_comparable_listeners_wrapping_different_listeners()
    {
        let mut first_mock = MockUListener::new();
        first_mock.expect_on_receive().once().return_const(());
        let first = Arc::new(first_mock);

        let mut second_mock = MockUListener::new();
        second_mock.expect_on_receive().once().return_const(());
        let second = Arc::new(second_mock);

        first.on_receive(UMessage::default()).await;
        second.on_receive(UMessage::default()).await;

        let lhs = ComparableListener::new(first);
        let rhs = ComparableListener::new(second);
        assert!(!lhs.eq(&rhs));
        assert_ne!(hash_of(&lhs), hash_of(&rhs));
    }

    #[tokio::test]
    async fn test_utransport_default_implementations() {
        // a transport that relies on the trait's defaults for everything but `send`
        struct EmptyTransport {}
        #[async_trait::async_trait]
        impl UTransport for EmptyTransport {
            async fn send(&self, _message: UMessage) -> Result<(), UStatus> {
                todo!()
            }
        }

        let transport = EmptyTransport {};
        let listener = Arc::new(MockUListener::new());
        let any_source = UUri::any();

        let received = transport.receive(&any_source, None).await;
        assert!(received.is_err_and(is_unimplemented));

        let registered = transport
            .register_listener(&any_source, None, listener.clone())
            .await;
        assert!(registered.is_err_and(is_unimplemented));

        let unregistered = transport
            .unregister_listener(&any_source, None, listener)
            .await;
        assert!(unregistered.is_err_and(is_unimplemented));
    }

    #[test]
    fn test_comparable_listener_pointer_address() {
        let wrapper = ComparableListener::new(Arc::new(MockUListener::new()));

        // determine the address from within another thread using a clone of the wrapper
        let wrapper_for_thread = wrapper.clone();
        let address_other_thread =
            std::thread::spawn(move || wrapper_for_thread.pointer_address())
                .join()
                .unwrap();
        let address_this_thread = wrapper.pointer_address();

        assert_eq!(address_this_thread, address_other_thread);
    }

    #[test]
    fn test_comparable_listener_debug_outputs() {
        let wrapper = ComparableListener::new(Arc::new(MockUListener::new()));
        let rendered = format!("{wrapper:?}");
        assert!(!rendered.is_empty());
    }

    #[test_case::test_case(
        "//vehicle1/AA/1/0",
        Some("//vehicle2/BB/1/0");
        "source and sink both having resource ID 0")]
    #[test_case::test_case(
        "//vehicle1/AA/1/CC",
        Some("//vehicle2/BB/1/1A");
        "sink is RPC but source has invalid resource ID")]
    #[test_case::test_case(
        "//vehicle1/AA/1/CC",
        None;
        "sink is empty but source has non-topic resource ID")]
    fn test_verify_filter_criteria_fails_for(source: &str, sink: Option<&str>) {
        let source_filter = UUri::from_str(source).expect("invalid source URI");
        let sink_filter = sink.map(|s| UUri::from_str(s).expect("invalid sink URI"));
        let outcome = verify_filter_criteria(&source_filter, sink_filter.as_ref());
        assert!(outcome.is_err_and(|err| matches!(err.get_code(), UCode::INVALID_ARGUMENT)));
    }
}