runner_q/lib.rs
1//! Runner-Q: A Redis-based activity queue and worker system for Rust
2//!
3//! This crate provides a robust, scalable activity queue system built on Redis with support for:
4//! - **Priority-based activity processing** with Critical, High, Normal, and Low priority levels
5//! - **Activity scheduling** with precise timestamp-based scheduling for future execution
6//! - **Intelligent retry mechanism** with exponential backoff for failed activities
7//! - **Dead letter queue** handling for activities that exceed retry limits
8//! - **Concurrent activity processing** with configurable worker pools
9//! - **Graceful shutdown** handling with proper cleanup
10//! - **Activity orchestration** enabling activities to execute other activities
11//! - **Comprehensive error handling** with retryable and non-retryable error types
12//! - **Activity metadata** support for context and tracking
13//! - **Redis persistence** for durability and scalability
14//! - **Queue statistics** and monitoring capabilities
15//!
16//! # Example
17//!
18//! ```rust,no_run
19//! use runner_q::{WorkerEngine, ActivityPriority, ActivityHandler, ActivityContext, ActivityHandlerResult, ActivityError};
20//! use std::sync::Arc;
21//! use async_trait::async_trait;
22//! use serde_json::json;
23//! use serde::{Serialize, Deserialize};
24//! use std::time::Duration;
25//!
26//! // Define activity types
27//! #[derive(Debug, Clone)]
28//! enum MyActivityType {
29//! SendEmail,
30//! ProcessPayment,
31//! }
32//!
33//! impl std::fmt::Display for MyActivityType {
34//! fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35//! match self {
36//! MyActivityType::SendEmail => write!(f, "send_email"),
37//! MyActivityType::ProcessPayment => write!(f, "process_payment"),
38//! }
39//! }
40//! }
41//!
42//! // Implement activity handler
43//! pub struct SendEmailActivity;
44//!
45//! #[async_trait]
46//! impl ActivityHandler for SendEmailActivity {
47//! async fn handle(&self, payload: serde_json::Value, context: ActivityContext) -> ActivityHandlerResult {
48//! // Parse the email data - use ? operator for clean error handling
49//! let email_data: serde_json::Map<String, serde_json::Value> = payload
50//! .as_object()
51//! .ok_or_else(|| ActivityError::NonRetry("Invalid payload format".to_string()))?
52//! .clone();
53//!
54//! let to = email_data.get("to")
55//! .and_then(|v| v.as_str())
56//! .ok_or_else(|| ActivityError::NonRetry("Missing 'to' field".to_string()))?;
57//!
58//! // Simulate sending email
59//! println!("Sending email to: {}", to);
60//!
61//! // Return success with result data
62//! Ok(Some(serde_json::json!({
63//! "message": format!("Email sent to {}", to),
64//! "status": "delivered"
65//! })))
66//! }
67//!
68//! fn activity_type(&self) -> String {
69//! MyActivityType::SendEmail.to_string()
70//! }
71//! }
72//!
73//! #[derive(Debug, Serialize, Deserialize)]
74//! pub struct EmailResult {
75//! message: String,
76//! status: String,
77//! }
78//!
79//! #[tokio::main]
80//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
81//! // Improved API: Builder pattern for WorkerEngine
82//! let engine = WorkerEngine::builder()
83//! .redis_url("redis://localhost:6379")
84//! .queue_name("my_app")
85//! .max_workers(8)
86//! .schedule_poll_interval(Duration::from_secs(30))
87//! .build()
88//! .await?;
89//!
90//! // Register activity handler
91//! let send_email_activity = SendEmailActivity;
92//! engine.register_activity(MyActivityType::SendEmail.to_string(), Arc::new(send_email_activity));
93//!
94//! // Get activity executor for fluent activity execution
95//! let activity_executor = engine.get_activity_executor();
96//!
97//! // Improved API: Fluent activity execution
98//! let future = activity_executor
99//! .activity("send_email")
100//! .payload(json!({"to": "user@example.com", "subject": "Welcome!"}))
101//! .max_retries(5)
102//! .timeout(Duration::from_secs(600))
103//! .execute()
104//! .await?;
105//!
106//! // Schedule an activity for future execution (10 seconds from now)
107//! let scheduled_future = activity_executor
108//! .activity("send_email")
109//! .payload(json!({"to": "user@example.com", "subject": "Reminder"}))
110//! .max_retries(3)
111//! .timeout(Duration::from_secs(300))
112//! .delay(Duration::from_secs(10))
113//! .execute()
114//! .await?;
115//!
116//! // Execute an activity with default options
117//! let future2 = activity_executor
118//! .activity("send_email")
119//! .payload(json!({"to": "admin@example.com"}))
120//! .execute()
121//! .await?;
122//!
123//! // Spawn a task to handle the result
124//! tokio::spawn(async move {
125//! if let Ok(result) = future.get_result().await {
126//! match result {
127//! None => {}
128//! Some(data) => {
129//! let email_result: EmailResult = serde_json::from_value(data).unwrap();
130//! println!("Email result: {:?}", email_result);
131//! }
132//! }
133//! }
134//! });
135//!
136//! // Start the worker engine (this will run indefinitely)
137//! engine.start().await?;
138//!
139//! Ok(())
140//! }
141//! ```
142//!
143//! ## Activity Orchestration
144//!
145//! Activities can execute other activities using the `ActivityExecutor` available in the
146//! `ActivityContext`. This enables building complex workflows and activity orchestration patterns.
147//!
148//! ```rust,no_run
149//! use runner_q::{ActivityHandler, ActivityContext, ActivityHandlerResult, ActivityOption, ActivityPriority, ActivityError};
150//! use async_trait::async_trait;
151//!
152//! pub struct OrderProcessingActivity;
153//!
154//! #[async_trait]
155//! impl ActivityHandler for OrderProcessingActivity {
156//! async fn handle(&self, payload: serde_json::Value, context: ActivityContext) -> ActivityHandlerResult {
157//! let order_id = payload["order_id"]
158//! .as_str()
159//! .ok_or_else(|| ActivityError::NonRetry("Missing order_id".to_string()))?;
160//!
161//! // Step 1: Validate payment
162//! let _payment_future = context.activity_executor
163//! .activity("validate_payment")
164//! .payload(serde_json::json!({"order_id": order_id}))
165//! .priority(ActivityPriority::High)
166//! .max_retries(3)
167//! .timeout(std::time::Duration::from_secs(120))
168//! .execute()
169//! .await.map_err(|e| ActivityError::Retry(format!("Failed to enqueue payment validation: {}", e)))?;
170//!
171//! // Step 2: Update inventory
172//! let _inventory_future = context.activity_executor
173//! .activity("update_inventory")
174//! .payload(serde_json::json!({"order_id": order_id}))
175//! .execute()
176//! .await.map_err(|e| ActivityError::Retry(format!("Failed to enqueue inventory update: {}", e)))?;
177//!
178//! // Step 3: Schedule delivery notification for later
179//! context.activity_executor
180//! .activity("send_delivery_notification")
181//! .payload(serde_json::json!({"order_id": order_id, "customer_email": payload["customer_email"]}))
182//! .max_retries(5)
183//! .timeout(std::time::Duration::from_secs(300))
184//! .delay(std::time::Duration::from_secs(3600)) // 1 hour
185//! .execute()
186//! .await.map_err(|e| ActivityError::Retry(format!("Failed to schedule notification: {}", e)))?;
187//!
188//! Ok(Some(serde_json::json!({
189//! "order_id": order_id,
190//! "status": "processing",
191//! "steps_initiated": ["payment_validation", "inventory_update", "delivery_notification"]
192//! })))
193//! }
194//!
195//! fn activity_type(&self) -> String {
196//! "process_order".to_string()
197//! }
198//! }
199//! ```
200
201pub mod activity;
202pub mod config;
203pub mod observability;
204pub mod queue;
205pub mod runner;
206
207// Re-export main types for easy access
208pub use crate::config::WorkerConfig;
209pub use crate::observability::{observability_api, runnerq_ui, DeadLetterRecord, QueueInspector};
210pub use crate::queue::queue::{
211 ActivityEvent, ActivityEventType, ActivityQueue, ActivitySnapshot, QueueStats,
212};
213pub use crate::runner::error::WorkerError;
214pub use crate::runner::redis::RedisConfig;
215pub use crate::runner::runner::{
216 ActivityBuilder, ActivityExecutor, MetricsSink, WorkerEngine, WorkerEngineBuilder,
217};
218pub use activity::activity::{
219 ActivityContext, ActivityFuture, ActivityHandler, ActivityHandlerResult, ActivityPriority,
220 ActivityStatus, OnDuplicate,
221};
222pub use activity::error::{ActivityError, RetryableError};