1use crate::csp::{
2 comm::{Responder, RxChan, VoidTx},
3 message::Message,
4 shutdown::NoShutdown,
5};
6
7use super::handler::Handler;
8
9with_rt! {
10 use either::Either;
11 use crate::{
12 csp::{
13 comm::OutputTx,
14 shutdown::ShutdownRx,
15 },
16 utils::poll_biased::PollBiased,
17 };
18}
19
/// Pairs a message handler with the receiving channel its messages come from.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Server<H, X> {
    /// Handler that is called with the data of each message.
    pub handler: H,
    /// Receiving side of the channel; `serve` pulls messages from it via `recv()`.
    pub rx: X,
}
25
/// Options passed by value to `Server::serve`.
///
/// `Debug` and `Clone` are derived so the options can be logged and reused;
/// both impls are only available when `OTx` and `SRx` implement the
/// respective trait themselves.
#[derive(Debug, Clone)]
pub struct ServeOptions<OTx, SRx> {
    /// Shutdown source: `serve` returns `Ok(())` as soon as the future
    /// produced by its `wait_shutdown()` wins the poll.
    pub shutdown_rx: SRx,

    /// Sender that receives the handler output of every message that carries
    /// no responder.
    pub output_tx: OTx,
}
34
35impl<SRx> ServeOptions<VoidTx, SRx> {
36 pub const fn with_shutdown_token(shutdown_rx: SRx) -> Self {
37 Self {
38 shutdown_rx,
39 output_tx: VoidTx,
40 }
41 }
42}
43
44impl Default for ServeOptions<VoidTx, NoShutdown> {
45 fn default() -> Self {
46 Self {
47 shutdown_rx: NoShutdown,
48 output_tx: VoidTx,
49 }
50 }
51}
52
53impl<H, X> Server<H, X> {
54 #[cfg(feature = "tokio")]
55 pub async fn serve<T, E, OTx, SRx>(
56 &mut self,
57 mut options: ServeOptions<OTx, SRx>,
58 ) -> Result<(), E>
59 where
60 T: Send + 'static,
61 E: Send + 'static,
62 OTx: OutputTx<H::Output, E> + 'static,
63 H: Handler<T, E> + Clone + 'static,
64 X: RxChan<T, E, H::Output>,
65 X::Responder: Send + 'static,
66 SRx: ShutdownRx<E>,
67 {
68 loop {
69 let result = PollBiased::new(options.shutdown_rx.wait_shutdown(), self.rx.recv()).await;
70 match result {
71 Either::Left(_) => return Ok(()),
72 Either::Right(message) => {
73 let message = message?;
74
75 let out_tx = options.output_tx.clone();
76 let handler = self.handler.clone();
77
78 tokio::spawn(async move {
79 let output = handler.handle(message.data).await;
80 if let Some(responder) = message.responder {
81 responder.respond_with(output).await?;
82 } else {
83 out_tx.send(output).await?;
84 }
85
86 Ok::<_, E>(())
87 });
88 }
89 }
90 }
91 }
92
93 pub async fn handle_message<T, E>(
94 &mut self,
95 message: Message<T, X::Responder>,
96 ) -> Result<Option<Result<H::Output, E>>, E>
97 where
98 H: Handler<T, E>,
99 X: RxChan<T, E, H::Output>,
100 {
101 let output = self.handler.handle(message.data).await;
102
103 if let Some(responder) = message.responder {
104 responder.respond_with(output).await?;
105 Ok(None)
106 } else {
107 Ok(Some(output))
108 }
109 }
110}