use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, error, info, warn};
use crate::error::{ErrorContext, Result};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailedOperation {
pub id: String,
pub original_timestamp: u64,
pub failed_timestamp: u64,
pub operation: String,
pub component: String,
pub retry_count: u32,
pub error_context: ErrorContext,
pub request_data: serde_json::Value,
pub failure_reason: String,
pub can_retry: bool,
}
impl FailedOperation {
pub fn new(
operation: &str,
component: &str,
retry_count: u32,
error_context: ErrorContext,
request_data: serde_json::Value,
failure_reason: &str,
) -> Self {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| {
tracing::warn!("Failed to get system time for failed operation, using epoch");
std::time::Duration::from_secs(0)
})
.as_secs();
Self {
id: uuid::Uuid::new_v4().to_string(),
original_timestamp: now,
failed_timestamp: now,
operation: operation.to_string(),
component: component.to_string(),
retry_count,
error_context,
request_data,
failure_reason: failure_reason.to_string(),
can_retry: true,
}
}
}
#[derive(Debug, Clone)]
pub struct DeadLetterConfig {
pub max_size: usize,
pub retention_period_secs: u64,
pub persist_to_disk: bool,
pub persistence_path: Option<String>,
pub cleanup_interval_secs: u64,
}
impl Default for DeadLetterConfig {
fn default() -> Self {
Self {
max_size: 1000,
retention_period_secs: 24 * 60 * 60, persist_to_disk: false,
persistence_path: None,
cleanup_interval_secs: 60 * 60, }
}
}
#[derive(Debug)]
pub struct DeadLetterQueue {
config: DeadLetterConfig,
queue: Arc<RwLock<VecDeque<FailedOperation>>>,
cleanup_handle: Option<tokio::task::JoinHandle<()>>,
shutdown_tx: Option<mpsc::Sender<()>>,
}
impl DeadLetterQueue {
pub fn new(config: DeadLetterConfig) -> Self {
Self {
config,
queue: Arc::new(RwLock::new(VecDeque::new())),
cleanup_handle: None,
shutdown_tx: None,
}
}
pub async fn start(&mut self) -> Result<()> {
let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
self.shutdown_tx = Some(shutdown_tx);
let queue = self.queue.clone();
let config = self.config.clone();
let handle = tokio::spawn(async move {
let mut interval =
tokio::time::interval(Duration::from_secs(config.cleanup_interval_secs));
loop {
tokio::select! {
_ = interval.tick() => {
Self::cleanup_expired(&queue, &config).await;
}
_ = shutdown_rx.recv() => {
info!("Dead letter queue cleanup shutting down");
break;
}
}
}
});
self.cleanup_handle = Some(handle);
info!(
"Dead letter queue started with cleanup interval: {}s",
self.config.cleanup_interval_secs
);
Ok(())
}
pub async fn add_failed_operation(&self, failed_operation: FailedOperation) -> Result<()> {
let mut queue = self.queue.write().await;
while queue.len() >= self.config.max_size {
if let Some(oldest) = queue.pop_front() {
warn!(
"Dead letter queue full, dropping oldest operation: {}",
oldest.id
);
}
}
info!(
"Adding failed operation to dead letter queue: {} (retry count: {})",
failed_operation.operation, failed_operation.retry_count
);
queue.push_back(failed_operation);
if self.config.persist_to_disk {
if let Err(e) = self.persist_to_disk().await {
error!("Failed to persist dead letter queue to disk: {}", e);
}
}
Ok(())
}
pub async fn get_failed_operations(&self) -> Vec<FailedOperation> {
self.queue.read().await.iter().cloned().collect()
}
pub async fn get_failed_operations_by_component(
&self,
component: &str,
) -> Vec<FailedOperation> {
self.queue
.read()
.await
.iter()
.filter(|op| op.component == component)
.cloned()
.collect()
}
pub async fn get_failed_operations_by_type(&self, operation: &str) -> Vec<FailedOperation> {
self.queue
.read()
.await
.iter()
.filter(|op| op.operation == operation)
.cloned()
.collect()
}
pub async fn remove_failed_operation(&self, id: &str) -> Result<Option<FailedOperation>> {
let mut queue = self.queue.write().await;
if let Some(pos) = queue.iter().position(|op| op.id == id) {
let operation = queue.remove(pos).unwrap();
info!("Removed failed operation from dead letter queue: {}", id);
Ok(Some(operation))
} else {
Ok(None)
}
}
pub async fn get_statistics(&self) -> DeadLetterStats {
let queue = self.queue.read().await;
let mut stats = DeadLetterStats {
total_count: queue.len(),
by_component: std::collections::HashMap::new(),
by_operation: std::collections::HashMap::new(),
oldest_timestamp: None,
newest_timestamp: None,
total_retry_attempts: 0,
};
for operation in queue.iter() {
*stats
.by_component
.entry(operation.component.clone())
.or_insert(0) += 1;
*stats
.by_operation
.entry(operation.operation.clone())
.or_insert(0) += 1;
if stats.oldest_timestamp.is_none()
|| Some(operation.failed_timestamp) < stats.oldest_timestamp
{
stats.oldest_timestamp = Some(operation.failed_timestamp);
}
if stats.newest_timestamp.is_none()
|| Some(operation.failed_timestamp) > stats.newest_timestamp
{
stats.newest_timestamp = Some(operation.failed_timestamp);
}
stats.total_retry_attempts += operation.retry_count;
}
stats
}
async fn cleanup_expired(
queue: &Arc<RwLock<VecDeque<FailedOperation>>>,
config: &DeadLetterConfig,
) {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| {
tracing::warn!("Failed to get system time for failed operation, using epoch");
std::time::Duration::from_secs(0)
})
.as_secs();
let cutoff = now.saturating_sub(config.retention_period_secs);
let mut queue_guard = queue.write().await;
let original_size = queue_guard.len();
queue_guard.retain(|op| op.failed_timestamp > cutoff);
let removed = original_size - queue_guard.len();
if removed > 0 {
info!(
"Cleaned up {} expired operations from dead letter queue",
removed
);
}
debug!(
"Dead letter queue cleanup complete. Current size: {}",
queue_guard.len()
);
}
async fn persist_to_disk(&self) -> Result<()> {
if let Some(ref path) = self.config.persistence_path {
let queue = self.queue.read().await;
let data = serde_json::to_string_pretty(&*queue)?;
tokio::fs::write(path, data).await?;
debug!("Persisted dead letter queue to disk: {}", path);
}
Ok(())
}
pub async fn load_from_disk(&self) -> Result<()> {
if let Some(ref path) = self.config.persistence_path {
if tokio::fs::metadata(path).await.is_ok() {
let data = tokio::fs::read_to_string(path).await?;
let operations: VecDeque<FailedOperation> = serde_json::from_str(&data)?;
let mut queue = self.queue.write().await;
*queue = operations;
info!(
"Loaded {} operations from dead letter queue disk file: {}",
queue.len(),
path
);
}
}
Ok(())
}
pub async fn shutdown(&mut self) -> Result<()> {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
let _ = shutdown_tx.send(()).await;
}
if let Some(handle) = self.cleanup_handle.take() {
handle.abort();
}
if self.config.persist_to_disk {
if let Err(e) = self.persist_to_disk().await {
error!("Failed to persist dead letter queue during shutdown: {}", e);
}
}
info!("Dead letter queue shutdown complete");
Ok(())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeadLetterStats {
pub total_count: usize,
pub by_component: std::collections::HashMap<String, usize>,
pub by_operation: std::collections::HashMap<String, usize>,
pub oldest_timestamp: Option<u64>,
pub newest_timestamp: Option<u64>,
pub total_retry_attempts: u32,
}
impl Drop for DeadLetterQueue {
fn drop(&mut self) {
if self.cleanup_handle.is_some() {
warn!("DeadLetterQueue dropped without proper shutdown");
}
}
}