use crate::app::model::Model;
use crate::error::AppError;
use quetty_server::bulk_operations::MessageIdentifier;
use quetty_server::service_bus_manager::QueueType;
use tuirealm::terminal::TerminalAdapter;
/// Limits that govern a single bulk operation.
#[derive(Debug, Clone)]
pub struct BulkOperationConfig {
    /// Upper bound on how many messages one bulk operation may contain.
    pub max_count: usize,
    /// Threshold value that is forwarded unchanged into the post-processing
    /// context of a validated operation.
    pub auto_reload_threshold: usize,
}
impl BulkOperationConfig {
    /// Reads the bulk-operation limits from the global application config.
    ///
    /// `max_count` comes from `batch().max_messages_to_process()` and
    /// `auto_reload_threshold` from `batch().auto_reload_threshold()`.
    ///
    /// NOTE(review): going by its name, `get_config_or_panic` presumably
    /// panics when the configuration is unavailable — confirm against its
    /// definition.
    pub fn from_app_config() -> Self {
        let app_config = crate::config::get_config_or_panic();
        let max_count = app_config.batch().max_messages_to_process();
        let auto_reload_threshold = app_config.batch().auto_reload_threshold();
        Self {
            max_count,
            auto_reload_threshold,
        }
    }
}
/// Builder that gathers the inputs of a bulk operation and validates them
/// before handing out a `ValidatedBulkOperation`.
pub struct BulkOperationSetup<T: TerminalAdapter> {
    /// Raw pointer to the model the setup was created from. It is
    /// dereferenced during validation, so the model must outlive this value;
    /// the type system does not enforce that.
    model: *const Model<T>,
    /// Limits captured from the application configuration at construction.
    config: BulkOperationConfig,
    /// Identifiers of the messages the operation should act on.
    message_ids: Vec<MessageIdentifier>,
    /// Which bulk operation is being prepared.
    operation_type: BulkOperationType,
}
/// The kinds of bulk operation that can be prepared and validated.
#[derive(Debug, Clone)]
pub enum BulkOperationType {
    /// Delete the selected messages from whichever queue is currently shown.
    Delete,
    /// Send the selected messages from the dead letter queue to the main
    /// queue.
    ResendFromDlq {
        /// Whether the operation reports that the source copy should be
        /// deleted.
        delete_source: bool,
    },
    /// Send the selected messages from the main queue to the dead letter
    /// queue.
    SendToDlq {
        /// Whether the operation reports that the source copy should be
        /// deleted.
        delete_source: bool,
    },
}
impl<T: TerminalAdapter> BulkOperationSetup<T> {
fn calculate_global_position(
page_size: usize,
current_page: usize,
local_index: usize,
) -> usize {
current_page * page_size + local_index + 1
}
pub fn new(model: &Model<T>, message_ids: Vec<MessageIdentifier>) -> Self {
Self {
model,
config: BulkOperationConfig::from_app_config(),
message_ids,
operation_type: BulkOperationType::Delete, }
}
pub fn operation_type(mut self, op_type: BulkOperationType) -> Self {
self.operation_type = op_type;
self
}
pub fn validate_and_build(self) -> Result<ValidatedBulkOperation<T>, AppError> {
let model = unsafe { &*self.model };
self.validate_message_count()?;
self.validate_queue_type_for_operation(model)?;
self.validate_operation_size()?;
self.validate_link_credit_limit(model)?;
log::info!(
"Validated bulk {:?} operation for {} messages",
self.operation_type,
self.message_ids.len()
);
Ok(ValidatedBulkOperation {
model: self.model,
config: self.config,
message_ids: self.message_ids,
operation_type: self.operation_type,
})
}
fn validate_message_count(&self) -> Result<(), AppError> {
let count = self.message_ids.len();
if count == 0 {
return Err(AppError::State(
"No messages selected for bulk operation".to_string(),
));
}
if count > self.config.max_count {
return Err(AppError::State(format!(
"Too many messages selected: {} (maximum: {})",
count, self.config.max_count
)));
}
Ok(())
}
fn validate_operation_size(&self) -> Result<(), AppError> {
let config = crate::config::get_config_or_panic();
let max_messages_to_process = config.batch().max_messages_to_process();
if self.message_ids.len() > max_messages_to_process {
return Err(AppError::State(format!(
"Operation size {} exceeds maximum allowed processing limit of {}",
self.message_ids.len(),
max_messages_to_process
)));
}
Ok(())
}
fn validate_queue_type_for_operation(&self, model: &Model<T>) -> Result<(), AppError> {
let current_type = model.queue_manager.queue_state.current_queue_type.clone();
let required_type = self.get_required_queue_type();
if current_type != required_type {
let current_name = queue_type_display_name(current_type);
let required_name = queue_type_display_name(required_type);
return Err(AppError::State(format!(
"Operation not allowed: currently in {current_name} but operation requires {required_name}"
)));
}
let messages = model.queue_manager.queue_state.messages.as_ref();
if messages.is_none() || messages.map(|m| m.is_empty()).unwrap_or(true) {
return Err(AppError::State(
"No messages available for operation. Please wait for messages to load after queue switch.".to_string()
));
}
Ok(())
}
fn get_required_queue_type(&self) -> QueueType {
match self.operation_type {
BulkOperationType::Delete => {
unsafe { &*self.model }
.queue_manager
.queue_state
.current_queue_type
.clone()
}
BulkOperationType::ResendFromDlq { .. } => QueueType::DeadLetter,
BulkOperationType::SendToDlq { .. } => QueueType::Main,
}
}
fn validate_link_credit_limit(&self, model: &Model<T>) -> Result<(), AppError> {
const LINK_CREDIT_LIMIT: usize = 2048;
if self.message_ids.len() == 1 {
if let Ok(tuirealm::State::One(tuirealm::StateValue::Usize(selected_index))) = model
.app
.state(&crate::components::common::ComponentId::Messages)
{
let page_size = crate::config::get_config_or_panic().max_messages() as usize;
let current_page = model.queue_state().message_pagination.current_page;
let global_position =
Self::calculate_global_position(page_size, current_page, selected_index);
if global_position > LINK_CREDIT_LIMIT {
return Err(AppError::State(format!(
"Cannot process message at position {global_position}, which exceeds the Azure Service Bus link-credit limit of {LINK_CREDIT_LIMIT}.\n\nTo fix this:\n• Process messages from earlier pages\n• Use filtering to reduce the message set"
)));
}
log::debug!(
"Link credit validation passed: single_message_position={global_position}, limit={LINK_CREDIT_LIMIT}"
);
}
} else {
let gap_sum = model.queue_state().bulk_selection.calculate_gap_sum();
if gap_sum > LINK_CREDIT_LIMIT {
return Err(AppError::State(format!(
"Operation would require processing {gap_sum} non-selected messages between your selections, which exceeds the Azure Service Bus link-credit limit of {LINK_CREDIT_LIMIT}. This would cause the bulk operation to get stuck.\n\nTo fix this:\n• Select messages that are closer together (reduce gaps between selections)\n• Split into multiple smaller operations\n• Select contiguous ranges of messages"
)));
}
log::debug!(
"Link credit validation passed: gap_sum={}, selected_count={}, limit={}",
gap_sum,
self.message_ids.len(),
LINK_CREDIT_LIMIT
);
}
Ok(())
}
}
/// A bulk operation whose inputs have passed
/// `BulkOperationSetup::validate_and_build`.
pub struct ValidatedBulkOperation<T: TerminalAdapter> {
    /// Raw pointer to the model, carried over from the setup. It is
    /// dereferenced by `model()`, so the model must outlive this value; the
    /// type system does not enforce that.
    model: *const Model<T>,
    /// Limits captured when the setup was created.
    config: BulkOperationConfig,
    /// Identifiers of the messages the operation acts on.
    message_ids: Vec<MessageIdentifier>,
    /// Which bulk operation was validated.
    operation_type: BulkOperationType,
}
impl<T: TerminalAdapter> ValidatedBulkOperation<T> {
    /// Identifiers of the messages this operation acts on.
    pub fn message_ids(&self) -> &[MessageIdentifier] {
        &self.message_ids
    }

    /// Borrows the model this operation was validated against.
    pub fn model(&self) -> &Model<T> {
        // SAFETY: the pointer is copied from the setup, which obtained it
        // from a valid `&Model<T>`. Soundness relies on the caller keeping
        // that model alive and not moving it while this value exists; the
        // type does not enforce it.
        unsafe { &*self.model }
    }

    /// Builds the progress text shown while the operation runs.
    pub fn get_loading_message(&self) -> String {
        let count = self.message_ids.len();
        match &self.operation_type {
            BulkOperationType::Delete => format!("Deleting {count} messages..."),
            BulkOperationType::ResendFromDlq {
                delete_source: true,
            } => {
                format!("Bulk resending {count} messages from DLQ to main queue...")
            }
            BulkOperationType::ResendFromDlq {
                delete_source: false,
            } => {
                format!("Bulk copying {count} messages from DLQ to main queue (keeping in DLQ)...")
            }
            BulkOperationType::SendToDlq {
                delete_source: true,
            } => {
                format!("Bulk moving {count} messages from main queue to DLQ...")
            }
            BulkOperationType::SendToDlq {
                delete_source: false,
            } => {
                format!("Bulk copying {count} messages from main queue to DLQ...")
            }
        }
    }

    /// Derives the name of the queue that receives the messages.
    ///
    /// * `ResendFromDlq`: the current queue name with a trailing
    ///   `/$deadletterqueue` removed (unchanged if the suffix is absent).
    /// * `SendToDlq`: the current queue name with `/$deadletterqueue`
    ///   appended.
    ///
    /// # Errors
    ///
    /// Returns `AppError::State` when the model has no current queue name,
    /// or when the operation is `Delete`, which has no target queue.
    pub fn get_target_queue(&self) -> Result<String, AppError> {
        let model = self.model();
        let current_queue_name = model
            .queue_state()
            .current_queue_name
            .as_ref()
            .ok_or_else(|| AppError::State("No current queue name available".to_string()))?;

        match &self.operation_type {
            BulkOperationType::Delete => Err(AppError::State(
                "Delete operations don't have target queues".to_string(),
            )),
            // `strip_suffix` already yields `None` when the suffix is
            // missing, so no separate `ends_with` probe is needed.
            BulkOperationType::ResendFromDlq { .. } => Ok(current_queue_name
                .strip_suffix("/$deadletterqueue")
                .unwrap_or(current_queue_name)
                .to_string()),
            BulkOperationType::SendToDlq { .. } => {
                Ok(format!("{current_queue_name}/$deadletterqueue"))
            }
        }
    }

    /// Returns `(source, destination)` labels for display purposes.
    pub fn get_queue_display_names(&self) -> (String, String) {
        match &self.operation_type {
            BulkOperationType::Delete => ("Current".to_string(), "Deleted".to_string()),
            BulkOperationType::ResendFromDlq { .. } => ("DLQ".to_string(), "Main".to_string()),
            BulkOperationType::SendToDlq { .. } => ("Main".to_string(), "DLQ".to_string()),
        }
    }

    /// Whether the source messages should be removed.
    ///
    /// Always `true` for `Delete`; otherwise the variant's `delete_source`
    /// flag.
    pub fn should_delete_source(&self) -> bool {
        match &self.operation_type {
            BulkOperationType::Delete => true,
            BulkOperationType::ResendFromDlq { delete_source }
            | BulkOperationType::SendToDlq { delete_source } => *delete_source,
        }
    }

    /// Collects the figures needed after the operation has run.
    ///
    /// `max_position` is resolved in this order:
    /// 1. the highest selected position reported by the bulk selection;
    /// 2. for a single message, the global position of the index focused in
    ///    the `Messages` component, when that state is a single `usize`;
    /// 3. otherwise the result of `calculate_max_position`.
    pub fn calculate_post_processing_context(&self) -> BulkOperationContext {
        let model = self.model();
        let current_page_messages = model
            .queue_state()
            .message_pagination
            .get_current_page_messages(crate::config::get_current_page_size());
        let current_message_count = current_page_messages.len();
        let selected_from_current_page = self
            .message_ids
            .iter()
            .filter(|msg_id| {
                current_page_messages
                    .iter()
                    .any(|current_msg| current_msg.id == **msg_id)
            })
            .count();

        log::debug!(
            "calculate_post_processing_context: current_message_count={}, selected_from_current_page={}, total_selected={}",
            current_message_count,
            selected_from_current_page,
            self.message_ids.len()
        );

        let max_position = if let Some(highest_index) = model
            .queue_state()
            .bulk_selection
            .get_highest_selected_position()
        {
            highest_index
        } else {
            // Only a single-message operation consults the focused index.
            let focused_index = if self.message_ids.len() == 1 {
                match model
                    .app
                    .state(&crate::components::common::ComponentId::Messages)
                {
                    Ok(tuirealm::State::One(tuirealm::StateValue::Usize(selected_index))) => {
                        Some(selected_index)
                    }
                    _ => None,
                }
            } else {
                None
            };

            let page_size = crate::config::get_config_or_panic().max_messages() as usize;
            let current_page = model.queue_state().message_pagination.current_page;
            match focused_index {
                Some(selected_index) => BulkOperationSetup::<T>::calculate_global_position(
                    page_size,
                    current_page,
                    selected_index,
                ),
                None => self.calculate_max_position(model, page_size, current_page),
            }
        };

        BulkOperationContext {
            auto_reload_threshold: self.config.auto_reload_threshold,
            current_message_count,
            selected_from_current_page,
            max_position,
        }
    }

    /// Finds the highest 1-based position of a selected message among all
    /// loaded messages.
    ///
    /// When none of the selected messages is found in the loaded data, falls
    /// back to `(current_page + 1) * page_size` as an estimate.
    fn calculate_max_position(
        &self,
        model: &Model<T>,
        page_size: usize,
        current_page: usize,
    ) -> usize {
        let all_loaded_messages = &model.queue_state().message_pagination.all_loaded_messages;

        // Positions are 1-based, so 0 doubles as "nothing matched".
        let max_loaded_position = all_loaded_messages
            .iter()
            .enumerate()
            .filter(|(_, loaded_msg)| {
                self.message_ids
                    .iter()
                    .any(|msg_id| msg_id.id == loaded_msg.id)
            })
            .map(|(index, _)| index + 1)
            .max()
            .unwrap_or(0);

        if max_loaded_position > 0 {
            log::info!(
                "Found selected messages in loaded data, highest position: {max_loaded_position}"
            );
            return max_loaded_position;
        }

        let page_based_estimate = (current_page + 1) * page_size;
        log::info!(
            "Using page-based estimation: page {} * page_size {} = estimated position {}",
            current_page + 1,
            page_size,
            page_based_estimate
        );
        page_based_estimate
    }
}
/// Figures describing the state around a bulk operation, for use once the
/// operation has finished.
#[derive(Debug, Clone)]
pub struct BulkOperationContext {
    /// Threshold copied from the operation's configuration.
    pub auto_reload_threshold: usize,
    /// Number of messages on the current page.
    pub current_message_count: usize,
    /// How many of the selected messages are on the current page.
    pub selected_from_current_page: usize,
    /// Highest position among the selected messages, or an estimate of it.
    pub max_position: usize,
}
/// Maps a queue type to the lowercase label used in user-facing messages.
pub fn queue_type_display_name(queue_type: QueueType) -> &'static str {
    let label: &'static str = match queue_type {
        QueueType::DeadLetter => "dead letter queue",
        QueueType::Main => "main queue",
    };
    label
}
/// Shared validation and logging helpers for bulk operations, provided as
/// default associated functions.
pub trait BulkOperationValidation<T: TerminalAdapter> {
    /// Fails when `message_ids` is empty.
    ///
    /// # Errors
    ///
    /// Logs a warning and returns `AppError::State` for an empty slice.
    fn validate_not_empty(message_ids: &[MessageIdentifier]) -> Result<(), AppError> {
        if !message_ids.is_empty() {
            return Ok(());
        }

        log::warn!("No messages provided for bulk operation");
        Err(AppError::State(String::from(
            "No messages selected for bulk operation",
        )))
    }

    /// Emits a warning that a bulk operation may not preserve message order.
    fn log_message_order_warning(message_count: usize, operation_name: &str) {
        log::warn!(
            "Bulk {operation_name} for {message_count} messages may affect message order. Messages may not be processed in their original sequence."
        );
    }
}
// Opts `Model<T>` into `BulkOperationValidation`; the body is empty, so the
// trait's default method implementations are used as-is.
impl<T: TerminalAdapter> BulkOperationValidation<T> for Model<T> {}