#![allow(non_upper_case_globals)]
use anyhow::Result;
use async_trait::async_trait;
use celery::prelude::*;
use env_logger::Env;
use structopt::StructOpt;
use tokio::time::{self, Duration};
// Task `add`: computes the sum of its two integer arguments and returns it
// as the successful task result.
#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
    let sum = x + y;
    Ok(sum)
}
// Task `buggy_task` (declared with `max_retries = 3`): tries to read a file
// named "this-file-doesn't-exist" so that the error path is exercised.
//
// A read error is converted with `with_unexpected_err`, attaching the message
// below, and propagated to the caller with `?`. If the read succeeds, the
// number of bytes read is printed and the task returns `Ok(())`.
#[celery::task(max_retries = 3)]
async fn buggy_task() -> TaskResult<()> {
    let read_outcome = tokio::fs::read("this-file-doesn't-exist").await;
    let bytes = read_outcome.with_unexpected_err(|| {
        "This error is part of the example, it is used to showcase error handling"
    })?;
    println!("Read {} bytes", bytes.len());
    Ok(())
}
// Task `long_running_task` (declared with `max_retries = 2`): asynchronously
// sleeps for `secs` seconds, falling back to 10 seconds when `secs` is `None`.
#[celery::task(max_retries = 2)]
async fn long_running_task(secs: Option<u64>) {
    let pause = Duration::from_secs(secs.unwrap_or(10));
    time::sleep(pause).await;
}
// Task `bound_task` (declared with `bind = true`): receives the task instance
// as its first argument and prints the `origin` and `hostname` fields of the
// task's request using their `Debug` representation, one per line.
#[celery::task(bind = true)]
fn bound_task(task: &Self) {
    let request = &task.request;
    println!("{:?}", request.origin);
    println!("{:?}", request.hostname);
}
// Command-line interface of the example, parsed via `StructOpt`.
//
// NOTE: plain `//` comments are used on purpose here; `///` doc comments on
// structopt items are picked up as help text and would alter the CLI output.
#[derive(Debug, StructOpt)]
#[structopt(
name = "celery_app",
about = "Run a Rust Celery producer or consumer.",
setting = structopt::clap::AppSettings::ColoredHelp,
)]
enum CeleryOpt {
// Run as a consumer (see the `Consume` arm in `main`).
Consume,
// Run as a producer, sending tasks to the broker.
Produce {
// Names of the tasks to send, restricted to the values listed below.
// When the list is empty, `main` sends its built-in batch of tasks instead.
#[structopt(possible_values = &["add", "buggy_task", "bound_task", "long_running_task"])]
tasks: Vec<String>,
},
}
// Entry point of the example.
//
// Initializes logging, parses the command line into a `CeleryOpt`, builds the
// Celery application, and then either consumes from the "celery" and
// "buggy-queue" queues or produces tasks, depending on the chosen subcommand.
// The application is closed before returning. Any error from building the
// app, consuming, sending a task, or closing is propagated with `?`.
#[tokio::main]
async fn main() -> Result<()> {
    // Use "info" as the default log filter passed to `env_logger`.
    let log_env = Env::default().default_filter_or("info");
    env_logger::Builder::from_env(log_env).init();

    let opt = CeleryOpt::from_args();

    // The broker address comes from `AMQP_ADDR`, with a localhost fallback.
    // Tasks named "buggy_task" are routed to "buggy-queue"; everything else
    // goes to "celery".
    let my_app = celery::app!(
        broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672".into()) },
        tasks = [
            add,
            buggy_task,
            long_running_task,
            bound_task,
        ],
        task_routes = [
            "buggy_task" => "buggy-queue",
            "*" => "celery",
        ],
        prefetch_count = 2,
        heartbeat = Some(10),
    ).await?;

    match opt {
        CeleryOpt::Consume => {
            my_app.display_pretty().await;
            my_app.consume_from(&["celery", "buggy-queue"]).await?;
        }
        // No task names given: send the built-in demonstration batch.
        CeleryOpt::Produce { tasks } if tasks.is_empty() => {
            my_app.send_task(add::new(1, 2)).await?;
            my_app.send_task(bound_task::new()).await?;

            // Same task again, this time with a countdown of 3.
            let delayed_add = add::new(1, 3).with_countdown(3);
            my_app.send_task(delayed_add).await?;

            my_app.send_task(buggy_task::new()).await?;

            // Asked to sleep for 3 seconds while carrying a time limit of 2.
            let over_limit = long_running_task::new(Some(3)).with_time_limit(2);
            my_app.send_task(over_limit).await?;

            // Followed by 100 ten-second tasks, each with a time limit of 20.
            for _ in 0..100 {
                let within_limit = long_running_task::new(Some(10)).with_time_limit(20);
                my_app.send_task(within_limit).await?;
            }
        }
        // Explicit task names: send one task per name, in the order given.
        CeleryOpt::Produce { tasks } => {
            for name in &tasks {
                match name.as_str() {
                    "add" => {
                        my_app.send_task(add::new(1, 2)).await?;
                    }
                    "bound_task" => {
                        my_app.send_task(bound_task::new()).await?;
                    }
                    "buggy_task" => {
                        my_app.send_task(buggy_task::new()).await?;
                    }
                    "long_running_task" => {
                        my_app.send_task(long_running_task::new(Some(3))).await?;
                    }
                    _ => panic!("unknown task"),
                }
            }
        }
    }

    my_app.close().await?;
    Ok(())
}