1#![no_std]
6
7use {
8 anachro_icd::{
9 arbitrator::{self, Arbitrator, Control as AControl, ControlError, SubMsg},
10 component::{
11 Component, ComponentInfo, Control, ControlType, PubSub, PubSubShort, PubSubType,
12 },
13 },
14 core::default::Default,
15 heapless::{consts, Vec},
16};
17
18pub use anachro_icd::{self, Name, Path, PubSubPath, Uuid, Version};
19
20type ClientStore = Vec<Client, consts::U8>;
21
22#[derive(Default)]
35pub struct Broker {
36 clients: ClientStore,
37}
38
/// Errors reported by the [`Broker`] API.
#[derive(Debug, PartialEq, Eq)]
pub enum ServerError {
    /// `register_client` was called with an id that is already registered.
    ClientAlreadyRegistered,
    /// No registered client has the given id. Also returned when publishing
    /// if the source is not found among the *connected* clients.
    UnknownClient,
    /// The client is registered but not in the connected state that the
    /// requested operation needs.
    ClientDisconnected,
    /// Not produced by any code in this file.
    /// NOTE(review): presumably reserved for transport-level failures — confirm.
    ConnectionError,
    /// A fixed-capacity buffer (clients, responses, subscriptions, shortcuts,
    /// or an owned string) could not hold another item.
    ResourcesExhausted,
    /// A short path id was used that the client has not registered.
    UnknownShortcode,
}
48
49pub const RESET_MESSAGE: Arbitrator = Arbitrator::Control(AControl {
50 response: Err(ControlError::ResetConnection),
51 seq: 0,
52});
53
54impl Broker {
56 #[inline(always)]
58 pub fn new() -> Self {
59 Broker::default()
60 }
61
62 pub fn register_client(&mut self, id: &Uuid) -> Result<(), ServerError> {
74 if self.clients.iter().find(|c| &c.id == id).is_none() {
75 self.clients
76 .push(Client {
77 id: *id,
78 state: ClientState::SessionEstablished,
79 })
80 .map_err(|_| ServerError::ResourcesExhausted)?;
81 Ok(())
82 } else {
83 Err(ServerError::ClientAlreadyRegistered)
84 }
85 }
86
87 pub fn remove_client(&mut self, id: &Uuid) -> Result<(), ServerError> {
92 let pos = self
93 .clients
94 .iter()
95 .position(|c| &c.id == id)
96 .ok_or(ServerError::UnknownClient)?;
97 self.clients.swap_remove(pos);
98 Ok(())
99 }
100
101 pub fn reset_client(&mut self, id: &Uuid) -> Result<(), ServerError> {
105 let mut client = self.client_by_id_mut(id)?;
106 client.state = ClientState::SessionEstablished;
107 Ok(())
108 }
109
110 pub fn process_msg<'a, 'b: 'a>(
125 &'b mut self,
126 req: &'a Request<'a>,
127 ) -> Result<Vec<Response<'a>, consts::U8>, ServerError> {
128 let mut responses = Vec::new();
129
130 match &req.msg {
131 Component::Control(ctrl) => {
132 let client = self.client_by_id_mut(&req.source)?;
133
134 if let Some(msg) = client.process_control(&ctrl)? {
135 responses
136 .push(msg)
137 .map_err(|_| ServerError::ResourcesExhausted)?;
138 }
139 }
140 Component::PubSub(PubSub { ref path, ref ty }) => match ty {
141 PubSubType::Pub { ref payload } => {
142 responses = self.process_publish(path, payload, &req.source)?;
143 }
144 PubSubType::Sub => {
145 let client = self.client_by_id_mut(&req.source)?;
146 responses
147 .push(client.process_subscribe(&path)?)
148 .map_err(|_| ServerError::ResourcesExhausted)?;
149 }
150 PubSubType::Unsub => {
151 let client = self.client_by_id_mut(&req.source)?;
152 client.process_unsub(&path)?;
153 todo!()
154 }
155 },
156 }
157
158 Ok(responses)
159 }
160}
161
162impl Broker {
164 fn client_by_id_mut(&mut self, id: &Uuid) -> Result<&mut Client, ServerError> {
165 self.clients
166 .iter_mut()
167 .find(|c| &c.id == id)
168 .ok_or(ServerError::UnknownClient)
169 }
170
171 fn process_publish<'b: 'a, 'a>(
172 &'b mut self,
173 path: &'a PubSubPath,
174 payload: &'a [u8],
175 source: &'a Uuid,
176 ) -> Result<Vec<Response<'a>, consts::U8>, ServerError> {
177 let source_id = self
181 .clients
182 .iter()
183 .filter_map(|c| c.state.as_connected().ok().map(|x| (c, x)))
184 .find(|(c, _x)| &c.id == source)
185 .ok_or(ServerError::UnknownClient)?;
186 let path = match path {
187 PubSubPath::Long(lp) => lp.as_str(),
188 PubSubPath::Short(sid) => &source_id
189 .1
190 .shortcuts
191 .iter()
192 .find(|s| &s.short == sid)
193 .ok_or(ServerError::UnknownShortcode)?
194 .long
195 .as_str(),
196 };
197
198 let mut responses = Vec::new();
200 'client: for (client, state) in self
201 .clients
202 .iter()
203 .filter_map(|c| c.state.as_connected().ok().map(|x| (c, x)))
204 {
205 if &client.id == source {
206 continue;
208 }
209
210 for subt in state.subscriptions.iter() {
211 if anachro_icd::matches(subt.as_str(), path) {
212 for short in state.shortcuts.iter() {
214 if path == short.long.as_str() {
216 let msg = Arbitrator::PubSub(Ok(arbitrator::PubSubResponse::SubMsg(
217 SubMsg {
218 path: PubSubPath::Short(short.short),
219 payload,
220 },
221 )));
222 responses
223 .push(Response {
224 dest: client.id,
225 msg,
226 })
227 .map_err(|_| ServerError::ResourcesExhausted)?;
228 continue 'client;
229 }
230 }
231
232 let msg = Arbitrator::PubSub(Ok(arbitrator::PubSubResponse::SubMsg(SubMsg {
233 path: PubSubPath::Long(Path::borrow_from_str(path)),
234 payload,
235 })));
236 responses
237 .push(Response {
238 dest: client.id,
239 msg,
240 })
241 .map_err(|_| ServerError::ResourcesExhausted)?;
242 continue 'client;
243 }
244 }
245 }
246
247 Ok(responses)
248 }
249}
250
251struct Client {
252 id: Uuid,
253 state: ClientState,
254}
255
256impl Client {
257 fn process_control(&mut self, ctrl: &Control) -> Result<Option<Response>, ServerError> {
258 let response;
259
260 let next = match &ctrl.ty {
261 ControlType::RegisterComponent(ComponentInfo { name, version }) => match &self.state {
262 ClientState::SessionEstablished | ClientState::Connected(_) => {
263 let resp = Arbitrator::Control(arbitrator::Control {
264 seq: ctrl.seq,
265 response: Ok(arbitrator::ControlResponse::ComponentRegistration(self.id)),
266 });
267
268 response = Some(Response {
269 dest: self.id,
270 msg: resp,
271 });
272
273 Some(ClientState::Connected(ConnectedState {
274 name: name
275 .try_to_owned()
276 .map_err(|_| ServerError::ResourcesExhausted)?,
277 version: *version,
278 subscriptions: Vec::new(),
279 shortcuts: Vec::new(),
280 }))
281 }
282 },
283 ControlType::RegisterPubSubShortId(PubSubShort {
284 long_name,
285 short_id,
286 }) => {
287 let state = self.state.as_connected_mut()?;
288
289 if long_name.contains('#') || long_name.contains('+') {
290 let resp = Arbitrator::Control(arbitrator::Control {
292 seq: ctrl.seq,
293 response: Err(arbitrator::ControlError::NoWildcardsInShorts),
294 });
295
296 response = Some(Response {
297 dest: self.id,
298 msg: resp,
299 });
300 } else {
301 let shortcut_exists = state
302 .shortcuts
303 .iter()
304 .any(|sc| (sc.long.as_str() == *long_name) && (sc.short == *short_id));
305
306 if !shortcut_exists {
307 state
308 .shortcuts
309 .push(Shortcut {
310 long: Path::try_from_str(long_name).unwrap(),
311 short: *short_id,
312 })
313 .map_err(|_| ServerError::ResourcesExhausted)?;
314 }
315
316 let resp = Arbitrator::Control(arbitrator::Control {
317 seq: ctrl.seq,
318 response: Ok(arbitrator::ControlResponse::PubSubShortRegistration(
319 *short_id,
320 )),
321 });
322
323 response = Some(Response {
324 dest: self.id,
325 msg: resp,
326 });
327 }
328
329 None
332 }
333 };
334
335 if let Some(next) = next {
336 self.state = next;
337 }
338
339 Ok(response)
340 }
341
342 fn process_subscribe<'a>(&mut self, path: &'a PubSubPath) -> Result<Response<'a>, ServerError> {
343 let state = self.state.as_connected_mut()?;
344
345 let path_str = match path {
347 PubSubPath::Long(lp) => lp.as_str(),
348 PubSubPath::Short(sid) => state
349 .shortcuts
350 .iter()
351 .find(|s| &s.short == sid)
352 .ok_or(ServerError::UnknownShortcode)?
353 .long
354 .as_str(),
355 };
356
357 if state
359 .subscriptions
360 .iter()
361 .find(|s| s.as_str() == path_str)
362 .is_none()
363 {
364 state
365 .subscriptions
366 .push(Path::try_from_str(path_str).unwrap())
367 .map_err(|_| ServerError::ResourcesExhausted)?;
368 }
369
370 let resp = Arbitrator::PubSub(Ok(arbitrator::PubSubResponse::SubAck {
371 path: path.clone(),
372 }));
373
374 Ok(Response {
375 dest: self.id,
376 msg: resp,
377 })
378 }
379
380 fn process_unsub(&mut self, _path: &PubSubPath) -> Result<(), ServerError> {
381 let _state = self.state.as_connected_mut()?;
382
383 todo!()
384 }
385}
386
387#[allow(clippy::large_enum_variant)]
388#[derive(Debug)]
389enum ClientState {
390 SessionEstablished,
391 Connected(ConnectedState),
392}
393
394impl ClientState {
395 fn as_connected(&self) -> Result<&ConnectedState, ServerError> {
396 match self {
397 ClientState::Connected(state) => Ok(state),
398 _ => Err(ServerError::ClientDisconnected),
399 }
400 }
401
402 fn as_connected_mut(&mut self) -> Result<&mut ConnectedState, ServerError> {
403 match self {
404 ClientState::Connected(ref mut state) => Ok(state),
405 _ => Err(ServerError::ClientDisconnected),
406 }
407 }
408}
409
410#[derive(Debug)]
411struct ConnectedState {
412 name: Name<'static>,
413 version: Version,
414 subscriptions: Vec<Path<'static>, consts::U8>,
415 shortcuts: Vec<Shortcut, consts::U8>,
416}
417
418#[derive(Debug)]
419struct Shortcut {
420 long: Path<'static>,
421 short: u16,
422}
423
424pub struct Request<'a> {
428 pub source: Uuid,
429 pub msg: Component<'a>,
430}
431
432pub struct Response<'a> {
436 pub dest: Uuid,
437 pub msg: Arbitrator<'a>,
438}