#![no_std]
use {
anachro_icd::{
arbitrator::{self, Arbitrator, Control as AControl, ControlError, SubMsg},
component::{
Component, ComponentInfo, Control, ControlType, PubSub, PubSubShort, PubSubType,
},
},
core::default::Default,
heapless::{consts, Vec},
};
pub use anachro_icd::{self, Name, Path, PubSubPath, Uuid, Version};
/// Fixed-capacity storage for the broker's clients (`heapless::Vec` sized by `consts::U8`).
type ClientStore = Vec<Client, consts::U8>;
/// The message broker: keeps track of the known clients and turns incoming
/// `Request`s into outgoing `Response`s.
#[derive(Default)]
pub struct Broker {
/// Every client added through `register_client`. Order is not meaningful:
/// `remove_client` uses `swap_remove`, which moves the last entry into the gap.
clients: ClientStore,
}
/// Errors reported by the broker while managing clients or processing messages.
#[derive(Debug, PartialEq, Eq)]
pub enum ServerError {
/// `register_client` was called with an id that is already stored.
ClientAlreadyRegistered,
/// No stored client has the requested id (or, when publishing, no *connected*
/// client has it).
UnknownClient,
/// The client exists but is not in the `Connected` state required by the operation.
ClientDisconnected,
/// NOTE(review): not produced anywhere in the visible part of this file;
/// presumably reserved for transport-level failures — confirm against callers.
ConnectionError,
/// A fixed-capacity container (clients, responses, subscriptions, shortcuts)
/// was full, or a name could not be copied into owned storage.
ResourcesExhausted,
/// A short path id was used that has no registered shortcut for that client.
UnknownShortcode,
}
/// A ready-made control message whose response is
/// `Err(ControlError::ResetConnection)` with sequence number `0`.
///
/// NOTE(review): not referenced in the visible part of this file; presumably sent
/// by the surrounding transport code to ask a client to restart its session —
/// confirm against callers.
pub const RESET_MESSAGE: Arbitrator = Arbitrator::Control(AControl {
response: Err(ControlError::ResetConnection),
seq: 0,
});
impl Broker {
#[inline(always)]
pub fn new() -> Self {
Broker::default()
}
pub fn register_client(&mut self, id: &Uuid) -> Result<(), ServerError> {
if self.clients.iter().find(|c| &c.id == id).is_none() {
self.clients
.push(Client {
id: *id,
state: ClientState::SessionEstablished,
})
.map_err(|_| ServerError::ResourcesExhausted)?;
Ok(())
} else {
Err(ServerError::ClientAlreadyRegistered)
}
}
pub fn remove_client(&mut self, id: &Uuid) -> Result<(), ServerError> {
let pos = self
.clients
.iter()
.position(|c| &c.id == id)
.ok_or(ServerError::UnknownClient)?;
self.clients.swap_remove(pos);
Ok(())
}
pub fn reset_client(&mut self, id: &Uuid) -> Result<(), ServerError> {
let mut client = self.client_by_id_mut(id)?;
client.state = ClientState::SessionEstablished;
Ok(())
}
pub fn process_msg<'a, 'b: 'a>(
&'b mut self,
req: &'a Request<'a>,
) -> Result<Vec<Response<'a>, consts::U8>, ServerError> {
let mut responses = Vec::new();
match &req.msg {
Component::Control(ctrl) => {
let client = self.client_by_id_mut(&req.source)?;
if let Some(msg) = client.process_control(&ctrl)? {
responses
.push(msg)
.map_err(|_| ServerError::ResourcesExhausted)?;
}
}
Component::PubSub(PubSub { ref path, ref ty }) => match ty {
PubSubType::Pub { ref payload } => {
responses = self.process_publish(path, payload, &req.source)?;
}
PubSubType::Sub => {
let client = self.client_by_id_mut(&req.source)?;
responses
.push(client.process_subscribe(&path)?)
.map_err(|_| ServerError::ResourcesExhausted)?;
}
PubSubType::Unsub => {
let client = self.client_by_id_mut(&req.source)?;
client.process_unsub(&path)?;
todo!()
}
},
}
Ok(responses)
}
}
impl Broker {
    /// Looks up the stored client whose id equals `id`.
    ///
    /// Returns `ServerError::UnknownClient` when there is no such client.
    fn client_by_id_mut(&mut self, id: &Uuid) -> Result<&mut Client, ServerError> {
        for candidate in self.clients.iter_mut() {
            if &candidate.id == id {
                return Ok(candidate);
            }
        }
        Err(ServerError::UnknownClient)
    }

    /// Builds the messages that deliver a publication to its subscribers.
    ///
    /// `path` may be a long path or a short id; a short id is resolved through
    /// the *publisher's* shortcut table. Every other connected client with at
    /// least one matching subscription gets exactly one message. That message
    /// carries the receiver's own short id when the receiver has a shortcut
    /// whose long path equals the published path, and the long path otherwise.
    ///
    /// # Errors
    ///
    /// * `ServerError::UnknownClient` if no *connected* client has id `source`.
    /// * `ServerError::UnknownShortcode` if `path` is a short id the publisher
    ///   never registered.
    /// * `ServerError::ResourcesExhausted` if the response buffer is full.
    fn process_publish<'b: 'a, 'a>(
        &'b mut self,
        path: &'a PubSubPath,
        payload: &'a [u8],
        source: &'a Uuid,
    ) -> Result<Vec<Response<'a>, consts::U8>, ServerError> {
        // Only a connected client may publish; an unconnected one is treated
        // the same as a missing one.
        let publisher = self
            .clients
            .iter()
            .find_map(|c| match c.state.as_connected() {
                Ok(connected) if &c.id == source => Some(connected),
                _ => None,
            })
            .ok_or(ServerError::UnknownClient)?;

        // Normalise the published path to its long textual form.
        let topic = match path {
            PubSubPath::Long(long) => long.as_str(),
            PubSubPath::Short(code) => publisher
                .shortcuts
                .iter()
                .find(|sc| &sc.short == code)
                .ok_or(ServerError::UnknownShortcode)?
                .long
                .as_str(),
        };

        let mut responses = Vec::new();

        for receiver in self.clients.iter() {
            let state = match receiver.state.as_connected() {
                Ok(connected) => connected,
                Err(_) => continue,
            };

            // Never echo a publication back to its sender.
            if &receiver.id == source {
                continue;
            }

            let subscribed = state
                .subscriptions
                .iter()
                .any(|sub| anachro_icd::matches(sub.as_str(), topic));
            if !subscribed {
                continue;
            }

            // Prefer the receiver's own shortcut for this exact path, if any.
            let out_path = match state
                .shortcuts
                .iter()
                .find(|sc| topic == sc.long.as_str())
            {
                Some(sc) => PubSubPath::Short(sc.short),
                None => PubSubPath::Long(Path::borrow_from_str(topic)),
            };

            let msg = Arbitrator::PubSub(Ok(arbitrator::PubSubResponse::SubMsg(SubMsg {
                path: out_path,
                payload,
            })));

            responses
                .push(Response {
                    dest: receiver.id,
                    msg,
                })
                .map_err(|_| ServerError::ResourcesExhausted)?;
        }

        Ok(responses)
    }
}
/// One client known to the broker.
struct Client {
/// Identifier the client was registered under; used to address responses.
id: Uuid,
/// Where the client currently is in its session lifecycle.
state: ClientState,
}
impl Client {
    /// Handles a control message from this client and returns the reply to
    /// send back, if any.
    ///
    /// * `RegisterComponent` is accepted in any state. It replies with
    ///   `ComponentRegistration(self.id)` and replaces the client state with a
    ///   fresh `Connected` state, so re-registering discards all existing
    ///   subscriptions and shortcuts.
    /// * `RegisterPubSubShortId` requires a connected client. Long names
    ///   containing `#` or `+` are refused with `NoWildcardsInShorts`.
    ///   Otherwise the (long, short) pair is stored unless that exact pair is
    ///   already present, and `PubSubShortRegistration(short_id)` is returned.
    ///
    /// # Errors
    ///
    /// * `ServerError::ClientDisconnected` if a shortcut is registered before
    ///   the component itself.
    /// * `ServerError::ResourcesExhausted` if the name or path cannot be
    ///   copied into owned storage, or the shortcut table is full.
    fn process_control(&mut self, ctrl: &Control) -> Result<Option<Response>, ServerError> {
        let response;

        let next = match &ctrl.ty {
            ControlType::RegisterComponent(ComponentInfo { name, version }) => match &self.state {
                // Both states are listed explicitly so that adding a new
                // `ClientState` variant forces a decision here.
                ClientState::SessionEstablished | ClientState::Connected(_) => {
                    response = Some(Response {
                        dest: self.id,
                        msg: Arbitrator::Control(arbitrator::Control {
                            seq: ctrl.seq,
                            response: Ok(arbitrator::ControlResponse::ComponentRegistration(
                                self.id,
                            )),
                        }),
                    });

                    Some(ClientState::Connected(ConnectedState {
                        name: name
                            .try_to_owned()
                            .map_err(|_| ServerError::ResourcesExhausted)?,
                        version: *version,
                        subscriptions: Vec::new(),
                        shortcuts: Vec::new(),
                    }))
                }
            },
            ControlType::RegisterPubSubShortId(PubSubShort {
                long_name,
                short_id,
            }) => {
                let state = self.state.as_connected_mut()?;

                let outcome = if long_name.contains('#') || long_name.contains('+') {
                    Err(arbitrator::ControlError::NoWildcardsInShorts)
                } else {
                    let shortcut_exists = state
                        .shortcuts
                        .iter()
                        .any(|sc| (sc.long.as_str() == *long_name) && (sc.short == *short_id));

                    if !shortcut_exists {
                        // The long name comes straight from the client, so a
                        // name that does not fit must be reported as an error
                        // rather than panic the broker.
                        let long = Path::try_from_str(long_name)
                            .map_err(|_| ServerError::ResourcesExhausted)?;
                        state
                            .shortcuts
                            .push(Shortcut {
                                long,
                                short: *short_id,
                            })
                            .map_err(|_| ServerError::ResourcesExhausted)?;
                    }

                    Ok(arbitrator::ControlResponse::PubSubShortRegistration(
                        *short_id,
                    ))
                };

                response = Some(Response {
                    dest: self.id,
                    msg: Arbitrator::Control(arbitrator::Control {
                        seq: ctrl.seq,
                        response: outcome,
                    }),
                });

                // Registering a shortcut never changes the session state.
                None
            }
        };

        if let Some(next) = next {
            self.state = next;
        }

        Ok(response)
    }

    /// Records a subscription for this client and returns the `SubAck` to send
    /// back. The acknowledgement echoes `path` in the form the client used
    /// (long or short).
    ///
    /// A short id is resolved through this client's shortcut table.
    /// Subscribing twice to the same long path stores it only once.
    ///
    /// # Errors
    ///
    /// * `ServerError::ClientDisconnected` if the client is not connected.
    /// * `ServerError::UnknownShortcode` if `path` is an unregistered short id.
    /// * `ServerError::ResourcesExhausted` if the path cannot be copied into
    ///   owned storage or the subscription table is full.
    fn process_subscribe<'a>(&mut self, path: &'a PubSubPath) -> Result<Response<'a>, ServerError> {
        let state = self.state.as_connected_mut()?;

        let path_str = match path {
            PubSubPath::Long(lp) => lp.as_str(),
            PubSubPath::Short(sid) => state
                .shortcuts
                .iter()
                .find(|s| &s.short == sid)
                .ok_or(ServerError::UnknownShortcode)?
                .long
                .as_str(),
        };

        let already_subscribed = state.subscriptions.iter().any(|s| s.as_str() == path_str);
        if !already_subscribed {
            // Client-supplied path: report an oversized one instead of panicking.
            let owned =
                Path::try_from_str(path_str).map_err(|_| ServerError::ResourcesExhausted)?;
            state
                .subscriptions
                .push(owned)
                .map_err(|_| ServerError::ResourcesExhausted)?;
        }

        Ok(Response {
            dest: self.id,
            msg: Arbitrator::PubSub(Ok(arbitrator::PubSubResponse::SubAck {
                path: path.clone(),
            })),
        })
    }

    /// Removes this client's subscription to `path`, if it has one.
    ///
    /// A short id is resolved through this client's shortcut table, mirroring
    /// `process_subscribe`. The stored subscription is matched by exact string
    /// equality, so the path must be spelled as it was when subscribing.
    /// Unsubscribing from a path that was never subscribed is a no-op.
    ///
    /// # Errors
    ///
    /// * `ServerError::ClientDisconnected` if the client is not connected.
    /// * `ServerError::UnknownShortcode` if `path` is an unregistered short id.
    fn process_unsub(&mut self, path: &PubSubPath) -> Result<(), ServerError> {
        let state = self.state.as_connected_mut()?;

        let path_str = match path {
            PubSubPath::Long(lp) => lp.as_str(),
            PubSubPath::Short(sid) => state
                .shortcuts
                .iter()
                .find(|s| &s.short == sid)
                .ok_or(ServerError::UnknownShortcode)?
                .long
                .as_str(),
        };

        let found = state
            .subscriptions
            .iter()
            .position(|s| s.as_str() == path_str);

        if let Some(pos) = found {
            // Subscription order carries no meaning, so O(1) removal is fine.
            state.subscriptions.swap_remove(pos);
        }

        Ok(())
    }
}
/// Session lifecycle of a client.
// `Connected` embeds `ConnectedState` (with its inline fixed-capacity vectors)
// while the other variant is empty, which is what the lint below complains about.
// NOTE(review): presumably left unboxed because this crate is `no_std` — confirm.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum ClientState {
/// The client is registered with the broker but has not (or no longer)
/// registered a component.
SessionEstablished,
/// The client has registered a component; carries its per-connection data.
Connected(ConnectedState),
}
impl ClientState {
    /// Borrows the connection data when the client is `Connected`.
    ///
    /// Any other state yields `ServerError::ClientDisconnected`.
    fn as_connected(&self) -> Result<&ConnectedState, ServerError> {
        if let ClientState::Connected(connected) = self {
            Ok(connected)
        } else {
            Err(ServerError::ClientDisconnected)
        }
    }

    /// Mutable counterpart of `as_connected`.
    ///
    /// Any state other than `Connected` yields `ServerError::ClientDisconnected`.
    fn as_connected_mut(&mut self) -> Result<&mut ConnectedState, ServerError> {
        if let ClientState::Connected(connected) = self {
            Ok(connected)
        } else {
            Err(ServerError::ClientDisconnected)
        }
    }
}
/// Data kept for a client that has registered a component.
#[derive(Debug)]
struct ConnectedState {
/// Component name supplied at registration, copied into owned storage.
name: Name<'static>,
/// Component version supplied at registration.
version: Version,
/// Paths this client is subscribed to; matched against published paths with
/// `anachro_icd::matches`.
subscriptions: Vec<Path<'static>, consts::U8>,
/// Short-id aliases this client has registered for long paths.
shortcuts: Vec<Shortcut, consts::U8>,
}
/// A client-specific alias mapping a short numeric id to a long path.
#[derive(Debug)]
struct Shortcut {
/// The full path the alias stands for (never contains `#` or `+`; such
/// names are rejected at registration).
long: Path<'static>,
/// The short id chosen by the client.
short: u16,
}
/// An incoming message together with the client that sent it.
pub struct Request<'a> {
/// Id of the sending client; must match a client registered with the broker.
pub source: Uuid,
/// The message received from that client.
pub msg: Component<'a>,
}
/// An outgoing message together with the client it must be delivered to.
pub struct Response<'a> {
/// Id of the client that should receive `msg`.
pub dest: Uuid,
/// The message to send.
pub msg: Arbitrator<'a>,
}