tc_rpc_server/
middleware.rs1use tetsy_jsonrpc_core::{
22 Middleware as RequestMiddleware, Metadata,
23 Request, Response, FutureResponse, FutureOutput
24};
25use prometheus_endpoint::{
26 Registry, CounterVec, PrometheusError,
27 Opts, register, U64
28};
29
30use futures::{future::Either, Future};
31
32#[derive(Debug, Clone)]
34pub struct RpcMetrics {
35 rpc_calls: Option<CounterVec<U64>>,
36}
37
38impl RpcMetrics {
39 pub fn new(metrics_registry: Option<&Registry>) -> Result<Self, PrometheusError> {
41 Ok(Self {
42 rpc_calls: metrics_registry.map(|r|
43 register(
44 CounterVec::new(
45 Opts::new(
46 "rpc_calls_total",
47 "Number of rpc calls received",
48 ),
49 &["protocol"]
50 )?,
51 r,
52 )
53 ).transpose()?,
54 })
55 }
56}
57
58pub struct RpcMiddleware {
60 metrics: RpcMetrics,
61 transport_label: String,
62}
63
64impl RpcMiddleware {
65 pub fn new(metrics: RpcMetrics, transport_label: &str) -> Self {
70 RpcMiddleware {
71 metrics,
72 transport_label: String::from(transport_label),
73 }
74 }
75}
76
77impl<M: Metadata> RequestMiddleware<M> for RpcMiddleware {
78 type Future = FutureResponse;
79 type CallFuture = FutureOutput;
80
81 fn on_request<F, X>(&self, request: Request, meta: M, next: F) -> Either<FutureResponse, X>
82 where
83 F: Fn(Request, M) -> X + Send + Sync,
84 X: Future<Item = Option<Response>, Error = ()> + Send + 'static,
85 {
86 if let Some(ref rpc_calls) = self.metrics.rpc_calls {
87 rpc_calls.with_label_values(&[self.transport_label.as_str()]).inc();
88 }
89
90 Either::B(next(request, meta))
91 }
92}