// streameroo 0.4.4
//
// Compilation of mini-frameworks & utilities for data-streaming applications
// Documentation
use std::collections::HashMap;
use std::ops::Deref;

use super::DeliveryContext;
use amqprs::{BasicProperties, FieldTable, FieldValue, ShortStr};
use opentelemetry::Context;
use opentelemetry::propagation::{Extractor, Injector};
use opentelemetry::trace::SpanKind;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_opentelemetry_instrumentation_sdk::otel_trace_span;

/// Borrowed view over an AMQP [`FieldTable`] that also carries string
/// renderings of its non-string entries.
pub struct ValueStore<'a> {
    /// The header table this store reads from.
    table: &'a FieldTable,
    /// `to_string()` renderings of every entry in `table` whose value is not
    /// `FieldValue::S`, keyed by the entry's name (filled in by `new`).
    extra_values: HashMap<&'a str, String>,
}

impl<'a> ValueStore<'a> {
    /// Creates a store over `table`.
    ///
    /// Every entry whose value is not `FieldValue::S` is rendered with
    /// `to_string()` up front and cached under its key; `FieldValue::S`
    /// entries are left in the table and not copied.
    pub fn new(table: &'a FieldTable) -> Self {
        let extra_values: HashMap<&'a str, String> = table
            .as_ref()
            .iter()
            .filter_map(|(name, field)| match field {
                // String values can be borrowed from the table directly.
                FieldValue::S(_) => None,
                other => Some((name.as_ref().as_str(), other.to_string())),
            })
            .collect();

        Self {
            table,
            extra_values,
        }
    }
}

/// Lets OpenTelemetry propagators read AMQP headers through a [`ValueStore`].
impl Extractor for ValueStore<'_> {
    /// Returns the header stored under `key` as a string slice.
    ///
    /// `FieldValue::S` headers are borrowed straight from the table. Headers of
    /// any other type are answered from `extra_values`, which holds their
    /// `to_string()` rendering. Returns `None` when `key` cannot be converted
    /// into a `ShortStr` or when neither the table nor the cache has a usable
    /// value for it.
    fn get(&self, key: &str) -> Option<&str> {
        let short_key = ShortStr::try_from(key).ok()?;
        match self.table.get(&short_key) {
            Some(FieldValue::S(long_str)) => Some(long_str.as_ref().as_str()),
            // Non-string headers exist in the table too, so the cache must be
            // consulted here; returning `None` for them would make the cached
            // renderings unreachable.
            _ => self.extra_values.get(key).map(String::as_str),
        }
    }

    /// Returns every header name exactly once, in sorted order.
    fn keys(&self) -> Vec<&str> {
        let mut keys: Vec<&str> = self
            .table
            .as_ref()
            .keys()
            .map(|k| k.as_ref().as_str())
            .chain(self.extra_values.keys().map(Deref::deref))
            .collect();
        // Names cached in `extra_values` also appear in the table; drop the
        // duplicates so each header is reported a single time.
        keys.sort_unstable();
        keys.dedup();
        keys
    }
}

/// Wrapper around a mutable [`FieldTable`] that implements [`Injector`], so
/// values can be written into the table through that trait.
pub struct HeaderInjector<'a>(&'a mut FieldTable);

impl Injector for HeaderInjector<'_> {
    /// Inserts `value` under `key` into the wrapped table.
    ///
    /// A `key` that cannot be converted into a `ShortStr` is skipped without
    /// reporting an error.
    fn set(&mut self, key: &str, value: String) {
        let Ok(field_name) = ShortStr::try_from(key) else {
            return;
        };
        self.0.insert(field_name, value.into());
    }
}

/// Writes `context` into `headers` using the globally registered text-map
/// propagator.
pub fn inject_context(context: &Context, headers: &mut FieldTable) {
    opentelemetry::global::get_text_map_propagator(|propagator| {
        // Reborrow so the closure only needs a unique borrow of `headers`.
        propagator.inject_context(context, &mut HeaderInjector(&mut *headers));
    });
}

/// Reads a [`Context`] out of `headers` using the globally registered
/// text-map propagator.
pub fn extract_context(headers: &FieldTable) -> Context {
    opentelemetry::global::get_text_map_propagator(|propagator| {
        let store = ValueStore::new(headers);
        propagator.extract(&store)
    })
}

/// Builds a consumer span for a received delivery.
///
/// The span is created by [`make_span_from_properties`] with
/// `SpanKind::Consumer`, gets a `delivery_tag` attribute, and, when the
/// delivery's properties carry headers, has its parent set to the context
/// extracted from them. A failure to set the parent is logged as a warning and
/// the span is still returned.
pub fn make_span_from_delivery_context(delivery_context: &DeliveryContext) -> tracing::Span {
    let span = make_span_from_properties(
        &delivery_context.properties,
        SpanKind::Consumer,
        &delivery_context.exchange,
        &delivery_context.routing_key,
    );
    let delivery_tag = delivery_context.delivery_tag.to_string();
    span.set_attribute("delivery_tag", delivery_tag);

    // Without headers there is no remote context to attach.
    let Some(field_table) = delivery_context.properties.headers() else {
        return span;
    };

    let parent = extract_context(field_table);
    if let Err(e) = span.set_parent(parent) {
        tracing::warn!("Failed to set parent context for span: {e}");
    }
    span
}

/// Creates an "AMQP Event" span describing a message.
///
/// The span's `otel.name` is `routing_key` alone when `exchange` is empty and
/// `"{exchange}.{routing_key}"` otherwise. The span also records `kind`, the
/// exchange, the routing key, and the correlation id, reply-to and content
/// type taken from `properties`.
pub fn make_span_from_properties(
    properties: &BasicProperties,
    kind: SpanKind,
    exchange: &str,
    routing_key: &str,
) -> tracing::Span {
    let name = match exchange {
        "" => routing_key.to_owned(),
        _ => [exchange, routing_key].join("."),
    };
    otel_trace_span!(
        "AMQP Event",
        otel.name = name,
        otel.kind = ?kind,
        amqp.exchange = exchange,
        amqp.routing_key = routing_key,
        amqp.correlation_id = properties.correlation_id(),
        amqp.reply_to = properties.reply_to(),
        amqp.content_type = properties.content_type(),
    )
}