qml_rs/storage/mod.rs
1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use std::collections::HashMap;
4
5use crate::core::{Job, JobStateKind, RecurringJob, ServerInfo};
6
7pub mod config;
8pub mod database_init;
9pub mod error;
10pub mod memory;
11#[cfg(feature = "postgres")]
12pub mod postgres;
13#[cfg(feature = "redis")]
14pub mod redis;
15pub mod settings;
16
17#[cfg(test)]
18mod test_locking;
19
20#[cfg(feature = "postgres")]
21pub use config::PostgresConfig;
22#[cfg(feature = "redis")]
23pub use config::RedisConfig;
24pub use config::{MemoryConfig, StorageConfig};
25#[cfg(feature = "postgres")]
26pub use database_init::{DatabaseInitError, DatabaseInitializer};
27pub use error::StorageError;
28pub use memory::MemoryStorage;
29#[cfg(feature = "postgres")]
30pub use postgres::PostgresStorage;
31#[cfg(feature = "redis")]
32pub use redis::RedisStorage;
33
34/// Core storage trait that defines the interface for job persistence across all backends.
35///
36/// The [`Storage`] trait provides a unified API for job persistence operations, supporting
37/// multiple storage backends including in-memory, Redis, and PostgreSQL. All implementations
38/// provide atomic operations and race condition prevention for production use.
39///
40/// ## Storage Backends
41///
42/// - **[`MemoryStorage`]**: Fast in-memory storage for development and testing
43/// - **[`RedisStorage`]**: Distributed Redis storage with Lua script atomicity
44/// - **[`PostgresStorage`]**: ACID-compliant PostgreSQL with row-level locking
45///
46/// ## Core Operations
47///
48/// The trait provides standard CRUD operations (`enqueue`, `get`, `update`, `delete`)
49/// plus advanced operations for job processing:
50///
51/// - **Job Management**: Store, retrieve, update, and delete jobs
52/// - **Querying**: List jobs with filtering and pagination
53/// - **Processing**: Atomic job fetching with race condition prevention
54/// - **Locking**: Explicit job locking for distributed coordination
55///
56/// ## Race Condition Prevention
57///
58/// All storage backends implement atomic job fetching to prevent multiple workers
59/// from processing the same job simultaneously:
60///
61/// ```text
62/// Worker A ──┐
63/// ├── fetch_and_lock_job() ──→ Gets Job #123
64/// Worker B ──┘ ──→ Gets Job #124 (not #123)
65/// ```
66///
67/// ## Examples
68///
69/// ### Basic Storage Operations
70/// ```rust
71/// use qml_rs::{Job, MemoryStorage};
72/// use qml_rs::storage::prelude::*;
73///
74/// # tokio_test::block_on(async {
75/// let storage = MemoryStorage::new();
76///
77/// // Create and store a job
78/// let job = Job::new("send_email", serde_json::json!(["user@example.com".to_string()]));
79/// storage.enqueue(&job).await.unwrap();
80///
81/// // Retrieve the job
82/// let retrieved = storage.get(&job.id).await.unwrap().unwrap();
83/// assert_eq!(job.id, retrieved.id);
84///
85/// // Update job state
86/// let mut updated_job = retrieved;
87/// updated_job.set_state(qml_rs::JobState::processing("worker-1", "server-1")).unwrap();
88/// storage.update(&updated_job).await.unwrap();
89///
90/// // Delete the job
91/// let deleted = storage.delete(&job.id).await.unwrap();
92/// assert!(deleted);
93/// # });
94/// ```
95///
96/// ### Atomic Job Processing
97/// ```rust
98/// use qml_rs::{Job, MemoryStorage};
99/// use qml_rs::storage::prelude::*;
100///
101/// # tokio_test::block_on(async {
102/// let storage = MemoryStorage::new();
103///
104/// // Enqueue some jobs
105/// for i in 0..5 {
106/// let job = Job::new("process_item", serde_json::json!([i.to_string()]));
107/// storage.enqueue(&job).await.unwrap();
108/// }
109///
110/// // Worker fetches and locks a job atomically
111/// let job = storage.fetch_and_lock_job("worker-1", None).await.unwrap();
112/// match job {
113/// Some(job) => {
114/// println!("Worker-1 processing job: {}", job.id);
115/// // Job is automatically locked and marked as processing
116/// },
117/// None => println!("No jobs available"),
118/// }
119/// # });
120/// ```
121///
122/// ### Storage Backend Selection
123/// ```rust
124/// use qml_rs::storage::{StorageInstance, StorageConfig, MemoryConfig};
125///
126/// # tokio_test::block_on(async {
127/// // Memory storage for development
128/// let memory_storage = StorageInstance::memory();
129///
130/// // Redis storage for production
131/// # #[cfg(feature = "redis")]
132/// # {
133/// use qml_rs::storage::RedisConfig;
134/// let redis_config = RedisConfig::new().with_url("redis://localhost:6379");
135/// match StorageInstance::redis(redis_config).await {
136/// Ok(redis_storage) => println!("Redis storage ready"),
137/// Err(e) => println!("Redis connection failed: {}", e),
138/// }
139/// # }
140///
141/// // PostgreSQL storage for enterprise
142/// # #[cfg(feature = "postgres")]
143/// # {
144/// use qml_rs::storage::PostgresConfig;
145/// let pg_config = PostgresConfig::new()
146/// .with_database_url("postgresql://localhost:5432/qml")
147/// .with_auto_migrate(true);
148/// match StorageInstance::postgres(pg_config).await {
149/// Ok(pg_storage) => println!("PostgreSQL storage ready"),
150/// Err(e) => println!("PostgreSQL connection failed: {}", e),
151/// }
152/// # }
153/// # });
154/// ```
155///
156/// ### Job Filtering and Statistics
157/// ```rust
158/// use qml_rs::{Job, JobState, MemoryStorage};
159/// use qml_rs::storage::prelude::*;
160///
161/// # tokio_test::block_on(async {
162/// let storage = MemoryStorage::new();
163///
164/// // Create jobs in different states
165/// let mut job1 = Job::new("task1", serde_json::Value::Null);
166/// let mut job2 = Job::new("task2", serde_json::Value::Null);
167/// job2.set_state(JobState::processing("worker-1", "server-1")).unwrap();
168///
169/// storage.enqueue(&job1).await.unwrap();
170/// storage.enqueue(&job2).await.unwrap();
171///
172/// // List all jobs
173/// let all_jobs = storage.list(None, None, None).await.unwrap();
174/// println!("Total jobs: {}", all_jobs.len());
175///
176/// // Get job counts by state
177/// let counts = storage.get_job_counts().await;
178/// match counts {
179/// Ok(counts) => {
180/// for (state, count) in counts {
181/// println!("{:?}: {}", state, count);
182/// }
183/// },
184/// Err(e) => println!("Error: {}", e),
185/// }
186///
187/// // Get available jobs for processing
188/// let available = storage.get_available_jobs(Some(10)).await.unwrap();
189/// println!("Available for processing: {}", available.len());
190/// # });
191/// ```
192/// Dashboard-facing subset of storage operations.
193///
194/// [`MonitoringApi`] carves out the methods the Axum dashboard and its
195/// [`DashboardService`](crate::dashboard::DashboardService) actually touch
196/// (`get`, `update`, `update_if_state`, `delete`, `list`, `get_job_counts`)
197/// so that dashboard tests can be written against a small fake instead of
198/// a full [`Storage`] backend. Every real [`Storage`] implementation is
199/// also a [`MonitoringApi`], so callers holding an `Arc<dyn Storage>` can
200/// pass it anywhere an `Arc<dyn MonitoringApi>` is expected via trait
201/// upcasting.
202///
203/// The trait deliberately includes mutating methods even though it's
204/// scoped at observation/operations — the dashboard needs them for its
205/// retry-job and delete-job actions, and pretending they're read-only
206/// would force callers back onto the full [`Storage`] trait and defeat
207/// the testing payoff.
208#[async_trait]
209pub trait MonitoringApi: Send + Sync {
210 /// Retrieve a job by its unique identifier.
211 async fn get(&self, job_id: &str) -> Result<Option<Job>, StorageError>;
212
213 /// Update an existing job's state and metadata.
214 async fn update(&self, job: &Job) -> Result<(), StorageError>;
215
216 /// Compare-and-swap variant of [`update`].
217 ///
218 /// Writes `job` only if the persisted row's state currently matches
219 /// `expected`. Returns `Ok(true)` when the update was applied,
220 /// `Ok(false)` when the state had moved on (a stomp was avoided),
221 /// and `Err(JobNotFound)` when no row exists for the id.
222 ///
223 /// Use this when a caller has read the job, decided to transition it
224 /// based on what it observed, and might race with a worker or a peer
225 /// dashboard. The dashboard "retry" button is the canonical example —
226 /// without CAS, a slow second retry could overwrite a `Processing`
227 /// state that a worker had already taken on after the first retry.
228 async fn update_if_state(
229 &self,
230 job: &Job,
231 expected: JobStateKind,
232 ) -> Result<bool, StorageError>;
233
234 /// Remove a job from storage (soft or hard delete).
235 async fn delete(&self, job_id: &str) -> Result<bool, StorageError>;
236
237 /// List jobs with optional filtering and pagination.
238 ///
239 /// `state_filter` is a [`JobStateKind`] discriminant — every backend
240 /// already filters by discriminant only. The earlier signature took
241 /// `Option<&JobState>` and required callers (notably the dashboard
242 /// router) to construct throwaway `JobState` values with bogus inner
243 /// fields just to pick a variant. The fields were silently ignored
244 /// but the type system couldn't say so.
245 async fn list(
246 &self,
247 state_filter: Option<JobStateKind>,
248 limit: Option<usize>,
249 offset: Option<usize>,
250 ) -> Result<Vec<Job>, StorageError>;
251
252 /// Get the count of jobs grouped by their current state.
253 async fn get_job_counts(&self) -> Result<HashMap<JobStateKind, usize>, StorageError>;
254}
255// =========================================================================
256// Sub-traits — operational surfaces of a storage backend
257// =========================================================================
258//
259// Originally one giant `Storage` trait carried 24 methods spanning job
260// CRUD + atomic claim + recurring-job templates + server registry +
261// generic named locks. The mass made it (a) hard for a partial backend
262// (e.g. an in-process mirror) to opt out of methods it doesn't support
263// and (b) easy for callers to demand the full surface where a narrow
264// one would do.
265//
// The split below carves the surface into five cohesive sub-traits.
// `Storage` is now a method-less umbrella over all five, so every
// existing `Arc<dyn Storage>` callsite continues to work; a backend
// that implements the five sub-traits opts in with a one-line
// `impl Storage for Backend {}`.
271//
272// * `JobStore` — enqueue, list/query, time-based fetches,
273// atomic claim-and-transition, expiration.
274// * `JobLocker` — race-condition primitives: fetch-and-lock,
275// per-job named locks, stranded recovery.
276// * `RecurringStore` — cron-scheduled job templates.
277// * `ServerRegistry` — heartbeat / dead-server detection / reclaim.
278// * `NamedLocks` — generic distributed locks for user-facing
279// "at most one instance of X" semantics.
280//
281// `JobStore` extends `MonitoringApi`, so backends only have to write
282// the dashboard read-side once.
283
284/// Persistence-side of a storage backend: enqueue, list/query, atomic
285/// claim-and-transition, expiration sweep.
286#[async_trait]
287pub trait JobStore: MonitoringApi {
288 /// Persist a new job. Typically lands in the `Enqueued` state
289 /// unless the caller assigned a different state on `job.state`.
290 async fn enqueue(&self, job: &Job) -> Result<(), StorageError>;
291
292 /// Get jobs that are ready to be processed immediately.
293 ///
294 /// Returns enqueued jobs, scheduled jobs whose time has arrived,
295 /// and jobs awaiting retry whose retry time has passed. Used by
296 /// the dashboard's queue-statistics view; workers go through the
297 /// atomic [`JobLocker::fetch_and_lock_job`] path instead.
298 async fn get_available_jobs(&self, limit: Option<usize>) -> Result<Vec<Job>, StorageError>;
299
300 /// Fetch scheduled jobs whose `enqueue_at` has already passed.
301 /// Read-only — does not transition state. Backends push the time
302 /// predicate down to the engine; results are ordered by priority
303 /// (desc) then `created_at` (asc). Use [`claim_due_scheduled_jobs`](Self::claim_due_scheduled_jobs)
304 /// for the atomic claim-and-promote primitive used by the scheduler.
305 async fn fetch_due_scheduled_jobs(
306 &self,
307 now: DateTime<Utc>,
308 limit: usize,
309 ) -> Result<Vec<Job>, StorageError>;
310
311 /// Read-only counterpart of [`fetch_due_scheduled_jobs`](Self::fetch_due_scheduled_jobs)
312 /// for jobs in `AwaitingRetry`.
313 async fn fetch_due_retry_jobs(
314 &self,
315 now: DateTime<Utc>,
316 limit: usize,
317 ) -> Result<Vec<Job>, StorageError>;
318
319 /// Atomically claim due scheduled jobs and transition them to
320 /// `Enqueued`.
321 ///
322 /// The transition is persisted by the storage engine before this
323 /// call returns. Two schedulers running against the same backend
324 /// cannot both promote the same job. Returned jobs are already in
325 /// `Enqueued` state.
326 ///
327 /// **Caller contract:** do NOT call [`MonitoringApi::update`] on
328 /// the returned jobs to "save" the transition — the persisted row
329 /// already reflects it. Re-writing is harmless on Postgres / Memory
330 /// but causes redundant index churn on Redis.
331 async fn claim_due_scheduled_jobs(
332 &self,
333 now: DateTime<Utc>,
334 limit: usize,
335 ) -> Result<Vec<Job>, StorageError>;
336
337 /// Atomic counterpart of [`claim_due_scheduled_jobs`](Self::claim_due_scheduled_jobs)
338 /// for jobs in `AwaitingRetry`. Same caller contract.
339 async fn claim_due_retry_jobs(
340 &self,
341 now: DateTime<Utc>,
342 limit: usize,
343 ) -> Result<Vec<Job>, StorageError>;
344
345 /// Delete jobs whose `expires_at` is in the past.
346 ///
347 /// Called periodically by [`crate::processing::CleanupWorker`].
348 /// Backends should only touch rows in a final state
349 /// (`Succeeded` / `Failed` / `Deleted`) — in-flight jobs should
350 /// never carry an `expires_at`. Returns the number of rows removed.
351 async fn delete_expired_jobs(&self, now: DateTime<Utc>) -> Result<usize, StorageError>;
352}
353
354/// Race-condition primitives: atomic fetch-and-lock for workers, per-job
355/// named locks, and stranded-job recovery.
356#[async_trait]
357pub trait JobLocker: Send + Sync {
358 /// Atomically fetch and lock a job for processing.
359 ///
360 /// The primary worker entry point. Atomically finds an available
361 /// job, locks it, and marks it `Processing` in a single operation —
362 /// preventing multiple workers from claiming the same job.
363 ///
364 /// Backends use different mechanisms to enforce atomicity:
365 /// - **PostgreSQL**: `SELECT ... FOR UPDATE SKIP LOCKED`.
366 /// - **Redis**: a Lua script that picks the highest-score entry
367 /// from one or more candidate ZSETs.
368 /// - **Memory**: mutex-based.
369 ///
370 /// `queues = None` matches any queue. `queues = Some(&[...])`
371 /// restricts to the listed queues. With per-queue indexing on
372 /// Redis (added alongside the original 1024-cap fix), the queue
373 /// filter is exact on every backend.
374 async fn fetch_and_lock_job(
375 &self,
376 worker_id: &str,
377 queues: Option<&[String]>,
378 ) -> Result<Option<Job>, StorageError>;
379
380 /// Atomic batch variant of [`fetch_and_lock_job`](Self::fetch_and_lock_job).
381 /// Each backend calls fetch-and-lock per slot; the contract is N
382 /// claims rather than one giant atomic over N rows.
383 async fn fetch_available_jobs_atomic(
384 &self,
385 worker_id: &str,
386 limit: Option<usize>,
387 queues: Option<&[String]>,
388 ) -> Result<Vec<Job>, StorageError>;
389
390 /// Acquire an exclusive per-job lock for `timeout_seconds`.
391 /// Returns `Ok(true)` if the lock was acquired, `Ok(false)` if
392 /// another worker holds it.
393 ///
394 /// Distinct from [`NamedLocks::try_acquire_lock`]: this lock lives
395 /// on the job row itself (or a per-job entry on Redis/Memory) so
396 /// fetch-and-lock can remain a single atomic operation.
397 async fn try_acquire_job_lock(
398 &self,
399 job_id: &str,
400 worker_id: &str,
401 timeout_seconds: u64,
402 ) -> Result<bool, StorageError>;
403
404 /// Release a per-job lock held by `worker_id`. No-op if the lock
405 /// has been taken over by someone else.
406 async fn release_job_lock(&self, job_id: &str, worker_id: &str) -> Result<bool, StorageError>;
407
408 /// Recover jobs stranded in the `Processing` state by a previous
409 /// server instance.
410 ///
411 /// A job is stranded if its `Processing::started_at` predates
412 /// `stale_before`. Matching jobs are transitioned back to
413 /// `Enqueued` (preserving their original `queue`) and any explicit
414 /// per-job locks are cleared. Returns the number of jobs recovered.
415 ///
416 /// Called by `BackgroundJobServer::start` on startup.
417 /// `stale_before` should comfortably exceed the typical job
418 /// runtime so a still-alive worker on another server isn't
419 /// fighting the sweep.
420 async fn requeue_stranded_jobs(
421 &self,
422 stale_before: DateTime<Utc>,
423 ) -> Result<usize, StorageError>;
424}
425
426/// Recurring-job templates — the storage side of cron-scheduled jobs.
427#[async_trait]
428pub trait RecurringStore: Send + Sync {
429 /// Insert or update a [`RecurringJob`] template, keyed by
430 /// [`RecurringJob::id`].
431 async fn upsert_recurring_job(&self, job: &RecurringJob) -> Result<(), StorageError>;
432
433 /// Remove a recurring-job template by id. Returns `Ok(true)` if
434 /// the row existed and was removed, `Ok(false)` if the id was
435 /// unknown.
436 async fn remove_recurring_job(&self, id: &str) -> Result<bool, StorageError>;
437
438 /// List recurring-job templates (for dashboards / operator tooling).
439 async fn list_recurring_jobs(&self) -> Result<Vec<RecurringJob>, StorageError>;
440
441 /// Atomically claim recurring-job templates whose `next_run_at <=
442 /// now` and are `enabled`. Implementations use locking (Postgres:
443 /// `FOR UPDATE SKIP LOCKED`; Redis: per-row `SET NX`) so two
444 /// servers running the poller cannot double-fire the same tick.
445 ///
446 /// Claimed rows are returned to the caller *before* `next_run_at`
447 /// is advanced — the caller calls [`RecurringJob::advance`] and
448 /// [`upsert_recurring_job`](Self::upsert_recurring_job) to write
449 /// the new `next_run_at` back. Cron expressions can't be
450 /// computed in the database.
451 async fn fetch_due_recurring_jobs(
452 &self,
453 now: DateTime<Utc>,
454 limit: usize,
455 ) -> Result<Vec<RecurringJob>, StorageError>;
456}
457
458/// Live-server registry. Used by the heartbeat worker to detect dead
459/// peers and reclaim their in-flight jobs.
460#[async_trait]
461pub trait ServerRegistry: Send + Sync {
462 /// Insert or update a live [`ServerInfo`] registration. Called
463 /// once on `BackgroundJobServer::start` when heartbeats are
464 /// enabled.
465 async fn register_server(&self, info: &ServerInfo) -> Result<(), StorageError>;
466
467 /// Bump `last_heartbeat` for a previously-registered `server_id`.
468 /// Returns `Ok(false)` if the server was not registered (or had
469 /// already been reclaimed).
470 async fn heartbeat_server(
471 &self,
472 server_id: &str,
473 now: DateTime<Utc>,
474 ) -> Result<bool, StorageError>;
475
476 /// Remove a server registration. Called from `stop()` on graceful
477 /// shutdown, and by peers after reclaiming a dead server's jobs.
478 async fn deregister_server(&self, server_id: &str) -> Result<bool, StorageError>;
479
480 /// Return every server whose `last_heartbeat < stale_before`. Peers
481 /// call this to find servers that have likely crashed.
482 async fn list_dead_servers(
483 &self,
484 stale_before: DateTime<Utc>,
485 ) -> Result<Vec<ServerInfo>, StorageError>;
486
487 /// Re-queue every `Processing` job whose
488 /// [`crate::core::JobState::Processing::server_name`] matches
489 /// `server_id`, returning the number of jobs moved back to
490 /// `Enqueued`. Idempotent — a second call after the first reclaim
491 /// returns 0.
492 async fn reclaim_jobs_from_server(&self, server_id: &str) -> Result<usize, StorageError>;
493}
494
495/// Generic distributed named locks — for "at most one instance of X"
496/// semantics (e.g. a recurring report that must not overlap with
497/// itself).
498#[async_trait]
499pub trait NamedLocks: Send + Sync {
500 /// Try to acquire a named lock.
501 ///
502 /// `resource` is the lock key, `owner` identifies the holder, `ttl`
503 /// is how long the lock lives before another owner can take over.
504 ///
505 /// Semantics:
506 /// - Free resource → created, `Ok(true)`.
507 /// - Expired → taken over (overwriting `owner` and `expires_at`), `Ok(true)`.
508 /// - Held by same `owner` → refresh (extend), `Ok(true)`.
509 /// - Held live by a different owner → `Ok(false)`.
510 ///
511 /// Distinct from [`JobLocker::try_acquire_job_lock`]: per-job
512 /// locks live on the job row so fetch-and-lock remains a single
513 /// atomic operation.
514 async fn try_acquire_lock(
515 &self,
516 resource: &str,
517 owner: &str,
518 ttl: std::time::Duration,
519 ) -> Result<bool, StorageError>;
520
521 /// Release a named lock. Only the current `owner` can release.
522 async fn release_lock(&self, resource: &str, owner: &str) -> Result<bool, StorageError>;
523
524 /// Background sweep of expired generic named locks. Returns the
525 /// number of expired entries removed.
526 ///
527 /// - **Postgres**: `DELETE FROM qml_locks WHERE expires_at < $1`.
528 /// The `try_acquire_lock` path replaces expired rows
529 /// opportunistically on contention, but a workload that takes a
530 /// lock once and never re-acquires would otherwise leak rows.
531 /// - **Redis**: no-op — Redis-native PX TTL handles expiration
532 /// server-side. Returns `Ok(0)`.
533 /// - **Memory**: drops entries from the in-process map.
534 ///
535 /// Called by [`crate::processing::CleanupWorker`] on each tick.
536 async fn cleanup_expired_named_locks(&self, now: DateTime<Utc>) -> Result<usize, StorageError>;
537}
538
539/// Composite trait combining every storage operation: job CRUD +
540/// queries ([`JobStore`]), atomic claim/lock primitives ([`JobLocker`]),
541/// recurring-job templates ([`RecurringStore`]), server registry
542/// ([`ServerRegistry`]), and generic named locks ([`NamedLocks`]).
543///
544/// `Arc<dyn Storage>` is the canonical handle the runtime holds. Every
545/// method on `Storage` comes from one of the five sub-traits via
546/// supertrait inheritance; calling them on a `dyn Storage` value
547/// requires the relevant sub-trait to be in scope.
548///
549/// A [`prelude`] module re-exports all five sub-traits in one shot —
550/// `use qml_rs::storage::prelude::*` is the easiest way to bring them
551/// all into scope when you'd otherwise need `use qml_rs::Storage` to
552/// reach the full surface.
553///
554/// Each backend writes a one-line `impl Storage for Backend {}` —
555/// zero-cost, because every method comes from the five sub-traits.
556pub trait Storage:
557 JobStore + JobLocker + RecurringStore + ServerRegistry + NamedLocks + Send + Sync
558{
559}
560
561impl Storage for MemoryStorage {}
562#[cfg(feature = "redis")]
563impl Storage for RedisStorage {}
564#[cfg(feature = "postgres")]
565impl Storage for PostgresStorage {}
566
567/// One-stop import for every storage trait in this module.
568///
569/// `use qml_rs::storage::prelude::*` brings [`Storage`] *and* the five
570/// sub-traits ([`JobStore`], [`JobLocker`], [`RecurringStore`],
571/// [`ServerRegistry`], [`NamedLocks`]) plus [`MonitoringApi`] into
572/// scope. Because Rust resolves trait methods by which trait is in
573/// scope, calling `storage.enqueue(...)` on an `&dyn Storage` requires
574/// `JobStore` to be reachable — the prelude saves callers from
575/// remembering which method lives where.
576pub mod prelude {
577 pub use super::{
578 JobLocker, JobStore, MonitoringApi, NamedLocks, RecurringStore, ServerRegistry, Storage,
579 };
580}
581
582// =========================================================================
583// StorageInstance — module-level constructors returning Arc<dyn Storage>
584// =========================================================================
585
586/// Module-level constructor surface for the supported backends.
587///
588/// Originally a 3-variant enum (`Memory | Redis | Postgres`) with a
589/// 350-line hand-written `match`-based dispatch implementing every
590/// `Storage` trait method. With the trait split into sub-traits and a
591/// blanket `impl<T> Storage for T where T: ...`, the enum + dispatch
592/// became pure boilerplate. Replaced with a unit struct whose
593/// associated functions return `Arc<dyn Storage>` directly — every
594/// backend type already satisfies `Storage` via the blanket impl, so
595/// no per-backend dispatch is needed.
596///
597/// Existing call sites (`StorageInstance::memory()`,
598/// `StorageInstance::redis(cfg).await`, etc.) keep their syntax; what
599/// changes is that those constructors now return `Arc<dyn Storage>`
600/// rather than an enum value the caller has to wrap in `Arc::new`.
601pub struct StorageInstance;
602
603impl StorageInstance {
604 /// Create a storage instance from configuration.
605 ///
606 /// # Examples
607 ///
608 /// ```rust
609 /// use qml_rs::storage::{StorageInstance, StorageConfig, MemoryConfig};
610 ///
611 /// # tokio_test::block_on(async {
612 /// let config = StorageConfig::Memory(MemoryConfig::default());
613 /// let storage = StorageInstance::from_config(config).await.unwrap();
614 /// # });
615 /// ```
616 pub async fn from_config(
617 config: StorageConfig,
618 ) -> Result<std::sync::Arc<dyn Storage>, StorageError> {
619 match config {
620 StorageConfig::Memory(memory_config) => Ok(std::sync::Arc::new(
621 MemoryStorage::with_config(memory_config),
622 )),
623 #[cfg(feature = "redis")]
624 StorageConfig::Redis(redis_config) => Ok(std::sync::Arc::new(
625 RedisStorage::with_config(redis_config).await?,
626 )),
627 #[cfg(feature = "postgres")]
628 StorageConfig::Postgres(postgres_config) => Ok(std::sync::Arc::new(
629 PostgresStorage::new(postgres_config).await?,
630 )),
631 }
632 }
633
634 /// Create a memory storage instance with default configuration.
635 ///
636 /// # Examples
637 ///
638 /// ```rust
639 /// use qml_rs::storage::StorageInstance;
640 ///
641 /// let storage = StorageInstance::memory();
642 /// ```
643 pub fn memory() -> std::sync::Arc<dyn Storage> {
644 std::sync::Arc::new(MemoryStorage::new())
645 }
646
647 /// Create a memory storage instance with custom configuration.
648 pub fn memory_with_config(config: MemoryConfig) -> std::sync::Arc<dyn Storage> {
649 std::sync::Arc::new(MemoryStorage::with_config(config))
650 }
651
652 /// Create a Redis storage instance with custom configuration.
653 #[cfg(feature = "redis")]
654 pub async fn redis(config: RedisConfig) -> Result<std::sync::Arc<dyn Storage>, StorageError> {
655 Ok(std::sync::Arc::new(
656 RedisStorage::with_config(config).await?,
657 ))
658 }
659
660 /// Create a PostgreSQL storage instance with custom configuration.
661 #[cfg(feature = "postgres")]
662 pub async fn postgres(
663 config: PostgresConfig,
664 ) -> Result<std::sync::Arc<dyn Storage>, StorageError> {
665 Ok(std::sync::Arc::new(PostgresStorage::new(config).await?))
666 }
667}