Skip to main content

worker

Attribute Macro worker 

Source
#[worker]
Expand description

Marks an async function as a Conductor worker task.

This macro transforms a regular async function into a Conductor worker that can be registered with a TaskHandler. The function’s parameters are automatically extracted from the task’s input data.

§Attributes

  • name - Task definition name (defaults to function name)
  • poll_interval - Poll interval in milliseconds (default: 100)
  • thread_count - Maximum concurrent executions (default: 1)
  • domain - Task routing domain (optional)
  • identity - Worker identity (optional)

§Function Signatures

§Simple parameters extracted from task input

#[worker(name = "greet")]
async fn greet(name: String) -> String {
    format!("Hello, {}!", name)
}

§With Task parameter for full access

#[worker(name = "process")]
async fn process(task: Task) -> WorkerOutput {
    let data = task.get_input_string("data").unwrap();
    WorkerOutput::completed_with_result(data)
}

§With TaskContext for metadata access

#[worker(name = "long_running")]
async fn long_running(ctx: TaskContext, batch_size: i32) -> WorkerOutput {
    let offset = ctx.poll_count() * batch_size;
    // Process batch at offset...
     
    if ctx.is_first_poll() {
        println!("Starting task {}", ctx.task_id());
    }
     
    WorkerOutput::in_progress(30)
}

§Combined Task and TaskContext

#[worker(name = "advanced")]
async fn advanced(task: Task, ctx: TaskContext) -> WorkerOutput {
    // Full task access and convenient context methods
    let input: String = task.get_input("data").unwrap_or_default();
    println!("Processing task {} (poll #{})", ctx.task_id(), ctx.poll_count());
    WorkerOutput::completed_with_result(input)
}

§Return Types

  • String or any serializable type - Wrapped in WorkerOutput::completed_with_result()
  • WorkerOutput - Used directly
  • Result<T, E> - Success wrapped in completed, error converted to failed

§Generated Code

The macro generates a function {fn_name}_worker() that returns an FnWorker:

#[worker(name = "greet", thread_count = 5)]
async fn greet(name: String) -> String {
    format!("Hello, {}!", name)
}

// Generates:
fn greet_worker() -> FnWorker {
    FnWorker::new("greet", |task| async move {
        let name: String = task.get_input("name").unwrap_or_default();
        let result = format!("Hello, {}!", name);
        Ok(WorkerOutput::completed_with_result(result))
    })
    .with_thread_count(5)
}

§Usage

use conductor_macros::worker;
use conductor::{TaskHandler, Configuration};

#[worker(name = "process_order", thread_count = 10, domain = "orders")]
async fn process_order(order_id: String, amount: f64) -> serde_json::Value {
    serde_json::json!({
        "order_id": order_id,
        "amount": amount,
        "status": "processed"
    })
}

#[tokio::main]
async fn main() {
    let config = Configuration::default();
    let mut handler = TaskHandler::new(config).unwrap();
     
    // Use the generated worker function
    handler.add_worker(process_order_worker());
     
    handler.start().await.unwrap();
}