fire_http_api/stream/
streamer.rs

1use super::error::StreamError;
2use super::message::MessageData;
3
4use fire::{
5	extractor::Extractor, extractor_extract, extractor_prepare,
6	extractor_validate,
7};
8use std::{convert::Infallible, marker::PhantomData};
9use tokio::sync::mpsc;
10
11use serde::{de::DeserializeOwned, Serialize};
12
13pub struct RawStreamer {
14	inner: InnerRawStreamer,
15}
16
17enum InnerRawStreamer {
18	Sender(mpsc::Sender<MessageData>),
19	Receiver(mpsc::Receiver<MessageData>),
20}
21
22impl RawStreamer {
23	pub(crate) fn sender(tx: mpsc::Sender<MessageData>) -> Self {
24		Self {
25			inner: InnerRawStreamer::Sender(tx),
26		}
27	}
28
29	pub(crate) fn receiver(rx: mpsc::Receiver<MessageData>) -> Self {
30		Self {
31			inner: InnerRawStreamer::Receiver(rx),
32		}
33	}
34
35	pub fn assign_message<M>(self) -> Streamer<M> {
36		Streamer {
37			inner: self.inner,
38			message: PhantomData,
39		}
40	}
41}
42
43pub struct Streamer<M> {
44	inner: InnerRawStreamer,
45	message: PhantomData<M>,
46}
47
48impl<M> Streamer<M> {
49	/// ## Panics
50	/// If you call this when the Stream::KIND is not Receiver
51	pub async fn send(&mut self, data: M) -> Result<(), StreamError>
52	where
53		M: Serialize,
54	{
55		match &mut self.inner {
56			InnerRawStreamer::Sender(tx) => tx
57				.send(MessageData::serialize(data).map_err(StreamError::Json)?)
58				.await
59				.map_err(|_| StreamError::Closed),
60			_ => panic!("Streamer: cannot send, in receive mode"),
61		}
62	}
63
64	/// Completes when the receiver has dropped.
65	///
66	/// ## Panics
67	/// If you call this when the Stream::KIND is not Receiver
68	pub async fn closed(&self) {
69		match &self.inner {
70			InnerRawStreamer::Sender(tx) => {
71				tx.closed().await;
72			}
73			_ => panic!("Streamer: cannot send, in receive mode"),
74		}
75	}
76
77	/// ## Panics
78	/// If you call this when the Stream::KIND is not Sender
79	pub async fn recv(&mut self) -> Result<M, StreamError>
80	where
81		M: DeserializeOwned,
82	{
83		match &mut self.inner {
84			InnerRawStreamer::Receiver(rx) => {
85				let data = rx.recv().await.ok_or(StreamError::Closed)?;
86
87				data.deserialize().map_err(StreamError::Json)
88			}
89			_ => panic!("Streamer: cannot receive, in sender mode"),
90		}
91	}
92}
93
94impl<'a, R, M> Extractor<'a, R> for Streamer<M>
95where
96	M: Send + 'static,
97{
98	type Error = Infallible;
99	type Prepared = Self;
100
101	extractor_validate!(|validate| {
102		assert!(
103			validate.state.validate::<Self>(),
104			"Stream not found in state"
105		);
106		// remove it from the state since we will use it
107		validate.state.remove::<Self>();
108	});
109
110	extractor_prepare!(|prepare| { Ok(prepare.state.remove().unwrap()) });
111
112	extractor_extract!(|extract| { Ok(extract.prepared) });
113}