use super::Client;
use crate::error::Result;
use reqwest::Method;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Message {
pub message_id: String,
pub url: String,
pub topic_name: Option<String>,
pub endpoint_name: Option<String>,
pub api: Option<String>,
pub key: Option<String>,
pub method: Option<String>,
pub header: Option<HashMap<String, Vec<String>>>,
pub body: Option<String>,
pub body_base64: Option<String>,
pub max_retries: Option<u32>,
pub retry_delay_expression: Option<String>,
pub not_before: Option<u64>,
pub created_at: u64,
pub callback: Option<String>,
pub failure_callback: Option<String>,
pub queue_name: Option<String>,
pub schedule_id: Option<String>,
pub caller_ip: Option<String>,
pub label: Option<String>,
pub flow_control_key: Option<String>,
pub rate: Option<u32>,
pub period: Option<u32>,
pub parallelism: Option<u32>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CancelResult {
pub cancelled: u64,
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MessageFilter {
pub url_group_name: Option<String>,
pub queue_name: Option<String>,
pub url: Option<String>,
pub schedule_id: Option<String>,
pub caller_ip: Option<String>,
pub label: Option<String>,
pub flow_control_key: Option<String>,
pub from_date: Option<u64>,
pub to_date: Option<u64>,
pub count: Option<u32>,
}
pub struct MessagesApi<'a> {
pub(crate) client: &'a Client,
}
impl MessagesApi<'_> {
pub async fn get(&self, message_id: &str) -> Result<Message> {
self.client
.http
.send_json(
Method::GET,
&format!("v2/messages/{message_id}"),
&[],
None,
None,
)
.await
}
pub async fn cancel(&self, message_id: &str) -> Result<()> {
self.client
.http
.send_empty(
Method::DELETE,
&format!("v2/messages/{message_id}"),
&[],
None,
None,
)
.await
}
pub async fn cancel_many<I, S>(&self, message_ids: I) -> Result<CancelResult>
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
let mut query = Vec::new();
for message_id in message_ids {
query.push((String::from("messageIds"), message_id.as_ref().to_owned()));
}
self.client
.http
.send_json(Method::DELETE, "v2/messages", &query, None, None)
.await
}
pub async fn cancel_matching(&self, filter: &MessageFilter) -> Result<CancelResult> {
self.client
.http
.send_json(
Method::DELETE,
"v2/messages",
&message_filter_query(filter),
None,
None,
)
.await
}
pub async fn cancel_all(&self, count: Option<u32>) -> Result<CancelResult> {
let query = count
.map(|count| vec![(String::from("count"), count.to_string())])
.unwrap_or_default();
self.client
.http
.send_json(Method::DELETE, "v2/messages", &query, None, None)
.await
}
}
fn message_filter_query(filter: &MessageFilter) -> Vec<(String, String)> {
let mut query = Vec::new();
if let Some(url_group_name) = &filter.url_group_name {
query.push((String::from("topicName"), url_group_name.clone()));
}
if let Some(queue_name) = &filter.queue_name {
query.push((String::from("queueName"), queue_name.clone()));
}
if let Some(url) = &filter.url {
query.push((String::from("url"), url.clone()));
}
if let Some(schedule_id) = &filter.schedule_id {
query.push((String::from("scheduleId"), schedule_id.clone()));
}
if let Some(caller_ip) = &filter.caller_ip {
query.push((String::from("callerIp"), caller_ip.clone()));
}
if let Some(label) = &filter.label {
query.push((String::from("label"), label.clone()));
}
if let Some(flow_control_key) = &filter.flow_control_key {
query.push((String::from("flowControlKey"), flow_control_key.clone()));
}
if let Some(from_date) = filter.from_date {
query.push((String::from("fromDate"), from_date.to_string()));
}
if let Some(to_date) = filter.to_date {
query.push((String::from("toDate"), to_date.to_string()));
}
if let Some(count) = filter.count {
query.push((String::from("count"), count.to_string()));
}
query
}