Module manager

Source
Expand description

Queue processing and management

§Usage

This module provides utilities to manage and process queue tasks

Example below provides brief explanation on how to use it

use redis_queue::redis;
use redis_queue::{Queue, QueueConfig};
use redis_queue::types::Entry;
use redis_queue::manager::{Manager, ManagerConfig, ConsumerKind, RunParams, manage};
use redis_queue::manager::dispatch::{Dispatch, TaskResult, TaskResultKind};

use core::future::Future;
use core::pin::Pin;
use core::time;

use tokio::sync::oneshot;

///This is dispatcher to process your tasks
struct TaskDispatcher {
}

impl Dispatch for TaskDispatcher {
    type PayloadType = String;
    type Future = Pin<Box<dyn Future<Output = TaskResult<String>> + Send + Sync + 'static>>;
    fn send(&self, entry: Entry<Self::PayloadType>) -> Self::Future {
        //TaskResultKind enum determines how queue manage loop will handle result
        //For now only `TempFail` has special handling to retry task,
        //while other variants will just result in log entry
        todo!();
    }
}

async fn example() {
    //Limit on concurrent task number
    const MAX_CONCURRENT_TASK: usize = 1000;
    //User of your queue, will also connect to the same redis instance and use the same stream name.
    let config = QueueConfig {
        stream: "stream_name".into()
    };
    let client = redis::Client::open("redis://127.0.0.1/").expect("to create redis client");
    let conn = client.get_tokio_connection_manager().await.expect("to get connection");
    let queue = Queue::new(config, conn);

    let config = ManagerConfig {
        //Group is shared by all consumers
        group: "group".into(),
        //Use Single if you only need 1 manager.
        //Use 1 Main and Extra when deploying multiple.
        //Only Main/Single trims queue
        kind: ConsumerKind::Single,
        //This is unique consumer name.
        //Every instance of manager should have own name to avoid clashes
        consumer: "consumer-name".into(),
        //This is maximum time manager is allowed to block waiting for new messages in queue
        poll_time: time::Duration::from_secs(10),
        //This is maximum time task that temporary failed will remain in queue.
        //Note that it will remain longer if due to concurrency starvation
        //it cannot complete at least max_pending_time / poll_time retries
        max_pending_time: time::Duration::from_secs(60),
    };
    let manager = Manager::new(queue, config).await.expect("to create manager");

    let (shutdown, shutdown_recv) = oneshot::channel();
    let params = RunParams {
        manager,
        shutdown_recv,
        max_task_count: MAX_CONCURRENT_TASK,
        dispatcher: TaskDispatcher {
        }
    };
    let handle = tokio::spawn(manage(params));
    //Do whatever you want (like wait some signal to shutdown yourself)

    //then if you want to shutdown gracefully:
    shutdown.send(()).expect("manager lives");
    handle.await.expect("finish successfully");
}

Modules§

dispatch
Queue task dispatcher

Structs§

Manager
Task Queue manager.
ManagerConfig
Manager configuration.
RunParams
Parameters for manage function

Enums§

ConfigError
Possible error creating manager
ConsumerKind
Describes type of consumer configured.

Functions§

manage
Starts main loop using provided parameters.
trim_queue_task
Wrapper for queue_trim_consumed to perform consuming logic depending on ConsumerKind of manager