apalis_core/task/
attempt.rs

1//! A thread-safe tracker for counting the number of attempts made by a task.
2//!
3//! The `Attempt` struct wraps an atomic counter, allowing concurrent increment and retrieval of the attempt count. It is designed to be used within the Apalis job/task system, enabling tasks to keep track of how many times they have been retried or executed.
4//!
5//! Features:
6//! - Thread-safe increment and retrieval of attempt count.
7//! - Integration with apalis `FromRequest` trait for extracting attempt information from a task context.
8//! - Optional (via the `serde` feature) serialization and deserialization support for persisting or transmitting attempt state.
9use std::{
10    convert::Infallible,
11    sync::{Arc, atomic::AtomicUsize},
12};
13
14use crate::{task::Task, task_fn::FromRequest};
15
/// Shared counter recording how many attempts a task has gone through.
///
/// The count lives in an atomic integer behind an [`Arc`], so cloning an
/// `Attempt` yields another handle to the *same* counter rather than a copy of
/// its value. The default value is a tracker starting at zero.
#[derive(Debug, Clone, Default)]
pub struct Attempt(Arc<AtomicUsize>);

impl Attempt {
    /// Create a tracker whose count starts at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::new_with_value(0)
    }

    /// Create a tracker whose count starts at `value`.
    #[must_use]
    pub fn new_with_value(value: usize) -> Self {
        let counter = AtomicUsize::new(value);
        Self(Arc::new(counter))
    }

    /// Read the number of attempts recorded so far.
    #[must_use]
    pub fn current(&self) -> usize {
        use std::sync::atomic::Ordering::Relaxed;

        self.0.load(Relaxed)
    }

    /// Add one to the count.
    ///
    /// Returns the value the counter held *before* the increment.
    #[must_use]
    pub fn increment(&self) -> usize {
        use std::sync::atomic::Ordering::Relaxed;

        self.0.fetch_add(1, Relaxed)
    }
}
51
52impl<Args: Sync, Ctx: Sync, IdType: Sync + Send> FromRequest<Task<Args, Ctx, IdType>> for Attempt {
53    type Error = Infallible;
54    async fn from_request(task: &Task<Args, Ctx, IdType>) -> Result<Self, Self::Error> {
55        Ok(task.parts.attempt.clone())
56    }
57}
58
#[cfg(feature = "serde")]
mod serde_impl {
    //! `serde` support for [`Attempt`]: the tracker is represented on the wire
    //! as a single unsigned 64-bit integer holding the current count.

    use std::sync::atomic::Ordering;

    use serde::{Deserialize, Deserializer, Serialize, Serializer};

    use super::*;

    impl Serialize for Attempt {
        /// Writes the current count as a `u64`.
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: Serializer,
        {
            let value = self.0.load(Ordering::SeqCst);
            // Widening `usize` to `u64` cannot lose information on targets
            // whose pointer width is at most 64 bits.
            serializer.serialize_u64(value as u64)
        }
    }

    impl<'de> Deserialize<'de> for Attempt {
        /// Reads a `u64` and builds a fresh tracker starting at that count.
        ///
        /// # Errors
        ///
        /// Fails if the input is not a `u64`, or if the value does not fit in
        /// this target's `usize` (previously such values were silently
        /// truncated by an `as` cast).
        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
        where
            D: Deserializer<'de>,
        {
            let raw = u64::deserialize(deserializer)?;
            let value = usize::try_from(raw).map_err(<D::Error as serde::de::Error>::custom)?;
            Ok(Attempt(Arc::new(AtomicUsize::new(value))))
        }
    }
}