1use {
7 crate::{client_io::ClientIo, table::Table, Error, RecvMsg},
8 anachro_icd::{
9 self,
10 arbitrator::{Arbitrator, Control as AControl, ControlResponse, PubSubResponse},
11 component::{
12 Component, ComponentInfo, Control as CControl, ControlType, PubSub, PubSubShort,
13 PubSubType,
14 },
15 ManagedString, Name, Path, PubSubPath, Uuid, Version,
16 },
17};
18
/// Flag bit (the most significant bit of a `u16`) that distinguishes publish
/// short codes from subscription short codes.
///
/// The short id of `pub_short_paths[i]` is `i | PUBLISH_SHORTCODE_OFFSET`, while
/// the short id of `sub_paths[i]` is the bare index `i`.
pub const PUBLISH_SHORTCODE_OFFSET: u16 = 1 << 15;
26
27#[derive(Debug)]
28enum ClientState {
29 Disconnected,
30 PendingRegistration,
31 Registered,
32 Subscribing,
33 Subscribed,
34 ShortCodingSub,
35 ShortCodingPub,
36 Active,
37}
38
39impl ClientState {
40 pub(crate) fn as_active(&self) -> Result<(), Error> {
41 match self {
42 ClientState::Active => Ok(()),
43 _ => Err(Error::NotActive),
44 }
45 }
46}
47
/// Client side of the connection handshake and pub/sub exchange with an
/// arbitrator.
///
/// The client owns no I/O itself: every method that talks to the arbitrator
/// takes a `ClientIo` implementation as a parameter.
pub struct Client {
    /// Current step of the connection handshake.
    state: ClientState,
    /// Component name sent in the registration request.
    name: Name<'static>,
    /// Component version sent in the registration request.
    version: Version,
    /// Sequence number placed in outgoing control messages and compared
    /// against the `seq` of control responses.
    ctr: u16,
    /// Paths to subscribe to; the index of a path is also its short id.
    sub_paths: &'static [&'static str],
    /// Paths that are published using a short id
    /// (`index | PUBLISH_SHORTCODE_OFFSET`).
    pub_short_paths: &'static [&'static str],
    /// Number of unproductive ticks after which a pending request is retried;
    /// `None` disables the timeout.
    timeout_ticks: Option<u8>,
    /// Id received in the registration response (all zeroes until then).
    uuid: Uuid,
    /// Ticks spent waiting since the last progress in the current state.
    current_tick: u8,
    /// Index of the path currently being subscribed / short-coded.
    current_idx: usize,
}
65
66impl Client {
67 pub fn new(
113 name: &str,
114 version: Version,
115 ctr_init: u16,
116 sub_paths: &'static [&'static str],
117 pub_short_paths: &'static [&'static str],
118 timeout_ticks: Option<u8>,
119 ) -> Self {
120 Self {
121 name: Name::try_from_str(name).unwrap(),
122 version,
123 ctr: ctr_init,
124 state: ClientState::Disconnected,
125 sub_paths,
126 pub_short_paths,
127 timeout_ticks,
128 uuid: Uuid::from_bytes([0u8; 16]),
129 current_tick: 0,
130 current_idx: 0,
131 }
132 }
133
134 pub fn reset_connection(&mut self) {
140 self.state = ClientState::Disconnected;
141 self.current_tick = 0;
142 self.current_idx = 0;
143 }
144
145 pub fn get_id(&self) -> Option<&Uuid> {
149 if self.is_connected() {
150 Some(&self.uuid)
151 } else {
152 None
153 }
154 }
155
156 pub fn is_connected(&self) -> bool {
158 self.state.as_active().is_ok()
159 }
160
161 pub fn publish<'a, 'b: 'a, C: ClientIo>(
183 &'b self,
184 cio: &mut C,
185 path: &'a str,
186 payload: &'a [u8],
187 ) -> Result<(), Error> {
188 self.state.as_active()?;
189
190 let path = match self.pub_short_paths.iter().position(|pth| &path == pth) {
191 Some(short) => PubSubPath::Short((short as u16) | PUBLISH_SHORTCODE_OFFSET),
192 None => PubSubPath::Long(ManagedString::Borrow(path)),
193 };
194
195 let msg = Component::PubSub(PubSub {
196 path,
197 ty: PubSubType::Pub { payload },
198 });
199
200 cio.send(&msg)?;
201
202 Ok(())
203 }
204
205 pub fn process_one<C: ClientIo, T: Table>(
219 &mut self,
220 cio: &mut C,
221 ) -> Result<Option<RecvMsg<T>>, Error> {
222 let mut response: Option<RecvMsg<T>> = None;
223
224 match &mut self.state {
225 ClientState::Disconnected => {
229 self.disconnected(cio)?;
230 }
231
232 ClientState::PendingRegistration => {
236 self.pending_registration(cio)?;
237
238 if self.timeout_violated() {
239 self.state = ClientState::Disconnected;
240 self.current_tick = 0;
241 }
242 }
243
244 ClientState::Registered => {
248 self.registered(cio)?;
249 }
250
251 ClientState::Subscribing => {
255 self.subscribing(cio)?;
256
257 if self.timeout_violated() {
258 let msg = Component::PubSub(PubSub {
259 path: PubSubPath::Long(Path::borrow_from_str(
260 self.sub_paths[self.current_idx],
261 )),
262 ty: PubSubType::Sub,
263 });
264
265 cio.send(&msg)?;
266
267 self.current_tick = 0;
268 }
269 }
270
271 ClientState::Subscribed => {
275 self.subscribed(cio)?;
276 }
277
278 ClientState::ShortCodingSub => {
282 self.shortcoding_sub(cio)?;
283
284 if self.timeout_violated() {
285 self.ctr = self.ctr.wrapping_add(1);
286
287 let msg = Component::Control(CControl {
288 seq: self.ctr,
289 ty: ControlType::RegisterPubSubShortId(PubSubShort {
290 long_name: self.sub_paths[self.current_idx],
291 short_id: self.current_idx as u16,
292 }),
293 });
294
295 cio.send(&msg)?;
296
297 self.current_tick = 0;
298 }
299 }
300
301 ClientState::ShortCodingPub => {
302 self.shortcoding_pub(cio)?;
303
304 if self.timeout_violated() {
305 self.ctr = self.ctr.wrapping_add(1);
306
307 let msg = Component::Control(CControl {
308 seq: self.ctr,
309 ty: ControlType::RegisterPubSubShortId(PubSubShort {
310 long_name: self.pub_short_paths[self.current_idx],
311 short_id: (self.current_idx as u16) | PUBLISH_SHORTCODE_OFFSET,
312 }),
313 });
314
315 cio.send(&msg)?;
316
317 self.current_tick = 0;
318 }
319 }
320
321 ClientState::Active => {
325 response = self.active(cio)?;
326 }
327 };
328
329 Ok(response)
330 }
331}
332
333impl Client {
336 fn timeout_violated(&self) -> bool {
338 match self.timeout_ticks {
339 Some(ticks) if ticks <= self.current_tick => true,
340 Some(_) => false,
341 None => false,
342 }
343 }
344
345 fn disconnected<C: ClientIo>(&mut self, cio: &mut C) -> Result<(), Error> {
347 self.ctr += 1;
348
349 let resp = Component::Control(CControl {
350 seq: self.ctr,
351 ty: ControlType::RegisterComponent(ComponentInfo {
352 name: self.name.as_borrowed(),
353 version: self.version,
354 }),
355 });
356
357 cio.send(&resp)?;
358
359 self.state = ClientState::PendingRegistration;
360 self.current_tick = 0;
361
362 Ok(())
363 }
364
365 fn pending_registration<C: ClientIo>(&mut self, cio: &mut C) -> Result<(), Error> {
367 let msg = cio.recv()?;
368 let msg = match msg {
369 Some(msg) => msg,
370 None => {
371 self.current_tick = self.current_tick.saturating_add(1);
372 return Ok(());
373 }
374 };
375
376 if let Arbitrator::Control(AControl { seq, response }) = msg {
377 if seq != self.ctr {
378 self.current_tick = self.current_tick.saturating_add(1);
379 Err(Error::UnexpectedMessage)
381 } else if let Ok(ControlResponse::ComponentRegistration(uuid)) = response {
382 self.uuid = uuid;
383 self.state = ClientState::Registered;
384 self.current_tick = 0;
385 Ok(())
386 } else {
387 self.current_tick = self.current_tick.saturating_add(1);
388 Err(Error::UnexpectedMessage)
390 }
391 } else {
392 self.current_tick = self.current_tick.saturating_add(1);
393 Ok(())
394 }
395 }
396
397 fn registered<C: ClientIo>(&mut self, cio: &mut C) -> Result<(), Error> {
399 if self.sub_paths.is_empty() {
400 self.state = ClientState::Subscribed;
401 self.current_tick = 0;
402 } else {
403 let msg = Component::PubSub(PubSub {
404 path: PubSubPath::Long(Path::borrow_from_str(self.sub_paths[0])),
405 ty: PubSubType::Sub,
406 });
407
408 cio.send(&msg)?;
409
410 self.state = ClientState::Subscribing;
411 self.current_idx = 0;
412 self.current_tick = 0;
413 }
414
415 Ok(())
416 }
417
418 fn subscribing<C: ClientIo>(&mut self, cio: &mut C) -> Result<(), Error> {
420 let msg = cio.recv()?;
421 let msg = match msg {
422 Some(msg) => msg,
423 None => {
424 self.current_tick = self.current_tick.saturating_add(1);
425 return Ok(());
426 }
427 };
428
429 if let Arbitrator::PubSub(Ok(PubSubResponse::SubAck {
430 path: PubSubPath::Long(pth),
431 })) = msg
432 {
433 if pth.as_str() == self.sub_paths[self.current_idx] {
434 self.current_idx += 1;
435 if self.current_idx >= self.sub_paths.len() {
436 self.state = ClientState::Subscribed;
437 self.current_tick = 0;
438 } else {
439 let msg = Component::PubSub(PubSub {
440 path: PubSubPath::Long(Path::borrow_from_str(
441 self.sub_paths[self.current_idx],
442 )),
443 ty: PubSubType::Sub,
444 });
445
446 cio.send(&msg)?;
447
448 self.state = ClientState::Subscribing;
449 self.current_tick = 0;
450 }
451 } else {
452 self.current_tick = self.current_tick.saturating_add(1);
453 }
454 } else {
455 self.current_tick = self.current_tick.saturating_add(1);
456 }
457
458 Ok(())
459 }
460
461 fn subscribed<C: ClientIo>(&mut self, cio: &mut C) -> Result<(), Error> {
463 match (self.sub_paths.len(), self.pub_short_paths.len()) {
464 (0, 0) => {
465 self.state = ClientState::Active;
466 self.current_tick = 0;
467 }
468 (0, _n) => {
469 self.ctr = self.ctr.wrapping_add(1);
470 let msg = Component::Control(CControl {
471 seq: self.ctr,
472 ty: ControlType::RegisterPubSubShortId(PubSubShort {
473 long_name: self.pub_short_paths[0],
474 short_id: PUBLISH_SHORTCODE_OFFSET,
475 }),
476 });
477
478 cio.send(&msg)?;
479
480 self.state = ClientState::ShortCodingPub;
481 self.current_tick = 0;
482 self.current_idx = 0;
483 }
484 (_n, _) => {
485 self.ctr = self.ctr.wrapping_add(1);
488 let msg = Component::Control(CControl {
489 seq: self.ctr,
490 ty: ControlType::RegisterPubSubShortId(PubSubShort {
491 long_name: self.sub_paths[0],
492 short_id: 0x0000,
493 }),
494 });
495
496 cio.send(&msg)?;
497
498 self.state = ClientState::ShortCodingSub;
499 self.current_tick = 0;
500 self.current_idx = 0;
501 }
502 }
503 Ok(())
504 }
505
506 fn shortcoding_sub<C: ClientIo>(&mut self, cio: &mut C) -> Result<(), Error> {
508 let msg = cio.recv()?;
509 let msg = match msg {
510 Some(msg) => msg,
511 None => {
512 self.current_tick = self.current_tick.saturating_add(1);
513 return Ok(());
514 }
515 };
516
517 if let Arbitrator::Control(AControl {
518 seq,
519 response: Ok(ControlResponse::PubSubShortRegistration(sid)),
520 }) = msg
521 {
522 if seq == self.ctr && sid == (self.current_idx as u16) {
523 self.current_idx += 1;
524
525 if self.current_idx >= self.sub_paths.len() {
526 if self.pub_short_paths.is_empty() {
527 self.state = ClientState::Active;
528 self.current_tick = 0;
529 } else {
530 self.ctr = self.ctr.wrapping_add(1);
531
532 let msg = Component::Control(CControl {
533 seq: self.ctr,
534 ty: ControlType::RegisterPubSubShortId(PubSubShort {
535 long_name: self.pub_short_paths[0],
536 short_id: PUBLISH_SHORTCODE_OFFSET,
537 }),
538 });
539
540 cio.send(&msg)?;
541
542 self.current_tick = 0;
543 self.current_idx = 0;
544 self.state = ClientState::ShortCodingPub;
545 }
546 } else {
547 self.ctr = self.ctr.wrapping_add(1);
548
549 let msg = Component::Control(CControl {
551 seq: self.ctr,
552 ty: ControlType::RegisterPubSubShortId(PubSubShort {
553 long_name: self.sub_paths[self.current_idx],
554 short_id: self.current_idx as u16,
555 }),
556 });
557
558 cio.send(&msg)?;
559
560 self.current_tick = 0;
561 }
562 } else {
563 self.current_tick = self.current_tick.saturating_add(1);
564 }
565 } else {
566 self.current_tick = self.current_tick.saturating_add(1);
567 }
568
569 Ok(())
570 }
571
572 fn shortcoding_pub<C: ClientIo>(&mut self, cio: &mut C) -> Result<(), Error> {
574 let msg = cio.recv()?;
575 let msg = match msg {
576 Some(msg) => msg,
577 None => {
578 self.current_tick = self.current_tick.saturating_add(1);
579 return Ok(());
580 }
581 };
582
583 if let Arbitrator::Control(AControl {
584 seq,
585 response: Ok(ControlResponse::PubSubShortRegistration(sid)),
586 }) = msg
587 {
588 if seq == self.ctr && sid == ((self.current_idx as u16) | PUBLISH_SHORTCODE_OFFSET) {
589 self.current_idx += 1;
590
591 if self.current_idx >= self.pub_short_paths.len() {
592 self.state = ClientState::Active;
593 self.current_tick = 0;
594 } else {
595 self.ctr = self.ctr.wrapping_add(1);
596
597 let msg = Component::Control(CControl {
598 seq: self.ctr,
599 ty: ControlType::RegisterPubSubShortId(PubSubShort {
600 long_name: self.pub_short_paths[self.current_idx],
601 short_id: ((self.current_idx as u16) | PUBLISH_SHORTCODE_OFFSET),
602 }),
603 });
604
605 cio.send(&msg)?;
606
607 self.current_tick = 0;
608 }
609 } else {
610 self.current_tick = self.current_tick.saturating_add(1);
611 }
612 } else {
613 self.current_tick = self.current_tick.saturating_add(1);
614 }
615
616 Ok(())
617 }
618
619 fn active<C: ClientIo, T: Table>(&mut self, cio: &mut C) -> Result<Option<RecvMsg<T>>, Error> {
621 let msg = cio.recv()?;
622 let pubsub = match msg {
623 Some(Arbitrator::PubSub(Ok(PubSubResponse::SubMsg(ref ps)))) => ps,
624 Some(_) => {
625 return Ok(None);
627 }
628 None => {
629 return Ok(None);
630 }
631 };
632
633 let path = match &pubsub.path {
635 PubSubPath::Short(sid) => Path::Borrow(
636 *self
637 .sub_paths
638 .get(*sid as usize)
639 .ok_or(Error::UnexpectedMessage)?,
640 ),
641 PubSubPath::Long(ms) => ms.try_to_owned().map_err(|_| Error::UnexpectedMessage)?,
642 };
643
644 Ok(Some(RecvMsg {
645 path,
646 payload: T::from_pub_sub(pubsub).map_err(|_| Error::UnexpectedMessage)?,
647 }))
648 }
649}