oxanus/storage.rs
1use chrono::{DateTime, Utc};
2
3use crate::{
4 error::OxanusError,
5 job_envelope::{JobEnvelope, JobId},
6 queue::Queue,
7 storage_builder::StorageBuilder,
8 storage_internal::{Process, Stats, StorageInternal},
9 worker::Worker,
10};
11
12/// Storage provides the main interface for job management in Oxanus.
13///
14/// It handles all job operations including enqueueing, scheduling, and monitoring.
15/// Storage instances are created using the [`Storage::builder()`] method.
16///
17/// # Examples
18///
19/// ```rust
20/// use oxanus::{Storage, Queue, Worker};
21///
22/// async fn example() -> Result<(), oxanus::OxanusError> {
23/// let storage = Storage::builder().from_env()?.build()?;
24///
25/// // Enqueue a job
26/// storage.enqueue(MyQueue, MyWorker { data: "hello" }).await?;
27///
28/// // Schedule a job for later
29/// storage.enqueue_in(MyQueue, MyWorker { data: "delayed" }, 300).await?;
30///
31/// Ok(())
32/// }
33/// ```
34#[derive(Clone)]
35pub struct Storage {
36 pub(crate) internal: StorageInternal,
37}
38
39impl Storage {
40 /// Creates a new [`StorageBuilder`] for configuring and building a Storage instance.
41 ///
42 /// # Examples
43 ///
44 /// ```rust
45 /// use oxanus::Storage;
46 ///
47 /// let builder = Storage::builder();
48 /// let storage = builder.from_env()?.build()?;
49 /// ```
50 pub fn builder() -> StorageBuilder {
51 StorageBuilder::new()
52 }
53
54 /// Enqueues a job to be processed immediately.
55 ///
56 /// # Arguments
57 ///
58 /// * `queue` - The queue to enqueue the job to
59 /// * `job` - The job to enqueue
60 ///
61 /// # Returns
62 ///
63 /// A [`JobId`] that can be used to track the job, or an [`OxanusError`] if the operation fails.
64 ///
65 /// # Examples
66 ///
67 /// ```rust
68 /// use oxanus::{Storage, Queue, Worker};
69 ///
70 /// async fn example(storage: &Storage) -> Result<(), oxanus::OxanusError> {
71 /// let job_id = storage.enqueue(MyQueue, MyWorker { data: "hello" }).await?;
72 /// Ok(())
73 /// }
74 /// ```
75 pub async fn enqueue<T, DT, ET>(&self, queue: impl Queue, job: T) -> Result<JobId, OxanusError>
76 where
77 T: Worker<Context = DT, Error = ET> + serde::Serialize,
78 DT: Send + Sync + Clone + 'static,
79 ET: std::error::Error + Send + Sync + 'static,
80 {
81 self.enqueue_in(queue, job, 0).await
82 }
83
84 /// Enqueues a job to be processed after a specified delay.
85 ///
86 /// # Arguments
87 ///
88 /// * `queue` - The queue to enqueue the job to
89 /// * `job` - The job to enqueue
90 /// * `delay` - The delay in seconds before the job should be processed
91 ///
92 /// # Returns
93 ///
94 /// A [`JobId`] that can be used to track the job, or an [`OxanusError`] if the operation fails.
95 ///
96 /// # Examples
97 ///
98 /// ```rust
99 /// use oxanus::{Storage, Queue, Worker};
100 ///
101 /// async fn example(storage: &Storage) -> Result<(), oxanus::OxanusError> {
102 /// // Schedule a job to run in 5 minutes
103 /// let job_id = storage.enqueue_in(MyQueue, MyWorker { data: "delayed" }, 300).await?;
104 /// Ok(())
105 /// }
106 /// ```
107 pub async fn enqueue_in<T, DT, ET>(
108 &self,
109 queue: impl Queue,
110 job: T,
111 delay: u64,
112 ) -> Result<JobId, OxanusError>
113 where
114 T: Worker<Context = DT, Error = ET> + serde::Serialize,
115 DT: Send + Sync + Clone + 'static,
116 ET: std::error::Error + Send + Sync + 'static,
117 {
118 let envelope = JobEnvelope::new(queue.key().clone(), job)?;
119
120 tracing::trace!("Enqueuing job: {:?}", envelope);
121
122 if delay > 0 {
123 self.internal.enqueue_in(envelope, delay).await
124 } else {
125 self.internal.enqueue(envelope).await
126 }
127 }
128
129 /// Schedules a job to run at a specific time.
130 ///
131 /// # Arguments
132 ///
133 /// * `queue` - The queue to enqueue the job to
134 /// * `job` - The job to enqueue
135 /// * `time` - The UTC timestamp when the job should become available
136 ///
137 /// # Returns
138 ///
139 /// A [`JobId`] that can be used to track the job, or an [`OxanusError`] if the operation fails.
140 ///
141 /// # Examples
142 ///
143 /// ```rust
144 /// use chrono::{Duration, Utc};
145 /// use oxanus::{Storage, Queue, Worker};
146 ///
147 /// async fn example(storage: &Storage) -> Result<(), oxanus::OxanusError> {
148 /// let time = Utc::now() + Duration::minutes(5);
149 /// let job_id = storage.enqueue_at(MyQueue, MyWorker { data: "scheduled" }, time).await?;
150 /// Ok(())
151 /// }
152 /// ```
153 pub async fn enqueue_at<T, DT, ET>(
154 &self,
155 queue: impl Queue,
156 job: T,
157 time: DateTime<Utc>,
158 ) -> Result<JobId, OxanusError>
159 where
160 T: Worker<Context = DT, Error = ET> + serde::Serialize,
161 DT: Send + Sync + Clone + 'static,
162 ET: std::error::Error + Send + Sync + 'static,
163 {
164 let envelope = JobEnvelope::new(queue.key().clone(), job)?;
165
166 tracing::trace!("Scheduling job {:?} at {}", envelope, time);
167
168 self.internal.enqueue_at(envelope, time).await
169 }
170
171 /// Returns the number of jobs currently enqueued in the specified queue.
172 ///
173 /// # Arguments
174 ///
175 /// * `queue` - The queue to count jobs for
176 ///
177 /// # Returns
178 ///
179 /// The number of enqueued jobs, or an [`OxanusError`] if the operation fails.
180 pub async fn enqueued_count(&self, queue: impl Queue) -> Result<usize, OxanusError> {
181 self.internal.enqueued_count(&queue.key()).await
182 }
183
184 /// Returns the latency of the queue (The age of the oldest job in the queue).
185 ///
186 /// # Arguments
187 ///
188 /// * `queue` - The queue to get the latency for
189 ///
190 /// # Returns
191 ///
192 /// The latency of the queue in milliseconds, or an [`OxanusError`] if the operation fails.
193 pub async fn latency_ms(&self, queue: impl Queue) -> Result<f64, OxanusError> {
194 self.internal.latency_ms(&queue.key()).await
195 }
196
197 /// Returns the number of jobs that have failed and moved to the dead queue.
198 ///
199 /// # Returns
200 ///
201 /// The number of dead jobs, or an [`OxanusError`] if the operation fails.
202 pub async fn dead_count(&self) -> Result<usize, OxanusError> {
203 self.internal.dead_count().await
204 }
205
206 /// Returns the number of jobs that are currently being retried.
207 ///
208 /// # Returns
209 ///
210 /// The number of retrying jobs, or an [`OxanusError`] if the operation fails.
211 pub async fn retries_count(&self) -> Result<usize, OxanusError> {
212 self.internal.retries_count().await
213 }
214
215 /// Returns the number of jobs that are scheduled for future execution.
216 ///
217 /// # Returns
218 ///
219 /// The number of scheduled jobs, or an [`OxanusError`] if the operation fails.
220 pub async fn scheduled_count(&self) -> Result<usize, OxanusError> {
221 self.internal.scheduled_count().await
222 }
223
224 /// Returns the number of jobs that are currently enqueued or scheduled for future execution.
225 ///
226 /// # Returns
227 ///
228 /// The number of jobs, or an [`OxanusError`] if the operation fails.
229 pub async fn jobs_count(&self) -> Result<usize, OxanusError> {
230 self.internal.jobs_count().await
231 }
232
233 /// Returns the stats for all queues.
234 ///
235 /// # Returns
236 ///
237 /// The stats for all queues, or an [`OxanusError`] if the operation fails.
238 pub async fn stats(&self) -> Result<Stats, OxanusError> {
239 self.internal.stats().await
240 }
241
242 /// Returns the list of processes that are currently running.
243 ///
244 /// # Returns
245 ///
246 /// The list of processes, or an [`OxanusError`] if the operation fails.
247 pub async fn processes(&self) -> Result<Vec<Process>, OxanusError> {
248 self.internal.processes().await
249 }
250
251 /// Returns the namespace this storage instance is using.
252 ///
253 /// # Returns
254 ///
255 /// The namespace string.
256 pub fn namespace(&self) -> &str {
257 self.internal.namespace()
258 }
259}