dataflow-rs 3.0.1

A lightweight rules engine for building IFTTT-style automation and data processing pipelines in Rust. Define rules with JSONLogic conditions, execute actions, and chain workflows.
# Error Handling

Dataflow-rs lets you set error policy per action, per rule, and at the engine level, so a rule can tolerate failures in optional steps and still stop on critical ones.

## Two complementary error channels

Every error encountered during `process_message` flows through two
complementary channels:

- **`message.errors()`** — **always** contains every error encountered:
  validation failures, task panics, 5xx-status outcomes, workflow
  wrappers. Callers that want a uniform view scan this list.
- **`Result::Err` from `process_message`** — signals **only** that the
  engine stopped before processing every workflow. Callers that want
  fail-fast match on it; the error pushed to `message.errors()` for the
  same failure carries the workflow context that the bare `Err` doesn't.

A workflow with `continue_on_error: true` records its errors to
`message.errors()` and returns `Ok(())`. A workflow with
`continue_on_error: false` records to `message.errors()` *and* returns
`Result::Err` (which short-circuits the rest of `process_message`).
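
The relationship between the two channels can be sketched with a small std-only model. The `Message`, `Workflow`, and `process_message` below are hypothetical stand-ins written for illustration, not the crate's own types; the `fails` flag simply pretends a task inside the workflow failed.

```rust
// Toy model of the two error channels. None of these types come from
// dataflow-rs; they are hypothetical stand-ins for illustration only.

#[derive(Debug, Default)]
struct Message {
    errors: Vec<String>,
}

struct Workflow {
    id: &'static str,
    continue_on_error: bool,
    fails: bool, // pretend outcome of running the workflow's tasks
}

/// Runs workflows in order. Every failure is appended to `message.errors`.
/// `Err` is returned only when a failing workflow has
/// `continue_on_error: false`; the remaining workflows are then skipped.
fn process_message(workflows: &[Workflow], message: &mut Message) -> Result<(), String> {
    for wf in workflows {
        if !wf.fails {
            continue;
        }
        let cause = "task failed".to_string();
        // Channel 1: always record, with the workflow context attached.
        message.errors.push(format!("workflow '{}': {}", wf.id, cause));
        // Channel 2: only a stop-on-error workflow ends the run early.
        if !wf.continue_on_error {
            return Err(cause);
        }
    }
    Ok(())
}
```

In this model a failing workflow with `continue_on_error: true` leaves one entry in `message.errors` and the call still returns `Ok(())`. A failing workflow with `continue_on_error: false` leaves the same kind of entry, returns the bare `Err`, and prevents later workflows from running.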

## Error Levels

Errors can be handled at three levels:

1. **Action Level** - Individual action (task) error handling
2. **Rule Level** - Rule-wide (workflow) error policy
3. **Engine Level** - The `Result` returned by `process_message`, which signals whether the engine stopped early

## Action-Level Error Handling

### Stop on Error (Default)

```json
{
    "id": "critical_action",
    "continue_on_error": false,
    "function": { ... }
}
```

If the action fails:
- Error is recorded in `message.errors()`
- Rule execution stops
- No further actions execute

### Continue on Error

```json
{
    "id": "optional_action",
    "continue_on_error": true,
    "function": { ... }
}
```

If the action fails:
- Error is recorded in `message.errors()`
- Rule continues to next action

## Rule-Level Error Handling

The rule's `continue_on_error` applies to all actions by default:

```json
{
    "id": "resilient_rule",
    "continue_on_error": true,
    "tasks": [
        {"id": "action1", "function": { ... }},
        {"id": "action2", "function": { ... }},
        {"id": "action3", "function": { ... }}
    ]
}
```

Later actions still run even if an earlier action fails.

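### Override at Action Level

An action's own `continue_on_error` takes precedence over the rule's setting. Here `optional_action` inherits `true` from the rule, while `critical_action` stops the rule if it fails: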

```json
{
    "id": "mixed_rule",
    "continue_on_error": true,
    "tasks": [
        {"id": "optional_action", "function": { ... }},
        {
            "id": "critical_action",
            "continue_on_error": false,
            "function": { ... }
        }
    ]
}
```
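
One way to picture the override is as a lookup that prefers the action's setting and falls back to the rule's. The sketch below is a std-only model under that assumption; `Action`, `effective_policy`, and `run_rule` are hypothetical names, and representing "not set" as `Option<bool>` is a modelling choice rather than a statement about the engine's internals.

```rust
// Hypothetical model of how an action-level setting overrides the rule
// default. These types are illustrative, not dataflow-rs's own structs.

struct Action {
    id: &'static str,
    continue_on_error: Option<bool>, // None = not set in the action's JSON
    fails: bool,                     // pretend outcome of the action
}

/// The action's own setting wins; otherwise the rule's setting applies.
fn effective_policy(rule_default: bool, action: &Action) -> bool {
    action.continue_on_error.unwrap_or(rule_default)
}

/// Returns the ids of the actions that actually ran.
fn run_rule(rule_default: bool, actions: &[Action]) -> Vec<&'static str> {
    let mut ran = Vec::new();
    for action in actions {
        ran.push(action.id);
        if action.fails && !effective_policy(rule_default, action) {
            break; // stop on error: later actions never execute
        }
    }
    ran
}
```

With a rule default of `true`, a failing action that leaves the field unset lets execution continue, while a failing action that sets `continue_on_error: false` is the last one to run.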

## Accessing Errors

After processing, walk `message.errors()`:

```rust
let result = engine.process_message(&mut message).await;

for error in message.errors() {
    println!("Error: {} in {}/{}",
        error.message,
        error.workflow_id.as_deref().unwrap_or("unknown"),
        error.task_id.as_deref().unwrap_or("unknown")
    );
}

// Fail-fast signal — true when the engine stopped before all workflows ran.
if let Err(e) = result {
    eprintln!("engine stopped early: {e}");
}
```

Common error codes you'll see:

- `VALIDATION_ERROR` — from the `validation` built-in
- `TASK_ERROR` — handler returned `Result::Err`
- `TASK_STATUS_ERROR` — handler returned `TaskOutcome::Status(s)` with `s >= 500`
- `WORKFLOW_ERROR` — wrapper recording workflow context for the failure above
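
Because `WORKFLOW_ERROR` entries wrap a failure that is already in the list, it can help to group errors by code before reporting them. The sketch below assumes each entry exposes a string `code` alongside its `message`; `ErrorEntry`, `count_by_code`, and `root_causes` are hypothetical names used only for this example, so check the crate's error type for the real field layout.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for the entries in `message.errors()`.
// A string `code` field is assumed here for illustration.
struct ErrorEntry {
    code: String,
    message: String,
}

/// Counts errors per code so each kind can be logged or alerted on separately.
fn count_by_code(errors: &[ErrorEntry]) -> BTreeMap<&str, usize> {
    let mut counts = BTreeMap::new();
    for e in errors {
        *counts.entry(e.code.as_str()).or_insert(0) += 1;
    }
    counts
}

/// Drops `WORKFLOW_ERROR` wrappers, leaving only the underlying failures.
fn root_causes(errors: &[ErrorEntry]) -> Vec<&ErrorEntry> {
    errors
        .iter()
        .filter(|e| e.code != "WORKFLOW_ERROR")
        .collect()
}
```

A `BTreeMap` keeps the codes in a stable order, which makes log output and test assertions deterministic.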

## Error Types

### Validation Errors

Generated by the `validation` function when rules fail:

```json
{
    "function": {
        "name": "validation",
        "input": {
            "rules": [
                {
                    "logic": {"!!": {"var": "data.email"}},
                    "message": "Email is required"
                }
            ]
        }
    }
}
```

### Execution Errors

Generated when function execution fails:

- JSONLogic evaluation errors
- Data type mismatches
- Missing required fields

### Custom Function Errors

Return errors from custom functions via `Result::Err`:

```rust,ignore
use dataflow_rs::prelude::*;

impl AsyncFunctionHandler for MyFunction {
    type Input = serde_json::Value;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        _input: &serde_json::Value,
    ) -> Result<TaskOutcome> {
        if some_condition {
            return Err(DataflowError::Task(
                "Custom error message".to_string()
            ));
        }
        Ok(TaskOutcome::Success)
    }
}
```

`DataflowError` provides typed variants for the most common cases —
`Validation`, `Task`, `Workflow`, `FunctionExecution`, `FunctionNotFound`,
`Http`, `Timeout`, `Io`, `LogicEvaluation`, `Deserialization`, `Unknown`.
See the [API reference](../api/reference.md#dataflowerror) for the full list.

## Error Recovery Patterns

### Fallback Values

Use conditions to provide fallback values:

```json
{
    "tasks": [
        {
            "id": "try_primary",
            "continue_on_error": true,
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "temp_data.result", "logic": {"var": "data.primary"}}
                    ]
                }
            }
        },
        {
            "id": "use_fallback",
            "condition": {"!": {"var": "temp_data.result"}},
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "temp_data.result", "logic": "default_value"}
                    ]
                }
            }
        }
    ]
}
```
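
The `use_fallback` condition relies on JSONLogic truthiness, so it fires for more than just a missing value. The following std-only sketch models the standard JSONLogic table (null, `false`, `0`, `""`, and `[]` are falsy). It assumes the engine's evaluator follows that table; `Value`, `truthy`, and `needs_fallback` are hypothetical names, and objects are left out for brevity.

```rust
// Minimal model of JSONLogic truthiness, which decides when `{"!": ...}`
// and `{"!!": ...}` fire. `Value` is a hypothetical stand-in for a JSON
// value; objects are omitted to keep the sketch short.
enum Value {
    Null,
    Bool(bool),
    Number(f64),
    Str(String),
    Array(Vec<Value>),
}

/// JSONLogic treats null, false, 0, "" and [] as falsy; the rest is truthy.
fn truthy(v: &Value) -> bool {
    match v {
        Value::Null => false,
        Value::Bool(b) => *b,
        Value::Number(n) => *n != 0.0,
        Value::Str(s) => !s.is_empty(),
        Value::Array(a) => !a.is_empty(),
    }
}

/// Mirrors a `{"!": {"var": ...}}` condition: fall back when the value is falsy.
fn needs_fallback(primary: &Value) -> bool {
    !truthy(primary)
}
```

Under this model a legitimate primary value of `0` or `""` is also replaced by the fallback. If those values must be preserved, the condition needs to test for null specifically instead of general falsiness.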

### Validation Before Processing

Validate data before critical operations:

```json
{
    "tasks": [
        {
            "id": "validate",
            "function": {
                "name": "validation",
                "input": {
                    "rules": [
                        {"logic": {"!!": {"var": "data.required_field"}}, "message": "Required field missing"}
                    ]
                }
            }
        },
        {
            "id": "process",
            "function": { ... }
        }
    ]
}
```

If validation fails, the rule stops before further processing.
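
The same gate can be written out in plain Rust to show the control flow. This is a std-only model, not the crate's API: `Data`, `Rule`, `validate`, and `validate_then_process` are hypothetical, and the two helper functions stand in for a validation rule and a processing action.

```rust
use std::collections::HashMap;

// Hypothetical model of "validate, then process". Not dataflow-rs's API.
type Data = HashMap<&'static str, String>;

struct Rule {
    check: fn(&Data) -> bool,
    message: &'static str,
}

/// Returns the message of every rule whose check fails.
fn validate(data: &Data, rules: &[Rule]) -> Vec<&'static str> {
    rules
        .iter()
        .filter(|r| !(r.check)(data))
        .map(|r| r.message)
        .collect()
}

/// Runs `process` only when validation passes, like a stop-on-error
/// validation action placed first in a rule.
fn validate_then_process(
    data: &Data,
    rules: &[Rule],
    process: fn(&Data) -> String,
) -> Result<String, Vec<&'static str>> {
    let failures = validate(data, rules);
    if failures.is_empty() {
        Ok(process(data))
    } else {
        Err(failures)
    }
}

// Example rule check: the field must be present and non-empty.
fn has_required_field(data: &Data) -> bool {
    data.get("required_field").map_or(false, |v| !v.is_empty())
}

// Example processing step that depends on the validated field.
fn process(data: &Data) -> String {
    format!("processed {}", data["required_field"])
}
```

Since `process` indexes the map directly, it would panic on missing input. The validation gate is what makes that access safe, which is the point of validating first.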

## Try It

> **Want more features?** Try the [Full Debugger UI](/dataflow-rs/debugger/) with step-by-step execution and workflow visualization.

<div class="playground-widget" data-workflows='[{"id":"error_demo","name":"Error Demo","continue_on_error":true,"tasks":[{"id":"parse","name":"Parse Payload","function":{"name":"parse_json","input":{"source":"payload","target":"input"}}},{"id":"validate_email","name":"Validate Email","function":{"name":"validation","input":{"rules":[{"logic":{"!!":[{"var":"data.input.email"}]},"message":"Email is required"}]}}},{"id":"greet","name":"Greet User","function":{"name":"map","input":{"mappings":[{"path":"data.greeting","logic":{"cat":["Hello, ",{"var":"data.input.name"},"!"]}}]}}}]}]' data-payload='{"name":"John"}'>
</div>

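The payload has no `email`, so `validate_email` records a validation error. Because the rule sets `continue_on_error: true`, processing continues and `greet` still runs.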

## Best Practices

1. **Validate Early**
   - Add validation actions at the start of rules
   - Fail fast on invalid data

2. **Use `continue_on_error` Wisely**
   - Only for truly optional actions
   - Critical operations should stop on error

3. **Check Errors**
   - Always check `message.errors()` after processing
   - Log errors for monitoring

4. **Provide Context**
   - Include meaningful error messages
   - Include field paths in validation errors