use crate::shors_debug;
use crate::transport::rpc::route::{Handler, Middleware};
use crate::transport::rpc::Request;
use crate::transport::Context;
use opentelemetry::sdk::trace::Tracer as OTELTracer;
use opentelemetry::trace::{Span, SpanBuilder, SpanKind, StatusCode, TraceContextExt, Tracer};
use std::fmt::Debug;
use std::time::Instant;
pub fn debug<E>(handler: Handler<E>) -> Handler<E>
where
E: 'static,
{
(move |ctx: &mut Context, req| {
shors_debug!(ctx: ctx, "call {} with {:?}", ctx.endpoint_path(), req);
handler(ctx, req)
})
.into()
}
pub fn record_latency<E>(hist: &'static prometheus::HistogramVec) -> Middleware<E>
where
E: 'static,
{
Middleware(Box::new(move |call| {
Handler(Box::new(move |ctx, req| {
let start_time = Instant::now();
let call_result = call(ctx, req);
let status = if call_result.is_ok() { "OK" } else { "ERR" };
hist.with_label_values(&[ctx.endpoint_path(), status])
.observe(start_time.elapsed().as_secs_f64());
call_result
}))
}))
}
pub fn otel<E>(tracer: &'static OTELTracer) -> Middleware<E>
where
E: Debug + 'static,
{
(move |handler: Handler<E>| {
(move |ctx: &mut Context, req: Request| {
if !ctx.tracing_enabled() {
return handler(ctx, req);
}
let trace_ctx = ctx.extract();
let span_builder = SpanBuilder::from_name(format!("storage {}", ctx.endpoint_path()))
.with_kind(SpanKind::Server);
let mut span = tracer.build_with_context(span_builder, &trace_ctx);
let span_ctx = span.span_context().clone();
ctx.inject(&trace_ctx.with_remote_span_context(span_ctx));
handler(ctx, req).map_err(|e| {
span.set_status(StatusCode::Error, format!("{:?}", e));
e
})
})
.into()
})
.into()
}
pub mod client {
use crate::builtin::utils::retry;
use crate::shors_warn;
use crate::tarantool::tlua::{LuaState, Push};
use crate::transport::rpc::client::{Middleware, RemoteLuaCall};
use opentelemetry::sdk::trace::Tracer as OTELTracer;
use opentelemetry::trace::{Span, SpanBuilder, SpanKind, TraceContextExt, Tracer};
use serde::Serialize;
use std::fmt::Debug;
use std::time::Instant;
pub fn otel<'a, ID, A>(tracer: &'static OTELTracer) -> Middleware<'a, ID, A>
where
A: Serialize + Debug + 'a,
ID: Push<LuaState> + Debug + 'a,
{
Middleware(Box::new(move |call| {
RemoteLuaCall(Box::new(move |ctx, id, path, opts, args| {
if !ctx.tracing_enabled() {
return call(ctx, id, path, opts, args);
}
let original_ctx = ctx.extract();
let call_context = original_ctx.clone();
let call_result = {
let span_builder =
SpanBuilder::from_name(format!("call {}, id: {:?}", path, id))
.with_kind(SpanKind::Client);
let span = tracer.build_with_context(span_builder, &call_context);
let span_ctx = span.span_context().clone();
ctx.inject(&call_context.with_remote_span_context(span_ctx));
call(ctx, id, path, opts, args)
};
ctx.inject(&original_ctx);
call_result
}))
}))
}
pub fn retry<'a, ID, A>(count: usize) -> Middleware<'a, ID, A>
where
A: Serialize + Debug + Copy + 'a,
ID: Push<LuaState> + Debug + Copy + 'a,
{
Middleware(Box::new(move |call| {
RemoteLuaCall(Box::new(move |ctx, id, path, opts, args| {
let delays = retry::Fibonacci::from_millis(100).take(count);
retry::retry(delays, || {
call(ctx, id, path, opts.clone(), args).map_err(|e| {
shors_warn!(ctx: ctx, "{}", e);
e
})
})
.map_err(|re| re.source)
}))
}))
}
pub fn record_latency<'a, ID, A>(
hist: &'static prometheus::HistogramVec,
) -> Middleware<'a, ID, A>
where
A: Serialize + Debug + 'a,
ID: Push<LuaState> + Debug + 'a,
{
Middleware(Box::new(move |call| {
RemoteLuaCall(Box::new(move |ctx, id, path, opts, args| {
let start_time = Instant::now();
let call_result = call(ctx, id, path, opts, args);
hist.with_label_values(&[path])
.observe(start_time.elapsed().as_secs_f64());
call_result
}))
}))
}
}