use std::env;
#[derive(Debug, thiserror::Error)]
pub enum BrokerConfigError {
#[error("Missing environment variable: {0}\n\nSuggestion: Set the environment variable before running:\n export {0}=<value>")]
MissingEnvVar(String),
#[error("Unsupported broker type: {broker_type}\n\nSupported types: redis, postgres, mysql, amqp, sqs\nNote: {note}")]
UnsupportedBrokerType {
broker_type: String,
note: String,
},
#[error("Feature not enabled: {feature}\n\nTo enable this feature, add it to your Cargo.toml:\n celers = {{ version = \"0.1\", features = [\"{feature}\"] }}\n\nAvailable features: redis, postgres, mysql, amqp, sqs, backend-redis, backend-db, backend-rpc")]
FeatureNotEnabled {
feature: String,
},
#[error("Broker creation failed: {message}\n\nPossible causes:\n{suggestions}")]
CreationFailed {
message: String,
suggestions: String,
},
}
pub async fn create_broker_from_env() -> Result<Box<dyn crate::Broker>, BrokerConfigError> {
let broker_type = env::var("CELERS_BROKER_TYPE")
.map_err(|_| BrokerConfigError::MissingEnvVar("CELERS_BROKER_TYPE".to_string()))?;
let broker_url = env::var("CELERS_BROKER_URL")
.map_err(|_| BrokerConfigError::MissingEnvVar("CELERS_BROKER_URL".to_string()))?;
let queue_name = env::var("CELERS_BROKER_QUEUE").unwrap_or_else(|_| "celers".to_string());
create_broker(&broker_type, &broker_url, &queue_name).await
}
pub async fn create_broker(
broker_type: &str,
broker_url: &str,
queue_name: &str,
) -> Result<Box<dyn crate::Broker>, BrokerConfigError> {
match broker_type.to_lowercase().as_str() {
#[cfg(feature = "redis")]
"redis" => {
use crate::RedisBroker;
RedisBroker::new(broker_url, queue_name)
.map(|b| Box::new(b) as Box<dyn crate::Broker>)
.map_err(|e| BrokerConfigError::CreationFailed {
message: e.to_string(),
suggestions: "- Check that Redis server is running\n - Verify the connection URL format: redis://host:port\n - Ensure network connectivity to Redis server".to_string(),
})
}
#[cfg(feature = "postgres")]
"postgres" | "postgresql" => {
use crate::PostgresBroker;
PostgresBroker::with_queue(broker_url, queue_name)
.await
.map(|b| Box::new(b) as Box<dyn crate::Broker>)
.map_err(|e| BrokerConfigError::CreationFailed {
message: e.to_string(),
suggestions: "- Check that PostgreSQL server is running\n - Verify the connection URL format: postgres://user:pass@host:port/db\n - Ensure database exists and user has permissions".to_string(),
})
}
#[cfg(feature = "mysql")]
"mysql" => {
use crate::MysqlBroker;
MysqlBroker::with_queue(broker_url, queue_name)
.await
.map(|b| Box::new(b) as Box<dyn crate::Broker>)
.map_err(|e| BrokerConfigError::CreationFailed {
message: e.to_string(),
suggestions: "- Check that MySQL server is running\n - Verify the connection URL format: mysql://user:pass@host:port/db\n - Ensure database exists and user has permissions".to_string(),
})
}
_ => {
#[cfg(not(feature = "redis"))]
if broker_type.to_lowercase() == "redis" {
return Err(BrokerConfigError::FeatureNotEnabled {
feature: "redis".to_string(),
});
}
#[cfg(not(feature = "postgres"))]
if broker_type.to_lowercase() == "postgres"
|| broker_type.to_lowercase() == "postgresql"
{
return Err(BrokerConfigError::FeatureNotEnabled {
feature: "postgres".to_string(),
});
}
#[cfg(not(feature = "mysql"))]
if broker_type.to_lowercase() == "mysql" {
return Err(BrokerConfigError::FeatureNotEnabled {
feature: "mysql".to_string(),
});
}
if broker_type.to_lowercase() == "amqp" || broker_type.to_lowercase() == "rabbitmq" {
return Err(BrokerConfigError::UnsupportedBrokerType {
broker_type: broker_type.to_string(),
note: "AMQP brokers use the Transport trait. Import and use AmqpBroker directly from celers::AmqpBroker".to_string(),
});
}
if broker_type.to_lowercase() == "sqs" {
return Err(BrokerConfigError::UnsupportedBrokerType {
broker_type: broker_type.to_string(),
note: "SQS brokers use the Transport trait. Import and use SqsBroker directly from celers::SqsBroker".to_string(),
});
}
Err(BrokerConfigError::UnsupportedBrokerType {
broker_type: broker_type.to_string(),
note: "Check the broker type name for typos".to_string(),
})
}
}
}