use crate::clients::client::IggyClient;
use crate::clients::consumer::IggyConsumer;
use crate::prelude::{IggyError, SystemClient};
use crate::stream_builder::{IggyConsumerConfig, build};
use tracing::trace;
/// Stateless namespace type for the consumer construction helpers.
///
/// The struct has no fields; it only groups the associated functions
/// defined in its `impl` block.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct IggyStreamConsumer;
impl IggyStreamConsumer {
    /// Builds an [`IggyConsumer`] using a client the caller already owns.
    ///
    /// Steps, in order:
    /// 1. Ping the server through `client`.
    /// 2. Call `build::build_iggy_stream_topic_if_not_exists` with `client` and `config`
    ///    (presumably creates the stream/topic when missing; confirm in that helper).
    /// 3. Call `build::build_iggy_consumer` and return the consumer it yields.
    ///
    /// # Errors
    ///
    /// * [`IggyError::NotConnected`] when the ping fails; the original ping error is discarded.
    /// * Any error propagated by the two `build::*` helpers.
    pub async fn build(
        client: &IggyClient,
        config: &IggyConsumerConfig,
    ) -> Result<IggyConsumer, IggyError> {
        trace!("Check if client is connected");
        // Every ping failure is collapsed into `NotConnected`, whatever its cause.
        client
            .ping()
            .await
            .map_err(|_| IggyError::NotConnected)?;

        Self::prepare_topic_and_consumer(client, config).await
    }

    /// Creates a client from `connection_string`, then builds a consumer on top of it.
    ///
    /// Unlike [`IggyStreamConsumer::build`], no explicit ping is issued here; the client
    /// comes straight from `build::build_iggy_client`.
    ///
    /// Returns the freshly built client together with the consumer so the caller keeps
    /// ownership of both.
    ///
    /// # Errors
    ///
    /// Propagates any error from `build::build_iggy_client`,
    /// `build::build_iggy_stream_topic_if_not_exists`, or `build::build_iggy_consumer`.
    pub async fn with_client_from_url(
        connection_string: &str,
        config: &IggyConsumerConfig,
    ) -> Result<(IggyClient, IggyConsumer), IggyError> {
        trace!("Build and connect iggy client");
        let new_client = build::build_iggy_client(connection_string).await?;

        let consumer = Self::prepare_topic_and_consumer(&new_client, config).await?;
        Ok((new_client, consumer))
    }

    /// Builds an [`IggyClient`] from `connection_string` by delegating to
    /// `build::build_iggy_client`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `build::build_iggy_client`.
    pub async fn build_iggy_client(connection_string: &str) -> Result<IggyClient, IggyError> {
        trace!("Build and connect iggy client");
        let new_client = build::build_iggy_client(connection_string).await?;
        Ok(new_client)
    }

    /// Shared tail of [`IggyStreamConsumer::build`] and
    /// [`IggyStreamConsumer::with_client_from_url`]: runs the stream/topic helper first,
    /// then builds the consumer, emitting the same trace messages in the same order.
    async fn prepare_topic_and_consumer(
        client: &IggyClient,
        config: &IggyConsumerConfig,
    ) -> Result<IggyConsumer, IggyError> {
        trace!("Check if stream and topic exist");
        build::build_iggy_stream_topic_if_not_exists(client, config).await?;

        trace!("Build iggy consumer");
        let consumer = build::build_iggy_consumer(client, config).await?;
        Ok(consumer)
    }
}