use azeventhubs::consumer::{
EventHubConsumerClient, EventHubConsumerClientOptions, EventPosition, ReadEventOptions,
};
use futures_util::StreamExt;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let _ = dotenv::from_filename(".env");
let connection_string = std::env::var("EVENT_HUBS_CONNECTION_STRING")?;
let event_hub_name = std::env::var("EVENT_HUB_NAME")?;
let options = EventHubConsumerClientOptions::default();
let mut consumer_client = EventHubConsumerClient::new_from_connection_string(
EventHubConsumerClient::DEFAULT_CONSUMER_GROUP_NAME,
connection_string,
event_hub_name,
options,
)
.await?;
let partition_ids = consumer_client.get_partition_ids().await?;
let starting_position = EventPosition::earliest();
let options = ReadEventOptions::default();
let mut stream = consumer_client
.read_events_from_partition(&partition_ids[0], starting_position, options)
.await?;
let mut counter = 0;
while let Some(event) = stream.next().await {
let event = event?;
let body = event.body()?;
let value = std::str::from_utf8(body)?;
log::info!("{:?}", value);
log::info!("counter: {}", counter);
counter += 1;
if counter > 3000 {
break;
}
}
stream.close().await?;
consumer_client.close().await?;
Ok(())
}