oxanus/
storage.rs

1use chrono::{DateTime, Utc};
2
3use crate::{
4    error::OxanusError,
5    job_envelope::{JobEnvelope, JobId},
6    queue::Queue,
7    storage_builder::StorageBuilder,
8    storage_internal::{Process, Stats, StorageInternal},
9    worker::Worker,
10};
11
12/// Storage provides the main interface for job management in Oxanus.
13///
14/// It handles all job operations including enqueueing, scheduling, and monitoring.
15/// Storage instances are created using the [`Storage::builder()`] method.
16///
17/// # Examples
18///
19/// ```rust
20/// use oxanus::{Storage, Queue, Worker};
21///
22/// async fn example() -> Result<(), oxanus::OxanusError> {
23///     let storage = Storage::builder().from_env()?.build()?;
24///
25///     // Enqueue a job
26///     storage.enqueue(MyQueue, MyWorker { data: "hello" }).await?;
27///
28///     // Schedule a job for later
29///     storage.enqueue_in(MyQueue, MyWorker { data: "delayed" }, 300).await?;
30///
31///     Ok(())
32/// }
33/// ```
34#[derive(Clone)]
35pub struct Storage {
36    pub(crate) internal: StorageInternal,
37}
38
39impl Storage {
40    /// Creates a new [`StorageBuilder`] for configuring and building a Storage instance.
41    ///
42    /// # Examples
43    ///
44    /// ```rust
45    /// use oxanus::Storage;
46    ///
47    /// let builder = Storage::builder();
48    /// let storage = builder.from_env()?.build()?;
49    /// ```
50    pub fn builder() -> StorageBuilder {
51        StorageBuilder::new()
52    }
53
54    /// Enqueues a job to be processed immediately.
55    ///
56    /// # Arguments
57    ///
58    /// * `queue` - The queue to enqueue the job to
59    /// * `job` - The job to enqueue
60    ///
61    /// # Returns
62    ///
63    /// A [`JobId`] that can be used to track the job, or an [`OxanusError`] if the operation fails.
64    ///
65    /// # Examples
66    ///
67    /// ```rust
68    /// use oxanus::{Storage, Queue, Worker};
69    ///
70    /// async fn example(storage: &Storage) -> Result<(), oxanus::OxanusError> {
71    ///     let job_id = storage.enqueue(MyQueue, MyWorker { data: "hello" }).await?;
72    ///     Ok(())
73    /// }
74    /// ```
75    pub async fn enqueue<T, DT, ET>(&self, queue: impl Queue, job: T) -> Result<JobId, OxanusError>
76    where
77        T: Worker<Context = DT, Error = ET> + serde::Serialize,
78        DT: Send + Sync + Clone + 'static,
79        ET: std::error::Error + Send + Sync + 'static,
80    {
81        self.enqueue_in(queue, job, 0).await
82    }
83
84    /// Enqueues a job to be processed after a specified delay.
85    ///
86    /// # Arguments
87    ///
88    /// * `queue` - The queue to enqueue the job to
89    /// * `job` - The job to enqueue
90    /// * `delay` - The delay in seconds before the job should be processed
91    ///
92    /// # Returns
93    ///
94    /// A [`JobId`] that can be used to track the job, or an [`OxanusError`] if the operation fails.
95    ///
96    /// # Examples
97    ///
98    /// ```rust
99    /// use oxanus::{Storage, Queue, Worker};
100    ///
101    /// async fn example(storage: &Storage) -> Result<(), oxanus::OxanusError> {
102    ///     // Schedule a job to run in 5 minutes
103    ///     let job_id = storage.enqueue_in(MyQueue, MyWorker { data: "delayed" }, 300).await?;
104    ///     Ok(())
105    /// }
106    /// ```
107    pub async fn enqueue_in<T, DT, ET>(
108        &self,
109        queue: impl Queue,
110        job: T,
111        delay: u64,
112    ) -> Result<JobId, OxanusError>
113    where
114        T: Worker<Context = DT, Error = ET> + serde::Serialize,
115        DT: Send + Sync + Clone + 'static,
116        ET: std::error::Error + Send + Sync + 'static,
117    {
118        let envelope = JobEnvelope::new(queue.key().clone(), job)?;
119
120        tracing::trace!("Enqueuing job: {:?}", envelope);
121
122        if delay > 0 {
123            self.internal.enqueue_in(envelope, delay).await
124        } else {
125            self.internal.enqueue(envelope).await
126        }
127    }
128
129    /// Schedules a job to run at a specific time.
130    ///
131    /// # Arguments
132    ///
133    /// * `queue` - The queue to enqueue the job to
134    /// * `job` - The job to enqueue
135    /// * `time` - The UTC timestamp when the job should become available
136    ///
137    /// # Returns
138    ///
139    /// A [`JobId`] that can be used to track the job, or an [`OxanusError`] if the operation fails.
140    ///
141    /// # Examples
142    ///
143    /// ```rust
144    /// use chrono::{Duration, Utc};
145    /// use oxanus::{Storage, Queue, Worker};
146    ///
147    /// async fn example(storage: &Storage) -> Result<(), oxanus::OxanusError> {
148    ///     let time = Utc::now() + Duration::minutes(5);
149    ///     let job_id = storage.enqueue_at(MyQueue, MyWorker { data: "scheduled" }, time).await?;
150    ///     Ok(())
151    /// }
152    /// ```
153    pub async fn enqueue_at<T, DT, ET>(
154        &self,
155        queue: impl Queue,
156        job: T,
157        time: DateTime<Utc>,
158    ) -> Result<JobId, OxanusError>
159    where
160        T: Worker<Context = DT, Error = ET> + serde::Serialize,
161        DT: Send + Sync + Clone + 'static,
162        ET: std::error::Error + Send + Sync + 'static,
163    {
164        let envelope = JobEnvelope::new(queue.key().clone(), job)?;
165
166        tracing::trace!("Scheduling job {:?} at {}", envelope, time);
167
168        self.internal.enqueue_at(envelope, time).await
169    }
170
171    /// Returns the number of jobs currently enqueued in the specified queue.
172    ///
173    /// # Arguments
174    ///
175    /// * `queue` - The queue to count jobs for
176    ///
177    /// # Returns
178    ///
179    /// The number of enqueued jobs, or an [`OxanusError`] if the operation fails.
180    pub async fn enqueued_count(&self, queue: impl Queue) -> Result<usize, OxanusError> {
181        self.internal.enqueued_count(&queue.key()).await
182    }
183
184    /// Returns the latency of the queue (The age of the oldest job in the queue).
185    ///
186    /// # Arguments
187    ///
188    /// * `queue` - The queue to get the latency for
189    ///
190    /// # Returns
191    ///
192    /// The latency of the queue in milliseconds, or an [`OxanusError`] if the operation fails.
193    pub async fn latency_ms(&self, queue: impl Queue) -> Result<f64, OxanusError> {
194        self.internal.latency_ms(&queue.key()).await
195    }
196
197    /// Returns the number of jobs that have failed and moved to the dead queue.
198    ///
199    /// # Returns
200    ///
201    /// The number of dead jobs, or an [`OxanusError`] if the operation fails.
202    pub async fn dead_count(&self) -> Result<usize, OxanusError> {
203        self.internal.dead_count().await
204    }
205
206    /// Returns the number of jobs that are currently being retried.
207    ///
208    /// # Returns
209    ///
210    /// The number of retrying jobs, or an [`OxanusError`] if the operation fails.
211    pub async fn retries_count(&self) -> Result<usize, OxanusError> {
212        self.internal.retries_count().await
213    }
214
215    /// Returns the number of jobs that are scheduled for future execution.
216    ///
217    /// # Returns
218    ///
219    /// The number of scheduled jobs, or an [`OxanusError`] if the operation fails.
220    pub async fn scheduled_count(&self) -> Result<usize, OxanusError> {
221        self.internal.scheduled_count().await
222    }
223
224    /// Returns the number of jobs that are currently enqueued or scheduled for future execution.
225    ///
226    /// # Returns
227    ///
228    /// The number of jobs, or an [`OxanusError`] if the operation fails.
229    pub async fn jobs_count(&self) -> Result<usize, OxanusError> {
230        self.internal.jobs_count().await
231    }
232
233    /// Returns the stats for all queues.
234    ///
235    /// # Returns
236    ///
237    /// The stats for all queues, or an [`OxanusError`] if the operation fails.
238    pub async fn stats(&self) -> Result<Stats, OxanusError> {
239        self.internal.stats().await
240    }
241
242    /// Returns the list of processes that are currently running.
243    ///
244    /// # Returns
245    ///
246    /// The list of processes, or an [`OxanusError`] if the operation fails.
247    pub async fn processes(&self) -> Result<Vec<Process>, OxanusError> {
248        self.internal.processes().await
249    }
250
251    /// Returns the namespace this storage instance is using.
252    ///
253    /// # Returns
254    ///
255    /// The namespace string.
256    pub fn namespace(&self) -> &str {
257        self.internal.namespace()
258    }
259}