use super::{FluvioConnection, FluvioRecord};
use crate::RunParams;
use crate::burst;
use crate::nodes::{FutStream, StreamOperators};
use crate::types::*;
use fluvio::{Fluvio, FluvioClusterConfig, RecordKey};
use futures::StreamExt;
use std::pin::Pin;
use std::rc::Rc;
#[must_use]
pub fn fluvio_pub(
connection: FluvioConnection,
topic: impl Into<String>,
upstream: &Rc<dyn Stream<Burst<FluvioRecord>>>,
) -> Rc<dyn Node> {
let topic = topic.into();
upstream.consume_async(Box::new(
move |_ctx: RunParams, source: Pin<Box<dyn FutStream<Burst<FluvioRecord>>>>| async move {
let cluster_config = FluvioClusterConfig::new(&connection.endpoint);
let client = Fluvio::connect_with_config(&cluster_config)
.await
.map_err(|e| anyhow::anyhow!("fluvio connect failed: {e}"))?;
let producer = client
.topic_producer(&topic)
.await
.map_err(|e| anyhow::anyhow!("fluvio producer create failed: {e}"))?;
let mut source = source;
while let Some((_time, burst)) = source.next().await {
for record in burst {
let key = record.key.map(RecordKey::from).unwrap_or(RecordKey::NULL);
producer
.send(key, record.value)
.await
.map_err(|e| anyhow::anyhow!("fluvio send failed: {e}"))?;
}
producer
.flush()
.await
.map_err(|e| anyhow::anyhow!("fluvio flush failed: {e}"))?;
}
Ok(())
},
))
}
pub trait FluvioPubOperators {
#[must_use]
fn fluvio_pub(
self: &Rc<Self>,
conn: FluvioConnection,
topic: impl Into<String>,
) -> Rc<dyn Node>;
}
impl FluvioPubOperators for dyn Stream<Burst<FluvioRecord>> {
fn fluvio_pub(
self: &Rc<Self>,
conn: FluvioConnection,
topic: impl Into<String>,
) -> Rc<dyn Node> {
fluvio_pub(conn, topic, self)
}
}
impl FluvioPubOperators for dyn Stream<FluvioRecord> {
fn fluvio_pub(
self: &Rc<Self>,
conn: FluvioConnection,
topic: impl Into<String>,
) -> Rc<dyn Node> {
let burst_stream = self.map(|record| burst![record]);
fluvio_pub(conn, topic, &burst_stream)
}
}