runner_q/
lib.rs

1//! Runner-Q: A Redis-based activity queue and worker system for Rust
2//!
3//! This crate provides a robust, scalable activity queue system built on Redis with support for:
4//! - **Priority-based activity processing** with Critical, High, Normal, and Low priority levels
5//! - **Activity scheduling** with precise, timestamp-based execution at a future time
6//! - **Intelligent retry mechanism** with exponential backoff for failed activities
7//! - **Dead letter queue** handling for activities that exceed retry limits
8//! - **Concurrent activity processing** with configurable worker pools
9//! - **Graceful shutdown** handling with proper cleanup
10//! - **Activity orchestration** enabling activities to execute other activities
11//! - **Comprehensive error handling** with retryable and non-retryable error types
12//! - **Activity metadata** support for context and tracking
13//! - **Redis persistence** for durability and scalability
14//! - **Queue statistics** and monitoring capabilities
15//!
16//! # Example
17//!
18//! ```rust,no_run
19//! use runner_q::{WorkerEngine, ActivityPriority, ActivityHandler, ActivityContext, ActivityHandlerResult, ActivityError};
20//! use std::sync::Arc;
21//! use async_trait::async_trait;
22//! use serde_json::json;
23//! use serde::{Serialize, Deserialize};
24//! use std::time::Duration;
25//!
26//! // Define activity types
27//! #[derive(Debug, Clone)]
28//! enum MyActivityType {
29//!     SendEmail,
30//!     ProcessPayment,
31//! }
32//!
33//! impl std::fmt::Display for MyActivityType {
34//!     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35//!         match self {
36//!             MyActivityType::SendEmail => write!(f, "send_email"),
37//!             MyActivityType::ProcessPayment => write!(f, "process_payment"),
38//!         }
39//!     }
40//! }
41//!
42//! // Implement activity handler
43//! pub struct SendEmailActivity;
44//!
45//! #[async_trait]
46//! impl ActivityHandler for SendEmailActivity {
47//!     async fn handle(&self, payload: serde_json::Value, context: ActivityContext) -> ActivityHandlerResult {
48//!         // Parse the email data - use ? operator for clean error handling
49//!         let email_data: serde_json::Map<String, serde_json::Value> = payload
50//!             .as_object()
51//!             .ok_or_else(|| ActivityError::NonRetry("Invalid payload format".to_string()))?
52//!             .clone();
53//!
54//!         let to = email_data.get("to")
55//!             .and_then(|v| v.as_str())
56//!             .ok_or_else(|| ActivityError::NonRetry("Missing 'to' field".to_string()))?;
57//!
58//!         // Simulate sending email
59//!         println!("Sending email to: {}", to);
60//!
61//!         // Return success with result data
62//!         Ok(Some(serde_json::json!({
63//!             "message": format!("Email sent to {}", to),
64//!             "status": "delivered"
65//!         })))
66//!     }
67//!
68//!     fn activity_type(&self) -> String {
69//!         MyActivityType::SendEmail.to_string()
70//!     }
71//! }
72//!
73//! #[derive(Debug, Serialize, Deserialize)]
74//! pub struct EmailResult {
75//!     message: String,
76//!     status: String,
77//! }
78//!
79//! #[tokio::main]
80//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
81//!     // Configure and build the worker engine using the builder pattern
82//!     let engine = WorkerEngine::builder()
83//!         .redis_url("redis://localhost:6379")
84//!         .queue_name("my_app")
85//!         .max_workers(8)
86//!         .schedule_poll_interval(Duration::from_secs(30))
87//!         .build()
88//!         .await?;
89//!
90//!     // Register activity handler
91//!     let send_email_activity = SendEmailActivity;
92//!     engine.register_activity(MyActivityType::SendEmail.to_string(), Arc::new(send_email_activity));
93//!
94//!     // Get activity executor for fluent activity execution
95//!     let activity_executor = engine.get_activity_executor();
96//!
97//!     // Enqueue an activity using the fluent builder API
98//!     let future = activity_executor
99//!         .activity("send_email")
100//!         .payload(json!({"to": "user@example.com", "subject": "Welcome!"}))
101//!         .max_retries(5)
102//!         .timeout(Duration::from_secs(600))
103//!         .execute()
104//!         .await?;
105//!
106//!     // Schedule an activity for future execution (10 seconds from now)
107//!     let scheduled_future = activity_executor
108//!         .activity("send_email")
109//!         .payload(json!({"to": "user@example.com", "subject": "Reminder"}))
110//!         .max_retries(3)
111//!         .timeout(Duration::from_secs(300))
112//!         .delay(Duration::from_secs(10))
113//!         .execute()
114//!         .await?;
115//!
116//!     // Execute an activity with default options
117//!     let future2 = activity_executor
118//!         .activity("send_email")
119//!         .payload(json!({"to": "admin@example.com"}))
120//!         .execute()
121//!         .await?;
122//!
123//!     // Spawn a task to handle the result
124//!     tokio::spawn(async move {
125//!         if let Ok(result) = future.get_result().await {
126//!             match result {
127//!                 None => {}
128//!                 Some(data) => {
129//!                     let email_result: EmailResult = serde_json::from_value(data).unwrap();
130//!                     println!("Email result: {:?}", email_result);
131//!                 }
132//!             }
133//!         }
134//!     });
135//!
136//!     // Start the worker engine (this will run indefinitely)
137//!     engine.start().await?;
138//!
139//!     Ok(())
140//! }
141//! ```
142//!
143//! ## Activity Orchestration
144//!
145//! Activities can execute other activities using the `ActivityExecutor` available in the
146//! `ActivityContext`. This enables building complex workflows and activity orchestration patterns.
147//!
148//! ```rust,no_run
149//! use runner_q::{ActivityHandler, ActivityContext, ActivityHandlerResult, ActivityPriority, ActivityError};
150//! use async_trait::async_trait;
151//!
152//! pub struct OrderProcessingActivity;
153//!
154//! #[async_trait]
155//! impl ActivityHandler for OrderProcessingActivity {
156//!     async fn handle(&self, payload: serde_json::Value, context: ActivityContext) -> ActivityHandlerResult {
157//!         let order_id = payload["order_id"]
158//!             .as_str()
159//!             .ok_or_else(|| ActivityError::NonRetry("Missing order_id".to_string()))?;
160//!
161//!         // Step 1: Validate payment
162//!         let _payment_future = context.activity_executor
163//!             .activity("validate_payment")
164//!             .payload(serde_json::json!({"order_id": order_id}))
165//!             .priority(ActivityPriority::High)
166//!             .max_retries(3)
167//!             .timeout(std::time::Duration::from_secs(120))
168//!             .execute()
169//!             .await.map_err(|e| ActivityError::Retry(format!("Failed to enqueue payment validation: {}", e)))?;
170//!
171//!         // Step 2: Update inventory
172//!         let _inventory_future = context.activity_executor
173//!             .activity("update_inventory")
174//!             .payload(serde_json::json!({"order_id": order_id}))
175//!             .execute()
176//!             .await.map_err(|e| ActivityError::Retry(format!("Failed to enqueue inventory update: {}", e)))?;
177//!
178//!         // Step 3: Schedule delivery notification for later
179//!         context.activity_executor
180//!             .activity("send_delivery_notification")
181//!             .payload(serde_json::json!({"order_id": order_id, "customer_email": payload["customer_email"]}))
182//!             .max_retries(5)
183//!             .timeout(std::time::Duration::from_secs(300))
184//!             .delay(std::time::Duration::from_secs(3600)) // 1 hour
185//!             .execute()
186//!             .await.map_err(|e| ActivityError::Retry(format!("Failed to schedule notification: {}", e)))?;
187//!
188//!         Ok(Some(serde_json::json!({
189//!             "order_id": order_id,
190//!             "status": "processing",
191//!             "steps_initiated": ["payment_validation", "inventory_update", "delivery_notification"]
192//!         })))
193//!     }
194//!
195//!     fn activity_type(&self) -> String {
196//!         "process_order".to_string()
197//!     }
198//! }
199//! ```
200
/// Home of the `activity::activity` items (`ActivityContext`, `ActivityFuture`,
/// `ActivityHandler`, `ActivityHandlerResult`, `ActivityPriority`, `ActivityStatus`,
/// `OnDuplicate`) and the `activity::error` items (`ActivityError`, `RetryableError`),
/// all re-exported at the crate root.
201pub mod activity;
/// Home of `WorkerConfig`, re-exported at the crate root.
202pub mod config;
/// Home of `observability_api`, `runnerq_ui`, `DeadLetterRecord` and `QueueInspector`,
/// re-exported at the crate root.
203pub mod observability;
/// Home of the `queue::queue` items (`ActivityEvent`, `ActivityEventType`, `ActivityQueue`,
/// `ActivitySnapshot`, `QueueStats`), re-exported at the crate root.
204pub mod queue;
/// Home of `runner::runner` (`ActivityBuilder`, `ActivityExecutor`, `MetricsSink`,
/// `WorkerEngine`, `WorkerEngineBuilder`), `runner::error` (`WorkerError`) and
/// `runner::redis` (`RedisConfig`), re-exported at the crate root.
205pub mod runner;
206
207// Re-export main types for easy access
208pub use crate::config::WorkerConfig;
209pub use crate::observability::{observability_api, runnerq_ui, DeadLetterRecord, QueueInspector};
210pub use crate::queue::queue::{
211    ActivityEvent, ActivityEventType, ActivityQueue, ActivitySnapshot, QueueStats,
212};
213pub use crate::runner::error::WorkerError;
214pub use crate::runner::redis::RedisConfig;
215pub use crate::runner::runner::{
216    ActivityBuilder, ActivityExecutor, MetricsSink, WorkerEngine, WorkerEngineBuilder,
217};
218pub use activity::activity::{
219    ActivityContext, ActivityFuture, ActivityHandler, ActivityHandlerResult, ActivityPriority,
220    ActivityStatus, OnDuplicate,
221};
222pub use activity::error::{ActivityError, RetryableError};