qml_rs/storage/
mod.rs

1use async_trait::async_trait;
2use std::collections::HashMap;
3
4use crate::core::{Job, JobState};
5
6pub mod config;
7pub mod error;
8pub mod memory;
9#[cfg(feature = "postgres")]
10pub mod postgres;
11#[cfg(feature = "redis")]
12pub mod redis;
13pub mod settings;
14
15#[cfg(test)]
16mod test_locking;
17
18#[cfg(feature = "postgres")]
19pub use config::PostgresConfig;
20#[cfg(feature = "redis")]
21pub use config::RedisConfig;
22pub use config::{MemoryConfig, StorageConfig};
23pub use error::StorageError;
24pub use memory::MemoryStorage;
25#[cfg(feature = "postgres")]
26pub use postgres::PostgresStorage;
27#[cfg(feature = "redis")]
28pub use redis::RedisStorage;
29
30/// Core storage trait that defines the interface for job persistence across all backends.
31///
32/// The [`Storage`] trait provides a unified API for job persistence operations, supporting
33/// multiple storage backends including in-memory, Redis, and PostgreSQL. All implementations
34/// provide atomic operations and race condition prevention for production use.
35///
36/// ## Storage Backends
37///
38/// - **[`MemoryStorage`]**: Fast in-memory storage for development and testing
39/// - **[`RedisStorage`]**: Distributed Redis storage with Lua script atomicity
40/// - **[`PostgresStorage`]**: ACID-compliant PostgreSQL with row-level locking
41///
42/// ## Core Operations
43///
44/// The trait provides standard CRUD operations (`enqueue`, `get`, `update`, `delete`)
45/// plus advanced operations for job processing:
46///
47/// - **Job Management**: Store, retrieve, update, and delete jobs
48/// - **Querying**: List jobs with filtering and pagination
49/// - **Processing**: Atomic job fetching with race condition prevention
50/// - **Locking**: Explicit job locking for distributed coordination
51///
52/// ## Race Condition Prevention
53///
54/// All storage backends implement atomic job fetching to prevent multiple workers
55/// from processing the same job simultaneously:
56///
57/// ```text
58/// Worker A ──┐
59///            ├── fetch_and_lock_job() ──→ Gets Job #123
60/// Worker B ──┘                         ──→ Gets Job #124 (not #123)
61/// ```
62///
63/// ## Examples
64///
65/// ### Basic Storage Operations
66/// ```rust
67/// use qml_rs::{MemoryStorage, Job, Storage};
68///
69/// # tokio_test::block_on(async {
70/// let storage = MemoryStorage::new();
71///
72/// // Create and store a job
73/// let job = Job::new("send_email", vec!["user@example.com".to_string()]);
74/// storage.enqueue(&job).await.unwrap();
75///
76/// // Retrieve the job
77/// let retrieved = storage.get(&job.id).await.unwrap().unwrap();
78/// assert_eq!(job.id, retrieved.id);
79///
80/// // Update job state
81/// let mut updated_job = retrieved;
82/// updated_job.set_state(qml_rs::JobState::processing("worker-1", "server-1")).unwrap();
83/// storage.update(&updated_job).await.unwrap();
84///
85/// // Delete the job
86/// let deleted = storage.delete(&job.id).await.unwrap();
87/// assert!(deleted);
88/// # });
89/// ```
90///
91/// ### Atomic Job Processing
92/// ```rust
93/// use qml_rs::{MemoryStorage, Job, Storage};
94///
95/// # tokio_test::block_on(async {
96/// let storage = MemoryStorage::new();
97///
98/// // Enqueue some jobs
99/// for i in 0..5 {
100///     let job = Job::new("process_item", vec![i.to_string()]);
101///     storage.enqueue(&job).await.unwrap();
102/// }
103///
104/// // Worker fetches and locks a job atomically
105/// let job = storage.fetch_and_lock_job("worker-1", None).await.unwrap();
106/// match job {
107///     Some(job) => {
108///         println!("Worker-1 processing job: {}", job.id);
109///         // Job is automatically locked and marked as processing
110///     },
111///     None => println!("No jobs available"),
112/// }
113/// # });
114/// ```
115///
116/// ### Storage Backend Selection
117/// ```rust
118/// use qml_rs::storage::{StorageInstance, StorageConfig, MemoryConfig};
119///
120/// # tokio_test::block_on(async {
121/// // Memory storage for development
122/// let memory_storage = StorageInstance::memory();
123///
124/// // Redis storage for production
125/// # #[cfg(feature = "redis")]
126/// # {
127/// use qml_rs::storage::RedisConfig;
128/// let redis_config = RedisConfig::new().with_url("redis://localhost:6379");
129/// match StorageInstance::redis(redis_config).await {
130///     Ok(redis_storage) => println!("Redis storage ready"),
131///     Err(e) => println!("Redis connection failed: {}", e),
132/// }
133/// # }
134///
135/// // PostgreSQL storage for enterprise
136/// # #[cfg(feature = "postgres")]
137/// # {
138/// use qml_rs::storage::PostgresConfig;
139/// let pg_config = PostgresConfig::new()
140///     .with_database_url("postgresql://localhost:5432/qml")
141///     .with_auto_migrate(true);
142/// match StorageInstance::postgres(pg_config).await {
143///     Ok(pg_storage) => println!("PostgreSQL storage ready"),
144///     Err(e) => println!("PostgreSQL connection failed: {}", e),
145/// }
146/// # }
147/// # });
148/// ```
149///
150/// ### Job Filtering and Statistics
151/// ```rust
152/// use qml_rs::{MemoryStorage, Job, JobState, Storage};
153///
154/// # tokio_test::block_on(async {
155/// let storage = MemoryStorage::new();
156///
157/// // Create jobs in different states
158/// let mut job1 = Job::new("task1", vec![]);
159/// let mut job2 = Job::new("task2", vec![]);
160/// job2.set_state(JobState::processing("worker-1", "server-1")).unwrap();
161///
162/// storage.enqueue(&job1).await.unwrap();
163/// storage.enqueue(&job2).await.unwrap();
164///
165/// // List all jobs
166/// let all_jobs = storage.list(None, None, None).await.unwrap();
167/// println!("Total jobs: {}", all_jobs.len());
168///
169/// // Get job counts by state
170/// let counts = storage.get_job_counts().await;
171/// match counts {
172///     Ok(counts) => {
173///         for (state, count) in counts {
174///             println!("{:?}: {}", state, count);
175///         }
176///     },
177///     Err(e) => println!("Error: {}", e),
178/// }
179///
180/// // Get available jobs for processing
181/// let available = storage.get_available_jobs(Some(10)).await.unwrap();
182/// println!("Available for processing: {}", available.len());
183/// # });
184/// ```
185#[async_trait]
186pub trait Storage: Send + Sync {
187    /// Store a new job in the storage backend.
188    ///
189    /// Persists a job to the storage system, making it available for processing.
190    /// The job is typically stored in the "enqueued" state unless specified otherwise.
191    ///
192    /// ## Arguments
193    /// * `job` - The job to store with all its metadata and configuration
194    ///
195    /// ## Returns
196    /// * `Ok(())` - Job was stored successfully
197    /// * `Err(StorageError)` - Storage operation failed
198    ///
199    /// ## Examples
200    /// ```rust
201    /// use qml_rs::{MemoryStorage, Job, Storage};
202    ///
203    /// # tokio_test::block_on(async {
204    /// let storage = MemoryStorage::new();
205    ///
206    /// let job = Job::with_config(
207    ///     "send_notification",
208    ///     vec!["user123".to_string()],
209    ///     "notifications", // queue
210    ///     5,              // priority
211    ///     3               // max_retries
212    /// );
213    ///
214    /// storage.enqueue(&job).await.unwrap();
215    /// println!("Job {} enqueued successfully", job.id);
216    /// # });
217    /// ```
218    async fn enqueue(&self, job: &Job) -> Result<(), StorageError>;
219
220    /// Retrieve a job by its unique identifier.
221    ///
222    /// Fetches a complete job record including all metadata, current state,
223    /// and configuration. Returns `None` if the job doesn't exist.
224    ///
225    /// ## Arguments
226    /// * `job_id` - The unique identifier of the job to retrieve
227    ///
228    /// ## Returns
229    /// * `Ok(Some(job))` - Job exists and was retrieved successfully
230    /// * `Ok(None)` - Job doesn't exist in storage
231    /// * `Err(StorageError)` - Storage operation failed
232    ///
233    /// ## Examples
234    /// ```rust
235    /// use qml_rs::{MemoryStorage, Job, Storage};
236    ///
237    /// # tokio_test::block_on(async {
238    /// let storage = MemoryStorage::new();
239    /// let job = Job::new("process_data", vec!["file.csv".to_string()]);
240    ///
241    /// storage.enqueue(&job).await.unwrap();
242    ///
243    /// // Retrieve the job
244    /// match storage.get(&job.id).await.unwrap() {
245    ///     Some(retrieved_job) => {
246    ///         println!("Found job: {} ({})", retrieved_job.id, retrieved_job.method);
247    ///     },
248    ///     None => println!("Job not found"),
249    /// }
250    /// # });
251    /// ```
252    async fn get(&self, job_id: &str) -> Result<Option<Job>, StorageError>;
253
254    /// Update an existing job's state and metadata.
255    ///
256    /// Modifies a job record in storage, typically used for state transitions
257    /// (e.g., Enqueued → Processing → Succeeded). The entire job record is updated.
258    ///
259    /// ## Arguments
260    /// * `job` - The job with updated information to persist
261    ///
262    /// ## Returns
263    /// * `Ok(())` - Job was updated successfully
264    /// * `Err(StorageError)` - Storage operation failed (job may not exist)
265    ///
266    /// ## Examples
267    /// ```rust
268    /// use qml_rs::{MemoryStorage, Job, JobState, Storage};
269    ///
270    /// # tokio_test::block_on(async {
271    /// let storage = MemoryStorage::new();
272    /// let mut job = Job::new("process_order", vec!["order123".to_string()]);
273    ///
274    /// storage.enqueue(&job).await.unwrap();
275    ///
276    /// // Update job state to processing
277    /// job.set_state(JobState::processing("worker-1", "server-1")).unwrap();
278    /// storage.update(&job).await.unwrap();
279    ///
280    /// // Add metadata and update again
281    /// job.add_metadata("processed_by", "worker-1");
282    /// storage.update(&job).await.unwrap();
283    /// # });
284    /// ```
285    async fn update(&self, job: &Job) -> Result<(), StorageError>;
286
287    /// Remove a job from storage (soft or hard delete).
288    ///
289    /// Deletes a job record from the storage system. Some implementations may
290    /// perform soft deletion (marking as deleted) while others perform hard deletion.
291    ///
292    /// ## Arguments
293    /// * `job_id` - The unique identifier of the job to delete
294    ///
295    /// ## Returns
296    /// * `Ok(true)` - Job existed and was deleted successfully
297    /// * `Ok(false)` - Job didn't exist (nothing to delete)
298    /// * `Err(StorageError)` - Storage operation failed
299    ///
300    /// ## Examples
301    /// ```rust
302    /// use qml_rs::{MemoryStorage, Job, Storage};
303    ///
304    /// # tokio_test::block_on(async {
305    /// let storage = MemoryStorage::new();
306    /// let job = Job::new("cleanup_task", vec![]);
307    ///
308    /// storage.enqueue(&job).await.unwrap();
309    ///
310    /// // Delete the job
311    /// let was_deleted = storage.delete(&job.id).await.unwrap();
312    /// assert!(was_deleted);
313    ///
314    /// // Verify it's gone
315    /// let retrieved = storage.get(&job.id).await.unwrap();
316    /// assert!(retrieved.is_none());
317    /// # });
318    /// ```
319    async fn delete(&self, job_id: &str) -> Result<bool, StorageError>;
320
321    /// List jobs with optional filtering and pagination.
322    ///
323    /// Retrieves multiple jobs from storage with optional filtering by state
324    /// and pagination support. Useful for building dashboards and monitoring tools.
325    ///
326    /// ## Arguments
327    /// * `state_filter` - Optional job state to filter by (e.g., only failed jobs)
328    /// * `limit` - Maximum number of jobs to return (None = no limit)
329    /// * `offset` - Number of jobs to skip for pagination (None = start from beginning)
330    ///
331    /// ## Returns
332    /// * `Ok(jobs)` - Vector of jobs matching the criteria
333    /// * `Err(StorageError)` - Storage operation failed
334    ///
335    /// ## Examples
336    /// ```rust
337    /// use qml_rs::{MemoryStorage, Job, JobState, Storage};
338    ///
339    /// # tokio_test::block_on(async {
340    /// let storage = MemoryStorage::new();
341    ///
342    /// // Create several jobs
343    /// for i in 0..10 {
344    ///     let job = Job::new("task", vec![i.to_string()]);
345    ///     storage.enqueue(&job).await.unwrap();
346    /// }
347    ///
348    /// // List all jobs
349    /// let all_jobs = storage.list(None, None, None).await.unwrap();
350    /// println!("Total jobs: {}", all_jobs.len());
351    ///
352    /// // List first 5 jobs
353    /// let first_five = storage.list(None, Some(5), None).await.unwrap();
354    /// println!("First 5 jobs: {}", first_five.len());
355    ///
356    /// // List next 5 jobs (pagination)
357    /// let next_five = storage.list(None, Some(5), Some(5)).await.unwrap();
358    /// println!("Next 5 jobs: {}", next_five.len());
359    /// # });
360    /// ```
361    async fn list(
362        &self,
363        state_filter: Option<&JobState>,
364        limit: Option<usize>,
365        offset: Option<usize>,
366    ) -> Result<Vec<Job>, StorageError>;
367
368    /// Get the count of jobs grouped by their current state.
369    ///
370    /// Returns statistics about job distribution across different states.
371    /// Useful for monitoring and dashboard displays.
372    ///
373    /// ## Returns
374    /// * `Ok(counts)` - HashMap mapping each job state to its count
375    /// * `Err(StorageError)` - Storage operation failed
376    ///
377    /// ## Examples
378    /// ```rust,ignore
379    /// use qml_rs::{MemoryStorage, Job, JobState, Storage};
380    ///
381    /// # tokio_test::block_on(async {
382    /// let storage = MemoryStorage::new();
383    ///
384    /// // Create jobs in different states
385    /// let mut job1 = Job::new("task1", vec![]);
386    /// storage.enqueue(&job1).await.unwrap();
387    ///
388    /// let mut job2 = Job::new("task2", vec![]);
389    /// job2.set_state(JobState::processing("worker-1", "server-1")).unwrap();
390    /// storage.update(&job2).await.unwrap();
391    ///
392    /// // Get statistics
393    /// let counts = storage.get_job_counts().await;
394    /// match counts {
395    ///     Ok(counts) => for (state, count) in counts {
396    ///         println!("State {:?}: {} jobs", state, count);
397    ///     },
398    ///     Err(e) => println!("Error: {}", e),
399    /// }
400    /// # });
401    /// ```
402    async fn get_job_counts(&self) -> Result<HashMap<JobState, usize>, StorageError>;
403
404    /// Get jobs that are ready to be processed immediately.
405    ///
406    /// Returns jobs that are available for processing: enqueued jobs, scheduled jobs
407    /// whose time has arrived, and jobs awaiting retry whose retry time has passed.
408    ///
409    /// ## Arguments
410    /// * `limit` - Maximum number of jobs to return (None = no limit)
411    ///
412    /// ## Returns
413    /// * `Ok(jobs)` - Vector of jobs ready for processing
414    /// * `Err(StorageError)` - Storage operation failed
415    ///
416    /// ## Examples
417    /// ```rust
418    /// use qml_rs::{MemoryStorage, Job, Storage};
419    ///
420    /// # tokio_test::block_on(async {
421    /// let storage = MemoryStorage::new();
422    ///
423    /// // Enqueue several jobs
424    /// for i in 0..5 {
425    ///     let job = Job::new("process_item", vec![i.to_string()]);
426    ///     storage.enqueue(&job).await.unwrap();
427    /// }
428    ///
429    /// // Get available jobs for processing
430    /// let available = storage.get_available_jobs(Some(3)).await.unwrap();
431    /// println!("Available for processing: {}", available.len());
432    ///
433    /// for job in available {
434    ///     println!("Job {} is ready: {}", job.id, job.method);
435    /// }
436    /// # });
437    /// ```
438    async fn get_available_jobs(&self, limit: Option<usize>) -> Result<Vec<Job>, StorageError>;
439
440    /// Atomically fetch and lock a job for processing to prevent race conditions.
441    ///
442    /// This is the **primary method for job processing** in production environments.
443    /// It atomically finds an available job, locks it, and marks it as processing
444    /// in a single operation, preventing multiple workers from processing the same job.
445    ///
446    /// ## Race Condition Prevention
447    ///
448    /// Different storage backends use different mechanisms:
449    /// - **PostgreSQL**: `SELECT FOR UPDATE SKIP LOCKED` with dedicated lock table
450    /// - **Redis**: Lua scripts for atomic operations with distributed locking
451    /// - **Memory**: Mutex-based locking with automatic cleanup
452    ///
453    /// ## Arguments
454    /// * `worker_id` - Unique identifier of the worker claiming the job
455    /// * `queues` - Optional list of specific queues to fetch from (None = all queues)
456    ///
457    /// ## Returns
458    /// * `Ok(Some(job))` - Job was successfully fetched and locked
459    /// * `Ok(None)` - No jobs are available for processing
460    /// * `Err(StorageError)` - Storage operation failed
461    ///
462    /// ## Examples
463    /// ```rust
464    /// use qml_rs::{MemoryStorage, Job, Storage};
465    ///
466    /// # tokio_test::block_on(async {
467    /// let storage = MemoryStorage::new();
468    ///
469    /// // Enqueue some jobs
470    /// for i in 0..3 {
471    ///     let job = Job::with_config(
472    ///         "process_item",
473    ///         vec![i.to_string()],
474    ///         if i == 0 { "critical" } else { "normal" }, // different queues
475    ///         i as i32,
476    ///         3
477    ///     );
478    ///     storage.enqueue(&job).await.unwrap();
479    /// }
480    ///
481    /// // Worker fetches from any queue
482    /// let job = storage.fetch_and_lock_job("worker-1", None).await.unwrap();
483    /// match job {
484    ///     Some(job) => {
485    ///         println!("Worker-1 got job: {} from queue: {}", job.id, job.queue);
486    ///         // Job is now locked and marked as processing
487    ///     },
488    ///     None => println!("No jobs available"),
489    /// }
490    ///
491    /// // Worker fetches only from critical queue
492    /// let critical_job = storage.fetch_and_lock_job(
493    ///     "worker-2",
494    ///     Some(&["critical".to_string()])
495    /// ).await.unwrap();
496    /// # });
497    /// ```
498    async fn fetch_and_lock_job(
499        &self,
500        worker_id: &str,
501        queues: Option<&[String]>,
502    ) -> Result<Option<Job>, StorageError>;
503
504    /// Try to acquire an explicit lock on a specific job.
505    ///
506    /// Attempts to acquire an exclusive lock on a job for coordination between
507    /// workers. This is useful for implementing custom job processing logic
508    /// or manual job management.
509    ///
510    /// ## Arguments
511    /// * `job_id` - The unique identifier of the job to lock
512    /// * `worker_id` - Unique identifier of the worker trying to acquire the lock
513    /// * `timeout_seconds` - Lock timeout in seconds (auto-release after this time)
514    ///
515    /// ## Returns
516    /// * `Ok(true)` - Lock was successfully acquired
517    /// * `Ok(false)` - Lock could not be acquired (already locked by another worker)
518    /// * `Err(StorageError)` - Storage operation failed
519    ///
520    /// ## Examples
521    /// ```rust
522    /// use qml_rs::{MemoryStorage, Job, Storage};
523    ///
524    /// # tokio_test::block_on(async {
525    /// let storage = MemoryStorage::new();
526    /// let job = Job::new("exclusive_task", vec![]);
527    /// storage.enqueue(&job).await.unwrap();
528    ///
529    /// // Worker 1 tries to acquire lock
530    /// let acquired = storage.try_acquire_job_lock(&job.id, "worker-1", 300).await.unwrap();
531    /// assert!(acquired);
532    ///
533    /// // Worker 2 tries to acquire the same lock (should fail)
534    /// let acquired = storage.try_acquire_job_lock(&job.id, "worker-2", 300).await.unwrap();
535    /// assert!(!acquired);
536    ///
537    /// // Worker 1 releases the lock
538    /// storage.release_job_lock(&job.id, "worker-1").await.unwrap();
539    ///
540    /// // Now worker 2 can acquire it
541    /// let acquired = storage.try_acquire_job_lock(&job.id, "worker-2", 300).await.unwrap();
542    /// assert!(acquired);
543    /// # });
544    /// ```
545    async fn try_acquire_job_lock(
546        &self,
547        job_id: &str,
548        worker_id: &str,
549        timeout_seconds: u64,
550    ) -> Result<bool, StorageError>;
551
552    /// Release an explicit lock on a job.
553    ///
554    /// Releases a lock that was previously acquired with `try_acquire_job_lock`.
555    /// Only the worker that acquired the lock can release it.
556    ///
557    /// ## Arguments
558    /// * `job_id` - The unique identifier of the job to unlock
559    /// * `worker_id` - Unique identifier of the worker releasing the lock
560    ///
561    /// ## Returns
562    /// * `Ok(true)` - Lock was successfully released
563    /// * `Ok(false)` - Lock was not held by this worker (or already expired)
564    /// * `Err(StorageError)` - Storage operation failed
565    ///
566    /// ## Examples
567    /// ```rust
568    /// use qml_rs::{MemoryStorage, Job, Storage};
569    ///
570    /// # tokio_test::block_on(async {
571    /// let storage = MemoryStorage::new();
572    /// let job = Job::new("task_with_lock", vec![]);
573    /// storage.enqueue(&job).await.unwrap();
574    ///
575    /// // Acquire lock
576    /// storage.try_acquire_job_lock(&job.id, "worker-1", 300).await.unwrap();
577    ///
578    /// // Do some work...
579    ///
580    /// // Release lock
581    /// let released = storage.release_job_lock(&job.id, "worker-1").await.unwrap();
582    /// assert!(released);
583    ///
584    /// // Trying to release again should return false
585    /// let released = storage.release_job_lock(&job.id, "worker-1").await.unwrap();
586    /// assert!(!released);
587    /// # });
588    /// ```
589    async fn release_job_lock(&self, job_id: &str, worker_id: &str) -> Result<bool, StorageError>;
590
591    /// Atomically fetch multiple available jobs with locking.
592    ///
593    /// Similar to `fetch_and_lock_job` but fetches multiple jobs in a single
594    /// atomic operation. Useful for batch processing scenarios where a worker
595    /// can handle multiple jobs simultaneously.
596    ///
597    /// ## Arguments
598    /// * `worker_id` - Unique identifier of the worker claiming the jobs
599    /// * `limit` - Maximum number of jobs to fetch (None = implementation default)
600    /// * `queues` - Optional list of specific queues to fetch from (None = all queues)
601    ///
602    /// ## Returns
603    /// * `Ok(jobs)` - Vector of jobs that were successfully fetched and locked
604    /// * `Err(StorageError)` - Storage operation failed
605    ///
606    /// ## Examples
607    /// ```rust
608    /// use qml_rs::{MemoryStorage, Job, Storage};
609    ///
610    /// # tokio_test::block_on(async {
611    /// let storage = MemoryStorage::new();
612    ///
613    /// // Enqueue batch of jobs
614    /// for i in 0..10 {
615    ///     let job = Job::new("batch_process", vec![i.to_string()]);
616    ///     storage.enqueue(&job).await.unwrap();
617    /// }
618    ///
619    /// // Worker fetches multiple jobs at once
620    /// let jobs = storage.fetch_available_jobs_atomic("worker-1", Some(5), None).await.unwrap();
621    /// println!("Worker-1 got {} jobs for batch processing", jobs.len());
622    ///
623    /// for job in jobs {
624    ///     println!("Processing job {} with argument: {}", job.id, job.arguments[0]);
625    ///     // All jobs are now locked and marked as processing
626    /// }
627    /// # });
628    /// ```
629    async fn fetch_available_jobs_atomic(
630        &self,
631        worker_id: &str,
632        limit: Option<usize>,
633        queues: Option<&[String]>,
634    ) -> Result<Vec<Job>, StorageError>;
635}
636
637/// Storage instance that can hold any storage implementation
638pub enum StorageInstance {
639    /// Memory storage instance
640    Memory(MemoryStorage),
641    /// Redis storage instance
642    #[cfg(feature = "redis")]
643    Redis(RedisStorage),
644    /// PostgreSQL storage instance
645    #[cfg(feature = "postgres")]
646    Postgres(PostgresStorage),
647}
648
649impl StorageInstance {
650    /// Create a storage instance from configuration
651    ///
652    /// # Arguments
653    /// * `config` - The storage configuration
654    ///
655    /// # Returns
656    /// * `Ok(storage)` - The created storage instance
657    /// * `Err(StorageError)` - If there was an error creating the storage
658    ///
659    /// # Examples
660    ///
661    /// ```rust
662    /// use qml_rs::storage::{StorageInstance, StorageConfig, MemoryConfig};
663    ///
664    /// # tokio_test::block_on(async {
665    /// let config = StorageConfig::Memory(MemoryConfig::default());
666    /// let storage = StorageInstance::from_config(config).await.unwrap();
667    /// # });
668    /// ```
669    pub async fn from_config(config: StorageConfig) -> Result<Self, StorageError> {
670        match config {
671            StorageConfig::Memory(memory_config) => Ok(StorageInstance::Memory(
672                MemoryStorage::with_config(memory_config),
673            )),
674            #[cfg(feature = "redis")]
675            StorageConfig::Redis(redis_config) => {
676                let redis_storage = RedisStorage::with_config(redis_config).await?;
677                Ok(StorageInstance::Redis(redis_storage))
678            }
679            #[cfg(feature = "postgres")]
680            StorageConfig::Postgres(postgres_config) => {
681                let postgres_storage = PostgresStorage::new(postgres_config).await?;
682                Ok(StorageInstance::Postgres(postgres_storage))
683            }
684        }
685    }
686
687    /// Create a memory storage instance with default configuration
688    ///
689    /// # Examples
690    ///
691    /// ```rust
692    /// use qml_rs::storage::StorageInstance;
693    ///
694    /// let storage = StorageInstance::memory();
695    /// ```
696    pub fn memory() -> Self {
697        StorageInstance::Memory(MemoryStorage::new())
698    }
699
700    /// Create a memory storage instance with custom configuration
701    ///
702    /// # Arguments
703    /// * `config` - The memory storage configuration
704    ///
705    /// # Examples
706    ///
707    /// ```rust
708    /// use qml_rs::storage::{StorageInstance, MemoryConfig};
709    ///
710    /// let config = MemoryConfig::new().with_max_jobs(1000);
711    /// let storage = StorageInstance::memory_with_config(config);
712    /// ```
713    pub fn memory_with_config(config: MemoryConfig) -> Self {
714        StorageInstance::Memory(MemoryStorage::with_config(config))
715    }
716
717    /// Create a Redis storage instance with custom configuration
718    ///
719    /// # Arguments
720    /// * `config` - The Redis storage configuration
721    ///
722    /// # Returns
723    /// * `Ok(storage)` - The created Redis storage instance
724    /// * `Err(StorageError)` - If there was an error connecting to Redis
725    ///
726    /// # Examples
727    ///
728    /// ```rust
729    /// use qml_rs::storage::{StorageInstance, RedisConfig};
730    ///
731    /// # tokio_test::block_on(async {
732    /// let config = RedisConfig::new().with_url("redis://localhost:6379");
733    /// match StorageInstance::redis(config).await {
734    ///     Ok(storage) => println!("Redis storage created successfully"),
735    ///     Err(e) => println!("Failed to create Redis storage: {}", e),
736    /// }
737    /// # });
738    /// ```
739    #[cfg(feature = "redis")]
740    pub async fn redis(config: RedisConfig) -> Result<Self, StorageError> {
741        let redis_storage = RedisStorage::with_config(config).await?;
742        Ok(StorageInstance::Redis(redis_storage))
743    }
744
745    /// Create a PostgreSQL storage instance with custom configuration
746    ///
747    /// # Arguments
748    /// * `config` - The PostgreSQL storage configuration
749    ///
750    /// # Returns
751    /// * `Ok(storage)` - The created PostgreSQL storage instance
752    /// * `Err(StorageError)` - If there was an error connecting to PostgreSQL
753    ///
754    /// # Examples
755    ///
756    /// ```rust
757    /// use qml_rs::storage::{StorageInstance, PostgresConfig};
758    ///
759    /// # tokio_test::block_on(async {
760    /// let config = PostgresConfig::new().with_database_url("postgresql://postgres:password@localhost:5432/qml");
761    /// match StorageInstance::postgres(config).await {
762    ///     Ok(storage) => println!("PostgreSQL storage created successfully"),
763    ///     Err(e) => println!("Failed to create PostgreSQL storage: {}", e),
764    /// }
765    /// # });
766    /// ```
767    #[cfg(feature = "postgres")]
768    pub async fn postgres(config: PostgresConfig) -> Result<Self, StorageError> {
769        let postgres_storage = PostgresStorage::new(config).await?;
770        Ok(StorageInstance::Postgres(postgres_storage))
771    }
772}
773
774#[async_trait]
775impl Storage for StorageInstance {
776    async fn enqueue(&self, job: &Job) -> Result<(), StorageError> {
777        match self {
778            StorageInstance::Memory(storage) => storage.enqueue(job).await,
779            #[cfg(feature = "redis")]
780            StorageInstance::Redis(storage) => storage.enqueue(job).await,
781            #[cfg(feature = "postgres")]
782            StorageInstance::Postgres(storage) => storage.enqueue(job).await,
783        }
784    }
785
786    async fn get(&self, job_id: &str) -> Result<Option<Job>, StorageError> {
787        match self {
788            StorageInstance::Memory(storage) => storage.get(job_id).await,
789            #[cfg(feature = "redis")]
790            StorageInstance::Redis(storage) => storage.get(job_id).await,
791            #[cfg(feature = "postgres")]
792            StorageInstance::Postgres(storage) => storage.get(job_id).await,
793        }
794    }
795
796    async fn update(&self, job: &Job) -> Result<(), StorageError> {
797        match self {
798            StorageInstance::Memory(storage) => storage.update(job).await,
799            #[cfg(feature = "redis")]
800            StorageInstance::Redis(storage) => storage.update(job).await,
801            #[cfg(feature = "postgres")]
802            StorageInstance::Postgres(storage) => storage.update(job).await,
803        }
804    }
805
806    async fn delete(&self, job_id: &str) -> Result<bool, StorageError> {
807        match self {
808            StorageInstance::Memory(storage) => storage.delete(job_id).await,
809            #[cfg(feature = "redis")]
810            StorageInstance::Redis(storage) => storage.delete(job_id).await,
811            #[cfg(feature = "postgres")]
812            StorageInstance::Postgres(storage) => storage.delete(job_id).await,
813        }
814    }
815
816    async fn list(
817        &self,
818        state_filter: Option<&JobState>,
819        limit: Option<usize>,
820        offset: Option<usize>,
821    ) -> Result<Vec<Job>, StorageError> {
822        match self {
823            StorageInstance::Memory(storage) => storage.list(state_filter, limit, offset).await,
824            #[cfg(feature = "redis")]
825            StorageInstance::Redis(storage) => storage.list(state_filter, limit, offset).await,
826            #[cfg(feature = "postgres")]
827            StorageInstance::Postgres(storage) => storage.list(state_filter, limit, offset).await,
828        }
829    }
830
831    async fn get_job_counts(&self) -> Result<HashMap<JobState, usize>, StorageError> {
832        match self {
833            StorageInstance::Memory(storage) => storage.get_job_counts().await,
834            #[cfg(feature = "redis")]
835            StorageInstance::Redis(storage) => storage.get_job_counts().await,
836            #[cfg(feature = "postgres")]
837            StorageInstance::Postgres(storage) => storage.get_job_counts().await,
838        }
839    }
840
841    async fn get_available_jobs(&self, limit: Option<usize>) -> Result<Vec<Job>, StorageError> {
842        match self {
843            StorageInstance::Memory(storage) => storage.get_available_jobs(limit).await,
844            #[cfg(feature = "redis")]
845            StorageInstance::Redis(storage) => storage.get_available_jobs(limit).await,
846            #[cfg(feature = "postgres")]
847            StorageInstance::Postgres(storage) => storage.get_available_jobs(limit).await,
848        }
849    }
850
851    async fn fetch_and_lock_job(
852        &self,
853        worker_id: &str,
854        queues: Option<&[String]>,
855    ) -> Result<Option<Job>, StorageError> {
856        match self {
857            StorageInstance::Memory(storage) => storage.fetch_and_lock_job(worker_id, queues).await,
858            #[cfg(feature = "redis")]
859            StorageInstance::Redis(storage) => storage.fetch_and_lock_job(worker_id, queues).await,
860            #[cfg(feature = "postgres")]
861            StorageInstance::Postgres(storage) => {
862                storage.fetch_and_lock_job(worker_id, queues).await
863            }
864        }
865    }
866
867    async fn try_acquire_job_lock(
868        &self,
869        job_id: &str,
870        worker_id: &str,
871        timeout_seconds: u64,
872    ) -> Result<bool, StorageError> {
873        match self {
874            StorageInstance::Memory(storage) => {
875                storage
876                    .try_acquire_job_lock(job_id, worker_id, timeout_seconds)
877                    .await
878            }
879            #[cfg(feature = "redis")]
880            StorageInstance::Redis(storage) => {
881                storage
882                    .try_acquire_job_lock(job_id, worker_id, timeout_seconds)
883                    .await
884            }
885            #[cfg(feature = "postgres")]
886            StorageInstance::Postgres(storage) => {
887                storage
888                    .try_acquire_job_lock(job_id, worker_id, timeout_seconds)
889                    .await
890            }
891        }
892    }
893
894    async fn release_job_lock(&self, job_id: &str, worker_id: &str) -> Result<bool, StorageError> {
895        match self {
896            StorageInstance::Memory(storage) => storage.release_job_lock(job_id, worker_id).await,
897            #[cfg(feature = "redis")]
898            StorageInstance::Redis(storage) => storage.release_job_lock(job_id, worker_id).await,
899            #[cfg(feature = "postgres")]
900            StorageInstance::Postgres(storage) => storage.release_job_lock(job_id, worker_id).await,
901        }
902    }
903
904    async fn fetch_available_jobs_atomic(
905        &self,
906        worker_id: &str,
907        limit: Option<usize>,
908        queues: Option<&[String]>,
909    ) -> Result<Vec<Job>, StorageError> {
910        match self {
911            StorageInstance::Memory(storage) => {
912                storage
913                    .fetch_available_jobs_atomic(worker_id, limit, queues)
914                    .await
915            }
916            #[cfg(feature = "redis")]
917            StorageInstance::Redis(storage) => {
918                storage
919                    .fetch_available_jobs_atomic(worker_id, limit, queues)
920                    .await
921            }
922            #[cfg(feature = "postgres")]
923            StorageInstance::Postgres(storage) => {
924                storage
925                    .fetch_available_jobs_atomic(worker_id, limit, queues)
926                    .await
927            }
928        }
929    }
930}