mod common;
use azure_core::{http::StatusCode, time::OffsetDateTime, Result};
use azure_core_test::{recorded, Recording, TestContext};
use azure_storage_queue::{
models::{
CorsRule, GeoReplicationStatus, ListQueuesIncludeType, Logging, Metrics,
QueueServiceClientListQueuesOptions, QueueServiceProperties, RetentionPolicy,
},
QueueServiceClient, QueueServiceClientOptions,
};
use common::{get_queue_name, recorded_test_setup};
use futures::StreamExt;
use std::collections::HashMap;
/// Creates a queue, deletes it again, and then checks that the create call
/// reported a successful HTTP status.
#[recorded::test]
async fn test_create_queue(ctx: TestContext) -> Result<()> {
    let recording = ctx.recording();
    let service = get_queue_service_client(recording).await?;
    let name = get_queue_name(recording);

    let create_response = service.queue_client(&name)?.create(None).await?;

    // The queue is removed before the assertion runs, so a failing assertion
    // does not leave the queue behind.
    service.queue_client(&name)?.delete(None).await.unwrap();

    assert!(
        create_response.status().is_success(),
        "Expected successful status code, got {}",
        create_response.status()
    );
    Ok(())
}
/// Creates a queue and verifies that deleting it answers with
/// `StatusCode::NoContent` (204).
#[recorded::test]
async fn test_delete_queue(ctx: TestContext) -> Result<()> {
    let recording = ctx.recording();
    let service = get_queue_service_client(recording).await?;
    let name = get_queue_name(recording);

    // The queue has to exist before the delete call under test can succeed.
    service.queue_client(&name)?.create(None).await?;

    let delete_response = service.queue_client(&name)?.delete(None).await?;
    let status = delete_response.status();
    assert!(
        status == StatusCode::NoContent,
        "Expected status code 204, got {}",
        status,
    );
    Ok(())
}
/// Fetches the properties exposed by the service client and expects
/// `StatusCode::Ok` (200).
#[recorded::test]
async fn test_get_queue_properties(ctx: TestContext) -> Result<()> {
    let recording = ctx.recording();
    let service = get_queue_service_client(recording).await?;

    let status = service.get_properties(None).await?.status();
    assert!(
        status == StatusCode::Ok,
        "Expected status code 200, got {}",
        status,
    );
    Ok(())
}
/// Reads the current service properties and writes them back unchanged,
/// expecting `StatusCode::Accepted` (202) from the set call.
#[recorded::test]
async fn test_set_queue_properties(ctx: TestContext) -> Result<()> {
    let recording = ctx.recording();
    let service = get_queue_service_client(recording).await?;

    // Round-trip: whatever the service currently reports is sent straight back.
    let current = service.get_properties(None).await?.into_model()?;
    let set_response = service.set_properties(current.try_into()?, None).await?;

    let status = set_response.status();
    assert!(
        status == StatusCode::Accepted,
        "Expected status code 202, got {}",
        status,
    );
    Ok(())
}
#[recorded::test]
pub async fn test_list_queues(ctx: TestContext) -> Result<()> {
let recording = ctx.recording();
let queue_service_client = get_queue_service_client(recording).await?;
let queue_name = get_queue_name(recording);
let queue_name_b = format!("{queue_name}-b");
let queue_name_c = format!("{queue_name}-c");
for name in [&queue_name, &queue_name_b, &queue_name_c] {
queue_service_client
.queue_client(name)?
.create(None)
.await?;
}
let options = QueueServiceClientListQueuesOptions {
maxresults: Some(1),
..Default::default()
};
let mut page_iterator = queue_service_client
.list_queues(Some(options))?
.into_pages();
let mut all_queue_names = Vec::new();
while let Some(page) = page_iterator.next().await {
let response = page?;
let queue_list = response.into_model()?;
for queue_item in &queue_list.queue_items {
if let Some(queue_name_found) = &queue_item.name {
all_queue_names.push(queue_name_found.clone());
}
}
}
assert!(
all_queue_names.contains(&queue_name),
"Expected queue '{}' to be found in the list of queues: {:?}",
queue_name,
all_queue_names
);
let first_page_options = QueueServiceClientListQueuesOptions {
prefix: Some(queue_name.clone()),
maxresults: Some(1),
..Default::default()
};
let mut pager = queue_service_client
.list_queues(Some(first_page_options))?
.into_pages();
let first_page = pager
.next()
.await
.expect("Expected at least one page")?
.into_model()?;
assert_eq!(
first_page.queue_items.len(),
1,
"Expected first page to contain exactly 1 queue"
);
let marker = first_page
.next_marker
.clone()
.expect("Expected a next_marker to be present after first page");
let second_page_options = QueueServiceClientListQueuesOptions {
prefix: Some(queue_name.clone()),
maxresults: Some(1),
marker: Some(marker.clone()),
..Default::default()
};
let mut pager2 = queue_service_client
.list_queues(Some(second_page_options))?
.into_pages();
let second_page = pager2
.next()
.await
.expect("Expected second page")?
.into_model()?;
assert_eq!(
second_page.queue_items.len(),
1,
"Expected second page to contain exactly 1 queue"
);
let first_name = first_page.queue_items[0]
.name
.as_deref()
.expect("Expected queue name");
let second_name = second_page.queue_items[0]
.name
.as_deref()
.expect("Expected queue name");
assert_ne!(
first_name, second_name,
"Expected the two pages to return different queues"
);
for name in [&queue_name, &queue_name_b, &queue_name_c] {
queue_service_client
.queue_client(name)?
.delete(None)
.await
.unwrap();
}
Ok(())
}
/// Creates two queues sharing a common name prefix and verifies that a
/// prefix-filtered listing returns both of them and nothing that does not
/// start with the prefix.
#[recorded::test]
pub async fn test_list_queues_with_prefix(ctx: TestContext) -> Result<()> {
    let recording = ctx.recording();
    let service = get_queue_service_client(recording).await?;
    let prefix = get_queue_name(recording);
    let first_queue = format!("{prefix}-a");
    let second_queue = format!("{prefix}-b");

    for name in [&first_queue, &second_queue] {
        service.queue_client(name)?.create(None).await?;
    }

    // Run the listing in its own async block so that an error propagated with
    // `?` does not skip the queue deletion below.
    let outcome = async {
        let list_options = QueueServiceClientListQueuesOptions {
            prefix: Some(prefix.clone()),
            ..Default::default()
        };
        let mut pages = service.list_queues(Some(list_options))?.into_pages();

        // Gather the names from every page of the filtered listing.
        let mut listed = Vec::new();
        while let Some(page) = pages.next().await {
            let queue_list = page?.into_model()?;
            listed.extend(
                queue_list
                    .queue_items
                    .iter()
                    .filter_map(|item| item.name.clone()),
            );
        }

        for expected in [&first_queue, &second_queue] {
            assert!(
                listed.contains(expected),
                "Expected '{}' in results: {:?}",
                expected,
                listed
            );
        }
        for name in &listed {
            assert!(
                name.starts_with(&prefix),
                "Queue '{}' does not start with prefix '{}'",
                name,
                prefix
            );
        }
        Ok::<(), azure_core::Error>(())
    }
    .await;

    for name in [&first_queue, &second_queue] {
        service.queue_client(name)?.delete(None).await.unwrap();
    }
    outcome?;
    Ok(())
}
/// Sets metadata on a freshly created queue and verifies that a listing which
/// requests `ListQueuesIncludeType::Metadata` returns that metadata.
#[recorded::test]
pub async fn test_list_queues_include_metadata(ctx: TestContext) -> Result<()> {
    let recording = ctx.recording();
    let service = get_queue_service_client(recording).await?;
    let queue_name = get_queue_name(recording);
    service.queue_client(&queue_name)?.create(None).await?;

    // Run the checks in their own async block so that an error propagated with
    // `?` does not skip the queue deletion below.
    let outcome = async {
        let metadata = HashMap::from([("env".to_string(), "test".to_string())]);
        service
            .queue_client(&queue_name)?
            .set_metadata(&metadata, None)
            .await?;

        let list_options = QueueServiceClientListQueuesOptions {
            prefix: Some(queue_name.clone()),
            include: Some(vec![ListQueuesIncludeType::Metadata]),
            ..Default::default()
        };
        let mut pages = service.list_queues(Some(list_options))?.into_pages();

        // Stop paging as soon as the queue under test shows up.
        let mut matching_item = None;
        while let Some(page) = pages.next().await {
            let queue_list = page?.into_model()?;
            matching_item = queue_list
                .queue_items
                .into_iter()
                .find(|item| item.name.as_deref() == Some(queue_name.as_str()));
            if matching_item.is_some() {
                break;
            }
        }

        let listed_metadata = matching_item
            .expect("Expected to find test queue in list")
            .metadata
            .expect("Expected metadata to be present when include=metadata");
        assert_eq!(
            listed_metadata.get("env").map(String::as_str),
            Some("test"),
            "Expected metadata key 'env' with value 'test'"
        );
        Ok::<(), azure_core::Error>(())
    }
    .await;

    service
        .queue_client(&queue_name)?
        .delete(None)
        .await
        .unwrap();
    outcome?;
    Ok(())
}
/// Requests service statistics through the client built by
/// `get_queue_service_client_secondary` and checks the reported
/// geo-replication state.
///
/// NOTE(review): the `playback` argument presumably restricts this test to
/// recorded playback — confirm against the `recorded::test` macro docs.
#[recorded::test(playback)]
pub async fn test_get_queue_statistics(ctx: TestContext) -> Result<()> {
    let recording = ctx.recording();
    let queue_service_client = get_queue_service_client_secondary(recording).await?;
    let response = queue_service_client.get_statistics(None).await?;
    assert!(
        response.status() == StatusCode::Ok,
        "Expected status code 200, got {}",
        response.status(),
    );

    let stats = response.into_model()?;
    let geo_replication = stats
        .geo_replication
        .as_ref()
        .expect("Expected geo-replication information in the statistics response");
    assert!(
        geo_replication
            .status
            .as_ref()
            .expect("Expected a geo-replication status")
            == &GeoReplicationStatus::Live,
        "Geo-replication status should be Live"
    );
    // 1748728800 seconds after the Unix epoch is 2025-05-31T22:00:00Z
    // (2025-06-01T00:00:00Z would be 1748736000). The threshold is left as it
    // was so the check keeps its existing behavior; only the message was
    // corrected to match it.
    assert!(
        geo_replication
            .last_sync_time
            .expect("Expected a last sync time")
            > OffsetDateTime::from_unix_timestamp(1748728800).unwrap(),
        "Last sync time should be after 2025-05-31T22:00:00Z"
    );
    Ok(())
}
/// Applies a fully populated set of service properties (logging, hour and
/// minute metrics, two CORS rules) and afterwards restores the properties that
/// were in place when the test started.
#[recorded::test]
async fn test_set_service_properties(ctx: TestContext) -> Result<()> {
    let recording = ctx.recording();
    let service = get_queue_service_client(recording).await?;
    // Snapshot taken up front so the previous configuration can be put back.
    let original = service.get_properties(None).await?.into_model()?;

    // Run the update in its own async block so that an error propagated with
    // `?` does not skip the restore step below.
    let outcome = async {
        // Logging and both metrics sections share the same retention settings.
        let five_day_retention = || RetentionPolicy {
            enabled: Some(true),
            days: Some(5),
        };
        // Hour and minute metrics are configured identically.
        let enabled_metrics = || Metrics {
            version: Some(String::from("1.0")),
            enabled: Some(true),
            include_apis: Some(true),
            retention_policy: Some(five_day_retention()),
        };

        let logging = Logging {
            version: Some(String::from("1.0")),
            delete: Some(true),
            read: Some(true),
            write: Some(true),
            retention_policy: Some(five_day_retention()),
        };
        let contoso_rule = CorsRule {
            allowed_origins: Some(String::from("http://www.contoso.com")),
            allowed_methods: Some(String::from("GET,PUT")),
            allowed_headers: Some(String::from("x-ms-meta-*")),
            exposed_headers: Some(String::from("x-ms-meta-data*")),
            max_age_in_seconds: Some(3600),
        };
        let fabrikam_rule = CorsRule {
            allowed_origins: Some(String::from("http://www.fabrikam.com")),
            allowed_methods: Some(String::from("POST")),
            allowed_headers: Some(String::from("Content-Type")),
            exposed_headers: Some(String::from("Content-Length")),
            max_age_in_seconds: Some(1800),
        };

        let props = QueueServiceProperties {
            logging: Some(logging),
            hour_metrics: Some(enabled_metrics()),
            minute_metrics: Some(enabled_metrics()),
            cors: Some(vec![contoso_rule, fabrikam_rule]),
        };
        service.set_properties(props.try_into()?, None).await?;
        Ok::<(), azure_core::Error>(())
    }
    .await;

    // Restore first, then surface whatever the update itself returned.
    service
        .set_properties(original.try_into()?, None)
        .await
        .unwrap();
    outcome?;
    Ok(())
}
/// Verifies that setting service properties with six CORS rules is rejected
/// with `400 Bad Request`.
///
/// Six rules is one more than the limit of five documented for the
/// Set Queue Service Properties operation.
#[recorded::test]
async fn test_set_cors_too_many_rules(ctx: TestContext) -> Result<()> {
    let recording = ctx.recording();
    let queue_service_client = get_queue_service_client(recording).await?;
    let original = queue_service_client
        .get_properties(None)
        .await?
        .into_model()?;

    let rule = CorsRule {
        allowed_origins: Some("http://example.com".to_string()),
        allowed_methods: Some("GET".to_string()),
        allowed_headers: Some("*".to_string()),
        exposed_headers: Some("*".to_string()),
        max_age_in_seconds: Some(60),
    };
    // Everything except the CORS list is taken from the current configuration.
    let props = QueueServiceProperties {
        cors: Some(vec![rule; 6]),
        ..original.clone()
    };

    match queue_service_client
        .set_properties(props.try_into()?, None)
        .await
    {
        Err(err) => assert_eq!(
            err.http_status(),
            Some(StatusCode::BadRequest),
            "Expected 400 Bad Request for too many CORS rules, got {:?}",
            err.http_status()
        ),
        Ok(_) => {
            // The invalid configuration was applied after all. Put the previous
            // properties back (deliberately best effort: the test fails either
            // way) so later tests start from a known state.
            let _ = queue_service_client
                .set_properties(original.try_into()?, None)
                .await;
            panic!("Expected set_properties to reject six CORS rules, but the request succeeded");
        }
    }
    Ok(())
}
/// Verifies that a logging retention policy of 366 days is rejected with
/// `400 Bad Request`.
#[recorded::test]
async fn test_set_retention_too_long(ctx: TestContext) -> Result<()> {
    let recording = ctx.recording();
    let queue_service_client = get_queue_service_client(recording).await?;
    let original = queue_service_client
        .get_properties(None)
        .await?
        .into_model()?;

    // Everything except the logging section is taken from the current
    // configuration; only the retention period is out of range.
    let props = QueueServiceProperties {
        logging: Some(Logging {
            version: Some("1.0".to_string()),
            delete: Some(false),
            read: Some(false),
            write: Some(false),
            retention_policy: Some(RetentionPolicy {
                enabled: Some(true),
                days: Some(366),
            }),
        }),
        ..original.clone()
    };

    match queue_service_client
        .set_properties(props.try_into()?, None)
        .await
    {
        Err(err) => assert_eq!(
            err.http_status(),
            Some(StatusCode::BadRequest),
            "Expected 400 Bad Request for retention days > 365, got {:?}",
            err.http_status()
        ),
        Ok(_) => {
            // The invalid configuration was applied after all. Put the previous
            // properties back (deliberately best effort: the test fails either
            // way) so later tests start from a known state.
            let _ = queue_service_client
                .set_properties(original.try_into()?, None)
                .await;
            panic!(
                "Expected set_properties to reject a 366-day retention policy, but the request succeeded"
            );
        }
    }
    Ok(())
}
/// Builds a [`QueueServiceClient`] for the given recording.
///
/// Uses the client options and the second tuple element (the endpoint) returned
/// by `recorded_test_setup`, together with the recording's credential.
///
/// # Errors
///
/// Returns the error produced by `QueueServiceClient::new` if the client cannot
/// be constructed.
pub async fn get_queue_service_client(recording: &Recording) -> Result<QueueServiceClient> {
    // The third tuple element is not needed here.
    let (client_options, service_endpoint, _) = recorded_test_setup(recording);
    let service_options = QueueServiceClientOptions {
        client_options: client_options.clone(),
        ..Default::default()
    };
    let client = QueueServiceClient::new(
        &service_endpoint,
        Some(recording.credential()),
        Some(service_options),
    )?;
    Ok(client)
}
/// Builds a [`QueueServiceClient`] that targets the third tuple element
/// returned by `recorded_test_setup` (presumably the secondary endpoint, going
/// by this function's name), using the recording's credential.
///
/// # Errors
///
/// Returns the error produced by `QueueServiceClient::new` if the client cannot
/// be constructed.
pub async fn get_queue_service_client_secondary(
    recording: &Recording,
) -> Result<QueueServiceClient> {
    // The second tuple element is not needed here.
    let (client_options, _, service_endpoint) = recorded_test_setup(recording);
    let service_options = QueueServiceClientOptions {
        client_options: client_options.clone(),
        ..Default::default()
    };
    let client = QueueServiceClient::new(
        &service_endpoint,
        Some(recording.credential()),
        Some(service_options),
    )?;
    Ok(client)
}