#![warn(
    missing_debug_implementations,
    missing_docs,
    rust_2018_idioms,
    unreachable_pub
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![doc = include_str!("../README.md")]
use std::{any::type_name, io, marker::PhantomData, sync::Arc};

use apalis_core::{
    backend::{
        Backend, BackendExt, TaskStream,
        codec::{Codec, json::JsonCodec},
    },
    error::BoxDynError,
    features_table,
    task::Task,
    worker::{context::WorkerContext, ext::ack::AcknowledgeLayer},
};
use chrono::Utc;
use event_listener::Event;
use futures::{
    FutureExt, StreamExt,
    future::select,
    stream::{self, BoxStream},
};
use redis::aio::ConnectionLike;
pub use redis::{Client, IntoConnectionInfo, aio};

mod ack;
mod config;
mod context;
mod fetcher;
mod queries;
/// Shared storage support, allowing multiple task types to run over one Redis client.
pub mod shared;
/// Sink implementation used to push tasks to Redis.
pub mod sink;

pub use redis::{RedisError, aio::ConnectionManager};

use ulid::Ulid;

pub use crate::{
    ack::RedisAck, config::RedisConfig, context::RedisContext, fetcher::*, sink::RedisSink,
};

/// A [`Task`] stored in Redis, carrying [`RedisContext`] metadata and identified by a [`Ulid`].
pub type RedisTask<Args> = Task<Args, RedisContext, Ulid>;

/// A task storage backed by Redis.
///
#[doc = "# Feature Support\n"]
#[doc = features_table! {
    setup = r#"
        # {
        # use apalis_redis::RedisStorage;
        # use std::env;
        # let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set");
        # let conn = apalis_redis::connect(redis_url).await.expect("Could not connect");
        # RedisStorage::new(conn)
        # };
    "#,
    TaskSink => supported("Ability to push new tasks", true),
    MakeShared => supported("Share the same connection across multiple workers", false),
    Workflow => supported("Supports workflows and orchestration", true),
    WebUI => supported("Supports `apalis-board` for monitoring and managing tasks", true),
    WaitForCompletion => supported("Wait for tasks to complete without blocking", true),
    Serialization => supported("Supports multiple serialization formats such as JSON and MessagePack", false),
    RegisterWorker => supported("Allow registering a worker with the backend", false),
    ResumeAbandoned => supported("Resume abandoned tasks", false),
}]
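///
/// # Example
///
/// A minimal sketch of constructing the storage and pushing a task, modeled on this
/// crate's tests; it assumes a reachable Redis instance at `REDIS_URL`:
///
/// ```rust,no_run
/// # async fn example() {
/// use apalis_core::backend::TaskSink;
/// use apalis_redis::RedisStorage;
///
/// let conn = apalis_redis::connect(std::env::var("REDIS_URL").unwrap())
///     .await
///     .expect("Could not connect");
/// let mut storage: RedisStorage<u32> = RedisStorage::new(conn);
/// storage.push(42u32).await.unwrap();
/// # }
/// ```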
#[derive(Debug)]
pub struct RedisStorage<Args, Conn = ConnectionManager, C = JsonCodec<Vec<u8>>> {
    conn: Conn,
    job_type: PhantomData<Args>,
    config: RedisConfig,
    codec: PhantomData<C>,
    poller: Arc<Event>,
    sink: RedisSink<Args, C, Conn>,
}

impl<Args, Conn: Clone, Cdc: Clone> Clone for RedisStorage<Args, Conn, Cdc> {
    fn clone(&self) -> Self {
        Self {
            conn: self.conn.clone(),
            job_type: PhantomData,
            config: self.config.clone(),
            codec: PhantomData,
            poller: self.poller.clone(),
            sink: self.sink.clone(),
        }
    }
}

impl<T, Conn: Clone> RedisStorage<T, Conn, JsonCodec<Vec<u8>>> {
    /// Creates a new [`RedisStorage`] that encodes tasks as JSON, using the task
    /// type's name as the namespace.
    pub fn new(conn: Conn) -> RedisStorage<T, Conn, JsonCodec<Vec<u8>>> {
        Self::new_with_codec::<JsonCodec<Vec<u8>>>(
            conn,
            RedisConfig::default().set_namespace(type_name::<T>()),
        )
    }

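    /// Creates a new [`RedisStorage`] with a custom [`RedisConfig`].
    ///
    /// A minimal sketch of overriding the namespace and buffer size, mirroring this
    /// crate's tests; the `"email-queue"` namespace is only illustrative:
    ///
    /// ```rust,no_run
    /// # async fn example() {
    /// use apalis_redis::{RedisConfig, RedisStorage};
    ///
    /// let conn = apalis_redis::connect("redis://127.0.0.1/").await.unwrap();
    /// let storage: RedisStorage<u32> = RedisStorage::new_with_config(
    ///     conn,
    ///     RedisConfig::default()
    ///         .set_namespace("email-queue")
    ///         .set_buffer_size(100),
    /// );
    /// # }
    /// ```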
    pub fn new_with_config(
        conn: Conn,
        config: RedisConfig,
    ) -> RedisStorage<T, Conn, JsonCodec<Vec<u8>>> {
        Self::new_with_codec::<JsonCodec<Vec<u8>>>(conn, config)
    }

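    /// Creates a new [`RedisStorage`] that serializes tasks with a custom [`Codec`]
    /// instead of the default JSON codec.
    ///
    /// A minimal sketch using a hypothetical `Msgpack` codec; any type implementing
    /// `Codec<Args, Compact = Vec<u8>>` works the same way (the `Bincode` codec in this
    /// crate's tests shows a complete implementation):
    ///
    /// ```rust,ignore
    /// let storage: RedisStorage<u32, _, Msgpack> = RedisStorage::new_with_codec::<Msgpack>(
    ///     conn,
    ///     RedisConfig::new("msgpack-queue"),
    /// );
    /// ```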
    pub fn new_with_codec<K>(conn: Conn, config: RedisConfig) -> RedisStorage<T, Conn, K>
    where
        K: Sync + Send + 'static,
    {
        let sink = RedisSink::new(&conn, &config);
        RedisStorage {
            conn,
            job_type: PhantomData,
            config,
            codec: PhantomData::<K>,
            poller: Arc::new(Event::new()),
            sink,
        }
    }

    /// Returns a reference to the underlying Redis connection.
    pub fn get_connection(&self) -> &Conn {
        &self.conn
    }

    /// Returns a reference to the [`RedisConfig`] used by this storage.
    pub fn get_config(&self) -> &RedisConfig {
        &self.config
    }
}

impl<Args, Conn, C> Backend for RedisStorage<Args, Conn, C>
where
    Args: Unpin + Send + Sync + 'static,
    Conn: Clone + ConnectionLike + Send + Sync + 'static,
    C: Codec<Args, Compact = Vec<u8>> + Unpin + Send + 'static,
    C::Error: Into<BoxDynError>,
{
    type Args = Args;
    type Stream = TaskStream<Task<Args, RedisContext, Ulid>, RedisError>;

    type IdType = Ulid;

    type Error = RedisError;
    type Layer = AcknowledgeLayer<RedisAck<Conn, C>>;

    type Context = RedisContext;

    type Beat = BoxStream<'static, Result<(), Self::Error>>;

    fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat {
        let keep_alive = *self.config.get_keep_alive();

        let config = self.config.clone();
        let worker_id = worker.name().to_owned();
        let conn = self.conn.clone();
        let service = worker.get_service().to_owned();

        // Periodically re-register this worker so its keep-alive timestamp stays fresh.
        let keep_alive = stream::unfold(
            (
                keep_alive,
                worker_id.clone(),
                conn.clone(),
                config.clone(),
                service,
            ),
            |(keep_alive, worker_id, mut conn, config, service)| async move {
                apalis_core::timer::sleep(keep_alive).await;
                let register_worker =
                    redis::Script::new(include_str!("../lua/register_worker.lua"));
                let inflight_set = format!("{}:{}", config.inflight_jobs_set(), worker_id);
                let workers_set = config.workers_set();

                let now: i64 = Utc::now().timestamp();

                let res = register_worker
                    .key(workers_set)
                    .key("core::apalis::workers:metadata::")
                    .arg(now)
                    .arg(inflight_set)
                    .arg(config.get_keep_alive().as_secs())
                    .arg("RedisStorage")
                    .arg(&service)
                    .invoke_async::<()>(&mut conn)
                    .await;
                Some((res, (keep_alive, worker_id, conn, config, service)))
            },
        );

        // Periodically move scheduled jobs whose run time has arrived onto the active list.
        let enqueue_scheduled = stream::unfold(
            (worker_id, conn, config),
            |(worker_id, mut conn, config)| async move {
                apalis_core::timer::sleep(*config.get_enqueue_scheduled()).await;
                let scheduled_jobs_set = config.scheduled_jobs_set();
                let active_jobs_list = config.active_jobs_list();
                let signal_list = config.signal_list();
                let now: i64 = Utc::now().timestamp();
                let enqueue_jobs =
                    redis::Script::new(include_str!("../lua/enqueue_scheduled_jobs.lua"));
                let res: Result<usize, _> = enqueue_jobs
                    .key(scheduled_jobs_set)
                    .key(active_jobs_list)
                    .key(signal_list)
                    .arg(now)
                    .arg(100)
                    .invoke_async(&mut conn)
                    .await;
                match res {
                    Ok(_) => Some((Ok(()), (worker_id, conn, config))),
                    Err(e) => Some((Err(e), (worker_id, conn, config))),
                }
            },
        );
        stream::select(keep_alive, enqueue_scheduled).boxed()
    }

    fn middleware(&self) -> Self::Layer {
        AcknowledgeLayer::new(RedisAck::new(&self.conn, &self.config))
    }

    fn poll(self, worker: &WorkerContext) -> Self::Stream {
        self.poll_compact(worker)
            .map(|a| match a {
                // Decode the compact payload back into `Args`, surfacing any codec
                // failure as a `RedisError`.
                Ok(Some(task)) => Ok(Some(
                    task.try_map(|t| C::decode(&t))
                        .map_err(|e| build_error(&Into::<BoxDynError>::into(e).to_string()))?,
                )),
                Ok(None) => Ok(None),
                Err(e) => Err(e),
            })
            .boxed()
    }
}

impl<Args, Conn, C> BackendExt for RedisStorage<Args, Conn, C>
where
    Args: Unpin + Send + Sync + 'static,
    Conn: Clone + ConnectionLike + Send + Sync + 'static,
    C: Codec<Args, Compact = Vec<u8>> + Unpin + Send + 'static,
    C::Error: Into<BoxDynError>,
{
    type Compact = Vec<u8>;

    type Codec = C;

    type CompactStream = TaskStream<Task<Self::Compact, RedisContext, Ulid>, RedisError>;

    fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream {
        let worker = worker.clone();
        let worker_id = worker.name().to_owned();
        let config = self.config.clone();
        let mut conn = self.conn.clone();
        let event_listener = self.poller.clone();
        let service = worker.get_service().to_owned();
        // Register the worker with the backend once before polling; only errors are
        // forwarded into the task stream.
        let register = futures::stream::once(async move {
            let register_worker = redis::Script::new(include_str!("../lua/register_worker.lua"));
            let inflight_set = format!("{}:{}", config.inflight_jobs_set(), worker_id);
            let workers_set = config.workers_set();

            let now: i64 = Utc::now().timestamp();

            register_worker
                .key(workers_set)
                .key("core::apalis::workers:metadata::")
                .arg(now)
                .arg(inflight_set)
                .arg(config.get_keep_alive().as_secs())
                .arg("RedisStorage")
                .arg(service)
                .invoke_async::<()>(&mut conn)
                .await?;
            Ok(None)
        })
        .filter_map(
            |res: Result<Option<Task<Args, RedisContext>>, RedisError>| async move {
                match res {
                    Ok(_) => None,
                    Err(e) => Some(Err(e)),
                }
            },
        );
        // Wake on either the poll interval or a push notification, then fetch the next
        // batch of tasks.
        let stream = stream::unfold(
            (
                worker,
                self.config.clone(),
                self.conn.clone(),
                event_listener,
            ),
            |(worker, config, mut conn, event_listener)| async {
                let interval = apalis_core::timer::sleep(*config.get_poll_interval()).boxed();
                let pub_sub = event_listener.listen().boxed();
                select(pub_sub, interval).await;
                let data = Self::fetch_next(&worker, &config, &mut conn).await;
                Some((data, (worker, config, conn, event_listener)))
            },
        )
        .flat_map(|res| match res {
            Ok(s) => {
                let stm: Vec<_> = s
                    .into_iter()
                    .map(|s| Ok::<_, RedisError>(Some(s)))
                    .collect();
                stream::iter(stm)
            }
            Err(e) => stream::iter(vec![Err(e)]),
        });
        register.chain(stream).boxed()
    }
}

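/// Opens a Redis [`Client`] from the given connection info and returns a
/// [`ConnectionManager`] that can be handed to [`RedisStorage::new`].
///
/// A minimal sketch, assuming a Redis instance is reachable at the given URL:
///
/// ```rust,no_run
/// # async fn example() -> Result<(), apalis_redis::RedisError> {
/// let conn = apalis_redis::connect("redis://127.0.0.1/").await?;
/// let storage: apalis_redis::RedisStorage<u32> = apalis_redis::RedisStorage::new(conn);
/// # Ok(())
/// # }
/// ```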
pub async fn connect<S: IntoConnectionInfo>(redis: S) -> Result<ConnectionManager, RedisError> {
    let client = Client::open(redis.into_connection_info()?)?;
    let conn = client.get_connection_manager().await?;
    Ok(conn)
}

fn build_error(message: &str) -> RedisError {
    RedisError::from(io::Error::new(io::ErrorKind::InvalidData, message))
}

#[cfg(test)]
mod tests {
    use apalis_workflow::Workflow;
    use apalis_workflow::WorkflowSink;

    use redis::Client;
    use std::{env, time::Duration};

    use apalis_core::{
        backend::{TaskSink, shared::MakeShared},
        task::builder::TaskBuilder,
        worker::{
            builder::WorkerBuilder,
            ext::{event_listener::EventListenerExt, parallelize::ParallelizeExt},
        },
    };

    use crate::shared::SharedRedisStorage;

    use super::*;

    const ITEMS: u32 = 10;

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn basic_worker() {
        let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
        let conn = client.get_connection_manager().await.unwrap();
        let mut backend = RedisStorage::new_with_config(
            conn,
            RedisConfig::default()
                .set_namespace("redis_basic_worker")
                .set_buffer_size(100),
        );
        for i in 0..ITEMS {
            backend.push(i).await.unwrap();
        }

        async fn task(task: u32, ctx: RedisContext, wrk: WorkerContext) -> Result<(), BoxDynError> {
            let handle = std::thread::current();
            println!("{task:?}, {ctx:?}, Thread: {:?}", handle.id());
            if task == ITEMS - 1 {
                wrk.stop().unwrap();
                return Err("Worker stopped!")?;
            }
            Ok(())
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
            })
            .build(task);
        worker.run().await.unwrap();
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn basic_worker_bincode() {
        struct Bincode;

        impl<T: bincode::Decode<()> + bincode::Encode> Codec<T> for Bincode {
            type Compact = Vec<u8>;
            type Error = bincode::error::DecodeError;
            fn decode(val: &Self::Compact) -> Result<T, Self::Error> {
                bincode::decode_from_slice(val, bincode::config::standard()).map(|s| s.0)
            }

            fn encode(val: &T) -> Result<Self::Compact, Self::Error> {
                Ok(bincode::encode_to_vec(val, bincode::config::standard()).unwrap())
            }
        }

        let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
        let conn = client.get_connection_manager().await.unwrap();
        let mut backend = RedisStorage::new_with_codec::<Bincode>(
            conn,
            RedisConfig::new("bincode-queue").set_buffer_size(100),
        );

        for i in 0..ITEMS {
            let req = TaskBuilder::new(i).build();
            backend.push_task(req).await.unwrap();
        }

        async fn task(
            task: u32,
            meta: RedisContext,
            wrk: WorkerContext,
        ) -> Result<String, BoxDynError> {
            let handle = std::thread::current();
            println!("{task:?}, {meta:?}, Thread: {:?}", handle.id());
            if task == ITEMS - 1 {
                wrk.stop().unwrap();
                return Err("Worker stopped!")?;
            }
            Ok("Worker".to_owned())
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .parallelize(tokio::spawn)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
            })
            .build(task);
        worker.run().await.unwrap();
    }

    #[tokio::test]
    async fn shared_workers() {
        let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
        let mut store = SharedRedisStorage::new(client).await.unwrap();

        let mut string_store = store
            .make_shared_with_config(
                RedisConfig::default()
                    .set_namespace("strrrrrr")
                    .set_poll_interval(Duration::from_secs(1))
                    .set_buffer_size(5),
            )
            .unwrap();
        let mut int_store = store
            .make_shared_with_config(
                RedisConfig::default()
                    .set_namespace("Intttttt")
                    .set_poll_interval(Duration::from_secs(2))
                    .set_buffer_size(5),
            )
            .unwrap();

        for i in 0..ITEMS {
            string_store.push(format!("ITEM: {i}")).await.unwrap();
            int_store.push(i).await.unwrap();
        }

        async fn task(job: u32, ctx: WorkerContext) -> Result<usize, BoxDynError> {
            tokio::time::sleep(Duration::from_millis(2)).await;
            if job == ITEMS - 1 {
                ctx.stop().unwrap();
                return Err("Worker stopped!")?;
            }
            Ok(job as usize)
        }

        let int_worker = WorkerBuilder::new("rango-tango-int")
            .backend(int_store)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
            })
            .build(task)
            .run();

        let string_worker = WorkerBuilder::new("rango-tango-string")
            .backend(string_store)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
            })
            .build(|req: String, ctx: WorkerContext| async move {
                tokio::time::sleep(Duration::from_millis(3)).await;
                println!("{req}");
                if req.ends_with(&(ITEMS - 1).to_string()) {
                    ctx.stop().unwrap();
                }
            })
            .run();
        let _ = futures::future::try_join(int_worker, string_worker)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn workflow() {
        async fn task1(job: u32) -> Result<Vec<u32>, BoxDynError> {
            Ok((job..2).collect())
        }

        async fn task2(_: Vec<u32>) -> Result<usize, BoxDynError> {
            Ok(42)
        }

        async fn task3(job: usize, wrk: WorkerContext, ctx: RedisContext) -> Result<(), io::Error> {
            wrk.stop().unwrap();
            println!("{job}");
            dbg!(&ctx);
            Ok(())
        }

        let work_flow = Workflow::new("sample-workflow")
            .and_then(task1)
            .delay_for(Duration::from_millis(1000))
            .and_then(task2)
            .and_then(task3);

        let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
        let conn = client.get_connection_manager().await.unwrap();
        let mut backend = RedisStorage::new_with_config(
            conn,
            RedisConfig::default().set_namespace("redis_workflow"),
        );

        backend.push_start(0u32).await.unwrap();

        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .on_event(|ctx, ev| {
                use apalis_core::worker::event::Event;
                println!("Worker {:?}, On Event = {:?}", ctx.name(), ev);
                if matches!(ev, Event::Error(_)) {
                    ctx.stop().unwrap();
                }
            })
            .build(work_flow);
        worker.run().await.unwrap();
    }
}