Skip to main content

qm_redis/
lib.rs

1#![deny(missing_docs)]
2
3//! Redis connection, caching, and work queue utilities.
4//!
5//! This crate provides Redis connection management with connection pooling,
6//! distributed locking, caching with JSON serialization, and async work queues.
7//!
8//! ## Features
9//!
10//! - **Connection Pooling**: Deadpool-based connection pooling
11//! - **Distributed Locks**: Acquire and manage distributed locks
12//! - **JSON Caching**: Cache JSON-serializable types with automatic serialization
13//! - **Work Queues**: Async worker queues for background job processing
14//! - **Configuration**: Environment-based configuration with prefix support
15//!
16//! ## Usage
17//!
18//! \```ignore
19//! use qm_redis::{Redis, RedisConfig};
20//!
21//! #[tokio::main]
22//! async fn main() -> anyhow::Result<()> {
23//!     let redis = Redis::new()?;
24//!     let mut con = redis.connect().await?;
25//!     Ok(())
26//! }
27//! \```
28//!
29//! ## Environment Variables
30//!
31//! | Variable | Description | Default |
32//! |----------|-------------|---------|
33//! | `REDIS_HOST` | Redis host | `127.0.0.1` |
34//! | `REDIS_PORT` | Redis port | `6379` |
35//! | `REDIS_DB` | Redis database number | `0` |
36//! | `REDIS_USERNAME` | Redis username | (none) |
37//! | `REDIS_PASSWORD` | Redis password | (none) |
38
39pub use deadpool_redis::redis;
40use deadpool_redis::PoolError;
41use deadpool_redis::Runtime;
42use redis::FromRedisValue;
43use redis::RedisError;
44use redis::ToRedisArgs;
45use std::sync::Arc;
46mod config;
47/// Distributed locking utilities.
48pub mod lock;
49/// Work queue implementation.
50pub mod work_queue;
51use futures::stream::FuturesUnordered;
52use futures::StreamExt;
53use redis::AsyncCommands;
54use redis::RedisResult;
55use serde::de::DeserializeOwned;
56use serde::Serialize;
57use std::future::Future;
58use std::pin::Pin;
59use std::sync::atomic::AtomicBool;
60use std::sync::atomic::Ordering;
61use std::time::Duration;
62use tokio::runtime::Builder;
63use tokio::sync::RwLock;
64use tokio::task::LocalSet;
65use work_queue::Item;
66use work_queue::KeyPrefix;
67use work_queue::WorkQueue;
68
69pub use crate::config::Config as RedisConfig;
70use crate::lock::Lock;
71
72/// Error type for cache operations.
73#[derive(Debug, thiserror::Error)]
74pub enum CacheError {
75    /// Connection pool error.
76    #[error(transparent)]
77    Pool(#[from] PoolError),
78    /// Redis error.
79    #[error(transparent)]
80    Redis(#[from] RedisError),
81    /// Fetch operation failed.
82    #[error("failed to fetch: {0}")]
83    Failure(String),
84}
85
86/// JSON wrapper for Redis serialization.
87///
88/// Automatically serializes to/from JSON when storing/retrieving from Redis.
89#[derive(serde::Serialize, serde::Deserialize)]
90pub struct Json<T>(T);
91
92impl<T> FromRedisValue for Json<T>
93where
94    T: DeserializeOwned,
95{
96    fn from_redis_value(v: redis::Value) -> Result<Self, redis::ParsingError> {
97        if let redis::Value::SimpleString(s) = v {
98            serde_json::from_str(&s).map_err(|e| redis::ParsingError::from(e.to_string()))
99        } else {
100            Err(redis::ParsingError::from("expected simple string value"))
101        }
102    }
103}
104
105impl<T> ToRedisArgs for Json<T>
106where
107    T: Serialize,
108{
109    fn write_redis_args<W>(&self, out: &mut W)
110    where
111        W: ?Sized + redis::RedisWrite,
112    {
113        let v = serde_json::to_string(&self.0).unwrap_or_default();
114        v.write_redis_args(out);
115    }
116}
117
118/// Internal state for Redis connection.
119pub struct Inner {
120    config: RedisConfig,
121    client: redis::Client,
122    pool: deadpool_redis::Pool,
123}
124
125/// Redis connection wrapper with connection pooling.
126#[derive(Clone)]
127pub struct Redis {
128    inner: Arc<Inner>,
129}
130
131impl AsRef<deadpool_redis::Pool> for Redis {
132    fn as_ref(&self) -> &deadpool_redis::Pool {
133        &self.inner.pool
134    }
135}
136
137impl Redis {
138    /// Creates a new Redis connection from environment variables.
139    pub fn new() -> anyhow::Result<Self> {
140        let config = RedisConfig::builder().build()?;
141        let client = redis::Client::open(config.address())?;
142        let redis_cfg = deadpool_redis::Config::from_url(config.address());
143        let pool = redis_cfg.create_pool(Some(Runtime::Tokio1))?;
144        Ok(Self {
145            inner: Arc::new(Inner {
146                config,
147                client,
148                pool,
149            }),
150        })
151    }
152
153    /// Returns a reference to the Redis configuration.
154    pub fn config(&self) -> &RedisConfig {
155        &self.inner.config
156    }
157
158    /// Returns a reference to the Redis client.
159    pub fn client(&self) -> &redis::Client {
160        &self.inner.client
161    }
162
163    /// Returns a clone of the connection pool.
164    pub fn pool(&self) -> Arc<deadpool_redis::Pool> {
165        Arc::new(self.inner.pool.clone())
166    }
167
168    /// Acquires a connection from the pool.
169    pub async fn connect(&self) -> Result<deadpool_redis::Connection, deadpool_redis::PoolError> {
170        self.inner.pool.get().await
171    }
172
173    /// Clears all data from the Redis database.
174    pub async fn cleanup(&self) -> anyhow::Result<()> {
175        let mut con = self.connect().await?;
176        let _: redis::Value = redis::cmd("FLUSHALL").query_async(&mut con).await?;
177        Ok(())
178    }
179
180    /// Acquires a distributed lock with the given parameters.
181    pub async fn lock(
182        &self,
183        key: &str,
184        ttl: usize,
185        retry_count: u32,
186        retry_delay: u32,
187    ) -> Result<Lock, lock::Error> {
188        let mut con = self.connect().await?;
189        lock::lock(&mut con, key, ttl, retry_count, retry_delay).await
190    }
191
192    /// Releases a distributed lock.
193    pub async fn unlock(&self, key: &str, lock_id: &str) -> Result<i64, lock::Error> {
194        let mut con = self.connect().await?;
195        lock::unlock(&mut con, key, lock_id).await
196    }
197}
198
199/// Runs async function exclusively using Redis lock.
200///
201/// Lock will be released even if async block fails.
202///
203/// # Errors
204///
205/// This function will return an error if either `f` call triggers exception, or lock failure.
206/// Panic in async call will not release lock, but it will be released after timeout.
207pub async fn mutex_run<S, O, E, F>(lock_name: S, redis: &Redis, f: F) -> Result<O, E>
208where
209    S: AsRef<str>,
210    F: std::future::Future<Output = Result<O, E>>,
211    E: From<self::lock::Error>,
212{
213    let lock = redis.lock(lock_name.as_ref(), 5000, 20, 250).await?;
214
215    let result = f.await;
216
217    redis.unlock(lock_name.as_ref(), &lock.id).await?;
218
219    result
220}
221
/// Implements `AsRef<Redis>` for a storage type.
///
/// The storage type must expose the handle as `self.inner.redis`.
///
/// The generated code names the type through `$crate`, so the macro works
/// both when this crate is used directly and when it is reached through a
/// re-export.
#[macro_export]
macro_rules! redis {
    ($storage:ty) => {
        impl AsRef<$crate::Redis> for $storage {
            fn as_ref(&self) -> &$crate::Redis {
                &self.inner.redis
            }
        }
    };
}
233
234/// Type for running worker futures.
235pub type RunningWorkers =
236    FuturesUnordered<Pin<Box<dyn Future<Output = String> + Send + Sync + 'static>>>;
237
238/// Type for executable item futures.
239pub type ExecItemFuture = Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>>;
240
241/// Context passed to worker functions.
242pub struct WorkerContext<Ctx>
243where
244    Ctx: Clone + Send + Sync + 'static,
245{
246    ctx: Ctx,
247    /// The worker ID.
248    pub worker_id: usize,
249    /// The work queue.
250    pub queue: Arc<WorkQueue>,
251    /// The Redis client.
252    pub client: Arc<redis::Client>,
253    /// The current work item.
254    pub item: Item,
255}
256
257impl<Ctx> WorkerContext<Ctx>
258where
259    Ctx: Clone + Send + Sync + 'static,
260{
261    /// Returns a reference to the context.
262    pub fn ctx(&self) -> &Ctx {
263        &self.ctx
264    }
265
266    /// Marks the current item as completed.
267    pub async fn complete(&self) -> anyhow::Result<()> {
268        let mut con = self.client.get_multiplexed_async_connection().await?;
269        self.queue.complete(&mut con, &self.item).await?;
270        Ok(())
271    }
272}
273
274async fn add(
275    is_running: Arc<AtomicBool>,
276    instances: Arc<RwLock<Option<RunningWorkers>>>,
277    fut: Pin<Box<dyn Future<Output = String> + Send + Sync + 'static>>,
278) {
279    if !is_running.load(Ordering::SeqCst) {
280        return;
281    }
282    instances.write().await.as_mut().unwrap().push(fut);
283}
284
285/// Trait for implementing worker logic.
286#[async_trait::async_trait]
287pub trait Work<Ctx, T>: Send + Sync
288where
289    Ctx: Clone + Send + Sync + 'static,
290    T: DeserializeOwned + Send + Sync,
291{
292    /// Runs the worker logic for a given item.
293    async fn run(&self, ctx: WorkerContext<Ctx>, item: T) -> anyhow::Result<()>;
294}
295
296async fn run_recovery_worker<Ctx, T>(
297    client: Arc<redis::Client>,
298    is_running: Arc<AtomicBool>,
299    worker: Arc<AsyncWorker<Ctx, T>>,
300) -> anyhow::Result<()>
301where
302    Ctx: Clone + Send + Sync + 'static,
303    T: DeserializeOwned + Send + Sync,
304{
305    tracing::info!("start {} worker recovery", worker.prefix);
306    let mut con = client.get_multiplexed_async_connection().await?;
307    loop {
308        if !is_running.load(Ordering::SeqCst) {
309            break;
310        }
311        tokio::time::sleep(Duration::from_secs(10)).await;
312        worker.recover(&mut con).await?;
313    }
314    Ok(())
315}
316
317async fn run_worker_queue<Ctx, T>(
318    ctx: Ctx,
319    client: Arc<redis::Client>,
320    is_running: Arc<AtomicBool>,
321    worker: Arc<AsyncWorker<Ctx, T>>,
322    worker_id: usize,
323) -> anyhow::Result<()>
324where
325    Ctx: Clone + Send + Sync + 'static,
326    T: DeserializeOwned + Send + Sync,
327{
328    tracing::info!("start {} worker #{worker_id} queue", worker.prefix);
329    let request_queue = Arc::new(WorkQueue::new(KeyPrefix::new(worker.prefix.clone())));
330    let mut con = client.get_multiplexed_async_connection().await?;
331    loop {
332        if !is_running.load(Ordering::SeqCst) {
333            break;
334        }
335        if let Some(item) = request_queue
336            .lease(
337                &mut con,
338                Some(Duration::from_secs(worker.timeout)),
339                Duration::from_secs(worker.lease_duration),
340            )
341            .await?
342        {
343            if item.data.is_empty() {
344                tracing::info!("item is empty");
345                request_queue.complete(&mut con, &item).await?;
346                continue;
347            }
348            if let Ok(request) = serde_json::from_slice::<T>(&item.data).inspect_err(|_| {
349                tracing::error!(
350                    "invalid request item on worker {} #{worker_id} Item: {}",
351                    worker.prefix,
352                    String::from_utf8_lossy(&item.data)
353                );
354            }) {
355                if let Some(work) = worker.work.as_ref() {
356                    work.run(
357                        WorkerContext {
358                            ctx: ctx.clone(),
359                            worker_id,
360                            queue: request_queue.clone(),
361                            client: client.clone(),
362                            item: Item {
363                                id: item.id.clone(),
364                                data: Box::new([]),
365                            },
366                        },
367                        request,
368                    )
369                    .await?;
370                }
371            } else {
372                request_queue.complete(&mut con, &item).await?;
373            }
374        }
375    }
376    Ok(())
377}
378
379struct WorkerInner {
380    client: Arc<redis::Client>,
381    instances: Arc<RwLock<Option<RunningWorkers>>>,
382    is_running: Arc<AtomicBool>,
383}
384
385/// Worker pool for background job processing.
386///
387/// Manages multiple async workers that process jobs from Redis queues.
388#[derive(Clone)]
389pub struct Workers {
390    inner: Arc<WorkerInner>,
391}
392
393impl Workers {
394    /// Creates a new Workers instance from a Redis config.
395    pub fn new(config: &RedisConfig) -> RedisResult<Self> {
396        let client = Arc::new(redis::Client::open(config.address())?);
397        Ok(Self::new_with_client(client))
398    }
399
400    /// Creates a new Workers instance with an existing Redis client.
401    pub fn new_with_client(client: Arc<redis::Client>) -> Self {
402        Self {
403            inner: Arc::new(WorkerInner {
404                client,
405                instances: Arc::new(RwLock::new(Some(RunningWorkers::default()))),
406                is_running: Arc::new(AtomicBool::new(true)),
407            }),
408        }
409    }
410
411    /// Starts the workers with the given context and async worker.
412    pub async fn start<Ctx, T>(&self, ctx: Ctx, worker: AsyncWorker<Ctx, T>) -> anyhow::Result<()>
413    where
414        Ctx: Clone + Send + Sync + 'static,
415        T: DeserializeOwned + Send + Sync + 'static,
416    {
417        let worker = Arc::new(worker);
418        let mut con = self.inner.client.get_multiplexed_async_connection().await?;
419        worker.recover(&mut con).await?;
420        {
421            let instances = self.inner.instances.clone();
422            let client = self.inner.client.clone();
423            let worker = worker.clone();
424            let _th = std::thread::spawn(move || {
425                let rt = Builder::new_current_thread().enable_all().build().unwrap();
426                let local = LocalSet::new();
427                local.spawn_local(async move {
428                    let fut_worker = worker.clone();
429                    let (tx, rx) = tokio::sync::oneshot::channel::<()>();
430                    let is_running = Arc::new(AtomicBool::new(true));
431                    let is_fut_running = is_running.clone();
432                    add(
433                        is_running.clone(),
434                        instances,
435                        Box::pin(async move {
436                            let worker = fut_worker.clone();
437                            tracing::info!("stopping {} recovery", worker.prefix);
438                            is_fut_running.store(false, Ordering::SeqCst);
439                            rx.await.ok();
440                            " recovery".to_string()
441                        }),
442                    )
443                    .await;
444                    if let Err(err) = run_recovery_worker(client, is_running, worker).await {
445                        tracing::error!("{err:#?}");
446                        std::process::exit(1);
447                    }
448                    tx.send(()).ok();
449                });
450                rt.block_on(local);
451            });
452        }
453        for worker_id in 0..worker.num_workers {
454            let worker = worker.clone();
455            let client = self.inner.client.clone();
456            let ctx = ctx.clone();
457            let instances = self.inner.instances.clone();
458            let _th = std::thread::spawn(move || {
459                let rt = Builder::new_current_thread().enable_all().build().unwrap();
460                let local = LocalSet::new();
461                local.spawn_local(async move {
462                    let fut_worker = worker.clone();
463                    let (tx, rx) = tokio::sync::oneshot::channel::<()>();
464                    let is_running = Arc::new(AtomicBool::new(true));
465                    let is_fut_running = is_running.clone();
466                    add(
467                        is_running.clone(),
468                        instances,
469                        Box::pin(async move {
470                            let worker = fut_worker.clone();
471                            tracing::info!("stopping {} #{worker_id}", worker.prefix);
472                            is_fut_running.store(false, Ordering::SeqCst);
473                            rx.await.ok();
474                            format!("{} worker #{worker_id}", fut_worker.prefix)
475                        }),
476                    )
477                    .await;
478                    if let Err(err) =
479                        run_worker_queue(ctx.clone(), client, is_running, worker, worker_id).await
480                    {
481                        tracing::error!("{err:#?}");
482                        std::process::exit(1);
483                    }
484                    tx.send(()).ok();
485                });
486                rt.block_on(local);
487            });
488        }
489        Ok(())
490    }
491
492    /// Terminates all workers gracefully.
493    pub async fn terminate(&self) -> anyhow::Result<()> {
494        if !self.inner.is_running.load(Ordering::SeqCst) {
495            anyhow::bail!("Workers already terminated");
496        }
497        let mut futs = self.inner.instances.write().await.take().unwrap();
498        tracing::info!("try stopping {} workers", futs.len());
499        while let Some(result) = futs.next().await {
500            tracing::info!("stopped {}", result);
501        }
502        Ok(())
503    }
504}
505
506/// Producer for adding jobs to a work queue.
507///
508/// Use this to enqueue work items that will be processed by workers.
509pub struct Producer {
510    client: Arc<deadpool_redis::Pool>,
511    queue: WorkQueue,
512}
513
514impl Producer {
515    /// Creates a new Producer from a Redis config and prefix.
516    pub fn new<S>(config: &RedisConfig, prefix: S) -> anyhow::Result<Self>
517    where
518        S: Into<String>,
519    {
520        let redis_cfg = deadpool_redis::Config::from_url(config.address());
521        let redis = Arc::new(redis_cfg.create_pool(Some(Runtime::Tokio1))?);
522        Ok(Self::new_with_client(redis, prefix))
523    }
524
525    /// Creates a new Producer with an existing connection pool and prefix.
526    pub fn new_with_client<S>(client: Arc<deadpool_redis::Pool>, prefix: S) -> Self
527    where
528        S: Into<String>,
529    {
530        let queue = WorkQueue::new(KeyPrefix::new(prefix.into()));
531        Self { client, queue }
532    }
533
534    /// Adds an item using an existing connection.
535    pub async fn add_item_with_connection<C, T>(&self, db: &mut C, data: &T) -> anyhow::Result<()>
536    where
537        C: AsyncCommands,
538        T: Serialize,
539    {
540        let item = Item::from_json_data(data)?;
541        self.queue.add_item(db, &item).await?;
542        Ok(())
543    }
544
545    /// Adds an item to the queue.
546    pub async fn add_item<T>(&self, data: &T) -> anyhow::Result<()>
547    where
548        T: Serialize,
549    {
550        let item = Item::from_json_data(data)?;
551        let mut con = self.client.get().await?;
552        self.queue.add_item(&mut con, &item).await?;
553        Ok(())
554    }
555}
556
557/// Async worker for processing jobs from a queue.
558///
559/// Configurable with timeout, lease duration, and number of workers.
560/// Use [`AsyncWorker::new`] to create, then configure and call [`AsyncWorker::run`].
561pub struct AsyncWorker<Ctx, T>
562where
563    Ctx: Clone + Send + Sync + 'static,
564    T: DeserializeOwned + Send + Sync,
565{
566    prefix: String,
567    num_workers: usize,
568    timeout: u64,
569    lease_duration: u64,
570    recovery_key: String,
571    recovery_queue: WorkQueue,
572    work: Option<Box<dyn Work<Ctx, T>>>,
573}
574
575impl<Ctx, T> AsyncWorker<Ctx, T>
576where
577    Ctx: Clone + Send + Sync + 'static,
578    T: DeserializeOwned + Send + Sync,
579{
580    /// Creates a new AsyncWorker with the given prefix.
581    pub fn new<S>(prefix: S) -> Self
582    where
583        S: Into<String>,
584    {
585        let prefix = prefix.into();
586        let name = KeyPrefix::new(prefix.clone());
587        Self {
588            recovery_key: name.of(":clean"),
589            recovery_queue: WorkQueue::new(name),
590            timeout: 5,
591            lease_duration: 60,
592            num_workers: 1,
593            prefix,
594            work: None,
595        }
596    }
597
598    /// Sets the timeout for worker tasks.
599    pub fn with_timeout(mut self, timeout: u64) -> Self {
600        self.timeout = timeout;
601        self
602    }
603
604    /// Sets the lease duration for queue items.
605    pub fn with_lease_duration(mut self, lease_duration: u64) -> Self {
606        self.lease_duration = lease_duration;
607        self
608    }
609
610    /// Sets the number of worker threads.
611    pub fn with_num_workers(mut self, num_workers: usize) -> Self {
612        self.num_workers = num_workers;
613        self
614    }
615
616    /// Creates a Producer for adding items to the queue.
617    pub fn producer(&self, client: Arc<deadpool_redis::Pool>) -> Producer {
618        Producer {
619            client,
620            queue: WorkQueue::new(KeyPrefix::new(self.prefix.clone())),
621        }
622    }
623
624    /// Recovers pending items from a previous run.
625    pub async fn recover<C: AsyncCommands>(&self, db: &mut C) -> anyhow::Result<()> {
626        let l = lock::lock(db, &self.recovery_key, 3600, 36, 100).await?;
627        self.recovery_queue.recover(db).await?;
628        lock::unlock(db, &self.recovery_key, l.id).await?;
629        Ok(())
630    }
631
632    /// Sets the work function and returns self.
633    pub fn run(mut self, work: impl Work<Ctx, T> + 'static) -> Self {
634        self.work = Some(Box::new(work));
635        self
636    }
637}