shors 0.12.6

Transport layer for cartridge + tarantool-module projects.
Documentation
use crate::shors_debug;
use crate::transport::rpc::route::{Handler, Middleware};
use crate::transport::rpc::Request;
use crate::transport::Context;
use opentelemetry::sdk::trace::Tracer as OTELTracer;
use opentelemetry::trace::{Span, SpanBuilder, SpanKind, StatusCode, TraceContextExt, Tracer};
use std::fmt::Debug;
use std::time::Instant;

/// Wraps `handler` so that every invocation is logged before being delegated.
///
/// For each call the wrapper emits one record through the `shors_debug!`
/// macro containing the endpoint path and the `Debug` rendering of the
/// request, then runs the wrapped handler and returns its result untouched.
pub fn debug<E>(handler: Handler<E>) -> Handler<E>
where
    E: 'static,
{
    let logged = move |ctx: &mut Context, req| {
        // Log first: the request is moved into the inner handler afterwards.
        shors_debug!(ctx: ctx, "call {} with {:?}", ctx.endpoint_path(), req);
        handler(ctx, req)
    };
    logged.into()
}

/// Builds a middleware that records the wall-clock duration of each handled
/// request into `hist`.
///
/// The observation is labelled with two values, in this order: the endpoint
/// path and a status string (`"OK"` when the wrapped handler returned `Ok`,
/// `"ERR"` otherwise). The duration is reported in seconds. The handler's
/// result is passed through unchanged.
///
/// NOTE(review): `hist` is expected to be declared with exactly two labels;
/// `with_label_values` is documented to panic on a label-count mismatch.
pub fn record_latency<E>(hist: &'static prometheus::HistogramVec) -> Middleware<E>
where
    E: 'static,
{
    Middleware(Box::new(move |call| {
        Handler(Box::new(move |ctx, req| {
            let started_at = Instant::now();
            let outcome = call(ctx, req);

            let status = match &outcome {
                Ok(_) => "OK",
                Err(_) => "ERR",
            };
            let labels = [ctx.endpoint_path(), status];

            hist.with_label_values(&labels)
                .observe(started_at.elapsed().as_secs_f64());

            outcome
        }))
    }))
}

pub fn otel<E>(tracer: &'static OTELTracer) -> Middleware<E>
where
    E: Debug + 'static,
{
    (move |handler: Handler<E>| {
        (move |ctx: &mut Context, req: Request| {
            if !ctx.tracing_enabled() {
                return handler(ctx, req);
            }

            let trace_ctx = ctx.extract();

            let span_builder = SpanBuilder::from_name(format!("storage {}", ctx.endpoint_path()))
                .with_kind(SpanKind::Server);
            let mut span = tracer.build_with_context(span_builder, &trace_ctx);

            let span_ctx = span.span_context().clone();
            ctx.inject(&trace_ctx.with_remote_span_context(span_ctx));

            handler(ctx, req).map_err(|e| {
                span.set_status(StatusCode::Error, format!("{:?}", e));
                e
            })
        })
        .into()
    })
    .into()
}

/// Client-side middlewares that wrap outgoing remote Lua calls
/// (`RemoteLuaCall`): tracing, retrying and latency recording.
pub mod client {
    use crate::builtin::utils::retry;
    use crate::shors_warn;
    use crate::tarantool::tlua::{LuaState, Push};
    use crate::transport::rpc::client::{Middleware, RemoteLuaCall};
    use opentelemetry::sdk::trace::Tracer as OTELTracer;
    use opentelemetry::trace::{Span, SpanBuilder, SpanKind, TraceContextExt, Tracer};
    use serde::Serialize;
    use std::fmt::Debug;
    use std::time::Instant;

    /// Builds a middleware that wraps each outgoing call in an OpenTelemetry
    /// client span.
    ///
    /// When tracing is disabled on `ctx` the call is forwarded untouched.
    /// Otherwise a `SpanKind::Client` span named `"call <path>, id: <id:?>"`
    /// is started under the context extracted from `ctx`, a context carrying
    /// that span's `SpanContext` is injected into `ctx` for the duration of
    /// the call, and the originally extracted context is injected back
    /// afterwards. The call's result is returned unchanged.
    pub fn otel<'a, ID, A>(tracer: &'static OTELTracer) -> Middleware<'a, ID, A>
    where
        A: Serialize + Debug + 'a,
        ID: Push<LuaState> + Debug + 'a,
    {
        Middleware(Box::new(move |call| {
            RemoteLuaCall(Box::new(move |ctx, id, path, opts, args| {
                if !ctx.tracing_enabled() {
                    return call(ctx, id, path, opts, args);
                }

                // Kept so the caller's context can be restored after the call.
                // Every use below only borrows it, so no clone is needed.
                let original_ctx = ctx.extract();

                let call_result = {
                    let span_builder =
                        SpanBuilder::from_name(format!("call {}, id: {:?}", path, id))
                            .with_kind(SpanKind::Client);
                    let span = tracer.build_with_context(span_builder, &original_ctx);
                    let span_ctx = span.span_context().clone();
                    ctx.inject(&original_ctx.with_remote_span_context(span_ctx));

                    // `span` stays alive until the end of this block, i.e.
                    // until the remote call has returned.
                    call(ctx, id, path, opts, args)
                };
                ctx.inject(&original_ctx);

                call_result
            }))
        }))
    }

    /// Builds a middleware that re-issues a failed call with a Fibonacci
    /// backoff.
    ///
    /// The delay iterator is seeded with 100 ms and truncated to `count`
    /// delays, so presumably the call is attempted at most `count + 1`
    /// times (confirm against `retry::retry`). Every failed attempt is logged
    /// through `shors_warn!`. If the retries are exhausted, the `source`
    /// field of the retry error is returned.
    ///
    /// `A` and `ID` must be `Copy` because the same arguments are passed to
    /// every attempt; `opts` is cloned per attempt.
    pub fn retry<'a, ID, A>(count: usize) -> Middleware<'a, ID, A>
    where
        A: Serialize + Debug + Copy + 'a,
        ID: Push<LuaState> + Debug + Copy + 'a,
    {
        Middleware(Box::new(move |call| {
            RemoteLuaCall(Box::new(move |ctx, id, path, opts, args| {
                // Fibonacci backoff seeded at 100 ms, limited to `count` delays.
                // NOTE(review): if `builtin::utils::retry` mirrors the `retry`
                // crate, the sequence is 100, 100, 200, 300, 500 ms, ... --
                // confirm against the helper's implementation.
                let delays = retry::Fibonacci::from_millis(100).take(count);
                retry::retry(delays, || {
                    call(ctx, id, path, opts.clone(), args).map_err(|e| {
                        shors_warn!(ctx: ctx, "{}", e);
                        e
                    })
                })
                .map_err(|re| re.source)
            }))
        }))
    }

    /// Builds a middleware that records the wall-clock duration of each
    /// outgoing call into `hist`.
    ///
    /// The observation is labelled with a single value, the call `path`, and
    /// is reported in seconds regardless of whether the call succeeded. The
    /// call's result is returned unchanged.
    ///
    /// NOTE(review): `hist` is expected to be declared with exactly one label;
    /// `with_label_values` is documented to panic on a label-count mismatch.
    pub fn record_latency<'a, ID, A>(
        hist: &'static prometheus::HistogramVec,
    ) -> Middleware<'a, ID, A>
    where
        A: Serialize + Debug + 'a,
        ID: Push<LuaState> + Debug + 'a,
    {
        Middleware(Box::new(move |call| {
            RemoteLuaCall(Box::new(move |ctx, id, path, opts, args| {
                let start_time = Instant::now();

                let call_result = call(ctx, id, path, opts, args);

                hist.with_label_values(&[path])
                    .observe(start_time.elapsed().as_secs_f64());

                call_result
            }))
        }))
    }
}