Task

Trait Task 

Source
pub trait Task: Send + Sync {
    // Required method
    fn run<'life0, 'async_trait>(
        &'life0 self,
        context: Context,
    ) -> Pin<Box<dyn Future<Output = Result<TaskResult>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    // Provided method
    fn id(&self) -> &str { ... }
}
Expand description

Core trait that all tasks must implement.

Tasks are the building blocks of your workflow. Each task represents a unit of work that can access shared context and control the flow of execution.

§Examples

§Basic Task

use graph_flow::{Task, TaskResult, NextAction, Context};
use async_trait::async_trait;

struct GreetingTask;

#[async_trait]
impl Task for GreetingTask {
    fn id(&self) -> &str {
        "greeting"
    }

    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        let name: String = context.get("name").await.unwrap_or("World".to_string());
        let greeting = format!("Hello, {}!", name);
         
        Ok(TaskResult::new(Some(greeting), NextAction::Continue))
    }
}

§Task with Default ID

struct DefaultIdTask;

#[async_trait]
impl Task for DefaultIdTask {
    // id() is automatically implemented using the type name
     
    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        Ok(TaskResult::new(None, NextAction::End))
    }
}

§Complex Task with Error Handling

struct ValidationTask {
    max_retries: usize,
}

#[async_trait]
impl Task for ValidationTask {
    fn id(&self) -> &str {
        "validator"
    }

    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        let data: Option<String> = context.get("data").await;
        let retry_count: usize = context.get("retry_count").await.unwrap_or(0);
         
        match data {
            Some(data) if self.validate(&data) => {
                context.set("retry_count", 0).await; // Reset counter
                Ok(TaskResult::new(
                    Some("Validation passed".to_string()),
                    NextAction::Continue
                ))
            }
            Some(_) if retry_count < self.max_retries => {
                context.set("retry_count", retry_count + 1).await;
                Ok(TaskResult::new(
                    Some("Validation failed, retrying...".to_string()),
                    NextAction::GoTo("data_input".to_string())
                ))
            }
            _ => {
                Err(GraphError::TaskExecutionFailed(
                    "Validation failed after max retries".to_string()
                ))
            }
        }
    }
}

impl ValidationTask {
    fn validate(&self, data: &str) -> bool {
        !data.is_empty() && data.len() > 5
    }
}

Required Methods§

Source

fn run<'life0, 'async_trait>( &'life0 self, context: Context, ) -> Pin<Box<dyn Future<Output = Result<TaskResult>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Execute the task with the given context.

This is where you implement your task’s logic. You have access to the shared context for reading input data and storing results.

§Parameters
  • context - Shared context containing workflow state and data
§Returns

Returns a Result<TaskResult> where:

  • Ok(TaskResult) indicates successful execution
  • Err(GraphError) indicates an error that should stop the workflow
§Examples
struct DataProcessor;

#[async_trait]
impl Task for DataProcessor {
    fn id(&self) -> &str {
        "data_processor"
    }

    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        // Read input from context
        let input: String = context.get("raw_data").await
            .unwrap_or_default();
         
        // Process the data
        let processed = self.process_data(&input).await?;
         
        // Store result for next task
        context.set("processed_data", processed.clone()).await;
         
        // Return result with next action
        Ok(TaskResult::new(
            Some(format!("Processed {} bytes", processed.len())),
            NextAction::Continue
        ))
    }
}

impl DataProcessor {
    async fn process_data(&self, input: &str) -> graph_flow::Result<String> {
        // Your processing logic here
        Ok(input.to_uppercase())
    }
}

Provided Methods§

Source

fn id(&self) -> &str

Unique identifier for this task.

By default, this returns the type name of the implementing struct. Override this method if you need a custom identifier.

§Examples
// Using default implementation (type name)
struct MyTask;

#[async_trait]
impl Task for MyTask {
    // id() will return "my_module::MyTask"
    async fn run(&self, _context: Context) -> graph_flow::Result<TaskResult> {
        Ok(TaskResult::new(None, NextAction::End))
    }
}

// Using custom ID
struct CustomTask;

#[async_trait]
impl Task for CustomTask {
    fn id(&self) -> &str {
        "custom_task_id"
    }

    async fn run(&self, _context: Context) -> graph_flow::Result<TaskResult> {
        Ok(TaskResult::new(None, NextAction::End))
    }
}

Implementors§