amqpsy 0.1.0

Extremely opinionated AMQP PubSub library
Documentation
use std::collections::HashMap;

use opentelemetry::{
    global::{self},
    propagation::{Extractor, Injector},
    trace::{SpanKind, TraceContextExt, Tracer},
    Context, KeyValue,
};

use crate::protogen::amqpsy::message::AmqpMessageWrapper;

pub fn put_trace_context(
    prop: &mut AmqpMessageWrapper,
    span_name: &str,
    attrs: Vec<KeyValue>,
    parent_ctx: Context,
) -> Context {
    let tracer = global::tracer(file!());
    let span = tracer
        .span_builder(String::from(span_name))
        .with_kind(SpanKind::Client)
        .with_attributes(attrs)
        .start_with_context(&tracer, &parent_ctx);

    let cx = Context::current_with_span(span);

    global::get_text_map_propagator(|p| {
        let mut injector = AmqpMessageWrapperInjector(prop);
        p.inject_context(&cx, &mut injector);
    });

    cx
}

pub fn extract_context_from_hash_map(prop: &HashMap<String, String>) -> opentelemetry::Context {
    global::get_text_map_propagator(|p| p.extract(&HashMapExtractor(prop)))
}

pub struct AmqpMessageWrapperInjector<'a>(pub &'a mut AmqpMessageWrapper);
pub struct HashMapExtractor<'a>(pub &'a HashMap<String, String>);

impl Injector for AmqpMessageWrapperInjector<'_> {
    /// Set a key and value in the HeaderMap.  Does nothing if the key or value are not valid inputs.
    fn set(&mut self, key: &str, value: String) {
        if self.0.headers.contains_key(key) {
            return;
        }

        if value.is_empty() {
            return;
        }

        self.0.headers.insert(key.to_string(), value);
    }
}

impl Extractor for HashMapExtractor<'_> {
    fn get(&self, key: &str) -> Option<&str> {
        self.0.get(key).map(|v| v.as_str())
    }

    fn keys(&self) -> Vec<&str> {
        self.0.keys().map(|k| k.as_str()).collect()
    }
}