use crate::config::{ConfigBuilder, TaskQueueConfig};
use crate::TaskQueueBuilder;
#[cfg(feature = "auto-register")]
use crate::task::TaskRegistry;
use std::env;
#[cfg(feature = "cli")]
use tracing_subscriber;
#[cfg(feature = "tracing")]
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
pub async fn start_cli_worker(config: TaskQueueConfig) -> Result<(), Box<dyn std::error::Error>> {
{
let log_level = env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string());
let log_format = env::var("LOG_FORMAT").unwrap_or_else(|_| "pretty".to_string());
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
EnvFilter::new(format!(
"rust_task_queue={},{}",
log_level,
if log_level == "debug" || log_level == "trace" {
"redis=warn,deadpool=warn"
} else {
"warn"
}
))
});
let fmt_layer = match log_format.as_str() {
"json" => fmt::layer()
.with_target(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.json()
.boxed(),
"compact" => fmt::layer().with_target(false).compact().boxed(),
_ => fmt::layer()
.with_target(true)
.with_thread_ids(true)
.pretty()
.boxed(),
};
if let Err(e) = tracing_subscriber::registry()
.with(env_filter)
.with(fmt_layer)
.try_init()
{
eprintln!("Failed to initialize tracing: {}", e);
std::process::exit(1);
}
}
#[cfg(feature = "tracing")]
{
tracing::info!("Starting Consumer Task Worker");
tracing::info!("Redis URL: {}", config.redis.url);
tracing::info!("Workers: {}", config.workers.initial_count);
tracing::info!("Auto-register: {}", config.auto_register.enabled);
tracing::info!("Scheduler: {}", config.scheduler.enabled);
tracing::info!(
log_level = env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string()),
log_format = env::var("LOG_FORMAT").unwrap_or_else(|_| "pretty".to_string()),
"Enhanced tracing initialized"
);
}
#[cfg(feature = "auto-register")]
let mut task_queue_builder = TaskQueueBuilder::new(&config.redis.url);
#[cfg(not(feature = "auto-register"))]
let task_queue_builder = TaskQueueBuilder::new(&config.redis.url);
#[cfg(feature = "auto-register")]
if config.auto_register.enabled {
task_queue_builder = task_queue_builder.auto_register_tasks();
}
let task_queue = task_queue_builder.build().await?;
#[cfg(feature = "tracing")]
tracing::info!("Starting {} workers...", config.workers.initial_count);
task_queue
.start_workers(config.workers.initial_count)
.await?;
#[cfg(feature = "auto-register")]
if config.auto_register.enabled {
let task_registry = TaskRegistry::with_auto_registered()
.map_err(|e| format!("Failed to create registry: {}", e))?;
let registered_tasks = task_registry.registered_tasks();
#[cfg(feature = "tracing")]
{
tracing::info!("Auto-discovered {} task types:", registered_tasks.len());
for task_type in ®istered_tasks {
tracing::info!(" • {}", task_type);
}
}
}
#[cfg(feature = "tracing")]
{
tracing::info!("Workers started successfully!");
tracing::info!("Listening for tasks on all queues");
tracing::info!("Press Ctrl+C to shutdown gracefully");
}
tokio::signal::ctrl_c().await?;
#[cfg(feature = "tracing")]
tracing::info!("Shutting down gracefully...");
Ok(())
}
pub async fn start_worker() -> Result<(), Box<dyn std::error::Error>> {
let config = TaskQueueConfig::load()?;
start_cli_worker(config).await
}
pub async fn start_worker_from_env() -> Result<(), Box<dyn std::error::Error>> {
let config = TaskQueueConfig::from_env()?;
start_cli_worker(config).await
}
pub async fn start_worker_with_builder<F>(builder_fn: F) -> Result<(), Box<dyn std::error::Error>>
where
F: FnOnce(ConfigBuilder) -> ConfigBuilder,
{
let config = builder_fn(ConfigBuilder::new()).build();
start_cli_worker(config).await
}
#[macro_export]
macro_rules! create_worker_main {
() => {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
$crate::cli::start_worker().await
}
};
(env) => {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
$crate::cli::start_worker_from_env().await
}
};
($config:expr) => {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
$crate::cli:start_consumer_workerr($config).await
}
};
}
#[macro_export]
macro_rules! create_worker_with_builder {
($builder_fn:expr) => {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
$crate::cli::start_worker_with_builder($builder_fn).await
}
};
}