floxide_core/distributed/
work_queue.rs1use async_trait::async_trait;
7use std::collections::{HashMap, VecDeque};
8use std::fmt::Debug;
9use std::sync::Arc;
10use thiserror::Error;
11use tokio::sync::Mutex;
12
13use crate::context::Context;
14use crate::workflow::WorkItem;
15
16#[derive(Debug, Error)]
18pub enum WorkQueueError {
19 #[error("I/O error: {0}")]
20 Io(String),
21 #[error("Queue is empty")]
22 Empty,
23 #[error("Other error: {0}")]
24 Other(String),
25}
26
27#[async_trait]
31pub trait WorkQueue<C: Context, WI: WorkItem>: Clone + Send + Sync + 'static {
32 async fn enqueue(&self, workflow_id: &str, work: WI) -> Result<(), WorkQueueError>;
35
36 async fn dequeue(&self) -> Result<Option<(String, WI)>, WorkQueueError>;
41
42 async fn purge_run(&self, run_id: &str) -> Result<(), WorkQueueError>;
45
46 async fn pending_work(&self, run_id: &str) -> Result<Vec<WI>, WorkQueueError>;
48}
49
50#[derive(Clone)]
52pub struct InMemoryWorkQueue<WI: WorkItem>(Arc<Mutex<HashMap<String, VecDeque<WI>>>>);
53
54impl<WI: WorkItem> InMemoryWorkQueue<WI> {
55 pub fn new() -> Self {
56 Self(Arc::new(Mutex::new(HashMap::new())))
57 }
58}
59
60impl<WI: WorkItem> Default for InMemoryWorkQueue<WI> {
61 fn default() -> Self {
62 Self::new()
63 }
64}
65
66#[async_trait]
67impl<C: Context, WI: WorkItem + 'static> WorkQueue<C, WI> for InMemoryWorkQueue<WI> {
68 async fn enqueue(&self, workflow_id: &str, work: WI) -> Result<(), WorkQueueError> {
69 let mut map = self.0.lock().await;
70 map.entry(workflow_id.to_string())
71 .or_default()
72 .push_back(work);
73 Ok(())
74 }
75 async fn dequeue(&self) -> Result<Option<(String, WI)>, WorkQueueError> {
76 let mut map = self.0.lock().await;
77 for (run_id, q) in map.iter_mut() {
78 if let Some(item) = q.pop_front() {
79 return Ok(Some((run_id.clone(), item)));
80 }
81 }
82 Ok(None)
83 }
84 async fn purge_run(&self, run_id: &str) -> Result<(), WorkQueueError> {
85 let mut map = self.0.lock().await;
86 map.remove(run_id);
87 Ok(())
88 }
89 async fn pending_work(&self, run_id: &str) -> Result<Vec<WI>, WorkQueueError> {
90 let map = self.0.lock().await;
91 let q = map.get(run_id).ok_or(WorkQueueError::Empty)?;
92 Ok(q.iter().cloned().collect())
93 }
94}