Attribute Macro task 

#[task]

Attribute macro that generates a task wrapper with checkpointing, retry, and streaming.

The annotated function becomes the inner task body. The macro generates a wrapper function (prefixed with __task_) that:

  • Checks ExecutionLog for cached results (resume-skip path)
  • Emits StreamEvent::node_start and StreamEvent::node_end events
  • Implements retry logic when retry(max_attempts, backoff) is specified
  • Calls record_completion() on success
  • Calls record_failure() after all retries are exhausted
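The control flow listed above can be sketched roughly as follows. This is a simplified, synchronous stand-in and not the macro's actual expansion: `SketchLog`, `run_task`, and the event strings are hypothetical names invented for illustration, and the real wrapper is async and works with `TaskContext`, `ExecutionLog`, and `StreamEvent`. Whether an end event is emitted on failure, and whether any events are emitted on the cached path, are assumptions made here.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the execution log; not the real adk_graph type.
#[derive(Default)]
struct SketchLog {
    completed: HashMap<String, String>,
    failed: HashMap<String, String>,
    events: Vec<String>,
}

/// Hypothetical wrapper body, kept synchronous so the sketch needs only std.
fn run_task<F>(
    log: &mut SketchLog,
    name: &str,
    max_attempts: u32,
    rerun_on_resume: bool,
    mut body: F,
) -> Result<String, String>
where
    F: FnMut() -> Result<String, String>,
{
    // Resume-skip path: reuse a recorded result unless rerun_on_resume is set.
    if !rerun_on_resume {
        if let Some(cached) = log.completed.get(name) {
            return Ok(cached.clone());
        }
    }

    log.events.push(format!("node_start:{}", name));
    let mut last_err = String::new();
    // Run the body at least once, and at most max_attempts times.
    for _attempt in 1..=max_attempts.max(1) {
        match body() {
            Ok(value) => {
                // Success: record the result so a later resume can skip this task.
                log.completed.insert(name.to_string(), value.clone());
                log.events.push(format!("node_end:{}", name));
                return Ok(value);
            }
            // A real wrapper would wait for the backoff delay before retrying.
            Err(e) => last_err = e,
        }
    }

    // All attempts failed: record the failure once (assumed to also end the node).
    log.failed.insert(name.to_string(), last_err.clone());
    log.events.push(format!("node_end:{}", name));
    Err(last_err)
}
```

In this sketch, calling `run_task` a second time with the same name returns the recorded value without invoking the body, unless `rerun_on_resume` is `true`.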

§Requirements

  • The function must be async
  • The function must accept &mut TaskContext as its first argument

§Attributes

  • retry(max_attempts = N, backoff = "Xs"): retry on failure, up to N attempts, with exponential backoff starting from X seconds
  • rerun_on_resume: always re-execute the task on workflow resume, ignoring any cached result
  • rerun_on_resume = true / rerun_on_resume = false: explicit boolean form of the same flag
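To make the backoff attribute concrete, here is a minimal sketch of how a value such as "1s" could be turned into per-retry delays. Both helpers (`parse_backoff`, `delay_for_retry`) are hypothetical, and the doubling factor is an assumption: the description says only that the backoff is exponential, not what multiplier the macro uses or which units other than seconds it accepts.

```rust
use std::time::Duration;

/// Parse a backoff literal of the form "Xs" (whole seconds only).
/// Hypothetical helper; other units are not covered by this sketch.
fn parse_backoff(spec: &str) -> Option<Duration> {
    let secs = spec.trim().strip_suffix('s')?;
    secs.parse::<u64>().ok().map(Duration::from_secs)
}

/// Delay before retry number `retry` (1-based), assuming the base delay
/// doubles on every subsequent retry. Saturates instead of overflowing.
fn delay_for_retry(base: Duration, retry: u32) -> Duration {
    let factor = 2u32.saturating_pow(retry.saturating_sub(1));
    base.saturating_mul(factor)
}
```

Under these assumptions, `backoff = "1s"` would give waits of 1s, 2s, 4s, and so on between successive attempts.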

§Examples

use adk_graph::functional::TaskContext;
use adk_graph::error::Result;
use serde_json::Value;

#[task(retry(max_attempts = 3, backoff = "1s"))]
async fn step_a(ctx: &mut TaskContext, input: &str) -> Result<Value> {
    Ok(serde_json::json!({"processed": input}))
}

#[task(rerun_on_resume)]
async fn step_b(ctx: &mut TaskContext) -> Result<Value> {
    // This task always re-executes on resume, never uses cached results
    Ok(serde_json::json!({"timestamp": chrono::Utc::now().to_rfc3339()}))
}

#[task(rerun_on_resume, retry(max_attempts = 2, backoff = "2s"))]
async fn step_c(ctx: &mut TaskContext) -> Result<Value> {
    // Combined: re-executes on resume with retry logic
    Ok(serde_json::json!({"status": "ok"}))
}

// Generates: async fn __task_step_a(ctx: &mut TaskContext, input: &str) -> Result<Value>
// which wraps step_a with checkpoint/retry/streaming logic.