use std::rc::Rc;
use serde::de::DeserializeOwned;
use tokio::sync::mpsc;
use super::server::{SUBSCRIBE_MPSC_CAPACITY, WebServer};
use crate::nodes::{RunParams, produce_async};
use crate::types::*;
/// Subscribes to `topic` on `server` and returns a stream node fed by the
/// payloads delivered for that topic.
///
/// Unless the server reports historical no-op mode, a bounded channel with
/// `SUBSCRIBE_MPSC_CAPACITY` slots is created immediately (at call time, not
/// when the stream is first polled) and its sending half is registered with
/// the server for `topic`.
///
/// Each payload received on the channel is decoded with the server's codec:
/// * on success the value is emitted paired with `NanoTime::now()` taken at
///   decode time;
/// * on failure an error of the form `web_sub '<topic>': <cause>` is emitted
///   and the loop keeps reading subsequent payloads.
///
/// In historical no-op mode nothing is registered and the stream finishes
/// without emitting any item.
#[must_use]
pub fn web_sub<T: Element + Send + DeserializeOwned>(
    server: &WebServer,
    topic: impl Into<String>,
) -> Rc<dyn Stream<Burst<T>>> {
    let topic = topic.into();
    let codec = server.codec();

    // `None` means "historical no-op": no channel is created and nothing is
    // registered with the server.
    let receiver = (!server.is_historical_noop()).then(|| {
        let (sender, receiver) = mpsc::channel(SUBSCRIBE_MPSC_CAPACITY);
        server.inner.register_sub_sender(&topic, sender);
        receiver
    });

    produce_async(move |_ctx: RunParams| async move {
        Ok(async_stream::stream! {
            if let Some(mut receiver) = receiver {
                // Runs until every sender for this channel has been dropped.
                while let Some(payload) = receiver.recv().await {
                    yield codec
                        .decode::<T>(&payload)
                        .map(|value| (NanoTime::now(), value))
                        .map_err(|e| anyhow::anyhow!("web_sub '{topic}': {e}"));
                }
            }
        })
    })
}