dataflow-rs 2.1.5

A lightweight rules engine for building IFTTT-style automation and data processing pipelines in Rust. Define rules with JSONLogic conditions, execute actions, and chain workflows.
Documentation
# Performance

Dataflow-rs is designed for high-performance rule evaluation and data processing with minimal overhead.

## Architecture for Performance

### Pre-compilation

All JSONLogic expressions are compiled once at engine startup:

```rust
// This compiles all logic at creation time
let engine = Engine::new(workflows, None);

// Runtime processing uses pre-compiled logic
// No parsing or compilation overhead
engine.process_message(&mut message).await?;
```

### Benefits of Pre-compilation

- **Zero runtime parsing** - No JSON parsing during message processing
- **Cached compiled logic** - O(1) access to compiled expressions
- **Early validation** - Invalid expressions caught at startup
- **Consistent latency** - Predictable performance per message

### Memory Efficiency

- **Arc-wrapped compiled logic** - Shared without copying
- **Immutable workflows** - Safe concurrent access
- **Context caching** - Avoids repeated JSON cloning

## Benchmarking

Run the included benchmark:

```bash
cargo run --example benchmark --release
```

### Sample Benchmark

```rust
use dataflow_rs::{Engine, Workflow, Message};
use std::time::Instant;

// Setup
let workflow = Workflow::from_json(workflow_json)?;
let engine = Engine::new(vec![workflow], None);

// Benchmark
let iterations = 10_000;
let start = Instant::now();

for _ in 0..iterations {
    let mut message = Message::new(&test_data);
    engine.process_message(&mut message).await?;
}

let elapsed = start.elapsed();
println!("Processed {} messages in {:?}", iterations, elapsed);
println!("Average: {:?} per message", elapsed / iterations);
```

## Optimization Tips

### 1. Minimize Mappings

Combine related transformations:

```json
// Less efficient: Multiple mappings
{
    "mappings": [
        {"path": "data.a", "logic": {"var": "data.source.a"}},
        {"path": "data.b", "logic": {"var": "data.source.b"}},
        {"path": "data.c", "logic": {"var": "data.source.c"}}
    ]
}

// More efficient: Single object mapping when possible
{
    "mappings": [
        {"path": "data", "logic": {"var": "data.source"}}
    ]
}
```

### 2. Use Conditions Wisely

Skip unnecessary processing with conditions:

```json
{
    "id": "expensive_task",
    "condition": {"==": [{"var": "metadata.needs_processing"}, true]},
    "function": { ... }
}
```

### 3. Order Rules by Frequency

Put frequently-executed rules earlier (lower priority):

```json
{"id": "common_rule", "priority": 1, ...}
{"id": "rare_rule", "priority": 100, ...}
```

### 4. Use temp_data

Store intermediate results to avoid recomputation:

```json
{
    "mappings": [
        {
            "path": "temp_data.computed",
            "logic": {"expensive": "computation"}
        },
        {
            "path": "data.result1",
            "logic": {"var": "temp_data.computed"}
        },
        {
            "path": "data.result2",
            "logic": {"var": "temp_data.computed"}
        }
    ]
}
```

### 5. Avoid Unnecessary Validation

Validate only what's necessary:

```json
// Validate at system boundaries
{
    "id": "input_validation",
    "condition": {"==": [{"var": "metadata.source"}, "external"]},
    "tasks": [
        {"id": "validate", "function": {"name": "validation", ...}}
    ]
}
```

## Concurrent Processing

Process multiple messages concurrently:

```rust
use std::sync::Arc;
use tokio::task;

let engine = Arc::new(Engine::new(workflows, None));

let handles: Vec<_> = messages.into_iter()
    .map(|mut msg| {
        let engine = Arc::clone(&engine);
        task::spawn(async move {
            engine.process_message(&mut msg).await
        })
    })
    .collect();

// Wait for all
for handle in handles {
    handle.await??;
}
```

### Thread Safety

- Engine is `Send + Sync`
- Compiled logic shared via `Arc`
- Each message processed independently

## Memory Considerations

### Large Messages

For very large messages, consider:

1. **Streaming** - Process chunks instead of entire payload
2. **Selective Loading** - Load only needed fields
3. **Cleanup temp_data** - Clear intermediate results when done

### Many Rules

For many rules:

1. **Organize by Domain** - Group related rules
2. **Use Conditions** - Skip irrelevant rules early
3. **Profile** - Identify bottleneck rules

## Profiling

### Enable Logging

```rust
env_logger::Builder::from_env(
    env_logger::Env::default().default_filter_or("debug")
).init();
```

### Custom Metrics

```rust
use std::time::Instant;

let start = Instant::now();
engine.process_message(&mut message).await?;
let duration = start.elapsed();

metrics::histogram!("dataflow.processing_time", duration);
```

## Production Recommendations

1. **Build with --release** - Debug builds are significantly slower
2. **Pre-warm** - Process a few messages at startup to warm caches
3. **Monitor** - Track processing times and error rates
4. **Profile** - Identify slow rules in production
5. **Scale Horizontally** - Engine is stateless, scale with instances