use alloc::vec::Vec;
use fallible_collections::FallibleVec;
use crate::transfer::{Header, MessageHeader, ServiceHeader};
use crate::transport::Transport;
use crate::{OutOfMemoryError, ServiceId, SubjectId};
pub trait SubscriptionManager<S> {
fn subscribe_message(
&mut self,
subject: SubjectId,
subscription: S,
) -> Result<(), OutOfMemoryError>;
fn unsubscribe_message(&mut self, subject: SubjectId) -> Option<S>;
fn subscribe_request(
&mut self,
service: ServiceId,
subscription: S,
) -> Result<(), OutOfMemoryError>;
fn unsubscribe_request(&mut self, service: ServiceId) -> Option<S>;
fn subscribe_response(
&mut self,
service: ServiceId,
subscription: S,
) -> Result<(), OutOfMemoryError>;
fn unsubscribe_response(&mut self, service: ServiceId) -> Option<S>;
fn find_subscription<T: Transport>(&self, header: &Header<T>) -> Option<&S> {
match header {
Header::Message(MessageHeader { subject, .. }) => {
self.find_message_subscription(*subject)
}
Header::Request(ServiceHeader { service, .. }) => {
self.find_request_subscription(*service)
}
Header::Response(ServiceHeader { service, .. }) => {
self.find_response_subscription(*service)
}
}
}
fn find_subscription_mut<T: Transport>(&mut self, header: &Header<T>) -> Option<&mut S> {
match header {
Header::Message(MessageHeader { subject, .. }) => {
self.find_message_subscription_mut(*subject)
}
Header::Request(ServiceHeader { service, .. }) => {
self.find_request_subscription_mut(*service)
}
Header::Response(ServiceHeader { service, .. }) => {
self.find_response_subscription_mut(*service)
}
}
}
fn find_message_subscription(&self, subject: SubjectId) -> Option<&S>;
fn find_message_subscription_mut(&mut self, subject: SubjectId) -> Option<&mut S>;
fn find_request_subscription(&self, service: ServiceId) -> Option<&S>;
fn find_request_subscription_mut(&mut self, service: ServiceId) -> Option<&mut S>;
fn find_response_subscription(&self, service: ServiceId) -> Option<&S>;
fn find_response_subscription_mut(&mut self, service: ServiceId) -> Option<&mut S>;
fn for_each_message_subscription_mut<F>(&mut self, operation: F)
where
F: FnMut(&mut S);
fn for_each_request_subscription_mut<F>(&mut self, operation: F)
where
F: FnMut(&mut S);
fn for_each_response_subscription_mut<F>(&mut self, operation: F)
where
F: FnMut(&mut S);
fn subscribers(&self) -> impl Iterator<Item = SubjectId>;
fn servers(&self) -> impl Iterator<Item = ServiceId>;
}
pub struct DynamicSubscriptionManager<S> {
message_subscriptions: Vec<(SubjectId, S)>,
request_subscriptions: Vec<(ServiceId, S)>,
response_subscriptions: Vec<(ServiceId, S)>,
}
impl<S> SubscriptionManager<S> for DynamicSubscriptionManager<S> {
fn subscribe_message(
&mut self,
subject: SubjectId,
subscription: S,
) -> Result<(), OutOfMemoryError> {
self.unsubscribe_message(subject);
FallibleVec::try_push(&mut self.message_subscriptions, (subject, subscription))?;
Ok(())
}
fn unsubscribe_message(&mut self, subject: SubjectId) -> Option<S> {
if let Some(index) = self
.message_subscriptions
.iter()
.position(|(stored_subject, _)| *stored_subject == subject)
{
let (_, subscription) = self.message_subscriptions.swap_remove(index);
Some(subscription)
} else {
None
}
}
fn subscribe_request(
&mut self,
service: ServiceId,
subscription: S,
) -> Result<(), OutOfMemoryError> {
self.unsubscribe_request(service);
FallibleVec::try_push(&mut self.request_subscriptions, (service, subscription))?;
Ok(())
}
fn unsubscribe_request(&mut self, service: ServiceId) -> Option<S> {
if let Some(index) = self
.request_subscriptions
.iter()
.position(|(stored_service, _)| *stored_service == service)
{
let (_, subscription) = self.request_subscriptions.swap_remove(index);
Some(subscription)
} else {
None
}
}
fn subscribe_response(
&mut self,
service: ServiceId,
subscription: S,
) -> Result<(), OutOfMemoryError> {
self.unsubscribe_response(service);
FallibleVec::try_push(&mut self.response_subscriptions, (service, subscription))?;
Ok(())
}
fn unsubscribe_response(&mut self, service: ServiceId) -> Option<S> {
if let Some(index) = self
.response_subscriptions
.iter()
.position(|(stored_service, _)| *stored_service == service)
{
let (_, subscription) = self.response_subscriptions.swap_remove(index);
Some(subscription)
} else {
None
}
}
fn find_message_subscription(&self, subject: SubjectId) -> Option<&S> {
self.message_subscriptions
.iter()
.find(|(sub_subject, _)| *sub_subject == subject)
.map(|(_, sub)| sub)
}
fn find_message_subscription_mut(&mut self, subject: SubjectId) -> Option<&mut S> {
self.message_subscriptions
.iter_mut()
.find(|(sub_subject, _)| *sub_subject == subject)
.map(|(_, sub)| sub)
}
fn find_request_subscription(&self, service: ServiceId) -> Option<&S> {
self.request_subscriptions
.iter()
.find(|(sub_service, _)| *sub_service == service)
.map(|(_, sub)| sub)
}
fn find_request_subscription_mut(&mut self, service: ServiceId) -> Option<&mut S> {
self.request_subscriptions
.iter_mut()
.find(|(sub_service, _)| *sub_service == service)
.map(|(_, sub)| sub)
}
fn find_response_subscription(&self, service: ServiceId) -> Option<&S> {
self.response_subscriptions
.iter()
.find(|(sub_service, _)| *sub_service == service)
.map(|(_, sub)| sub)
}
fn find_response_subscription_mut(&mut self, service: ServiceId) -> Option<&mut S> {
self.response_subscriptions
.iter_mut()
.find(|(sub_service, _)| *sub_service == service)
.map(|(_, sub)| sub)
}
fn for_each_message_subscription_mut<F>(&mut self, mut operation: F)
where
F: FnMut(&mut S),
{
for (_, subscription) in &mut self.message_subscriptions {
operation(subscription);
}
}
fn for_each_request_subscription_mut<F>(&mut self, mut operation: F)
where
F: FnMut(&mut S),
{
for (_, subscription) in &mut self.request_subscriptions {
operation(subscription);
}
}
fn for_each_response_subscription_mut<F>(&mut self, mut operation: F)
where
F: FnMut(&mut S),
{
for (_, subscription) in &mut self.response_subscriptions {
operation(subscription);
}
}
fn subscribers(&self) -> impl Iterator<Item = SubjectId> {
self.message_subscriptions.iter().map(|x| x.0)
}
fn servers(&self) -> impl Iterator<Item = ServiceId> {
self.request_subscriptions.iter().map(|x| x.0)
}
}
impl<S> Default for DynamicSubscriptionManager<S> {
fn default() -> Self {
DynamicSubscriptionManager {
message_subscriptions: Default::default(),
request_subscriptions: Default::default(),
response_subscriptions: Default::default(),
}
}
}
#[derive(Debug, Clone)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum Subscription {
Message(SubjectId),
Request(ServiceId),
Response(ServiceId),
}