use std::collections::HashMap;
use reqwest::Method;
use serde::{Deserialize, Serialize};
use super::{request::PublishedMessage, Client};
use crate::error::Result;
/// One dead-letter-queue entry, as decoded by [`DlqApi::get`] and carried in
/// [`DlqPage::messages`].
///
/// Every field maps to the camelCase form of its name on the wire
/// (`message_id` <-> `messageId`, `dlq_id` <-> `dlqId`, ...). Only `message_id`,
/// `url`, `created_at` and `dlq_id` are mandatory; all other fields are optional.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DlqMessage {
    /// Identifier of the original message (`messageId`).
    pub message_id: String,
    /// Destination URL recorded for the message.
    pub url: String,
    /// Value of the `topicName` key, when present.
    pub topic_name: Option<String>,
    /// Value of the `endpointName` key, when present.
    pub endpoint_name: Option<String>,
    /// HTTP method recorded for the message, when present.
    pub method: Option<String>,
    /// Header map; each header name maps to a list of values.
    pub header: Option<HashMap<String, Vec<String>>>,
    /// Message body as a plain string, when present.
    pub body: Option<String>,
    /// Value of the `bodyBase64` key — presumably the body in base64 form; confirm
    /// against the API reference.
    pub body_base64: Option<String>,
    /// Value of the `maxRetries` key, when present.
    pub max_retries: Option<u32>,
    /// Value of the `notBefore` key (timestamp unit is not visible here — TODO confirm).
    pub not_before: Option<u64>,
    /// Value of the `createdAt` key (timestamp unit is not visible here — TODO confirm).
    pub created_at: u64,
    /// Callback URL, when present.
    pub callback: Option<String>,
    /// Failure-callback URL, when present.
    pub failure_callback: Option<String>,
    /// Value of the `queueName` key, when present.
    pub queue_name: Option<String>,
    /// Value of the `scheduleId` key, when present.
    pub schedule_id: Option<String>,
    /// Value of the `callerIp` key, when present.
    pub caller_ip: Option<String>,
    /// Label attached to the message, when present.
    pub label: Option<String>,
    /// Value of the `flowControlKey` key, when present.
    pub flow_control_key: Option<String>,
    /// Value of the `rate` key, when present.
    pub rate: Option<u32>,
    /// Value of the `period` key, when present.
    pub period: Option<u32>,
    /// Value of the `parallelism` key, when present.
    pub parallelism: Option<u32>,
    /// Value of the `responseStatus` key, when present.
    pub response_status: Option<u16>,
    /// Identifier of this DLQ entry; the value accepted by [`DlqApi::get`],
    /// [`DlqApi::delete`] and [`DlqApi::retry`].
    pub dlq_id: String,
}
/// One page of DLQ entries, as decoded by [`DlqApi::list`].
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct DlqPage {
    /// Cursor returned alongside the page, if any. Presumably it is fed back
    /// through [`DlqFilter::cursor`] to fetch the following page — confirm
    /// against the API reference.
    pub cursor: Option<String>,
    /// Entries contained in this page.
    pub messages: Vec<DlqMessage>,
}
/// Outcome of a bulk delete, as decoded by [`DlqApi::delete_many`] and
/// [`DlqApi::delete_matching`].
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct DlqDeleteResult {
    /// Cursor returned with the result, if any (presumably signals that more
    /// matching entries remain — TODO confirm).
    pub cursor: Option<String>,
    /// Count reported under the `deleted` key of the response.
    pub deleted: u64,
}
/// Optional criteria for [`DlqApi::list`] and [`DlqApi::delete_matching`].
///
/// Every field defaults to `None`; an unset field contributes nothing to the
/// request's query string. Build one with struct-update syntax, e.g.
/// `DlqFilter { count: Some(10), ..Default::default() }`.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct DlqFilter {
    /// Sent as the `cursor` query parameter.
    pub cursor: Option<String>,
    /// Sent as the `messageId` query parameter.
    pub message_id: Option<String>,
    /// Sent as the `url` query parameter.
    pub url: Option<String>,
    /// Sent as the `topicName` query parameter (note the differing wire name).
    pub url_group_name: Option<String>,
    /// Sent as the `scheduleId` query parameter.
    pub schedule_id: Option<String>,
    /// Sent as the `queueName` query parameter.
    pub queue_name: Option<String>,
    /// Sent as the `responseStatus` query parameter.
    pub response_status: Option<u16>,
    /// Sent as the `label` query parameter.
    pub label: Option<String>,
    /// Sent as the `flowControlKey` query parameter.
    pub flow_control_key: Option<String>,
    /// Sent as the `fromDate` query parameter (timestamp unit not visible here —
    /// TODO confirm).
    pub from_date: Option<u64>,
    /// Sent as the `toDate` query parameter (timestamp unit not visible here —
    /// TODO confirm).
    pub to_date: Option<u64>,
    /// Sent as the `count` query parameter.
    pub count: Option<u32>,
}
/// Borrowed handle exposing the `v2/dlq` operations of a [`Client`].
///
/// The handle owns nothing; it only keeps a reference to the client for as long
/// as the handle itself lives.
pub struct DlqApi<'client> {
    /// Client whose `http` transport carries every request issued by this handle.
    pub(crate) client: &'client Client,
}
impl DlqApi<'_> {
pub async fn list(&self, filter: &DlqFilter) -> Result<DlqPage> {
self.client
.http
.send_json(Method::GET, "v2/dlq", &dlq_filter_query(filter), None, None)
.await
}
pub async fn get(&self, dlq_id: &str) -> Result<DlqMessage> {
self.client
.http
.send_json(Method::GET, &format!("v2/dlq/{dlq_id}"), &[], None, None)
.await
}
pub async fn delete(&self, dlq_id: &str) -> Result<()> {
self.client
.http
.send_empty(Method::DELETE, &format!("v2/dlq/{dlq_id}"), &[], None, None)
.await
}
pub async fn delete_many<I, S>(&self, dlq_ids: I) -> Result<DlqDeleteResult>
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
let mut query = Vec::new();
for dlq_id in dlq_ids {
query.push((String::from("dlqIds"), dlq_id.as_ref().to_owned()));
}
self.client
.http
.send_json(Method::DELETE, "v2/dlq", &query, None, None)
.await
}
pub async fn delete_matching(&self, filter: &DlqFilter) -> Result<DlqDeleteResult> {
self.client
.http
.send_json(
Method::DELETE,
"v2/dlq",
&dlq_filter_query(filter),
None,
None,
)
.await
}
pub async fn retry(&self, dlq_id: &str) -> Result<PublishedMessage> {
self.client
.http
.send_json(
Method::POST,
&format!("v2/dlq/retry/{dlq_id}"),
&[],
None,
None,
)
.await
}
}
/// Converts `filter` into the query-string pairs sent to the `v2/dlq` endpoints.
///
/// Only fields that are `Some` produce a pair, and pairs always appear in the
/// fixed order of the table below. Numeric fields are rendered in decimal.
/// Note that `url_group_name` is sent under the key `topicName`.
fn dlq_filter_query(filter: &DlqFilter) -> Vec<(String, String)> {
    // (wire key, rendered value) for every field, in emission order.
    let candidates: [(&str, Option<String>); 12] = [
        ("cursor", filter.cursor.clone()),
        ("messageId", filter.message_id.clone()),
        ("url", filter.url.clone()),
        ("topicName", filter.url_group_name.clone()),
        ("scheduleId", filter.schedule_id.clone()),
        ("queueName", filter.queue_name.clone()),
        (
            "responseStatus",
            filter.response_status.map(|status| status.to_string()),
        ),
        ("label", filter.label.clone()),
        ("flowControlKey", filter.flow_control_key.clone()),
        ("fromDate", filter.from_date.map(|stamp| stamp.to_string())),
        ("toDate", filter.to_date.map(|stamp| stamp.to_string())),
        ("count", filter.count.map(|limit| limit.to_string())),
    ];

    let mut pairs = Vec::new();
    for (key, rendered) in candidates {
        if let Some(value) = rendered {
            pairs.push((String::from(key), value));
        }
    }
    pairs
}