use std::pin::Pin;
use std::rc::Rc;
use axum::body::Bytes;
use futures::StreamExt;
use serde::Serialize;
use super::codec::Envelope;
use super::server::WebServer;
use crate::nodes::{FutStream, RunParams, StreamOperators};
use crate::types::*;
/// Publishes every item of `upstream` on `topic` through `server`.
///
/// Each `(time, value)` pair is encoded with the server's codec, wrapped in an
/// [`Envelope`] carrying the topic name and the timestamp converted to `u64`,
/// encoded a second time, and handed to the sender obtained from
/// `get_or_create_pub_topic`.
///
/// When `server.is_historical_noop()` is true, the upstream is drained to
/// completion and nothing is encoded or sent.
///
/// # Errors
///
/// The returned node's async task stops with the codec's error if encoding
/// either the value or the envelope fails. The result of `send` is ignored.
#[must_use]
pub fn web_pub<T: Element + Send + Serialize>(
    server: &WebServer,
    topic: impl Into<String>,
    upstream: &Rc<dyn Stream<T>>,
) -> Rc<dyn Node> {
    let topic = topic.into();
    let codec = server.codec();
    let historical = server.is_historical_noop();
    let sender = server.inner.get_or_create_pub_topic(&topic);

    upstream.consume_async(Box::new(
        move |_ctx: RunParams, mut events: Pin<Box<dyn FutStream<T>>>| async move {
            if historical {
                // Still consume the stream so upstream runs to completion,
                // but publish nothing.
                while events.next().await.is_some() {}
            } else {
                while let Some((time, value)) = events.next().await {
                    // Inner encoding: the user value becomes the payload.
                    let payload = codec.encode(&value)?;
                    let envelope = Envelope {
                        topic: topic.clone(),
                        time_ns: u64::from(time),
                        payload,
                    };
                    // Outer encoding: the whole envelope becomes one frame.
                    let frame = codec.encode(&envelope)?;
                    // The send result is deliberately discarded, so a failed
                    // send does not stop the publishing loop.
                    let _ = sender.send(Bytes::from(frame));
                }
            }
            Ok(())
        },
    ))
}
/// Extension trait offering a method-call form of web publishing on a
/// reference-counted receiver.
pub trait WebPubOperators<T>
where
    T: Element + Send + Serialize,
{
    /// Publishes the receiver on `topic` through `server`, returning the
    /// resulting node.
    #[must_use]
    fn web_pub(self: &Rc<Self>, server: &WebServer, topic: impl Into<String>) -> Rc<dyn Node>;
}
/// Makes `web_pub` callable as a method on any `Rc<dyn Stream<T>>`.
impl<T> WebPubOperators<T> for dyn Stream<T>
where
    T: Element + Send + Serialize,
{
    /// Forwards to the module-level `web_pub` function, passing the receiver
    /// as the upstream.
    fn web_pub(self: &Rc<Self>, server: &WebServer, topic: impl Into<String>) -> Rc<dyn Node> {
        // An unqualified call resolves to the free function, not this method.
        web_pub(server, topic, self)
    }
}