sena/handling/
server.rs

1use crate::csp::{
2    comm::{Responder, RxChan, VoidTx},
3    message::Message,
4    shutdown::NoShutdown,
5};
6
7use super::handler::Handler;
8
9with_rt! {
10    use either::Either;
11    use crate::{
12        csp::{
13            comm::OutputTx,
14            shutdown::ShutdownRx,
15        },
16        utils::poll_biased::PollBiased,
17    };
18}
19
/// Pairs a message handler with the receiving side of a channel.
///
/// The handler processes the payload of every message pulled from `rx`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Server<H, X> {
    /// Handler invoked with the data of each incoming message.
    pub handler: H,
    /// Receiving end of the channel from which messages are read.
    pub rx: X,
}
25
/// Options that control how a server loop runs.
#[derive(Debug, Clone)]
pub struct ServeOptions<OTx, SRx> {
    /// Token that can be used to gracefully shut down the server.
    pub shutdown_rx: SRx,

    /// Channel that receives handler outputs. An output is sent here only
    /// when it is not forwarded as a response, i.e. when the client does not
    /// want a response.
    pub output_tx: OTx,
}
34
35impl<SRx> ServeOptions<VoidTx, SRx> {
36    pub const fn with_shutdown_token(shutdown_rx: SRx) -> Self {
37        Self {
38            shutdown_rx,
39            output_tx: VoidTx,
40        }
41    }
42}
43
44impl Default for ServeOptions<VoidTx, NoShutdown> {
45    fn default() -> Self {
46        Self {
47            shutdown_rx: NoShutdown,
48            output_tx: VoidTx,
49        }
50    }
51}
52
53impl<H, X> Server<H, X> {
54    #[cfg(feature = "tokio")]
55    pub async fn serve<T, E, OTx, SRx>(
56        &mut self,
57        mut options: ServeOptions<OTx, SRx>,
58    ) -> Result<(), E>
59    where
60        T: Send + 'static,
61        E: Send + 'static,
62        OTx: OutputTx<H::Output, E> + 'static,
63        H: Handler<T, E> + Clone + 'static,
64        X: RxChan<T, E, H::Output>,
65        X::Responder: Send + 'static,
66        SRx: ShutdownRx<E>,
67    {
68        loop {
69            let result = PollBiased::new(options.shutdown_rx.wait_shutdown(), self.rx.recv()).await;
70            match result {
71                Either::Left(_) => return Ok(()),
72                Either::Right(message) => {
73                    let message = message?;
74
75                    let out_tx = options.output_tx.clone();
76                    let handler = self.handler.clone();
77
78                    tokio::spawn(async move {
79                        let output = handler.handle(message.data).await;
80                        if let Some(responder) = message.responder {
81                            responder.respond_with(output).await?;
82                        } else {
83                            out_tx.send(output).await?;
84                        }
85
86                        Ok::<_, E>(())
87                    });
88                }
89            }
90        }
91    }
92
93    pub async fn handle_message<T, E>(
94        &mut self,
95        message: Message<T, X::Responder>,
96    ) -> Result<Option<Result<H::Output, E>>, E>
97    where
98        H: Handler<T, E>,
99        X: RxChan<T, E, H::Output>,
100    {
101        let output = self.handler.handle(message.data).await;
102
103        if let Some(responder) = message.responder {
104            responder.respond_with(output).await?;
105            Ok(None)
106        } else {
107            Ok(Some(output))
108        }
109    }
110}