fire_http_api/stream/
streamer.rs1use super::error::StreamError;
2use super::message::MessageData;
3
4use fire::{
5 extractor::Extractor, extractor_extract, extractor_prepare,
6 extractor_validate,
7};
8use std::{convert::Infallible, marker::PhantomData};
9use tokio::sync::mpsc;
10
11use serde::{de::DeserializeOwned, Serialize};
12
13pub struct RawStreamer {
14 inner: InnerRawStreamer,
15}
16
17enum InnerRawStreamer {
18 Sender(mpsc::Sender<MessageData>),
19 Receiver(mpsc::Receiver<MessageData>),
20}
21
22impl RawStreamer {
23 pub(crate) fn sender(tx: mpsc::Sender<MessageData>) -> Self {
24 Self {
25 inner: InnerRawStreamer::Sender(tx),
26 }
27 }
28
29 pub(crate) fn receiver(rx: mpsc::Receiver<MessageData>) -> Self {
30 Self {
31 inner: InnerRawStreamer::Receiver(rx),
32 }
33 }
34
35 pub fn assign_message<M>(self) -> Streamer<M> {
36 Streamer {
37 inner: self.inner,
38 message: PhantomData,
39 }
40 }
41}
42
43pub struct Streamer<M> {
44 inner: InnerRawStreamer,
45 message: PhantomData<M>,
46}
47
48impl<M> Streamer<M> {
49 pub async fn send(&mut self, data: M) -> Result<(), StreamError>
52 where
53 M: Serialize,
54 {
55 match &mut self.inner {
56 InnerRawStreamer::Sender(tx) => tx
57 .send(MessageData::serialize(data).map_err(StreamError::Json)?)
58 .await
59 .map_err(|_| StreamError::Closed),
60 _ => panic!("Streamer: cannot send, in receive mode"),
61 }
62 }
63
64 pub async fn closed(&self) {
69 match &self.inner {
70 InnerRawStreamer::Sender(tx) => {
71 tx.closed().await;
72 }
73 _ => panic!("Streamer: cannot send, in receive mode"),
74 }
75 }
76
77 pub async fn recv(&mut self) -> Result<M, StreamError>
80 where
81 M: DeserializeOwned,
82 {
83 match &mut self.inner {
84 InnerRawStreamer::Receiver(rx) => {
85 let data = rx.recv().await.ok_or(StreamError::Closed)?;
86
87 data.deserialize().map_err(StreamError::Json)
88 }
89 _ => panic!("Streamer: cannot receive, in sender mode"),
90 }
91 }
92}
93
94impl<'a, R, M> Extractor<'a, R> for Streamer<M>
95where
96 M: Send + 'static,
97{
98 type Error = Infallible;
99 type Prepared = Self;
100
101 extractor_validate!(|validate| {
102 assert!(
103 validate.state.validate::<Self>(),
104 "Stream not found in state"
105 );
106 validate.state.remove::<Self>();
108 });
109
110 extractor_prepare!(|prepare| { Ok(prepare.state.remove().unwrap()) });
111
112 extractor_extract!(|extract| { Ok(extract.prepared) });
113}