use crate::consumer::Consumer;
use azservicebus::ServiceBusClient;
use azservicebus::core::BasicRetryPolicy;
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
/// Running tally of a bulk operation: how many items were requested and how
/// many of them were recorded as successful, failed, or not found.
#[derive(Debug, Clone)]
pub struct BulkOperationResult {
    /// Number of items the operation was asked to handle (set by `new`).
    pub total_requested: usize,
    /// Incremented by `add_success` and `add_successful_message`.
    pub successful: usize,
    /// Incremented by `add_failure`.
    pub failed: usize,
    /// Incremented by `add_not_found`.
    pub not_found: usize,
    /// One entry per `add_failure` call, in call order.
    pub error_details: Vec<String>,
    /// Identifiers recorded by `add_successful_message`, in call order.
    pub successful_message_ids: Vec<MessageIdentifier>,
}
impl BulkOperationResult {
    /// Creates an empty result for `total_requested` items: every counter
    /// starts at zero and both detail lists start empty.
    pub fn new(total_requested: usize) -> Self {
        Self {
            total_requested,
            successful: 0,
            failed: 0,
            not_found: 0,
            error_details: Vec::new(),
            successful_message_ids: Vec::new(),
        }
    }

    /// Counts one success without recording which message succeeded.
    pub fn add_success(&mut self) {
        self.successful += 1;
    }

    /// Counts one failure and appends `error` to `error_details`.
    pub fn add_failure(&mut self, error: String) {
        self.failed += 1;
        self.error_details.push(error);
    }

    /// Counts one success and records `message_id` in
    /// `successful_message_ids`.
    ///
    /// This bumps the same `successful` counter as [`Self::add_success`], so
    /// calling both for one message counts it twice.
    pub fn add_successful_message(&mut self, message_id: MessageIdentifier) {
        self.successful += 1;
        // Log before storing: the identifier can then be moved into the list
        // instead of being cloned merely to keep it alive for the log line.
        log::debug!(
            "SUCCESS COUNT: Incremented to {} (added message: {})",
            self.successful,
            message_id.id
        );
        self.successful_message_ids.push(message_id);
    }

    /// Counts one item that could not be found.
    pub fn add_not_found(&mut self) {
        self.not_found += 1;
    }

    /// Returns `true` only when the success count equals `total_requested`
    /// and no failure or not-found was recorded.
    pub fn is_complete_success(&self) -> bool {
        self.successful == self.total_requested && self.failed == 0 && self.not_found == 0
    }
}
/// Identifies a message by its string `id` together with a numeric `sequence`.
///
/// Equality and hashing are derived, so both fields take part in them.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct MessageIdentifier {
    /// The message id; this is the only part shown by `Display`.
    pub id: String,
    /// The message sequence value. Identifiers built from a bare string
    /// (`from_string`, `From<String>`, `From<&str>`) carry `0` here.
    pub sequence: i64,
}
/// Textual form of an identifier: the `id` alone, without the sequence.
impl std::fmt::Display for MessageIdentifier {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Written verbatim: width/alignment flags on the outer formatter are
        // intentionally not applied to the id.
        f.write_str(&self.id)
    }
}
impl MessageIdentifier {
pub fn new(id: String, sequence: i64) -> Self {
Self { id, sequence }
}
pub fn from_message(message: &crate::model::MessageModel) -> Self {
Self {
id: message.id.clone(),
sequence: message.sequence,
}
}
pub fn from_string(id: String) -> Self {
Self { id, sequence: 0 }
}
pub fn composite_key(&self) -> String {
format!("{}:{}", self.id, self.sequence)
}
}
impl From<String> for MessageIdentifier {
fn from(id: String) -> Self {
Self::from_string(id)
}
}
impl From<&str> for MessageIdentifier {
fn from(id: &str) -> Self {
Self::from_string(id.to_string())
}
}
impl From<MessageIdentifier> for String {
fn from(val: MessageIdentifier) -> Self {
val.id
}
}
impl PartialEq<String> for MessageIdentifier {
fn eq(&self, other: &String) -> bool {
&self.id == other
}
}
/// Mirror of `MessageIdentifier == String`: a string equals an identifier
/// when it matches the identifier's `id`; the sequence is not considered.
impl PartialEq<MessageIdentifier> for String {
    fn eq(&self, other: &MessageIdentifier) -> bool {
        self.as_str() == other.id.as_str()
    }
}
/// Optional tuning values for batch and bulk operations, deserializable via
/// serde.
///
/// Every field may be absent; the accessor method of the same name returns
/// the stored value or a built-in default when the field is `None`.
#[derive(Debug, Deserialize, Default, Clone)]
pub struct BatchConfig {
    /// Accessor default: 500.
    max_batch_size: Option<u32>,
    /// Accessor default: 600 (seconds, per the field name).
    operation_timeout_secs: Option<u64>,
    /// Accessor default: 500.
    bulk_chunk_size: Option<usize>,
    /// Accessor default: 300 (seconds, per the field name).
    bulk_processing_time_secs: Option<u64>,
    /// Accessor default: 10 (seconds, per the field name).
    lock_timeout_secs: Option<u64>,
    /// Accessor default: 10_000.
    max_messages_to_process: Option<usize>,
    /// Accessor default: 50.
    auto_reload_threshold: Option<usize>,
    /// Accessor default: 5 (seconds, per the field name).
    receive_timeout_secs: Option<u64>,
}
impl BatchConfig {
    // Fallback values returned by the accessors when a field is unset.
    const DEFAULT_MAX_BATCH_SIZE: u32 = 500;
    const DEFAULT_OPERATION_TIMEOUT_SECS: u64 = 600;
    const DEFAULT_BULK_CHUNK_SIZE: usize = 500;
    const DEFAULT_BULK_PROCESSING_TIME_SECS: u64 = 300;
    const DEFAULT_LOCK_TIMEOUT_SECS: u64 = 10;
    const DEFAULT_MAX_MESSAGES_TO_PROCESS: usize = 10_000;
    const DEFAULT_AUTO_RELOAD_THRESHOLD: usize = 50;
    const DEFAULT_RECEIVE_TIMEOUT_SECS: u64 = 5;

    /// Builds a config with `max_batch_size` and `operation_timeout_secs`
    /// set explicitly; every other field stays unset, so its accessor
    /// reports the built-in default.
    pub fn new(max_batch_size: u32, operation_timeout_secs: u64) -> Self {
        Self {
            max_batch_size: Some(max_batch_size),
            operation_timeout_secs: Some(operation_timeout_secs),
            ..Self::default()
        }
    }

    /// Configured maximum batch size, or 500 when unset.
    pub fn max_batch_size(&self) -> u32 {
        self.max_batch_size
            .unwrap_or(Self::DEFAULT_MAX_BATCH_SIZE)
    }

    /// Configured operation timeout in seconds, or 600 when unset.
    pub fn operation_timeout_secs(&self) -> u64 {
        self.operation_timeout_secs
            .unwrap_or(Self::DEFAULT_OPERATION_TIMEOUT_SECS)
    }

    /// Configured bulk chunk size, or 500 when unset.
    pub fn bulk_chunk_size(&self) -> usize {
        self.bulk_chunk_size
            .unwrap_or(Self::DEFAULT_BULK_CHUNK_SIZE)
    }

    /// Configured bulk processing time in seconds, or 300 when unset.
    pub fn bulk_processing_time_secs(&self) -> u64 {
        self.bulk_processing_time_secs
            .unwrap_or(Self::DEFAULT_BULK_PROCESSING_TIME_SECS)
    }

    /// Configured lock timeout in seconds, or 10 when unset.
    pub fn lock_timeout_secs(&self) -> u64 {
        self.lock_timeout_secs
            .unwrap_or(Self::DEFAULT_LOCK_TIMEOUT_SECS)
    }

    /// Configured cap on messages to process, or 10_000 when unset.
    pub fn max_messages_to_process(&self) -> usize {
        self.max_messages_to_process
            .unwrap_or(Self::DEFAULT_MAX_MESSAGES_TO_PROCESS)
    }

    /// Configured auto-reload threshold, or 50 when unset.
    pub fn auto_reload_threshold(&self) -> usize {
        self.auto_reload_threshold
            .unwrap_or(Self::DEFAULT_AUTO_RELOAD_THRESHOLD)
    }

    /// Configured receive timeout in seconds, or 5 when unset.
    pub fn receive_timeout_secs(&self) -> u64 {
        self.receive_timeout_secs
            .unwrap_or(Self::DEFAULT_RECEIVE_TIMEOUT_SECS)
    }
}
/// Handles a Service Bus operation works with: the shared consumer, the
/// shared client, the main queue's name, and a cancellation token.
#[derive(Debug, Clone)]
pub struct ServiceBusOperationContext {
    /// Consumer shared behind an async mutex.
    pub consumer: Arc<Mutex<Consumer>>,
    /// Service Bus client shared behind an async mutex.
    pub service_bus_client: Arc<Mutex<ServiceBusClient<BasicRetryPolicy>>>,
    /// Name of the main queue.
    pub main_queue_name: String,
    /// Cancellation token carried with the context; `new` creates a fresh one.
    pub cancel_token: CancellationToken,
}
impl ServiceBusOperationContext {
pub fn new(
consumer: Arc<Mutex<Consumer>>,
service_bus_client: Arc<Mutex<ServiceBusClient<BasicRetryPolicy>>>,
main_queue_name: String,
) -> Self {
Self {
consumer,
service_bus_client,
main_queue_name,
cancel_token: CancellationToken::new(),
}
}
}
/// Parameters describing a bulk send of messages to a target queue.
#[derive(Debug, Clone)]
pub struct BulkSendParams {
    /// Name of the queue the messages are sent to.
    pub target_queue: String,
    /// Delete flag, stored as given by the constructors.
    pub should_delete: bool,
    /// Identifiers of the messages involved. `with_message_data` derives
    /// this list from the supplied data pairs.
    pub message_identifiers: Vec<MessageIdentifier>,
    /// Message bodies paired with their identifiers: `Some` when built with
    /// `with_message_data`, `None` when built with `with_retrieval` or
    /// `with_max_position`.
    pub messages_data: Option<Vec<(MessageIdentifier, Vec<u8>)>>,
    /// Position limit, stored as given; its interpretation is up to the code
    /// that consumes these parameters.
    pub max_position: usize,
}
impl BulkSendParams {
pub fn with_retrieval(
target_queue: String,
should_delete: bool,
message_identifiers: Vec<MessageIdentifier>,
max_position: usize,
) -> Self {
Self {
target_queue,
should_delete,
message_identifiers,
messages_data: None,
max_position,
}
}
pub fn with_message_data(
target_queue: String,
should_delete: bool,
messages_data: Vec<(MessageIdentifier, Vec<u8>)>,
max_position: usize,
) -> Self {
let message_identifiers = messages_data.iter().map(|(id, _)| id.clone()).collect();
Self {
target_queue,
should_delete,
message_identifiers,
messages_data: Some(messages_data),
max_position,
}
}
pub fn with_max_position(
target_queue: String,
should_delete: bool,
message_identifiers: Vec<MessageIdentifier>,
max_position: usize,
) -> Self {
Self {
target_queue,
should_delete,
message_identifiers,
messages_data: None,
max_position,
}
}
}
/// Kind of send operation, distinguished by whether the destination is a
/// dead-letter queue.
#[derive(Debug, Clone)]
pub enum QueueOperationType {
    /// Destination name does not end with the dead-letter suffix.
    SendToQueue,
    /// Destination name ends with `/$deadletterqueue`.
    SendToDLQ,
}
impl QueueOperationType {
    /// Classifies a destination by its name: names ending in
    /// `/$deadletterqueue` map to `SendToDLQ`, everything else to
    /// `SendToQueue`. The comparison is exact (case-sensitive).
    pub fn from_queue_name(queue_name: &str) -> Self {
        const DEAD_LETTER_SUFFIX: &str = "/$deadletterqueue";

        if !queue_name.ends_with(DEAD_LETTER_SUFFIX) {
            return Self::SendToQueue;
        }
        Self::SendToDLQ
    }
}
/// Handles a bulk operation works with: the shared consumer, a cancellation
/// token, and the name of the queue being operated on.
#[derive(Debug, Clone)]
pub struct BulkOperationContext {
    /// Consumer shared behind an async mutex.
    pub consumer: Arc<Mutex<crate::consumer::Consumer>>,
    /// Cancellation token carried with the context.
    pub cancel_token: CancellationToken,
    /// Name of the queue.
    pub queue_name: String,
}
/// Argument bundle grouping a batch of received messages with the borrowed
/// state needed while processing them.
///
/// Everything except `messages` is borrowed for `'a`; `result` is borrowed
/// mutably so the outcome can be accumulated in place.
pub struct ProcessTargetMessagesParams<'a> {
    /// The received messages, owned by this bundle.
    pub messages: Vec<azservicebus::ServiceBusReceivedMessage>,
    /// Context of the surrounding bulk operation.
    pub context: &'a BulkOperationContext,
    /// Parameters of the bulk send.
    pub params: &'a BulkSendParams,
    /// Lookup from a string key to a message identifier.
    /// NOTE(review): presumably keyed by message id — confirm against the caller.
    pub target_map: &'a HashMap<String, MessageIdentifier>,
    /// Result accumulator, updated through the mutable borrow.
    pub result: &'a mut BulkOperationResult,
}
impl<'a> ProcessTargetMessagesParams<'a> {
pub fn new(
messages: Vec<azservicebus::ServiceBusReceivedMessage>,
context: &'a BulkOperationContext,
params: &'a BulkSendParams,
target_map: &'a HashMap<String, MessageIdentifier>,
result: &'a mut BulkOperationResult,
) -> Self {
Self {
messages,
context,
params,
target_map,
result,
}
}
}