1use std::{
4 marker::PhantomData,
5 pin::Pin,
6 task::{Context, Poll, ready},
7};
8
9use futures::future::poll_fn;
10use futures::{Sink, SinkExt, Stream, StreamExt};
11use pin_project::pin_project;
12use selium_userland::{
13 encoding::{FlatMsg, HasSchema},
14 io::ChannelHandle,
15};
16
17use crate::switchboard::{
18 Backpressure, Cardinality, EndpointHandle, EndpointId, RawPublisher, Switchboard,
19 SwitchboardError,
20};
21
/// A single endpoint that a [`Client`] speaking `Req`/`Rep` can be connected to.
///
/// `Req` and `Rep` do not appear in the method signature; they only tag the
/// implementation with the message types of the connection.
pub trait ClientTarget<Req, Rep> {
    /// Returns the id of the target endpoint.
    fn endpoint_id(&self) -> EndpointId;
}
27
/// A collection of endpoints accepted by [`Client::connect`].
///
/// `Req` and `Rep` only tag the implementation with the message types.
pub trait ClientTargets<Req, Rep> {
    /// Returns the ids of all target endpoints.
    fn endpoint_ids(&self) -> Vec<EndpointId>;
}
33
/// A single endpoint that a [`Subscriber`] of `Req` messages can be connected to.
///
/// `Req` only tags the implementation with the message type.
pub trait SubscriberTarget<Req> {
    /// Returns the id of the source endpoint.
    fn endpoint_id(&self) -> EndpointId;
}
39
/// A collection of endpoints accepted by [`Subscriber::connect`].
///
/// `Req` only tags the implementation with the message type.
pub trait SubscriberTargets<Req> {
    /// Returns the ids of all source endpoints.
    fn endpoint_ids(&self) -> Vec<EndpointId>;
}
45
/// A single endpoint that a [`Publisher`] or [`Fanout`] of `Rep` messages can be connected to.
///
/// `Rep` only tags the implementation with the message type.
pub trait PublisherTarget<Rep> {
    /// Returns the id of the target endpoint.
    fn endpoint_id(&self) -> EndpointId;
}
51
/// A collection of endpoints accepted by [`Publisher::connect`] and [`Fanout::connect`].
///
/// `Rep` only tags the implementation with the message type.
pub trait PublisherTargets<Rep> {
    /// Returns the ids of all target endpoints.
    fn endpoint_ids(&self) -> Vec<EndpointId>;
}
57
/// A single endpoint that a [`Server`] handling `Req`/`Rep` can be connected to.
///
/// `Req` and `Rep` only tag the implementation with the message types.
pub trait ServerTarget<Req, Rep> {
    /// Returns the id of the peer endpoint.
    fn endpoint_id(&self) -> EndpointId;
}
63
/// A collection of endpoints accepted by [`Server::connect`].
///
/// `Req` and `Rep` only tag the implementation with the message types.
pub trait ServerTargets<Req, Rep> {
    /// Returns the ids of all peer endpoints.
    fn endpoint_ids(&self) -> Vec<EndpointId>;
}
69
#[pin_project(project = PublisherProject)]
/// Sending endpoint for `Rep` messages, usable as a `Sink<Rep>`.
///
/// Registered with no inputs and exactly one output.
pub struct Publisher<Rep> {
    // Underlying switchboard endpoint. The first type parameter appears to be the
    // inbound message type (unused here, hence `()`), the second the outbound one.
    endpoint: EndpointHandle<(), Rep>,
}
75
#[pin_project(project = SubscriberProject)]
/// Receiving endpoint for `Req` messages, usable as a `Stream`.
///
/// Registered with exactly one input and no outputs.
pub struct Subscriber<Req> {
    // Underlying switchboard endpoint. The first type parameter appears to be the
    // inbound message type, the second the outbound one (unused here, hence `()`).
    endpoint: EndpointHandle<Req, ()>,
}
81
#[pin_project(project = FanoutProject)]
/// Sending endpoint for `Rep` messages with any number of outputs.
///
/// Each item sent through the `Sink<Rep>` implementation goes to one outbound
/// publisher, and the chosen publisher rotates after every send.
pub struct Fanout<Rep> {
    // Underlying switchboard endpoint (no inbound messages, hence `()`).
    endpoint: EndpointHandle<(), Rep>,
    // Index used to pick the outbound publisher for the next send; it is taken
    // modulo the current number of publishers and advanced by `start_send`.
    next: usize,
}
88
#[pin_project(project = ServerProject)]
/// Request/reply endpoint that receives `Req` messages and answers with `Rep`.
///
/// Registered with one input and many outputs; as a `Stream` it yields
/// [`RequestCtx`] values.
pub struct Server<Req, Rep> {
    // Underlying switchboard endpoint: receives `Req`, sends `Rep`.
    endpoint: EndpointHandle<Req, Rep>,
}
94
#[pin_project(project = ClientProject)]
/// Request/reply endpoint that sends `Req` messages and receives `Rep` replies.
///
/// Registered with one input and one output; it is both a `Sink<Req>` and a
/// `Stream` of replies.
pub struct Client<Req, Rep> {
    // Underlying switchboard endpoint. The parameters are swapped relative to
    // `Server`: the client receives `Rep` and sends `Req`.
    endpoint: EndpointHandle<Rep, Req>,
}
100
/// A request received by a [`Server`], paired with the [`Responder`] used to answer it.
pub struct RequestCtx<Req, Rep> {
    // The decoded request message.
    request: Req,
    // Reply handle built from the outbound channel of the request's origin.
    responder: Responder<Rep>,
}
106
/// Cloneable handle for sending a `Rep` reply over a specific channel.
pub struct Responder<Rep> {
    // Channel the reply publisher is created from on every `send`.
    handle: ChannelHandle,
    // Ties the responder to its reply type without storing a `Rep`.
    _marker: PhantomData<Rep>,
}
112
/// A raw [`EndpointId`] can be used directly as a client target.
impl<Req, Rep> ClientTarget<Req, Rep> for EndpointId {
    /// Returns a copy of the id itself.
    fn endpoint_id(&self) -> EndpointId {
        *self
    }
}
118
119impl<Req, Rep> ClientTargets<Req, Rep> for EndpointId {
120 fn endpoint_ids(&self) -> Vec<EndpointId> {
121 vec![*self]
122 }
123}
124
125impl<Req, Rep, T> ClientTargets<Req, Rep> for Vec<T>
126where
127 T: ClientTarget<Req, Rep>,
128{
129 fn endpoint_ids(&self) -> Vec<EndpointId> {
130 self.iter().map(|target| target.endpoint_id()).collect()
131 }
132}
133
134impl<Req, Rep, T> ClientTargets<Req, Rep> for &[T]
135where
136 T: ClientTarget<Req, Rep>,
137{
138 fn endpoint_ids(&self) -> Vec<EndpointId> {
139 self.iter().map(|target| target.endpoint_id()).collect()
140 }
141}
142
/// A borrowed [`Server`] with matching message types is a valid client target.
impl<Req, Rep> ClientTarget<Req, Rep> for &Server<Req, Rep> {
    /// Returns the server's endpoint id.
    fn endpoint_id(&self) -> EndpointId {
        // Name the inherent method explicitly: `self.endpoint_id()` on `&&Server`
        // would resolve to this trait method and recurse.
        Server::endpoint_id(*self)
    }
}
148
149impl<Req, Rep> ClientTargets<Req, Rep> for &Server<Req, Rep> {
150 fn endpoint_ids(&self) -> Vec<EndpointId> {
151 vec![self.endpoint_id()]
152 }
153}
154
/// A raw [`EndpointId`] can be used directly as a subscriber source.
impl<Req> SubscriberTarget<Req> for EndpointId {
    /// Returns a copy of the id itself.
    fn endpoint_id(&self) -> EndpointId {
        *self
    }
}
160
161impl<Req> SubscriberTargets<Req> for EndpointId {
162 fn endpoint_ids(&self) -> Vec<EndpointId> {
163 vec![*self]
164 }
165}
166
167impl<Req, T> SubscriberTargets<Req> for Vec<T>
168where
169 T: SubscriberTarget<Req>,
170{
171 fn endpoint_ids(&self) -> Vec<EndpointId> {
172 self.iter().map(|target| target.endpoint_id()).collect()
173 }
174}
175
176impl<Req, T> SubscriberTargets<Req> for &[T]
177where
178 T: SubscriberTarget<Req>,
179{
180 fn endpoint_ids(&self) -> Vec<EndpointId> {
181 self.iter().map(|target| target.endpoint_id()).collect()
182 }
183}
184
/// A borrowed [`Publisher`] of the same message type is a valid subscriber source.
impl<Req> SubscriberTarget<Req> for &Publisher<Req> {
    /// Returns the publisher's endpoint id.
    fn endpoint_id(&self) -> EndpointId {
        // Name the inherent method explicitly: `self.endpoint_id()` on `&&Publisher`
        // would resolve to this trait method and recurse.
        Publisher::endpoint_id(*self)
    }
}
190
191impl<Req> SubscriberTargets<Req> for &Publisher<Req> {
192 fn endpoint_ids(&self) -> Vec<EndpointId> {
193 vec![self.endpoint_id()]
194 }
195}
196
/// A borrowed [`Fanout`] of the same message type is a valid subscriber source.
impl<Req> SubscriberTarget<Req> for &Fanout<Req> {
    /// Returns the fanout's endpoint id.
    fn endpoint_id(&self) -> EndpointId {
        // Name the inherent method explicitly: `self.endpoint_id()` on `&&Fanout`
        // would resolve to this trait method and recurse.
        Fanout::endpoint_id(*self)
    }
}
202
203impl<Req> SubscriberTargets<Req> for &Fanout<Req> {
204 fn endpoint_ids(&self) -> Vec<EndpointId> {
205 vec![self.endpoint_id()]
206 }
207}
208
/// A raw [`EndpointId`] can be used directly as a publisher target.
impl<Rep> PublisherTarget<Rep> for EndpointId {
    /// Returns a copy of the id itself.
    fn endpoint_id(&self) -> EndpointId {
        *self
    }
}
214
215impl<Rep> PublisherTargets<Rep> for EndpointId {
216 fn endpoint_ids(&self) -> Vec<EndpointId> {
217 vec![*self]
218 }
219}
220
221impl<Rep, T> PublisherTargets<Rep> for Vec<T>
222where
223 T: PublisherTarget<Rep>,
224{
225 fn endpoint_ids(&self) -> Vec<EndpointId> {
226 self.iter().map(|target| target.endpoint_id()).collect()
227 }
228}
229
230impl<Rep, T> PublisherTargets<Rep> for &[T]
231where
232 T: PublisherTarget<Rep>,
233{
234 fn endpoint_ids(&self) -> Vec<EndpointId> {
235 self.iter().map(|target| target.endpoint_id()).collect()
236 }
237}
238
/// A borrowed [`Subscriber`] of the same message type is a valid publisher target.
impl<Rep> PublisherTarget<Rep> for &Subscriber<Rep> {
    /// Returns the subscriber's endpoint id.
    fn endpoint_id(&self) -> EndpointId {
        // Name the inherent method explicitly: `self.endpoint_id()` on `&&Subscriber`
        // would resolve to this trait method and recurse.
        Subscriber::endpoint_id(*self)
    }
}
244
245impl<Rep> PublisherTargets<Rep> for &Subscriber<Rep> {
246 fn endpoint_ids(&self) -> Vec<EndpointId> {
247 vec![self.endpoint_id()]
248 }
249}
250
/// A raw [`EndpointId`] can be used directly as a server peer.
impl<Req, Rep> ServerTarget<Req, Rep> for EndpointId {
    /// Returns a copy of the id itself.
    fn endpoint_id(&self) -> EndpointId {
        *self
    }
}
256
257impl<Req, Rep> ServerTargets<Req, Rep> for EndpointId {
258 fn endpoint_ids(&self) -> Vec<EndpointId> {
259 vec![*self]
260 }
261}
262
263impl<Req, Rep, T> ServerTargets<Req, Rep> for Vec<T>
264where
265 T: ServerTarget<Req, Rep>,
266{
267 fn endpoint_ids(&self) -> Vec<EndpointId> {
268 self.iter().map(|target| target.endpoint_id()).collect()
269 }
270}
271
272impl<Req, Rep, T> ServerTargets<Req, Rep> for &[T]
273where
274 T: ServerTarget<Req, Rep>,
275{
276 fn endpoint_ids(&self) -> Vec<EndpointId> {
277 self.iter().map(|target| target.endpoint_id()).collect()
278 }
279}
280
/// A borrowed [`Client`] with matching message types is a valid server peer.
impl<Req, Rep> ServerTarget<Req, Rep> for &Client<Req, Rep> {
    /// Returns the client's endpoint id.
    fn endpoint_id(&self) -> EndpointId {
        // Name the inherent method explicitly: `self.endpoint_id()` on `&&Client`
        // would resolve to this trait method and recurse.
        Client::endpoint_id(*self)
    }
}
286
287impl<Req, Rep> ServerTargets<Req, Rep> for &Client<Req, Rep> {
288 fn endpoint_ids(&self) -> Vec<EndpointId> {
289 vec![self.endpoint_id()]
290 }
291}
292
293impl<Rep> Clone for Responder<Rep> {
294 fn clone(&self) -> Self {
295 Self {
296 handle: self.handle,
297 _marker: PhantomData,
298 }
299 }
300}
301
/// Accessors available for any `Rep`, with no trait bounds required.
impl<Rep> Publisher<Rep> {
    /// Returns the id of the underlying switchboard endpoint.
    pub fn endpoint_id(&self) -> EndpointId {
        self.endpoint.get_id()
    }
}
308
309impl<Rep> Publisher<Rep>
310where
311 Rep: FlatMsg + HasSchema + Send + Unpin + 'static,
312{
313 pub async fn create(switchboard: &Switchboard) -> Result<Self, SwitchboardError> {
315 let endpoint = switchboard
316 .endpoint()
317 .inputs(Cardinality::Zero)
318 .outputs(Cardinality::One)
319 .register()
320 .await?;
321 Ok(Self { endpoint })
322 }
323
324 pub async fn create_with_backpressure(
326 switchboard: &Switchboard,
327 backpressure: Backpressure,
328 ) -> Result<Self, SwitchboardError> {
329 let endpoint = switchboard
330 .endpoint()
331 .inputs(Cardinality::Zero)
332 .outputs(Cardinality::One)
333 .output_backpressure(backpressure)
334 .register()
335 .await?;
336 Ok(Self { endpoint })
337 }
338
339 pub async fn connect<T>(
341 &self,
342 switchboard: &Switchboard,
343 target: T,
344 ) -> Result<(), SwitchboardError>
345 where
346 T: PublisherTargets<Rep>,
347 {
348 for endpoint_id in target.endpoint_ids() {
349 switchboard
350 .connect_ids(self.endpoint_id(), endpoint_id)
351 .await?;
352 }
353
354 Ok(())
355 }
356
357 pub async fn ready(&mut self) -> Result<(), SwitchboardError> {
359 poll_fn(|cx| {
360 self.endpoint.poll_updates(cx)?;
361 if self.endpoint.io.outbound.is_empty() {
362 match self.endpoint.backpressure() {
363 Backpressure::Park => Poll::Pending,
364 Backpressure::Drop => Poll::Ready(Ok(())),
365 }
366 } else {
367 Poll::Ready(Ok(()))
368 }
369 })
370 .await
371 }
372}
373
374impl<Rep> Sink<Rep> for Publisher<Rep>
375where
376 Rep: FlatMsg + Send + Unpin + 'static,
377{
378 type Error = SwitchboardError;
379
380 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
381 let PublisherProject { endpoint } = self.project();
382
383 endpoint.poll_updates(cx)?;
384
385 if endpoint.io.outbound.is_empty() {
386 return match endpoint.backpressure() {
387 Backpressure::Park => Poll::Pending,
388 Backpressure::Drop => Poll::Ready(Ok(())),
389 };
390 }
391
392 debug_assert_eq!(endpoint.io.outbound.len(), 1);
393
394 let mut publ = Pin::new(&mut endpoint.io.outbound[0]);
395 match publ.as_mut().poll_ready(cx) {
396 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
397 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
398 Poll::Pending => Poll::Pending,
399 }
400 }
401
402 fn start_send(self: Pin<&mut Self>, item: Rep) -> Result<(), Self::Error> {
403 let PublisherProject { endpoint } = self.project();
404
405 if endpoint.io.outbound.is_empty() {
406 return match endpoint.backpressure() {
407 Backpressure::Park => Err(SwitchboardError::NoRoute),
408 Backpressure::Drop => Ok(()),
409 };
410 }
411
412 debug_assert_eq!(endpoint.io.outbound.len(), 1);
413
414 let mut publ = Pin::new(&mut endpoint.io.outbound[0]);
415 publ.as_mut().start_send(item)?;
416
417 Ok(())
418 }
419
420 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
421 let PublisherProject { endpoint } = self.project();
422
423 endpoint.poll_updates(cx)?;
424
425 if endpoint.io.outbound.is_empty() {
426 return match endpoint.backpressure() {
427 Backpressure::Park => Poll::Pending,
428 Backpressure::Drop => Poll::Ready(Ok(())),
429 };
430 }
431
432 debug_assert_eq!(endpoint.io.outbound.len(), 1);
433
434 ready!(
435 Pin::new(&mut endpoint.io.outbound[0])
436 .as_mut()
437 .poll_flush(cx)?
438 );
439
440 Poll::Ready(Ok(()))
441 }
442
443 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
444 self.poll_flush(cx)
445 }
446}
447
/// Accessors available for any `Req`, with no trait bounds required.
impl<Req> Subscriber<Req> {
    /// Returns the id of the underlying switchboard endpoint.
    pub fn endpoint_id(&self) -> EndpointId {
        self.endpoint.get_id()
    }
}
454
455impl<Req> Subscriber<Req>
456where
457 Req: FlatMsg + HasSchema + Send + Unpin + 'static,
458{
459 pub async fn create(switchboard: &Switchboard) -> Result<Self, SwitchboardError> {
461 let endpoint = switchboard
462 .endpoint()
463 .inputs(Cardinality::One)
464 .outputs(Cardinality::Zero)
465 .register()
466 .await?;
467 Ok(Self { endpoint })
468 }
469
470 pub async fn connect<T>(
472 &self,
473 switchboard: &Switchboard,
474 source: T,
475 ) -> Result<(), SwitchboardError>
476 where
477 T: SubscriberTargets<Req>,
478 {
479 for endpoint_id in source.endpoint_ids() {
480 switchboard
481 .connect_ids(endpoint_id, self.endpoint_id())
482 .await?;
483 }
484
485 Ok(())
486 }
487
488 pub async fn ready(&mut self) -> Result<(), SwitchboardError> {
490 poll_fn(|cx| {
491 self.endpoint.poll_updates(cx)?;
492 if self.endpoint.io.inbound.is_empty() {
493 Poll::Pending
494 } else {
495 Poll::Ready(Ok(()))
496 }
497 })
498 .await
499 }
500}
501
502impl<Req> Stream for Subscriber<Req>
503where
504 Req: FlatMsg + Send + Unpin + HasSchema + 'static,
505{
506 type Item = Result<Req, SwitchboardError>;
507
508 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
509 let SubscriberProject { endpoint } = self.project();
510
511 endpoint.poll_updates(cx)?;
512
513 if endpoint.io.inbound.is_empty() {
514 return Poll::Pending;
515 }
516
517 debug_assert_eq!(endpoint.io.inbound.len(), 1);
518
519 let mut sub = Pin::new(&mut endpoint.io.inbound[0].subscriber);
520 match sub.as_mut().poll_next(cx) {
521 Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(Ok(item))),
522 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
523 Poll::Ready(None) => Poll::Ready(None),
524 Poll::Pending => Poll::Pending,
525 }
526 }
527}
528
/// Accessors available for any `Rep`, with no trait bounds required.
impl<Rep> Fanout<Rep> {
    /// Returns the id of the underlying switchboard endpoint.
    pub fn endpoint_id(&self) -> EndpointId {
        self.endpoint.get_id()
    }
}
535
536impl<Rep> Fanout<Rep>
537where
538 Rep: FlatMsg + HasSchema + Send + Unpin + 'static,
539{
540 pub async fn create(switchboard: &Switchboard) -> Result<Self, SwitchboardError> {
542 let endpoint = switchboard
543 .endpoint()
544 .inputs(Cardinality::Zero)
545 .outputs(Cardinality::Many)
546 .register()
547 .await?;
548 Ok(Self { endpoint, next: 0 })
549 }
550
551 pub async fn create_with_backpressure(
553 switchboard: &Switchboard,
554 backpressure: Backpressure,
555 ) -> Result<Self, SwitchboardError> {
556 let endpoint = switchboard
557 .endpoint()
558 .inputs(Cardinality::Zero)
559 .outputs(Cardinality::Many)
560 .output_backpressure(backpressure)
561 .register()
562 .await?;
563 Ok(Self { endpoint, next: 0 })
564 }
565
566 pub async fn connect<T>(
568 &self,
569 switchboard: &Switchboard,
570 target: T,
571 ) -> Result<(), SwitchboardError>
572 where
573 T: PublisherTargets<Rep>,
574 {
575 for endpoint_id in target.endpoint_ids() {
576 switchboard
577 .connect_ids(self.endpoint_id(), endpoint_id)
578 .await?;
579 }
580
581 Ok(())
582 }
583
584 pub async fn ready(&mut self) -> Result<(), SwitchboardError> {
586 poll_fn(|cx| {
587 self.endpoint.poll_updates(cx)?;
588 if self.endpoint.io.outbound.is_empty() {
589 match self.endpoint.backpressure() {
590 Backpressure::Park => Poll::Pending,
591 Backpressure::Drop => Poll::Ready(Ok(())),
592 }
593 } else {
594 Poll::Ready(Ok(()))
595 }
596 })
597 .await
598 }
599}
600
601impl<Rep> Sink<Rep> for Fanout<Rep>
602where
603 Rep: FlatMsg + Send + Unpin + 'static,
604{
605 type Error = SwitchboardError;
606
607 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
608 let FanoutProject { endpoint, next } = self.project();
609
610 endpoint.poll_updates(cx)?;
611
612 if endpoint.io.outbound.is_empty() {
613 return match endpoint.backpressure() {
614 Backpressure::Park => Poll::Pending,
615 Backpressure::Drop => Poll::Ready(Ok(())),
616 };
617 }
618
619 let idx = *next % endpoint.io.outbound.len();
620 let mut publ = Pin::new(&mut endpoint.io.outbound[idx]);
621 match publ.as_mut().poll_ready(cx) {
622 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
623 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
624 Poll::Pending => Poll::Pending,
625 }
626 }
627
628 fn start_send(self: Pin<&mut Self>, item: Rep) -> Result<(), Self::Error> {
629 let FanoutProject { endpoint, next } = self.project();
630
631 if endpoint.io.outbound.is_empty() {
632 return match endpoint.backpressure() {
633 Backpressure::Park => Err(SwitchboardError::NoRoute),
634 Backpressure::Drop => Ok(()),
635 };
636 }
637
638 let idx = *next % endpoint.io.outbound.len();
639 let mut publ = Pin::new(&mut endpoint.io.outbound[idx]);
640 publ.as_mut().start_send(item)?;
641 *next = (idx + 1) % endpoint.io.outbound.len();
642
643 Ok(())
644 }
645
646 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
647 #[allow(unused_variables)]
648 let FanoutProject { endpoint, next } = self.project();
649
650 endpoint.poll_updates(cx)?;
651 for publ in &mut endpoint.io.outbound {
652 let mut pinned = Pin::new(publ);
653 ready!(pinned.as_mut().poll_flush(cx)?);
654 }
655
656 Poll::Ready(Ok(()))
657 }
658
659 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
660 self.poll_flush(cx)
661 }
662}
663
/// Accessors available for any `Req`/`Rep`, with no trait bounds required.
impl<Req, Rep> Server<Req, Rep> {
    /// Returns the id of the underlying switchboard endpoint.
    pub fn endpoint_id(&self) -> EndpointId {
        self.endpoint.get_id()
    }
}
670
671impl<Req, Rep> Server<Req, Rep>
672where
673 Req: FlatMsg + HasSchema + Send + Unpin + 'static,
674 Rep: FlatMsg + HasSchema + Send + Unpin + 'static,
675{
676 pub async fn create(switchboard: &Switchboard) -> Result<Self, SwitchboardError> {
678 let endpoint = switchboard
679 .endpoint()
680 .inputs(Cardinality::One)
681 .outputs(Cardinality::Many)
682 .register()
683 .await?;
684 Ok(Self { endpoint })
685 }
686
687 pub async fn connect<T>(
689 &self,
690 switchboard: &Switchboard,
691 client: T,
692 ) -> Result<(), SwitchboardError>
693 where
694 T: ServerTargets<Req, Rep>,
695 {
696 let server_id = self.endpoint_id();
697 for client_id in client.endpoint_ids() {
698 switchboard.connect_ids(client_id, server_id).await?;
699 switchboard.connect_ids(server_id, client_id).await?;
700 }
701
702 Ok(())
703 }
704
705 pub async fn ready(&mut self) -> Result<(), SwitchboardError> {
707 poll_fn(|cx| {
708 self.endpoint.poll_updates(cx)?;
709 if self.endpoint.io.inbound.is_empty() || self.endpoint.io.outbound.is_empty() {
710 Poll::Pending
711 } else {
712 Poll::Ready(Ok(()))
713 }
714 })
715 .await
716 }
717}
718
719impl<Req, Rep> Stream for Server<Req, Rep>
720where
721 Req: FlatMsg + Send + Unpin + HasSchema + 'static,
722 Rep: FlatMsg + Send + Unpin + HasSchema + 'static,
723{
724 type Item = Result<RequestCtx<Req, Rep>, SwitchboardError>;
725
726 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
727 let ServerProject { endpoint } = self.project();
728
729 endpoint.poll_updates(cx)?;
730
731 if endpoint.io.inbound.is_empty() {
732 return Poll::Pending;
733 }
734
735 debug_assert_eq!(endpoint.io.inbound.len(), 1);
736
737 let mut sub = Pin::new(&mut endpoint.io.inbound[0].subscriber);
738 match sub.as_mut().poll_next(cx) {
739 Poll::Ready(Some(Ok(item))) => {
740 let Some(handle) = endpoint.outbound_handle(endpoint.io.inbound[0].from) else {
741 return Poll::Ready(Some(Err(SwitchboardError::NoRoute)));
742 };
743 let responder = Responder::new(handle);
744 Poll::Ready(Some(Ok(RequestCtx {
745 request: item,
746 responder,
747 })))
748 }
749 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
750 Poll::Ready(None) => Poll::Ready(None),
751 Poll::Pending => Poll::Pending,
752 }
753 }
754}
755
/// Accessors available for any `Req`/`Rep`, with no trait bounds required.
impl<Req, Rep> Client<Req, Rep> {
    /// Returns the id of the underlying switchboard endpoint.
    pub fn endpoint_id(&self) -> EndpointId {
        self.endpoint.get_id()
    }
}
762
763impl<Req, Rep> Client<Req, Rep>
764where
765 Req: FlatMsg + HasSchema + Send + Unpin + 'static,
766 Rep: FlatMsg + HasSchema + Send + Unpin + 'static,
767{
768 pub async fn create(switchboard: &Switchboard) -> Result<Self, SwitchboardError> {
770 let endpoint = switchboard
771 .endpoint()
772 .inputs(Cardinality::One)
773 .outputs(Cardinality::One)
774 .register()
775 .await?;
776 Ok(Self { endpoint })
777 }
778
779 pub async fn request(&mut self, request: Req) -> Result<Rep, SwitchboardError> {
783 self.send(request).await?;
784 match self.next().await {
785 Some(Ok(reply)) => Ok(reply),
786 Some(Err(err)) => Err(err),
787 None => Err(SwitchboardError::EndpointClosed),
788 }
789 }
790
791 pub async fn connect<T>(
793 &self,
794 switchboard: &Switchboard,
795 server: T,
796 ) -> Result<(), SwitchboardError>
797 where
798 T: ClientTargets<Req, Rep>,
799 {
800 let client_id = self.endpoint_id();
801 for server_id in server.endpoint_ids() {
802 switchboard.connect_ids(client_id, server_id).await?;
803 switchboard.connect_ids(server_id, client_id).await?;
804 }
805
806 Ok(())
807 }
808
809 pub async fn ready(&mut self) -> Result<(), SwitchboardError> {
811 poll_fn(|cx| {
812 self.endpoint.poll_updates(cx)?;
813 if self.endpoint.io.inbound.is_empty() || self.endpoint.io.outbound.is_empty() {
814 Poll::Pending
815 } else {
816 Poll::Ready(Ok(()))
817 }
818 })
819 .await
820 }
821}
822
823impl<Req, Rep> Stream for Client<Req, Rep>
824where
825 Req: FlatMsg + Send + Unpin + HasSchema + 'static,
826 Rep: FlatMsg + Send + Unpin + HasSchema + 'static,
827{
828 type Item = Result<Rep, SwitchboardError>;
829
830 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
831 let ClientProject { endpoint } = self.project();
832
833 endpoint.poll_updates(cx)?;
834
835 if endpoint.io.inbound.is_empty() {
836 return Poll::Pending;
837 }
838
839 debug_assert_eq!(endpoint.io.inbound.len(), 1);
840
841 let mut sub = Pin::new(&mut endpoint.io.inbound[0].subscriber);
842 match sub.as_mut().poll_next(cx) {
843 Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(Ok(item))),
844 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
845 Poll::Ready(None) => {
846 endpoint.io.inbound.remove(0);
847 Poll::Pending
848 }
849 Poll::Pending => Poll::Pending,
850 }
851 }
852}
853
854impl<Req, Rep> Sink<Req> for Client<Req, Rep>
855where
856 Req: FlatMsg + Send + Unpin + 'static,
857 Rep: FlatMsg + Send + Unpin + 'static,
858{
859 type Error = SwitchboardError;
860
861 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
862 let ClientProject { endpoint } = self.project();
863
864 endpoint.poll_updates(cx)?;
865
866 if endpoint.io.outbound.is_empty() {
867 return Poll::Pending;
868 }
869
870 debug_assert_eq!(endpoint.io.outbound.len(), 1);
871
872 let mut publ = Pin::new(&mut endpoint.io.outbound[0]);
873 match publ.as_mut().poll_ready(cx) {
874 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
875 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
876 Poll::Pending => Poll::Pending,
877 }
878 }
879
880 fn start_send(self: Pin<&mut Self>, item: Req) -> Result<(), Self::Error> {
881 let ClientProject { endpoint } = self.project();
882
883 if endpoint.io.outbound.is_empty() {
884 return Err(SwitchboardError::NoRoute);
885 }
886
887 debug_assert_eq!(endpoint.io.outbound.len(), 1);
888
889 let mut publ = Pin::new(&mut endpoint.io.outbound[0]);
890 publ.as_mut().start_send(item)?;
891
892 Ok(())
893 }
894
895 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
896 let ClientProject { endpoint } = self.project();
897
898 endpoint.poll_updates(cx)?;
899
900 debug_assert_eq!(endpoint.io.outbound.len(), 1);
901
902 ready!(
903 Pin::new(&mut endpoint.io.outbound[0])
904 .as_mut()
905 .poll_flush(cx)?
906 );
907
908 Poll::Ready(Ok(()))
909 }
910
911 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
912 self.poll_flush(cx)
913 }
914}
915
916impl<Req, Rep> RequestCtx<Req, Rep>
917where
918 Rep: FlatMsg + HasSchema + Send + Unpin + 'static,
919{
920 pub fn into_parts(self) -> (Req, Responder<Rep>) {
922 (self.request, self.responder)
923 }
924
925 pub fn responder(&self) -> Responder<Rep> {
927 self.responder.clone()
928 }
929
930 pub fn request(&self) -> &Req {
932 &self.request
933 }
934
935 pub async fn reply(&self, reply: Rep) -> Result<(), SwitchboardError> {
937 self.responder.send(reply).await
938 }
939}
940
941impl<Rep> Responder<Rep>
942where
943 Rep: FlatMsg + HasSchema + Send + Unpin + 'static,
944{
945 fn new(handle: ChannelHandle) -> Self {
946 Self {
947 handle,
948 _marker: PhantomData,
949 }
950 }
951
952 pub async fn send(&self, reply: Rep) -> Result<(), SwitchboardError> {
954 let mut publisher = RawPublisher::<Rep>::from_channel_handle(self.handle).await?;
955 publisher.send(reply).await
956 }
957}