Skip to main content

anachro_client/
client.rs

1//! The Client interface
2//!
3//! This is the primary interface used by clients. It is used to track
4//! the state of a connection, and process any incoming or outgoing messages
5
6use {
7    crate::{client_io::ClientIo, table::Table, Error, RecvMsg},
8    anachro_icd::{
9        self,
10        arbitrator::{Arbitrator, Control as AControl, ControlResponse, PubSubResponse},
11        component::{
12            Component, ComponentInfo, Control as CControl, ControlType, PubSub, PubSubShort,
13            PubSubType,
14        },
15        ManagedString, Name, Path, PubSubPath, Uuid, Version,
16    },
17};
18
/// Offset (high bit) that marks a shortcode as belonging to a Publish topic
///
/// Shortcodes are currently split into two halves:
///
/// * `0x0000..=0x7FFF`: subscription topic shortcodes
/// * `0x8000..=0xFFFF`: publish topic shortcodes
///
/// A publish shortcode is formed by OR-ing the topic's index with this
/// value. This split is an implementation detail, and should not be
/// relied upon.
pub const PUBLISH_SHORTCODE_OFFSET: u16 = 1 << 15;
26
27#[derive(Debug)]
28enum ClientState {
29    Disconnected,
30    PendingRegistration,
31    Registered,
32    Subscribing,
33    Subscribed,
34    ShortCodingSub,
35    ShortCodingPub,
36    Active,
37}
38
39impl ClientState {
40    pub(crate) fn as_active(&self) -> Result<(), Error> {
41        match self {
42            ClientState::Active => Ok(()),
43            _ => Err(Error::NotActive),
44        }
45    }
46}
47
48/// The Client interface
49///
50/// This is the primary interface used by clients. It is used to track
51/// the state of a connection, and process any incoming or outgoing messages
52pub struct Client {
53    state: ClientState,
54    // TODO: This should probably just be a &'static str
55    name: Name<'static>,
56    version: Version,
57    ctr: u16,
58    sub_paths: &'static [&'static str],
59    pub_short_paths: &'static [&'static str],
60    timeout_ticks: Option<u8>,
61    uuid: Uuid,
62    current_tick: u8,
63    current_idx: usize,
64}
65
66impl Client {
67    /// Create a new client instance
68    ///
69    /// ## Parameters
70    ///
71    /// ### `name`
72    ///
73    /// The name of this device or client
74    ///
75    /// ### `version`
76    ///
77    /// The semantic version number of this client
78    ///
79    /// ### `ctr_init`
80    ///
81    /// A value to initialize the control counter.
82    ///
83    /// You may choose to initialize this with a fixed or random value
84    ///
85    /// ### `sub_paths`
86    ///
87    /// The subscription paths that the device is interested in
88    ///
89    /// This is typically provided by the `Table::sub_paths()` method on
90    /// a table type generated using the `pubsub_table!()` macro.
91    ///
92    /// ### `pub_paths`
93    ///
94    /// The publishing paths that the device is interested in
95    ///
96    /// This is typically provided by the `Table::pub_paths()` method on
97    /// a table type generated using the `pubsub_table!()` macro.
98    ///
99    /// ### `timeout_ticks`
100    ///
101    /// The number of ticks used to time-out waiting for certain responses
102    /// before retrying automatically.
103    ///
104    /// Set to `None` to disable automatic retries. This will require the
105    /// user to manually call `Client::reset_connection()` if a message is
106    /// lost.
107    ///
108    /// Ticks are counted by calls to `Client::process_one()`, which should be
109    /// called at a semi-regular rate. e.g. if you call `process_one()` every
110    /// 10ms, a `timeout_ticks: Some(100)` would automatically timeout after
111    /// 1s of waiting for a response.
112    pub fn new(
113        name: &str,
114        version: Version,
115        ctr_init: u16,
116        sub_paths: &'static [&'static str],
117        pub_short_paths: &'static [&'static str],
118        timeout_ticks: Option<u8>,
119    ) -> Self {
120        Self {
121            name: Name::try_from_str(name).unwrap(),
122            version,
123            ctr: ctr_init,
124            state: ClientState::Disconnected,
125            sub_paths,
126            pub_short_paths,
127            timeout_ticks,
128            uuid: Uuid::from_bytes([0u8; 16]),
129            current_tick: 0,
130            current_idx: 0,
131        }
132    }
133
134    /// Reset the client connection
135    ///
136    /// This immediately disconnects the client, at which point
137    /// it will begin attemption to re-establish a connection to
138    /// the broker.
139    pub fn reset_connection(&mut self) {
140        self.state = ClientState::Disconnected;
141        self.current_tick = 0;
142        self.current_idx = 0;
143    }
144
145    /// Obtain the `Uuid` assigned by the broker to this client
146    ///
147    /// If the client is not connected, `None` will be returned.
148    pub fn get_id(&self) -> Option<&Uuid> {
149        if self.is_connected() {
150            Some(&self.uuid)
151        } else {
152            None
153        }
154    }
155
156    /// Is the client connected?
157    pub fn is_connected(&self) -> bool {
158        self.state.as_active().is_ok()
159    }
160
161    /// Publish a message
162    ///
163    /// This interface publishes a message from the client to the broker.
164    ///
165    /// ## Parameters
166    ///
167    /// ### `cio`
168    ///
169    /// This is the `ClientIo` instance used by the client
170    ///
171    /// ### `path`
172    ///
173    /// This is the path to publish to. This is typically created by using
174    /// the `Table::serialize()` method, which returns a path and the serialized
175    /// payload
176    ///
177    /// ### `payload`
178    ///
179    /// The serialized payload to publish. This is typically created by using
180    /// the `Table::serialize()` method, which returns a path and the serialized
181    /// payload
182    pub fn publish<'a, 'b: 'a, C: ClientIo>(
183        &'b self,
184        cio: &mut C,
185        path: &'a str,
186        payload: &'a [u8],
187    ) -> Result<(), Error> {
188        self.state.as_active()?;
189
190        let path = match self.pub_short_paths.iter().position(|pth| &path == pth) {
191            Some(short) => PubSubPath::Short((short as u16) | PUBLISH_SHORTCODE_OFFSET),
192            None => PubSubPath::Long(ManagedString::Borrow(path)),
193        };
194
195        let msg = Component::PubSub(PubSub {
196            path,
197            ty: PubSubType::Pub { payload },
198        });
199
200        cio.send(&msg)?;
201
202        Ok(())
203    }
204
205    /// Process a single incoming message
206    ///
207    /// This function *must* be called regularly to process messages
208    /// that have been received by the broker. It is suggested to call
209    /// it at regular intervals if you are using the `timeout_ticks` option
210    /// when creating the Client.
211    ///
212    /// If a subscription message has been received, it will be returned
213    /// with the path that was published to. If the Client has subscribed
214    /// using a wildcard, it may not exactly match the subscription topic
215    ///
216    /// The `anachro-icd::matches` function can be used to compare if a topic
217    /// matches a given fixed or wildcard path, if necessary.
218    pub fn process_one<C: ClientIo, T: Table>(
219        &mut self,
220        cio: &mut C,
221    ) -> Result<Option<RecvMsg<T>>, Error> {
222        let mut response: Option<RecvMsg<T>> = None;
223
224        match &mut self.state {
225            // =====================================
226            // Disconnected
227            // =====================================
228            ClientState::Disconnected => {
229                self.disconnected(cio)?;
230            }
231
232            // =====================================
233            // Pending Registration
234            // =====================================
235            ClientState::PendingRegistration => {
236                self.pending_registration(cio)?;
237
238                if self.timeout_violated() {
239                    self.state = ClientState::Disconnected;
240                    self.current_tick = 0;
241                }
242            }
243
244            // =====================================
245            // Registered
246            // =====================================
247            ClientState::Registered => {
248                self.registered(cio)?;
249            }
250
251            // =====================================
252            // Subscribing
253            // =====================================
254            ClientState::Subscribing => {
255                self.subscribing(cio)?;
256
257                if self.timeout_violated() {
258                    let msg = Component::PubSub(PubSub {
259                        path: PubSubPath::Long(Path::borrow_from_str(
260                            self.sub_paths[self.current_idx],
261                        )),
262                        ty: PubSubType::Sub,
263                    });
264
265                    cio.send(&msg)?;
266
267                    self.current_tick = 0;
268                }
269            }
270
271            // =====================================
272            // Subscribed
273            // =====================================
274            ClientState::Subscribed => {
275                self.subscribed(cio)?;
276            }
277
278            // =====================================
279            // ShortCoding
280            // =====================================
281            ClientState::ShortCodingSub => {
282                self.shortcoding_sub(cio)?;
283
284                if self.timeout_violated() {
285                    self.ctr = self.ctr.wrapping_add(1);
286
287                    let msg = Component::Control(CControl {
288                        seq: self.ctr,
289                        ty: ControlType::RegisterPubSubShortId(PubSubShort {
290                            long_name: self.sub_paths[self.current_idx],
291                            short_id: self.current_idx as u16,
292                        }),
293                    });
294
295                    cio.send(&msg)?;
296
297                    self.current_tick = 0;
298                }
299            }
300
301            ClientState::ShortCodingPub => {
302                self.shortcoding_pub(cio)?;
303
304                if self.timeout_violated() {
305                    self.ctr = self.ctr.wrapping_add(1);
306
307                    let msg = Component::Control(CControl {
308                        seq: self.ctr,
309                        ty: ControlType::RegisterPubSubShortId(PubSubShort {
310                            long_name: self.pub_short_paths[self.current_idx],
311                            short_id: (self.current_idx as u16) | PUBLISH_SHORTCODE_OFFSET,
312                        }),
313                    });
314
315                    cio.send(&msg)?;
316
317                    self.current_tick = 0;
318                }
319            }
320
321            // =====================================
322            // Active
323            // =====================================
324            ClientState::Active => {
325                response = self.active(cio)?;
326            }
327        };
328
329        Ok(response)
330    }
331}
332
333// Private interfaces for the client. These are largely used to
334// process incoming messages and handle state
335impl Client {
336    /// Have we reached the timeout limit provided by the user?
337    fn timeout_violated(&self) -> bool {
338        match self.timeout_ticks {
339            Some(ticks) if ticks <= self.current_tick => true,
340            Some(_) => false,
341            None => false,
342        }
343    }
344
345    /// Process messages while in a `ClientState::Disconnected` state
346    fn disconnected<C: ClientIo>(&mut self, cio: &mut C) -> Result<(), Error> {
347        self.ctr += 1;
348
349        let resp = Component::Control(CControl {
350            seq: self.ctr,
351            ty: ControlType::RegisterComponent(ComponentInfo {
352                name: self.name.as_borrowed(),
353                version: self.version,
354            }),
355        });
356
357        cio.send(&resp)?;
358
359        self.state = ClientState::PendingRegistration;
360        self.current_tick = 0;
361
362        Ok(())
363    }
364
365    /// Process messages while in a `ClientState::PendingRegistration state`
366    fn pending_registration<C: ClientIo>(&mut self, cio: &mut C) -> Result<(), Error> {
367        let msg = cio.recv()?;
368        let msg = match msg {
369            Some(msg) => msg,
370            None => {
371                self.current_tick = self.current_tick.saturating_add(1);
372                return Ok(());
373            }
374        };
375
376        if let Arbitrator::Control(AControl { seq, response }) = msg {
377            if seq != self.ctr {
378                self.current_tick = self.current_tick.saturating_add(1);
379                // TODO, restart connection process? Just disregard?
380                Err(Error::UnexpectedMessage)
381            } else if let Ok(ControlResponse::ComponentRegistration(uuid)) = response {
382                self.uuid = uuid;
383                self.state = ClientState::Registered;
384                self.current_tick = 0;
385                Ok(())
386            } else {
387                self.current_tick = self.current_tick.saturating_add(1);
388                // TODO, restart connection process? Just disregard?
389                Err(Error::UnexpectedMessage)
390            }
391        } else {
392            self.current_tick = self.current_tick.saturating_add(1);
393            Ok(())
394        }
395    }
396
397    /// Process messages while in a `ClientState::Registered` state
398    fn registered<C: ClientIo>(&mut self, cio: &mut C) -> Result<(), Error> {
399        if self.sub_paths.is_empty() {
400            self.state = ClientState::Subscribed;
401            self.current_tick = 0;
402        } else {
403            let msg = Component::PubSub(PubSub {
404                path: PubSubPath::Long(Path::borrow_from_str(self.sub_paths[0])),
405                ty: PubSubType::Sub,
406            });
407
408            cio.send(&msg)?;
409
410            self.state = ClientState::Subscribing;
411            self.current_idx = 0;
412            self.current_tick = 0;
413        }
414
415        Ok(())
416    }
417
418    /// Process messages while in a `ClientState::Subscribing` state
419    fn subscribing<C: ClientIo>(&mut self, cio: &mut C) -> Result<(), Error> {
420        let msg = cio.recv()?;
421        let msg = match msg {
422            Some(msg) => msg,
423            None => {
424                self.current_tick = self.current_tick.saturating_add(1);
425                return Ok(());
426            }
427        };
428
429        if let Arbitrator::PubSub(Ok(PubSubResponse::SubAck {
430            path: PubSubPath::Long(pth),
431        })) = msg
432        {
433            if pth.as_str() == self.sub_paths[self.current_idx] {
434                self.current_idx += 1;
435                if self.current_idx >= self.sub_paths.len() {
436                    self.state = ClientState::Subscribed;
437                    self.current_tick = 0;
438                } else {
439                    let msg = Component::PubSub(PubSub {
440                        path: PubSubPath::Long(Path::borrow_from_str(
441                            self.sub_paths[self.current_idx],
442                        )),
443                        ty: PubSubType::Sub,
444                    });
445
446                    cio.send(&msg)?;
447
448                    self.state = ClientState::Subscribing;
449                    self.current_tick = 0;
450                }
451            } else {
452                self.current_tick = self.current_tick.saturating_add(1);
453            }
454        } else {
455            self.current_tick = self.current_tick.saturating_add(1);
456        }
457
458        Ok(())
459    }
460
461    /// Process messages while in a `ClientState::Subscribed` state
462    fn subscribed<C: ClientIo>(&mut self, cio: &mut C) -> Result<(), Error> {
463        match (self.sub_paths.len(), self.pub_short_paths.len()) {
464            (0, 0) => {
465                self.state = ClientState::Active;
466                self.current_tick = 0;
467            }
468            (0, _n) => {
469                self.ctr = self.ctr.wrapping_add(1);
470                let msg = Component::Control(CControl {
471                    seq: self.ctr,
472                    ty: ControlType::RegisterPubSubShortId(PubSubShort {
473                        long_name: self.pub_short_paths[0],
474                        short_id: PUBLISH_SHORTCODE_OFFSET,
475                    }),
476                });
477
478                cio.send(&msg)?;
479
480                self.state = ClientState::ShortCodingPub;
481                self.current_tick = 0;
482                self.current_idx = 0;
483            }
484            (_n, _) => {
485                // TODO: This doesn't handle the case when the subscribe shortcode is
486                // a wildcard, which the broker will reject
487                self.ctr = self.ctr.wrapping_add(1);
488                let msg = Component::Control(CControl {
489                    seq: self.ctr,
490                    ty: ControlType::RegisterPubSubShortId(PubSubShort {
491                        long_name: self.sub_paths[0],
492                        short_id: 0x0000,
493                    }),
494                });
495
496                cio.send(&msg)?;
497
498                self.state = ClientState::ShortCodingSub;
499                self.current_tick = 0;
500                self.current_idx = 0;
501            }
502        }
503        Ok(())
504    }
505
506    /// Process messages while in a `ClientState::ShortcodingSub` state
507    fn shortcoding_sub<C: ClientIo>(&mut self, cio: &mut C) -> Result<(), Error> {
508        let msg = cio.recv()?;
509        let msg = match msg {
510            Some(msg) => msg,
511            None => {
512                self.current_tick = self.current_tick.saturating_add(1);
513                return Ok(());
514            }
515        };
516
517        if let Arbitrator::Control(AControl {
518            seq,
519            response: Ok(ControlResponse::PubSubShortRegistration(sid)),
520        }) = msg
521        {
522            if seq == self.ctr && sid == (self.current_idx as u16) {
523                self.current_idx += 1;
524
525                if self.current_idx >= self.sub_paths.len() {
526                    if self.pub_short_paths.is_empty() {
527                        self.state = ClientState::Active;
528                        self.current_tick = 0;
529                    } else {
530                        self.ctr = self.ctr.wrapping_add(1);
531
532                        let msg = Component::Control(CControl {
533                            seq: self.ctr,
534                            ty: ControlType::RegisterPubSubShortId(PubSubShort {
535                                long_name: self.pub_short_paths[0],
536                                short_id: PUBLISH_SHORTCODE_OFFSET,
537                            }),
538                        });
539
540                        cio.send(&msg)?;
541
542                        self.current_tick = 0;
543                        self.current_idx = 0;
544                        self.state = ClientState::ShortCodingPub;
545                    }
546                } else {
547                    self.ctr = self.ctr.wrapping_add(1);
548
549                    // TODO: This doesn't handle subscriptions with wildcards
550                    let msg = Component::Control(CControl {
551                        seq: self.ctr,
552                        ty: ControlType::RegisterPubSubShortId(PubSubShort {
553                            long_name: self.sub_paths[self.current_idx],
554                            short_id: self.current_idx as u16,
555                        }),
556                    });
557
558                    cio.send(&msg)?;
559
560                    self.current_tick = 0;
561                }
562            } else {
563                self.current_tick = self.current_tick.saturating_add(1);
564            }
565        } else {
566            self.current_tick = self.current_tick.saturating_add(1);
567        }
568
569        Ok(())
570    }
571
572    /// Process messages while in a `ClientState::ShortcodingPub` state
573    fn shortcoding_pub<C: ClientIo>(&mut self, cio: &mut C) -> Result<(), Error> {
574        let msg = cio.recv()?;
575        let msg = match msg {
576            Some(msg) => msg,
577            None => {
578                self.current_tick = self.current_tick.saturating_add(1);
579                return Ok(());
580            }
581        };
582
583        if let Arbitrator::Control(AControl {
584            seq,
585            response: Ok(ControlResponse::PubSubShortRegistration(sid)),
586        }) = msg
587        {
588            if seq == self.ctr && sid == ((self.current_idx as u16) | PUBLISH_SHORTCODE_OFFSET) {
589                self.current_idx += 1;
590
591                if self.current_idx >= self.pub_short_paths.len() {
592                    self.state = ClientState::Active;
593                    self.current_tick = 0;
594                } else {
595                    self.ctr = self.ctr.wrapping_add(1);
596
597                    let msg = Component::Control(CControl {
598                        seq: self.ctr,
599                        ty: ControlType::RegisterPubSubShortId(PubSubShort {
600                            long_name: self.pub_short_paths[self.current_idx],
601                            short_id: ((self.current_idx as u16) | PUBLISH_SHORTCODE_OFFSET),
602                        }),
603                    });
604
605                    cio.send(&msg)?;
606
607                    self.current_tick = 0;
608                }
609            } else {
610                self.current_tick = self.current_tick.saturating_add(1);
611            }
612        } else {
613            self.current_tick = self.current_tick.saturating_add(1);
614        }
615
616        Ok(())
617    }
618
619    /// Process messages while in a Connected state
620    fn active<C: ClientIo, T: Table>(&mut self, cio: &mut C) -> Result<Option<RecvMsg<T>>, Error> {
621        let msg = cio.recv()?;
622        let pubsub = match msg {
623            Some(Arbitrator::PubSub(Ok(PubSubResponse::SubMsg(ref ps)))) => ps,
624            Some(_) => {
625                // TODO: Maybe something else? return err?
626                return Ok(None);
627            }
628            None => {
629                return Ok(None);
630            }
631        };
632
633        // Determine the path
634        let path = match &pubsub.path {
635            PubSubPath::Short(sid) => Path::Borrow(
636                *self
637                    .sub_paths
638                    .get(*sid as usize)
639                    .ok_or(Error::UnexpectedMessage)?,
640            ),
641            PubSubPath::Long(ms) => ms.try_to_owned().map_err(|_| Error::UnexpectedMessage)?,
642        };
643
644        Ok(Some(RecvMsg {
645            path,
646            payload: T::from_pub_sub(pubsub).map_err(|_| Error::UnexpectedMessage)?,
647        }))
648    }
649}