qml_rs/storage/
mod.rs

1use async_trait::async_trait;
2use std::collections::HashMap;
3
4use crate::core::{Job, JobState};
5
6pub mod config;
7pub mod database_init;
8pub mod error;
9pub mod memory;
10#[cfg(feature = "postgres")]
11pub mod postgres;
12#[cfg(feature = "redis")]
13pub mod redis;
14pub mod settings;
15
16#[cfg(test)]
17mod test_locking;
18
19#[cfg(feature = "postgres")]
20pub use config::PostgresConfig;
21#[cfg(feature = "redis")]
22pub use config::RedisConfig;
23pub use config::{MemoryConfig, StorageConfig};
24#[cfg(feature = "postgres")]
25pub use database_init::{DatabaseInitializer, DatabaseInitError};
26pub use error::StorageError;
27pub use memory::MemoryStorage;
28#[cfg(feature = "postgres")]
29pub use postgres::PostgresStorage;
30#[cfg(feature = "redis")]
31pub use redis::RedisStorage;
32
33/// Core storage trait that defines the interface for job persistence across all backends.
34///
35/// The [`Storage`] trait provides a unified API for job persistence operations, supporting
36/// multiple storage backends including in-memory, Redis, and PostgreSQL. All implementations
37/// provide atomic operations and race condition prevention for production use.
38///
39/// ## Storage Backends
40///
41/// - **[`MemoryStorage`]**: Fast in-memory storage for development and testing
42/// - **[`RedisStorage`]**: Distributed Redis storage with Lua script atomicity
43/// - **[`PostgresStorage`]**: ACID-compliant PostgreSQL with row-level locking
44///
45/// ## Core Operations
46///
47/// The trait provides standard CRUD operations (`enqueue`, `get`, `update`, `delete`)
48/// plus advanced operations for job processing:
49///
50/// - **Job Management**: Store, retrieve, update, and delete jobs
51/// - **Querying**: List jobs with filtering and pagination
52/// - **Processing**: Atomic job fetching with race condition prevention
53/// - **Locking**: Explicit job locking for distributed coordination
54///
55/// ## Race Condition Prevention
56///
57/// All storage backends implement atomic job fetching to prevent multiple workers
58/// from processing the same job simultaneously:
59///
60/// ```text
61/// Worker A ──┐
62///            ├── fetch_and_lock_job() ──→ Gets Job #123
63/// Worker B ──┘                         ──→ Gets Job #124 (not #123)
64/// ```
65///
66/// ## Examples
67///
68/// ### Basic Storage Operations
69/// ```rust
70/// use qml_rs::{MemoryStorage, Job, Storage};
71///
72/// # tokio_test::block_on(async {
73/// let storage = MemoryStorage::new();
74///
75/// // Create and store a job
76/// let job = Job::new("send_email", vec!["user@example.com".to_string()]);
77/// storage.enqueue(&job).await.unwrap();
78///
79/// // Retrieve the job
80/// let retrieved = storage.get(&job.id).await.unwrap().unwrap();
81/// assert_eq!(job.id, retrieved.id);
82///
83/// // Update job state
84/// let mut updated_job = retrieved;
85/// updated_job.set_state(qml_rs::JobState::processing("worker-1", "server-1")).unwrap();
86/// storage.update(&updated_job).await.unwrap();
87///
88/// // Delete the job
89/// let deleted = storage.delete(&job.id).await.unwrap();
90/// assert!(deleted);
91/// # });
92/// ```
93///
94/// ### Atomic Job Processing
95/// ```rust
96/// use qml_rs::{MemoryStorage, Job, Storage};
97///
98/// # tokio_test::block_on(async {
99/// let storage = MemoryStorage::new();
100///
101/// // Enqueue some jobs
102/// for i in 0..5 {
103///     let job = Job::new("process_item", vec![i.to_string()]);
104///     storage.enqueue(&job).await.unwrap();
105/// }
106///
107/// // Worker fetches and locks a job atomically
108/// let job = storage.fetch_and_lock_job("worker-1", None).await.unwrap();
109/// match job {
110///     Some(job) => {
111///         println!("Worker-1 processing job: {}", job.id);
112///         // Job is automatically locked and marked as processing
113///     },
114///     None => println!("No jobs available"),
115/// }
116/// # });
117/// ```
118///
119/// ### Storage Backend Selection
120/// ```rust
121/// use qml_rs::storage::{StorageInstance, StorageConfig, MemoryConfig};
122///
123/// # tokio_test::block_on(async {
124/// // Memory storage for development
125/// let memory_storage = StorageInstance::memory();
126///
127/// // Redis storage for production
128/// # #[cfg(feature = "redis")]
129/// # {
130/// use qml_rs::storage::RedisConfig;
131/// let redis_config = RedisConfig::new().with_url("redis://localhost:6379");
132/// match StorageInstance::redis(redis_config).await {
133///     Ok(redis_storage) => println!("Redis storage ready"),
134///     Err(e) => println!("Redis connection failed: {}", e),
135/// }
136/// # }
137///
138/// // PostgreSQL storage for enterprise
139/// # #[cfg(feature = "postgres")]
140/// # {
141/// use qml_rs::storage::PostgresConfig;
142/// let pg_config = PostgresConfig::new()
143///     .with_database_url("postgresql://localhost:5432/qml")
144///     .with_auto_migrate(true);
145/// match StorageInstance::postgres(pg_config).await {
146///     Ok(pg_storage) => println!("PostgreSQL storage ready"),
147///     Err(e) => println!("PostgreSQL connection failed: {}", e),
148/// }
149/// # }
150/// # });
151/// ```
152///
153/// ### Job Filtering and Statistics
154/// ```rust
155/// use qml_rs::{MemoryStorage, Job, JobState, Storage};
156///
157/// # tokio_test::block_on(async {
158/// let storage = MemoryStorage::new();
159///
160/// // Create jobs in different states
161/// let mut job1 = Job::new("task1", vec![]);
162/// let mut job2 = Job::new("task2", vec![]);
163/// job2.set_state(JobState::processing("worker-1", "server-1")).unwrap();
164///
165/// storage.enqueue(&job1).await.unwrap();
166/// storage.enqueue(&job2).await.unwrap();
167///
168/// // List all jobs
169/// let all_jobs = storage.list(None, None, None).await.unwrap();
170/// println!("Total jobs: {}", all_jobs.len());
171///
172/// // Get job counts by state
173/// let counts = storage.get_job_counts().await;
174/// match counts {
175///     Ok(counts) => {
176///         for (state, count) in counts {
177///             println!("{:?}: {}", state, count);
178///         }
179///     },
180///     Err(e) => println!("Error: {}", e),
181/// }
182///
183/// // Get available jobs for processing
184/// let available = storage.get_available_jobs(Some(10)).await.unwrap();
185/// println!("Available for processing: {}", available.len());
186/// # });
187/// ```
188#[async_trait]
189pub trait Storage: Send + Sync {
190    /// Store a new job in the storage backend.
191    ///
192    /// Persists a job to the storage system, making it available for processing.
193    /// The job is typically stored in the "enqueued" state unless specified otherwise.
194    ///
195    /// ## Arguments
196    /// * `job` - The job to store with all its metadata and configuration
197    ///
198    /// ## Returns
199    /// * `Ok(())` - Job was stored successfully
200    /// * `Err(StorageError)` - Storage operation failed
201    ///
202    /// ## Examples
203    /// ```rust
204    /// use qml_rs::{MemoryStorage, Job, Storage};
205    ///
206    /// # tokio_test::block_on(async {
207    /// let storage = MemoryStorage::new();
208    ///
209    /// let job = Job::with_config(
210    ///     "send_notification",
211    ///     vec!["user123".to_string()],
212    ///     "notifications", // queue
213    ///     5,              // priority
214    ///     3               // max_retries
215    /// );
216    ///
217    /// storage.enqueue(&job).await.unwrap();
218    /// println!("Job {} enqueued successfully", job.id);
219    /// # });
220    /// ```
221    async fn enqueue(&self, job: &Job) -> Result<(), StorageError>;
222
223    /// Retrieve a job by its unique identifier.
224    ///
225    /// Fetches a complete job record including all metadata, current state,
226    /// and configuration. Returns `None` if the job doesn't exist.
227    ///
228    /// ## Arguments
229    /// * `job_id` - The unique identifier of the job to retrieve
230    ///
231    /// ## Returns
232    /// * `Ok(Some(job))` - Job exists and was retrieved successfully
233    /// * `Ok(None)` - Job doesn't exist in storage
234    /// * `Err(StorageError)` - Storage operation failed
235    ///
236    /// ## Examples
237    /// ```rust
238    /// use qml_rs::{MemoryStorage, Job, Storage};
239    ///
240    /// # tokio_test::block_on(async {
241    /// let storage = MemoryStorage::new();
242    /// let job = Job::new("process_data", vec!["file.csv".to_string()]);
243    ///
244    /// storage.enqueue(&job).await.unwrap();
245    ///
246    /// // Retrieve the job
247    /// match storage.get(&job.id).await.unwrap() {
248    ///     Some(retrieved_job) => {
249    ///         println!("Found job: {} ({})", retrieved_job.id, retrieved_job.method);
250    ///     },
251    ///     None => println!("Job not found"),
252    /// }
253    /// # });
254    /// ```
255    async fn get(&self, job_id: &str) -> Result<Option<Job>, StorageError>;
256
257    /// Update an existing job's state and metadata.
258    ///
259    /// Modifies a job record in storage, typically used for state transitions
260    /// (e.g., Enqueued → Processing → Succeeded). The entire job record is updated.
261    ///
262    /// ## Arguments
263    /// * `job` - The job with updated information to persist
264    ///
265    /// ## Returns
266    /// * `Ok(())` - Job was updated successfully
267    /// * `Err(StorageError)` - Storage operation failed (job may not exist)
268    ///
269    /// ## Examples
270    /// ```rust
271    /// use qml_rs::{MemoryStorage, Job, JobState, Storage};
272    ///
273    /// # tokio_test::block_on(async {
274    /// let storage = MemoryStorage::new();
275    /// let mut job = Job::new("process_order", vec!["order123".to_string()]);
276    ///
277    /// storage.enqueue(&job).await.unwrap();
278    ///
279    /// // Update job state to processing
280    /// job.set_state(JobState::processing("worker-1", "server-1")).unwrap();
281    /// storage.update(&job).await.unwrap();
282    ///
283    /// // Add metadata and update again
284    /// job.add_metadata("processed_by", "worker-1");
285    /// storage.update(&job).await.unwrap();
286    /// # });
287    /// ```
288    async fn update(&self, job: &Job) -> Result<(), StorageError>;
289
290    /// Remove a job from storage (soft or hard delete).
291    ///
292    /// Deletes a job record from the storage system. Some implementations may
293    /// perform soft deletion (marking as deleted) while others perform hard deletion.
294    ///
295    /// ## Arguments
296    /// * `job_id` - The unique identifier of the job to delete
297    ///
298    /// ## Returns
299    /// * `Ok(true)` - Job existed and was deleted successfully
300    /// * `Ok(false)` - Job didn't exist (nothing to delete)
301    /// * `Err(StorageError)` - Storage operation failed
302    ///
303    /// ## Examples
304    /// ```rust
305    /// use qml_rs::{MemoryStorage, Job, Storage};
306    ///
307    /// # tokio_test::block_on(async {
308    /// let storage = MemoryStorage::new();
309    /// let job = Job::new("cleanup_task", vec![]);
310    ///
311    /// storage.enqueue(&job).await.unwrap();
312    ///
313    /// // Delete the job
314    /// let was_deleted = storage.delete(&job.id).await.unwrap();
315    /// assert!(was_deleted);
316    ///
317    /// // Verify it's gone
318    /// let retrieved = storage.get(&job.id).await.unwrap();
319    /// assert!(retrieved.is_none());
320    /// # });
321    /// ```
322    async fn delete(&self, job_id: &str) -> Result<bool, StorageError>;
323
324    /// List jobs with optional filtering and pagination.
325    ///
326    /// Retrieves multiple jobs from storage with optional filtering by state
327    /// and pagination support. Useful for building dashboards and monitoring tools.
328    ///
329    /// ## Arguments
330    /// * `state_filter` - Optional job state to filter by (e.g., only failed jobs)
331    /// * `limit` - Maximum number of jobs to return (None = no limit)
332    /// * `offset` - Number of jobs to skip for pagination (None = start from beginning)
333    ///
334    /// ## Returns
335    /// * `Ok(jobs)` - Vector of jobs matching the criteria
336    /// * `Err(StorageError)` - Storage operation failed
337    ///
338    /// ## Examples
339    /// ```rust
340    /// use qml_rs::{MemoryStorage, Job, JobState, Storage};
341    ///
342    /// # tokio_test::block_on(async {
343    /// let storage = MemoryStorage::new();
344    ///
345    /// // Create several jobs
346    /// for i in 0..10 {
347    ///     let job = Job::new("task", vec![i.to_string()]);
348    ///     storage.enqueue(&job).await.unwrap();
349    /// }
350    ///
351    /// // List all jobs
352    /// let all_jobs = storage.list(None, None, None).await.unwrap();
353    /// println!("Total jobs: {}", all_jobs.len());
354    ///
355    /// // List first 5 jobs
356    /// let first_five = storage.list(None, Some(5), None).await.unwrap();
357    /// println!("First 5 jobs: {}", first_five.len());
358    ///
359    /// // List next 5 jobs (pagination)
360    /// let next_five = storage.list(None, Some(5), Some(5)).await.unwrap();
361    /// println!("Next 5 jobs: {}", next_five.len());
362    /// # });
363    /// ```
364    async fn list(
365        &self,
366        state_filter: Option<&JobState>,
367        limit: Option<usize>,
368        offset: Option<usize>,
369    ) -> Result<Vec<Job>, StorageError>;
370
371    /// Get the count of jobs grouped by their current state.
372    ///
373    /// Returns statistics about job distribution across different states.
374    /// Useful for monitoring and dashboard displays.
375    ///
376    /// ## Returns
377    /// * `Ok(counts)` - HashMap mapping each job state to its count
378    /// * `Err(StorageError)` - Storage operation failed
379    ///
380    /// ## Examples
381    /// ```rust,ignore
382    /// use qml_rs::{MemoryStorage, Job, JobState, Storage};
383    ///
384    /// # tokio_test::block_on(async {
385    /// let storage = MemoryStorage::new();
386    ///
387    /// // Create jobs in different states
388    /// let mut job1 = Job::new("task1", vec![]);
389    /// storage.enqueue(&job1).await.unwrap();
390    ///
391    /// let mut job2 = Job::new("task2", vec![]);
392    /// job2.set_state(JobState::processing("worker-1", "server-1")).unwrap();
393    /// storage.update(&job2).await.unwrap();
394    ///
395    /// // Get statistics
396    /// let counts = storage.get_job_counts().await;
397    /// match counts {
398    ///     Ok(counts) => for (state, count) in counts {
399    ///         println!("State {:?}: {} jobs", state, count);
400    ///     },
401    ///     Err(e) => println!("Error: {}", e),
402    /// }
403    /// # });
404    /// ```
405    async fn get_job_counts(&self) -> Result<HashMap<JobState, usize>, StorageError>;
406
407    /// Get jobs that are ready to be processed immediately.
408    ///
409    /// Returns jobs that are available for processing: enqueued jobs, scheduled jobs
410    /// whose time has arrived, and jobs awaiting retry whose retry time has passed.
411    ///
412    /// ## Arguments
413    /// * `limit` - Maximum number of jobs to return (None = no limit)
414    ///
415    /// ## Returns
416    /// * `Ok(jobs)` - Vector of jobs ready for processing
417    /// * `Err(StorageError)` - Storage operation failed
418    ///
419    /// ## Examples
420    /// ```rust
421    /// use qml_rs::{MemoryStorage, Job, Storage};
422    ///
423    /// # tokio_test::block_on(async {
424    /// let storage = MemoryStorage::new();
425    ///
426    /// // Enqueue several jobs
427    /// for i in 0..5 {
428    ///     let job = Job::new("process_item", vec![i.to_string()]);
429    ///     storage.enqueue(&job).await.unwrap();
430    /// }
431    ///
432    /// // Get available jobs for processing
433    /// let available = storage.get_available_jobs(Some(3)).await.unwrap();
434    /// println!("Available for processing: {}", available.len());
435    ///
436    /// for job in available {
437    ///     println!("Job {} is ready: {}", job.id, job.method);
438    /// }
439    /// # });
440    /// ```
441    async fn get_available_jobs(&self, limit: Option<usize>) -> Result<Vec<Job>, StorageError>;
442
443    /// Atomically fetch and lock a job for processing to prevent race conditions.
444    ///
445    /// This is the **primary method for job processing** in production environments.
446    /// It atomically finds an available job, locks it, and marks it as processing
447    /// in a single operation, preventing multiple workers from processing the same job.
448    ///
449    /// ## Race Condition Prevention
450    ///
451    /// Different storage backends use different mechanisms:
452    /// - **PostgreSQL**: `SELECT FOR UPDATE SKIP LOCKED` with dedicated lock table
453    /// - **Redis**: Lua scripts for atomic operations with distributed locking
454    /// - **Memory**: Mutex-based locking with automatic cleanup
455    ///
456    /// ## Arguments
457    /// * `worker_id` - Unique identifier of the worker claiming the job
458    /// * `queues` - Optional list of specific queues to fetch from (None = all queues)
459    ///
460    /// ## Returns
461    /// * `Ok(Some(job))` - Job was successfully fetched and locked
462    /// * `Ok(None)` - No jobs are available for processing
463    /// * `Err(StorageError)` - Storage operation failed
464    ///
465    /// ## Examples
466    /// ```rust
467    /// use qml_rs::{MemoryStorage, Job, Storage};
468    ///
469    /// # tokio_test::block_on(async {
470    /// let storage = MemoryStorage::new();
471    ///
472    /// // Enqueue some jobs
473    /// for i in 0..3 {
474    ///     let job = Job::with_config(
475    ///         "process_item",
476    ///         vec![i.to_string()],
477    ///         if i == 0 { "critical" } else { "normal" }, // different queues
478    ///         i as i32,
479    ///         3
480    ///     );
481    ///     storage.enqueue(&job).await.unwrap();
482    /// }
483    ///
484    /// // Worker fetches from any queue
485    /// let job = storage.fetch_and_lock_job("worker-1", None).await.unwrap();
486    /// match job {
487    ///     Some(job) => {
488    ///         println!("Worker-1 got job: {} from queue: {}", job.id, job.queue);
489    ///         // Job is now locked and marked as processing
490    ///     },
491    ///     None => println!("No jobs available"),
492    /// }
493    ///
494    /// // Worker fetches only from critical queue
495    /// let critical_job = storage.fetch_and_lock_job(
496    ///     "worker-2",
497    ///     Some(&["critical".to_string()])
498    /// ).await.unwrap();
499    /// # });
500    /// ```
501    async fn fetch_and_lock_job(
502        &self,
503        worker_id: &str,
504        queues: Option<&[String]>,
505    ) -> Result<Option<Job>, StorageError>;
506
507    /// Try to acquire an explicit lock on a specific job.
508    ///
509    /// Attempts to acquire an exclusive lock on a job for coordination between
510    /// workers. This is useful for implementing custom job processing logic
511    /// or manual job management.
512    ///
513    /// ## Arguments
514    /// * `job_id` - The unique identifier of the job to lock
515    /// * `worker_id` - Unique identifier of the worker trying to acquire the lock
516    /// * `timeout_seconds` - Lock timeout in seconds (auto-release after this time)
517    ///
518    /// ## Returns
519    /// * `Ok(true)` - Lock was successfully acquired
520    /// * `Ok(false)` - Lock could not be acquired (already locked by another worker)
521    /// * `Err(StorageError)` - Storage operation failed
522    ///
523    /// ## Examples
524    /// ```rust
525    /// use qml_rs::{MemoryStorage, Job, Storage};
526    ///
527    /// # tokio_test::block_on(async {
528    /// let storage = MemoryStorage::new();
529    /// let job = Job::new("exclusive_task", vec![]);
530    /// storage.enqueue(&job).await.unwrap();
531    ///
532    /// // Worker 1 tries to acquire lock
533    /// let acquired = storage.try_acquire_job_lock(&job.id, "worker-1", 300).await.unwrap();
534    /// assert!(acquired);
535    ///
536    /// // Worker 2 tries to acquire the same lock (should fail)
537    /// let acquired = storage.try_acquire_job_lock(&job.id, "worker-2", 300).await.unwrap();
538    /// assert!(!acquired);
539    ///
540    /// // Worker 1 releases the lock
541    /// storage.release_job_lock(&job.id, "worker-1").await.unwrap();
542    ///
543    /// // Now worker 2 can acquire it
544    /// let acquired = storage.try_acquire_job_lock(&job.id, "worker-2", 300).await.unwrap();
545    /// assert!(acquired);
546    /// # });
547    /// ```
548    async fn try_acquire_job_lock(
549        &self,
550        job_id: &str,
551        worker_id: &str,
552        timeout_seconds: u64,
553    ) -> Result<bool, StorageError>;
554
555    /// Release an explicit lock on a job.
556    ///
557    /// Releases a lock that was previously acquired with `try_acquire_job_lock`.
558    /// Only the worker that acquired the lock can release it.
559    ///
560    /// ## Arguments
561    /// * `job_id` - The unique identifier of the job to unlock
562    /// * `worker_id` - Unique identifier of the worker releasing the lock
563    ///
564    /// ## Returns
565    /// * `Ok(true)` - Lock was successfully released
566    /// * `Ok(false)` - Lock was not held by this worker (or already expired)
567    /// * `Err(StorageError)` - Storage operation failed
568    ///
569    /// ## Examples
570    /// ```rust
571    /// use qml_rs::{MemoryStorage, Job, Storage};
572    ///
573    /// # tokio_test::block_on(async {
574    /// let storage = MemoryStorage::new();
575    /// let job = Job::new("task_with_lock", vec![]);
576    /// storage.enqueue(&job).await.unwrap();
577    ///
578    /// // Acquire lock
579    /// storage.try_acquire_job_lock(&job.id, "worker-1", 300).await.unwrap();
580    ///
581    /// // Do some work...
582    ///
583    /// // Release lock
584    /// let released = storage.release_job_lock(&job.id, "worker-1").await.unwrap();
585    /// assert!(released);
586    ///
587    /// // Trying to release again should return false
588    /// let released = storage.release_job_lock(&job.id, "worker-1").await.unwrap();
589    /// assert!(!released);
590    /// # });
591    /// ```
592    async fn release_job_lock(&self, job_id: &str, worker_id: &str) -> Result<bool, StorageError>;
593
594    /// Atomically fetch multiple available jobs with locking.
595    ///
596    /// Similar to `fetch_and_lock_job` but fetches multiple jobs in a single
597    /// atomic operation. Useful for batch processing scenarios where a worker
598    /// can handle multiple jobs simultaneously.
599    ///
600    /// ## Arguments
601    /// * `worker_id` - Unique identifier of the worker claiming the jobs
602    /// * `limit` - Maximum number of jobs to fetch (None = implementation default)
603    /// * `queues` - Optional list of specific queues to fetch from (None = all queues)
604    ///
605    /// ## Returns
606    /// * `Ok(jobs)` - Vector of jobs that were successfully fetched and locked
607    /// * `Err(StorageError)` - Storage operation failed
608    ///
609    /// ## Examples
610    /// ```rust
611    /// use qml_rs::{MemoryStorage, Job, Storage};
612    ///
613    /// # tokio_test::block_on(async {
614    /// let storage = MemoryStorage::new();
615    ///
616    /// // Enqueue batch of jobs
617    /// for i in 0..10 {
618    ///     let job = Job::new("batch_process", vec![i.to_string()]);
619    ///     storage.enqueue(&job).await.unwrap();
620    /// }
621    ///
622    /// // Worker fetches multiple jobs at once
623    /// let jobs = storage.fetch_available_jobs_atomic("worker-1", Some(5), None).await.unwrap();
624    /// println!("Worker-1 got {} jobs for batch processing", jobs.len());
625    ///
626    /// for job in jobs {
627    ///     println!("Processing job {} with argument: {}", job.id, job.arguments[0]);
628    ///     // All jobs are now locked and marked as processing
629    /// }
630    /// # });
631    /// ```
632    async fn fetch_available_jobs_atomic(
633        &self,
634        worker_id: &str,
635        limit: Option<usize>,
636        queues: Option<&[String]>,
637    ) -> Result<Vec<Job>, StorageError>;
638}
639
640/// Storage instance that can hold any storage implementation
641pub enum StorageInstance {
642    /// Memory storage instance
643    Memory(MemoryStorage),
644    /// Redis storage instance
645    #[cfg(feature = "redis")]
646    Redis(RedisStorage),
647    /// PostgreSQL storage instance
648    #[cfg(feature = "postgres")]
649    Postgres(PostgresStorage),
650}
651
652impl StorageInstance {
653    /// Create a storage instance from configuration
654    ///
655    /// # Arguments
656    /// * `config` - The storage configuration
657    ///
658    /// # Returns
659    /// * `Ok(storage)` - The created storage instance
660    /// * `Err(StorageError)` - If there was an error creating the storage
661    ///
662    /// # Examples
663    ///
664    /// ```rust
665    /// use qml_rs::storage::{StorageInstance, StorageConfig, MemoryConfig};
666    ///
667    /// # tokio_test::block_on(async {
668    /// let config = StorageConfig::Memory(MemoryConfig::default());
669    /// let storage = StorageInstance::from_config(config).await.unwrap();
670    /// # });
671    /// ```
672    pub async fn from_config(config: StorageConfig) -> Result<Self, StorageError> {
673        match config {
674            StorageConfig::Memory(memory_config) => Ok(StorageInstance::Memory(
675                MemoryStorage::with_config(memory_config),
676            )),
677            #[cfg(feature = "redis")]
678            StorageConfig::Redis(redis_config) => {
679                let redis_storage = RedisStorage::with_config(redis_config).await?;
680                Ok(StorageInstance::Redis(redis_storage))
681            }
682            #[cfg(feature = "postgres")]
683            StorageConfig::Postgres(postgres_config) => {
684                let postgres_storage = PostgresStorage::new(postgres_config).await?;
685                Ok(StorageInstance::Postgres(postgres_storage))
686            }
687        }
688    }
689
690    /// Create a memory storage instance with default configuration
691    ///
692    /// # Examples
693    ///
694    /// ```rust
695    /// use qml_rs::storage::StorageInstance;
696    ///
697    /// let storage = StorageInstance::memory();
698    /// ```
699    pub fn memory() -> Self {
700        StorageInstance::Memory(MemoryStorage::new())
701    }
702
703    /// Create a memory storage instance with custom configuration
704    ///
705    /// # Arguments
706    /// * `config` - The memory storage configuration
707    ///
708    /// # Examples
709    ///
710    /// ```rust
711    /// use qml_rs::storage::{StorageInstance, MemoryConfig};
712    ///
713    /// let config = MemoryConfig::new().with_max_jobs(1000);
714    /// let storage = StorageInstance::memory_with_config(config);
715    /// ```
716    pub fn memory_with_config(config: MemoryConfig) -> Self {
717        StorageInstance::Memory(MemoryStorage::with_config(config))
718    }
719
720    /// Create a Redis storage instance with custom configuration
721    ///
722    /// # Arguments
723    /// * `config` - The Redis storage configuration
724    ///
725    /// # Returns
726    /// * `Ok(storage)` - The created Redis storage instance
727    /// * `Err(StorageError)` - If there was an error connecting to Redis
728    ///
729    /// # Examples
730    ///
731    /// ```rust
732    /// use qml_rs::storage::{StorageInstance, RedisConfig};
733    ///
734    /// # tokio_test::block_on(async {
735    /// let config = RedisConfig::new().with_url("redis://localhost:6379");
736    /// match StorageInstance::redis(config).await {
737    ///     Ok(storage) => println!("Redis storage created successfully"),
738    ///     Err(e) => println!("Failed to create Redis storage: {}", e),
739    /// }
740    /// # });
741    /// ```
742    #[cfg(feature = "redis")]
743    pub async fn redis(config: RedisConfig) -> Result<Self, StorageError> {
744        let redis_storage = RedisStorage::with_config(config).await?;
745        Ok(StorageInstance::Redis(redis_storage))
746    }
747
748    /// Create a PostgreSQL storage instance with custom configuration
749    ///
750    /// # Arguments
751    /// * `config` - The PostgreSQL storage configuration
752    ///
753    /// # Returns
754    /// * `Ok(storage)` - The created PostgreSQL storage instance
755    /// * `Err(StorageError)` - If there was an error connecting to PostgreSQL
756    ///
757    /// # Examples
758    ///
759    /// ```rust
760    /// use qml_rs::storage::{StorageInstance, PostgresConfig};
761    ///
762    /// # tokio_test::block_on(async {
763    /// let config = PostgresConfig::new().with_database_url("postgresql://postgres:password@localhost:5432/qml");
764    /// match StorageInstance::postgres(config).await {
765    ///     Ok(storage) => println!("PostgreSQL storage created successfully"),
766    ///     Err(e) => println!("Failed to create PostgreSQL storage: {}", e),
767    /// }
768    /// # });
769    /// ```
770    #[cfg(feature = "postgres")]
771    pub async fn postgres(config: PostgresConfig) -> Result<Self, StorageError> {
772        let postgres_storage = PostgresStorage::new(config).await?;
773        Ok(StorageInstance::Postgres(postgres_storage))
774    }
775}
776
777#[async_trait]
778impl Storage for StorageInstance {
779    async fn enqueue(&self, job: &Job) -> Result<(), StorageError> {
780        match self {
781            StorageInstance::Memory(storage) => storage.enqueue(job).await,
782            #[cfg(feature = "redis")]
783            StorageInstance::Redis(storage) => storage.enqueue(job).await,
784            #[cfg(feature = "postgres")]
785            StorageInstance::Postgres(storage) => storage.enqueue(job).await,
786        }
787    }
788
789    async fn get(&self, job_id: &str) -> Result<Option<Job>, StorageError> {
790        match self {
791            StorageInstance::Memory(storage) => storage.get(job_id).await,
792            #[cfg(feature = "redis")]
793            StorageInstance::Redis(storage) => storage.get(job_id).await,
794            #[cfg(feature = "postgres")]
795            StorageInstance::Postgres(storage) => storage.get(job_id).await,
796        }
797    }
798
799    async fn update(&self, job: &Job) -> Result<(), StorageError> {
800        match self {
801            StorageInstance::Memory(storage) => storage.update(job).await,
802            #[cfg(feature = "redis")]
803            StorageInstance::Redis(storage) => storage.update(job).await,
804            #[cfg(feature = "postgres")]
805            StorageInstance::Postgres(storage) => storage.update(job).await,
806        }
807    }
808
809    async fn delete(&self, job_id: &str) -> Result<bool, StorageError> {
810        match self {
811            StorageInstance::Memory(storage) => storage.delete(job_id).await,
812            #[cfg(feature = "redis")]
813            StorageInstance::Redis(storage) => storage.delete(job_id).await,
814            #[cfg(feature = "postgres")]
815            StorageInstance::Postgres(storage) => storage.delete(job_id).await,
816        }
817    }
818
819    async fn list(
820        &self,
821        state_filter: Option<&JobState>,
822        limit: Option<usize>,
823        offset: Option<usize>,
824    ) -> Result<Vec<Job>, StorageError> {
825        match self {
826            StorageInstance::Memory(storage) => storage.list(state_filter, limit, offset).await,
827            #[cfg(feature = "redis")]
828            StorageInstance::Redis(storage) => storage.list(state_filter, limit, offset).await,
829            #[cfg(feature = "postgres")]
830            StorageInstance::Postgres(storage) => storage.list(state_filter, limit, offset).await,
831        }
832    }
833
834    async fn get_job_counts(&self) -> Result<HashMap<JobState, usize>, StorageError> {
835        match self {
836            StorageInstance::Memory(storage) => storage.get_job_counts().await,
837            #[cfg(feature = "redis")]
838            StorageInstance::Redis(storage) => storage.get_job_counts().await,
839            #[cfg(feature = "postgres")]
840            StorageInstance::Postgres(storage) => storage.get_job_counts().await,
841        }
842    }
843
844    async fn get_available_jobs(&self, limit: Option<usize>) -> Result<Vec<Job>, StorageError> {
845        match self {
846            StorageInstance::Memory(storage) => storage.get_available_jobs(limit).await,
847            #[cfg(feature = "redis")]
848            StorageInstance::Redis(storage) => storage.get_available_jobs(limit).await,
849            #[cfg(feature = "postgres")]
850            StorageInstance::Postgres(storage) => storage.get_available_jobs(limit).await,
851        }
852    }
853
854    async fn fetch_and_lock_job(
855        &self,
856        worker_id: &str,
857        queues: Option<&[String]>,
858    ) -> Result<Option<Job>, StorageError> {
859        match self {
860            StorageInstance::Memory(storage) => storage.fetch_and_lock_job(worker_id, queues).await,
861            #[cfg(feature = "redis")]
862            StorageInstance::Redis(storage) => storage.fetch_and_lock_job(worker_id, queues).await,
863            #[cfg(feature = "postgres")]
864            StorageInstance::Postgres(storage) => {
865                storage.fetch_and_lock_job(worker_id, queues).await
866            }
867        }
868    }
869
870    async fn try_acquire_job_lock(
871        &self,
872        job_id: &str,
873        worker_id: &str,
874        timeout_seconds: u64,
875    ) -> Result<bool, StorageError> {
876        match self {
877            StorageInstance::Memory(storage) => {
878                storage
879                    .try_acquire_job_lock(job_id, worker_id, timeout_seconds)
880                    .await
881            }
882            #[cfg(feature = "redis")]
883            StorageInstance::Redis(storage) => {
884                storage
885                    .try_acquire_job_lock(job_id, worker_id, timeout_seconds)
886                    .await
887            }
888            #[cfg(feature = "postgres")]
889            StorageInstance::Postgres(storage) => {
890                storage
891                    .try_acquire_job_lock(job_id, worker_id, timeout_seconds)
892                    .await
893            }
894        }
895    }
896
897    async fn release_job_lock(&self, job_id: &str, worker_id: &str) -> Result<bool, StorageError> {
898        match self {
899            StorageInstance::Memory(storage) => storage.release_job_lock(job_id, worker_id).await,
900            #[cfg(feature = "redis")]
901            StorageInstance::Redis(storage) => storage.release_job_lock(job_id, worker_id).await,
902            #[cfg(feature = "postgres")]
903            StorageInstance::Postgres(storage) => storage.release_job_lock(job_id, worker_id).await,
904        }
905    }
906
907    async fn fetch_available_jobs_atomic(
908        &self,
909        worker_id: &str,
910        limit: Option<usize>,
911        queues: Option<&[String]>,
912    ) -> Result<Vec<Job>, StorageError> {
913        match self {
914            StorageInstance::Memory(storage) => {
915                storage
916                    .fetch_available_jobs_atomic(worker_id, limit, queues)
917                    .await
918            }
919            #[cfg(feature = "redis")]
920            StorageInstance::Redis(storage) => {
921                storage
922                    .fetch_available_jobs_atomic(worker_id, limit, queues)
923                    .await
924            }
925            #[cfg(feature = "postgres")]
926            StorageInstance::Postgres(storage) => {
927                storage
928                    .fetch_available_jobs_atomic(worker_id, limit, queues)
929                    .await
930            }
931        }
932    }
933}