use crate::error::Result;
use crate::state::now_millis;
use crate::storage::Storage;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
/// Record describing a task that was placed on the dead-letter queue.
///
/// `DeadLetterQueue::push` builds one of these, converts it to a JSON value and
/// saves it under the storage key `dlq:<task_id>`; `DeadLetterQueue::get` and
/// `DeadLetterQueue::list` decode it back.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeadLetter {
/// Identifier of the task; also used to build the storage key.
pub task_id: String,
/// Attempt count supplied by the caller of `push`
/// (presumably the attempts made before giving up; confirm with callers).
pub attempts: u32,
/// Error description supplied by the caller of `push`.
pub error: String,
/// Value returned by `now_millis()` when `push` built the record
/// (presumably milliseconds since the Unix epoch; confirm in `crate::state`).
pub timestamp: u64,
}
/// Keyed collection of `DeadLetter` records persisted through a shared
/// `Storage` backend, one record per task id.
pub struct DeadLetterQueue {
/// Backend that every queue operation reads from and writes to.
storage: Arc<dyn Storage>,
}
impl DeadLetterQueue {
pub fn new(storage: Arc<dyn Storage>) -> Self {
DeadLetterQueue { storage }
}
fn key(task_id: &str) -> String {
format!("dlq:{task_id}")
}
pub async fn push(&self, task_id: &str, attempts: u32, error: impl Into<String>) -> Result<()> {
let entry = DeadLetter {
task_id: task_id.to_string(),
attempts,
error: error.into(),
timestamp: now_millis(),
};
let value = serde_json::to_value(&entry).map_err(crate::error::StorageError::from)?;
self.storage.save(&Self::key(task_id), &value).await?;
Ok(())
}
pub async fn get(&self, task_id: &str) -> Result<Option<DeadLetter>> {
match self.storage.load(&Self::key(task_id)).await? {
Some(value) => {
let entry =
serde_json::from_value(value).map_err(crate::error::StorageError::from)?;
Ok(Some(entry))
}
None => Ok(None),
}
}
pub async fn list(&self) -> Result<Vec<DeadLetter>> {
let mut out = Vec::new();
for key in self.storage.list().await? {
if let Some(task_id) = key.strip_prefix("dlq:") {
if let Some(entry) = self.get(task_id).await? {
out.push(entry);
}
}
}
Ok(out)
}
pub async fn remove(&self, task_id: &str) -> Result<()> {
self.storage.delete(&Self::key(task_id)).await?;
Ok(())
}
}