avantis-utils 0.7.2

Swiss army knife utilities for all kind of Avantis rust projects
Documentation
use anyhow::anyhow;
use anyhow::Result;
use tokio;

#[tokio::main]
async fn main() -> Result<()> {
    inner::main().await
}

#[cfg(all(feature = "config", feature = "kafka"))]
mod inner {

    use super::*;

    use avantis_utils::config::load_config;
    use avantis_utils::config::Environment;
    use avantis_utils::kafka::consumer;
    use avantis_utils::kafka::consumer::ConsumerExt;
    use avantis_utils::kafka::producer::with_trace_header;
    use avantis_utils::kafka::KafkaConfig;
    use avantis_utils::kafka::ProtobufKafkaMessage;
    use avantis_utils::kafka::ProtobufKafkaRecord;
    use avantis_utils::telemetry::TelemetrySetting;
    use futures_lite::{pin, StreamExt};
    use once_cell::sync::Lazy;
    use opentelemetry::trace::TraceContextExt;
    use prost;
    use prost::Message;
    use rdkafka::consumer::CommitMode;
    use rdkafka::consumer::Consumer;
    use rdkafka::consumer::StreamConsumer;
    use rdkafka::producer::FutureProducer;
    use rdkafka::producer::FutureRecord;
    use rdkafka::util::Timeout;
    use tracing;
    use tracing_opentelemetry::OpenTelemetrySpanExt;

    #[derive(Clone, PartialEq, Message)]
    pub struct ProtobufMessage {
        #[prost(string, tag = "1")]
        pub message: prost::alloc::string::String,
    }

    impl From<ProtobufMessage> for ProtobufKafkaMessage {
        fn from(protobuf_message: ProtobufMessage) -> Self {
            Self {
                key: protobuf_message.message.clone(),
                value: protobuf_message.encode_to_vec().into(),
            }
        }
    }

    static SETTINGS: Lazy<ExampleSettings> =
        Lazy::new(|| ExampleSettings::load(Environment::Develop).unwrap());

    #[tracing::instrument(name = "kafk_simple::main")]
    pub async fn main() -> Result<()> {
        SETTINGS.telemetry.init_telemetry(env!("CARGO_PKG_NAME"))?;
        let kafka_agent: FutureProducer = SETTINGS.kafka.producer_config()?;
        producer(&kafka_agent).await?;
        producer(&kafka_agent).await?;
        producer(&kafka_agent).await?;
        consumer().await?;

        Ok(())
    }

    #[tracing::instrument(name = "kafk_simple::check_msg", fields(span.kind = "consumer"))]
    async fn check_msg(msg: ProtobufMessage) -> Result<()> {
        println!("Checking messages {}", msg.message);
        check_msg_inner();
        Ok(())
    }

    #[tracing::instrument(name = "kafk_simple::check_msg2")]
    fn check_msg_inner() {
        println!("Checking messages 2");
    }

    #[tracing::instrument(skip_all, name = "kafk_simple::consumer")]
    async fn consumer() -> Result<(), anyhow::Error> {
        let kafka_consumer: StreamConsumer = SETTINGS
            .kafka
            .consumer_config(&SETTINGS.kafka_consumer_group)?;

        kafka_consumer.subscribe(&[&SETTINGS.kafka_topic])?;
        let stream = kafka_consumer
            .stream()
            .map(|message| {
                let result = kafka_consumer.process_protobuf_and_commit(
                    message,
                    check_msg,
                    CommitMode::Sync,
                );
                result
            })
            .map(|future_result| async move {
                future_result.await.unwrap_or_else(consumer::process_error)
            });
        pin!(stream);
        Ok(while let Some(future) = stream.next().await {
            future.await;
        })
    }

    #[tracing::instrument(skip_all, name = "kafk_simple::producer")]
    async fn producer(future_producer: &FutureProducer) -> Result<(), anyhow::Error> {
        let trace_id = tracing::Span::current()
            .context()
            .span()
            .span_context()
            .trace_id();
        println!("producer trace id : {}", trace_id);
        let record = ProtobufKafkaRecord {
            topic: &SETTINGS.kafka_topic,
            message: ProtobufMessage {
                message: "test message".to_string(),
            }
            .into(),
        };
        let record: FutureRecord<String, [u8]> = FutureRecord::from(&record);
        let result = future_producer
            .send(with_trace_header(record)?, Timeout::Never)
            .await
            .map_err(|e| anyhow!("Error occur while produce kafka message cause: {:?}", e));
        println!("result {:?}", result);
        Ok(())
    }

    #[derive(Clone, Debug, PartialEq, serde::Deserialize)]
    struct ExampleSettings {
        pub telemetry: TelemetrySetting,
        kafka: KafkaConfig,
        kafka_topic: String,
        kafka_consumer_group: String,
    }

    impl ExampleSettings {
        fn load(environment: Environment) -> anyhow::Result<Self> {
            load_config(environment)
        }
    }
}

#[cfg(not(all(feature = "config", feature = "kafka")))]
mod inner {
    use super::*;

    pub fn main() {
        println!("Please pass --features config,kafka to cargo when trying this example.");
    }
}