Module redis_queue::manager

source ·
Expand description

Queue processing and management

Usage

This module provides utilities to manage and process queue tasks

The example below gives a brief explanation of how to use it.

use redis_queue::redis;
use redis_queue::{Queue, QueueConfig};
use redis_queue::types::Entry;
use redis_queue::manager::{Manager, ManagerConfig, ConsumerKind, RunParams, manage};
use redis_queue::manager::dispatch::{Dispatch, TaskResult, TaskResultKind};

use core::future::Future;
use core::pin::Pin;
use core::time;

use tokio::sync::oneshot;

///Dispatcher used by the queue manager to process your tasks.
///
///It carries no state in this example.
struct TaskDispatcher {}

impl Dispatch for TaskDispatcher {
    //Payload carried by every queue entry handed to this dispatcher
    type PayloadType = String;
    //Boxed future that resolves to the outcome of processing a single entry
    type Future = Pin<Box<dyn Future<Output = TaskResult<String>> + Send + Sync + 'static>>;

    ///Processes a single queue `entry` and returns the future producing its result.
    fn send(&self, entry: Entry<Self::PayloadType>) -> Self::Future {
        //The manage loop decides what to do with a finished task based on `TaskResultKind`.
        //Currently `TempFail` is the only variant with special handling: the task is retried.
        //Every other variant only produces a log entry.
        todo!()
    }
}

///Shows how to build a queue and a manager, run the manage loop in background and then shut
///it down gracefully.
async fn example() {
    //Upper bound on the number of tasks processed concurrently
    const MAX_CONCURRENT_TASK: usize = 1000;

    //Whoever uses your queue connects to the same redis instance and uses the same stream name.
    let queue_config = QueueConfig {
        stream: "stream_name".into()
    };
    let redis_client = redis::Client::open("redis://127.0.0.1/").expect("to create redis client");
    let connection = redis_client.get_tokio_connection_manager().await.expect("to get connection");
    let queue = Queue::new(queue_config, connection);

    let manager_config = ManagerConfig {
        //All consumers share the same group
        group: "group".into(),
        //Pick Single when one manager is enough.
        //When deploying several managers, use 1 Main and Extra for the rest.
        //The queue is trimmed only by Main/Single
        kind: ConsumerKind::Single,
        //Unique name of this consumer.
        //Give every manager instance its own name to avoid clashes
        consumer: "consumer-name".into(),
        //Maximum time the manager is allowed to block while waiting for new messages in queue
        poll_time: time::Duration::from_secs(10),
        //Maximum time a temporarily failed task will remain in queue.
        //Note that it will remain longer if, due to concurrency starvation,
        //it cannot complete at least max_pending_time / poll_time retries
        max_pending_time: time::Duration::from_secs(60),
    };
    let manager = Manager::new(queue, manager_config).await.expect("to create manager");

    //`shutdown` stays with us, the receiving half is handed over to the manage loop
    let (shutdown, shutdown_recv) = oneshot::channel();
    let handle = tokio::spawn(manage(RunParams {
        manager,
        shutdown_recv,
        max_task_count: MAX_CONCURRENT_TASK,
        dispatcher: TaskDispatcher {},
    }));
    //Do whatever you want here (e.g. wait for a signal telling you to shut down)

    //To shut down gracefully, notify the manage loop and wait for the spawned task to finish:
    shutdown.send(()).expect("manager lives");
    handle.await.expect("finish successfully");
}

Modules

Structs

Enums

Functions

  • Starts main loop using provided parameters.
  • Wrapper around queue_trim_consumed that performs consuming logic depending on the manager's ConsumerKind