use std::path::Path;
use std::sync::Arc;
use crate::engine::error::RustQueueError;
use crate::engine::models::{Job, JobId, QueueCounts};
use crate::engine::plugins::WorkerRegistry;
use crate::engine::queue::{FailResult, JobOptions, QueueInfo, QueueManager};
use crate::storage::{
BufferedRedbConfig, BufferedRedbStorage, HybridConfig, HybridStorage, MemoryStorage,
RedbStorage, StorageBackend,
};
pub struct RustQueue {
manager: QueueManager,
}
pub struct RustQueueBuilder {
storage: Arc<dyn StorageBackend>,
buffered_config: Option<BufferedRedbConfig>,
hybrid_config: Option<HybridConfig>,
worker_registry: Option<Arc<WorkerRegistry>>,
}
impl RustQueue {
pub fn memory() -> RustQueueBuilder {
RustQueueBuilder {
storage: Arc::new(MemoryStorage::new()),
buffered_config: None,
hybrid_config: None,
worker_registry: None,
}
}
pub fn redb(path: impl AsRef<Path>) -> anyhow::Result<RustQueueBuilder> {
let storage = Arc::new(RedbStorage::new(path)?);
Ok(RustQueueBuilder {
storage,
buffered_config: None,
hybrid_config: None,
worker_registry: None,
})
}
pub fn hybrid(path: impl AsRef<Path>) -> anyhow::Result<RustQueueBuilder> {
let storage = Arc::new(RedbStorage::new(path)?);
Ok(RustQueueBuilder {
storage,
buffered_config: None,
hybrid_config: Some(HybridConfig::default()),
worker_registry: None,
})
}
}
impl RustQueueBuilder {
pub fn with_write_coalescing(mut self, config: BufferedRedbConfig) -> Self {
self.buffered_config = Some(config);
self
}
pub fn with_hybrid_config(mut self, config: HybridConfig) -> Self {
self.hybrid_config = Some(config);
self
}
pub fn with_worker_registry(mut self, registry: Arc<WorkerRegistry>) -> Self {
self.worker_registry = Some(registry);
self
}
pub fn build(self) -> anyhow::Result<RustQueue> {
let storage: Arc<dyn StorageBackend> = if let Some(config) = self.hybrid_config {
Arc::new(HybridStorage::new(self.storage, config))
} else if let Some(config) = self.buffered_config {
Arc::new(BufferedRedbStorage::new(self.storage, config))
} else {
self.storage
};
let manager = if let Some(registry) = self.worker_registry {
QueueManager::new(storage).with_worker_registry(registry)
} else {
QueueManager::new(storage)
};
Ok(RustQueue { manager })
}
}
impl RustQueue {
pub async fn push(
&self,
queue: &str,
name: &str,
data: serde_json::Value,
opts: Option<JobOptions>,
) -> Result<JobId, RustQueueError> {
self.manager.push(queue, name, data, opts).await
}
pub async fn pull(&self, queue: &str, count: u32) -> Result<Vec<Job>, RustQueueError> {
self.manager.pull(queue, count).await
}
pub async fn dispatch_next_with_registered_worker(
&self,
queue: &str,
) -> Result<Option<JobId>, RustQueueError> {
self.manager
.dispatch_next_with_registered_worker(queue)
.await
}
pub async fn ack(
&self,
id: JobId,
result: Option<serde_json::Value>,
) -> Result<(), RustQueueError> {
self.manager.ack(id, result).await
}
pub async fn fail(&self, id: JobId, error: &str) -> Result<FailResult, RustQueueError> {
self.manager.fail(id, error).await
}
pub async fn cancel(&self, id: JobId) -> Result<(), RustQueueError> {
self.manager.cancel(id).await
}
pub async fn get_job(&self, id: JobId) -> Result<Option<Job>, RustQueueError> {
self.manager.get_job(id).await
}
pub async fn list_queues(&self) -> Result<Vec<QueueInfo>, RustQueueError> {
self.manager.list_queues().await
}
pub async fn get_queue_stats(&self, queue: &str) -> Result<QueueCounts, RustQueueError> {
self.manager.get_queue_stats(queue).await
}
pub async fn update_progress(
&self,
id: JobId,
progress: u8,
message: Option<String>,
) -> Result<(), RustQueueError> {
self.manager.update_progress(id, progress, message).await
}
}