Expand description
Queue processing and management
§Usage
This module provides utilities to manage and process queue tasks
Example below provides brief explanation on how to use it
use redis_queue::redis;
use redis_queue::{Queue, QueueConfig};
use redis_queue::types::Entry;
use redis_queue::manager::{Manager, ManagerConfig, ConsumerKind, RunParams, manage};
use redis_queue::manager::dispatch::{Dispatch, TaskResult, TaskResultKind};
use core::future::Future;
use core::pin::Pin;
use core::time;
use tokio::sync::oneshot;
///This is dispatcher to process your tasks
struct TaskDispatcher {
}
impl Dispatch for TaskDispatcher {
type PayloadType = String;
type Future = Pin<Box<dyn Future<Output = TaskResult<String>> + Send + Sync + 'static>>;
fn send(&self, entry: Entry<Self::PayloadType>) -> Self::Future {
//TaskResultKind enum determines how queue manage loop will handle result
//For now only `TempFail` has special handling to retry task,
//while other variants will just result in log entry
todo!();
}
}
async fn example() {
//Limit on concurrent task number
const MAX_CONCURRENT_TASK: usize = 1000;
//User of your queue, will also connect to the same redis instance and use the same stream name.
let config = QueueConfig {
stream: "stream_name".into()
};
let client = redis::Client::open("redis://127.0.0.1/").expect("to create redis client");
let conn = client.get_tokio_connection_manager().await.expect("to get connection");
let queue = Queue::new(config, conn);
let config = ManagerConfig {
//Group is shared by all consumers
group: "group".into(),
//Use Single if you only need 1 manager.
//Use 1 Main and Extra when deploying multiple.
//Only Main/Single trims queue
kind: ConsumerKind::Single,
//This is unique consumer name.
//Every instance of manager should have own name to avoid clashes
consumer: "consumer-name".into(),
//This is maximum time manager is allowed to block waiting for new messages in queue
poll_time: time::Duration::from_secs(10),
//This is maximum time task that temporary failed will remain in queue.
//Note that it will remain longer if due to concurrency starvation
//it cannot complete at least max_pending_time / poll_time retries
max_pending_time: time::Duration::from_secs(60),
};
let manager = Manager::new(queue, config).await.expect("to create manager");
let (shutdown, shutdown_recv) = oneshot::channel();
let params = RunParams {
manager,
shutdown_recv,
max_task_count: MAX_CONCURRENT_TASK,
dispatcher: TaskDispatcher {
}
};
let handle = tokio::spawn(manage(params));
//Do whatever you want (like wait some signal to shutdown yourself)
//then if you want to shutdown gracefully:
shutdown.send(()).expect("manager lives");
handle.await.expect("finish successfully");
}
Modules§
- dispatch
- Queue task dispatcher
Structs§
- Manager
- Task Queue manager.
- Manager
Config - Manager configuration.
- RunParams
- Parameters for
manage
function
Enums§
- Config
Error - Possible error creating manager
- Consumer
Kind - Describes type of consumer configured.
Functions§
- manage
- Starts main loop using provided parameters.
- trim_
queue_ task - Wrapper for
queue_trim_consumed
to perform consuming logic depending onConsumerKind
ofmanager