apalis_core/task/
attempt.rs

1//! A thread-safe tracker for counting the number of attempts made by a task.
2//!
3//! The `Attempt` struct wraps an atomic counter, allowing concurrent increment and retrieval of the attempt count. It is designed to be used within the Apalis job/task system, enabling tasks to keep track of how many times they have been retried or executed.
4//!
5//! Features:
6//! - Thread-safe increment and retrieval of attempt count.
7//! - Integration with apalis `FromRequest` trait for extracting attempt information from a task context.
8//! - Optional (via the `serde` feature) serialization and deserialization support for persisting or transmitting attempt state.
9use std::{
10    convert::Infallible,
11    sync::{Arc, atomic::AtomicUsize},
12};
13
14use crate::{task::Task, task_fn::FromRequest};
15
/// A wrapper to keep count of the attempts tried by a task.
///
/// The count is stored in an [`AtomicUsize`] behind an [`Arc`], so cloning an
/// `Attempt` produces another handle to the *same* counter rather than an
/// independent copy: an increment made through one clone is visible through
/// every other clone.
///
/// The [`Default`] value is a fresh counter starting at zero.
#[derive(Debug, Clone, Default)]
pub struct Attempt(Arc<AtomicUsize>);
25
26impl Attempt {
27    /// Build a new tracker
28    pub fn new() -> Self {
29        Self::default()
30    }
31
32    /// Build a tracker from an existing value
33    pub fn new_with_value(value: usize) -> Self {
34        Self(Arc::new(AtomicUsize::from(value)))
35    }
36
37    /// Get the current value
38    pub fn current(&self) -> usize {
39        self.0.load(std::sync::atomic::Ordering::Relaxed)
40    }
41
42    /// Increase the current value
43    pub fn increment(&self) -> usize {
44        self.0.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
45    }
46}
47
48impl<Args: Sync, Ctx: Sync, IdType: Sync + Send> FromRequest<Task<Args, Ctx, IdType>> for Attempt {
49    type Error = Infallible;
50    async fn from_request(task: &Task<Args, Ctx, IdType>) -> Result<Self, Self::Error> {
51        Ok(task.parts.attempt.clone())
52    }
53}
54
#[cfg(feature = "serde")]
mod serde_impl {
    // `serde` support for `Attempt`: the counter is written and read as a
    // plain `u64`.
    use std::sync::atomic::Ordering;

    use serde::{Deserialize, Deserializer, Serialize, Serializer};

    use super::*;

    impl Serialize for Attempt {
        /// Serializes the current attempt count as a `u64`.
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: Serializer,
        {
            let count = self.0.load(Ordering::SeqCst);
            // Widening cast: lossless wherever `usize` is at most 64 bits wide.
            serializer.serialize_u64(count as u64)
        }
    }

    impl<'de> Deserialize<'de> for Attempt {
        /// Reads a `u64` and builds a fresh tracker holding that count.
        ///
        /// # Errors
        ///
        /// Returns a deserialization error if the input is not a `u64`, or if
        /// the value does not fit in `usize` on the current target. The
        /// latter case used to be silently truncated by an `as` cast.
        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
        where
            D: Deserializer<'de>,
        {
            let raw = u64::deserialize(deserializer)?;
            let count = usize::try_from(raw).map_err(serde::de::Error::custom)?;
            Ok(Attempt(Arc::new(AtomicUsize::new(count))))
        }
    }
}