Skip to main content

qml_rs/storage/
mod.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use std::collections::HashMap;
4
5use crate::core::{Job, JobStateKind, RecurringJob, ServerInfo};
6
7pub mod config;
8pub mod database_init;
9pub mod error;
10pub mod memory;
11#[cfg(feature = "postgres")]
12pub mod postgres;
13#[cfg(feature = "redis")]
14pub mod redis;
15pub mod settings;
16
17#[cfg(test)]
18mod test_locking;
19
20#[cfg(feature = "postgres")]
21pub use config::PostgresConfig;
22#[cfg(feature = "redis")]
23pub use config::RedisConfig;
24pub use config::{MemoryConfig, StorageConfig};
25#[cfg(feature = "postgres")]
26pub use database_init::{DatabaseInitError, DatabaseInitializer};
27pub use error::StorageError;
28pub use memory::MemoryStorage;
29#[cfg(feature = "postgres")]
30pub use postgres::PostgresStorage;
31#[cfg(feature = "redis")]
32pub use redis::RedisStorage;
33
34/// Core storage trait that defines the interface for job persistence across all backends.
35///
36/// The [`Storage`] trait provides a unified API for job persistence operations, supporting
37/// multiple storage backends including in-memory, Redis, and PostgreSQL. All implementations
38/// provide atomic operations and race condition prevention for production use.
39///
40/// ## Storage Backends
41///
42/// - **[`MemoryStorage`]**: Fast in-memory storage for development and testing
43/// - **[`RedisStorage`]**: Distributed Redis storage with Lua script atomicity
44/// - **[`PostgresStorage`]**: ACID-compliant PostgreSQL with row-level locking
45///
46/// ## Core Operations
47///
48/// The trait provides standard CRUD operations (`enqueue`, `get`, `update`, `delete`)
49/// plus advanced operations for job processing:
50///
51/// - **Job Management**: Store, retrieve, update, and delete jobs
52/// - **Querying**: List jobs with filtering and pagination
53/// - **Processing**: Atomic job fetching with race condition prevention
54/// - **Locking**: Explicit job locking for distributed coordination
55///
56/// ## Race Condition Prevention
57///
58/// All storage backends implement atomic job fetching to prevent multiple workers
59/// from processing the same job simultaneously:
60///
61/// ```text
62/// Worker A ──┐
63///            ├── fetch_and_lock_job() ──→ Gets Job #123
64/// Worker B ──┘                         ──→ Gets Job #124 (not #123)
65/// ```
66///
67/// ## Examples
68///
69/// ### Basic Storage Operations
70/// ```rust
71/// use qml_rs::{Job, MemoryStorage};
72/// use qml_rs::storage::prelude::*;
73///
74/// # tokio_test::block_on(async {
75/// let storage = MemoryStorage::new();
76///
77/// // Create and store a job
78/// let job = Job::new("send_email", serde_json::json!(["user@example.com".to_string()]));
79/// storage.enqueue(&job).await.unwrap();
80///
81/// // Retrieve the job
82/// let retrieved = storage.get(&job.id).await.unwrap().unwrap();
83/// assert_eq!(job.id, retrieved.id);
84///
85/// // Update job state
86/// let mut updated_job = retrieved;
87/// updated_job.set_state(qml_rs::JobState::processing("worker-1", "server-1")).unwrap();
88/// storage.update(&updated_job).await.unwrap();
89///
90/// // Delete the job
91/// let deleted = storage.delete(&job.id).await.unwrap();
92/// assert!(deleted);
93/// # });
94/// ```
95///
96/// ### Atomic Job Processing
97/// ```rust
98/// use qml_rs::{Job, MemoryStorage};
99/// use qml_rs::storage::prelude::*;
100///
101/// # tokio_test::block_on(async {
102/// let storage = MemoryStorage::new();
103///
104/// // Enqueue some jobs
105/// for i in 0..5 {
106///     let job = Job::new("process_item", serde_json::json!([i.to_string()]));
107///     storage.enqueue(&job).await.unwrap();
108/// }
109///
110/// // Worker fetches and locks a job atomically
111/// let job = storage.fetch_and_lock_job("worker-1", None).await.unwrap();
112/// match job {
113///     Some(job) => {
114///         println!("Worker-1 processing job: {}", job.id);
115///         // Job is automatically locked and marked as processing
116///     },
117///     None => println!("No jobs available"),
118/// }
119/// # });
120/// ```
121///
122/// ### Storage Backend Selection
123/// ```rust
124/// use qml_rs::storage::{StorageInstance, StorageConfig, MemoryConfig};
125///
126/// # tokio_test::block_on(async {
127/// // Memory storage for development
128/// let memory_storage = StorageInstance::memory();
129///
130/// // Redis storage for production
131/// # #[cfg(feature = "redis")]
132/// # {
133/// use qml_rs::storage::RedisConfig;
134/// let redis_config = RedisConfig::new().with_url("redis://localhost:6379");
135/// match StorageInstance::redis(redis_config).await {
136///     Ok(redis_storage) => println!("Redis storage ready"),
137///     Err(e) => println!("Redis connection failed: {}", e),
138/// }
139/// # }
140///
141/// // PostgreSQL storage for enterprise
142/// # #[cfg(feature = "postgres")]
143/// # {
144/// use qml_rs::storage::PostgresConfig;
145/// let pg_config = PostgresConfig::new()
146///     .with_database_url("postgresql://localhost:5432/qml")
147///     .with_auto_migrate(true);
148/// match StorageInstance::postgres(pg_config).await {
149///     Ok(pg_storage) => println!("PostgreSQL storage ready"),
150///     Err(e) => println!("PostgreSQL connection failed: {}", e),
151/// }
152/// # }
153/// # });
154/// ```
155///
156/// ### Job Filtering and Statistics
157/// ```rust
158/// use qml_rs::{Job, JobState, MemoryStorage};
159/// use qml_rs::storage::prelude::*;
160///
161/// # tokio_test::block_on(async {
162/// let storage = MemoryStorage::new();
163///
164/// // Create jobs in different states
165/// let mut job1 = Job::new("task1", serde_json::Value::Null);
166/// let mut job2 = Job::new("task2", serde_json::Value::Null);
167/// job2.set_state(JobState::processing("worker-1", "server-1")).unwrap();
168///
169/// storage.enqueue(&job1).await.unwrap();
170/// storage.enqueue(&job2).await.unwrap();
171///
172/// // List all jobs
173/// let all_jobs = storage.list(None, None, None).await.unwrap();
174/// println!("Total jobs: {}", all_jobs.len());
175///
176/// // Get job counts by state
177/// let counts = storage.get_job_counts().await;
178/// match counts {
179///     Ok(counts) => {
180///         for (state, count) in counts {
181///             println!("{:?}: {}", state, count);
182///         }
183///     },
184///     Err(e) => println!("Error: {}", e),
185/// }
186///
187/// // Get available jobs for processing
188/// let available = storage.get_available_jobs(Some(10)).await.unwrap();
189/// println!("Available for processing: {}", available.len());
190/// # });
191/// ```
192/// Dashboard-facing subset of storage operations.
193///
194/// [`MonitoringApi`] carves out the methods the Axum dashboard and its
195/// [`DashboardService`](crate::dashboard::DashboardService) actually touch
196/// (`get`, `update`, `update_if_state`, `delete`, `list`, `get_job_counts`)
197/// so that dashboard tests can be written against a small fake instead of
198/// a full [`Storage`] backend. Every real [`Storage`] implementation is
199/// also a [`MonitoringApi`], so callers holding an `Arc<dyn Storage>` can
200/// pass it anywhere an `Arc<dyn MonitoringApi>` is expected via trait
201/// upcasting.
202///
203/// The trait deliberately includes mutating methods even though it's
204/// scoped at observation/operations — the dashboard needs them for its
205/// retry-job and delete-job actions, and pretending they're read-only
206/// would force callers back onto the full [`Storage`] trait and defeat
207/// the testing payoff.
208#[async_trait]
209pub trait MonitoringApi: Send + Sync {
210    /// Retrieve a job by its unique identifier.
211    async fn get(&self, job_id: &str) -> Result<Option<Job>, StorageError>;
212
213    /// Update an existing job's state and metadata.
214    async fn update(&self, job: &Job) -> Result<(), StorageError>;
215
216    /// Compare-and-swap variant of [`update`].
217    ///
218    /// Writes `job` only if the persisted row's state currently matches
219    /// `expected`. Returns `Ok(true)` when the update was applied,
220    /// `Ok(false)` when the state had moved on (a stomp was avoided),
221    /// and `Err(JobNotFound)` when no row exists for the id.
222    ///
223    /// Use this when a caller has read the job, decided to transition it
224    /// based on what it observed, and might race with a worker or a peer
225    /// dashboard. The dashboard "retry" button is the canonical example —
226    /// without CAS, a slow second retry could overwrite a `Processing`
227    /// state that a worker had already taken on after the first retry.
228    async fn update_if_state(
229        &self,
230        job: &Job,
231        expected: JobStateKind,
232    ) -> Result<bool, StorageError>;
233
234    /// Remove a job from storage (soft or hard delete).
235    async fn delete(&self, job_id: &str) -> Result<bool, StorageError>;
236
237    /// List jobs with optional filtering and pagination.
238    ///
239    /// `state_filter` is a [`JobStateKind`] discriminant — every backend
240    /// already filters by discriminant only. The earlier signature took
241    /// `Option<&JobState>` and required callers (notably the dashboard
242    /// router) to construct throwaway `JobState` values with bogus inner
243    /// fields just to pick a variant. The fields were silently ignored
244    /// but the type system couldn't say so.
245    async fn list(
246        &self,
247        state_filter: Option<JobStateKind>,
248        limit: Option<usize>,
249        offset: Option<usize>,
250    ) -> Result<Vec<Job>, StorageError>;
251
252    /// Get the count of jobs grouped by their current state.
253    async fn get_job_counts(&self) -> Result<HashMap<JobStateKind, usize>, StorageError>;
254}
255// =========================================================================
256// Sub-traits — operational surfaces of a storage backend
257// =========================================================================
258//
259// Originally one giant `Storage` trait carried 24 methods spanning job
260// CRUD + atomic claim + recurring-job templates + server registry +
261// generic named locks. The mass made it (a) hard for a partial backend
262// (e.g. an in-process mirror) to opt out of methods it doesn't support
263// and (b) easy for callers to demand the full surface where a narrow
264// one would do.
265//
266// The split below carves the surface into five cohesive sub-traits.
267// `Storage` is now a marker umbrella with a blanket `impl<T> Storage
268// for T where T: ...`, so every existing `Arc<dyn Storage>` callsite
269// continues to work and every backend that implements the five sub-
270// traits automatically implements `Storage`.
271//
272//   * `JobStore`       — enqueue, list/query, time-based fetches,
273//                        atomic claim-and-transition, expiration.
274//   * `JobLocker`      — race-condition primitives: fetch-and-lock,
275//                        per-job named locks, stranded recovery.
276//   * `RecurringStore` — cron-scheduled job templates.
277//   * `ServerRegistry` — heartbeat / dead-server detection / reclaim.
278//   * `NamedLocks`     — generic distributed locks for user-facing
279//                        "at most one instance of X" semantics.
280//
281// `JobStore` extends `MonitoringApi`, so backends only have to write
282// the dashboard read-side once.
283
284/// Persistence-side of a storage backend: enqueue, list/query, atomic
285/// claim-and-transition, expiration sweep.
286#[async_trait]
287pub trait JobStore: MonitoringApi {
288    /// Persist a new job. Typically lands in the `Enqueued` state
289    /// unless the caller assigned a different state on `job.state`.
290    async fn enqueue(&self, job: &Job) -> Result<(), StorageError>;
291
292    /// Get jobs that are ready to be processed immediately.
293    ///
294    /// Returns enqueued jobs, scheduled jobs whose time has arrived,
295    /// and jobs awaiting retry whose retry time has passed. Used by
296    /// the dashboard's queue-statistics view; workers go through the
297    /// atomic [`JobLocker::fetch_and_lock_job`] path instead.
298    async fn get_available_jobs(&self, limit: Option<usize>) -> Result<Vec<Job>, StorageError>;
299
300    /// Fetch scheduled jobs whose `enqueue_at` has already passed.
301    /// Read-only — does not transition state. Backends push the time
302    /// predicate down to the engine; results are ordered by priority
303    /// (desc) then `created_at` (asc). Use [`claim_due_scheduled_jobs`](Self::claim_due_scheduled_jobs)
304    /// for the atomic claim-and-promote primitive used by the scheduler.
305    async fn fetch_due_scheduled_jobs(
306        &self,
307        now: DateTime<Utc>,
308        limit: usize,
309    ) -> Result<Vec<Job>, StorageError>;
310
311    /// Read-only counterpart of [`fetch_due_scheduled_jobs`](Self::fetch_due_scheduled_jobs)
312    /// for jobs in `AwaitingRetry`.
313    async fn fetch_due_retry_jobs(
314        &self,
315        now: DateTime<Utc>,
316        limit: usize,
317    ) -> Result<Vec<Job>, StorageError>;
318
319    /// Atomically claim due scheduled jobs and transition them to
320    /// `Enqueued`.
321    ///
322    /// The transition is persisted by the storage engine before this
323    /// call returns. Two schedulers running against the same backend
324    /// cannot both promote the same job. Returned jobs are already in
325    /// `Enqueued` state.
326    ///
327    /// **Caller contract:** do NOT call [`MonitoringApi::update`] on
328    /// the returned jobs to "save" the transition — the persisted row
329    /// already reflects it. Re-writing is harmless on Postgres / Memory
330    /// but causes redundant index churn on Redis.
331    async fn claim_due_scheduled_jobs(
332        &self,
333        now: DateTime<Utc>,
334        limit: usize,
335    ) -> Result<Vec<Job>, StorageError>;
336
337    /// Atomic counterpart of [`claim_due_scheduled_jobs`](Self::claim_due_scheduled_jobs)
338    /// for jobs in `AwaitingRetry`. Same caller contract.
339    async fn claim_due_retry_jobs(
340        &self,
341        now: DateTime<Utc>,
342        limit: usize,
343    ) -> Result<Vec<Job>, StorageError>;
344
345    /// Delete jobs whose `expires_at` is in the past.
346    ///
347    /// Called periodically by [`crate::processing::CleanupWorker`].
348    /// Backends should only touch rows in a final state
349    /// (`Succeeded` / `Failed` / `Deleted`) — in-flight jobs should
350    /// never carry an `expires_at`. Returns the number of rows removed.
351    async fn delete_expired_jobs(&self, now: DateTime<Utc>) -> Result<usize, StorageError>;
352}
353
354/// Race-condition primitives: atomic fetch-and-lock for workers, per-job
355/// named locks, and stranded-job recovery.
356#[async_trait]
357pub trait JobLocker: Send + Sync {
358    /// Atomically fetch and lock a job for processing.
359    ///
360    /// The primary worker entry point. Atomically finds an available
361    /// job, locks it, and marks it `Processing` in a single operation —
362    /// preventing multiple workers from claiming the same job.
363    ///
364    /// Backends use different mechanisms to enforce atomicity:
365    /// - **PostgreSQL**: `SELECT ... FOR UPDATE SKIP LOCKED`.
366    /// - **Redis**: a Lua script that picks the highest-score entry
367    ///   from one or more candidate ZSETs.
368    /// - **Memory**: mutex-based.
369    ///
370    /// `queues = None` matches any queue. `queues = Some(&[...])`
371    /// restricts to the listed queues. With per-queue indexing on
372    /// Redis (added alongside the original 1024-cap fix), the queue
373    /// filter is exact on every backend.
374    async fn fetch_and_lock_job(
375        &self,
376        worker_id: &str,
377        queues: Option<&[String]>,
378    ) -> Result<Option<Job>, StorageError>;
379
380    /// Atomic batch variant of [`fetch_and_lock_job`](Self::fetch_and_lock_job).
381    /// Each backend calls fetch-and-lock per slot; the contract is N
382    /// claims rather than one giant atomic over N rows.
383    async fn fetch_available_jobs_atomic(
384        &self,
385        worker_id: &str,
386        limit: Option<usize>,
387        queues: Option<&[String]>,
388    ) -> Result<Vec<Job>, StorageError>;
389
390    /// Acquire an exclusive per-job lock for `timeout_seconds`.
391    /// Returns `Ok(true)` if the lock was acquired, `Ok(false)` if
392    /// another worker holds it.
393    ///
394    /// Distinct from [`NamedLocks::try_acquire_lock`]: this lock lives
395    /// on the job row itself (or a per-job entry on Redis/Memory) so
396    /// fetch-and-lock can remain a single atomic operation.
397    async fn try_acquire_job_lock(
398        &self,
399        job_id: &str,
400        worker_id: &str,
401        timeout_seconds: u64,
402    ) -> Result<bool, StorageError>;
403
404    /// Release a per-job lock held by `worker_id`. No-op if the lock
405    /// has been taken over by someone else.
406    async fn release_job_lock(&self, job_id: &str, worker_id: &str) -> Result<bool, StorageError>;
407
408    /// Recover jobs stranded in the `Processing` state by a previous
409    /// server instance.
410    ///
411    /// A job is stranded if its `Processing::started_at` predates
412    /// `stale_before`. Matching jobs are transitioned back to
413    /// `Enqueued` (preserving their original `queue`) and any explicit
414    /// per-job locks are cleared. Returns the number of jobs recovered.
415    ///
416    /// Called by `BackgroundJobServer::start` on startup.
417    /// `stale_before` should comfortably exceed the typical job
418    /// runtime so a still-alive worker on another server isn't
419    /// fighting the sweep.
420    async fn requeue_stranded_jobs(
421        &self,
422        stale_before: DateTime<Utc>,
423    ) -> Result<usize, StorageError>;
424}
425
426/// Recurring-job templates — the storage side of cron-scheduled jobs.
427#[async_trait]
428pub trait RecurringStore: Send + Sync {
429    /// Insert or update a [`RecurringJob`] template, keyed by
430    /// [`RecurringJob::id`].
431    async fn upsert_recurring_job(&self, job: &RecurringJob) -> Result<(), StorageError>;
432
433    /// Remove a recurring-job template by id. Returns `Ok(true)` if
434    /// the row existed and was removed, `Ok(false)` if the id was
435    /// unknown.
436    async fn remove_recurring_job(&self, id: &str) -> Result<bool, StorageError>;
437
438    /// List recurring-job templates (for dashboards / operator tooling).
439    async fn list_recurring_jobs(&self) -> Result<Vec<RecurringJob>, StorageError>;
440
441    /// Atomically claim recurring-job templates whose `next_run_at <=
442    /// now` and are `enabled`. Implementations use locking (Postgres:
443    /// `FOR UPDATE SKIP LOCKED`; Redis: per-row `SET NX`) so two
444    /// servers running the poller cannot double-fire the same tick.
445    ///
446    /// Claimed rows are returned to the caller *before* `next_run_at`
447    /// is advanced — the caller calls [`RecurringJob::advance`] and
448    /// [`upsert_recurring_job`](Self::upsert_recurring_job) to write
449    /// the new `next_run_at` back. Cron expressions can't be
450    /// computed in the database.
451    async fn fetch_due_recurring_jobs(
452        &self,
453        now: DateTime<Utc>,
454        limit: usize,
455    ) -> Result<Vec<RecurringJob>, StorageError>;
456}
457
458/// Live-server registry. Used by the heartbeat worker to detect dead
459/// peers and reclaim their in-flight jobs.
460#[async_trait]
461pub trait ServerRegistry: Send + Sync {
462    /// Insert or update a live [`ServerInfo`] registration. Called
463    /// once on `BackgroundJobServer::start` when heartbeats are
464    /// enabled.
465    async fn register_server(&self, info: &ServerInfo) -> Result<(), StorageError>;
466
467    /// Bump `last_heartbeat` for a previously-registered `server_id`.
468    /// Returns `Ok(false)` if the server was not registered (or had
469    /// already been reclaimed).
470    async fn heartbeat_server(
471        &self,
472        server_id: &str,
473        now: DateTime<Utc>,
474    ) -> Result<bool, StorageError>;
475
476    /// Remove a server registration. Called from `stop()` on graceful
477    /// shutdown, and by peers after reclaiming a dead server's jobs.
478    async fn deregister_server(&self, server_id: &str) -> Result<bool, StorageError>;
479
480    /// Return every server whose `last_heartbeat < stale_before`. Peers
481    /// call this to find servers that have likely crashed.
482    async fn list_dead_servers(
483        &self,
484        stale_before: DateTime<Utc>,
485    ) -> Result<Vec<ServerInfo>, StorageError>;
486
487    /// Re-queue every `Processing` job whose
488    /// [`crate::core::JobState::Processing::server_name`] matches
489    /// `server_id`, returning the number of jobs moved back to
490    /// `Enqueued`. Idempotent — a second call after the first reclaim
491    /// returns 0.
492    async fn reclaim_jobs_from_server(&self, server_id: &str) -> Result<usize, StorageError>;
493}
494
495/// Generic distributed named locks — for "at most one instance of X"
496/// semantics (e.g. a recurring report that must not overlap with
497/// itself).
498#[async_trait]
499pub trait NamedLocks: Send + Sync {
500    /// Try to acquire a named lock.
501    ///
502    /// `resource` is the lock key, `owner` identifies the holder, `ttl`
503    /// is how long the lock lives before another owner can take over.
504    ///
505    /// Semantics:
506    /// - Free resource → created, `Ok(true)`.
507    /// - Expired → taken over (overwriting `owner` and `expires_at`), `Ok(true)`.
508    /// - Held by same `owner` → refresh (extend), `Ok(true)`.
509    /// - Held live by a different owner → `Ok(false)`.
510    ///
511    /// Distinct from [`JobLocker::try_acquire_job_lock`]: per-job
512    /// locks live on the job row so fetch-and-lock remains a single
513    /// atomic operation.
514    async fn try_acquire_lock(
515        &self,
516        resource: &str,
517        owner: &str,
518        ttl: std::time::Duration,
519    ) -> Result<bool, StorageError>;
520
521    /// Release a named lock. Only the current `owner` can release.
522    async fn release_lock(&self, resource: &str, owner: &str) -> Result<bool, StorageError>;
523
524    /// Background sweep of expired generic named locks. Returns the
525    /// number of expired entries removed.
526    ///
527    /// - **Postgres**: `DELETE FROM qml_locks WHERE expires_at < $1`.
528    ///   The `try_acquire_lock` path replaces expired rows
529    ///   opportunistically on contention, but a workload that takes a
530    ///   lock once and never re-acquires would otherwise leak rows.
531    /// - **Redis**: no-op — Redis-native PX TTL handles expiration
532    ///   server-side. Returns `Ok(0)`.
533    /// - **Memory**: drops entries from the in-process map.
534    ///
535    /// Called by [`crate::processing::CleanupWorker`] on each tick.
536    async fn cleanup_expired_named_locks(&self, now: DateTime<Utc>) -> Result<usize, StorageError>;
537}
538
539/// Composite trait combining every storage operation: job CRUD +
540/// queries ([`JobStore`]), atomic claim/lock primitives ([`JobLocker`]),
541/// recurring-job templates ([`RecurringStore`]), server registry
542/// ([`ServerRegistry`]), and generic named locks ([`NamedLocks`]).
543///
544/// `Arc<dyn Storage>` is the canonical handle the runtime holds. Every
545/// method on `Storage` comes from one of the five sub-traits via
546/// supertrait inheritance; calling them on a `dyn Storage` value
547/// requires the relevant sub-trait to be in scope.
548///
549/// A [`prelude`] module re-exports all five sub-traits in one shot —
550/// `use qml_rs::storage::prelude::*` is the easiest way to bring them
551/// all into scope when you'd otherwise need `use qml_rs::Storage` to
552/// reach the full surface.
553///
554/// Each backend writes a one-line `impl Storage for Backend {}` —
555/// zero-cost, because every method comes from the five sub-traits.
556pub trait Storage:
557    JobStore + JobLocker + RecurringStore + ServerRegistry + NamedLocks + Send + Sync
558{
559}
560
561impl Storage for MemoryStorage {}
562#[cfg(feature = "redis")]
563impl Storage for RedisStorage {}
564#[cfg(feature = "postgres")]
565impl Storage for PostgresStorage {}
566
567/// One-stop import for every storage trait in this module.
568///
569/// `use qml_rs::storage::prelude::*` brings [`Storage`] *and* the five
570/// sub-traits ([`JobStore`], [`JobLocker`], [`RecurringStore`],
571/// [`ServerRegistry`], [`NamedLocks`]) plus [`MonitoringApi`] into
572/// scope. Because Rust resolves trait methods by which trait is in
573/// scope, calling `storage.enqueue(...)` on an `&dyn Storage` requires
574/// `JobStore` to be reachable — the prelude saves callers from
575/// remembering which method lives where.
576pub mod prelude {
577    pub use super::{
578        JobLocker, JobStore, MonitoringApi, NamedLocks, RecurringStore, ServerRegistry, Storage,
579    };
580}
581
582// =========================================================================
583// StorageInstance — module-level constructors returning Arc<dyn Storage>
584// =========================================================================
585
586/// Module-level constructor surface for the supported backends.
587///
588/// Originally a 3-variant enum (`Memory | Redis | Postgres`) with a
589/// 350-line hand-written `match`-based dispatch implementing every
590/// `Storage` trait method. With the trait split into sub-traits and a
591/// blanket `impl<T> Storage for T where T: ...`, the enum + dispatch
592/// became pure boilerplate. Replaced with a unit struct whose
593/// associated functions return `Arc<dyn Storage>` directly — every
594/// backend type already satisfies `Storage` via the blanket impl, so
595/// no per-backend dispatch is needed.
596///
597/// Existing call sites (`StorageInstance::memory()`,
598/// `StorageInstance::redis(cfg).await`, etc.) keep their syntax; what
599/// changes is that those constructors now return `Arc<dyn Storage>`
600/// rather than an enum value the caller has to wrap in `Arc::new`.
601pub struct StorageInstance;
602
603impl StorageInstance {
604    /// Create a storage instance from configuration.
605    ///
606    /// # Examples
607    ///
608    /// ```rust
609    /// use qml_rs::storage::{StorageInstance, StorageConfig, MemoryConfig};
610    ///
611    /// # tokio_test::block_on(async {
612    /// let config = StorageConfig::Memory(MemoryConfig::default());
613    /// let storage = StorageInstance::from_config(config).await.unwrap();
614    /// # });
615    /// ```
616    pub async fn from_config(
617        config: StorageConfig,
618    ) -> Result<std::sync::Arc<dyn Storage>, StorageError> {
619        match config {
620            StorageConfig::Memory(memory_config) => Ok(std::sync::Arc::new(
621                MemoryStorage::with_config(memory_config),
622            )),
623            #[cfg(feature = "redis")]
624            StorageConfig::Redis(redis_config) => Ok(std::sync::Arc::new(
625                RedisStorage::with_config(redis_config).await?,
626            )),
627            #[cfg(feature = "postgres")]
628            StorageConfig::Postgres(postgres_config) => Ok(std::sync::Arc::new(
629                PostgresStorage::new(postgres_config).await?,
630            )),
631        }
632    }
633
634    /// Create a memory storage instance with default configuration.
635    ///
636    /// # Examples
637    ///
638    /// ```rust
639    /// use qml_rs::storage::StorageInstance;
640    ///
641    /// let storage = StorageInstance::memory();
642    /// ```
643    pub fn memory() -> std::sync::Arc<dyn Storage> {
644        std::sync::Arc::new(MemoryStorage::new())
645    }
646
647    /// Create a memory storage instance with custom configuration.
648    pub fn memory_with_config(config: MemoryConfig) -> std::sync::Arc<dyn Storage> {
649        std::sync::Arc::new(MemoryStorage::with_config(config))
650    }
651
652    /// Create a Redis storage instance with custom configuration.
653    #[cfg(feature = "redis")]
654    pub async fn redis(config: RedisConfig) -> Result<std::sync::Arc<dyn Storage>, StorageError> {
655        Ok(std::sync::Arc::new(
656            RedisStorage::with_config(config).await?,
657        ))
658    }
659
660    /// Create a PostgreSQL storage instance with custom configuration.
661    #[cfg(feature = "postgres")]
662    pub async fn postgres(
663        config: PostgresConfig,
664    ) -> Result<std::sync::Arc<dyn Storage>, StorageError> {
665        Ok(std::sync::Arc::new(PostgresStorage::new(config).await?))
666    }
667}