Skip to main content

ppoppo_infra/
types.rs

//! Shared types used across infrastructure trait boundaries.
2
3use serde::{Deserialize, Serialize};
4use time::OffsetDateTime;
5
6/// Result of a rate limit check.
7#[derive(Debug, Clone)]
8pub struct RateLimitResult {
9    /// Whether the request is allowed.
10    pub allowed: bool,
11    /// Current weighted request count.
12    pub current_count: f64,
13    /// Maximum allowed requests in the window.
14    pub limit: i32,
15    /// Remaining requests before limit is reached.
16    pub remaining: i32,
17    /// When the current window resets.
18    pub reset_at: OffsetDateTime,
19}
20
21impl RateLimitResult {
22    /// Returns an error if rate limit exceeded, otherwise Ok(self).
23    pub fn into_result(self) -> crate::Result<Self> {
24        if self.allowed {
25            Ok(self)
26        } else {
27            Err(crate::Error::RateLimitExceeded {
28                current: self.current_count,
29                limit: self.limit,
30                reset_at: self.reset_at,
31            })
32        }
33    }
34
35    /// Returns the number of seconds until reset.
36    pub fn retry_after(&self) -> i64 {
37        let now = OffsetDateTime::now_utc();
38        (self.reset_at - now).whole_seconds().max(0)
39    }
40
41    /// Create an "allowed" result for fail-open fallback.
42    ///
43    /// Used by `ResilientRateLimit` when the backing store is unavailable.
44    /// L1 (in-memory governor) still provides per-pod protection.
45    pub fn allowed_fallback(max_requests: i32, window_seconds: i32) -> Self {
46        Self {
47            allowed: true,
48            current_count: 0.0,
49            limit: max_requests,
50            remaining: max_requests,
51            reset_at: OffsetDateTime::now_utc()
52                + time::Duration::seconds(i64::from(window_seconds)),
53        }
54    }
55}
56
57/// Job priority levels.
58#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
59pub enum Priority {
60    /// Lowest priority (-100)
61    Low = -100,
62    /// Default priority (0)
63    #[default]
64    Normal = 0,
65    /// Higher priority (100)
66    High = 100,
67    /// Highest priority (500)
68    Critical = 500,
69}
70
71impl Priority {
72    /// Convert to integer value for storage.
73    pub fn as_i32(self) -> i32 {
74        self as i32
75    }
76
77    /// Create from integer value.
78    pub fn from_i32(value: i32) -> Self {
79        match value {
80            v if v <= -100 => Priority::Low,
81            v if v <= 0 => Priority::Normal,
82            v if v <= 100 => Priority::High,
83            _ => Priority::Critical,
84        }
85    }
86}
87
88/// Job status.
89#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
90pub enum JobStatus {
91    /// Job is waiting to be processed.
92    Pending,
93    /// Job is currently being processed by a worker.
94    Processing,
95    /// Job completed successfully.
96    Completed,
97    /// Job failed after all retry attempts.
98    Failed,
99}
100
101impl JobStatus {
102    /// Parse from string representation.
103    pub fn parse(s: &str) -> Self {
104        match s {
105            "pending" => Self::Pending,
106            "processing" => Self::Processing,
107            "completed" => Self::Completed,
108            "failed" => Self::Failed,
109            _ => Self::Failed,
110        }
111    }
112
113    /// Convert to string representation.
114    pub fn as_str(&self) -> &'static str {
115        match self {
116            Self::Pending => "pending",
117            Self::Processing => "processing",
118            Self::Completed => "completed",
119            Self::Failed => "failed",
120        }
121    }
122}
123
124/// A job retrieved from the queue.
125///
126/// Uses `serde_json::Value` for the payload to maintain dyn-compatibility.
127#[derive(Debug, Clone)]
128pub struct Job {
129    /// Unique job ID (ULID).
130    pub id: String,
131    /// Queue name this job belongs to.
132    pub queue_name: String,
133    /// Job payload data as JSON.
134    pub payload: serde_json::Value,
135    /// Current attempt number (1-indexed).
136    pub attempts: i32,
137    /// Maximum allowed attempts.
138    pub max_attempts: i32,
139    /// When the job was created.
140    pub created_at: OffsetDateTime,
141}
142
/// Per-status job counts for a single queue.
#[derive(Clone, Debug)]
pub struct QueueStats {
    /// Name of the queue the counts refer to.
    pub queue_name: String,
    /// Count of jobs in the pending state.
    pub pending: i64,
    /// Count of jobs in the processing state.
    pub processing: i64,
    /// Count of jobs in the completed state.
    pub completed: i64,
    /// Count of jobs in the failed state.
    pub failed: i64,
}
157
158impl QueueStats {
159    /// Total jobs in the queue (all statuses).
160    pub fn total(&self) -> i64 {
161        self.pending + self.processing + self.completed + self.failed
162    }
163
164    /// Active jobs (pending + processing).
165    pub fn active(&self) -> i64 {
166        self.pending + self.processing
167    }
168}
169
170/// A typed job retrieved from the queue.
171///
172/// Used by [`JobQueueExt::pop_typed`](crate::JobQueueExt::pop_typed) for
173/// deserialized payloads.
174#[derive(Debug, Clone)]
175pub struct TypedJob<T> {
176    /// Unique job ID (ULID).
177    pub id: String,
178    /// Queue name this job belongs to.
179    pub queue_name: String,
180    /// Deserialized job payload.
181    pub payload: T,
182    /// Current attempt number (1-indexed).
183    pub attempts: i32,
184    /// Maximum allowed attempts.
185    pub max_attempts: i32,
186    /// When the job was created.
187    pub created_at: OffsetDateTime,
188}
189
/// A notification received over pub/sub.
#[derive(Clone, Debug)]
pub struct Notification {
    /// Name of the channel the notification arrived on.
    pub channel: String,
    /// Payload exactly as received, as a raw string.
    pub payload: String,
}
198
199impl Notification {
200    /// Deserialize the payload as JSON.
201    pub fn decode<T: serde::de::DeserializeOwned>(&self) -> crate::Result<T> {
202        let value = serde_json::from_str(&self.payload)?;
203        Ok(value)
204    }
205}
206
#[cfg(test)]
mod tests {
    use super::*;

    /// Every variant reports its stored integer, and arbitrary integers land
    /// in the expected priority bucket.
    #[test]
    fn test_priority_conversion() {
        let stored = [
            (Priority::Low, -100),
            (Priority::Normal, 0),
            (Priority::High, 100),
            (Priority::Critical, 500),
        ];
        for (priority, raw) in stored {
            assert_eq!(priority.as_i32(), raw);
        }

        let bucketed = [
            (-150, Priority::Low),
            (-50, Priority::Normal),
            (50, Priority::High),
            (1000, Priority::Critical),
        ];
        for (raw, priority) in bucketed {
            assert_eq!(Priority::from_i32(raw), priority);
        }
    }

    /// `total` adds all four counters; `active` adds pending and processing.
    #[test]
    fn test_queue_stats() {
        let stats = QueueStats {
            queue_name: String::from("test"),
            pending: 10,
            processing: 5,
            completed: 100,
            failed: 3,
        };

        assert_eq!(stats.total(), 118);
        assert_eq!(stats.active(), 15);
    }

    /// Known names parse to their variant; anything else parses as `Failed`.
    #[test]
    fn test_job_status_parse() {
        let cases = [
            ("pending", JobStatus::Pending),
            ("processing", JobStatus::Processing),
            ("completed", JobStatus::Completed),
            ("failed", JobStatus::Failed),
            ("unknown", JobStatus::Failed),
        ];
        for (text, expected) in cases {
            assert_eq!(JobStatus::parse(text), expected);
        }
    }
}
242        assert_eq!(JobStatus::parse("completed"), JobStatus::Completed);
243        assert_eq!(JobStatus::parse("failed"), JobStatus::Failed);
244        assert_eq!(JobStatus::parse("unknown"), JobStatus::Failed);
245    }
246}