mod common;
use azure_core::{http::StatusCode, time::OffsetDateTime, Result};
use azure_core_test::{recorded, Recording, TestContext, TestMode};
use azure_storage_queue::{
models::{
CorsRule, GeoReplicationStatus, ListQueuesIncludeType, Logging, Metrics,
QueueServiceClientListQueuesOptions, QueueServiceProperties, RetentionPolicy,
},
QueueServiceClient, QueueServiceClientOptions,
};
use common::{get_queue_name, recorded_test_setup};
use futures::StreamExt;
use std::{collections::HashMap, time::Duration};
use tokio::time;
// Creates a queue, removes it again, and then checks that the create call
// itself reported a successful status.
#[recorded::test]
async fn test_create_queue(ctx: TestContext) -> Result<()> {
    let recording = ctx.recording();
    let service = get_queue_service_client(recording).await?;
    let name = get_queue_name(recording);

    let create_response = service.queue_client(&name)?.create(None).await?;

    // The queue is deleted before the assertion below runs, so a failing
    // assertion does not leave the queue behind.
    service.queue_client(&name)?.delete(None).await.unwrap();

    let status = create_response.status();
    assert!(
        status.is_success(),
        "Expected successful status code, got {}",
        status
    );
    Ok(())
}
// Creates a queue and verifies that deleting it answers with 204 No Content.
#[recorded::test]
async fn test_delete_queue(ctx: TestContext) -> Result<()> {
    let recording = ctx.recording();
    let service = get_queue_service_client(recording).await?;
    let name = get_queue_name(recording);

    // The queue has to exist before it can be deleted.
    service.queue_client(&name)?.create(None).await?;

    let delete_response = service.queue_client(&name)?.delete(None).await?;
    let status = delete_response.status();
    assert!(
        status == StatusCode::NoContent,
        "Expected status code 204, got {}",
        status,
    );
    Ok(())
}
// Fetches the service properties and verifies the call answers with 200 OK.
#[recorded::test]
async fn test_get_queue_properties(ctx: TestContext) -> Result<()> {
    let service = get_queue_service_client(ctx.recording()).await?;

    let status = service.get_properties(None).await?.status();
    assert!(
        status == StatusCode::Ok,
        "Expected status code 200, got {}",
        status,
    );
    Ok(())
}
// Reads the current service properties and writes the same values straight
// back, verifying that the set call answers with 202 Accepted.
#[recorded::test]
async fn test_set_queue_properties(ctx: TestContext) -> Result<()> {
    let service = get_queue_service_client(ctx.recording()).await?;

    let current = service.get_properties(None).await?.into_model()?;

    let status = service
        .set_properties(current.try_into()?, None)
        .await?
        .status();
    assert!(
        status == StatusCode::Accepted,
        "Expected status code 202, got {}",
        status,
    );
    Ok(())
}
#[recorded::test]
pub async fn test_list_queues(ctx: TestContext) -> Result<()> {
let recording = ctx.recording();
let queue_service_client = get_queue_service_client(recording).await?;
let queue_name = get_queue_name(recording);
let queue_name_b = format!("{queue_name}-b");
let queue_name_c = format!("{queue_name}-c");
for name in [&queue_name, &queue_name_b, &queue_name_c] {
queue_service_client
.queue_client(name)?
.create(None)
.await?;
}
let options = QueueServiceClientListQueuesOptions {
maxresults: Some(1),
..Default::default()
};
let mut page_iterator = queue_service_client
.list_queues(Some(options))?
.into_pages();
let mut all_queue_names = Vec::new();
while let Some(page) = page_iterator.next().await {
let response = page?;
let queue_list = response.into_model()?;
for queue_item in &queue_list.queue_items {
if let Some(queue_name_found) = &queue_item.name {
all_queue_names.push(queue_name_found.clone());
}
}
}
assert!(
all_queue_names.contains(&queue_name),
"Expected queue '{}' to be found in the list of queues: {:?}",
queue_name,
all_queue_names
);
let first_page_options = QueueServiceClientListQueuesOptions {
prefix: Some(queue_name.clone()),
maxresults: Some(1),
..Default::default()
};
let mut pager = queue_service_client
.list_queues(Some(first_page_options))?
.into_pages();
let first_page = pager
.next()
.await
.expect("Expected at least one page")?
.into_model()?;
assert_eq!(
first_page.queue_items.len(),
1,
"Expected first page to contain exactly 1 queue"
);
let marker = first_page
.next_marker
.clone()
.expect("Expected a next_marker to be present after first page");
let second_page_options = QueueServiceClientListQueuesOptions {
prefix: Some(queue_name.clone()),
maxresults: Some(1),
marker: Some(marker.clone()),
..Default::default()
};
let mut pager2 = queue_service_client
.list_queues(Some(second_page_options))?
.into_pages();
let second_page = pager2
.next()
.await
.expect("Expected second page")?
.into_model()?;
assert_eq!(
second_page.queue_items.len(),
1,
"Expected second page to contain exactly 1 queue"
);
let first_name = first_page.queue_items[0]
.name
.as_deref()
.expect("Expected queue name");
let second_name = second_page.queue_items[0]
.name
.as_deref()
.expect("Expected queue name");
assert_ne!(
first_name, second_name,
"Expected the two pages to return different queues"
);
for name in [&queue_name, &queue_name_b, &queue_name_c] {
queue_service_client
.queue_client(name)?
.delete(None)
.await
.unwrap();
}
Ok(())
}
// Creates `<name>-a` and `<name>-b`, lists queues filtered by the `<name>`
// prefix, and checks that both queues are returned and that every returned
// name starts with the prefix. Both queues are deleted before an error from
// the listing steps is propagated.
#[recorded::test]
pub async fn test_list_queues_with_prefix(ctx: TestContext) -> Result<()> {
    let recording = ctx.recording();
    let service = get_queue_service_client(recording).await?;
    let prefix = get_queue_name(recording);
    let first_queue = format!("{prefix}-a");
    let second_queue = format!("{prefix}-b");

    service.queue_client(&first_queue)?.create(None).await?;
    service.queue_client(&second_queue)?.create(None).await?;

    let outcome = async {
        let mut pages = service
            .list_queues(Some(QueueServiceClientListQueuesOptions {
                prefix: Some(prefix.clone()),
                ..Default::default()
            }))?
            .into_pages();

        // Gather the names from every page of the filtered listing.
        let mut listed: Vec<String> = Vec::new();
        while let Some(page) = pages.next().await {
            let model = page?.into_model()?;
            listed.extend(model.queue_items.iter().filter_map(|item| item.name.clone()));
        }

        for expected in [&first_queue, &second_queue] {
            assert!(
                listed.contains(expected),
                "Expected '{}' in results: {:?}",
                expected,
                listed
            );
        }
        for candidate in &listed {
            assert!(
                candidate.starts_with(prefix.as_str()),
                "Queue '{}' does not start with prefix '{}'",
                candidate,
                prefix
            );
        }
        Ok::<(), azure_core::Error>(())
    }
    .await;

    // Delete both queues first, then surface any error from the block above.
    for created in [&first_queue, &second_queue] {
        service.queue_client(created)?.delete(None).await.unwrap();
    }
    outcome
}
// Sets `env=test` metadata on a fresh queue, lists queues with
// `include = [Metadata]`, and checks that the listing entry for the queue
// carries that metadata. The queue is deleted before an error from the
// checked steps is propagated.
#[recorded::test]
pub async fn test_list_queues_include_metadata(ctx: TestContext) -> Result<()> {
    let recording = ctx.recording();
    let service = get_queue_service_client(recording).await?;
    let target = get_queue_name(recording);
    service.queue_client(&target)?.create(None).await?;

    let outcome = async {
        let metadata = HashMap::from([("env".to_string(), "test".to_string())]);
        service
            .queue_client(&target)?
            .set_metadata(&metadata, None)
            .await?;

        let mut pages = service
            .list_queues(Some(QueueServiceClientListQueuesOptions {
                prefix: Some(target.clone()),
                include: Some(vec![ListQueuesIncludeType::Metadata]),
                ..Default::default()
            }))?
            .into_pages();

        // Stop paging once the entry for the test queue has been seen.
        let mut matching_item = None;
        while let Some(page) = pages.next().await {
            let model = page?.into_model()?;
            matching_item = model
                .queue_items
                .into_iter()
                .find(|item| item.name.as_deref() == Some(target.as_str()));
            if matching_item.is_some() {
                break;
            }
        }

        let listed_metadata = matching_item
            .expect("Expected to find test queue in list")
            .metadata
            .expect("Expected metadata to be present when include=metadata");
        assert_eq!(
            listed_metadata.get("env").map(String::as_str),
            Some("test"),
            "Expected metadata key 'env' with value 'test'"
        );
        Ok::<(), azure_core::Error>(())
    }
    .await;

    // Delete the queue first, then surface any error from the block above.
    service.queue_client(&target)?.delete(None).await.unwrap();
    outcome
}
// Fetches service statistics through the client built by
// `get_queue_service_client_secondary` and checks that:
// - the call answers with 200 OK,
// - the geo-replication status is `Live`,
// - the last sync time is later than a fixed lower bound.
#[recorded::test]
pub async fn test_get_queue_statistics(ctx: TestContext) -> Result<()> {
    // Lower bound for the last sync time as a Unix timestamp in seconds.
    // 1_748_728_800 is 2025-05-31T22:00:00Z (2025-06-01T00:00:00Z would be
    // 1_748_736_000), so the assertion message below names that instant.
    const MIN_LAST_SYNC_UNIX_SECS: i64 = 1_748_728_800;

    let recording = ctx.recording();
    let queue_service_client = get_queue_service_client_secondary(recording).await?;
    let response = queue_service_client.get_statistics(None).await?;
    assert!(
        response.status() == StatusCode::Ok,
        "Expected status code 200, got {}",
        response.status(),
    );

    let stats = response.into_model()?;
    let geo_replication = stats
        .geo_replication
        .as_ref()
        .expect("Expected geo-replication information in the statistics");
    assert!(
        geo_replication
            .status
            .as_ref()
            .expect("Expected a geo-replication status")
            == &GeoReplicationStatus::Live,
        "Geo-replication status should be Live"
    );
    assert!(
        geo_replication
            .last_sync_time
            .expect("Expected a last sync time")
            > OffsetDateTime::from_unix_timestamp(MIN_LAST_SYNC_UNIX_SECS)
                .expect("lower bound is a valid Unix timestamp"),
        "Last sync time should be after 2025-05-31T22:00:00Z"
    );
    Ok(())
}
// Writes a full set of service properties (logging, hour metrics, minute
// metrics and two CORS rules), reads the properties back and checks each
// value, then restores the properties captured at the start before any error
// from the checked steps is propagated.
#[recorded::test(playback)]
async fn test_set_service_properties(ctx: TestContext) -> Result<()> {
    let recording = ctx.recording();
    let service = get_queue_service_client(recording).await?;
    // Snapshot of the current settings; written back once the checks are done.
    let original = service.get_properties(None).await?.into_model()?;

    let outcome = async {
        // Logging and both metrics sections share the same retention policy.
        let five_day_retention = || RetentionPolicy {
            enabled: Some(true),
            days: Some(5),
        };
        // Hour and minute metrics are configured identically.
        let enabled_metrics = || Metrics {
            version: Some("1.0".to_string()),
            enabled: Some(true),
            include_apis: Some(true),
            retention_policy: Some(five_day_retention()),
        };

        let desired = QueueServiceProperties {
            logging: Some(Logging {
                version: Some("1.0".to_string()),
                delete: Some(true),
                read: Some(true),
                write: Some(true),
                retention_policy: Some(five_day_retention()),
            }),
            hour_metrics: Some(enabled_metrics()),
            minute_metrics: Some(enabled_metrics()),
            cors: Some(vec![
                CorsRule {
                    allowed_origins: Some("http://www.contoso.com".to_string()),
                    allowed_methods: Some("GET,PUT".to_string()),
                    allowed_headers: Some("x-ms-meta-*".to_string()),
                    exposed_headers: Some("x-ms-meta-data*".to_string()),
                    max_age_in_seconds: Some(3600),
                },
                CorsRule {
                    allowed_origins: Some("http://www.fabrikam.com".to_string()),
                    allowed_methods: Some("POST".to_string()),
                    allowed_headers: Some("Content-Type".to_string()),
                    exposed_headers: Some("Content-Length".to_string()),
                    max_age_in_seconds: Some(1800),
                },
            ]),
        };
        service.set_properties(desired.try_into()?, None).await?;

        // Only wait when talking to a real service (live or record mode).
        // NOTE(review): presumably this gives the service time to apply the
        // new settings before they are read back; confirm.
        if matches!(recording.test_mode(), TestMode::Live | TestMode::Record) {
            time::sleep(Duration::from_secs(15)).await;
        }

        let updated = service.get_properties(None).await?.into_model()?;

        // Logging settings.
        let logging = updated.logging.as_ref().expect("Expected logging settings");
        assert_eq!(logging.delete, Some(true), "Expected delete logging = true");
        assert_eq!(logging.read, Some(true), "Expected read logging = true");
        assert_eq!(logging.write, Some(true), "Expected write logging = true");
        let logging_retention = logging
            .retention_policy
            .as_ref()
            .expect("Expected retention policy on logging");
        assert_eq!(
            logging_retention.enabled,
            Some(true),
            "Expected logging retention enabled"
        );
        assert_eq!(
            logging_retention.days,
            Some(5),
            "Expected logging retention days = 5"
        );

        // Hour metrics.
        let hour = updated
            .hour_metrics
            .as_ref()
            .expect("Expected hour_metrics settings");
        assert_eq!(
            hour.enabled,
            Some(true),
            "Expected hour_metrics enabled = true"
        );
        assert_eq!(
            hour.include_apis,
            Some(true),
            "Expected hour_metrics include_apis = true"
        );
        let hour_retention = hour
            .retention_policy
            .as_ref()
            .expect("Expected retention policy on hour_metrics");
        assert_eq!(
            hour_retention.enabled,
            Some(true),
            "Expected hour_metrics retention enabled"
        );
        assert_eq!(
            hour_retention.days,
            Some(5),
            "Expected hour_metrics retention days = 5"
        );

        // Minute metrics.
        let minute = updated
            .minute_metrics
            .as_ref()
            .expect("Expected minute_metrics settings");
        assert_eq!(
            minute.enabled,
            Some(true),
            "Expected minute_metrics enabled = true"
        );
        assert_eq!(
            minute.include_apis,
            Some(true),
            "Expected minute_metrics include_apis = true"
        );
        let minute_retention = minute
            .retention_policy
            .as_ref()
            .expect("Expected retention policy on minute_metrics");
        assert_eq!(
            minute_retention.enabled,
            Some(true),
            "Expected minute_metrics retention enabled"
        );
        assert_eq!(
            minute_retention.days,
            Some(5),
            "Expected minute_metrics retention days = 5"
        );

        // CORS rules, compared in the order they were submitted.
        let cors_rules = updated.cors.as_ref().expect("Expected CORS rules");
        assert_eq!(cors_rules.len(), 2, "Expected exactly 2 CORS rules");
        let (contoso, fabrikam) = (&cors_rules[0], &cors_rules[1]);
        assert_eq!(
            contoso.allowed_origins.as_deref(),
            Some("http://www.contoso.com")
        );
        assert_eq!(contoso.allowed_methods.as_deref(), Some("GET,PUT"));
        assert_eq!(contoso.max_age_in_seconds, Some(3600));
        assert_eq!(
            fabrikam.allowed_origins.as_deref(),
            Some("http://www.fabrikam.com")
        );
        assert_eq!(fabrikam.allowed_methods.as_deref(), Some("POST"));
        assert_eq!(fabrikam.max_age_in_seconds, Some(1800));
        Ok::<(), azure_core::Error>(())
    }
    .await;

    // Put the original settings back, then surface any error from the block above.
    service
        .set_properties(original.try_into()?, None)
        .await
        .unwrap();
    outcome
}
// Submits the current service properties with the CORS section replaced by
// six identical rules and expects the service to reject the request with
// 400 Bad Request.
#[recorded::test]
async fn test_set_cors_too_many_rules(ctx: TestContext) -> Result<()> {
    let service = get_queue_service_client(ctx.recording()).await?;
    let current = service.get_properties(None).await?.into_model()?;

    let rule = CorsRule {
        allowed_origins: Some("http://example.com".to_string()),
        allowed_methods: Some("GET".to_string()),
        allowed_headers: Some("*".to_string()),
        exposed_headers: Some("*".to_string()),
        max_age_in_seconds: Some(60),
    };
    // Every section other than CORS keeps its current value.
    let oversized = QueueServiceProperties {
        cors: Some(vec![rule; 6]),
        ..current
    };

    let attempt = service.set_properties(oversized.try_into()?, None).await;
    // The call is expected to fail; a successful response panics here.
    let status = attempt.err().unwrap().http_status();
    assert_eq!(
        status,
        Some(StatusCode::BadRequest),
        "Expected 400 Bad Request for too many CORS rules, got {:?}",
        status
    );
    Ok(())
}
// Submits the current service properties with a logging retention policy of
// 366 days and expects the service to reject the request with
// 400 Bad Request.
#[recorded::test]
async fn test_set_retention_too_long(ctx: TestContext) -> Result<()> {
    let service = get_queue_service_client(ctx.recording()).await?;
    let current = service.get_properties(None).await?.into_model()?;

    let excessive_retention = RetentionPolicy {
        enabled: Some(true),
        days: Some(366),
    };
    // Every section other than logging keeps its current value.
    let invalid = QueueServiceProperties {
        logging: Some(Logging {
            version: Some("1.0".to_string()),
            delete: Some(false),
            read: Some(false),
            write: Some(false),
            retention_policy: Some(excessive_retention),
        }),
        ..current
    };

    let attempt = service.set_properties(invalid.try_into()?, None).await;
    // The call is expected to fail; a successful response panics here.
    let status = attempt.err().unwrap().http_status();
    assert_eq!(
        status,
        Some(StatusCode::BadRequest),
        "Expected 400 Bad Request for retention days > 365, got {:?}",
        status
    );
    Ok(())
}
/// Builds a [`QueueServiceClient`] for the given recording.
///
/// The client options and the endpoint come from `recorded_test_setup`: the
/// first tuple element supplies the client options and the second the
/// endpoint (the third element is ignored here). The credential is taken from
/// the recording itself.
///
/// # Errors
///
/// Returns the error produced by `QueueServiceClient::new` if the client
/// cannot be constructed.
pub async fn get_queue_service_client(recording: &Recording) -> Result<QueueServiceClient> {
    let (client_options, endpoint, _) = recorded_test_setup(recording);
    let service_options = QueueServiceClientOptions {
        client_options: client_options.clone(),
        ..Default::default()
    };
    QueueServiceClient::new(
        &endpoint,
        Some(recording.credential()),
        Some(service_options),
    )
}
/// Builds a [`QueueServiceClient`] that targets the third value returned by
/// `recorded_test_setup` (the second value is ignored here).
///
/// NOTE(review): judging by the function name this is the secondary endpoint
/// of the storage account; confirm against `recorded_test_setup`.
///
/// The client options come from the first tuple element and the credential
/// from the recording itself.
///
/// # Errors
///
/// Returns the error produced by `QueueServiceClient::new` if the client
/// cannot be constructed.
pub async fn get_queue_service_client_secondary(
    recording: &Recording,
) -> Result<QueueServiceClient> {
    let (client_options, _, secondary_endpoint) = recorded_test_setup(recording);
    let service_options = QueueServiceClientOptions {
        client_options: client_options.clone(),
        ..Default::default()
    };
    QueueServiceClient::new(
        &secondary_endpoint,
        Some(recording.credential()),
        Some(service_options),
    )
}