rustcdc 0.6.2

Embeddable Rust CDC library focused on correctness-first capture primitives
Documentation
#![cfg(feature = "metrics")]

use std::{collections::HashMap, time::Duration};

use rustcdc::{OTelConfig, OTelEventTracer};
use serde_json::Value;
use testcontainers::{
    core::{IntoContainerPort, WaitFor},
    runners::AsyncRunner,
    GenericImage, ImageExt,
};

#[tokio::test]
async fn otel_tracing_exports_hierarchy_and_crash_retry_to_jaeger() -> rustcdc::Result<()> {
    if std::env::var("CDC_RS_RUN_DOCKER_TESTS").as_deref() != Ok("1") {
        eprintln!("skipping otel tracing integration test (set CDC_RS_RUN_DOCKER_TESTS=1)");
        return Ok(());
    }

    let container = GenericImage::new("jaegertracing/all-in-one", "1.57")
        .with_exposed_port(4317.tcp())
        .with_exposed_port(16686.tcp())
        .with_wait_for(WaitFor::message_on_stderr(
            "Starting jaeger-collector gRPC server",
        ))
        .with_env_var("COLLECTOR_OTLP_ENABLED", "true")
        .start()
        .await
        .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;

    let host = container
        .get_host()
        .await
        .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;
    let otlp_port = container
        .get_host_port_ipv4(4317.tcp())
        .await
        .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;
    let ui_port = container
        .get_host_port_ipv4(16686.tcp())
        .await
        .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;

    let service_name = format!("rustcdc-otel-tracing-{}", std::process::id());
    let tracer = OTelEventTracer::with_otlp_exporter(OTelConfig::new(
        format!("http://{host}:{otlp_port}"),
        service_name.clone(),
        "test",
        "integration",
    ))?
    .with_source_type("postgres");

    tracer.start_snapshot_span("snapshot-root", "public.users", 1000);
    tracer.start_snapshot_chunk_span("chunk-1", "snapshot-root", "public.users", 0, 500);
    tracer.end_span("chunk-1");
    tracer.end_span("snapshot-root");

    tracer.start_handoff_span("handoff-1", 17, None);

    let mut stream_attrs = HashMap::new();
    stream_attrs.insert("source.table".to_string(), "public.users".to_string());
    stream_attrs.insert("stream.events_count".to_string(), "42".to_string());
    tracer.start_span_with_parent(
        "stream-1",
        "rustcdc.stream",
        stream_attrs,
        Some("handoff-1"),
    );

    tracer.start_transform_span(
        "transform-crash",
        "mask_hash",
        Some("public.users"),
        Some("stream-1"),
    );
    tracer.end_span_with_status("transform-crash", "panic", Some("transform_crash"));

    tracer.start_transform_span(
        "transform-retry",
        "mask_hash",
        Some("public.users"),
        Some("stream-1"),
    );
    tracer.end_span("transform-retry");

    tracer.end_span("stream-1");
    tracer.end_span("handoff-1");

    tokio::time::timeout(
        Duration::from_secs(3),
        tokio::task::spawn_blocking(move || tracer.shutdown()),
    )
    .await
    .map_err(|_| rustcdc::Error::TimeoutError("otel tracer shutdown timed out".to_string()))?
    .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;

    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(1))
        .build()
        .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;

    let mut validated = false;
    for _ in 0..30 {
        let response = client
            .get(format!(
                "http://{host}:{ui_port}/api/traces?service={service_name}&limit=20"
            ))
            .send()
            .await
            .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;

        if !response.status().is_success() {
            tokio::time::sleep(Duration::from_millis(300)).await;
            continue;
        }

        let payload: Value = response
            .json()
            .await
            .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;

        if payload_contains_required_hierarchy_and_retry(&payload) {
            validated = true;
            break;
        }

        tokio::time::sleep(Duration::from_millis(300)).await;
    }

    assert!(
        validated,
        "expected snapshot->chunk and handoff->stream hierarchy with crash/retry transform traces"
    );

    Ok(())
}

fn payload_contains_required_hierarchy_and_retry(payload: &Value) -> bool {
    let Some(traces) = payload.get("data").and_then(Value::as_array) else {
        return false;
    };

    let mut snapshot_parent_ok = false;
    let mut handoff_stream_ok = false;
    let mut saw_crash = false;
    let mut saw_retry = false;

    for trace in traces {
        let Some(spans) = trace.get("spans").and_then(Value::as_array) else {
            continue;
        };

        snapshot_parent_ok |=
            child_relation_exists(spans, "rustcdc.snapshot", "rustcdc.snapshot.chunk");
        handoff_stream_ok |= child_relation_exists(spans, "rustcdc.handoff", "rustcdc.stream");

        for span in spans {
            if span
                .get("operationName")
                .and_then(Value::as_str)
                .is_none_or(|name| name != "rustcdc.event.transform")
            {
                continue;
            }

            let has_crash_tag = span_has_tag(span, "error.type", "transform_crash")
                || span_has_tag(span, "error.type", "panic");

            if has_crash_tag {
                saw_crash = true;
            } else {
                saw_retry = true;
            }
        }
    }

    snapshot_parent_ok && handoff_stream_ok && saw_crash && saw_retry
}

/// Reports whether `spans` contains a span named `child_operation` that has a
/// `CHILD_OF` reference to a span named `parent_operation`.
///
/// `spans` is the `spans` array of one trace in a Jaeger query response. Each
/// span is read through its `operationName`, `spanID` and `references` fields;
/// each reference through `refType` and `spanID`. Missing or non-string fields
/// simply never match.
///
/// Every span carrying `parent_operation` is treated as a possible parent, so
/// a trace with several such spans matches no matter which one the child
/// points at.
fn child_relation_exists(spans: &[Value], parent_operation: &str, child_operation: &str) -> bool {
    /// Reads the `operationName` of a span, if present and a string.
    fn operation_name(span: &Value) -> Option<&str> {
        span.get("operationName").and_then(Value::as_str)
    }

    // IDs of all candidate parents, borrowed from the payload.
    let parent_ids: std::collections::HashSet<&str> = spans
        .iter()
        .filter(|span| operation_name(span) == Some(parent_operation))
        .filter_map(|span| span.get("spanID").and_then(Value::as_str))
        .collect();

    if parent_ids.is_empty() {
        return false;
    }

    spans
        .iter()
        .filter(|span| operation_name(span) == Some(child_operation))
        // Spans without a `references` array contribute nothing.
        .filter_map(|span| span.get("references").and_then(Value::as_array))
        .flatten()
        .any(|reference| {
            let is_child_of =
                reference.get("refType").and_then(Value::as_str) == Some("CHILD_OF");

            is_child_of
                && reference
                    .get("spanID")
                    .and_then(Value::as_str)
                    .is_some_and(|referenced_id| parent_ids.contains(referenced_id))
        })
}

/// Reports whether `span` has an entry in its `tags` array whose `key` equals
/// `key` and whose `value` is the string `expected_value`.
///
/// Returns `false` when `tags` is missing or not an array. Tags whose `key` or
/// `value` is absent or not a JSON string never match.
fn span_has_tag(span: &Value, key: &str, expected_value: &str) -> bool {
    let wanted = (Some(key), Some(expected_value));

    span.get("tags")
        .and_then(Value::as_array)
        .is_some_and(|tags| {
            tags.iter().any(|tag| {
                let found = (
                    tag.get("key").and_then(Value::as_str),
                    tag.get("value").and_then(Value::as_str),
                );
                found == wanted
            })
        })
}