camel-processor 0.6.2

Message processors for rust-camel
Documentation
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use camel_api::{AggregatorConfig, Exchange, Message, Value};
use camel_processor::aggregator::{AggregatorService, SharedLanguageRegistry};
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tower::ServiceExt;

fn bench_aggregator_sizes(c: &mut Criterion) {
    let mut group = c.benchmark_group("aggregator/messages");
    let rt = tokio::runtime::Runtime::new().unwrap();

    for size in [10, 100, 1000] {
        group.throughput(Throughput::Elements(size as u64));
        let payloads: Vec<String> = (0..size).map(|i| format!("msg-{i}")).collect();
        group.bench_with_input(BenchmarkId::new("aggregate", size), &size, |b, &size| {
            b.to_async(&rt).iter(|| {
                let config = AggregatorConfig::correlate_by("corr-id")
                    .complete_when_size(size)
                    .build();
                let (tx, _rx) = mpsc::channel(1024);
                let registry: SharedLanguageRegistry = Arc::new(Mutex::new(HashMap::new()));
                let cancel = CancellationToken::new();
                let svc = AggregatorService::new(config, tx, registry, cancel);
                let payloads = payloads.clone();
                async move {
                    for payload in payloads {
                        let mut ex = Exchange::new(Message::new(payload));
                        ex.input
                            .set_header("corr-id", Value::String("bucket-1".into()));
                        svc.clone().oneshot(ex).await.unwrap();
                    }
                }
            })
        });
    }
    group.finish();
}

criterion_group!(benches, bench_aggregator_sizes);
criterion_main!(benches);