use async_trait::async_trait;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::Mutex;
use crate::context::Context;
use crate::workflow::WorkItem;
/// Errors reported by [`WorkQueue`] operations.
#[derive(Debug, Error)]
pub enum WorkQueueError {
/// An I/O failure, carrying a human-readable description.
#[error("I/O error: {0}")]
Io(String),
/// No queue was available to read from.
#[error("Queue is empty")]
Empty,
/// Any failure that does not fit the other variants, with a description.
#[error("Other error: {0}")]
Other(String),
}
/// Asynchronous queue of work items, grouped by a string id.
///
/// `C` is the workflow context type and `WI` the work-item type. `C` does not
/// appear in any method signature of this trait.
///
/// Implementors must be cheaply shareable: `Clone + Send + Sync + 'static`.
#[async_trait]
pub trait WorkQueue<C: Context, WI: WorkItem>: Clone + Send + Sync + 'static {
/// Adds `work` to the queue identified by `workflow_id`.
///
/// NOTE(review): the other methods call this id `run_id`; presumably both
/// names refer to the same key — confirm against callers.
async fn enqueue(&self, workflow_id: &str, work: WI) -> Result<(), WorkQueueError>;
/// Removes and returns one work item together with the id it was queued
/// under, or `Ok(None)` when nothing is available.
async fn dequeue(&self) -> Result<Option<(String, WI)>, WorkQueueError>;
/// Discards all work queued under `run_id`.
async fn purge_run(&self, run_id: &str) -> Result<(), WorkQueueError>;
/// Returns the work items currently queued under `run_id` without removing
/// them.
async fn pending_work(&self, run_id: &str) -> Result<Vec<WI>, WorkQueueError>;
}
/// In-memory work queue: a map from id to a `VecDeque` of work items, guarded
/// by an async `Mutex` and held behind an `Arc`.
///
/// Cloning copies the `Arc`, so every clone operates on the same underlying map.
#[derive(Clone)]
pub struct InMemoryWorkQueue<WI: WorkItem>(Arc<Mutex<HashMap<String, VecDeque<WI>>>>);
impl<WI: WorkItem> InMemoryWorkQueue<WI> {
pub fn new() -> Self {
Self(Arc::new(Mutex::new(HashMap::new())))
}
}
impl<WI: WorkItem> Default for InMemoryWorkQueue<WI> {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl<C: Context, WI: WorkItem + 'static> WorkQueue<C, WI> for InMemoryWorkQueue<WI> {
    /// Appends `work` to the back of the queue keyed by `workflow_id`,
    /// creating that queue if it does not exist yet. Always returns `Ok(())`.
    async fn enqueue(&self, workflow_id: &str, work: WI) -> Result<(), WorkQueueError> {
        let mut queues = self.0.lock().await;
        let queue = queues.entry(workflow_id.to_string()).or_default();
        queue.push_back(work);
        Ok(())
    }

    /// Pops the front item of the first non-empty queue found while iterating
    /// the map, returning it with the key it was stored under.
    ///
    /// Returns `Ok(None)` when every queue is empty. The map is a `HashMap`,
    /// whose iteration order is unspecified, so which id is served first is
    /// not defined.
    ///
    /// NOTE(review): a queue that becomes empty stays in the map (only
    /// `purge_run` removes entries), so drained ids are rescanned on every call.
    async fn dequeue(&self) -> Result<Option<(String, WI)>, WorkQueueError> {
        let mut queues = self.0.lock().await;
        let next = queues
            .iter_mut()
            .find_map(|(run_id, queue)| queue.pop_front().map(|item| (run_id.clone(), item)));
        Ok(next)
    }

    /// Removes the queue keyed by `run_id`, dropping any items still in it.
    /// Succeeds whether or not such a queue existed.
    async fn purge_run(&self, run_id: &str) -> Result<(), WorkQueueError> {
        self.0.lock().await.remove(run_id);
        Ok(())
    }

    /// Returns clones of the items queued under `run_id`, front to back,
    /// leaving the queue untouched.
    ///
    /// # Errors
    ///
    /// Returns [`WorkQueueError::Empty`] when the map has no entry for
    /// `run_id`. An entry whose queue has been drained yields `Ok(vec![])`.
    async fn pending_work(&self, run_id: &str) -> Result<Vec<WI>, WorkQueueError> {
        let queues = self.0.lock().await;
        match queues.get(run_id) {
            Some(queue) => Ok(queue.iter().cloned().collect()),
            None => Err(WorkQueueError::Empty),
        }
    }
}