#![cfg(feature = "sse")]
use ohkami_lib::stream::impls::{Queue, QueueStream};
use ohkami_lib::{Stream, StreamExt};
use std::{future::Future, marker::PhantomData, pin::Pin};
pub struct DataStream<T: Data = String>(
Pin<Box<dyn Stream<Item = String> + Send>>,
PhantomData<fn() -> T>,
);
#[cfg(not(feature = "openapi"))]
mod bound {
pub trait Bound {}
impl<T> Bound for T {}
}
#[cfg(feature = "openapi")]
mod bound {
pub trait Bound: crate::openapi::Schema {}
impl<T: crate::openapi::Schema> Bound for T {}
}
pub trait Data: bound::Bound + 'static {
fn encode(self) -> String;
}
const _: () = {
impl Data for String {
fn encode(self) -> String {
self
}
}
impl Data for &'static str {
fn encode(self) -> String {
self.into()
}
}
};
impl<T: Data> crate::IntoResponse for DataStream<T> {
#[inline]
fn into_response(self) -> crate::Response {
let mut res = crate::Response::OK();
res.set_stream_raw(self.0);
res
}
#[cfg(feature = "openapi")]
fn openapi_responses() -> crate::openapi::Responses {
crate::openapi::Responses::new([(
200,
crate::openapi::Response::when("Streaming")
.content("text/event-stream", <T as crate::openapi::Schema>::schema()),
)])
}
}
impl<T: Data, S> From<S> for DataStream<T>
where
S: Stream<Item = T> + Send + 'static,
{
fn from(stream: S) -> Self {
Self(Box::pin(stream.map(Data::encode)), PhantomData)
}
}
impl<T: Data + Send + 'static> DataStream<T> {
pub fn new<F, Fut>(f: F) -> Self
where
F: FnOnce(handle::Stream<T>) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
Self(
Box::pin(QueueStream::new(|q| f(handle::Stream::from(q)))),
PhantomData,
)
}
}
pub mod handle {
use super::*;
pub struct Stream<T>(pub(super) Queue<String>, pub(super) PhantomData<fn() -> T>);
impl<T> From<Queue<String>> for self::Stream<T> {
fn from(q: Queue<String>) -> Self {
Self(q, PhantomData)
}
}
impl<T: Data> self::Stream<T> {
#[inline]
pub fn send(&mut self, data: impl Into<T>) {
self.0.push(Data::encode(data.into()));
}
}
}