1use crate::config::{ConfigBuilder, TaskQueueConfig};
7use crate::TaskQueueBuilder;
8#[cfg(feature = "auto-register")]
9use crate::task::TaskRegistry;
10use std::env;
11
12#[cfg(feature = "cli")]
13use tracing_subscriber;
14
15#[cfg(feature = "tracing")]
16use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
17
18pub async fn start_cli_worker(config: TaskQueueConfig) -> Result<(), Box<dyn std::error::Error>> {
38 {
40 let log_level = env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string());
41 let log_format = env::var("LOG_FORMAT").unwrap_or_else(|_| "pretty".to_string());
42
43 let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
44 EnvFilter::new(format!(
45 "rust_task_queue={},{}",
46 log_level,
47 if log_level == "debug" || log_level == "trace" {
48 "redis=warn,deadpool=warn"
49 } else {
50 "warn"
51 }
52 ))
53 });
54
55 let fmt_layer = match log_format.as_str() {
56 "json" => fmt::layer()
57 .with_target(true)
58 .with_thread_ids(true)
59 .with_file(true)
60 .with_line_number(true)
61 .json()
62 .boxed(),
63 "compact" => fmt::layer().with_target(false).compact().boxed(),
64 _ => fmt::layer()
65 .with_target(true)
66 .with_thread_ids(true)
67 .pretty()
68 .boxed(),
69 };
70
71 if let Err(e) = tracing_subscriber::registry()
72 .with(env_filter)
73 .with(fmt_layer)
74 .try_init()
75 {
76 eprintln!("Failed to initialize tracing: {}", e);
77 std::process::exit(1);
78 }
79 }
80
81 #[cfg(feature = "tracing")]
82 {
83 tracing::info!("Starting Consumer Task Worker");
84 tracing::info!("Redis URL: {}", config.redis.url);
85 tracing::info!("Workers: {}", config.workers.initial_count);
86 tracing::info!("Auto-register: {}", config.auto_register.enabled);
87 tracing::info!("Scheduler: {}", config.scheduler.enabled);
88 tracing::info!(
89 log_level = env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string()),
90 log_format = env::var("LOG_FORMAT").unwrap_or_else(|_| "pretty".to_string()),
91 "Enhanced tracing initialized"
92 );
93 }
94
95 #[cfg(feature = "auto-register")]
97 let mut task_queue_builder = TaskQueueBuilder::new(&config.redis.url);
98 #[cfg(not(feature = "auto-register"))]
99 let task_queue_builder = TaskQueueBuilder::new(&config.redis.url);
100
101 #[cfg(feature = "auto-register")]
102 if config.auto_register.enabled {
103 task_queue_builder = task_queue_builder.auto_register_tasks();
104 }
105
106 let task_queue = task_queue_builder.build().await?;
107
108 #[cfg(feature = "tracing")]
110 tracing::info!("Starting {} workers...", config.workers.initial_count);
111 task_queue
112 .start_workers(config.workers.initial_count)
113 .await?;
114
115 #[cfg(feature = "auto-register")]
117 if config.auto_register.enabled {
118 let task_registry = TaskRegistry::with_auto_registered()
119 .map_err(|e| format!("Failed to create registry: {}", e))?;
120 let registered_tasks = task_registry.registered_tasks();
121 #[cfg(feature = "tracing")]
122 {
123 tracing::info!("Auto-discovered {} task types:", registered_tasks.len());
124 for task_type in ®istered_tasks {
125 tracing::info!(" • {}", task_type);
126 }
127 }
128 }
129
130 #[cfg(feature = "tracing")]
131 {
132 tracing::info!("Workers started successfully!");
133 tracing::info!("Listening for tasks on all queues");
134 tracing::info!("Press Ctrl+C to shutdown gracefully");
135 }
136
137 tokio::signal::ctrl_c().await?;
139
140 #[cfg(feature = "tracing")]
141 tracing::info!("Shutting down gracefully...");
142
143 Ok(())
144}
145
146pub async fn start_worker() -> Result<(), Box<dyn std::error::Error>> {
167 let config = TaskQueueConfig::load()?;
168 start_cli_worker(config).await
169}
170
171pub async fn start_worker_from_env() -> Result<(), Box<dyn std::error::Error>> {
189 let config = TaskQueueConfig::from_env()?;
190 start_cli_worker(config).await
191}
192
193pub async fn start_worker_with_builder<F>(builder_fn: F) -> Result<(), Box<dyn std::error::Error>>
216where
217 F: FnOnce(ConfigBuilder) -> ConfigBuilder,
218{
219 let config = builder_fn(ConfigBuilder::new()).build();
220 start_cli_worker(config).await
221}
222
223#[macro_export]
236macro_rules! create_worker_main {
237 () => {
238 #[tokio::main]
239 async fn main() -> Result<(), Box<dyn std::error::Error>> {
240 $crate::cli::start_worker().await
241 }
242 };
243
244 (env) => {
245 #[tokio::main]
246 async fn main() -> Result<(), Box<dyn std::error::Error>> {
247 $crate::cli::start_worker_from_env().await
248 }
249 };
250
251 ($config:expr) => {
252 #[tokio::main]
253 async fn main() -> Result<(), Box<dyn std::error::Error>> {
254 $crate::cli:start_consumer_workerr($config).await
255 }
256 };
257}
258
259#[macro_export]
275macro_rules! create_worker_with_builder {
276 ($builder_fn:expr) => {
277 #[tokio::main]
278 async fn main() -> Result<(), Box<dyn std::error::Error>> {
279 $crate::cli::start_worker_with_builder($builder_fn).await
280 }
281 };
282}