celery 0.5.5

Rust implementation of Celery
Documentation
#!/usr/bin/env python3

import argparse
import os
import sys
import time

from celery import Celery
from celery.bin.celery import main as _main


my_app = Celery("celery", broker=os.environ.get("AMQP_ADDR", "amqp://127.0.0.1:5672"))
my_app.conf.update(
    result_backend=None,
    task_ignore_result=True,
    task_routes=(
        [("buggy_task", {"queue": "buggy-queue"})],
        [("*", {"queue": "celery"})],
    ),
)


# NOTE: we have to set the name for tasks manually in order to match the names
# of the Rust tasks. Otherwise the task names here would be prefixed with 'celery.'.
@my_app.task(name="add")
def add(x, y):
    return x + y


@my_app.task(
    name="buggy_task",
    max_retries=3,
    autoretry_for=(RuntimeError,),
    retry_backoff=True,
)
def buggy_task():
    raise RuntimeError("This error is part of the example: it is used to showcase error handling")


@my_app.task(name="long_running_task", max_retries=2)
def long_running_task(secs: int = 10):
    time.sleep(secs)


@my_app.task(name="bound_task", bind=True)
def bound_task(task):
    # Print some info about the request for debugging.
    print(task.request.origin)
    print(task.request.hostname)


def parse_args():
    parser = argparse.ArgumentParser(
        "celery_app", description="Run a Python Celery producer or consumer"
    )
    parser.add_argument("mode", choices=["consume", "produce"])
    parser.add_argument(
        "task", nargs="*", choices=["add", "buggy_task", "long_running_task", "bound_task"]
    )
    return parser.parse_args()


def main():
    opts = parse_args()
    if opts.mode == "consume":
        sys.argv = [
            "celery",
            "--app=celery_app.my_app",
            "worker",
            "-Q=celery,buggy-queue",
            "-Ofair",
            "--loglevel=info",
        ]
        _main()
    else:
        if opts.task:
            for task in opts.task:
                if task == "add":
                    add.apply_async(args=(1, 0))
                elif task == "buggy_task":
                    buggy_task.apply_async()
                elif task == "long_running_task":
                    long_running_task.apply_async()
                else:
                    buggy_task.apply_async()
        else:
            # Basic task sending.
            add.apply_async(args=(1, 0))
            bound_task.apply_async()

            # Send with additional options like `countdown`.
            add.apply_async(args=(1, 3), countdown=3)

            # Send the buggy task that will fail and be retried a few times.
            buggy_task.apply_async()

            # Send the long running task that will fail with a timeout error.
            long_running_task.apply_async(args=(3,), time_limit=2)


if __name__ == "__main__":
    main()