use azure_core::{
http::{ClientOptions, InstrumentationOptions},
tracing::TracerProvider,
};
use azure_core_opentelemetry::OpenTelemetryTracerProvider;
use azure_identity::AzureCliCredential;
use azure_storage_queue::{models::QueueMessage, QueueServiceClient, QueueServiceClientOptions};
use clap::Parser;
use opentelemetry_sdk::trace::SdkTracerProvider;
use std::{env, sync::Arc};
use tracing_subscriber::EnvFilter;
#[derive(Parser)]
struct Args {
#[arg(long)]
otel: bool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
let otel_enabled = args.otel;
let default_level = if otel_enabled { "warn" } else { "trace" };
let env_filter =
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(default_level));
println!("RUST_LOG filter: {}", env_filter);
tracing_subscriber::fmt()
.with_env_filter(env_filter)
.with_writer(std::io::stderr) .init();
let otel_provider = if otel_enabled {
println!("OpenTelemetry tracing ENABLED (--otel flag)");
Some(Arc::new(
SdkTracerProvider::builder()
.with_simple_exporter(opentelemetry_stdout::SpanExporter::default())
.build(),
))
} else {
println!("OpenTelemetry tracing DISABLED (pass --otel to enable)");
None
};
let account = env::var("AZURE_QUEUE_STORAGE_ACCOUNT_NAME")
.expect("Set AZURE_QUEUE_STORAGE_ACCOUNT_NAME environment variable");
let queue_name = "test-logging-queue";
let message_text = "Hello from azure_storage_queue logging example!";
println!("Authenticating with Azure CLI...");
let credential = AzureCliCredential::new(None)?;
let endpoint = format!("https://{}.queue.core.windows.net", account);
let client_options = QueueServiceClientOptions {
client_options: ClientOptions {
instrumentation: InstrumentationOptions {
tracer_provider: otel_provider.as_ref().map(|p| {
OpenTelemetryTracerProvider::new(p.clone()) as Arc<dyn TracerProvider>
}),
},
..Default::default()
},
..Default::default()
};
let service_client =
QueueServiceClient::new(&endpoint, Some(credential), Some(client_options))?;
let queue_client = service_client.queue_client(queue_name)?;
println!("\nCreating queue '{}'...", queue_name);
if queue_client.exists().await? {
println!("Queue already exists, continuing...");
} else {
queue_client.create(None).await?;
println!("Queue created successfully");
}
println!("\nSending message to queue '{}'...", queue_name);
let message = QueueMessage {
message_text: Some(message_text.to_string()),
};
let sent = queue_client.send_message(message.try_into()?, None).await?;
let sent_message = sent.into_model()?;
println!(
"Message sent. ID: {}",
sent_message.message_id.as_deref().unwrap_or("")
);
println!("\nReceiving messages from queue '{}'...", queue_name);
let received = queue_client.receive_messages(None).await?;
let message_list = received.into_model()?;
let messages = message_list.items.unwrap_or_default();
println!("Received {} message(s)", messages.len());
for msg in &messages {
println!(
" Message ID: {}, Text: {}",
msg.message_id.as_deref().unwrap_or(""),
msg.message_text.as_deref().unwrap_or("")
);
}
let first = messages.first().ok_or("no messages received")?;
let message_id = first.message_id.as_deref().unwrap_or("");
let pop_receipt = first.pop_receipt.as_deref().unwrap_or("");
println!("\nDeleting message '{}'...", message_id);
queue_client
.delete_message(message_id, pop_receipt, None)
.await?;
println!("Message deleted successfully");
println!("\nDeleting queue '{}'...", queue_name);
queue_client.delete(None).await?;
println!("Queue deleted successfully");
if let Some(provider) = otel_provider {
let _ = provider.shutdown();
println!("\nOpenTelemetry spans flushed.");
}
println!(
"\nPass --otel to see OpenTelemetry spans. Use RUST_LOG=trace for detailed HTTP logs."
);
Ok(())
}