apalis_redis/lib.rs

#![warn(
    missing_debug_implementations,
    missing_docs,
    rust_2018_idioms,
    unreachable_pub
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
//! # apalis-redis
//!
//! Background task processing for Rust using `apalis` and Redis.
//!
//! ## Features
//!
//! - **Reliable task queue** using any Redis-compatible service as the backend.
//! - **Multiple storage types**: standard polling and `pubsub`-based approaches.
//! - **Customizable codecs** such as `json`, `msgpack`, and `bincode` for serializing and deserializing task arguments.
//! - **Heartbeats and re-enqueueing of orphaned tasks** for consistent task processing.
//! - **Integration with `apalis` workers and middleware** such as `retry`, `long_running`, and `parallelize`.
//! - **Shared storage** for multiple task types over the same Redis connection.
//! - **Observability**: monitor and manage tasks using [apalis-board](https://github.com/apalis-dev/apalis-board).
//!
//! ## Usage
//!
//! Add the latest versions from crates.io:
//!
//! ```toml
//! apalis = { version = "1", features = ["retry"] }
//! apalis-redis = { version = "1" }
//! ```
//!
//! ### Example
//!
//! ```rust,no_run
//! use apalis::prelude::*;
//! use apalis_redis::{RedisStorage, RedisConfig as Config};
//! use serde::{Deserialize, Serialize};
//! # use std::env;
//!
//! #[derive(Debug, Deserialize, Serialize)]
//! struct Email {
//!     to: String,
//! }
//!
//! async fn send_email(task: Email) -> Result<(), BoxDynError> {
//!     Ok(())
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set");
//!     let conn = apalis_redis::connect(redis_url).await.expect("Could not connect");
//!     let storage = RedisStorage::new(conn);
//!
//!     let worker = WorkerBuilder::new("tasty-pear")
//!         .backend(storage)
//!         .build(send_email);
//!
//!     worker.run().await;
//! }
//! ```
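//!
//! ### Configuration
//!
//! A minimal sketch of tuning the storage through `RedisConfig`; the namespace,
//! buffer size and poll interval below are illustrative values, not recommendations.
//!
//! ```rust,no_run
//! use apalis_redis::{RedisStorage, RedisConfig};
//! use std::time::Duration;
//! # use std::env;
//!
//! #[tokio::main]
//! async fn main() {
//!     let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set");
//!     let conn = apalis_redis::connect(redis_url).await.expect("Could not connect");
//!
//!     let config = RedisConfig::default()
//!         .set_namespace("emails")
//!         .set_buffer_size(100)
//!         .set_poll_interval(Duration::from_secs(1));
//!     let storage: RedisStorage<String> = RedisStorage::new_with_config(conn, config);
//! }
//! ```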
//!
//! ### Shared Example
//!
//! This example shows multiple backends sharing a single connection,
//! which can improve performance when you have many task types.
//!
//! ```rust,no_run
//! use apalis::prelude::*;
//! use apalis_redis::{RedisStorage, RedisConfig as Config, shared::SharedRedisStorage, Client};
//! use std::collections::HashMap;
//! use futures::stream;
//! use std::time::Duration;
//! use std::env;
//!
//! #[tokio::main]
//! async fn main() {
//!     let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
//!
//!     let mut store = SharedRedisStorage::new(client).await.expect("Could not create shared");
//!
//!     let mut map_store = store.make_shared().unwrap();
//!
//!     let mut int_store = store.make_shared().unwrap();
//!
//!     map_store
//!         .push_stream(&mut stream::iter(vec![HashMap::<String, String>::new()]))
//!         .await
//!         .unwrap();
//!     int_store.push(99).await.unwrap();
//!
//!     async fn send_reminder<T, I>(
//!         _: T,
//!         task_id: TaskId<I>,
//!         wrk: WorkerContext,
//!     ) -> Result<(), BoxDynError> {
//!         tokio::time::sleep(Duration::from_secs(2)).await;
//!         wrk.stop().unwrap();
//!         Ok(())
//!     }
//!
//!     let int_worker = WorkerBuilder::new("rango-tango-2")
//!         .backend(int_store)
//!         .build(send_reminder);
//!     let map_worker = WorkerBuilder::new("rango-tango-1")
//!         .backend(map_store)
//!         .build(send_reminder);
//!     tokio::try_join!(int_worker.run(), map_worker.run()).unwrap();
//! }
//! ```
//!
//! ## Workflow Example
//!
//! Workflows chain several steps together, with each step's output becoming the next step's input.
//!
//! ```rust,no_run
//! use apalis::prelude::*;
//! use apalis_redis::{RedisStorage, RedisConfig as Config};
//! use apalis_workflow::WorkFlow;
//! use serde::{Deserialize, Serialize};
//! # use std::env;
//!
//! #[derive(Debug, Deserialize, Serialize)]
//! struct Data {
//!     value: u32,
//! }
//!
//! async fn task1(task: u32) -> Result<Data, BoxDynError> {
//!     Ok(Data { value: task + 1 })
//! }
//!
//! async fn task2(task: Data) -> Result<Data, BoxDynError> {
//!     Ok(Data { value: task.value * 2 })
//! }
//!
//! async fn task3(task: Data) -> Result<(), BoxDynError> {
//!     println!("Final value: {}", task.value);
//!     Ok(())
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set");
//!     let conn = apalis_redis::connect(redis_url).await.expect("Could not connect");
//!     let storage = RedisStorage::new(conn);
//!
//!     let work_flow = WorkFlow::new("sample-workflow")
//!         .then(task1)
//!         .then(task2)
//!         .then(task3);
//!
//!     let worker = WorkerBuilder::new("tasty-carrot")
//!         .backend(storage)
//!         .build(work_flow);
//!
//!     worker.run().await;
//! }
//! ```
//!
//! ## Observability
//!
//! You can track your tasks using [apalis-board](https://github.com/apalis-dev/apalis-board).
//! ![Task](https://github.com/apalis-dev/apalis-board/raw/master/screenshots/task.png)

use std::{any::type_name, io, marker::PhantomData, sync::Arc};

use apalis_core::{
    backend::{
        Backend, TaskStream,
        codec::{Codec, json::JsonCodec},
    },
    error::BoxDynError,
    task::Task,
    worker::{context::WorkerContext, ext::ack::AcknowledgeLayer},
};
use chrono::Utc;
use event_listener::Event;
use futures::{
    FutureExt, StreamExt,
    future::select,
    stream::{self, BoxStream},
};
use redis::aio::ConnectionLike;
pub use redis::{Client, IntoConnectionInfo, aio};

mod ack;
mod config;
mod context;
mod fetcher;
mod queries;
/// Shared utilities for Redis storage.
pub mod shared;
/// Redis sink module.
pub mod sink;

pub use redis::{RedisError, aio::ConnectionManager};

use ulid::Ulid;

pub use crate::{
    ack::RedisAck, config::RedisConfig, context::RedisContext, fetcher::*, sink::RedisSink,
};

/// A Redis task type alias
pub type RedisTask<Args> = Task<Args, RedisContext, Ulid>;

/// Represents a [Backend] that uses Redis for storage.
#[doc = "# Feature Support\n"]
#[derive(Debug)]
pub struct RedisStorage<Args, Conn = ConnectionManager, C = JsonCodec<Vec<u8>>> {
    conn: Conn,
    job_type: PhantomData<Args>,
    config: RedisConfig,
    codec: PhantomData<C>,
    poller: Arc<Event>,
    sink: RedisSink<Args, C, Conn>,
}

impl<Args, Conn: Clone, Cdc: Clone> Clone for RedisStorage<Args, Conn, Cdc> {
    fn clone(&self) -> Self {
        Self {
            conn: self.conn.clone(),
            job_type: PhantomData,
            config: self.config.clone(),
            codec: PhantomData,
            poller: self.poller.clone(),
            sink: self.sink.clone(),
        }
    }
}

impl<T, Conn: Clone> RedisStorage<T, Conn, JsonCodec<Vec<u8>>> {
    /// Create a new storage from an existing connection, using the default config and JSON codec
    pub fn new(conn: Conn) -> RedisStorage<T, Conn, JsonCodec<Vec<u8>>> {
        Self::new_with_codec::<JsonCodec<Vec<u8>>>(
            conn,
            RedisConfig::default().set_namespace(type_name::<T>()),
        )
    }

    /// Create a new storage from an existing connection with a custom config
    pub fn new_with_config(
        conn: Conn,
        config: RedisConfig,
    ) -> RedisStorage<T, Conn, JsonCodec<Vec<u8>>> {
        Self::new_with_codec::<JsonCodec<Vec<u8>>>(conn, config)
    }

    /// Create a new storage from an existing connection, providing a custom config and codec
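    ///
    /// Below is a sketch of plugging in a custom codec, modeled on the `bincode`
    /// codec used in this crate's tests. Any type implementing the `Codec` trait
    /// from `apalis_core` with `Compact = Vec<u8>` can be supplied; the `Bincode`
    /// struct and the `conn` variable here are illustrative.
    ///
    /// ```rust,ignore
    /// struct Bincode;
    ///
    /// impl<T: bincode::Decode<()> + bincode::Encode> Codec<T> for Bincode {
    ///     type Compact = Vec<u8>;
    ///     type Error = bincode::error::DecodeError;
    ///     fn decode(val: &Self::Compact) -> Result<T, Self::Error> {
    ///         bincode::decode_from_slice(val, bincode::config::standard()).map(|s| s.0)
    ///     }
    ///     fn encode(val: &T) -> Result<Self::Compact, Self::Error> {
    ///         Ok(bincode::encode_to_vec(val, bincode::config::standard()).unwrap())
    ///     }
    /// }
    ///
    /// let backend = RedisStorage::new_with_codec::<Bincode>(
    ///     conn,
    ///     RedisConfig::new("bincode-queue"),
    /// );
    /// ```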
    pub fn new_with_codec<K>(conn: Conn, config: RedisConfig) -> RedisStorage<T, Conn, K>
    where
        K: Sync + Send + 'static,
    {
        let sink = RedisSink::new(&conn, &config);
        RedisStorage {
            conn,
            job_type: PhantomData,
            config,
            codec: PhantomData::<K>,
            poller: Arc::new(Event::new()),
            sink,
        }
    }

    /// Get the current connection
    pub fn get_connection(&self) -> &Conn {
        &self.conn
    }

    /// Get the config used by the storage
    pub fn get_config(&self) -> &RedisConfig {
        &self.config
    }
}

impl<Args, Conn, C> Backend for RedisStorage<Args, Conn, C>
where
    Args: Unpin + Send + Sync + 'static,
    Conn: Clone + ConnectionLike + Send + Sync + 'static,
    C: Codec<Args, Compact = Vec<u8>> + Unpin + Send + 'static,
    C::Error: Into<BoxDynError>,
{
    type Args = Args;
    type Stream = TaskStream<Task<Args, RedisContext, Ulid>, RedisError>;

    type IdType = Ulid;

    type Error = RedisError;
    type Layer = AcknowledgeLayer<RedisAck<Conn, C>>;

    type Compact = Vec<u8>;

    type Codec = C;

    type Context = RedisContext;

    type Beat = BoxStream<'static, Result<(), Self::Error>>;

    fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat {
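        // Merges two streams: one periodically re-registers this worker (keep-alive),
        // the other re-enqueues scheduled jobs that have become due.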
        let keep_alive = *self.config.get_keep_alive();

        let config = self.config.clone();
        let worker_id = worker.name().to_owned();
        let conn = self.conn.clone();
        let service = worker.get_service().to_owned();

        let keep_alive = stream::unfold(
            (
                keep_alive,
                worker_id.clone(),
                conn.clone(),
                config.clone(),
                service,
            ),
            |(keep_alive, worker_id, mut conn, config, service)| async move {
                apalis_core::timer::sleep(keep_alive).await;
                let register_worker =
                    redis::Script::new(include_str!("../lua/register_worker.lua"));
                let inflight_set = format!("{}:{}", config.inflight_jobs_set(), worker_id);
                let workers_set = config.workers_set();

                let now: i64 = Utc::now().timestamp();

                let res = register_worker
                    .key(workers_set)
                    .key("core::apalis::workers:metadata::")
                    .arg(now)
                    .arg(inflight_set)
                    .arg(config.get_keep_alive().as_secs())
                    .arg("RedisStorage")
                    .arg(&service)
                    .invoke_async::<()>(&mut conn)
                    .await;
                Some((res, (keep_alive, worker_id, conn, config, service)))
            },
        );

        let enqueue_scheduled = stream::unfold(
            (worker_id, conn, config),
            |(worker_id, mut conn, config)| async move {
                apalis_core::timer::sleep(*config.get_enqueue_scheduled()).await;
                let scheduled_jobs_set = config.scheduled_jobs_set();
                let active_jobs_list = config.active_jobs_list();
                let signal_list = config.signal_list();
                let now: i64 = Utc::now().timestamp();
                let enqueue_jobs =
                    redis::Script::new(include_str!("../lua/enqueue_scheduled_jobs.lua"));
                let res: Result<usize, _> = enqueue_jobs
                    .key(scheduled_jobs_set)
                    .key(active_jobs_list)
                    .key(signal_list)
                    .arg(now)
                    .arg(100)
                    .invoke_async(&mut conn)
                    .await;
                match res {
                    Ok(_) => Some((Ok(()), (worker_id, conn, config))),
                    Err(e) => Some((Err(e), (worker_id, conn, config))),
                }
            },
        );
        stream::select(keep_alive, enqueue_scheduled).boxed()
    }
    fn middleware(&self) -> Self::Layer {
        AcknowledgeLayer::new(RedisAck::new(&self.conn, &self.config))
    }

    fn poll(self, worker: &WorkerContext) -> Self::Stream {
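        // Registers the worker once, then repeatedly fetches the next batch of tasks,
        // waking on either the internal event listener or the configured poll interval.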
        let worker = worker.clone();
        let worker_id = worker.name().to_owned();
        let config = self.config.clone();
        let mut conn = self.conn.clone();
        let event_listener = self.poller.clone();
        let service = worker.get_service().to_owned();
        let register = futures::stream::once(async move {
            let register_worker = redis::Script::new(include_str!("../lua/register_worker.lua"));
            let inflight_set = format!("{}:{}", config.inflight_jobs_set(), worker_id);
            let workers_set = config.workers_set();

            let now: i64 = Utc::now().timestamp();

            register_worker
                .key(workers_set)
                .key("core::apalis::workers:metadata::")
                .arg(now)
                .arg(inflight_set)
                .arg(config.get_keep_alive().as_secs())
                .arg("RedisStorage")
                .arg(service)
                .invoke_async::<()>(&mut conn)
                .await?;
            Ok(None)
        })
        .filter_map(
            |res: Result<Option<Task<Args, RedisContext>>, RedisError>| async move {
                match res {
                    Ok(_) => None,
                    Err(e) => Some(Err(e)),
                }
            },
        );
        let stream = stream::unfold(
            (
                worker,
                self.config.clone(),
                self.conn.clone(),
                event_listener,
            ),
            |(worker, config, mut conn, event_listener)| async {
                let interval = apalis_core::timer::sleep(*config.get_poll_interval()).boxed();
                let pub_sub = event_listener.listen().boxed();
                select(pub_sub, interval).await; // Pubsub or else interval
                let data = Self::fetch_next(&worker, &config, &mut conn).await;
                Some((data, (worker, config, conn, event_listener)))
            },
        )
        .flat_map(|res| match res {
            Ok(s) => {
                let stm: Vec<_> = s
                    .into_iter()
                    .map(|s| Ok::<_, RedisError>(Some(s)))
                    .collect();
                stream::iter(stm)
            }
            Err(e) => stream::iter(vec![Err(e)]),
        });
        register.chain(stream).boxed()
    }
}

/// Shorthand to create a client and connect
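///
/// # Example
///
/// A minimal sketch; as in the crate-level examples, the URL is read from `REDIS_URL`.
///
/// ```rust,no_run
/// # async fn example() -> Result<(), apalis_redis::RedisError> {
/// let redis_url = std::env::var("REDIS_URL").expect("REDIS_URL must be set");
/// let conn = apalis_redis::connect(redis_url).await?;
/// # let _ = conn;
/// # Ok(())
/// # }
/// ```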
pub async fn connect<S: IntoConnectionInfo>(redis: S) -> Result<ConnectionManager, RedisError> {
    let client = Client::open(redis.into_connection_info()?)?;
    let conn = client.get_connection_manager().await?;
    Ok(conn)
}

fn build_error(message: &str) -> RedisError {
    RedisError::from(io::Error::new(io::ErrorKind::InvalidData, message))
}

#[cfg(test)]
mod tests {
    use apalis_workflow::WorkFlow;

    use redis::Client;
    use std::{env, time::Duration};

    use apalis_core::{
        backend::{TaskSink, shared::MakeShared},
        task::builder::TaskBuilder,
        worker::{
            builder::WorkerBuilder,
            ext::{event_listener::EventListenerExt, parallelize::ParallelizeExt},
        },
    };

    use crate::shared::SharedRedisStorage;

    use super::*;

    const ITEMS: u32 = 100;

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn basic_worker() {
        let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
        let conn = client.get_connection_manager().await.unwrap();
        let mut backend = RedisStorage::new_with_config(
            conn,
            RedisConfig::default()
                .set_namespace("redis_basic_worker")
                .set_buffer_size(100),
        );
        for i in 0..ITEMS {
            backend.push(i).await.unwrap();
        }

        async fn task(task: u32, ctx: RedisContext, wrk: WorkerContext) -> Result<(), BoxDynError> {
            let handle = std::thread::current();
            println!("{task:?}, {ctx:?}, Thread: {:?}", handle.id());
            if task == ITEMS - 1 {
                wrk.stop().unwrap();
                return Err("Worker stopped!")?;
            }
            Ok(())
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
            })
            .build(task);
        worker.run().await.unwrap();
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn basic_worker_bincode() {
        struct Bincode;

        impl<T: bincode::Decode<()> + bincode::Encode> Codec<T> for Bincode {
            type Compact = Vec<u8>;
            type Error = bincode::error::DecodeError;
            fn decode(val: &Self::Compact) -> Result<T, Self::Error> {
                bincode::decode_from_slice(val, bincode::config::standard()).map(|s| s.0)
            }

            fn encode(val: &T) -> Result<Self::Compact, Self::Error> {
                Ok(bincode::encode_to_vec(val, bincode::config::standard()).unwrap())
            }
        }
        let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
        let conn = client.get_connection_manager().await.unwrap();
        let mut backend = RedisStorage::new_with_codec::<Bincode>(
            conn,
            RedisConfig::new("bincode-queue").set_buffer_size(100),
        );

        for i in 0..ITEMS {
            let req = TaskBuilder::new(i).build();
            backend.push_task(req).await.unwrap();
        }

        async fn task(
            task: u32,
            meta: RedisContext,
            wrk: WorkerContext,
        ) -> Result<String, BoxDynError> {
            let handle = std::thread::current();
            println!("{task:?}, {meta:?}, Thread: {:?}", handle.id());
            if task == ITEMS - 1 {
                wrk.stop().unwrap();
                return Err("Worker stopped!")?;
            }
            Ok("Worker".to_owned())
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .parallelize(tokio::spawn)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
            })
            .build(task);
        worker.run().await.unwrap();
    }

    #[tokio::test]
    async fn shared_workers() {
        let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
        let mut store = SharedRedisStorage::new(client).await.unwrap();

        let mut string_store = store
            .make_shared_with_config(
                RedisConfig::default()
                    .set_namespace("strrrrrr")
                    .set_poll_interval(Duration::from_secs(1))
                    .set_buffer_size(5),
            )
            .unwrap();
        let mut int_store = store
            .make_shared_with_config(
                RedisConfig::default()
                    .set_namespace("Intttttt")
                    .set_poll_interval(Duration::from_secs(2))
                    .set_buffer_size(5),
            )
            .unwrap();

        for i in 0..ITEMS {
            string_store.push(format!("ITEM: {i}")).await.unwrap();
            int_store.push(i).await.unwrap();
        }

        async fn task(job: u32, ctx: WorkerContext) -> Result<usize, BoxDynError> {
            tokio::time::sleep(Duration::from_millis(2)).await;
            if job == ITEMS - 1 {
                ctx.stop().unwrap();
                return Err("Worker stopped!")?;
            }
            Ok(job as usize)
        }

        let int_worker = WorkerBuilder::new("rango-tango-int")
            .backend(int_store)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
            })
            .build(task)
            .run();

        let string_worker = WorkerBuilder::new("rango-tango-string")
            .backend(string_store)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
            })
            .build(|req: String, ctx: WorkerContext| async move {
                tokio::time::sleep(Duration::from_millis(3)).await;
                println!("{req}");
                if req.ends_with(&(ITEMS - 1).to_string()) {
                    ctx.stop().unwrap();
                }
            })
            .run();
        let _ = futures::future::try_join(int_worker, string_worker)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn workflow() {
        async fn task1(job: u32) -> Result<Vec<u32>, BoxDynError> {
            Ok((job..2).collect())
        }

        async fn task2(_: Vec<u32>) -> Result<usize, BoxDynError> {
            Ok(42)
        }

        async fn task3(job: usize, wrk: WorkerContext, ctx: RedisContext) -> Result<(), io::Error> {
            wrk.stop().unwrap();
            println!("{job}");
            dbg!(&ctx);
            Ok(())
        }

        let work_flow = WorkFlow::new("sample-workflow")
            .then(task1)
            .delay_for(Duration::from_millis(1000))
            .then(task2)
            .then(task3);

        let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
        let conn = client.get_connection_manager().await.unwrap();
        let mut backend: RedisStorage<Vec<u8>> = RedisStorage::new_with_config(
            conn,
            RedisConfig::default().set_namespace("redis_workflow"),
        );

        apalis_core::backend::WeakTaskSink::push(&mut backend, 1u32)
            .await
            .unwrap();

        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .on_event(|ctx, ev| {
                use apalis_core::worker::event::Event;
                println!("Worker {:?}, On Event = {:?}", ctx.name(), ev);
                if matches!(ev, Event::Error(_)) {
                    ctx.stop().unwrap();
                }
            })
            .build(work_flow);
        worker.run().await.unwrap();
    }
}