floxide_core/distributed/
work_queue.rs

1//! Work queue for distributed workflow execution.
2//!
3//! This module defines the WorkQueue trait for enqueuing and dequeuing workflow work items,
4//! and provides an in-memory implementation for testing and local development.
5
6use async_trait::async_trait;
7use std::collections::{HashMap, VecDeque};
8use std::fmt::Debug;
9use std::sync::Arc;
10use thiserror::Error;
11use tokio::sync::Mutex;
12
13use crate::context::Context;
14use crate::workflow::WorkItem;
15
/// Errors that can occur in a WorkQueue implementation.
///
/// Every `WorkQueue` method reports failure through this type. The
/// `Display` text of each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum WorkQueueError {
    /// An I/O failure; the payload is a human-readable description that is
    /// appended to the `"I/O error: "` prefix when displayed.
    #[error("I/O error: {0}")]
    Io(String),
    /// The queue has no work to hand back.
    #[error("Queue is empty")]
    Empty,
    /// Any failure not covered by the other variants; the payload is a
    /// human-readable description.
    #[error("Other error: {0}")]
    Other(String),
}
26
/// Trait for a distributed workflow work queue.
///
/// Implementations provide FIFO queueing of work items for distributed workers.
/// Items are enqueued under a workflow/run identifier and are handed back
/// together with that identifier when dequeued.
///
/// Implementors must be `Clone + Send + Sync + 'static`. The `C` type parameter
/// does not appear in any method signature of this trait.
///
/// NOTE(review): `enqueue`/`dequeue` name the key `workflow_id` while
/// `purge_run`/`pending_work` name it `run_id`; these look like the same
/// identifier — confirm.
/// NOTE(review): this trait does not state how items from *different*
/// workflows are ordered relative to each other — confirm per implementation.
#[async_trait]
pub trait WorkQueue<C: Context, WI: WorkItem>: Clone + Send + Sync + 'static {
    /// Enqueue one work-item under this `workflow_id`.
    ///
    /// # Errors
    ///
    /// Returns Err(WorkQueueError) on failure.
    async fn enqueue(&self, workflow_id: &str, work: WI) -> Result<(), WorkQueueError>;

    /// Dequeue the next available work-item from any workflow.
    ///
    /// Returns Ok(Some((workflow_id, item))) if an item was dequeued,
    /// Ok(None) if the queue is empty.
    ///
    /// # Errors
    ///
    /// Returns Err(WorkQueueError) on failure.
    async fn dequeue(&self) -> Result<Option<(String, WI)>, WorkQueueError>;

    /// Purge all work items for a given workflow run.
    ///
    /// Removes all queued work for the specified run_id.
    ///
    /// # Errors
    ///
    /// Returns Err(WorkQueueError) on failure.
    async fn purge_run(&self, run_id: &str) -> Result<(), WorkQueueError>;

    /// Get pending work for a run.
    ///
    /// Returns the work items currently queued for `run_id` as an owned `Vec`.
    /// NOTE(review): presumably a read-only snapshot that leaves the items
    /// queued — confirm against each implementation.
    ///
    /// # Errors
    ///
    /// Returns Err(WorkQueueError) on failure.
    async fn pending_work(&self, run_id: &str) -> Result<Vec<WI>, WorkQueueError>;
}
49
/// In-memory implementation of WorkQueue for testing and local development.
///
/// The single field is a `HashMap` from workflow/run id to a `VecDeque` of
/// work items, guarded by a `tokio::sync::Mutex` and held in an `Arc`.
/// The derived `Clone` clones that `Arc`, so every clone of a queue operates
/// on the same underlying map.
#[derive(Clone)]
pub struct InMemoryWorkQueue<WI: WorkItem>(Arc<Mutex<HashMap<String, VecDeque<WI>>>>);
53
54impl<WI: WorkItem> InMemoryWorkQueue<WI> {
55    pub fn new() -> Self {
56        Self(Arc::new(Mutex::new(HashMap::new())))
57    }
58}
59
60impl<WI: WorkItem> Default for InMemoryWorkQueue<WI> {
61    fn default() -> Self {
62        Self::new()
63    }
64}
65
66#[async_trait]
67impl<C: Context, WI: WorkItem + 'static> WorkQueue<C, WI> for InMemoryWorkQueue<WI> {
68    async fn enqueue(&self, workflow_id: &str, work: WI) -> Result<(), WorkQueueError> {
69        let mut map = self.0.lock().await;
70        map.entry(workflow_id.to_string())
71            .or_default()
72            .push_back(work);
73        Ok(())
74    }
75    async fn dequeue(&self) -> Result<Option<(String, WI)>, WorkQueueError> {
76        let mut map = self.0.lock().await;
77        for (run_id, q) in map.iter_mut() {
78            if let Some(item) = q.pop_front() {
79                return Ok(Some((run_id.clone(), item)));
80            }
81        }
82        Ok(None)
83    }
84    async fn purge_run(&self, run_id: &str) -> Result<(), WorkQueueError> {
85        let mut map = self.0.lock().await;
86        map.remove(run_id);
87        Ok(())
88    }
89    async fn pending_work(&self, run_id: &str) -> Result<Vec<WI>, WorkQueueError> {
90        let map = self.0.lock().await;
91        let q = map.get(run_id).ok_or(WorkQueueError::Empty)?;
92        Ok(q.iter().cloned().collect())
93    }
94}