use std::{
collections::{hash_map::Entry, HashMap},
ops::Deref,
sync::{Arc, RwLock},
};
use async_trait::async_trait;
use tracing::{debug, info};
use crate::{
core::usubscription::{
self, State, SubscriberInfo, SubscriptionRequest, USubscription, UnsubscribeRequest, Update,
},
LocalUriProvider, UListener, UMessage, UMessageBuilder, UStatus, UTransport, UUri,
};
use super::{
apply_common_options, build_message, pubsub::SubscriptionChangeHandler, CallOptions,
InMemoryRpcClient, Notifier, PubSubError, Publisher, RegistrationError, RpcClientUSubscription,
SimpleNotifier, Subscriber, UPayload,
};
#[derive(Clone)]
struct ComparableSubscriptionChangeHandler {
inner: Arc<dyn SubscriptionChangeHandler>,
}
impl ComparableSubscriptionChangeHandler {
fn new(handler: Arc<dyn SubscriptionChangeHandler>) -> Self {
ComparableSubscriptionChangeHandler {
inner: handler.clone(),
}
}
}
impl Deref for ComparableSubscriptionChangeHandler {
type Target = dyn SubscriptionChangeHandler;
fn deref(&self) -> &Self::Target {
&*self.inner
}
}
impl PartialEq for ComparableSubscriptionChangeHandler {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.inner, &other.inner)
}
}
impl Eq for ComparableSubscriptionChangeHandler {}
#[derive(Default)]
struct SubscriptionChangeListener {
subscription_change_handlers: RwLock<HashMap<UUri, ComparableSubscriptionChangeHandler>>,
}
impl SubscriptionChangeListener {
fn add_handler(
&self,
topic: UUri,
subscription_change_handler: Arc<dyn SubscriptionChangeHandler>,
) -> Result<(), RegistrationError> {
let Ok(mut handlers) = self.subscription_change_handlers.write() else {
return Err(RegistrationError::Unknown(UStatus::fail_with_code(
crate::UCode::INTERNAL,
"failed to acquire write lock for handler map",
)));
};
let handler_to_add = ComparableSubscriptionChangeHandler::new(subscription_change_handler);
match handlers.entry(topic) {
Entry::Vacant(entry) => {
entry.insert(handler_to_add);
Ok(())
}
Entry::Occupied(entry) => {
if entry.get() == &handler_to_add {
Ok(())
} else {
Err(RegistrationError::AlreadyExists)
}
}
}
}
fn remove_handler(&self, topic: &UUri) -> Result<(), RegistrationError> {
self.subscription_change_handlers
.write()
.map_err(|_e| {
RegistrationError::Unknown(UStatus::fail_with_code(
crate::UCode::INTERNAL,
"failed to acquire write lock for handler map",
))
})
.map(|mut handlers| {
handlers.remove(topic);
})
}
fn clear(&self) -> Result<(), RegistrationError> {
self.subscription_change_handlers
.write()
.map_err(|_e| {
RegistrationError::Unknown(UStatus::fail_with_code(
crate::UCode::INTERNAL,
"failed to acquire write lock for handler map",
))
})
.map(|mut handlers| {
handlers.clear();
})
}
#[cfg(test)]
fn has_handler(&self, topic: &UUri) -> bool {
self.subscription_change_handlers
.read()
.map_or(false, |handlers| handlers.contains_key(topic))
}
}
#[async_trait]
impl UListener for SubscriptionChangeListener {
async fn on_receive(&self, msg: UMessage) {
if !msg.is_notification() {
return;
}
let Ok(subscription_update) = msg.extract_protobuf::<Update>() else {
debug!("ignoring notification that does not contain subscription update");
return;
};
let Some(topic) = subscription_update.topic.as_ref() else {
return;
};
let Some(status) = subscription_update.status.as_ref() else {
return;
};
let Ok(handlers) = self.subscription_change_handlers.read() else {
return;
};
if let Some(handler) = handlers.get(topic) {
handler.on_subscription_change(topic.to_owned(), status.to_owned());
}
}
}
pub struct SimplePublisher {
transport: Arc<dyn UTransport>,
uri_provider: Arc<dyn LocalUriProvider>,
}
impl SimplePublisher {
pub fn new(transport: Arc<dyn UTransport>, uri_provider: Arc<dyn LocalUriProvider>) -> Self {
SimplePublisher {
transport,
uri_provider,
}
}
}
#[async_trait]
impl Publisher for SimplePublisher {
async fn publish(
&self,
resource_id: u16,
call_options: CallOptions,
payload: Option<UPayload>,
) -> Result<(), PubSubError> {
let mut builder = UMessageBuilder::publish(self.uri_provider.get_resource_uri(resource_id));
apply_common_options(call_options, &mut builder);
match build_message(&mut builder, payload) {
Ok(publish_message) => self
.transport
.send(publish_message)
.await
.map_err(PubSubError::PublishError),
Err(e) => Err(PubSubError::InvalidArgument(format!(
"failed to create Publish message from parameters: {}",
e
))),
}
}
}
pub struct InMemorySubscriber {
transport: Arc<dyn UTransport>,
uri_provider: Arc<dyn LocalUriProvider>,
usubscription: Arc<dyn USubscription>,
notifier: Arc<dyn Notifier>,
subscription_change_listener: Arc<SubscriptionChangeListener>,
}
impl InMemorySubscriber {
pub async fn new(
transport: Arc<dyn UTransport>,
uri_provider: Arc<dyn LocalUriProvider>,
) -> Result<Self, RegistrationError> {
let rpc_client = InMemoryRpcClient::new(transport.clone(), uri_provider.clone())
.await
.map(Arc::new)?;
let usubscription_client = Arc::new(RpcClientUSubscription::new(rpc_client));
let notifier = Arc::new(SimpleNotifier::new(transport.clone(), uri_provider.clone()));
Self::for_clients(transport, uri_provider, usubscription_client, notifier).await
}
pub async fn for_clients(
transport: Arc<dyn UTransport>,
uri_provider: Arc<dyn LocalUriProvider>,
usubscription: Arc<dyn USubscription>,
notifier: Arc<dyn Notifier>,
) -> Result<Self, RegistrationError> {
let subscription_change_listener = Arc::new(SubscriptionChangeListener {
subscription_change_handlers: RwLock::new(HashMap::new()),
});
notifier
.start_listening(
&usubscription::usubscription_uri(usubscription::RESOURCE_ID_SUBSCRIPTION_CHANGE),
subscription_change_listener.clone(),
)
.await?;
Ok(InMemorySubscriber {
transport,
uri_provider,
usubscription,
notifier,
subscription_change_listener,
})
}
pub async fn stop(&self) -> Result<(), RegistrationError> {
self.notifier
.stop_listening(
&usubscription::usubscription_uri(usubscription::RESOURCE_ID_SUBSCRIPTION_CHANGE),
self.subscription_change_listener.clone(),
)
.await
.and_then(|_ok| self.subscription_change_listener.clear())
}
fn subscriber_info(&self) -> SubscriberInfo {
SubscriberInfo {
uri: Some(self.uri_provider.get_source_uri()).into(),
..Default::default()
}
}
async fn invoke_subscribe(
&self,
topic: &UUri,
subscription_change_handler: Option<Arc<dyn SubscriptionChangeHandler>>,
) -> Result<State, RegistrationError> {
let subscription_request = SubscriptionRequest {
subscriber: Some(self.subscriber_info()).into(),
topic: Some(topic.to_owned()).into(),
..Default::default()
};
match self.usubscription.subscribe(subscription_request).await {
Ok(response) => match response.status.state.enum_value() {
Ok(state) if state == State::SUBSCRIBED || state == State::SUBSCRIBE_PENDING => {
if let Some(handler) = subscription_change_handler.clone() {
self.subscription_change_listener
.add_handler(topic.to_owned(), handler)?;
}
Ok(state)
}
_ => {
debug!(topic = %topic, "failed to subscribe to topic: {}", response.status.message);
Err(RegistrationError::Unknown(UStatus::fail_with_code(
crate::UCode::FAILED_PRECONDITION,
response.status.message.to_owned(),
)))
}
},
Err(e) => {
info!(topic = %topic, "error invoking USubscription service: {}", e);
Err(RegistrationError::Unknown(UStatus::fail_with_code(
crate::UCode::INTERNAL,
"failed to invoke USubscription service",
)))
}
}
}
async fn invoke_unsubscribe(&self, topic: &UUri) -> Result<(), RegistrationError> {
let request = UnsubscribeRequest {
subscriber: Some(self.subscriber_info()).into(),
topic: Some(topic.to_owned()).into(),
..Default::default()
};
self.usubscription
.unsubscribe(request)
.await
.map(|_| {
let _ = self.subscription_change_listener.remove_handler(topic);
})
.map_err(|e| {
info!(topic = %topic, "error invoking USubscription service: {}", e);
RegistrationError::Unknown(UStatus::fail_with_code(
crate::UCode::INTERNAL,
"failed to invoke USubscription service",
))
})
}
#[cfg(test)]
fn add_subscription_change_handler(
&self,
topic: &UUri,
subscription_change_handler: Arc<dyn SubscriptionChangeHandler>,
) -> Result<(), RegistrationError> {
self.subscription_change_listener
.add_handler(topic.to_owned(), subscription_change_handler)
}
#[cfg(test)]
fn has_subscription_change_handler(&self, topic: &UUri) -> bool {
self.subscription_change_listener.has_handler(topic)
}
}
#[async_trait]
impl Subscriber for InMemorySubscriber {
async fn subscribe(
&self,
topic_filter: &UUri,
handler: Arc<dyn UListener>,
subscription_change_handler: Option<Arc<dyn SubscriptionChangeHandler>>,
) -> Result<(), RegistrationError> {
self.invoke_subscribe(topic_filter, subscription_change_handler)
.await?;
self.transport
.register_listener(topic_filter, None, handler.clone())
.await
.map_err(RegistrationError::from)
}
async fn unsubscribe(
&self,
topic: &UUri,
listener: Arc<dyn UListener>,
) -> Result<(), RegistrationError> {
self.invoke_unsubscribe(topic).await?;
self.transport
.unregister_listener(topic, None, listener)
.await
.map_err(RegistrationError::from)
}
}
#[cfg(test)]
mod test {
use super::*;
use mockall::Sequence;
use protobuf::well_known_types::wrappers::StringValue;
use protobuf::Enum;
use usubscription::{MockUSubscription, SubscriptionResponse, SubscriptionStatus};
use crate::{
communication::{notification::MockNotifier, pubsub::MockSubscriptionChangeHandler},
utransport::{MockLocalUriProvider, MockTransport, MockUListener},
UAttributes, UCode, UMessageType, UPriority, UStatus, UUri, UUID,
};
fn new_uri_provider() -> Arc<dyn LocalUriProvider> {
let mut mock_uri_locator = MockLocalUriProvider::new();
mock_uri_locator
.expect_get_resource_uri()
.returning(|resource_id| UUri {
ue_id: 0x0005,
ue_version_major: 0x02,
resource_id: resource_id as u32,
..Default::default()
});
mock_uri_locator.expect_get_source_uri().returning(|| UUri {
ue_id: 0x0005,
ue_version_major: 0x02,
resource_id: 0x0000,
..Default::default()
});
Arc::new(mock_uri_locator)
}
fn succeding_notifier() -> Arc<dyn Notifier> {
let mut notifier = MockNotifier::new();
notifier
.expect_start_listening()
.once()
.return_const(Ok(()));
Arc::new(notifier)
}
#[tokio::test]
async fn test_publish_fails_for_invalid_topic() {
let uri_provider = new_uri_provider();
let mut transport = MockTransport::new();
transport.expect_do_send().never();
let publisher = SimplePublisher::new(Arc::new(transport), uri_provider);
let options = CallOptions::for_publish(None, None, None);
let publish_result = publisher
.publish(0x1000, options, None)
.await;
assert!(publish_result.is_err_and(|e| matches!(e, PubSubError::InvalidArgument(_msg))));
}
#[tokio::test]
async fn test_publish_fails_with_transport_error() {
let message_id = UUID::build();
let uri_provider = new_uri_provider();
let mut transport = MockTransport::new();
let expected_message_id = message_id.clone();
transport
.expect_do_send()
.once()
.withf(move |msg| {
msg.attributes.get_or_default().id.get_or_default() == &expected_message_id
})
.returning(|_msg| {
Err(UStatus::fail_with_code(
UCode::UNAVAILABLE,
"transport not available",
))
});
let publisher = SimplePublisher::new(Arc::new(transport), uri_provider);
let options = CallOptions::for_publish(None, Some(message_id), None);
let publish_result = publisher.publish(0x9A00, options, None).await;
assert!(publish_result.is_err_and(|e| matches!(e, PubSubError::PublishError(_status))));
}
#[tokio::test]
async fn test_publish_succeeds() {
let uri_provider = new_uri_provider();
let mut transport = MockTransport::new();
let message_id = UUID::build();
let expected_message_id = message_id.clone();
transport
.expect_do_send()
.once()
.withf(move |message| {
let Ok(payload) = message.extract_protobuf::<StringValue>() else {
return false;
};
payload.value == *"Hello"
&& message.is_publish()
&& message.attributes.as_ref().map_or(false, |attribs| {
attribs.id.as_ref() == Some(&expected_message_id)
&& attribs.priority.value() == UPriority::UPRIORITY_CS3.value()
&& attribs.ttl == Some(5_000)
})
})
.returning(|_msg| Ok(()));
let publisher = SimplePublisher::new(Arc::new(transport), uri_provider);
let call_options = CallOptions::for_publish(
Some(5_000),
Some(message_id.clone()),
Some(crate::UPriority::UPRIORITY_CS3),
);
let payload = StringValue {
value: "Hello".to_string(),
..Default::default()
};
let publish_result = publisher
.publish(
0x9A00,
call_options,
Some(
UPayload::try_from_protobuf(payload)
.expect("should have been able to create message payload"),
),
)
.await;
assert!(publish_result.is_ok());
}
#[tokio::test]
async fn test_subscriber_creation_fails_when_notifier_fails_to_register_listener() {
let mut notifier = MockNotifier::new();
notifier
.expect_start_listening()
.once()
.return_const(Err(RegistrationError::Unknown(UStatus::fail_with_code(
UCode::UNAVAILABLE,
"not available",
))));
let creation_attempt = InMemorySubscriber::for_clients(
Arc::new(MockTransport::new()),
new_uri_provider(),
Arc::new(MockUSubscription::new()),
Arc::new(notifier),
)
.await;
assert!(creation_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
}
#[tokio::test]
async fn test_subscriber_stop_succeeds() {
let mut notifier = MockNotifier::new();
notifier.expect_stop_listening().once().return_const(Ok(()));
let subscription_change_listener = Arc::new(SubscriptionChangeListener::default());
let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
let handler = Arc::new(MockSubscriptionChangeHandler::new());
subscription_change_listener
.add_handler(topic.clone(), handler)
.expect("adding a handler should have succeeded");
let subscriber = InMemorySubscriber {
transport: Arc::new(MockTransport::new()),
uri_provider: new_uri_provider(),
usubscription: Arc::new(MockUSubscription::new()),
notifier: Arc::new(notifier),
subscription_change_listener,
};
let stop_attempt = subscriber.stop().await;
assert!(stop_attempt.is_ok_and(|_| {
!subscriber.has_subscription_change_handler(&topic)
}));
}
#[tokio::test]
async fn test_subscribe_fails_when_usubscription_invocation_fails() {
let mut seq = Sequence::new();
let mut usubscription_client = MockUSubscription::new();
usubscription_client
.expect_subscribe()
.once()
.in_sequence(&mut seq)
.return_const(Err(UStatus::fail_with_code(
UCode::UNAVAILABLE,
"not connected",
)));
usubscription_client
.expect_subscribe()
.once()
.in_sequence(&mut seq)
.return_const({
let response = SubscriptionResponse {
status: Some(SubscriptionStatus {
state: State::UNSUBSCRIBED.into(),
message: "unsupported topic".to_string(),
..Default::default()
})
.into(),
..Default::default()
};
Ok(response)
});
usubscription_client
.expect_subscribe()
.once()
.in_sequence(&mut seq)
.return_const({
let response = SubscriptionResponse {
status: Some(SubscriptionStatus {
message: "unknown state".to_string(),
..Default::default()
})
.into(),
..Default::default()
};
Ok(response)
});
let mut transport = MockTransport::new();
transport.expect_do_register_listener().never();
let subscriber = InMemorySubscriber::for_clients(
Arc::new(transport),
new_uri_provider(),
Arc::new(usubscription_client),
succeding_notifier(),
)
.await
.unwrap();
let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
let mut listener = MockUListener::new();
listener.expect_on_receive().never();
let listener_ref = Arc::new(listener);
let subscribe_attempt = subscriber
.subscribe(&topic, listener_ref.clone(), None)
.await;
assert!(subscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
let subscribe_attempt = subscriber
.subscribe(&topic, listener_ref.clone(), None)
.await;
assert!(subscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
let subscribe_attempt = subscriber
.subscribe(&topic, listener_ref.clone(), None)
.await;
assert!(subscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
}
#[tokio::test]
async fn test_repeated_subscribe_fails_for_different_subscription_change_handlers() {
let mut usubscription_client = MockUSubscription::new();
usubscription_client
.expect_subscribe()
.times(2)
.returning(|request| {
let response = SubscriptionResponse {
topic: request.topic.clone(),
status: Some(SubscriptionStatus {
state: State::SUBSCRIBED.into(),
..Default::default()
})
.into(),
..Default::default()
};
Ok(response)
});
let mut transport = MockTransport::new();
transport
.expect_do_register_listener()
.once()
.return_const(Err(UStatus::fail_with_code(
UCode::UNAVAILABLE,
"not connected",
)));
let subscriber = InMemorySubscriber::for_clients(
Arc::new(transport),
new_uri_provider(),
Arc::new(usubscription_client),
succeding_notifier(),
)
.await
.unwrap();
let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
let listener = Arc::new(MockUListener::new());
let subscribe_attempt = subscriber
.subscribe(
&topic,
listener.clone(),
Some(Arc::new(MockSubscriptionChangeHandler::new())),
)
.await;
assert!(subscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
let subscribe_attempt = subscriber
.subscribe(
&topic,
listener.clone(),
Some(Arc::new(MockSubscriptionChangeHandler::new())),
)
.await;
assert!(subscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::AlreadyExists)));
}
#[tokio::test]
async fn test_subscribe_succeeds_on_second_attempt() {
let (captured_listener_tx, captured_listener_rx) = std::sync::mpsc::channel();
let mut usubscription_client = MockUSubscription::new();
usubscription_client
.expect_subscribe()
.times(2)
.returning(|request| {
let response = SubscriptionResponse {
topic: request.topic.clone(),
status: Some(SubscriptionStatus {
state: State::SUBSCRIBED.into(),
..Default::default()
})
.into(),
..Default::default()
};
Ok(response)
});
let mut transport = MockTransport::new();
let mut seq = Sequence::new();
transport
.expect_do_register_listener()
.once()
.in_sequence(&mut seq)
.return_const(Err(UStatus::fail_with_code(
UCode::UNAVAILABLE,
"not connected",
)));
transport
.expect_do_register_listener()
.once()
.in_sequence(&mut seq)
.returning(move |_source_filter, _sink_filter, listener| {
captured_listener_tx
.send(listener)
.map_err(|_e| UStatus::fail("cannot capture listener"))
});
let subscriber = InMemorySubscriber::for_clients(
Arc::new(transport),
new_uri_provider(),
Arc::new(usubscription_client),
succeding_notifier(),
)
.await
.unwrap();
let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
let mut mock_listener = MockUListener::new();
mock_listener.expect_on_receive().once().return_const(());
let listener = Arc::new(mock_listener);
let handler = Arc::new(MockSubscriptionChangeHandler::new());
let subscribe_attempt = subscriber
.subscribe(&topic, listener.clone(), Some(handler.clone()))
.await;
assert!(subscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
let subscribe_attempt = subscriber
.subscribe(&topic, listener.clone(), Some(handler.clone()))
.await;
assert!(subscribe_attempt.is_ok());
let event = UMessageBuilder::publish(topic).build().unwrap();
let captured_listener = captured_listener_rx.recv().unwrap().to_owned();
captured_listener.on_receive(event).await;
}
#[tokio::test]
async fn test_unsubscribe_fails_for_unknown_listener() {
let mut usubscription_client = MockUSubscription::new();
usubscription_client
.expect_unsubscribe()
.once()
.return_const(Ok(()));
let mut transport = MockTransport::new();
transport
.expect_do_unregister_listener()
.once()
.return_const(Err(UStatus::fail_with_code(
UCode::NOT_FOUND,
"no such listener",
)));
let subscriber = InMemorySubscriber::for_clients(
Arc::new(transport),
new_uri_provider(),
Arc::new(usubscription_client),
succeding_notifier(),
)
.await
.unwrap();
let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
let listener = Arc::new(MockUListener::new());
let unsubscribe_attempt = subscriber.unsubscribe(&topic, listener.clone()).await;
assert!(unsubscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::NoSuchListener)));
}
#[tokio::test]
async fn test_unsubscribe_fails_if_usubscription_invocation_fails() {
let mut usubscription_client = MockUSubscription::new();
usubscription_client
.expect_unsubscribe()
.once()
.return_const(Err(UStatus::fail_with_code(UCode::UNAVAILABLE, "unknown")));
let mut transport = MockTransport::new();
transport
.expect_do_unregister_listener()
.never()
.return_const(Ok(()));
let subscriber = InMemorySubscriber::for_clients(
Arc::new(transport),
new_uri_provider(),
Arc::new(usubscription_client),
succeding_notifier(),
)
.await
.unwrap();
let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
let handler = MockSubscriptionChangeHandler::new();
subscriber
.add_subscription_change_handler(&topic, Arc::new(handler))
.expect("should be able to add handler");
assert!(subscriber.has_subscription_change_handler(&topic));
let listener = Arc::new(MockUListener::new());
let unsubscribe_attempt = subscriber.unsubscribe(&topic, listener).await;
assert!(unsubscribe_attempt.is_err_and(|e| {
matches!(e, RegistrationError::Unknown(_))
&& subscriber.has_subscription_change_handler(&topic)
}));
}
#[tokio::test]
async fn test_unsubscribe_succeeds() {
let mut usubscription_client = MockUSubscription::new();
usubscription_client
.expect_unsubscribe()
.once()
.return_const(Ok(()));
let mut transport = MockTransport::new();
transport
.expect_do_unregister_listener()
.once()
.return_const(Ok(()));
let subscriber = InMemorySubscriber::for_clients(
Arc::new(transport),
new_uri_provider(),
Arc::new(usubscription_client),
succeding_notifier(),
)
.await
.unwrap();
let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
let handler = MockSubscriptionChangeHandler::new();
subscriber
.add_subscription_change_handler(&topic, Arc::new(handler))
.expect("should be able to add handler");
assert!(subscriber.has_subscription_change_handler(&topic));
let listener = Arc::new(MockUListener::new());
let unsubscribe_attempt = subscriber.unsubscribe(&topic, listener.clone()).await;
assert!(
unsubscribe_attempt.is_ok_and(|_| !subscriber.has_subscription_change_handler(&topic))
);
}
#[tokio::test]
async fn test_unsubscribe_succeeds_on_second_attempt() {
let mut usubscription_client = MockUSubscription::new();
usubscription_client
.expect_unsubscribe()
.times(2)
.return_const(Ok(()));
let mut transport = MockTransport::new();
let mut seq = Sequence::new();
transport
.expect_do_unregister_listener()
.once()
.in_sequence(&mut seq)
.return_const(Err(UStatus::fail_with_code(
UCode::UNAVAILABLE,
"not connected",
)));
transport
.expect_do_unregister_listener()
.once()
.in_sequence(&mut seq)
.return_const(Ok(()));
let subscriber = InMemorySubscriber::for_clients(
Arc::new(transport),
new_uri_provider(),
Arc::new(usubscription_client),
succeding_notifier(),
)
.await
.unwrap();
let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
let handler = MockSubscriptionChangeHandler::new();
subscriber
.add_subscription_change_handler(&topic, Arc::new(handler))
.expect("should be able to add handler");
assert!(subscriber.has_subscription_change_handler(&topic));
let listener = Arc::new(MockUListener::new());
let unsubscribe_attempt = subscriber.unsubscribe(&topic, listener.clone()).await;
assert!(unsubscribe_attempt.is_err_and(|e| matches!(e, RegistrationError::Unknown(_))));
let unsubscribe_attempt = subscriber.unsubscribe(&topic, listener).await;
assert!(unsubscribe_attempt.is_ok_and(|_| {
!subscriber.has_subscription_change_handler(&topic)
}));
}
fn message_with_wrong_type(msg_type: UMessageType) -> UMessage {
let attributes = UAttributes {
type_: msg_type.into(),
..Default::default()
};
UMessage {
attributes: Some(attributes).into(),
..Default::default()
}
}
fn notification_with_wrong_payload() -> UMessage {
let payload = UPayload::try_from_protobuf(StringValue::new())
.expect("should have been able to create protobuf");
let attributes = UAttributes {
type_: UMessageType::UMESSAGE_TYPE_NOTIFICATION.into(),
payload_format: payload.payload_format().into(),
..Default::default()
};
UMessage {
attributes: Some(attributes).into(),
payload: Some(payload.payload()),
..Default::default()
}
}
fn status_update_without_topic() -> UMessage {
let status = SubscriptionStatus {
state: State::SUBSCRIBED.into(),
..Default::default()
};
let update = Update {
status: Some(status).into(),
..Default::default()
};
let payload =
UPayload::try_from_protobuf(update).expect("should have been able to create protobuf");
let attributes = UAttributes {
type_: UMessageType::UMESSAGE_TYPE_NOTIFICATION.into(),
payload_format: payload.payload_format().into(),
..Default::default()
};
UMessage {
attributes: Some(attributes).into(),
payload: Some(payload.payload()),
..Default::default()
}
}
fn status_update_without_status() -> UMessage {
let update = Update {
topic: Some(UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap()).into(),
..Default::default()
};
let payload =
UPayload::try_from_protobuf(update).expect("should have been able to create protobuf");
let attributes = UAttributes {
type_: UMessageType::UMESSAGE_TYPE_NOTIFICATION.into(),
payload_format: payload.payload_format().into(),
..Default::default()
};
UMessage {
attributes: Some(attributes).into(),
payload: Some(payload.payload()),
..Default::default()
}
}
#[test_case::test_case(message_with_wrong_type(UMessageType::UMESSAGE_TYPE_PUBLISH); "Publish messages")]
#[test_case::test_case(message_with_wrong_type(UMessageType::UMESSAGE_TYPE_REQUEST); "Request messages")]
#[test_case::test_case(message_with_wrong_type(UMessageType::UMESSAGE_TYPE_RESPONSE); "Response messages")]
#[test_case::test_case(notification_with_wrong_payload(); "wrong payload")]
#[test_case::test_case(status_update_without_topic(); "status without topic")]
#[test_case::test_case(status_update_without_status(); "update without status")]
#[tokio::test]
async fn test_subscription_change_listener_ignores(notification: UMessage) {
let listener = SubscriptionChangeListener::default();
let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
let mut handler = MockSubscriptionChangeHandler::new();
handler.expect_on_subscription_change().never();
listener
.add_handler(topic.clone(), Arc::new(handler))
.expect("should have been able to register listener");
listener.on_receive(notification).await;
}
#[tokio::test]
async fn test_subscription_change_listener_invokes_handler_for_subscribed_topic() {
let topic = UUri::try_from_parts("other", 0x1a9a, 0x01, 0x8100).unwrap();
let status = SubscriptionStatus {
state: State::SUBSCRIBED.into(),
..Default::default()
};
let update = Update {
topic: Some(topic.clone()).into(),
status: Some(status.clone()).into(),
..Default::default()
};
let payload =
UPayload::try_from_protobuf(update).expect("should have been able to create protobuf");
let attributes = UAttributes {
type_: UMessageType::UMESSAGE_TYPE_NOTIFICATION.into(),
payload_format: payload.payload_format().into(),
..Default::default()
};
let notification = UMessage {
attributes: Some(attributes).into(),
payload: Some(payload.payload()),
..Default::default()
};
let expected_topic = topic.clone();
let mut handler = MockSubscriptionChangeHandler::new();
handler
.expect_on_subscription_change()
.once()
.withf(move |topic, updated_status| {
topic == &expected_topic && updated_status == &status
})
.return_const(());
let listener = SubscriptionChangeListener::default();
listener
.add_handler(topic, Arc::new(handler))
.expect("should have been able to register listener");
listener.on_receive(notification).await;
}
}