Skip to main content

selium_switchboard/
messaging.rs

1//! Messaging helpers built on the switchboard.
2
3use std::{
4    marker::PhantomData,
5    pin::Pin,
6    task::{Context, Poll, ready},
7};
8
9use futures::future::poll_fn;
10use futures::{Sink, SinkExt, Stream, StreamExt};
11use pin_project::pin_project;
12use selium_userland::{
13    encoding::{FlatMsg, HasSchema},
14    io::ChannelHandle,
15};
16
17use crate::switchboard::{
18    Backpressure, Cardinality, EndpointHandle, EndpointId, RawPublisher, Switchboard,
19    SwitchboardError,
20};
21
22/// Target that a [`Client`] can connect to for a given request/response pair.
23pub trait ClientTarget<Req, Rep> {
24    /// Return the target endpoint identifier.
25    fn endpoint_id(&self) -> EndpointId;
26}
27
28/// Targets that a [`Client`] can connect to for a given request/response pair.
29pub trait ClientTargets<Req, Rep> {
30    /// Return the target endpoint identifiers.
31    fn endpoint_ids(&self) -> Vec<EndpointId>;
32}
33
34/// Target that a [`Subscriber`] can connect to for a given payload type.
35pub trait SubscriberTarget<Req> {
36    /// Return the target endpoint identifier.
37    fn endpoint_id(&self) -> EndpointId;
38}
39
40/// Targets that a [`Subscriber`] can connect to for a given payload type.
41pub trait SubscriberTargets<Req> {
42    /// Return the target endpoint identifiers.
43    fn endpoint_ids(&self) -> Vec<EndpointId>;
44}
45
46/// Target that a [`Publisher`] or [`Fanout`] can connect to for a given payload type.
47pub trait PublisherTarget<Rep> {
48    /// Return the target endpoint identifier.
49    fn endpoint_id(&self) -> EndpointId;
50}
51
52/// Targets that a [`Publisher`] or [`Fanout`] can connect to for a given payload type.
53pub trait PublisherTargets<Rep> {
54    /// Return the target endpoint identifiers.
55    fn endpoint_ids(&self) -> Vec<EndpointId>;
56}
57
58/// Target that a [`Server`] can connect to for a given request/response pair.
59pub trait ServerTarget<Req, Rep> {
60    /// Return the target endpoint identifier.
61    fn endpoint_id(&self) -> EndpointId;
62}
63
64/// Targets that a [`Server`] can connect to for a given request/response pair.
65pub trait ServerTargets<Req, Rep> {
66    /// Return the target endpoint identifiers.
67    fn endpoint_ids(&self) -> Vec<EndpointId>;
68}
69
70/// Disseminates data to zero or more [`Subscriber`]s.
71#[pin_project(project = PublisherProject)]
72pub struct Publisher<Rep> {
73    endpoint: EndpointHandle<(), Rep>,
74}
75
76/// Receives data from a single [`Publisher`].
77#[pin_project(project = SubscriberProject)]
78pub struct Subscriber<Req> {
79    endpoint: EndpointHandle<Req, ()>,
80}
81
82/// Balances messages between one or more [`Subscriber`]s.
83#[pin_project(project = FanoutProject)]
84pub struct Fanout<Rep> {
85    endpoint: EndpointHandle<(), Rep>,
86    next: usize,
87}
88
89/// Replies to queries sent by [`Client`]s.
90#[pin_project(project = ServerProject)]
91pub struct Server<Req, Rep> {
92    endpoint: EndpointHandle<Req, Rep>,
93}
94
95/// Sends queries to [`Server`]s.
96#[pin_project(project = ClientProject)]
97pub struct Client<Req, Rep> {
98    endpoint: EndpointHandle<Rep, Req>,
99}
100
101/// Request wrapper that includes an opaque responder bound to the origin endpoint.
102pub struct RequestCtx<Req, Rep> {
103    request: Req,
104    responder: Responder<Rep>,
105}
106
107/// Opaque reply handle that targets the originating requester.
108pub struct Responder<Rep> {
109    handle: ChannelHandle,
110    _marker: PhantomData<Rep>,
111}
112
113impl<Req, Rep> ClientTarget<Req, Rep> for EndpointId {
114    fn endpoint_id(&self) -> EndpointId {
115        *self
116    }
117}
118
119impl<Req, Rep> ClientTargets<Req, Rep> for EndpointId {
120    fn endpoint_ids(&self) -> Vec<EndpointId> {
121        vec![*self]
122    }
123}
124
125impl<Req, Rep, T> ClientTargets<Req, Rep> for Vec<T>
126where
127    T: ClientTarget<Req, Rep>,
128{
129    fn endpoint_ids(&self) -> Vec<EndpointId> {
130        self.iter().map(|target| target.endpoint_id()).collect()
131    }
132}
133
134impl<Req, Rep, T> ClientTargets<Req, Rep> for &[T]
135where
136    T: ClientTarget<Req, Rep>,
137{
138    fn endpoint_ids(&self) -> Vec<EndpointId> {
139        self.iter().map(|target| target.endpoint_id()).collect()
140    }
141}
142
143impl<Req, Rep> ClientTarget<Req, Rep> for &Server<Req, Rep> {
144    fn endpoint_id(&self) -> EndpointId {
145        Server::endpoint_id(*self)
146    }
147}
148
149impl<Req, Rep> ClientTargets<Req, Rep> for &Server<Req, Rep> {
150    fn endpoint_ids(&self) -> Vec<EndpointId> {
151        vec![self.endpoint_id()]
152    }
153}
154
155impl<Req> SubscriberTarget<Req> for EndpointId {
156    fn endpoint_id(&self) -> EndpointId {
157        *self
158    }
159}
160
161impl<Req> SubscriberTargets<Req> for EndpointId {
162    fn endpoint_ids(&self) -> Vec<EndpointId> {
163        vec![*self]
164    }
165}
166
167impl<Req, T> SubscriberTargets<Req> for Vec<T>
168where
169    T: SubscriberTarget<Req>,
170{
171    fn endpoint_ids(&self) -> Vec<EndpointId> {
172        self.iter().map(|target| target.endpoint_id()).collect()
173    }
174}
175
176impl<Req, T> SubscriberTargets<Req> for &[T]
177where
178    T: SubscriberTarget<Req>,
179{
180    fn endpoint_ids(&self) -> Vec<EndpointId> {
181        self.iter().map(|target| target.endpoint_id()).collect()
182    }
183}
184
185impl<Req> SubscriberTarget<Req> for &Publisher<Req> {
186    fn endpoint_id(&self) -> EndpointId {
187        Publisher::endpoint_id(*self)
188    }
189}
190
191impl<Req> SubscriberTargets<Req> for &Publisher<Req> {
192    fn endpoint_ids(&self) -> Vec<EndpointId> {
193        vec![self.endpoint_id()]
194    }
195}
196
197impl<Req> SubscriberTarget<Req> for &Fanout<Req> {
198    fn endpoint_id(&self) -> EndpointId {
199        Fanout::endpoint_id(*self)
200    }
201}
202
203impl<Req> SubscriberTargets<Req> for &Fanout<Req> {
204    fn endpoint_ids(&self) -> Vec<EndpointId> {
205        vec![self.endpoint_id()]
206    }
207}
208
209impl<Rep> PublisherTarget<Rep> for EndpointId {
210    fn endpoint_id(&self) -> EndpointId {
211        *self
212    }
213}
214
215impl<Rep> PublisherTargets<Rep> for EndpointId {
216    fn endpoint_ids(&self) -> Vec<EndpointId> {
217        vec![*self]
218    }
219}
220
221impl<Rep, T> PublisherTargets<Rep> for Vec<T>
222where
223    T: PublisherTarget<Rep>,
224{
225    fn endpoint_ids(&self) -> Vec<EndpointId> {
226        self.iter().map(|target| target.endpoint_id()).collect()
227    }
228}
229
230impl<Rep, T> PublisherTargets<Rep> for &[T]
231where
232    T: PublisherTarget<Rep>,
233{
234    fn endpoint_ids(&self) -> Vec<EndpointId> {
235        self.iter().map(|target| target.endpoint_id()).collect()
236    }
237}
238
239impl<Rep> PublisherTarget<Rep> for &Subscriber<Rep> {
240    fn endpoint_id(&self) -> EndpointId {
241        Subscriber::endpoint_id(*self)
242    }
243}
244
245impl<Rep> PublisherTargets<Rep> for &Subscriber<Rep> {
246    fn endpoint_ids(&self) -> Vec<EndpointId> {
247        vec![self.endpoint_id()]
248    }
249}
250
251impl<Req, Rep> ServerTarget<Req, Rep> for EndpointId {
252    fn endpoint_id(&self) -> EndpointId {
253        *self
254    }
255}
256
257impl<Req, Rep> ServerTargets<Req, Rep> for EndpointId {
258    fn endpoint_ids(&self) -> Vec<EndpointId> {
259        vec![*self]
260    }
261}
262
263impl<Req, Rep, T> ServerTargets<Req, Rep> for Vec<T>
264where
265    T: ServerTarget<Req, Rep>,
266{
267    fn endpoint_ids(&self) -> Vec<EndpointId> {
268        self.iter().map(|target| target.endpoint_id()).collect()
269    }
270}
271
272impl<Req, Rep, T> ServerTargets<Req, Rep> for &[T]
273where
274    T: ServerTarget<Req, Rep>,
275{
276    fn endpoint_ids(&self) -> Vec<EndpointId> {
277        self.iter().map(|target| target.endpoint_id()).collect()
278    }
279}
280
281impl<Req, Rep> ServerTarget<Req, Rep> for &Client<Req, Rep> {
282    fn endpoint_id(&self) -> EndpointId {
283        Client::endpoint_id(*self)
284    }
285}
286
287impl<Req, Rep> ServerTargets<Req, Rep> for &Client<Req, Rep> {
288    fn endpoint_ids(&self) -> Vec<EndpointId> {
289        vec![self.endpoint_id()]
290    }
291}
292
293impl<Rep> Clone for Responder<Rep> {
294    fn clone(&self) -> Self {
295        Self {
296            handle: self.handle,
297            _marker: PhantomData,
298        }
299    }
300}
301
302impl<Rep> Publisher<Rep> {
303    /// Return the switchboard endpoint identifier.
304    pub fn endpoint_id(&self) -> EndpointId {
305        self.endpoint.get_id()
306    }
307}
308
309impl<Rep> Publisher<Rep>
310where
311    Rep: FlatMsg + HasSchema + Send + Unpin + 'static,
312{
313    /// Create a new publisher endpoint.
314    pub async fn create(switchboard: &Switchboard) -> Result<Self, SwitchboardError> {
315        let endpoint = switchboard
316            .endpoint()
317            .inputs(Cardinality::Zero)
318            .outputs(Cardinality::One)
319            .register()
320            .await?;
321        Ok(Self { endpoint })
322    }
323
324    /// Create a new publisher endpoint with the supplied backpressure behaviour.
325    pub async fn create_with_backpressure(
326        switchboard: &Switchboard,
327        backpressure: Backpressure,
328    ) -> Result<Self, SwitchboardError> {
329        let endpoint = switchboard
330            .endpoint()
331            .inputs(Cardinality::Zero)
332            .outputs(Cardinality::One)
333            .output_backpressure(backpressure)
334            .register()
335            .await?;
336        Ok(Self { endpoint })
337    }
338
339    /// Connect this publisher to the supplied target(s), wiring outbound flows.
340    pub async fn connect<T>(
341        &self,
342        switchboard: &Switchboard,
343        target: T,
344    ) -> Result<(), SwitchboardError>
345    where
346        T: PublisherTargets<Rep>,
347    {
348        for endpoint_id in target.endpoint_ids() {
349            switchboard
350                .connect_ids(self.endpoint_id(), endpoint_id)
351                .await?;
352        }
353
354        Ok(())
355    }
356
357    /// Wait for outbound wiring to be established.
358    pub async fn ready(&mut self) -> Result<(), SwitchboardError> {
359        poll_fn(|cx| {
360            self.endpoint.poll_updates(cx)?;
361            if self.endpoint.io.outbound.is_empty() {
362                match self.endpoint.backpressure() {
363                    Backpressure::Park => Poll::Pending,
364                    Backpressure::Drop => Poll::Ready(Ok(())),
365                }
366            } else {
367                Poll::Ready(Ok(()))
368            }
369        })
370        .await
371    }
372}
373
374impl<Rep> Sink<Rep> for Publisher<Rep>
375where
376    Rep: FlatMsg + Send + Unpin + 'static,
377{
378    type Error = SwitchboardError;
379
380    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
381        let PublisherProject { endpoint } = self.project();
382
383        endpoint.poll_updates(cx)?;
384
385        if endpoint.io.outbound.is_empty() {
386            return match endpoint.backpressure() {
387                Backpressure::Park => Poll::Pending,
388                Backpressure::Drop => Poll::Ready(Ok(())),
389            };
390        }
391
392        debug_assert_eq!(endpoint.io.outbound.len(), 1);
393
394        let mut publ = Pin::new(&mut endpoint.io.outbound[0]);
395        match publ.as_mut().poll_ready(cx) {
396            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
397            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
398            Poll::Pending => Poll::Pending,
399        }
400    }
401
402    fn start_send(self: Pin<&mut Self>, item: Rep) -> Result<(), Self::Error> {
403        let PublisherProject { endpoint } = self.project();
404
405        if endpoint.io.outbound.is_empty() {
406            return match endpoint.backpressure() {
407                Backpressure::Park => Err(SwitchboardError::NoRoute),
408                Backpressure::Drop => Ok(()),
409            };
410        }
411
412        debug_assert_eq!(endpoint.io.outbound.len(), 1);
413
414        let mut publ = Pin::new(&mut endpoint.io.outbound[0]);
415        publ.as_mut().start_send(item)?;
416
417        Ok(())
418    }
419
420    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
421        let PublisherProject { endpoint } = self.project();
422
423        endpoint.poll_updates(cx)?;
424
425        if endpoint.io.outbound.is_empty() {
426            return match endpoint.backpressure() {
427                Backpressure::Park => Poll::Pending,
428                Backpressure::Drop => Poll::Ready(Ok(())),
429            };
430        }
431
432        debug_assert_eq!(endpoint.io.outbound.len(), 1);
433
434        ready!(
435            Pin::new(&mut endpoint.io.outbound[0])
436                .as_mut()
437                .poll_flush(cx)?
438        );
439
440        Poll::Ready(Ok(()))
441    }
442
443    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
444        self.poll_flush(cx)
445    }
446}
447
448impl<Req> Subscriber<Req> {
449    /// Return the switchboard endpoint identifier.
450    pub fn endpoint_id(&self) -> EndpointId {
451        self.endpoint.get_id()
452    }
453}
454
455impl<Req> Subscriber<Req>
456where
457    Req: FlatMsg + HasSchema + Send + Unpin + 'static,
458{
459    /// Create a new subscriber endpoint.
460    pub async fn create(switchboard: &Switchboard) -> Result<Self, SwitchboardError> {
461        let endpoint = switchboard
462            .endpoint()
463            .inputs(Cardinality::One)
464            .outputs(Cardinality::Zero)
465            .register()
466            .await?;
467        Ok(Self { endpoint })
468    }
469
470    /// Connect this subscriber to the supplied target(s), wiring inbound flows.
471    pub async fn connect<T>(
472        &self,
473        switchboard: &Switchboard,
474        source: T,
475    ) -> Result<(), SwitchboardError>
476    where
477        T: SubscriberTargets<Req>,
478    {
479        for endpoint_id in source.endpoint_ids() {
480            switchboard
481                .connect_ids(endpoint_id, self.endpoint_id())
482                .await?;
483        }
484
485        Ok(())
486    }
487
488    /// Wait for inbound wiring to be established.
489    pub async fn ready(&mut self) -> Result<(), SwitchboardError> {
490        poll_fn(|cx| {
491            self.endpoint.poll_updates(cx)?;
492            if self.endpoint.io.inbound.is_empty() {
493                Poll::Pending
494            } else {
495                Poll::Ready(Ok(()))
496            }
497        })
498        .await
499    }
500}
501
502impl<Req> Stream for Subscriber<Req>
503where
504    Req: FlatMsg + Send + Unpin + HasSchema + 'static,
505{
506    type Item = Result<Req, SwitchboardError>;
507
508    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
509        let SubscriberProject { endpoint } = self.project();
510
511        endpoint.poll_updates(cx)?;
512
513        if endpoint.io.inbound.is_empty() {
514            return Poll::Pending;
515        }
516
517        debug_assert_eq!(endpoint.io.inbound.len(), 1);
518
519        let mut sub = Pin::new(&mut endpoint.io.inbound[0].subscriber);
520        match sub.as_mut().poll_next(cx) {
521            Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(Ok(item))),
522            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
523            Poll::Ready(None) => Poll::Ready(None),
524            Poll::Pending => Poll::Pending,
525        }
526    }
527}
528
529impl<Rep> Fanout<Rep> {
530    /// Return the switchboard endpoint identifier.
531    pub fn endpoint_id(&self) -> EndpointId {
532        self.endpoint.get_id()
533    }
534}
535
536impl<Rep> Fanout<Rep>
537where
538    Rep: FlatMsg + HasSchema + Send + Unpin + 'static,
539{
540    /// Create a new fanout endpoint.
541    pub async fn create(switchboard: &Switchboard) -> Result<Self, SwitchboardError> {
542        let endpoint = switchboard
543            .endpoint()
544            .inputs(Cardinality::Zero)
545            .outputs(Cardinality::Many)
546            .register()
547            .await?;
548        Ok(Self { endpoint, next: 0 })
549    }
550
551    /// Create a new fanout endpoint with the supplied backpressure behaviour.
552    pub async fn create_with_backpressure(
553        switchboard: &Switchboard,
554        backpressure: Backpressure,
555    ) -> Result<Self, SwitchboardError> {
556        let endpoint = switchboard
557            .endpoint()
558            .inputs(Cardinality::Zero)
559            .outputs(Cardinality::Many)
560            .output_backpressure(backpressure)
561            .register()
562            .await?;
563        Ok(Self { endpoint, next: 0 })
564    }
565
566    /// Connect this fanout to the supplied target(s), wiring outbound flows.
567    pub async fn connect<T>(
568        &self,
569        switchboard: &Switchboard,
570        target: T,
571    ) -> Result<(), SwitchboardError>
572    where
573        T: PublisherTargets<Rep>,
574    {
575        for endpoint_id in target.endpoint_ids() {
576            switchboard
577                .connect_ids(self.endpoint_id(), endpoint_id)
578                .await?;
579        }
580
581        Ok(())
582    }
583
584    /// Wait for outbound wiring to be established.
585    pub async fn ready(&mut self) -> Result<(), SwitchboardError> {
586        poll_fn(|cx| {
587            self.endpoint.poll_updates(cx)?;
588            if self.endpoint.io.outbound.is_empty() {
589                match self.endpoint.backpressure() {
590                    Backpressure::Park => Poll::Pending,
591                    Backpressure::Drop => Poll::Ready(Ok(())),
592                }
593            } else {
594                Poll::Ready(Ok(()))
595            }
596        })
597        .await
598    }
599}
600
601impl<Rep> Sink<Rep> for Fanout<Rep>
602where
603    Rep: FlatMsg + Send + Unpin + 'static,
604{
605    type Error = SwitchboardError;
606
607    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
608        let FanoutProject { endpoint, next } = self.project();
609
610        endpoint.poll_updates(cx)?;
611
612        if endpoint.io.outbound.is_empty() {
613            return match endpoint.backpressure() {
614                Backpressure::Park => Poll::Pending,
615                Backpressure::Drop => Poll::Ready(Ok(())),
616            };
617        }
618
619        let idx = *next % endpoint.io.outbound.len();
620        let mut publ = Pin::new(&mut endpoint.io.outbound[idx]);
621        match publ.as_mut().poll_ready(cx) {
622            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
623            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
624            Poll::Pending => Poll::Pending,
625        }
626    }
627
628    fn start_send(self: Pin<&mut Self>, item: Rep) -> Result<(), Self::Error> {
629        let FanoutProject { endpoint, next } = self.project();
630
631        if endpoint.io.outbound.is_empty() {
632            return match endpoint.backpressure() {
633                Backpressure::Park => Err(SwitchboardError::NoRoute),
634                Backpressure::Drop => Ok(()),
635            };
636        }
637
638        let idx = *next % endpoint.io.outbound.len();
639        let mut publ = Pin::new(&mut endpoint.io.outbound[idx]);
640        publ.as_mut().start_send(item)?;
641        *next = (idx + 1) % endpoint.io.outbound.len();
642
643        Ok(())
644    }
645
646    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
647        #[allow(unused_variables)]
648        let FanoutProject { endpoint, next } = self.project();
649
650        endpoint.poll_updates(cx)?;
651        for publ in &mut endpoint.io.outbound {
652            let mut pinned = Pin::new(publ);
653            ready!(pinned.as_mut().poll_flush(cx)?);
654        }
655
656        Poll::Ready(Ok(()))
657    }
658
659    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
660        self.poll_flush(cx)
661    }
662}
663
664impl<Req, Rep> Server<Req, Rep> {
665    /// Return the switchboard endpoint identifier.
666    pub fn endpoint_id(&self) -> EndpointId {
667        self.endpoint.get_id()
668    }
669}
670
671impl<Req, Rep> Server<Req, Rep>
672where
673    Req: FlatMsg + HasSchema + Send + Unpin + 'static,
674    Rep: FlatMsg + HasSchema + Send + Unpin + 'static,
675{
676    /// Create a new server endpoint.
677    pub async fn create(switchboard: &Switchboard) -> Result<Self, SwitchboardError> {
678        let endpoint = switchboard
679            .endpoint()
680            .inputs(Cardinality::One)
681            .outputs(Cardinality::Many)
682            .register()
683            .await?;
684        Ok(Self { endpoint })
685    }
686
687    /// Connect this server to the supplied target(s), wiring request and reply flows.
688    pub async fn connect<T>(
689        &self,
690        switchboard: &Switchboard,
691        client: T,
692    ) -> Result<(), SwitchboardError>
693    where
694        T: ServerTargets<Req, Rep>,
695    {
696        let server_id = self.endpoint_id();
697        for client_id in client.endpoint_ids() {
698            switchboard.connect_ids(client_id, server_id).await?;
699            switchboard.connect_ids(server_id, client_id).await?;
700        }
701
702        Ok(())
703    }
704
705    /// Wait for inbound and outbound wiring to be established.
706    pub async fn ready(&mut self) -> Result<(), SwitchboardError> {
707        poll_fn(|cx| {
708            self.endpoint.poll_updates(cx)?;
709            if self.endpoint.io.inbound.is_empty() || self.endpoint.io.outbound.is_empty() {
710                Poll::Pending
711            } else {
712                Poll::Ready(Ok(()))
713            }
714        })
715        .await
716    }
717}
718
719impl<Req, Rep> Stream for Server<Req, Rep>
720where
721    Req: FlatMsg + Send + Unpin + HasSchema + 'static,
722    Rep: FlatMsg + Send + Unpin + HasSchema + 'static,
723{
724    type Item = Result<RequestCtx<Req, Rep>, SwitchboardError>;
725
726    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
727        let ServerProject { endpoint } = self.project();
728
729        endpoint.poll_updates(cx)?;
730
731        if endpoint.io.inbound.is_empty() {
732            return Poll::Pending;
733        }
734
735        debug_assert_eq!(endpoint.io.inbound.len(), 1);
736
737        let mut sub = Pin::new(&mut endpoint.io.inbound[0].subscriber);
738        match sub.as_mut().poll_next(cx) {
739            Poll::Ready(Some(Ok(item))) => {
740                let Some(handle) = endpoint.outbound_handle(endpoint.io.inbound[0].from) else {
741                    return Poll::Ready(Some(Err(SwitchboardError::NoRoute)));
742                };
743                let responder = Responder::new(handle);
744                Poll::Ready(Some(Ok(RequestCtx {
745                    request: item,
746                    responder,
747                })))
748            }
749            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
750            Poll::Ready(None) => Poll::Ready(None),
751            Poll::Pending => Poll::Pending,
752        }
753    }
754}
755
756impl<Req, Rep> Client<Req, Rep> {
757    /// Return the switchboard endpoint identifier.
758    pub fn endpoint_id(&self) -> EndpointId {
759        self.endpoint.get_id()
760    }
761}
762
763impl<Req, Rep> Client<Req, Rep>
764where
765    Req: FlatMsg + HasSchema + Send + Unpin + 'static,
766    Rep: FlatMsg + HasSchema + Send + Unpin + 'static,
767{
768    /// Create a new client endpoint.
769    pub async fn create(switchboard: &Switchboard) -> Result<Self, SwitchboardError> {
770        let endpoint = switchboard
771            .endpoint()
772            .inputs(Cardinality::One)
773            .outputs(Cardinality::One)
774            .register()
775            .await?;
776        Ok(Self { endpoint })
777    }
778
779    /// Send a request and await the next reply.
780    ///
781    /// This helper assumes at most one in-flight request at a time.
782    pub async fn request(&mut self, request: Req) -> Result<Rep, SwitchboardError> {
783        self.send(request).await?;
784        match self.next().await {
785            Some(Ok(reply)) => Ok(reply),
786            Some(Err(err)) => Err(err),
787            None => Err(SwitchboardError::EndpointClosed),
788        }
789    }
790
791    /// Connect this client to the supplied target(s), wiring request and reply flows.
792    pub async fn connect<T>(
793        &self,
794        switchboard: &Switchboard,
795        server: T,
796    ) -> Result<(), SwitchboardError>
797    where
798        T: ClientTargets<Req, Rep>,
799    {
800        let client_id = self.endpoint_id();
801        for server_id in server.endpoint_ids() {
802            switchboard.connect_ids(client_id, server_id).await?;
803            switchboard.connect_ids(server_id, client_id).await?;
804        }
805
806        Ok(())
807    }
808
809    /// Wait for inbound and outbound wiring to be established.
810    pub async fn ready(&mut self) -> Result<(), SwitchboardError> {
811        poll_fn(|cx| {
812            self.endpoint.poll_updates(cx)?;
813            if self.endpoint.io.inbound.is_empty() || self.endpoint.io.outbound.is_empty() {
814                Poll::Pending
815            } else {
816                Poll::Ready(Ok(()))
817            }
818        })
819        .await
820    }
821}
822
823impl<Req, Rep> Stream for Client<Req, Rep>
824where
825    Req: FlatMsg + Send + Unpin + HasSchema + 'static,
826    Rep: FlatMsg + Send + Unpin + HasSchema + 'static,
827{
828    type Item = Result<Rep, SwitchboardError>;
829
830    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
831        let ClientProject { endpoint } = self.project();
832
833        endpoint.poll_updates(cx)?;
834
835        if endpoint.io.inbound.is_empty() {
836            return Poll::Pending;
837        }
838
839        debug_assert_eq!(endpoint.io.inbound.len(), 1);
840
841        let mut sub = Pin::new(&mut endpoint.io.inbound[0].subscriber);
842        match sub.as_mut().poll_next(cx) {
843            Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(Ok(item))),
844            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
845            Poll::Ready(None) => {
846                endpoint.io.inbound.remove(0);
847                Poll::Pending
848            }
849            Poll::Pending => Poll::Pending,
850        }
851    }
852}
853
854impl<Req, Rep> Sink<Req> for Client<Req, Rep>
855where
856    Req: FlatMsg + Send + Unpin + 'static,
857    Rep: FlatMsg + Send + Unpin + 'static,
858{
859    type Error = SwitchboardError;
860
861    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
862        let ClientProject { endpoint } = self.project();
863
864        endpoint.poll_updates(cx)?;
865
866        if endpoint.io.outbound.is_empty() {
867            return Poll::Pending;
868        }
869
870        debug_assert_eq!(endpoint.io.outbound.len(), 1);
871
872        let mut publ = Pin::new(&mut endpoint.io.outbound[0]);
873        match publ.as_mut().poll_ready(cx) {
874            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
875            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
876            Poll::Pending => Poll::Pending,
877        }
878    }
879
880    fn start_send(self: Pin<&mut Self>, item: Req) -> Result<(), Self::Error> {
881        let ClientProject { endpoint } = self.project();
882
883        if endpoint.io.outbound.is_empty() {
884            return Err(SwitchboardError::NoRoute);
885        }
886
887        debug_assert_eq!(endpoint.io.outbound.len(), 1);
888
889        let mut publ = Pin::new(&mut endpoint.io.outbound[0]);
890        publ.as_mut().start_send(item)?;
891
892        Ok(())
893    }
894
895    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
896        let ClientProject { endpoint } = self.project();
897
898        endpoint.poll_updates(cx)?;
899
900        debug_assert_eq!(endpoint.io.outbound.len(), 1);
901
902        ready!(
903            Pin::new(&mut endpoint.io.outbound[0])
904                .as_mut()
905                .poll_flush(cx)?
906        );
907
908        Poll::Ready(Ok(()))
909    }
910
911    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
912        self.poll_flush(cx)
913    }
914}
915
916impl<Req, Rep> RequestCtx<Req, Rep>
917where
918    Rep: FlatMsg + HasSchema + Send + Unpin + 'static,
919{
920    /// Decompose the request context into the request payload and responder.
921    pub fn into_parts(self) -> (Req, Responder<Rep>) {
922        (self.request, self.responder)
923    }
924
925    /// Return a clone of the responder handle.
926    pub fn responder(&self) -> Responder<Rep> {
927        self.responder.clone()
928    }
929
930    /// Borrow the request payload.
931    pub fn request(&self) -> &Req {
932        &self.request
933    }
934
935    /// Send a reply back to the requester that issued the corresponding `RequestCtx`.
936    pub async fn reply(&self, reply: Rep) -> Result<(), SwitchboardError> {
937        self.responder.send(reply).await
938    }
939}
940
941impl<Rep> Responder<Rep>
942where
943    Rep: FlatMsg + HasSchema + Send + Unpin + 'static,
944{
945    fn new(handle: ChannelHandle) -> Self {
946        Self {
947            handle,
948            _marker: PhantomData,
949        }
950    }
951
952    /// Send a reply back to the requester that issued the corresponding `RequestCtx`.
953    pub async fn send(&self, reply: Rep) -> Result<(), SwitchboardError> {
954        let mut publisher = RawPublisher::<Rep>::from_channel_handle(self.handle).await?;
955        publisher.send(reply).await
956    }
957}