apalis_redis/
lib.rs

1#![warn(
2    missing_debug_implementations,
3    missing_docs,
4    rust_2018_idioms,
5    unreachable_pub
6)]
7#![cfg_attr(docsrs, feature(doc_cfg))]
8#![doc = include_str!("../README.md")]
9use std::{any::type_name, io, marker::PhantomData, sync::Arc};
10
11use apalis_core::{
12    backend::{
13        Backend, BackendExt, TaskStream,
14        codec::{Codec, json::JsonCodec},
15    },
16    error::BoxDynError,
17    features_table,
18    task::Task,
19    worker::{context::WorkerContext, ext::ack::AcknowledgeLayer},
20};
21use chrono::Utc;
22use event_listener::Event;
23use futures::{
24    FutureExt, StreamExt,
25    future::select,
26    stream::{self, BoxStream},
27};
28use redis::aio::ConnectionLike;
29pub use redis::{Client, IntoConnectionInfo, aio};
30
31mod ack;
32mod config;
33mod context;
34mod fetcher;
35mod queries;
36/// Shared utilities for Redis storage.
37pub mod shared;
38/// Redis sink module.
39pub mod sink;
40
41pub use redis::{RedisError, aio::ConnectionManager};
42
43use ulid::Ulid;
44
45pub use crate::{
46    ack::RedisAck, config::RedisConfig, context::RedisContext, fetcher::*, sink::RedisSink,
47};
48
49/// A Redis task type alias
50pub type RedisTask<Args> = Task<Args, RedisContext, Ulid>;
51
52/// Represents a [Backend] that uses Redis for storage.
53///
54#[doc = "# Feature Support\n"]
55#[doc = features_table! {
56    setup = r#"
57    # {
58    #    use apalis_redis::RedisStorage;
59    #    use std::env;
60    #    let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set");
61    #    let conn = apalis_redis::connect(redis_url).await.expect("Could not connect");
62    #    RedisStorage::new(conn)
63    # };
64    "#,
65    TaskSink => supported("Ability to push new tasks", true),
66    MakeShared => supported("Share the same connection across multiple workers", false),
67    Workflow => supported("Supports workflows and orchestration", true),
68    WebUI => supported("Supports `apalis-board` for monitoring and managing tasks", true),
69    WaitForCompletion => supported("Wait for tasks to complete without blocking", true),
70    Serialization => supported("Supports multiple serialization formats such as JSON and MessagePack", false),
71    RegisterWorker => supported("Allow registering a worker with the backend", false),
72    ResumeAbandoned => supported("Resume abandoned tasks", false),
73}]
74#[derive(Debug)]
75pub struct RedisStorage<Args, Conn = ConnectionManager, C = JsonCodec<Vec<u8>>> {
76    conn: Conn,
77    job_type: PhantomData<Args>,
78    config: RedisConfig,
79    codec: PhantomData<C>,
80    poller: Arc<Event>,
81    sink: RedisSink<Args, C, Conn>,
82}
83
84impl<Args, Conn: Clone, Cdc: Clone> Clone for RedisStorage<Args, Conn, Cdc> {
85    fn clone(&self) -> Self {
86        Self {
87            conn: self.conn.clone(),
88            job_type: PhantomData,
89            config: self.config.clone(),
90            codec: PhantomData,
91            poller: self.poller.clone(),
92            sink: self.sink.clone(),
93        }
94    }
95}
96
97impl<T, Conn: Clone> RedisStorage<T, Conn, JsonCodec<Vec<u8>>> {
98    /// Start a new connection
99    pub fn new(conn: Conn) -> RedisStorage<T, Conn, JsonCodec<Vec<u8>>> {
100        Self::new_with_codec::<JsonCodec<Vec<u8>>>(
101            conn,
102            RedisConfig::default().set_namespace(type_name::<T>()),
103        )
104    }
105
106    /// Start a connection with a custom config
107    pub fn new_with_config(
108        conn: Conn,
109        config: RedisConfig,
110    ) -> RedisStorage<T, Conn, JsonCodec<Vec<u8>>> {
111        Self::new_with_codec::<JsonCodec<Vec<u8>>>(conn, config)
112    }
113
114    /// Start a new connection providing custom config and a codec
115    pub fn new_with_codec<K>(conn: Conn, config: RedisConfig) -> RedisStorage<T, Conn, K>
116    where
117        K: Sync + Send + 'static,
118    {
119        let sink = RedisSink::new(&conn, &config);
120        RedisStorage {
121            conn,
122            job_type: PhantomData,
123            config,
124            codec: PhantomData::<K>,
125            poller: Arc::new(Event::new()),
126            sink,
127        }
128    }
129
130    /// Get current connection
131    pub fn get_connection(&self) -> &Conn {
132        &self.conn
133    }
134
135    /// Get the config used by the storage
136    pub fn get_config(&self) -> &RedisConfig {
137        &self.config
138    }
139}
140
141impl<Args, Conn, C> Backend for RedisStorage<Args, Conn, C>
142where
143    Args: Unpin + Send + Sync + 'static,
144    Conn: Clone + ConnectionLike + Send + Sync + 'static,
145    C: Codec<Args, Compact = Vec<u8>> + Unpin + Send + 'static,
146    C::Error: Into<BoxDynError>,
147{
148    type Args = Args;
149    type Stream = TaskStream<Task<Args, RedisContext, Ulid>, RedisError>;
150
151    type IdType = Ulid;
152
153    type Error = RedisError;
154    type Layer = AcknowledgeLayer<RedisAck<Conn, C>>;
155
156    type Context = RedisContext;
157
158    type Beat = BoxStream<'static, Result<(), Self::Error>>;
159
160    fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat {
161        let keep_alive = *self.config.get_keep_alive();
162
163        let config = self.config.clone();
164        let worker_id = worker.name().to_owned();
165        let conn = self.conn.clone();
166        let service = worker.get_service().to_owned();
167
168        let keep_alive = stream::unfold(
169            (
170                keep_alive,
171                worker_id.clone(),
172                conn.clone(),
173                config.clone(),
174                service,
175            ),
176            |(keep_alive, worker_id, mut conn, config, service)| async move {
177                apalis_core::timer::sleep(keep_alive).await;
178                let register_worker =
179                    redis::Script::new(include_str!("../lua/register_worker.lua"));
180                let inflight_set = format!("{}:{}", config.inflight_jobs_set(), worker_id);
181                let workers_set = config.workers_set();
182
183                let now: i64 = Utc::now().timestamp();
184
185                let res = register_worker
186                    .key(workers_set)
187                    .key("core::apalis::workers:metadata::")
188                    .arg(now)
189                    .arg(inflight_set)
190                    .arg(config.get_keep_alive().as_secs())
191                    .arg("RedisStorage")
192                    .arg(&service)
193                    .invoke_async::<()>(&mut conn)
194                    .await;
195                Some((res, (keep_alive, worker_id, conn, config, service)))
196            },
197        );
198
199        let enqueue_scheduled = stream::unfold(
200            (worker_id, conn, config),
201            |(worker_id, mut conn, config)| async move {
202                apalis_core::timer::sleep(*config.get_enqueue_scheduled()).await;
203                let scheduled_jobs_set = config.scheduled_jobs_set();
204                let active_jobs_list = config.active_jobs_list();
205                let signal_list = config.signal_list();
206                let now: i64 = Utc::now().timestamp();
207                let enqueue_jobs =
208                    redis::Script::new(include_str!("../lua/enqueue_scheduled_jobs.lua"));
209                let res: Result<usize, _> = enqueue_jobs
210                    .key(scheduled_jobs_set)
211                    .key(active_jobs_list)
212                    .key(signal_list)
213                    .arg(now)
214                    .arg(100)
215                    .invoke_async(&mut conn)
216                    .await;
217                match res {
218                    Ok(_) => Some((Ok(()), (worker_id, conn, config))),
219                    Err(e) => Some((Err(e), (worker_id, conn, config))),
220                }
221            },
222        );
223        stream::select(keep_alive, enqueue_scheduled).boxed()
224    }
225    fn middleware(&self) -> Self::Layer {
226        AcknowledgeLayer::new(RedisAck::new(&self.conn, &self.config))
227    }
228
229    fn poll(self, worker: &WorkerContext) -> Self::Stream {
230        self.poll_compact(worker)
231            .map(|a| match a {
232                Ok(Some(task)) => Ok(Some(
233                    task.try_map(|t| C::decode(&t))
234                        .map_err(|e| build_error(&e.into().to_string()))?,
235                )),
236                Ok(None) => Ok(None),
237                Err(e) => Err(e),
238            })
239            .boxed()
240    }
241}
242
243impl<Args, Conn, C> BackendExt for RedisStorage<Args, Conn, C>
244where
245    Args: Unpin + Send + Sync + 'static,
246    Conn: Clone + ConnectionLike + Send + Sync + 'static,
247    C: Codec<Args, Compact = Vec<u8>> + Unpin + Send + 'static,
248    C::Error: Into<BoxDynError>,
249{
250    type Compact = Vec<u8>;
251
252    type Codec = C;
253
254    type CompactStream = TaskStream<Task<Self::Compact, RedisContext, Ulid>, RedisError>;
255
256    fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream {
257        let worker = worker.clone();
258        let worker_id = worker.name().to_owned();
259        let config = self.config.clone();
260        let mut conn = self.conn.clone();
261        let event_listener = self.poller.clone();
262        let service = worker.get_service().to_owned();
263        let register = futures::stream::once(async move {
264            let register_worker = redis::Script::new(include_str!("../lua/register_worker.lua"));
265            let inflight_set = format!("{}:{}", config.inflight_jobs_set(), worker_id);
266            let workers_set = config.workers_set();
267
268            let now: i64 = Utc::now().timestamp();
269
270            register_worker
271                .key(workers_set)
272                .key("core::apalis::workers:metadata::")
273                .arg(now)
274                .arg(inflight_set)
275                .arg(config.get_keep_alive().as_secs())
276                .arg("RedisStorage")
277                .arg(service)
278                .invoke_async::<()>(&mut conn)
279                .await?;
280            Ok(None)
281        })
282        .filter_map(
283            |res: Result<Option<Task<Args, RedisContext>>, RedisError>| async move {
284                match res {
285                    Ok(_) => None,
286                    Err(e) => Some(Err(e)),
287                }
288            },
289        );
290        let stream = stream::unfold(
291            (
292                worker,
293                self.config.clone(),
294                self.conn.clone(),
295                event_listener,
296            ),
297            |(worker, config, mut conn, event_listener)| async {
298                let interval = apalis_core::timer::sleep(*config.get_poll_interval()).boxed();
299                let pub_sub = event_listener.listen().boxed();
300                select(pub_sub, interval).await; // Pubsub or else interval
301                let data = Self::fetch_next(&worker, &config, &mut conn).await;
302                Some((data, (worker, config, conn, event_listener)))
303            },
304        )
305        .flat_map(|res| match res {
306            Ok(s) => {
307                let stm: Vec<_> = s
308                    .into_iter()
309                    .map(|s| Ok::<_, RedisError>(Some(s)))
310                    .collect();
311                stream::iter(stm)
312            }
313            Err(e) => stream::iter(vec![Err(e)]),
314        });
315        register.chain(stream).boxed()
316    }
317}
318
319/// Shorthand to create a client and connect
320pub async fn connect<S: IntoConnectionInfo>(redis: S) -> Result<ConnectionManager, RedisError> {
321    let client = Client::open(redis.into_connection_info()?)?;
322    let conn = client.get_connection_manager().await?;
323    Ok(conn)
324}
325
326fn build_error(message: &str) -> RedisError {
327    RedisError::from(io::Error::new(io::ErrorKind::InvalidData, message))
328}
329
330#[cfg(test)]
331mod tests {
332    use apalis_workflow::Workflow;
333    use apalis_workflow::WorkflowSink;
334
335    use redis::Client;
336    use std::{env, time::Duration};
337
338    use apalis_core::{
339        backend::{TaskSink, shared::MakeShared},
340        task::builder::TaskBuilder,
341        worker::{
342            builder::WorkerBuilder,
343            ext::{event_listener::EventListenerExt, parallelize::ParallelizeExt},
344        },
345    };
346
347    use crate::shared::SharedRedisStorage;
348
349    use super::*;
350
351    const ITEMS: u32 = 10;
352
353    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
354    async fn basic_worker() {
355        let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
356        let conn = client.get_connection_manager().await.unwrap();
357        let mut backend = RedisStorage::new_with_config(
358            conn,
359            RedisConfig::default()
360                .set_namespace("redis_basic_worker")
361                .set_buffer_size(100),
362        );
363        for i in 0..ITEMS {
364            backend.push(i).await.unwrap();
365        }
366
367        async fn task(task: u32, ctx: RedisContext, wrk: WorkerContext) -> Result<(), BoxDynError> {
368            let handle = std::thread::current();
369            println!("{task:?}, {ctx:?}, Thread: {:?}", handle.id());
370            if task == ITEMS - 1 {
371                wrk.stop().unwrap();
372                return Err("Worker stopped!")?;
373            }
374            Ok(())
375        }
376
377        let worker = WorkerBuilder::new("rango-tango")
378            .backend(backend)
379            .on_event(|ctx, ev| {
380                println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
381            })
382            .build(task);
383        worker.run().await.unwrap();
384    }
385
386    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
387    async fn basic_worker_bincode() {
388        struct Bincode;
389
390        impl<T: bincode::Decode<()> + bincode::Encode> Codec<T> for Bincode {
391            type Compact = Vec<u8>;
392            type Error = bincode::error::DecodeError;
393            fn decode(val: &Self::Compact) -> Result<T, Self::Error> {
394                bincode::decode_from_slice(val, bincode::config::standard()).map(|s| s.0)
395            }
396
397            fn encode(val: &T) -> Result<Self::Compact, Self::Error> {
398                Ok(bincode::encode_to_vec(val, bincode::config::standard()).unwrap())
399            }
400        }
401
402        let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
403        let conn = client.get_connection_manager().await.unwrap();
404        let mut backend = RedisStorage::new_with_codec::<Bincode>(
405            conn,
406            RedisConfig::new("bincode-queue").set_buffer_size(100),
407        );
408
409        for i in 0..ITEMS {
410            let req = TaskBuilder::new(i).build();
411            backend.push_task(req).await.unwrap();
412        }
413
414        async fn task(
415            task: u32,
416            meta: RedisContext,
417            wrk: WorkerContext,
418        ) -> Result<String, BoxDynError> {
419            let handle = std::thread::current();
420            println!("{task:?}, {meta:?}, Thread: {:?}", handle.id());
421            if task == ITEMS - 1 {
422                wrk.stop().unwrap();
423                return Err("Worker stopped!")?;
424            }
425            Ok("Worker".to_owned())
426        }
427
428        let worker = WorkerBuilder::new("rango-tango")
429            .backend(backend)
430            .parallelize(tokio::spawn)
431            .on_event(|ctx, ev| {
432                println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
433            })
434            .build(task);
435        worker.run().await.unwrap();
436    }
437
438    #[tokio::test]
439    async fn shared_workers() {
440        let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
441        let mut store = SharedRedisStorage::new(client).await.unwrap();
442
443        let mut string_store = store
444            .make_shared_with_config(
445                RedisConfig::default()
446                    .set_namespace("strrrrrr")
447                    .set_poll_interval(Duration::from_secs(1))
448                    .set_buffer_size(5),
449            )
450            .unwrap();
451        let mut int_store = store
452            .make_shared_with_config(
453                RedisConfig::default()
454                    .set_namespace("Intttttt")
455                    .set_poll_interval(Duration::from_secs(2))
456                    .set_buffer_size(5),
457            )
458            .unwrap();
459
460        for i in 0..ITEMS {
461            string_store.push(format!("ITEM: {i}")).await.unwrap();
462            int_store.push(i).await.unwrap();
463        }
464
465        async fn task(job: u32, ctx: WorkerContext) -> Result<usize, BoxDynError> {
466            tokio::time::sleep(Duration::from_millis(2)).await;
467            if job == ITEMS - 1 {
468                ctx.stop().unwrap();
469                return Err("Worker stopped!")?;
470            }
471            Ok(job as usize)
472        }
473
474        let int_worker = WorkerBuilder::new("rango-tango-int")
475            .backend(int_store)
476            .on_event(|ctx, ev| {
477                println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
478            })
479            .build(task)
480            .run();
481
482        let string_worker = WorkerBuilder::new("rango-tango-string")
483            .backend(string_store)
484            .on_event(|ctx, ev| {
485                println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
486            })
487            .build(|req: String, ctx: WorkerContext| async move {
488                tokio::time::sleep(Duration::from_millis(3)).await;
489                println!("{req}");
490                if req.ends_with(&(ITEMS - 1).to_string()) {
491                    ctx.stop().unwrap();
492                }
493            })
494            .run();
495        let _ = futures::future::try_join(int_worker, string_worker)
496            .await
497            .unwrap();
498    }
499
500    #[tokio::test]
501    async fn workflow() {
502        async fn task1(job: u32) -> Result<Vec<u32>, BoxDynError> {
503            Ok((job..2).collect())
504        }
505
506        async fn task2(_: Vec<u32>) -> Result<usize, BoxDynError> {
507            Ok(42)
508        }
509
510        async fn task3(job: usize, wrk: WorkerContext, ctx: RedisContext) -> Result<(), io::Error> {
511            wrk.stop().unwrap();
512            println!("{job}");
513            dbg!(&ctx);
514            Ok(())
515        }
516
517        let work_flow = Workflow::new("sample-workflow")
518            .and_then(task1)
519            .delay_for(Duration::from_millis(1000))
520            .and_then(task2)
521            .and_then(task3);
522
523        let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
524        let conn = client.get_connection_manager().await.unwrap();
525        let mut backend = RedisStorage::new_with_config(
526            conn,
527            RedisConfig::default().set_namespace("redis_workflow"),
528        );
529
530        backend.push_start(0u32).await.unwrap();
531
532        let worker = WorkerBuilder::new("rango-tango")
533            .backend(backend)
534            .on_event(|ctx, ev| {
535                use apalis_core::worker::event::Event;
536                println!("Worker {:?}, On Event = {:?}", ctx.name(), ev);
537                if matches!(ev, Event::Error(_)) {
538                    ctx.stop().unwrap();
539                }
540            })
541            .build(work_flow);
542        worker.run().await.unwrap();
543    }
544}