apalis_core/backend/
queue.rs

1//! Represents a queue in the backend
2//!
3//! This module provides the `Queue` struct and related functionality for managing
4//! queues in the backend. A queue is identified by its name and is used to group
5//! tasks for processing by workers.
6//!
7//! The `Queue` struct is designed to be lightweight and easily clonable, allowing
8//! it to be passed around in various contexts. It uses an `Arc<String>` internally
9//! to store the queue name, ensuring efficient memory usage and thread safety.
10//!
11//! The module also includes an implementation of the `FromRequest` trait, allowing
12//! extraction of the queue information from a task context. This is useful for
13//! workers that need to know which queue they are processing tasks from.
14use std::{str::FromStr, sync::Arc};
15
16use crate::{task::Task, task_fn::FromRequest};
17
/// Represents a queue in the backend.
///
/// A thin newtype around the queue's name. The name is held in an
/// `Arc<String>`, so cloning a `Queue` clones the `Arc` handle rather than
/// copying the string. Equality and hashing are derived and therefore
/// follow the wrapped name.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Queue(Arc<String>);
21
22impl From<String> for Queue {
23    fn from(value: String) -> Self {
24        Self(Arc::new(value))
25    }
26}
27impl AsRef<str> for Queue {
28    fn as_ref(&self) -> &str {
29        &self.0
30    }
31}
32
33impl From<&str> for Queue {
34    fn from(value: &str) -> Self {
35        Self(Arc::new(value.to_string()))
36    }
37}
38
39impl FromStr for Queue {
40    type Err = std::convert::Infallible;
41
42    fn from_str(s: &str) -> Result<Self, Self::Err> {
43        Ok(Self(Arc::new(s.to_string())))
44    }
45}
46
47impl std::fmt::Display for Queue {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        write!(f, "{}", self.0)
50    }
51}
52
#[cfg(feature = "serde")]
impl serde::Serialize for Queue {
    /// Serializes the queue as a plain string holding its name.
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        let name: &str = self.0.as_str();
        serializer.serialize_str(name)
    }
}
62
#[cfg(feature = "serde")]
impl<'de> serde::Deserialize<'de> for Queue {
    /// Deserializes a queue from a string value, which becomes the queue name.
    ///
    /// # Errors
    ///
    /// Propagates the deserializer's error when the input cannot be read as a
    /// `String`.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Fully-qualified call: trait associated functions only resolve through
        // `String::deserialize` when `serde::Deserialize` is in scope, and this
        // file's visible `use` lines do not import it.
        let name = <String as serde::Deserialize<'de>>::deserialize(deserializer)?;
        Ok(Queue(Arc::new(name)))
    }
}
73
74impl<Args, Ctx, IdType> FromRequest<Task<Args, Ctx, IdType>> for Queue
75where
76    Args: Sync,
77    Ctx: Sync,
78    IdType: Sync + Send,
79{
80    type Error = QueueError;
81
82    async fn from_request(req: &Task<Args, Ctx, IdType>) -> Result<Self, Self::Error> {
83        let queue = req
84            .parts
85            .data
86            .get()
87            .cloned()
88            .ok_or_else(|| QueueError::NotFound)?;
89        Ok(queue)
90    }
91}
92
/// Errors that can occur when extracting queue information from a task context.
///
/// This is the `Error` type of the `FromRequest` implementation for `Queue`.
#[derive(Debug, thiserror::Error)]
pub enum QueueError {
    /// No `Queue` value was found in the task's data during extraction.
    #[error("Queue data not found in task context. This is likely a bug. Please report it.")]
    NotFound,
}