rust_task_queue/
cli.rs

1//! Consumer Helper Functions
2//!
3//! This module provides simple utilities for consumer projects to create
4//! task workers with minimal boilerplate code.
5
6use crate::config::{ConfigBuilder, TaskQueueConfig};
7use crate::TaskQueueBuilder;
8#[cfg(feature = "auto-register")]
9use crate::task::TaskRegistry;
10use std::env;
11
12#[cfg(feature = "cli")]
13use tracing_subscriber;
14
15#[cfg(feature = "tracing")]
16use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
17
18/// Start a consumer task worker with the given configuration
19///
20/// This function automatically discovers tasks from the consumer library
21/// and starts the specified number of worker processes.
22///
23/// # Example
24///
25/// ```rust,no_run
26/// // Import your tasks first (this example shows the pattern)
27/// // use my_task_app::*;
28/// use rust_task_queue::cli::*;
29/// use rust_task_queue::config::TaskQueueConfig;
30///
31/// #[tokio::main]
32/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
33///     let config = TaskQueueConfig::load()?;
34///     start_cli_worker(config).await
35/// }
36/// ```
37pub async fn start_cli_worker(config: TaskQueueConfig) -> Result<(), Box<dyn std::error::Error>> {
38    // Initialize logging
39    {
40        let log_level = env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string());
41        let log_format = env::var("LOG_FORMAT").unwrap_or_else(|_| "pretty".to_string());
42
43        let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
44            EnvFilter::new(format!(
45                "rust_task_queue={},{}",
46                log_level,
47                if log_level == "debug" || log_level == "trace" {
48                    "redis=warn,deadpool=warn"
49                } else {
50                    "warn"
51                }
52            ))
53        });
54
55        let fmt_layer = match log_format.as_str() {
56            "json" => fmt::layer()
57                .with_target(true)
58                .with_thread_ids(true)
59                .with_file(true)
60                .with_line_number(true)
61                .json()
62                .boxed(),
63            "compact" => fmt::layer().with_target(false).compact().boxed(),
64            _ => fmt::layer()
65                .with_target(true)
66                .with_thread_ids(true)
67                .pretty()
68                .boxed(),
69        };
70
71        if let Err(e) = tracing_subscriber::registry()
72            .with(env_filter)
73            .with(fmt_layer)
74            .try_init()
75        {
76            eprintln!("Failed to initialize tracing: {}", e);
77            std::process::exit(1);
78        }
79    }
80
81    #[cfg(feature = "tracing")]
82    {
83        tracing::info!("Starting Consumer Task Worker");
84        tracing::info!("Redis URL: {}", config.redis.url);
85        tracing::info!("Workers: {}", config.workers.initial_count);
86        tracing::info!("Auto-register: {}", config.auto_register.enabled);
87        tracing::info!("Scheduler: {}", config.scheduler.enabled);
88        tracing::info!(
89            log_level = env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string()),
90            log_format = env::var("LOG_FORMAT").unwrap_or_else(|_| "pretty".to_string()),
91            "Enhanced tracing initialized"
92        );
93    }
94
95    // Create task queue with configuration
96    #[cfg(feature = "auto-register")]
97    let mut task_queue_builder = TaskQueueBuilder::new(&config.redis.url);
98    #[cfg(not(feature = "auto-register"))]
99    let task_queue_builder = TaskQueueBuilder::new(&config.redis.url);
100
101    #[cfg(feature = "auto-register")]
102    if config.auto_register.enabled {
103        task_queue_builder = task_queue_builder.auto_register_tasks();
104    }
105
106    let task_queue = task_queue_builder.build().await?;
107
108    // Start workers
109    #[cfg(feature = "tracing")]
110    tracing::info!("Starting {} workers...", config.workers.initial_count);
111    task_queue
112        .start_workers(config.workers.initial_count)
113        .await?;
114
115    // Show discovered tasks if auto-registration is enabled
116    #[cfg(feature = "auto-register")]
117    if config.auto_register.enabled {
118        let task_registry = TaskRegistry::with_auto_registered()
119            .map_err(|e| format!("Failed to create registry: {}", e))?;
120        let registered_tasks = task_registry.registered_tasks();
121        #[cfg(feature = "tracing")]
122        {
123            tracing::info!("Auto-discovered {} task types:", registered_tasks.len());
124            for task_type in &registered_tasks {
125                tracing::info!("   • {}", task_type);
126            }
127        }
128    }
129
130    #[cfg(feature = "tracing")]
131    {
132        tracing::info!("Workers started successfully!");
133        tracing::info!("Listening for tasks on all queues");
134        tracing::info!("Press Ctrl+C to shutdown gracefully");
135    }
136
137    // Keep running until interrupt
138    tokio::signal::ctrl_c().await?;
139
140    #[cfg(feature = "tracing")]
141    tracing::info!("Shutting down gracefully...");
142
143    Ok(())
144}
145
146/// Start a consumer worker with automatic configuration
147///
148/// This function loads configuration from:
149/// 1. Configuration files (task-queue.toml, task-queue.yaml, etc.)
150/// 2. Environment variables
151/// 3. Command line arguments
152/// 4. Sensible defaults
153///
154/// # Example
155///
156/// ```rust,no_run
157/// // Import your tasks first (this example shows the pattern)
158/// // use my_task_app::*;
159/// use rust_task_queue::cli::*;
160///
161/// #[tokio::main]
162/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
163///     start_worker().await
164/// }
165/// ```
166pub async fn start_worker() -> Result<(), Box<dyn std::error::Error>> {
167    let config = TaskQueueConfig::load()?;
168    start_cli_worker(config).await
169}
170
171/// Start a consumer worker with configuration from environment variables only
172///
173/// This is useful when you want to avoid file-based configuration
174/// and only use environment variables.
175///
176/// # Example
177///
178/// ```rust,no_run
179/// // Import your tasks first (this example shows the pattern)
180/// // use my_task_app::*;
181/// use rust_task_queue::cli::*;
182///
183/// #[tokio::main]
184/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
185///     start_worker_from_env().await
186/// }
187/// ```
188pub async fn start_worker_from_env() -> Result<(), Box<dyn std::error::Error>> {
189    let config = TaskQueueConfig::from_env()?;
190    start_cli_worker(config).await
191}
192
193/// Start a consumer worker with custom configuration built using the builder pattern
194///
195/// # Example
196///
197/// ```rust,no_run
198/// // Import your tasks first (this example shows the pattern)
199/// // use my_task_app::*;
200/// use rust_task_queue::cli::*;
201/// use rust_task_queue::config::ConfigBuilder;
202///
203/// #[tokio::main]
204/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
205///     let config = ConfigBuilder::new()
206///         .redis_url("redis://localhost:6379")
207///         .workers(4)
208///         .enable_auto_register(true)
209///         .enable_scheduler(true)
210///         .build();
211///     
212///     start_cli_worker(config).await
213/// }
214/// ```
215pub async fn start_worker_with_builder<F>(builder_fn: F) -> Result<(), Box<dyn std::error::Error>>
216where
217    F: FnOnce(ConfigBuilder) -> ConfigBuilder,
218{
219    let config = builder_fn(ConfigBuilder::new()).build();
220    start_cli_worker(config).await
221}
222
/// Macro to create a complete task worker binary with minimal code
///
/// This macro generates a complete `main.rs` for your task worker binary:
/// it expands to a `#[tokio::main] async fn main()` that delegates to one of
/// the helpers in the `cli` module. Because the expansion names
/// `tokio::main` directly, the invoking crate must depend on `tokio`.
///
/// # Forms
///
/// - `create_worker_main!()` calls `cli::start_worker()`.
/// - `create_worker_main!(env)` calls `cli::start_worker_from_env()`.
/// - `create_worker_main!(config_expr)` evaluates `config_expr` and passes
///   the result to `cli::start_cli_worker`.
///
/// # Example
///
/// ```rust,no_run
/// // Import your tasks first (this example shows the pattern)
/// // use my_task_app::*;
/// rust_task_queue::create_worker_main!();
/// ```
#[macro_export]
macro_rules! create_worker_main {
    () => {
        #[tokio::main]
        async fn main() -> Result<(), Box<dyn std::error::Error>> {
            $crate::cli::start_worker().await
        }
    };

    // Must stay ahead of the `$config:expr` arm: a bare `env` token would
    // otherwise be parsed as an expression.
    (env) => {
        #[tokio::main]
        async fn main() -> Result<(), Box<dyn std::error::Error>> {
            $crate::cli::start_worker_from_env().await
        }
    };

    ($config:expr) => {
        #[tokio::main]
        async fn main() -> Result<(), Box<dyn std::error::Error>> {
            $crate::cli::start_cli_worker($config).await
        }
    };
}
258
/// Macro to create a task worker whose configuration is set up via the builder pattern
///
/// Expands to a `#[tokio::main] async fn main()` that passes the given
/// expression to `cli::start_worker_with_builder`.
///
/// # Example
///
/// ```rust,no_run
/// // Import your tasks first (this example shows the pattern)
/// // use my_task_app::*;
/// rust_task_queue::create_worker_with_builder!(|builder| {
///     builder
///         .redis_url("redis://localhost:6379")
///         .workers(4)
///         .enable_auto_register(true)
///         .enable_scheduler(true)
/// });
/// ```
#[macro_export]
macro_rules! create_worker_with_builder {
    ($configure:expr) => {
        #[tokio::main]
        async fn main() -> Result<(), Box<dyn std::error::Error>> {
            $crate::cli::start_worker_with_builder($configure).await
        }
    };
}