use crate::{
generated::models::{
PeekedMessages, QueueClientClearOptions, QueueClientCreateOptions,
QueueClientDeleteMessageOptions, QueueClientDeleteOptions,
QueueClientGetAccessPolicyOptions, QueueClientGetPropertiesOptions,
QueueClientGetPropertiesResult, QueueClientPeekMessagesOptions,
QueueClientReceiveMessagesOptions, QueueClientSendMessageOptions,
QueueClientSetAccessPolicyOptions, QueueClientSetMetadataOptions,
QueueClientUpdateMessageOptions, QueueMessage, ReceivedMessages, SignedIdentifiers,
},
models::SentMessage,
};
use azure_core::{
error::CheckSuccessOptions,
fmt::SafeDebug,
http::{
ClientOptions, Method, NoFormat, Pipeline, PipelineSendOptions, Request, RequestContent,
Response, Url, UrlExt, XmlFormat,
},
tracing, Result,
};
use std::collections::HashMap;
/// Client whose methods issue HTTP requests against a single queue endpoint.
///
/// Every operation clones `endpoint`, optionally appends a path and query
/// parameters, and sends the request through `pipeline` with the
/// `x-ms-version` header set to `version`.
#[tracing::client]
pub struct QueueClient {
/// Base URL that each operation clones and extends for its request.
pub(crate) endpoint: Url,
/// HTTP pipeline used to send every request issued by this client.
pub(crate) pipeline: Pipeline,
/// Value sent in the `x-ms-version` header on every request.
pub(crate) version: String,
}
/// Options used when constructing a [`QueueClient`].
///
/// The `Default` implementation fills `client_options` with
/// `ClientOptions::default()` and `version` with the crate's default version
/// string.
#[derive(Clone, SafeDebug)]
pub struct QueueClientOptions {
/// General `azure_core` client options.
// NOTE(review): how these are consumed is not visible in this file — confirm at the constructor.
pub client_options: ClientOptions,
/// Service version string; presumably copied into `QueueClient::version` — confirm at the constructor.
pub version: String,
}
impl QueueClient {
pub fn endpoint(&self) -> &Url {
&self.endpoint
}
/// Sends an HTTP `DELETE` request to `<endpoint>/messages`.
///
/// # Arguments
///
/// * `options` - Optional per-call settings; defaults are used when `None`.
///   When `timeout` is set it is sent as the `timeout` query parameter.
///
/// # Errors
///
/// Propagates any error returned by the pipeline. Only status `204` is
/// configured as a success code.
#[tracing::function("Storage.Queues.QueueClient.clear")]
pub async fn clear(
    &self,
    options: Option<QueueClientClearOptions<'_>>,
) -> Result<Response<(), NoFormat>> {
    let opts = options.unwrap_or_default();
    let context = opts.method_options.context.to_borrowed();

    // Build the request URL: endpoint + "/messages" (+ optional timeout).
    let mut target = self.endpoint.clone();
    target.append_path("/messages");
    let mut query = target.query_builder();
    if let Some(timeout_value) = opts.timeout {
        query.set_pair("timeout", timeout_value.to_string());
    }
    query.build();

    let mut req = Request::new(target, Method::Delete);
    req.insert_header("x-ms-version", &self.version);

    let send_options = PipelineSendOptions {
        check_success: CheckSuccessOptions {
            success_codes: &[204],
        },
        ..Default::default()
    };
    let response = self
        .pipeline
        .send(&context, &mut req, Some(send_options))
        .await?;
    Ok(response.into())
}
/// Sends an HTTP `PUT` request to the client's endpoint URL.
///
/// # Arguments
///
/// * `options` - Optional per-call settings; defaults are used when `None`.
///   When `timeout` is set it is sent as the `timeout` query parameter, and
///   each entry of `metadata` (if present) is sent as an `x-ms-meta-<key>`
///   header.
///
/// # Errors
///
/// Propagates any error returned by the pipeline. Statuses `201` and `204`
/// are configured as success codes.
#[tracing::function("Storage.Queues.QueueClient.create")]
pub async fn create(
    &self,
    options: Option<QueueClientCreateOptions<'_>>,
) -> Result<Response<(), NoFormat>> {
    let opts = options.unwrap_or_default();
    let context = opts.method_options.context.to_borrowed();

    let mut target = self.endpoint.clone();
    let mut query = target.query_builder();
    if let Some(timeout_value) = opts.timeout {
        query.set_pair("timeout", timeout_value.to_string());
    }
    query.build();

    let mut req = Request::new(target, Method::Put);
    // An absent metadata map yields no iterations, so no headers are added.
    for (name, value) in opts.metadata.iter().flatten() {
        req.insert_header(format!("x-ms-meta-{name}"), value);
    }
    req.insert_header("x-ms-version", &self.version);

    let send_options = PipelineSendOptions {
        check_success: CheckSuccessOptions {
            success_codes: &[201, 204],
        },
        ..Default::default()
    };
    let response = self
        .pipeline
        .send(&context, &mut req, Some(send_options))
        .await?;
    Ok(response.into())
}
/// Sends an HTTP `DELETE` request to the client's endpoint URL.
///
/// # Arguments
///
/// * `options` - Optional per-call settings; defaults are used when `None`.
///   When `timeout` is set it is sent as the `timeout` query parameter.
///
/// # Errors
///
/// Propagates any error returned by the pipeline. Only status `204` is
/// configured as a success code.
#[tracing::function("Storage.Queues.QueueClient.delete")]
pub async fn delete(
    &self,
    options: Option<QueueClientDeleteOptions<'_>>,
) -> Result<Response<(), NoFormat>> {
    let opts = options.unwrap_or_default();
    let context = opts.method_options.context.to_borrowed();

    let mut target = self.endpoint.clone();
    let mut query = target.query_builder();
    if let Some(timeout_value) = opts.timeout {
        query.set_pair("timeout", timeout_value.to_string());
    }
    query.build();

    let mut req = Request::new(target, Method::Delete);
    req.insert_header("x-ms-version", &self.version);

    let send_options = PipelineSendOptions {
        check_success: CheckSuccessOptions {
            success_codes: &[204],
        },
        ..Default::default()
    };
    let response = self
        .pipeline
        .send(&context, &mut req, Some(send_options))
        .await?;
    Ok(response.into())
}
/// Sends an HTTP `DELETE` request to `<endpoint>/messages/<message_id>`.
///
/// # Arguments
///
/// * `message_id` - Substituted into the request path; must not be empty.
/// * `pop_receipt` - Sent as the `popreceipt` query parameter.
/// * `options` - Optional per-call settings; defaults are used when `None`.
///   When `timeout` is set it is sent as the `timeout` query parameter.
///
/// # Errors
///
/// Returns an `ErrorKind::Other` error, without sending a request, when
/// `message_id` is empty. Otherwise propagates any error returned by the
/// pipeline; only status `204` is configured as a success code.
#[tracing::function("Storage.Queues.QueueClient.deleteMessage")]
pub async fn delete_message(
    &self,
    message_id: &str,
    pop_receipt: &str,
    options: Option<QueueClientDeleteMessageOptions<'_>>,
) -> Result<Response<(), NoFormat>> {
    // An empty id would produce a path that no longer addresses a message.
    if message_id.is_empty() {
        return Err(azure_core::Error::with_message(
            azure_core::error::ErrorKind::Other,
            "parameter message_id cannot be empty",
        ));
    }

    let opts = options.unwrap_or_default();
    let context = opts.method_options.context.to_borrowed();

    let mut target = self.endpoint.clone();
    let message_path = format!("/messages/{message_id}");
    target.append_path(&message_path);
    let mut query = target.query_builder();
    query.set_pair("popreceipt", pop_receipt);
    if let Some(timeout_value) = opts.timeout {
        query.set_pair("timeout", timeout_value.to_string());
    }
    query.build();

    let mut req = Request::new(target, Method::Delete);
    req.insert_header("x-ms-version", &self.version);

    let send_options = PipelineSendOptions {
        check_success: CheckSuccessOptions {
            success_codes: &[204],
        },
        ..Default::default()
    };
    let response = self
        .pipeline
        .send(&context, &mut req, Some(send_options))
        .await?;
    Ok(response.into())
}
/// Sends an HTTP `GET` request to the client's endpoint URL with `comp=acl`.
///
/// The request asks for `application/xml`, and the response is typed as
/// XML-formatted [`SignedIdentifiers`].
///
/// # Arguments
///
/// * `options` - Optional per-call settings; defaults are used when `None`.
///   When `timeout` is set it is sent as the `timeout` query parameter.
///
/// # Errors
///
/// Propagates any error returned by the pipeline. Only status `200` is
/// configured as a success code.
#[tracing::function("Storage.Queues.QueueClient.getAccessPolicy")]
pub async fn get_access_policy(
    &self,
    options: Option<QueueClientGetAccessPolicyOptions<'_>>,
) -> Result<Response<SignedIdentifiers, XmlFormat>> {
    let opts = options.unwrap_or_default();
    let context = opts.method_options.context.to_borrowed();

    let mut target = self.endpoint.clone();
    let mut query = target.query_builder();
    query.append_pair("comp", "acl");
    if let Some(timeout_value) = opts.timeout {
        query.set_pair("timeout", timeout_value.to_string());
    }
    query.build();

    let mut req = Request::new(target, Method::Get);
    req.insert_header("accept", "application/xml");
    req.insert_header("x-ms-version", &self.version);

    let send_options = PipelineSendOptions {
        check_success: CheckSuccessOptions {
            success_codes: &[200],
        },
        ..Default::default()
    };
    let response = self
        .pipeline
        .send(&context, &mut req, Some(send_options))
        .await?;
    Ok(response.into())
}
/// Sends an HTTP `GET` request to the client's endpoint URL with
/// `comp=metadata`.
///
/// # Arguments
///
/// * `options` - Optional per-call settings; defaults are used when `None`.
///   When `timeout` is set it is sent as the `timeout` query parameter.
///
/// # Errors
///
/// Propagates any error returned by the pipeline. Only status `200` is
/// configured as a success code.
#[tracing::function("Storage.Queues.QueueClient.getProperties")]
pub async fn get_properties(
    &self,
    options: Option<QueueClientGetPropertiesOptions<'_>>,
) -> Result<Response<QueueClientGetPropertiesResult, NoFormat>> {
    let opts = options.unwrap_or_default();
    let context = opts.method_options.context.to_borrowed();

    let mut target = self.endpoint.clone();
    let mut query = target.query_builder();
    query.append_pair("comp", "metadata");
    if let Some(timeout_value) = opts.timeout {
        query.set_pair("timeout", timeout_value.to_string());
    }
    query.build();

    let mut req = Request::new(target, Method::Get);
    req.insert_header("x-ms-version", &self.version);

    let send_options = PipelineSendOptions {
        check_success: CheckSuccessOptions {
            success_codes: &[200],
        },
        ..Default::default()
    };
    let response = self
        .pipeline
        .send(&context, &mut req, Some(send_options))
        .await?;
    Ok(response.into())
}
/// Sends an HTTP `GET` request to `<endpoint>/messages` with `peekonly=true`.
///
/// The request asks for `application/xml`, and the response is typed as
/// XML-formatted [`PeekedMessages`].
///
/// # Arguments
///
/// * `options` - Optional per-call settings; defaults are used when `None`.
///   `number_of_messages` and `timeout`, when set, are sent as the
///   `numofmessages` and `timeout` query parameters respectively.
///
/// # Errors
///
/// Propagates any error returned by the pipeline. Only status `200` is
/// configured as a success code.
#[tracing::function("Storage.Queues.QueueClient.peekMessages")]
pub async fn peek_messages(
    &self,
    options: Option<QueueClientPeekMessagesOptions<'_>>,
) -> Result<Response<PeekedMessages, XmlFormat>> {
    let opts = options.unwrap_or_default();
    let context = opts.method_options.context.to_borrowed();

    let mut target = self.endpoint.clone();
    target.append_path("/messages");
    let mut query = target.query_builder();
    query.append_pair("peekonly", "true");
    if let Some(count) = opts.number_of_messages {
        query.set_pair("numofmessages", count.to_string());
    }
    if let Some(timeout_value) = opts.timeout {
        query.set_pair("timeout", timeout_value.to_string());
    }
    query.build();

    let mut req = Request::new(target, Method::Get);
    req.insert_header("accept", "application/xml");
    req.insert_header("x-ms-version", &self.version);

    let send_options = PipelineSendOptions {
        check_success: CheckSuccessOptions {
            success_codes: &[200],
        },
        ..Default::default()
    };
    let response = self
        .pipeline
        .send(&context, &mut req, Some(send_options))
        .await?;
    Ok(response.into())
}
/// Sends an HTTP `GET` request to `<endpoint>/messages`.
///
/// The request asks for `application/xml`, and the response is typed as
/// XML-formatted [`ReceivedMessages`].
///
/// # Arguments
///
/// * `options` - Optional per-call settings; defaults are used when `None`.
///   `number_of_messages`, `timeout` and `visibility_timeout`, when set, are
///   sent as the `numofmessages`, `timeout` and `visibilitytimeout` query
///   parameters respectively.
///
/// # Errors
///
/// Propagates any error returned by the pipeline. Only status `200` is
/// configured as a success code.
#[tracing::function("Storage.Queues.QueueClient.receiveMessages")]
pub async fn receive_messages(
    &self,
    options: Option<QueueClientReceiveMessagesOptions<'_>>,
) -> Result<Response<ReceivedMessages, XmlFormat>> {
    let opts = options.unwrap_or_default();
    let context = opts.method_options.context.to_borrowed();

    let mut target = self.endpoint.clone();
    target.append_path("/messages");
    let mut query = target.query_builder();
    if let Some(count) = opts.number_of_messages {
        query.set_pair("numofmessages", count.to_string());
    }
    if let Some(timeout_value) = opts.timeout {
        query.set_pair("timeout", timeout_value.to_string());
    }
    if let Some(visibility) = opts.visibility_timeout {
        query.set_pair("visibilitytimeout", visibility.to_string());
    }
    query.build();

    let mut req = Request::new(target, Method::Get);
    req.insert_header("accept", "application/xml");
    req.insert_header("x-ms-version", &self.version);

    let send_options = PipelineSendOptions {
        check_success: CheckSuccessOptions {
            success_codes: &[200],
        },
        ..Default::default()
    };
    let response = self
        .pipeline
        .send(&context, &mut req, Some(send_options))
        .await?;
    Ok(response.into())
}
/// Sends an HTTP `POST` request carrying `queue_message` to
/// `<endpoint>/messages`.
///
/// The body is sent with `content-type: application/xml`; the request asks
/// for `application/xml`, and the response is typed as XML-formatted
/// [`SentMessage`].
///
/// # Arguments
///
/// * `queue_message` - Request body.
/// * `options` - Optional per-call settings; defaults are used when `None`.
///   `message_time_to_live`, `timeout` and `visibility_timeout`, when set,
///   are sent as the `messagettl`, `timeout` and `visibilitytimeout` query
///   parameters respectively.
///
/// # Errors
///
/// Propagates any error returned by the pipeline. Only status `201` is
/// configured as a success code.
#[tracing::function("Storage.Queues.QueueClient.sendMessage")]
pub async fn send_message(
    &self,
    queue_message: RequestContent<QueueMessage, XmlFormat>,
    options: Option<QueueClientSendMessageOptions<'_>>,
) -> Result<Response<SentMessage, XmlFormat>> {
    let opts = options.unwrap_or_default();
    let context = opts.method_options.context.to_borrowed();

    let mut target = self.endpoint.clone();
    target.append_path("/messages");
    let mut query = target.query_builder();
    if let Some(ttl) = opts.message_time_to_live {
        query.set_pair("messagettl", ttl.to_string());
    }
    if let Some(timeout_value) = opts.timeout {
        query.set_pair("timeout", timeout_value.to_string());
    }
    if let Some(visibility) = opts.visibility_timeout {
        query.set_pair("visibilitytimeout", visibility.to_string());
    }
    query.build();

    let mut req = Request::new(target, Method::Post);
    req.insert_header("accept", "application/xml");
    req.insert_header("content-type", "application/xml");
    req.insert_header("x-ms-version", &self.version);
    req.set_body(queue_message);

    let send_options = PipelineSendOptions {
        check_success: CheckSuccessOptions {
            success_codes: &[201],
        },
        ..Default::default()
    };
    let response = self
        .pipeline
        .send(&context, &mut req, Some(send_options))
        .await?;
    Ok(response.into())
}
/// Sends an HTTP `PUT` request carrying `queue_acl` to the client's endpoint
/// URL with `comp=acl`.
///
/// The body is sent with `content-type: application/xml`.
///
/// # Arguments
///
/// * `queue_acl` - Request body.
/// * `options` - Optional per-call settings; defaults are used when `None`.
///   When `timeout` is set it is sent as the `timeout` query parameter.
///
/// # Errors
///
/// Propagates any error returned by the pipeline. Only status `204` is
/// configured as a success code.
#[tracing::function("Storage.Queues.QueueClient.setAccessPolicy")]
pub async fn set_access_policy(
    &self,
    queue_acl: RequestContent<SignedIdentifiers, XmlFormat>,
    options: Option<QueueClientSetAccessPolicyOptions<'_>>,
) -> Result<Response<(), NoFormat>> {
    let opts = options.unwrap_or_default();
    let context = opts.method_options.context.to_borrowed();

    let mut target = self.endpoint.clone();
    let mut query = target.query_builder();
    query.append_pair("comp", "acl");
    if let Some(timeout_value) = opts.timeout {
        query.set_pair("timeout", timeout_value.to_string());
    }
    query.build();

    let mut req = Request::new(target, Method::Put);
    req.insert_header("x-ms-version", &self.version);
    req.insert_header("content-type", "application/xml");
    req.set_body(queue_acl);

    let send_options = PipelineSendOptions {
        check_success: CheckSuccessOptions {
            success_codes: &[204],
        },
        ..Default::default()
    };
    let response = self
        .pipeline
        .send(&context, &mut req, Some(send_options))
        .await?;
    Ok(response.into())
}
/// Sends an HTTP `PUT` request to the client's endpoint URL with
/// `comp=metadata`.
///
/// # Arguments
///
/// * `metadata` - Each entry is sent as an `x-ms-meta-<key>` header whose
///   value is the entry's value.
/// * `options` - Optional per-call settings; defaults are used when `None`.
///   When `timeout` is set it is sent as the `timeout` query parameter.
///
/// # Errors
///
/// Propagates any error returned by the pipeline. Only status `204` is
/// configured as a success code.
#[tracing::function("Storage.Queues.QueueClient.setMetadata")]
pub async fn set_metadata(
    &self,
    metadata: &HashMap<String, String>,
    options: Option<QueueClientSetMetadataOptions<'_>>,
) -> Result<Response<(), NoFormat>> {
    let opts = options.unwrap_or_default();
    let context = opts.method_options.context.to_borrowed();

    let mut target = self.endpoint.clone();
    let mut query = target.query_builder();
    query.append_pair("comp", "metadata");
    if let Some(timeout_value) = opts.timeout {
        query.set_pair("timeout", timeout_value.to_string());
    }
    query.build();

    let mut req = Request::new(target, Method::Put);
    for (name, value) in metadata.iter() {
        req.insert_header(format!("x-ms-meta-{name}"), value);
    }
    req.insert_header("x-ms-version", &self.version);

    let send_options = PipelineSendOptions {
        check_success: CheckSuccessOptions {
            success_codes: &[204],
        },
        ..Default::default()
    };
    let response = self
        .pipeline
        .send(&context, &mut req, Some(send_options))
        .await?;
    Ok(response.into())
}
/// Sends an HTTP `PUT` request to `<endpoint>/messages/<message_id>`.
///
/// # Arguments
///
/// * `message_id` - Substituted into the request path; must not be empty.
/// * `pop_receipt` - Sent as the `popreceipt` query parameter.
/// * `visibility_timeout` - Sent as the `visibilitytimeout` query parameter.
/// * `options` - Optional per-call settings; defaults are used when `None`.
///   When `timeout` is set it is sent as the `timeout` query parameter. When
///   `queue_message` is set it becomes the request body and
///   `content-type: application/xml` is added; otherwise no body is set.
///
/// # Errors
///
/// Returns an `ErrorKind::Other` error, without sending a request, when
/// `message_id` is empty. Otherwise propagates any error returned by the
/// pipeline; only status `204` is configured as a success code.
#[tracing::function("Storage.Queues.QueueClient.updateMessage")]
pub async fn update_message(
    &self,
    message_id: &str,
    pop_receipt: &str,
    visibility_timeout: i32,
    options: Option<QueueClientUpdateMessageOptions<'_>>,
) -> Result<Response<(), NoFormat>> {
    // An empty id would produce a path that no longer addresses a message.
    if message_id.is_empty() {
        return Err(azure_core::Error::with_message(
            azure_core::error::ErrorKind::Other,
            "parameter message_id cannot be empty",
        ));
    }
    let options = options.unwrap_or_default();
    let ctx = options.method_options.context.to_borrowed();
    let mut url = self.endpoint.clone();
    let path = format!("/messages/{message_id}");
    url.append_path(&path);
    let mut query_builder = url.query_builder();
    query_builder.set_pair("popreceipt", pop_receipt);
    if let Some(timeout) = options.timeout {
        query_builder.set_pair("timeout", timeout.to_string());
    }
    query_builder.set_pair("visibilitytimeout", visibility_timeout.to_string());
    query_builder.build();
    let mut request = Request::new(url, Method::Put);
    request.insert_header("x-ms-version", &self.version);
    // `options` is owned and `queue_message` is not read again, so the body is
    // moved out of the options instead of being cloned. This field is disjoint
    // from the `method_options.context` borrow held by `ctx`.
    if let Some(queue_message) = options.queue_message {
        request.insert_header("content-type", "application/xml");
        request.set_body(queue_message);
    }
    let rsp = self
        .pipeline
        .send(
            &ctx,
            &mut request,
            Some(PipelineSendOptions {
                check_success: CheckSuccessOptions {
                    success_codes: &[204],
                },
                ..Default::default()
            }),
        )
        .await?;
    Ok(rsp.into())
}
}
/// Version string that `QueueClientOptions::default()` assigns to its `version` field.
pub(crate) const DEFAULT_VERSION: &str = "2026-04-06";
impl Default for QueueClientOptions {
    /// Builds options with default `ClientOptions` and `version` set to
    /// `DEFAULT_VERSION`.
    fn default() -> Self {
        let version = DEFAULT_VERSION.to_owned();
        let client_options = ClientOptions::default();
        Self {
            client_options,
            version,
        }
    }
}