anachro_server/
lib.rs

1//! # The Anachro Protocol Server/Broker Library
2//!
3//! This crate is used by devices acting as a Server/Broker of the Anachro Protocol
4
5#![no_std]
6
7use {
8    anachro_icd::{
9        arbitrator::{self, Arbitrator, Control as AControl, ControlError, SubMsg},
10        component::{
11            Component, ComponentInfo, Control, ControlType, PubSub, PubSubShort, PubSubType,
12        },
13    },
14    core::default::Default,
15    heapless::{consts, Vec},
16};
17
18pub use anachro_icd::{self, Name, Path, PubSubPath, Uuid, Version};
19
20type ClientStore = Vec<Client, consts::U8>;
21
22/// The Broker Interface
23///
24/// This is the primary interface for devices acting as a broker.
25///
26/// Currently the max capacity is fixed with a maximum of 8
27/// clients connected. Each Client may subscribe up to 8 topics.
28/// Each Client may register up to 8 shortcodes.
29///
30/// In the future, these limits may be configurable.
31///
32/// As a note, the Broker currently creates a sizable object, due
33/// to the fixed upper limits
34#[derive(Default)]
35pub struct Broker {
36    clients: ClientStore,
37}
38
39#[derive(Debug, PartialEq, Eq)]
40pub enum ServerError {
41    ClientAlreadyRegistered,
42    UnknownClient,
43    ClientDisconnected,
44    ConnectionError,
45    ResourcesExhausted,
46    UnknownShortcode,
47}
48
49pub const RESET_MESSAGE: Arbitrator = Arbitrator::Control(AControl {
50    response: Err(ControlError::ResetConnection),
51    seq: 0,
52});
53
54// Public Interfaces
55impl Broker {
56    /// Create a new broker with no clients attached
57    #[inline(always)]
58    pub fn new() -> Self {
59        Broker::default()
60    }
61
62    /// Register a client to the broker
63    ///
64    /// This can be done dynamically, e.g. when a client connects for the
65    /// first time, e.g. a TCP session is established, or the first packet
66    /// is received, or can be done ahead-of-time, e.g. when communicating
67    /// with a fixed set of wired devices.
68    ///
69    /// Clients must be registered before messages from them can be processed.
70    ///
71    /// If an already-registered client is re-registered, they will be reset to
72    /// an initial connection state, dropping all subscriptions or shortcodes.
73    pub fn register_client(&mut self, id: &Uuid) -> Result<(), ServerError> {
74        if self.clients.iter().find(|c| &c.id == id).is_none() {
75            self.clients
76                .push(Client {
77                    id: *id,
78                    state: ClientState::SessionEstablished,
79                })
80                .map_err(|_| ServerError::ResourcesExhausted)?;
81            Ok(())
82        } else {
83            Err(ServerError::ClientAlreadyRegistered)
84        }
85    }
86
87    /// Remove a client from the broker
88    ///
89    /// This could be necessary if the connection to a client breaks or times out
90    /// Once removed, no further messages to or from this client will be processed
91    pub fn remove_client(&mut self, id: &Uuid) -> Result<(), ServerError> {
92        let pos = self
93            .clients
94            .iter()
95            .position(|c| &c.id == id)
96            .ok_or(ServerError::UnknownClient)?;
97        self.clients.swap_remove(pos);
98        Ok(())
99    }
100
101    /// Reset a client registered with the broker, without removing it
102    ///
103    /// This could be necessary if the connection to a client breaks or times out.
104    pub fn reset_client(&mut self, id: &Uuid) -> Result<(), ServerError> {
105        let mut client = self.client_by_id_mut(id)?;
106        client.state = ClientState::SessionEstablished;
107        Ok(())
108    }
109
110    /// Process a single message from a client
111    ///
112    /// A message from a client will be processed. If processing this message
113    /// generates responses that need to be sent (e.g. a publish occurs and
114    /// subscribed clients should be notified, or if the broker is responding
115    /// to a request from the client), they will be returned, and the messages
116    /// should be sent to the appropriate clients.
117    ///
118    /// Requests and Responses are addressed by the Uuid registered for each client
119    ///
120    /// **NOTE**: If an error occurs, you probably should send a `RESET_MESSAGE` to
121    /// that client to force them to reconnect. You may also want to `remove_client`
122    /// or `reset_client`, depending on the situation. This will hopefully be handled
123    /// automatically in the future.
124    pub fn process_msg<'a, 'b: 'a>(
125        &'b mut self,
126        req: &'a Request<'a>,
127    ) -> Result<Vec<Response<'a>, consts::U8>, ServerError> {
128        let mut responses = Vec::new();
129
130        match &req.msg {
131            Component::Control(ctrl) => {
132                let client = self.client_by_id_mut(&req.source)?;
133
134                if let Some(msg) = client.process_control(&ctrl)? {
135                    responses
136                        .push(msg)
137                        .map_err(|_| ServerError::ResourcesExhausted)?;
138                }
139            }
140            Component::PubSub(PubSub { ref path, ref ty }) => match ty {
141                PubSubType::Pub { ref payload } => {
142                    responses = self.process_publish(path, payload, &req.source)?;
143                }
144                PubSubType::Sub => {
145                    let client = self.client_by_id_mut(&req.source)?;
146                    responses
147                        .push(client.process_subscribe(&path)?)
148                        .map_err(|_| ServerError::ResourcesExhausted)?;
149                }
150                PubSubType::Unsub => {
151                    let client = self.client_by_id_mut(&req.source)?;
152                    client.process_unsub(&path)?;
153                    todo!()
154                }
155            },
156        }
157
158        Ok(responses)
159    }
160}
161
162// Private interfaces
163impl Broker {
164    fn client_by_id_mut(&mut self, id: &Uuid) -> Result<&mut Client, ServerError> {
165        self.clients
166            .iter_mut()
167            .find(|c| &c.id == id)
168            .ok_or(ServerError::UnknownClient)
169    }
170
171    fn process_publish<'b: 'a, 'a>(
172        &'b mut self,
173        path: &'a PubSubPath,
174        payload: &'a [u8],
175        source: &'a Uuid,
176    ) -> Result<Vec<Response<'a>, consts::U8>, ServerError> {
177        // TODO: Make sure we're not publishing to wildcards
178
179        // First, find the sender's path
180        let source_id = self
181            .clients
182            .iter()
183            .filter_map(|c| c.state.as_connected().ok().map(|x| (c, x)))
184            .find(|(c, _x)| &c.id == source)
185            .ok_or(ServerError::UnknownClient)?;
186        let path = match path {
187            PubSubPath::Long(lp) => lp.as_str(),
188            PubSubPath::Short(sid) => &source_id
189                .1
190                .shortcuts
191                .iter()
192                .find(|s| &s.short == sid)
193                .ok_or(ServerError::UnknownShortcode)?
194                .long
195                .as_str(),
196        };
197
198        // Then, find all applicable destinations, max of 1 per destination
199        let mut responses = Vec::new();
200        'client: for (client, state) in self
201            .clients
202            .iter()
203            .filter_map(|c| c.state.as_connected().ok().map(|x| (c, x)))
204        {
205            if &client.id == source {
206                // Don't send messages back to the sender
207                continue;
208            }
209
210            for subt in state.subscriptions.iter() {
211                if anachro_icd::matches(subt.as_str(), path) {
212                    // Does the destination have a shortcut for this?
213                    for short in state.shortcuts.iter() {
214                        // NOTE: we use path, NOT subt, as it may contain wildcards
215                        if path == short.long.as_str() {
216                            let msg = Arbitrator::PubSub(Ok(arbitrator::PubSubResponse::SubMsg(
217                                SubMsg {
218                                    path: PubSubPath::Short(short.short),
219                                    payload,
220                                },
221                            )));
222                            responses
223                                .push(Response {
224                                    dest: client.id,
225                                    msg,
226                                })
227                                .map_err(|_| ServerError::ResourcesExhausted)?;
228                            continue 'client;
229                        }
230                    }
231
232                    let msg = Arbitrator::PubSub(Ok(arbitrator::PubSubResponse::SubMsg(SubMsg {
233                        path: PubSubPath::Long(Path::borrow_from_str(path)),
234                        payload,
235                    })));
236                    responses
237                        .push(Response {
238                            dest: client.id,
239                            msg,
240                        })
241                        .map_err(|_| ServerError::ResourcesExhausted)?;
242                    continue 'client;
243                }
244            }
245        }
246
247        Ok(responses)
248    }
249}
250
251struct Client {
252    id: Uuid,
253    state: ClientState,
254}
255
256impl Client {
257    fn process_control(&mut self, ctrl: &Control) -> Result<Option<Response>, ServerError> {
258        let response;
259
260        let next = match &ctrl.ty {
261            ControlType::RegisterComponent(ComponentInfo { name, version }) => match &self.state {
262                ClientState::SessionEstablished | ClientState::Connected(_) => {
263                    let resp = Arbitrator::Control(arbitrator::Control {
264                        seq: ctrl.seq,
265                        response: Ok(arbitrator::ControlResponse::ComponentRegistration(self.id)),
266                    });
267
268                    response = Some(Response {
269                        dest: self.id,
270                        msg: resp,
271                    });
272
273                    Some(ClientState::Connected(ConnectedState {
274                        name: name
275                            .try_to_owned()
276                            .map_err(|_| ServerError::ResourcesExhausted)?,
277                        version: *version,
278                        subscriptions: Vec::new(),
279                        shortcuts: Vec::new(),
280                    }))
281                }
282            },
283            ControlType::RegisterPubSubShortId(PubSubShort {
284                long_name,
285                short_id,
286            }) => {
287                let state = self.state.as_connected_mut()?;
288
289                if long_name.contains('#') || long_name.contains('+') {
290                    // TODO: How to handle wildcards + short names?
291                    let resp = Arbitrator::Control(arbitrator::Control {
292                        seq: ctrl.seq,
293                        response: Err(arbitrator::ControlError::NoWildcardsInShorts),
294                    });
295
296                    response = Some(Response {
297                        dest: self.id,
298                        msg: resp,
299                    });
300                } else {
301                    let shortcut_exists = state
302                        .shortcuts
303                        .iter()
304                        .any(|sc| (sc.long.as_str() == *long_name) && (sc.short == *short_id));
305
306                    if !shortcut_exists {
307                        state
308                            .shortcuts
309                            .push(Shortcut {
310                                long: Path::try_from_str(long_name).unwrap(),
311                                short: *short_id,
312                            })
313                            .map_err(|_| ServerError::ResourcesExhausted)?;
314                    }
315
316                    let resp = Arbitrator::Control(arbitrator::Control {
317                        seq: ctrl.seq,
318                        response: Ok(arbitrator::ControlResponse::PubSubShortRegistration(
319                            *short_id,
320                        )),
321                    });
322
323                    response = Some(Response {
324                        dest: self.id,
325                        msg: resp,
326                    });
327                }
328
329                // TODO: Dupe check?
330
331                None
332            }
333        };
334
335        if let Some(next) = next {
336            self.state = next;
337        }
338
339        Ok(response)
340    }
341
342    fn process_subscribe<'a>(&mut self, path: &'a PubSubPath) -> Result<Response<'a>, ServerError> {
343        let state = self.state.as_connected_mut()?;
344
345        // Determine canonical path
346        let path_str = match path {
347            PubSubPath::Long(lp) => lp.as_str(),
348            PubSubPath::Short(sid) => state
349                .shortcuts
350                .iter()
351                .find(|s| &s.short == sid)
352                .ok_or(ServerError::UnknownShortcode)?
353                .long
354                .as_str(),
355        };
356
357        // Only push if not a dupe
358        if state
359            .subscriptions
360            .iter()
361            .find(|s| s.as_str() == path_str)
362            .is_none()
363        {
364            state
365                .subscriptions
366                .push(Path::try_from_str(path_str).unwrap())
367                .map_err(|_| ServerError::ResourcesExhausted)?;
368        }
369
370        let resp = Arbitrator::PubSub(Ok(arbitrator::PubSubResponse::SubAck {
371            path: path.clone(),
372        }));
373
374        Ok(Response {
375            dest: self.id,
376            msg: resp,
377        })
378    }
379
380    fn process_unsub(&mut self, _path: &PubSubPath) -> Result<(), ServerError> {
381        let _state = self.state.as_connected_mut()?;
382
383        todo!()
384    }
385}
386
387#[allow(clippy::large_enum_variant)]
388#[derive(Debug)]
389enum ClientState {
390    SessionEstablished,
391    Connected(ConnectedState),
392}
393
394impl ClientState {
395    fn as_connected(&self) -> Result<&ConnectedState, ServerError> {
396        match self {
397            ClientState::Connected(state) => Ok(state),
398            _ => Err(ServerError::ClientDisconnected),
399        }
400    }
401
402    fn as_connected_mut(&mut self) -> Result<&mut ConnectedState, ServerError> {
403        match self {
404            ClientState::Connected(ref mut state) => Ok(state),
405            _ => Err(ServerError::ClientDisconnected),
406        }
407    }
408}
409
410#[derive(Debug)]
411struct ConnectedState {
412    name: Name<'static>,
413    version: Version,
414    subscriptions: Vec<Path<'static>, consts::U8>,
415    shortcuts: Vec<Shortcut, consts::U8>,
416}
417
418#[derive(Debug)]
419struct Shortcut {
420    long: Path<'static>,
421    short: u16,
422}
423
424/// A request FROM the Client, TO the Broker
425///
426/// This message is addressed by a UUID used when registering the client
427pub struct Request<'a> {
428    pub source: Uuid,
429    pub msg: Component<'a>,
430}
431
432/// A response TO the Client, FROM the Broker
433///
434/// This message is addressed by a UUID used when registering the client
435pub struct Response<'a> {
436    pub dest: Uuid,
437    pub msg: Arbitrator<'a>,
438}