1use std::fmt::{Debug, Formatter};
15use std::hash::{Hash, Hasher};
16use std::num::TryFromIntError;
17use std::ops::Deref;
18use std::sync::Arc;
19
20use async_trait::async_trait;
21
22use crate::{UCode, UMessage, UStatus, UUri};
23
24pub fn verify_filter_criteria(
36 source_filter: &UUri,
37 sink_filter: Option<&UUri>,
38) -> Result<(), UStatus> {
39 UUri::check_validity(source_filter).map_err(|err| {
40 UStatus::fail_with_code(
41 UCode::INVALID_ARGUMENT,
42 format!("invalid source filter URI: {err}"),
43 )
44 })?;
45 if let Some(sink_filter_uuri) = sink_filter {
46 UUri::check_validity(sink_filter_uuri).map_err(|err| {
47 UStatus::fail_with_code(
48 UCode::INVALID_ARGUMENT,
49 format!("invalid sink filter URI: {err}"),
50 )
51 })?;
52
53 if sink_filter_uuri.is_notification_destination()
54 && source_filter.is_notification_destination()
55 {
56 return Err(UStatus::fail_with_code(
57 UCode::INVALID_ARGUMENT,
58 "source and sink filters must not both have resource ID 0",
59 ));
60 }
61 if sink_filter_uuri.is_rpc_method()
62 && !source_filter.has_wildcard_resource_id()
63 && !source_filter.is_notification_destination()
64 {
65 return Err(UStatus::fail_with_code(
66 UCode::INVALID_ARGUMENT,
67 "source filter must either have the wildcard resource ID or resource ID 0, if sink filter matches RPC method resource ID"));
68 }
69 } else if !source_filter.has_wildcard_resource_id() && !source_filter.is_event() {
70 return Err(UStatus::fail_with_code(
71 UCode::INVALID_ARGUMENT,
72 "source filter must either have the wildcard resource ID or a resource ID from topic range, if sink filter is empty"));
73 }
74 Ok(())
76}
77
/// A source of the URIs that describe the local entity.
///
/// Implementations must be `Send + Sync`. When compiled for tests or with the `test-util`
/// feature enabled, `mockall::automock` additionally generates a mock implementation of
/// this trait.
#[cfg_attr(any(test, feature = "test-util"), mockall::automock)]
pub trait LocalUriProvider: Send + Sync {
    /// Returns the authority as an owned string.
    fn get_authority(&self) -> String;
    /// Returns a URI for the resource with the given ID.
    ///
    /// NOTE(review): presumably the returned URI carries the local entity's authority,
    /// entity ID and major version — confirm against each implementation.
    fn get_resource_uri(&self, resource_id: u16) -> UUri;
    /// Returns the URI to use as the source address of the local entity.
    ///
    /// NOTE(review): the exact resource ID of the returned URI is implementation
    /// defined — confirm against each implementation.
    fn get_source_uri(&self) -> UUri;
}
93
/// A URI provider backed by a single URI that is fixed at construction time.
pub struct StaticUriProvider {
    // The base URI from which all URIs handed out by this provider are derived.
    local_uri: UUri,
}
98
99impl StaticUriProvider {
100 pub fn new(authority: impl Into<String>, entity_id: u32, major_version: u8) -> Self {
117 let local_uri = UUri {
118 authority_name: authority.into(),
119 ue_id: entity_id,
120 ue_version_major: major_version as u32,
121 resource_id: 0x0000,
122 ..Default::default()
123 };
124 StaticUriProvider { local_uri }
125 }
126}
127
128impl LocalUriProvider for StaticUriProvider {
129 fn get_authority(&self) -> String {
130 self.local_uri.authority_name.clone()
131 }
132
133 fn get_resource_uri(&self, resource_id: u16) -> UUri {
134 let mut uri = self.local_uri.clone();
135 uri.resource_id = resource_id as u32;
136 uri
137 }
138
139 fn get_source_uri(&self) -> UUri {
140 self.local_uri.clone()
141 }
142}
143
144impl TryFrom<UUri> for StaticUriProvider {
145 type Error = TryFromIntError;
146 fn try_from(value: UUri) -> Result<Self, Self::Error> {
147 Self::try_from(&value)
148 }
149}
150
151impl TryFrom<&UUri> for StaticUriProvider {
152 type Error = TryFromIntError;
153 fn try_from(source_uri: &UUri) -> Result<Self, Self::Error> {
188 let major_version = u8::try_from(source_uri.ue_version_major)?;
189 Ok(StaticUriProvider::new(
190 &source_uri.authority_name,
191 source_uri.ue_id,
192 major_version,
193 ))
194 }
195}
196
/// A handler for received messages.
///
/// Implementations must be `Send + Sync`. When compiled for tests or with the `test-util`
/// feature enabled, `mockall::automock` additionally generates `MockUListener`.
#[cfg_attr(any(test, feature = "test-util"), mockall::automock)]
#[async_trait]
pub trait UListener: Send + Sync {
    /// Handles a single message; ownership of `msg` passes to the listener.
    ///
    /// NOTE(review): presumably invoked by a transport for every message matching the
    /// filters the listener was registered with — confirm against the transport in use.
    async fn on_receive(&self, msg: UMessage);
}
220
221#[async_trait]
231pub trait UTransport: Send + Sync {
232 async fn send(&self, message: UMessage) -> Result<(), UStatus>;
244
245 async fn receive(
259 &self,
260 _source_filter: &UUri,
261 _sink_filter: Option<&UUri>,
262 ) -> Result<UMessage, UStatus> {
263 Err(UStatus::fail_with_code(
264 UCode::UNIMPLEMENTED,
265 "not implemented",
266 ))
267 }
268
269 async fn register_listener(
288 &self,
289 _source_filter: &UUri,
290 _sink_filter: Option<&UUri>,
291 _listener: Arc<dyn UListener>,
292 ) -> Result<(), UStatus> {
293 Err(UStatus::fail_with_code(
294 UCode::UNIMPLEMENTED,
295 "not implemented",
296 ))
297 }
298
299 async fn unregister_listener(
316 &self,
317 _source_filter: &UUri,
318 _sink_filter: Option<&UUri>,
319 _listener: Arc<dyn UListener>,
320 ) -> Result<(), UStatus> {
321 Err(UStatus::fail_with_code(
322 UCode::UNIMPLEMENTED,
323 "not implemented",
324 ))
325 }
326}
327
// Hand-written mockall definition that generates the `MockTransport` type.
// It is excluded from tarpaulin coverage and only compiled for tests or when the
// `test-util` feature is enabled.
#[cfg(not(tarpaulin_include))]
#[cfg(any(test, feature = "test-util"))]
mockall::mock! {
    pub Transport {
        // Expectation targets for the `UTransport` implementation of `MockTransport`,
        // which delegates `send`, `register_listener` and `unregister_listener` to the
        // correspondingly named `do_*` methods.
        pub async fn do_send(&self, message: UMessage) -> Result<(), UStatus>;
        pub async fn do_register_listener<'a>(&'a self, source_filter: &'a UUri, sink_filter: Option<&'a UUri>, listener: Arc<dyn UListener>) -> Result<(), UStatus>;
        pub async fn do_unregister_listener<'a>(&'a self, source_filter: &'a UUri, sink_filter: Option<&'a UUri>, listener: Arc<dyn UListener>) -> Result<(), UStatus>;
    }
}
339
// Bridges the `UTransport` trait to the mock's `do_*` expectation methods.
// `receive` is not overridden here and therefore keeps the trait's default implementation.
#[cfg(not(tarpaulin_include))]
#[cfg(any(test, feature = "test-util"))]
#[async_trait]
impl UTransport for MockTransport {
    /// Forwards to [`MockTransport::do_send`].
    async fn send(&self, message: UMessage) -> Result<(), UStatus> {
        MockTransport::do_send(self, message).await
    }

    /// Forwards to [`MockTransport::do_register_listener`].
    async fn register_listener(
        &self,
        source_filter: &UUri,
        sink_filter: Option<&UUri>,
        listener: Arc<dyn UListener>,
    ) -> Result<(), UStatus> {
        MockTransport::do_register_listener(self, source_filter, sink_filter, listener).await
    }

    /// Forwards to [`MockTransport::do_unregister_listener`].
    async fn unregister_listener(
        &self,
        source_filter: &UUri,
        sink_filter: Option<&UUri>,
        listener: Arc<dyn UListener>,
    ) -> Result<(), UStatus> {
        MockTransport::do_unregister_listener(self, source_filter, sink_filter, listener).await
    }
}
368
/// A wrapper around a shared [`UListener`] that can be compared, hashed and debug-printed.
///
/// Cloning the wrapper clones the contained `Arc`, so clones refer to the same listener.
#[derive(Clone)]
pub struct ComparableListener {
    // The wrapped listener.
    listener: Arc<dyn UListener>,
}
387
388impl ComparableListener {
389 pub fn new(listener: Arc<dyn UListener>) -> Self {
390 Self { listener }
391 }
392 pub fn into_inner(&self) -> Arc<dyn UListener> {
394 self.listener.clone()
395 }
396
397 fn pointer_address(&self) -> usize {
399 let ptr = Arc::as_ptr(&self.listener);
401 let thin_ptr = ptr as *const ();
403 thin_ptr as usize
405 }
406}
407
408impl Deref for ComparableListener {
409 type Target = dyn UListener;
410
411 fn deref(&self) -> &Self::Target {
412 &*self.listener
413 }
414}
415
416impl Hash for ComparableListener {
417 fn hash<H: Hasher>(&self, state: &mut H) {
421 Arc::as_ptr(&self.listener).hash(state);
422 }
423}
424
425impl PartialEq for ComparableListener {
426 fn eq(&self, other: &Self) -> bool {
433 Arc::ptr_eq(&self.listener, &other.listener)
434 }
435}
436
// Marker impl: equality is based on pointer identity (see the `PartialEq` impl), which
// is reflexive, symmetric and transitive, so the wrapper can serve as a hash map/set key.
impl Eq for ComparableListener {}
438
439impl Debug for ComparableListener {
440 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
441 write!(f, "ComparableListener: {}", self.pointer_address())
442 }
443}
444
445#[cfg(test)]
446mod tests {
447 use crate::{ComparableListener, UListener, UMessage};
448 use std::{
449 hash::{DefaultHasher, Hash, Hasher},
450 ops::Deref,
451 str::FromStr,
452 sync::Arc,
453 };
454
455 use super::*;
456
457 #[test]
458 fn test_static_uri_provider_get_source() {
459 let provider = StaticUriProvider::new("my-vehicle", 0x4210, 0x05);
460 let source_uri = provider.get_source_uri();
461 assert_eq!(source_uri.authority_name, "my-vehicle");
462 assert_eq!(source_uri.ue_id, 0x4210);
463 assert_eq!(source_uri.ue_version_major, 0x05);
464 assert_eq!(source_uri.resource_id, 0x0000);
465 }
466
467 #[test]
468 fn test_static_uri_provider_get_resource() {
469 let provider = StaticUriProvider::new("my-vehicle", 0x4210, 0x05);
470 let resource_uri = provider.get_resource_uri(0x1234);
471 assert_eq!(resource_uri.authority_name, "my-vehicle");
472 assert_eq!(resource_uri.ue_id, 0x4210);
473 assert_eq!(resource_uri.ue_version_major, 0x05);
474 assert_eq!(resource_uri.resource_id, 0x1234);
475 }
476
477 #[tokio::test]
478 async fn test_deref_returns_wrapped_listener() {
479 let mut mock_listener = MockUListener::new();
480 mock_listener.expect_on_receive().once().return_const(());
481 let listener_one = Arc::new(mock_listener);
482 let comparable_listener_one = ComparableListener::new(listener_one);
483 comparable_listener_one
484 .deref()
485 .on_receive(UMessage::default())
486 .await;
487 }
488
489 #[tokio::test]
490 async fn test_to_inner_returns_reference_to_wrapped_listener() {
491 let mut mock_listener = MockUListener::new();
492 mock_listener.expect_on_receive().once().return_const(());
493 let listener_one = Arc::new(mock_listener);
494 let comparable_listener_one = ComparableListener::new(listener_one);
495 comparable_listener_one
496 .into_inner()
497 .on_receive(UMessage::default())
498 .await;
499 }
500
501 #[tokio::test]
502 async fn test_eq_and_hash_are_consistent_for_comparable_listeners_wrapping_same_listener() {
503 let mut mock_listener = MockUListener::new();
504 mock_listener.expect_on_receive().times(2).return_const(());
505 let listener_one = Arc::new(mock_listener);
506 let listener_two = listener_one.clone();
507 listener_one.on_receive(UMessage::default()).await;
508 listener_two.on_receive(UMessage::default()).await;
509 let comparable_listener_one = ComparableListener::new(listener_one);
510 let comparable_listener_two = ComparableListener::new(listener_two);
511 assert!(&comparable_listener_one.eq(&comparable_listener_two));
512
513 let mut hasher = DefaultHasher::new();
514 comparable_listener_one.hash(&mut hasher);
515 let hash_one = hasher.finish();
516 let mut hasher = DefaultHasher::new();
517 comparable_listener_two.hash(&mut hasher);
518 let hash_two = hasher.finish();
519 assert_eq!(hash_one, hash_two);
520 }
521
522 #[tokio::test]
523 async fn test_eq_and_hash_are_consistent_for_comparable_listeners_wrapping_different_listeners()
524 {
525 let mut mock_listener_one = MockUListener::new();
526 mock_listener_one
527 .expect_on_receive()
528 .once()
529 .return_const(());
530 let listener_one = Arc::new(mock_listener_one);
531 let mut mock_listener_two = MockUListener::new();
532 mock_listener_two
533 .expect_on_receive()
534 .once()
535 .return_const(());
536 let listener_two = Arc::new(mock_listener_two);
537 listener_one.on_receive(UMessage::default()).await;
538 listener_two.on_receive(UMessage::default()).await;
539 let comparable_listener_one = ComparableListener::new(listener_one);
540 let comparable_listener_two = ComparableListener::new(listener_two);
541 assert!(!&comparable_listener_one.eq(&comparable_listener_two));
542
543 let mut hasher = DefaultHasher::new();
544 comparable_listener_one.hash(&mut hasher);
545 let hash_one = hasher.finish();
546 let mut hasher = DefaultHasher::new();
547 comparable_listener_two.hash(&mut hasher);
548 let hash_two = hasher.finish();
549 assert_ne!(hash_one, hash_two);
550 }
551
552 #[tokio::test]
553 async fn test_utransport_default_implementations() {
554 struct EmptyTransport {}
555 #[async_trait::async_trait]
556 impl UTransport for EmptyTransport {
557 async fn send(&self, _message: UMessage) -> Result<(), UStatus> {
558 todo!()
559 }
560 }
561
562 let transport = EmptyTransport {};
563 let listener = Arc::new(MockUListener::new());
564
565 assert!(transport
566 .receive(&UUri::any(), None)
567 .await
568 .is_err_and(|e| e.get_code() == UCode::UNIMPLEMENTED));
569 assert!(transport
570 .register_listener(&UUri::any(), None, listener.clone())
571 .await
572 .is_err_and(|e| e.get_code() == UCode::UNIMPLEMENTED));
573 assert!(transport
574 .unregister_listener(&UUri::any(), None, listener)
575 .await
576 .is_err_and(|e| e.get_code() == UCode::UNIMPLEMENTED));
577 }
578
579 #[test]
580 fn test_comparable_listener_pointer_address() {
581 let bar = Arc::new(MockUListener::new());
582 let comp_listener = ComparableListener::new(bar);
583
584 let comp_listener_thread = comp_listener.clone();
585 let handle = std::thread::spawn(move || comp_listener_thread.pointer_address());
586
587 let comp_listener_address_other_thread = handle.join().unwrap();
588 let comp_listener_address_this_thread = comp_listener.pointer_address();
589
590 assert_eq!(
591 comp_listener_address_this_thread,
592 comp_listener_address_other_thread
593 );
594 }
595
596 #[test]
597 fn test_comparable_listener_debug_outputs() {
598 let bar = Arc::new(MockUListener::new());
599 let comp_listener = ComparableListener::new(bar);
600 let debug_output = format!("{comp_listener:?}");
601 assert!(!debug_output.is_empty());
602 }
603
604 #[test_case::test_case(
605 "//vehicle1/AA/1/FFFF",
606 Some("//vehicle2/BB/1/FFFF");
607 "source and sink both having wildcard resource ID")]
608 #[test_case::test_case(
609 "//vehicle1/AA/1/9000",
610 Some("//vehicle2/BB/1/0");
611 "sending notification")]
612 #[test_case::test_case(
613 "//vehicle1/AA/1/0",
614 Some("//vehicle2/BB/1/1");
615 "RPC method invocation")]
616 #[test_case::test_case(
617 "//vehicle1/AA/1/FFFF",
618 Some("//vehicle2/BB/1/1");
619 "receiving RPC requests using wildcard resource ID")]
620 #[test_case::test_case(
621 "//vehicle1/AA/1/0",
622 Some("//vehicle2/BB/1/1");
623 "receiving RPC requests using default resource ID")]
624 #[test_case::test_case(
625 "//vehicle1/AA/1/9000",
626 None;
627 "receiving events published to specific topic")]
628 #[test_case::test_case(
629 "//vehicle1/AA/1/FFFF",
630 None;
631 "receiving events published to any topic")]
632 fn test_verify_filter_criteria_succeeds_for(source: &str, sink: Option<&str>) {
633 let source_filter = UUri::from_str(source).expect("invalid source URI");
634 let sink_filter = sink.map(|s| UUri::from_str(s).expect("invalid sink URI"));
635 assert!(verify_filter_criteria(&source_filter, sink_filter.as_ref()).is_ok());
636 }
637
638 #[test_case::test_case(
639 UUri::from_str("//vehicle1/AA/1/0").unwrap(),
640 Some(UUri::from_str("//vehicle2/BB/1/0").unwrap());
641 "source and sink both having resource ID 0")]
642 #[test_case::test_case(
643 UUri::from_str("//vehicle1/AA/1/CC").unwrap(),
644 Some(UUri::from_str("//vehicle2/BB/1/1A").unwrap());
645 "sink is RPC but source has invalid resource ID")]
646 #[test_case::test_case(
647 UUri::from_str("//vehicle1/AA/1/CC").unwrap(),
648 None;
649 "sink is empty but source has non-topic resource ID")]
650 #[test_case::test_case(
651 UUri {
652 authority_name: "VEHICLE1".to_string(),
653 ue_id: 0x00AA,
654 ue_version_major: 0x01,
655 resource_id: 0x9000,
656 ..Default::default()
657 },
658 None;
659 "source has upper-case authority")]
660 #[test_case::test_case(
661 UUri::from_str("//vehicle1/AA/1/9000").unwrap(),
662 Some(UUri {
663 authority_name: "VEHICLE2".to_string(),
664 ue_id: 0x00BB,
665 ue_version_major: 0x01,
666 resource_id: 0x0000,
667 ..Default::default()
668 });
669 "sink has upper-case authority")]
670 fn test_verify_filter_criteria_fails_for(source_filter: UUri, sink_filter: Option<UUri>) {
671 assert!(verify_filter_criteria(&source_filter, sink_filter.as_ref())
672 .is_err_and(|err| matches!(err.get_code(), UCode::INVALID_ARGUMENT)));
673 }
674}