qml_rs/storage/mod.rs
1use async_trait::async_trait;
2use std::collections::HashMap;
3
4use crate::core::{Job, JobState};
5
6pub mod config;
7pub mod error;
8pub mod memory;
9#[cfg(feature = "postgres")]
10pub mod postgres;
11#[cfg(feature = "redis")]
12pub mod redis;
13pub mod settings;
14
15#[cfg(test)]
16mod test_locking;
17
18#[cfg(feature = "postgres")]
19pub use config::PostgresConfig;
20#[cfg(feature = "redis")]
21pub use config::RedisConfig;
22pub use config::{MemoryConfig, StorageConfig};
23pub use error::StorageError;
24pub use memory::MemoryStorage;
25#[cfg(feature = "postgres")]
26pub use postgres::PostgresStorage;
27#[cfg(feature = "redis")]
28pub use redis::RedisStorage;
29
/// Core storage trait that defines the interface for job persistence across all backends.
///
/// The [`Storage`] trait provides a unified API for job persistence operations, supporting
/// multiple storage backends including in-memory, Redis, and PostgreSQL. All implementations
/// provide atomic operations and race condition prevention for production use.
///
/// ## Storage Backends
///
/// - **[`MemoryStorage`]**: Fast in-memory storage for development and testing
/// - **[`RedisStorage`]**: Distributed Redis storage with Lua script atomicity
///   (only compiled with the `redis` feature)
/// - **[`PostgresStorage`]**: ACID-compliant PostgreSQL with row-level locking
///   (only compiled with the `postgres` feature)
///
/// ## Core Operations
///
/// The trait provides standard CRUD operations (`enqueue`, `get`, `update`, `delete`)
/// plus advanced operations for job processing:
///
/// - **Job Management**: Store, retrieve, update, and delete jobs
/// - **Querying**: List jobs with filtering and pagination
/// - **Processing**: Atomic job fetching with race condition prevention
/// - **Locking**: Explicit job locking for distributed coordination
///
/// ## Race Condition Prevention
///
/// All storage backends implement atomic job fetching to prevent multiple workers
/// from processing the same job simultaneously:
///
/// ```text
/// Worker A ──┐
///            ├── fetch_and_lock_job() ──→ Gets Job #123
/// Worker B ──┘                        ──→ Gets Job #124 (not #123)
/// ```
///
/// ## Examples
///
/// ### Basic Storage Operations
/// ```rust
/// use qml_rs::{MemoryStorage, Job, Storage};
///
/// # tokio_test::block_on(async {
/// let storage = MemoryStorage::new();
///
/// // Create and store a job
/// let job = Job::new("send_email", vec!["user@example.com".to_string()]);
/// storage.enqueue(&job).await.unwrap();
///
/// // Retrieve the job
/// let retrieved = storage.get(&job.id).await.unwrap().unwrap();
/// assert_eq!(job.id, retrieved.id);
///
/// // Update job state
/// let mut updated_job = retrieved;
/// updated_job.set_state(qml_rs::JobState::processing("worker-1", "server-1")).unwrap();
/// storage.update(&updated_job).await.unwrap();
///
/// // Delete the job
/// let deleted = storage.delete(&job.id).await.unwrap();
/// assert!(deleted);
/// # });
/// ```
///
/// ### Atomic Job Processing
/// ```rust
/// use qml_rs::{MemoryStorage, Job, Storage};
///
/// # tokio_test::block_on(async {
/// let storage = MemoryStorage::new();
///
/// // Enqueue some jobs
/// for i in 0..5 {
///     let job = Job::new("process_item", vec![i.to_string()]);
///     storage.enqueue(&job).await.unwrap();
/// }
///
/// // Worker fetches and locks a job atomically
/// let job = storage.fetch_and_lock_job("worker-1", None).await.unwrap();
/// match job {
///     Some(job) => {
///         println!("Worker-1 processing job: {}", job.id);
///         // Job is automatically locked and marked as processing
///     },
///     None => println!("No jobs available"),
/// }
/// # });
/// ```
///
/// ### Storage Backend Selection
/// ```rust
/// use qml_rs::storage::{StorageInstance, StorageConfig, MemoryConfig};
///
/// # tokio_test::block_on(async {
/// // Memory storage for development
/// let memory_storage = StorageInstance::memory();
///
/// // Redis storage for production
/// # #[cfg(feature = "redis")]
/// # {
/// use qml_rs::storage::RedisConfig;
/// let redis_config = RedisConfig::new().with_url("redis://localhost:6379");
/// match StorageInstance::redis(redis_config).await {
///     Ok(redis_storage) => println!("Redis storage ready"),
///     Err(e) => println!("Redis connection failed: {}", e),
/// }
/// # }
///
/// // PostgreSQL storage for enterprise
/// # #[cfg(feature = "postgres")]
/// # {
/// use qml_rs::storage::PostgresConfig;
/// let pg_config = PostgresConfig::new()
///     .with_database_url("postgresql://localhost:5432/qml")
///     .with_auto_migrate(true);
/// match StorageInstance::postgres(pg_config).await {
///     Ok(pg_storage) => println!("PostgreSQL storage ready"),
///     Err(e) => println!("PostgreSQL connection failed: {}", e),
/// }
/// # }
/// # });
/// ```
///
/// ### Job Filtering and Statistics
/// ```rust
/// use qml_rs::{MemoryStorage, Job, JobState, Storage};
///
/// # tokio_test::block_on(async {
/// let storage = MemoryStorage::new();
///
/// // Create jobs in different states
/// let job1 = Job::new("task1", vec![]);
/// let mut job2 = Job::new("task2", vec![]);
/// job2.set_state(JobState::processing("worker-1", "server-1")).unwrap();
///
/// storage.enqueue(&job1).await.unwrap();
/// storage.enqueue(&job2).await.unwrap();
///
/// // List all jobs
/// let all_jobs = storage.list(None, None, None).await.unwrap();
/// println!("Total jobs: {}", all_jobs.len());
///
/// // Get job counts by state
/// let counts = storage.get_job_counts().await;
/// match counts {
///     Ok(counts) => {
///         for (state, count) in counts {
///             println!("{:?}: {}", state, count);
///         }
///     },
///     Err(e) => println!("Error: {}", e),
/// }
///
/// // Get available jobs for processing
/// let available = storage.get_available_jobs(Some(10)).await.unwrap();
/// println!("Available for processing: {}", available.len());
/// # });
/// ```
#[async_trait]
pub trait Storage: Send + Sync {
    /// Store a new job in the storage backend.
    ///
    /// Persists a job to the storage system, making it available for processing.
    /// The job is typically stored in the "enqueued" state unless specified otherwise.
    ///
    /// ## Arguments
    /// * `job` - The job to store with all its metadata and configuration
    ///
    /// ## Returns
    /// * `Ok(())` - Job was stored successfully
    /// * `Err(StorageError)` - Storage operation failed
    ///
    /// ## Examples
    /// ```rust
    /// use qml_rs::{MemoryStorage, Job, Storage};
    ///
    /// # tokio_test::block_on(async {
    /// let storage = MemoryStorage::new();
    ///
    /// let job = Job::with_config(
    ///     "send_notification",
    ///     vec!["user123".to_string()],
    ///     "notifications", // queue
    ///     5,               // priority
    ///     3                // max_retries
    /// );
    ///
    /// storage.enqueue(&job).await.unwrap();
    /// println!("Job {} enqueued successfully", job.id);
    /// # });
    /// ```
    async fn enqueue(&self, job: &Job) -> Result<(), StorageError>;

    /// Retrieve a job by its unique identifier.
    ///
    /// Fetches a complete job record including all metadata, current state,
    /// and configuration. Returns `None` if the job doesn't exist.
    ///
    /// ## Arguments
    /// * `job_id` - The unique identifier of the job to retrieve
    ///
    /// ## Returns
    /// * `Ok(Some(job))` - Job exists and was retrieved successfully
    /// * `Ok(None)` - Job doesn't exist in storage
    /// * `Err(StorageError)` - Storage operation failed
    ///
    /// ## Examples
    /// ```rust
    /// use qml_rs::{MemoryStorage, Job, Storage};
    ///
    /// # tokio_test::block_on(async {
    /// let storage = MemoryStorage::new();
    /// let job = Job::new("process_data", vec!["file.csv".to_string()]);
    ///
    /// storage.enqueue(&job).await.unwrap();
    ///
    /// // Retrieve the job
    /// match storage.get(&job.id).await.unwrap() {
    ///     Some(retrieved_job) => {
    ///         println!("Found job: {} ({})", retrieved_job.id, retrieved_job.method);
    ///     },
    ///     None => println!("Job not found"),
    /// }
    /// # });
    /// ```
    async fn get(&self, job_id: &str) -> Result<Option<Job>, StorageError>;

    /// Update an existing job's state and metadata.
    ///
    /// Modifies a job record in storage, typically used for state transitions
    /// (e.g., Enqueued → Processing → Succeeded). The entire job record is updated.
    ///
    /// ## Arguments
    /// * `job` - The job with updated information to persist
    ///
    /// ## Returns
    /// * `Ok(())` - Job was updated successfully
    /// * `Err(StorageError)` - Storage operation failed (job may not exist)
    ///
    /// ## Examples
    /// ```rust
    /// use qml_rs::{MemoryStorage, Job, JobState, Storage};
    ///
    /// # tokio_test::block_on(async {
    /// let storage = MemoryStorage::new();
    /// let mut job = Job::new("process_order", vec!["order123".to_string()]);
    ///
    /// storage.enqueue(&job).await.unwrap();
    ///
    /// // Update job state to processing
    /// job.set_state(JobState::processing("worker-1", "server-1")).unwrap();
    /// storage.update(&job).await.unwrap();
    ///
    /// // Add metadata and update again
    /// job.add_metadata("processed_by", "worker-1");
    /// storage.update(&job).await.unwrap();
    /// # });
    /// ```
    async fn update(&self, job: &Job) -> Result<(), StorageError>;

    /// Remove a job from storage (soft or hard delete).
    ///
    /// Deletes a job record from the storage system. Some implementations may
    /// perform soft deletion (marking as deleted) while others perform hard deletion.
    ///
    /// ## Arguments
    /// * `job_id` - The unique identifier of the job to delete
    ///
    /// ## Returns
    /// * `Ok(true)` - Job existed and was deleted successfully
    /// * `Ok(false)` - Job didn't exist (nothing to delete)
    /// * `Err(StorageError)` - Storage operation failed
    ///
    /// ## Examples
    /// ```rust
    /// use qml_rs::{MemoryStorage, Job, Storage};
    ///
    /// # tokio_test::block_on(async {
    /// let storage = MemoryStorage::new();
    /// let job = Job::new("cleanup_task", vec![]);
    ///
    /// storage.enqueue(&job).await.unwrap();
    ///
    /// // Delete the job
    /// let was_deleted = storage.delete(&job.id).await.unwrap();
    /// assert!(was_deleted);
    ///
    /// // Verify it's gone
    /// let retrieved = storage.get(&job.id).await.unwrap();
    /// assert!(retrieved.is_none());
    /// # });
    /// ```
    async fn delete(&self, job_id: &str) -> Result<bool, StorageError>;

    /// List jobs with optional filtering and pagination.
    ///
    /// Retrieves multiple jobs from storage with optional filtering by state
    /// and pagination support. Useful for building dashboards and monitoring tools.
    ///
    /// ## Arguments
    /// * `state_filter` - Optional job state to filter by (e.g., only failed jobs)
    /// * `limit` - Maximum number of jobs to return (None = no limit)
    /// * `offset` - Number of jobs to skip for pagination (None = start from beginning)
    ///
    /// ## Returns
    /// * `Ok(jobs)` - Vector of jobs matching the criteria
    /// * `Err(StorageError)` - Storage operation failed
    ///
    /// ## Examples
    /// ```rust
    /// use qml_rs::{MemoryStorage, Job, JobState, Storage};
    ///
    /// # tokio_test::block_on(async {
    /// let storage = MemoryStorage::new();
    ///
    /// // Create several jobs
    /// for i in 0..10 {
    ///     let job = Job::new("task", vec![i.to_string()]);
    ///     storage.enqueue(&job).await.unwrap();
    /// }
    ///
    /// // List all jobs
    /// let all_jobs = storage.list(None, None, None).await.unwrap();
    /// println!("Total jobs: {}", all_jobs.len());
    ///
    /// // List first 5 jobs
    /// let first_five = storage.list(None, Some(5), None).await.unwrap();
    /// println!("First 5 jobs: {}", first_five.len());
    ///
    /// // List next 5 jobs (pagination)
    /// let next_five = storage.list(None, Some(5), Some(5)).await.unwrap();
    /// println!("Next 5 jobs: {}", next_five.len());
    /// # });
    /// ```
    async fn list(
        &self,
        state_filter: Option<&JobState>,
        limit: Option<usize>,
        offset: Option<usize>,
    ) -> Result<Vec<Job>, StorageError>;

    /// Get the count of jobs grouped by their current state.
    ///
    /// Returns statistics about job distribution across different states.
    /// Useful for monitoring and dashboard displays.
    ///
    /// ## Returns
    /// * `Ok(counts)` - HashMap mapping each job state to its count
    /// * `Err(StorageError)` - Storage operation failed
    ///
    /// ## Examples
    /// ```rust,ignore
    /// use qml_rs::{MemoryStorage, Job, JobState, Storage};
    ///
    /// # tokio_test::block_on(async {
    /// let storage = MemoryStorage::new();
    ///
    /// // Create jobs in different states
    /// let job1 = Job::new("task1", vec![]);
    /// storage.enqueue(&job1).await.unwrap();
    ///
    /// // Enqueue first so the job exists in storage, then move it to processing
    /// let mut job2 = Job::new("task2", vec![]);
    /// storage.enqueue(&job2).await.unwrap();
    /// job2.set_state(JobState::processing("worker-1", "server-1")).unwrap();
    /// storage.update(&job2).await.unwrap();
    ///
    /// // Get statistics
    /// let counts = storage.get_job_counts().await;
    /// match counts {
    ///     Ok(counts) => for (state, count) in counts {
    ///         println!("State {:?}: {} jobs", state, count);
    ///     },
    ///     Err(e) => println!("Error: {}", e),
    /// }
    /// # });
    /// ```
    async fn get_job_counts(&self) -> Result<HashMap<JobState, usize>, StorageError>;

    /// Get jobs that are ready to be processed immediately.
    ///
    /// Returns jobs that are available for processing: enqueued jobs, scheduled jobs
    /// whose time has arrived, and jobs awaiting retry whose retry time has passed.
    ///
    /// ## Arguments
    /// * `limit` - Maximum number of jobs to return (None = no limit)
    ///
    /// ## Returns
    /// * `Ok(jobs)` - Vector of jobs ready for processing
    /// * `Err(StorageError)` - Storage operation failed
    ///
    /// ## Examples
    /// ```rust
    /// use qml_rs::{MemoryStorage, Job, Storage};
    ///
    /// # tokio_test::block_on(async {
    /// let storage = MemoryStorage::new();
    ///
    /// // Enqueue several jobs
    /// for i in 0..5 {
    ///     let job = Job::new("process_item", vec![i.to_string()]);
    ///     storage.enqueue(&job).await.unwrap();
    /// }
    ///
    /// // Get available jobs for processing
    /// let available = storage.get_available_jobs(Some(3)).await.unwrap();
    /// println!("Available for processing: {}", available.len());
    ///
    /// for job in available {
    ///     println!("Job {} is ready: {}", job.id, job.method);
    /// }
    /// # });
    /// ```
    async fn get_available_jobs(&self, limit: Option<usize>) -> Result<Vec<Job>, StorageError>;

    /// Atomically fetch and lock a job for processing to prevent race conditions.
    ///
    /// This is the **primary method for job processing** in production environments.
    /// It atomically finds an available job, locks it, and marks it as processing
    /// in a single operation, preventing multiple workers from processing the same job.
    ///
    /// ## Race Condition Prevention
    ///
    /// Different storage backends use different mechanisms:
    /// - **PostgreSQL**: `SELECT FOR UPDATE SKIP LOCKED` with dedicated lock table
    /// - **Redis**: Lua scripts for atomic operations with distributed locking
    /// - **Memory**: Mutex-based locking with automatic cleanup
    ///
    /// ## Arguments
    /// * `worker_id` - Unique identifier of the worker claiming the job
    /// * `queues` - Optional list of specific queues to fetch from (None = all queues)
    ///
    /// ## Returns
    /// * `Ok(Some(job))` - Job was successfully fetched and locked
    /// * `Ok(None)` - No jobs are available for processing
    /// * `Err(StorageError)` - Storage operation failed
    ///
    /// ## Examples
    /// ```rust
    /// use qml_rs::{MemoryStorage, Job, Storage};
    ///
    /// # tokio_test::block_on(async {
    /// let storage = MemoryStorage::new();
    ///
    /// // Enqueue some jobs
    /// for i in 0..3 {
    ///     let job = Job::with_config(
    ///         "process_item",
    ///         vec![i.to_string()],
    ///         if i == 0 { "critical" } else { "normal" }, // different queues
    ///         i as i32,
    ///         3
    ///     );
    ///     storage.enqueue(&job).await.unwrap();
    /// }
    ///
    /// // Worker fetches from any queue
    /// let job = storage.fetch_and_lock_job("worker-1", None).await.unwrap();
    /// match job {
    ///     Some(job) => {
    ///         println!("Worker-1 got job: {} from queue: {}", job.id, job.queue);
    ///         // Job is now locked and marked as processing
    ///     },
    ///     None => println!("No jobs available"),
    /// }
    ///
    /// // Worker fetches only from critical queue
    /// let critical_job = storage.fetch_and_lock_job(
    ///     "worker-2",
    ///     Some(&["critical".to_string()])
    /// ).await.unwrap();
    /// # });
    /// ```
    async fn fetch_and_lock_job(
        &self,
        worker_id: &str,
        queues: Option<&[String]>,
    ) -> Result<Option<Job>, StorageError>;

    /// Try to acquire an explicit lock on a specific job.
    ///
    /// Attempts to acquire an exclusive lock on a job for coordination between
    /// workers. This is useful for implementing custom job processing logic
    /// or manual job management.
    ///
    /// ## Arguments
    /// * `job_id` - The unique identifier of the job to lock
    /// * `worker_id` - Unique identifier of the worker trying to acquire the lock
    /// * `timeout_seconds` - Lock timeout in seconds (auto-release after this time)
    ///
    /// ## Returns
    /// * `Ok(true)` - Lock was successfully acquired
    /// * `Ok(false)` - Lock could not be acquired (already locked by another worker)
    /// * `Err(StorageError)` - Storage operation failed
    ///
    /// ## Examples
    /// ```rust
    /// use qml_rs::{MemoryStorage, Job, Storage};
    ///
    /// # tokio_test::block_on(async {
    /// let storage = MemoryStorage::new();
    /// let job = Job::new("exclusive_task", vec![]);
    /// storage.enqueue(&job).await.unwrap();
    ///
    /// // Worker 1 tries to acquire lock
    /// let acquired = storage.try_acquire_job_lock(&job.id, "worker-1", 300).await.unwrap();
    /// assert!(acquired);
    ///
    /// // Worker 2 tries to acquire the same lock (should fail)
    /// let acquired = storage.try_acquire_job_lock(&job.id, "worker-2", 300).await.unwrap();
    /// assert!(!acquired);
    ///
    /// // Worker 1 releases the lock
    /// storage.release_job_lock(&job.id, "worker-1").await.unwrap();
    ///
    /// // Now worker 2 can acquire it
    /// let acquired = storage.try_acquire_job_lock(&job.id, "worker-2", 300).await.unwrap();
    /// assert!(acquired);
    /// # });
    /// ```
    async fn try_acquire_job_lock(
        &self,
        job_id: &str,
        worker_id: &str,
        timeout_seconds: u64,
    ) -> Result<bool, StorageError>;

    /// Release an explicit lock on a job.
    ///
    /// Releases a lock that was previously acquired with `try_acquire_job_lock`.
    /// Only the worker that acquired the lock can release it.
    ///
    /// ## Arguments
    /// * `job_id` - The unique identifier of the job to unlock
    /// * `worker_id` - Unique identifier of the worker releasing the lock
    ///
    /// ## Returns
    /// * `Ok(true)` - Lock was successfully released
    /// * `Ok(false)` - Lock was not held by this worker (or already expired)
    /// * `Err(StorageError)` - Storage operation failed
    ///
    /// ## Examples
    /// ```rust
    /// use qml_rs::{MemoryStorage, Job, Storage};
    ///
    /// # tokio_test::block_on(async {
    /// let storage = MemoryStorage::new();
    /// let job = Job::new("task_with_lock", vec![]);
    /// storage.enqueue(&job).await.unwrap();
    ///
    /// // Acquire lock
    /// storage.try_acquire_job_lock(&job.id, "worker-1", 300).await.unwrap();
    ///
    /// // Do some work...
    ///
    /// // Release lock
    /// let released = storage.release_job_lock(&job.id, "worker-1").await.unwrap();
    /// assert!(released);
    ///
    /// // Trying to release again should return false
    /// let released = storage.release_job_lock(&job.id, "worker-1").await.unwrap();
    /// assert!(!released);
    /// # });
    /// ```
    async fn release_job_lock(&self, job_id: &str, worker_id: &str) -> Result<bool, StorageError>;

    /// Atomically fetch multiple available jobs with locking.
    ///
    /// Similar to `fetch_and_lock_job` but fetches multiple jobs in a single
    /// atomic operation. Useful for batch processing scenarios where a worker
    /// can handle multiple jobs simultaneously.
    ///
    /// ## Arguments
    /// * `worker_id` - Unique identifier of the worker claiming the jobs
    /// * `limit` - Maximum number of jobs to fetch (None = implementation default)
    /// * `queues` - Optional list of specific queues to fetch from (None = all queues)
    ///
    /// ## Returns
    /// * `Ok(jobs)` - Vector of jobs that were successfully fetched and locked
    /// * `Err(StorageError)` - Storage operation failed
    ///
    /// ## Examples
    /// ```rust
    /// use qml_rs::{MemoryStorage, Job, Storage};
    ///
    /// # tokio_test::block_on(async {
    /// let storage = MemoryStorage::new();
    ///
    /// // Enqueue batch of jobs
    /// for i in 0..10 {
    ///     let job = Job::new("batch_process", vec![i.to_string()]);
    ///     storage.enqueue(&job).await.unwrap();
    /// }
    ///
    /// // Worker fetches multiple jobs at once
    /// let jobs = storage.fetch_available_jobs_atomic("worker-1", Some(5), None).await.unwrap();
    /// println!("Worker-1 got {} jobs for batch processing", jobs.len());
    ///
    /// for job in jobs {
    ///     println!("Processing job {} with argument: {}", job.id, job.arguments[0]);
    ///     // All jobs are now locked and marked as processing
    /// }
    /// # });
    /// ```
    async fn fetch_available_jobs_atomic(
        &self,
        worker_id: &str,
        limit: Option<usize>,
        queues: Option<&[String]>,
    ) -> Result<Vec<Job>, StorageError>;
}
636
/// Storage instance that can hold any storage implementation.
///
/// Each variant wraps one concrete backend. The `Redis` and `Postgres` variants
/// are only compiled when the `redis` / `postgres` Cargo features are enabled.
/// The [`Storage`] implementation for this enum forwards every call to the
/// wrapped backend.
pub enum StorageInstance {
    /// Memory storage instance
    Memory(MemoryStorage),
    /// Redis storage instance (requires the `redis` feature)
    #[cfg(feature = "redis")]
    Redis(RedisStorage),
    /// PostgreSQL storage instance (requires the `postgres` feature)
    #[cfg(feature = "postgres")]
    Postgres(PostgresStorage),
}
648
649impl StorageInstance {
650 /// Create a storage instance from configuration
651 ///
652 /// # Arguments
653 /// * `config` - The storage configuration
654 ///
655 /// # Returns
656 /// * `Ok(storage)` - The created storage instance
657 /// * `Err(StorageError)` - If there was an error creating the storage
658 ///
659 /// # Examples
660 ///
661 /// ```rust
662 /// use qml_rs::storage::{StorageInstance, StorageConfig, MemoryConfig};
663 ///
664 /// # tokio_test::block_on(async {
665 /// let config = StorageConfig::Memory(MemoryConfig::default());
666 /// let storage = StorageInstance::from_config(config).await.unwrap();
667 /// # });
668 /// ```
669 pub async fn from_config(config: StorageConfig) -> Result<Self, StorageError> {
670 match config {
671 StorageConfig::Memory(memory_config) => Ok(StorageInstance::Memory(
672 MemoryStorage::with_config(memory_config),
673 )),
674 #[cfg(feature = "redis")]
675 StorageConfig::Redis(redis_config) => {
676 let redis_storage = RedisStorage::with_config(redis_config).await?;
677 Ok(StorageInstance::Redis(redis_storage))
678 }
679 #[cfg(feature = "postgres")]
680 StorageConfig::Postgres(postgres_config) => {
681 let postgres_storage = PostgresStorage::new(postgres_config).await?;
682 Ok(StorageInstance::Postgres(postgres_storage))
683 }
684 }
685 }
686
687 /// Create a memory storage instance with default configuration
688 ///
689 /// # Examples
690 ///
691 /// ```rust
692 /// use qml_rs::storage::StorageInstance;
693 ///
694 /// let storage = StorageInstance::memory();
695 /// ```
696 pub fn memory() -> Self {
697 StorageInstance::Memory(MemoryStorage::new())
698 }
699
700 /// Create a memory storage instance with custom configuration
701 ///
702 /// # Arguments
703 /// * `config` - The memory storage configuration
704 ///
705 /// # Examples
706 ///
707 /// ```rust
708 /// use qml_rs::storage::{StorageInstance, MemoryConfig};
709 ///
710 /// let config = MemoryConfig::new().with_max_jobs(1000);
711 /// let storage = StorageInstance::memory_with_config(config);
712 /// ```
713 pub fn memory_with_config(config: MemoryConfig) -> Self {
714 StorageInstance::Memory(MemoryStorage::with_config(config))
715 }
716
717 /// Create a Redis storage instance with custom configuration
718 ///
719 /// # Arguments
720 /// * `config` - The Redis storage configuration
721 ///
722 /// # Returns
723 /// * `Ok(storage)` - The created Redis storage instance
724 /// * `Err(StorageError)` - If there was an error connecting to Redis
725 ///
726 /// # Examples
727 ///
728 /// ```rust
729 /// use qml_rs::storage::{StorageInstance, RedisConfig};
730 ///
731 /// # tokio_test::block_on(async {
732 /// let config = RedisConfig::new().with_url("redis://localhost:6379");
733 /// match StorageInstance::redis(config).await {
734 /// Ok(storage) => println!("Redis storage created successfully"),
735 /// Err(e) => println!("Failed to create Redis storage: {}", e),
736 /// }
737 /// # });
738 /// ```
739 #[cfg(feature = "redis")]
740 pub async fn redis(config: RedisConfig) -> Result<Self, StorageError> {
741 let redis_storage = RedisStorage::with_config(config).await?;
742 Ok(StorageInstance::Redis(redis_storage))
743 }
744
745 /// Create a PostgreSQL storage instance with custom configuration
746 ///
747 /// # Arguments
748 /// * `config` - The PostgreSQL storage configuration
749 ///
750 /// # Returns
751 /// * `Ok(storage)` - The created PostgreSQL storage instance
752 /// * `Err(StorageError)` - If there was an error connecting to PostgreSQL
753 ///
754 /// # Examples
755 ///
756 /// ```rust
757 /// use qml_rs::storage::{StorageInstance, PostgresConfig};
758 ///
759 /// # tokio_test::block_on(async {
760 /// let config = PostgresConfig::new().with_database_url("postgresql://postgres:password@localhost:5432/qml");
761 /// match StorageInstance::postgres(config).await {
762 /// Ok(storage) => println!("PostgreSQL storage created successfully"),
763 /// Err(e) => println!("Failed to create PostgreSQL storage: {}", e),
764 /// }
765 /// # });
766 /// ```
767 #[cfg(feature = "postgres")]
768 pub async fn postgres(config: PostgresConfig) -> Result<Self, StorageError> {
769 let postgres_storage = PostgresStorage::new(config).await?;
770 Ok(StorageInstance::Postgres(postgres_storage))
771 }
772}
773
774#[async_trait]
775impl Storage for StorageInstance {
776 async fn enqueue(&self, job: &Job) -> Result<(), StorageError> {
777 match self {
778 StorageInstance::Memory(storage) => storage.enqueue(job).await,
779 #[cfg(feature = "redis")]
780 StorageInstance::Redis(storage) => storage.enqueue(job).await,
781 #[cfg(feature = "postgres")]
782 StorageInstance::Postgres(storage) => storage.enqueue(job).await,
783 }
784 }
785
786 async fn get(&self, job_id: &str) -> Result<Option<Job>, StorageError> {
787 match self {
788 StorageInstance::Memory(storage) => storage.get(job_id).await,
789 #[cfg(feature = "redis")]
790 StorageInstance::Redis(storage) => storage.get(job_id).await,
791 #[cfg(feature = "postgres")]
792 StorageInstance::Postgres(storage) => storage.get(job_id).await,
793 }
794 }
795
796 async fn update(&self, job: &Job) -> Result<(), StorageError> {
797 match self {
798 StorageInstance::Memory(storage) => storage.update(job).await,
799 #[cfg(feature = "redis")]
800 StorageInstance::Redis(storage) => storage.update(job).await,
801 #[cfg(feature = "postgres")]
802 StorageInstance::Postgres(storage) => storage.update(job).await,
803 }
804 }
805
806 async fn delete(&self, job_id: &str) -> Result<bool, StorageError> {
807 match self {
808 StorageInstance::Memory(storage) => storage.delete(job_id).await,
809 #[cfg(feature = "redis")]
810 StorageInstance::Redis(storage) => storage.delete(job_id).await,
811 #[cfg(feature = "postgres")]
812 StorageInstance::Postgres(storage) => storage.delete(job_id).await,
813 }
814 }
815
816 async fn list(
817 &self,
818 state_filter: Option<&JobState>,
819 limit: Option<usize>,
820 offset: Option<usize>,
821 ) -> Result<Vec<Job>, StorageError> {
822 match self {
823 StorageInstance::Memory(storage) => storage.list(state_filter, limit, offset).await,
824 #[cfg(feature = "redis")]
825 StorageInstance::Redis(storage) => storage.list(state_filter, limit, offset).await,
826 #[cfg(feature = "postgres")]
827 StorageInstance::Postgres(storage) => storage.list(state_filter, limit, offset).await,
828 }
829 }
830
831 async fn get_job_counts(&self) -> Result<HashMap<JobState, usize>, StorageError> {
832 match self {
833 StorageInstance::Memory(storage) => storage.get_job_counts().await,
834 #[cfg(feature = "redis")]
835 StorageInstance::Redis(storage) => storage.get_job_counts().await,
836 #[cfg(feature = "postgres")]
837 StorageInstance::Postgres(storage) => storage.get_job_counts().await,
838 }
839 }
840
841 async fn get_available_jobs(&self, limit: Option<usize>) -> Result<Vec<Job>, StorageError> {
842 match self {
843 StorageInstance::Memory(storage) => storage.get_available_jobs(limit).await,
844 #[cfg(feature = "redis")]
845 StorageInstance::Redis(storage) => storage.get_available_jobs(limit).await,
846 #[cfg(feature = "postgres")]
847 StorageInstance::Postgres(storage) => storage.get_available_jobs(limit).await,
848 }
849 }
850
851 async fn fetch_and_lock_job(
852 &self,
853 worker_id: &str,
854 queues: Option<&[String]>,
855 ) -> Result<Option<Job>, StorageError> {
856 match self {
857 StorageInstance::Memory(storage) => storage.fetch_and_lock_job(worker_id, queues).await,
858 #[cfg(feature = "redis")]
859 StorageInstance::Redis(storage) => storage.fetch_and_lock_job(worker_id, queues).await,
860 #[cfg(feature = "postgres")]
861 StorageInstance::Postgres(storage) => {
862 storage.fetch_and_lock_job(worker_id, queues).await
863 }
864 }
865 }
866
867 async fn try_acquire_job_lock(
868 &self,
869 job_id: &str,
870 worker_id: &str,
871 timeout_seconds: u64,
872 ) -> Result<bool, StorageError> {
873 match self {
874 StorageInstance::Memory(storage) => {
875 storage
876 .try_acquire_job_lock(job_id, worker_id, timeout_seconds)
877 .await
878 }
879 #[cfg(feature = "redis")]
880 StorageInstance::Redis(storage) => {
881 storage
882 .try_acquire_job_lock(job_id, worker_id, timeout_seconds)
883 .await
884 }
885 #[cfg(feature = "postgres")]
886 StorageInstance::Postgres(storage) => {
887 storage
888 .try_acquire_job_lock(job_id, worker_id, timeout_seconds)
889 .await
890 }
891 }
892 }
893
894 async fn release_job_lock(&self, job_id: &str, worker_id: &str) -> Result<bool, StorageError> {
895 match self {
896 StorageInstance::Memory(storage) => storage.release_job_lock(job_id, worker_id).await,
897 #[cfg(feature = "redis")]
898 StorageInstance::Redis(storage) => storage.release_job_lock(job_id, worker_id).await,
899 #[cfg(feature = "postgres")]
900 StorageInstance::Postgres(storage) => storage.release_job_lock(job_id, worker_id).await,
901 }
902 }
903
904 async fn fetch_available_jobs_atomic(
905 &self,
906 worker_id: &str,
907 limit: Option<usize>,
908 queues: Option<&[String]>,
909 ) -> Result<Vec<Job>, StorageError> {
910 match self {
911 StorageInstance::Memory(storage) => {
912 storage
913 .fetch_available_jobs_atomic(worker_id, limit, queues)
914 .await
915 }
916 #[cfg(feature = "redis")]
917 StorageInstance::Redis(storage) => {
918 storage
919 .fetch_available_jobs_atomic(worker_id, limit, queues)
920 .await
921 }
922 #[cfg(feature = "postgres")]
923 StorageInstance::Postgres(storage) => {
924 storage
925 .fetch_available_jobs_atomic(worker_id, limit, queues)
926 .await
927 }
928 }
929 }
930}