apalis_core/task/
status.rs1use core::fmt;
11use std::{
12 str::FromStr,
13 sync::{
14 Arc,
15 atomic::{AtomicU8, Ordering},
16 },
17};
18
19#[repr(u8)]
21#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
22#[non_exhaustive]
23#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
24pub enum Status {
25 Pending,
27 Queued,
29 Running,
31 Done,
33 Failed,
35 Killed,
37}
38
39impl Default for Status {
40 fn default() -> Self {
41 Status::Pending
42 }
43}
44
45#[derive(Debug, thiserror::Error)]
47pub enum StatusError {
48 #[error("Unknown state: {0}")]
49 UnknownState(String),
51}
52
53impl FromStr for Status {
54 type Err = StatusError;
55
56 fn from_str(s: &str) -> Result<Self, Self::Err> {
57 match s {
58 "Pending" => Ok(Status::Pending),
59 "Queued" => Ok(Status::Queued),
60 "Running" => Ok(Status::Running),
61 "Done" => Ok(Status::Done),
62 "Failed" => Ok(Status::Failed),
63 "Killed" => Ok(Status::Killed),
64 _ => Err(StatusError::UnknownState(s.to_owned())),
65 }
66 }
67}
68
69impl fmt::Display for Status {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 match &self {
72 Status::Pending => write!(f, "Pending"),
73 Status::Queued => write!(f, "Queued"),
74 Status::Running => write!(f, "Running"),
75 Status::Done => write!(f, "Done"),
76 Status::Failed => write!(f, "Failed"),
77 Status::Killed => write!(f, "Killed"),
78 }
79 }
80}
81
82impl Status {
83 fn from_u8(val: u8) -> Option<Self> {
84 match val {
85 0 => Some(Status::Pending),
86 1 => Some(Status::Queued),
87 2 => Some(Status::Running),
88 3 => Some(Status::Done),
89 4 => Some(Status::Failed),
90 5 => Some(Status::Killed),
91 _ => None,
92 }
93 }
94}
95
96#[repr(transparent)]
98#[derive(Debug, Clone, Default)]
99pub struct AtomicStatus(Arc<AtomicU8>);
100
101impl AtomicStatus {
102 pub fn new(status: Status) -> Self {
104 Self(Arc::new(AtomicU8::new(status as u8)))
105 }
106
107 pub fn load(&self) -> Status {
109 Status::from_u8(self.0.load(Ordering::Acquire)).unwrap()
110 }
111
112 pub fn store(&self, status: Status) {
114 self.0.store(status as u8, Ordering::Release);
115 }
116 pub fn swap(&self, status: Status) -> Status {
118 Status::from_u8(self.0.swap(status as u8, Ordering::AcqRel)).unwrap()
119 }
120}
121
122impl From<AtomicStatus> for Status {
123 fn from(val: AtomicStatus) -> Self {
124 val.load()
125 }
126}
127
128impl From<Status> for AtomicStatus {
129 fn from(val: Status) -> Self {
130 AtomicStatus::new(val)
131 }
132}
133
134#[cfg(feature = "serde")]
135mod serde_impl {
136 use serde::{Deserialize, Deserializer, Serialize, Serializer};
137
138 use super::*;
139
140 fn serialize<S>(status: &AtomicStatus, serializer: S) -> Result<S::Ok, S::Error>
142 where
143 S: Serializer,
144 {
145 let value = status.load();
146 serializer.serialize_str(&value.to_string())
147 }
148
149 fn deserialize<'de, D>(deserializer: D) -> Result<AtomicStatus, D::Error>
151 where
152 D: Deserializer<'de>,
153 {
154 let value = String::deserialize(deserializer)?;
155 let status = Status::from_str(&value).map_err(serde::de::Error::custom)?;
156 Ok(AtomicStatus::new(status))
157 }
158
159 impl Serialize for AtomicStatus {
160 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
161 where
162 S: Serializer,
163 {
164 serialize(self, serializer)
165 }
166 }
167
168 impl<'de> Deserialize<'de> for AtomicStatus {
169 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
170 where
171 D: Deserializer<'de>,
172 {
173 deserialize(deserializer)
174 }
175 }
176}