#[cfg(feature = "websocket")]
use serde::{Deserialize, Serialize};
/// JSON payload decoded by the `email:send`, `email:reminder` and
/// `report:daily` task handlers in this file.
///
/// All three fields are required when deserializing, even for handlers that
/// only log a subset of them.
#[cfg(feature = "websocket")]
#[derive(Debug, Serialize, Deserialize)]
struct EmailPayload {
    /// Recipient of the message (logged as `To`).
    to: String,
    /// Subject line of the message.
    subject: String,
    /// Message body text.
    body: String,
}
/// JSON payload decoded by the `image:resize` task handler in this file.
#[cfg(feature = "websocket")]
#[derive(Debug, Serialize, Deserialize)]
struct ImagePayload {
    /// Location of the image to process.
    url: String,
    /// Requested width (presumably in pixels; confirm against the producer).
    width: u32,
    /// Requested height (presumably in pixels; confirm against the producer).
    height: u32,
}
/// Fallback entry point compiled when the `websocket` feature is disabled.
///
/// The real consumer is compiled out in this configuration, so this only
/// tells the user why nothing is running and then exits successfully.
#[cfg(not(feature = "websocket"))]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Without this hint the binary exits silently, which looks like a hang-free
    // "success" even though no consumer was ever started.
    eprintln!(
        "This example requires the `websocket` feature; rebuild with `--features websocket`."
    );
    Ok(())
}
/// Runs an asynq consumer that talks to the server over WebSocket.
///
/// Configuration is read from the environment:
/// - `ASYNQ_WS_URL`: server address, defaults to `127.0.0.1:8080`.
/// - `ASYNQ_USERNAME` / `ASYNQ_PASSWORD`: optional credentials passed to
///   `WebSocketBroker::with_basic_auth`.
///
/// Handlers are registered for the task types `email:send`, `email:reminder`,
/// `image:resize` and `report:daily`. Each handler decodes the JSON payload,
/// logs it, waits for a short fixed delay and reports success; a payload that
/// fails to decode is logged and returned as the handler's error.
///
/// # Errors
///
/// Returns an error if a tracing directive fails to parse, if connecting the
/// broker fails, if the server cannot be constructed, or if `server.run`
/// returns an error.
#[cfg(feature = "websocket")]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    use asynq::backend::{WebSocketBroker, WebSocketInspector};
    use asynq::serve_mux::ServeMux;
    use asynq::server::Server;
    use asynq::task::Task;
    use std::env;
    use tracing::info;

    // Honour the environment filter, but always enable debug output for the
    // `asynq` and `websocket_consumer` targets.
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::from_default_env()
                .add_directive("asynq=debug".parse()?)
                .add_directive("websocket_consumer=debug".parse()?),
        )
        .init();

    let ws_url = env::var("ASYNQ_WS_URL").unwrap_or_else(|_| "127.0.0.1:8080".to_string());
    let username = env::var("ASYNQ_USERNAME").ok();
    let password = env::var("ASYNQ_PASSWORD").ok();

    // Only claim authentication in the log when at least one credential was
    // actually supplied. How a half-configured pair is treated is decided by
    // `with_basic_auth`, not here.
    let auth_mode = if username.is_some() || password.is_some() {
        "with authentication"
    } else {
        "without authentication"
    };
    info!("Connecting to asynq-server at {} {}", ws_url, auth_mode);

    let broker =
        std::sync::Arc::new(WebSocketBroker::with_basic_auth(&ws_url, username, password).await?);
    info!("Successfully connected to asynq-server");

    let mut mux = ServeMux::new();

    // Immediate email delivery.
    mux.handle_async_func("email:send", |task: Task| async move {
        match task.get_payload_with_json::<EmailPayload>() {
            Ok(payload) => {
                info!("📧 Processing email task:");
                info!(" To: {}", payload.to);
                info!(" Subject: {}", payload.subject);
                info!(" Body: {}", payload.body);
                // Fixed delay standing in for the actual send; nothing is delivered here.
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                info!("✅ Email sent successfully to {}", payload.to);
                Ok(())
            }
            Err(e) => {
                tracing::error!("Failed to parse email payload: {}", e);
                Err(e)
            }
        }
    });

    // Reminder emails; same payload shape as `email:send`.
    mux.handle_async_func("email:reminder", |task: Task| async move {
        match task.get_payload_with_json::<EmailPayload>() {
            Ok(payload) => {
                info!("⏰ Processing delayed email reminder task:");
                info!(" To: {}", payload.to);
                info!(" Subject: {}", payload.subject);
                info!(" Body: {}", payload.body);
                // Fixed delay standing in for the actual send.
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                info!("✅ Reminder email sent successfully to {}", payload.to);
                Ok(())
            }
            Err(e) => {
                tracing::error!("Failed to parse email reminder payload: {}", e);
                Err(e)
            }
        }
    });

    // Image resizing.
    mux.handle_async_func("image:resize", |task: Task| async move {
        match task.get_payload_with_json::<ImagePayload>() {
            Ok(payload) => {
                info!("🖼️ Processing image resize task:");
                info!(" URL: {}", payload.url);
                info!(" Dimensions: {}x{}", payload.width, payload.height);
                // Fixed delay standing in for the actual resize.
                tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                info!("✅ Image resized successfully");
                Ok(())
            }
            Err(e) => {
                tracing::error!("Failed to parse image payload: {}", e);
                Err(e)
            }
        }
    });

    // Daily report; reuses `EmailPayload`, so `body` must be present in the
    // JSON even though it is not logged.
    mux.handle_async_func("report:daily", |task: Task| async move {
        match task.get_payload_with_json::<EmailPayload>() {
            Ok(payload) => {
                info!("📊 Processing daily report task:");
                info!(" To: {}", payload.to);
                info!(" Subject: {}", payload.subject);
                // Fixed delay standing in for the actual report delivery.
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                info!("✅ Daily report sent successfully");
                Ok(())
            }
            Err(e) => {
                tracing::error!("Failed to parse report payload: {}", e);
                Err(e)
            }
        }
    });

    // Queue name -> number handed to `ServerConfig::queues`
    // (presumably a relative priority weight; confirm against the asynq docs).
    let queues = std::collections::HashMap::from([
        ("critical".to_string(), 6),
        ("default".to_string(), 3),
        ("image_processing".to_string(), 2),
        ("low".to_string(), 1),
    ]);

    // Single source of truth for the worker count so the log line below can
    // never drift from the configured value.
    let concurrency = 4;
    let config = asynq::config::ServerConfig::new()
        .concurrency(concurrency)
        .queues(queues);
    info!(
        "Starting consumer with {} concurrent workers...",
        concurrency
    );

    let inspector = std::sync::Arc::new(WebSocketInspector::new());
    let mut server = Server::with_broker_and_inspector(broker, inspector, config).await?;
    info!("🚀 Consumer is ready and waiting for tasks...");
    info!("Press Ctrl+C to stop");

    server.run(mux).await?;
    Ok(())
}