use super::{FluvioConnection, FluvioEvent};
use crate::nodes::{RunParams, produce_async};
use crate::types::*;
use fluvio::consumer::ConsumerConfigExt;
use fluvio::{Fluvio, FluvioClusterConfig, Offset};
use futures::StreamExt;
use std::rc::Rc;
#[must_use]
pub fn fluvio_sub(
connection: FluvioConnection,
topic: impl Into<String>,
partition: u32,
start_offset: Option<i64>,
) -> Rc<dyn Stream<Burst<FluvioEvent>>> {
let topic = topic.into();
let validation_error = if let Some(n) = start_offset {
if n < 0 {
Some(anyhow::anyhow!(
"start_offset must be non-negative, got {n}"
))
} else {
None
}
} else {
None
};
produce_async(move |_ctx: RunParams| async move {
if let Some(err) = validation_error {
return Err(err);
}
let cluster_config = FluvioClusterConfig::new(&connection.endpoint);
let client = Fluvio::connect_with_config(&cluster_config)
.await
.map_err(|e| anyhow::anyhow!("fluvio connect failed: {e}"))?;
let offset = match start_offset {
None => Offset::beginning(),
Some(n) => Offset::absolute(n)
.map_err(|e| anyhow::anyhow!("invalid fluvio offset {n}: {e}"))?,
};
let consumer_config = ConsumerConfigExt::builder()
.topic(topic)
.partition(partition)
.offset_start(offset)
.build()
.map_err(|e| anyhow::anyhow!("fluvio consumer config failed: {e}"))?;
let mut stream = client
.consumer_with_config(consumer_config)
.await
.map_err(|e| anyhow::anyhow!("fluvio consumer create failed: {e}"))?;
Ok(async_stream::stream! {
while let Some(result) = stream.next().await {
match result {
Ok(record) => {
let event = FluvioEvent {
key: record.key().map(|k| k.to_vec()),
value: record.value().to_vec(),
offset: record.offset,
};
yield Ok((NanoTime::now(), event));
}
Err(e) => {
yield Err(anyhow::anyhow!("fluvio record error: {e:?}"));
break;
}
}
}
})
})
}