#![warn(
    missing_debug_implementations,
    missing_docs,
    rust_2018_idioms,
    unreachable_pub
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
use std::{any::type_name, io, marker::PhantomData, sync::Arc};

use apalis_core::{
    backend::{
        Backend, TaskStream,
        codec::{Codec, json::JsonCodec},
    },
    error::BoxDynError,
    task::Task,
    worker::{context::WorkerContext, ext::ack::AcknowledgeLayer},
};
use chrono::Utc;
use event_listener::Event;
use futures::{
    FutureExt, StreamExt,
    future::select,
    stream::{self, BoxStream},
};
use redis::aio::ConnectionLike;
pub use redis::{Client, IntoConnectionInfo, aio};

mod ack;
mod config;
mod context;
mod fetcher;
mod queries;

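/// Shared Redis storage: run several task types over one multiplexed connection.
///
/// A minimal sketch, assuming the crate is consumed as `apalis_redis`; it mirrors the
/// usage exercised in this crate's tests, where `MakeShared` comes from `apalis_core`.
///
/// ```rust,ignore
/// use apalis_core::backend::shared::MakeShared;
/// use apalis_redis::{Client, RedisConfig, shared::SharedRedisStorage};
///
/// let client = Client::open("redis://127.0.0.1/")?;
/// let mut store = SharedRedisStorage::new(client).await?;
/// // Each derived storage reuses the same underlying connection.
/// let string_store = store.make_shared_with_config(RedisConfig::default().set_namespace("emails"))?;
/// let int_store = store.make_shared_with_config(RedisConfig::default().set_namespace("counters"))?;
/// ```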
pub mod shared;
/// A sink for pushing new tasks into Redis.
pub mod sink;

pub use redis::{RedisError, aio::ConnectionManager};

use ulid::Ulid;

pub use crate::{
    ack::RedisAck, config::RedisConfig, context::RedisContext, fetcher::*, sink::RedisSink,
};

/// A task stored in Redis, identified by a [`Ulid`] and carrying a [`RedisContext`].
pub type RedisTask<Args> = Task<Args, RedisContext, Ulid>;

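/// A task storage backed by Redis.
///
/// ## Example
///
/// A minimal construction sketch; it assumes the crate is consumed as `apalis_redis`
/// and that a Redis server is reachable at the given URL.
///
/// ```rust,no_run
/// # async fn example() -> Result<(), apalis_redis::RedisError> {
/// use apalis_redis::{RedisStorage, connect};
///
/// // Connect via a `ConnectionManager` and store `u32` jobs with the default JSON codec.
/// let conn = connect("redis://127.0.0.1/").await?;
/// let storage: RedisStorage<u32> = RedisStorage::new(conn);
/// # Ok(())
/// # }
/// ```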
#[doc = "# Feature Support\n"]
#[derive(Debug)]
pub struct RedisStorage<Args, Conn = ConnectionManager, C = JsonCodec<Vec<u8>>> {
    conn: Conn,
    job_type: PhantomData<Args>,
    config: RedisConfig,
    codec: PhantomData<C>,
    poller: Arc<Event>,
    sink: RedisSink<Args, C, Conn>,
}

impl<Args, Conn: Clone, Cdc: Clone> Clone for RedisStorage<Args, Conn, Cdc> {
    fn clone(&self) -> Self {
        Self {
            conn: self.conn.clone(),
            job_type: PhantomData,
            config: self.config.clone(),
            codec: PhantomData,
            poller: self.poller.clone(),
            sink: self.sink.clone(),
        }
    }
}

impl<T, Conn: Clone> RedisStorage<T, Conn, JsonCodec<Vec<u8>>> {
    /// Create a storage from an existing connection, using the default configuration
    /// and a namespace derived from the job type's name.
    pub fn new(conn: Conn) -> RedisStorage<T, Conn, JsonCodec<Vec<u8>>> {
        Self::new_with_codec::<JsonCodec<Vec<u8>>>(
            conn,
            RedisConfig::default().set_namespace(type_name::<T>()),
        )
    }

    /// Create a storage from an existing connection and an explicit [`RedisConfig`].
    pub fn new_with_config(
        conn: Conn,
        config: RedisConfig,
    ) -> RedisStorage<T, Conn, JsonCodec<Vec<u8>>> {
        Self::new_with_codec::<JsonCodec<Vec<u8>>>(conn, config)
    }

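    /// Create a storage that encodes task arguments with a custom codec `K`
    /// instead of the default JSON codec.
    ///
    /// ## Example
    ///
    /// A sketch of plugging in a custom codec; it assumes `bincode` 2.x is available
    /// and mirrors the codec used in this crate's tests. `conn` stands in for any
    /// Redis connection, e.g. one returned by [`connect`].
    ///
    /// ```rust,ignore
    /// use apalis_core::backend::codec::Codec;
    ///
    /// struct Bincode;
    ///
    /// impl<T: bincode::Decode<()> + bincode::Encode> Codec<T> for Bincode {
    ///     type Compact = Vec<u8>;
    ///     type Error = bincode::error::DecodeError;
    ///     fn decode(val: &Self::Compact) -> Result<T, Self::Error> {
    ///         bincode::decode_from_slice(val, bincode::config::standard()).map(|s| s.0)
    ///     }
    ///     fn encode(val: &T) -> Result<Self::Compact, Self::Error> {
    ///         Ok(bincode::encode_to_vec(val, bincode::config::standard()).unwrap())
    ///     }
    /// }
    ///
    /// let storage = RedisStorage::new_with_codec::<Bincode>(conn, RedisConfig::new("bincode-queue"));
    /// ```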
    pub fn new_with_codec<K>(conn: Conn, config: RedisConfig) -> RedisStorage<T, Conn, K>
    where
        K: Sync + Send + 'static,
    {
        let sink = RedisSink::new(&conn, &config);
        RedisStorage {
            conn,
            job_type: PhantomData,
            config,
            codec: PhantomData::<K>,
            poller: Arc::new(Event::new()),
            sink,
        }
    }

    /// Get a reference to the underlying Redis connection.
    pub fn get_connection(&self) -> &Conn {
        &self.conn
    }

    /// Get a reference to the storage configuration.
    pub fn get_config(&self) -> &RedisConfig {
        &self.config
    }
}

impl<Args, Conn, C> Backend for RedisStorage<Args, Conn, C>
where
    Args: Unpin + Send + Sync + 'static,
    Conn: Clone + ConnectionLike + Send + Sync + 'static,
    C: Codec<Args, Compact = Vec<u8>> + Unpin + Send + 'static,
    C::Error: Into<BoxDynError>,
{
    type Args = Args;
    type Stream = TaskStream<Task<Args, RedisContext, Ulid>, RedisError>;
    type IdType = Ulid;
    type Error = RedisError;
    type Layer = AcknowledgeLayer<RedisAck<Conn, C>>;
    type Compact = Vec<u8>;
    type Codec = C;
    type Context = RedisContext;
    type Beat = BoxStream<'static, Result<(), Self::Error>>;

    fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat {
        let keep_alive = *self.config.get_keep_alive();

        let config = self.config.clone();
        let worker_id = worker.name().to_owned();
        let conn = self.conn.clone();
        let service = worker.get_service().to_owned();

        // Keep-alive loop: periodically re-register this worker and its in-flight set.
        let keep_alive = stream::unfold(
            (
                keep_alive,
                worker_id.clone(),
                conn.clone(),
                config.clone(),
                service,
            ),
            |(keep_alive, worker_id, mut conn, config, service)| async move {
                apalis_core::timer::sleep(keep_alive).await;
                let register_worker =
                    redis::Script::new(include_str!("../lua/register_worker.lua"));
                let inflight_set = format!("{}:{}", config.inflight_jobs_set(), worker_id);
                let workers_set = config.workers_set();

                let now: i64 = Utc::now().timestamp();

                let res = register_worker
                    .key(workers_set)
                    .key("core::apalis::workers:metadata::")
                    .arg(now)
                    .arg(inflight_set)
                    .arg(config.get_keep_alive().as_secs())
                    .arg("RedisStorage")
                    .arg(&service)
                    .invoke_async::<()>(&mut conn)
                    .await;
                Some((res, (keep_alive, worker_id, conn, config, service)))
            },
        );

        // Periodically move scheduled jobs that are due onto the active jobs list.
        let enqueue_scheduled = stream::unfold(
            (worker_id, conn, config),
            |(worker_id, mut conn, config)| async move {
                apalis_core::timer::sleep(*config.get_enqueue_scheduled()).await;
                let scheduled_jobs_set = config.scheduled_jobs_set();
                let active_jobs_list = config.active_jobs_list();
                let signal_list = config.signal_list();
                let now: i64 = Utc::now().timestamp();
                let enqueue_jobs =
                    redis::Script::new(include_str!("../lua/enqueue_scheduled_jobs.lua"));
                let res: Result<usize, _> = enqueue_jobs
                    .key(scheduled_jobs_set)
                    .key(active_jobs_list)
                    .key(signal_list)
                    .arg(now)
                    .arg(100)
                    .invoke_async(&mut conn)
                    .await;
                match res {
                    Ok(_) => Some((Ok(()), (worker_id, conn, config))),
                    Err(e) => Some((Err(e), (worker_id, conn, config))),
                }
            },
        );
        // Drive both maintenance loops as a single heartbeat stream.
        stream::select(keep_alive, enqueue_scheduled).boxed()
    }

    fn middleware(&self) -> Self::Layer {
        AcknowledgeLayer::new(RedisAck::new(&self.conn, &self.config))
    }

    fn poll(self, worker: &WorkerContext) -> Self::Stream {
        let worker = worker.clone();
        let worker_id = worker.name().to_owned();
        let config = self.config.clone();
        let mut conn = self.conn.clone();
        let event_listener = self.poller.clone();
        let service = worker.get_service().to_owned();
        // Register the worker once before polling; this stream only yields an item
        // if registration fails.
        let register = futures::stream::once(async move {
            let register_worker = redis::Script::new(include_str!("../lua/register_worker.lua"));
            let inflight_set = format!("{}:{}", config.inflight_jobs_set(), worker_id);
            let workers_set = config.workers_set();

            let now: i64 = Utc::now().timestamp();

            register_worker
                .key(workers_set)
                .key("core::apalis::workers:metadata::")
                .arg(now)
                .arg(inflight_set)
                .arg(config.get_keep_alive().as_secs())
                .arg("RedisStorage")
                .arg(service)
                .invoke_async::<()>(&mut conn)
                .await?;
            Ok(None)
        })
        .filter_map(
            |res: Result<Option<Task<Args, RedisContext>>, RedisError>| async move {
                match res {
                    Ok(_) => None,
                    Err(e) => Some(Err(e)),
                }
            },
        );
        // Wait for either a poller notification or the poll interval to elapse,
        // then fetch the next batch of tasks.
        let stream = stream::unfold(
            (
                worker,
                self.config.clone(),
                self.conn.clone(),
                event_listener,
            ),
            |(worker, config, mut conn, event_listener)| async {
                let interval = apalis_core::timer::sleep(*config.get_poll_interval()).boxed();
                let pub_sub = event_listener.listen().boxed();
                select(pub_sub, interval).await;
                let data = Self::fetch_next(&worker, &config, &mut conn).await;
                Some((data, (worker, config, conn, event_listener)))
            },
        )
        .flat_map(|res| match res {
            Ok(s) => {
                let stm: Vec<_> = s
                    .into_iter()
                    .map(|s| Ok::<_, RedisError>(Some(s)))
                    .collect();
                stream::iter(stm)
            }
            Err(e) => stream::iter(vec![Err(e)]),
        });
        register.chain(stream).boxed()
    }
}

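/// Connect to Redis and return a [`ConnectionManager`].
///
/// A minimal sketch, assuming the crate is consumed as `apalis_redis` and that a Redis
/// server is listening at the given URL.
///
/// ```rust,no_run
/// # async fn example() -> Result<(), apalis_redis::RedisError> {
/// let _conn = apalis_redis::connect("redis://127.0.0.1/").await?;
/// # Ok(())
/// # }
/// ```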
pub async fn connect<S: IntoConnectionInfo>(redis: S) -> Result<ConnectionManager, RedisError> {
    let client = Client::open(redis.into_connection_info()?)?;
    let conn = client.get_connection_manager().await?;
    Ok(conn)
}

fn build_error(message: &str) -> RedisError {
    RedisError::from(io::Error::new(io::ErrorKind::InvalidData, message))
}

#[cfg(test)]
mod tests {
    use apalis_workflow::WorkFlow;

    use redis::Client;
    use std::{env, time::Duration};

    use apalis_core::{
        backend::{TaskSink, shared::MakeShared},
        task::builder::TaskBuilder,
        worker::{
            builder::WorkerBuilder,
            ext::{event_listener::EventListenerExt, parallelize::ParallelizeExt},
        },
    };

    use crate::shared::SharedRedisStorage;

    use super::*;

    const ITEMS: u32 = 100;

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn basic_worker() {
        let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
        let conn = client.get_connection_manager().await.unwrap();
        let mut backend = RedisStorage::new_with_config(
            conn,
            RedisConfig::default()
                .set_namespace("redis_basic_worker")
                .set_buffer_size(100),
        );
        for i in 0..ITEMS {
            backend.push(i).await.unwrap();
        }

        async fn task(task: u32, ctx: RedisContext, wrk: WorkerContext) -> Result<(), BoxDynError> {
            let handle = std::thread::current();
            println!("{task:?}, {ctx:?}, Thread: {:?}", handle.id());
            if task == ITEMS - 1 {
                wrk.stop().unwrap();
                return Err("Worker stopped!")?;
            }
            Ok(())
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
            })
            .build(task);
        worker.run().await.unwrap();
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn basic_worker_bincode() {
        struct Bincode;

        impl<T: bincode::Decode<()> + bincode::Encode> Codec<T> for Bincode {
            type Compact = Vec<u8>;
            type Error = bincode::error::DecodeError;
            fn decode(val: &Self::Compact) -> Result<T, Self::Error> {
                bincode::decode_from_slice(val, bincode::config::standard()).map(|s| s.0)
            }

            fn encode(val: &T) -> Result<Self::Compact, Self::Error> {
                Ok(bincode::encode_to_vec(val, bincode::config::standard()).unwrap())
            }
        }

        let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
        let conn = client.get_connection_manager().await.unwrap();
        let mut backend = RedisStorage::new_with_codec::<Bincode>(
            conn,
            RedisConfig::new("bincode-queue").set_buffer_size(100),
        );

        for i in 0..ITEMS {
            let req = TaskBuilder::new(i).build();
            backend.push_task(req).await.unwrap();
        }

        async fn task(
            task: u32,
            meta: RedisContext,
            wrk: WorkerContext,
        ) -> Result<String, BoxDynError> {
            let handle = std::thread::current();
            println!("{task:?}, {meta:?}, Thread: {:?}", handle.id());
            if task == ITEMS - 1 {
                wrk.stop().unwrap();
                return Err("Worker stopped!")?;
            }
            Ok("Worker".to_owned())
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .parallelize(tokio::spawn)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
            })
            .build(task);
        worker.run().await.unwrap();
    }

    #[tokio::test]
    async fn shared_workers() {
        let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
        let mut store = SharedRedisStorage::new(client).await.unwrap();

        let mut string_store = store
            .make_shared_with_config(
                RedisConfig::default()
                    .set_namespace("strrrrrr")
                    .set_poll_interval(Duration::from_secs(1))
                    .set_buffer_size(5),
            )
            .unwrap();
        let mut int_store = store
            .make_shared_with_config(
                RedisConfig::default()
                    .set_namespace("Intttttt")
                    .set_poll_interval(Duration::from_secs(2))
                    .set_buffer_size(5),
            )
            .unwrap();

        for i in 0..ITEMS {
            string_store.push(format!("ITEM: {i}")).await.unwrap();
            int_store.push(i).await.unwrap();
        }

        async fn task(job: u32, ctx: WorkerContext) -> Result<usize, BoxDynError> {
            tokio::time::sleep(Duration::from_millis(2)).await;
            if job == ITEMS - 1 {
                ctx.stop().unwrap();
                return Err("Worker stopped!")?;
            }
            Ok(job as usize)
        }

        let int_worker = WorkerBuilder::new("rango-tango-int")
            .backend(int_store)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
            })
            .build(task)
            .run();

        let string_worker = WorkerBuilder::new("rango-tango-string")
            .backend(string_store)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
            })
            .build(|req: String, ctx: WorkerContext| async move {
                tokio::time::sleep(Duration::from_millis(3)).await;
                println!("{req}");
                if req.ends_with(&(ITEMS - 1).to_string()) {
                    ctx.stop().unwrap();
                }
            })
            .run();
        let _ = futures::future::try_join(int_worker, string_worker)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn workflow() {
        async fn task1(job: u32) -> Result<Vec<u32>, BoxDynError> {
            Ok((job..2).collect())
        }

        async fn task2(_: Vec<u32>) -> Result<usize, BoxDynError> {
            Ok(42)
        }

        async fn task3(job: usize, wrk: WorkerContext, ctx: RedisContext) -> Result<(), io::Error> {
            wrk.stop().unwrap();
            println!("{job}");
            dbg!(&ctx);
            Ok(())
        }

        let work_flow = WorkFlow::new("sample-workflow")
            .then(task1)
            .delay_for(Duration::from_millis(1000))
            .then(task2)
            .then(task3);

        let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
        let conn = client.get_connection_manager().await.unwrap();
        let mut backend: RedisStorage<Vec<u8>> = RedisStorage::new_with_config(
            conn,
            RedisConfig::default().set_namespace("redis_workflow"),
        );

        apalis_core::backend::WeakTaskSink::push(&mut backend, 1u32)
            .await
            .unwrap();

        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .on_event(|ctx, ev| {
                use apalis_core::worker::event::Event;
                println!("Worker {:?}, On Event = {:?}", ctx.name(), ev);
                if matches!(ev, Event::Error(_)) {
                    ctx.stop().unwrap();
                }
            })
            .build(work_flow);
        worker.run().await.unwrap();
    }
}