use crate::queue::{ArcJobProcessorFn, QueueInterface};
use crate::webhook::sender::JobProcessorFnAsync;
use crate::webhook::types::JobData;
use async_trait::async_trait;
use dashmap::DashMap;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, info};
/// In-memory queue backend that keeps pending jobs and their processor
/// callbacks in concurrent maps keyed by queue name.
///
/// Both maps are wrapped in `Arc` so they can be shared with the background
/// task spawned by `start_processing`.
pub struct MemoryQueueManager {
/// Pending jobs for each queue, keyed by queue name.
queues: Arc<DashMap<String, Vec<JobData>, ahash::RandomState>>,
/// Processor callback registered for each queue name.
processors: Arc<DashMap<String, ArcJobProcessorFn, ahash::RandomState>>,
}
impl Default for MemoryQueueManager {
fn default() -> Self {
Self::new()
}
}
impl MemoryQueueManager {
pub fn new() -> Self {
let queues = Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
let processors = Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
Self { queues, processors }
}
pub fn start_processing(&self) {
let queues = Arc::clone(&self.queues);
let processors = Arc::clone(&self.processors);
info!("{}", "Starting memory queue processing loop...".to_string());
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(500));
loop {
interval.tick().await;
for queue_entry in queues.iter() {
let queue_name = queue_entry.key();
if let Some(processor) = processors.get(queue_name) {
if let Some(mut jobs_vec) = queues.get_mut(queue_name) {
if !jobs_vec.is_empty() {
debug!(
"{}",
format!(
"Processing {} jobs from memory queue {}",
jobs_vec.len(),
queue_name
)
);
for job in jobs_vec.drain(..) {
let processor_clone = Arc::clone(&processor);
processor_clone(job).await.unwrap();
}
}
}
}
}
}
});
}
}
#[async_trait]
impl QueueInterface for MemoryQueueManager {
    /// Appends `data` to the queue named `queue_name`, creating the queue if
    /// it does not exist yet. Always returns `Ok(())`.
    async fn add_to_queue(&self, queue_name: &str, data: JobData) -> crate::error::Result<()> {
        let mut pending = self.queues.entry(queue_name.to_owned()).or_default();
        pending.push(data);
        Ok(())
    }

    /// Registers `callback` as the processor for `queue_name`, replacing any
    /// processor previously stored under that name.
    ///
    /// An empty queue entry is created for `queue_name` if none exists.
    /// Always returns `Ok(())`.
    async fn process_queue(
        &self,
        queue_name: &str,
        callback: JobProcessorFnAsync,
    ) -> crate::error::Result<()> {
        let name = queue_name.to_owned();
        // Make sure the queue entry exists before the processor is stored.
        self.queues.entry(name.clone()).or_default();
        self.processors.insert(name, Arc::from(callback));
        debug!("Registered processor for memory queue: {}", queue_name);
        Ok(())
    }

    /// Removes every queue together with its pending jobs. Registered
    /// processors are not touched. Always returns `Ok(())`.
    async fn disconnect(&self) -> crate::error::Result<()> {
        self.queues.clear();
        Ok(())
    }

    /// Performs no actual check and always reports success.
    async fn check_health(&self) -> crate::error::Result<()> {
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::queue::JobData;
    use crate::webhook::types::JobPayload;

    /// Builds a job with placeholder credentials and an empty event list,
    /// shared by the tests below.
    fn sample_job() -> JobData {
        JobData {
            app_key: "test_key".to_string(),
            app_id: "test_id".to_string(),
            app_secret: "test_secret".to_string(),
            payload: JobPayload {
                time_ms: chrono::Utc::now().timestamp_millis(),
                events: vec![],
            },
            original_signature: "test_signature".to_string(),
        }
    }

    /// Adding one job creates the queue and leaves exactly one entry in it.
    #[tokio::test]
    async fn test_add_to_queue() {
        let manager = MemoryQueueManager::new();
        manager
            .add_to_queue("test_queue", sample_job())
            .await
            .unwrap();
        assert_eq!(manager.queues.get("test_queue").unwrap().len(), 1);
    }

    /// Disconnecting removes every queue, including ones holding jobs.
    #[tokio::test]
    async fn test_disconnect() {
        let manager = MemoryQueueManager::new();
        manager
            .add_to_queue("test_queue", sample_job())
            .await
            .unwrap();
        assert!(!manager.queues.is_empty());
        manager.disconnect().await.unwrap();
        assert!(manager.queues.is_empty());
    }
}