1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
//! HTTP server exposing metrics in Prometheus format

use std::convert::Infallible;
use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;

use async_broadcast::Receiver;
use hyper::{
    header::{HeaderValue, CONTENT_TYPE},
    service::{make_service_fn, service_fn},
    Body, Method, Request, Response, Server,
};
use opentelemetry::metrics::MeterProvider;
use opentelemetry::sdk::metrics::reader::AggregationSelector;
use opentelemetry::sdk::metrics::{Aggregation, InstrumentKind};
use prometheus::{Encoder, Registry, TextEncoder};
use tracing::{info, info_span, instrument};
use tracing_futures::Instrument;

use crate::components::IndexMeter;
use crate::errors::{Error, SummaServerResult};
use crate::services::Index;
use crate::utils::thread_handler::ControlMessage;

/// Prometheus metrics HTTP server.
///
/// Bundles the server configuration with the Prometheus [`Registry`] that the
/// OpenTelemetry exporter is registered with and that request handlers gather from.
#[derive(Clone)]
pub struct Metrics {
    /// Registry shared (by clone) with the exporter and the request handlers.
    registry: Registry,
    /// Server settings; `endpoint` is the address the HTTP server binds to.
    config: crate::configs::metrics::Config,
}

/// State shared (behind an `Arc`) by every request handled by the metrics server.
struct AppState {
    /// Registry whose metric families are encoded into the `GET` response body.
    registry: Registry,
    /// Meter used to record per-index metrics right before each scrape.
    index_meter: IndexMeter,
    /// Service providing the index holders whose metrics get recorded.
    index_service: Index,
}

/// Aggregation selector handed to the Prometheus exporter.
///
/// Counters are summed, histograms use the fixed bucket boundaries below, and every
/// other instrument kind is aggregated as its last reported value.
#[derive(Debug)]
struct CustomAgg;

impl CustomAgg {
    /// Upper bounds of the explicit histogram buckets (presumably seconds — TODO confirm units).
    const HISTOGRAM_BOUNDARIES: [f64; 13] = [0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0];
}

impl AggregationSelector for CustomAgg {
    fn aggregation(&self, kind: InstrumentKind) -> Aggregation {
        if matches!(kind, InstrumentKind::Counter) {
            return Aggregation::Sum;
        }
        if matches!(kind, InstrumentKind::Histogram) {
            return Aggregation::ExplicitBucketHistogram {
                boundaries: Self::HISTOGRAM_BOUNDARIES.to_vec(),
                record_min_max: false,
            };
        }
        // NOTE(review): up-down counters and observable instruments also land here and are
        // exported as last-value gauges; confirm this is intended for every kind IndexMeter uses.
        Aggregation::LastValue
    }
}

impl Metrics {
    pub fn new(config: &crate::configs::metrics::Config) -> SummaServerResult<Metrics> {
        let registry = Registry::new();
        Ok(Metrics {
            config: config.clone(),
            registry,
        })
    }

    async fn serve_request(request: Request<Body>, state: Arc<AppState>) -> Result<Response<Body>, hyper::Error> {
        let empty_header_value = HeaderValue::from_static("");
        let _span = info_span!(
            "request",
            request_id = ?request.headers().get("request-id").unwrap_or(&empty_header_value),
            session_id = ?request.headers().get("session-id").unwrap_or(&empty_header_value),
        );
        info!(path = ?request.uri().path());
        let response = match request.method() {
            &Method::GET => {
                for index_holder in state.index_service.index_registry().index_holders().read().await.values() {
                    state
                        .index_meter
                        .record_metrics(index_holder)
                        .map_err(Error::from)
                        .expect("cannot record meters")
                }

                let mut buffer = vec![];
                let encoder = TextEncoder::new();
                let metric_families = state.registry.gather();
                encoder.encode(&metric_families[..], &mut buffer).expect("prometheus failed");
                Response::builder()
                    .status(200)
                    .header(CONTENT_TYPE, encoder.format_type())
                    .body(Body::from(buffer))
                    .expect("encoding body failed")
            }
            _ => Response::builder().status(404).body(Body::from("Missing Page")).expect("encoding body failed"),
        };
        Ok(response)
    }

    #[instrument("lifecycle", skip_all)]
    pub async fn prepare_serving_future(
        &self,
        index_service: &Index,
        mut terminator: Receiver<ControlMessage>,
    ) -> SummaServerResult<impl Future<Output = SummaServerResult<()>>> {
        let exporter = opentelemetry_prometheus::exporter()
            .with_registry(self.registry.clone())
            .with_aggregation_selector(CustomAgg)
            .build()
            .expect("internal error");
        let provider = opentelemetry::sdk::metrics::MeterProvider::builder().with_reader(exporter).build();
        let meter = provider.meter("summa");

        let state = Arc::new(AppState {
            index_service: index_service.clone(),
            index_meter: IndexMeter::new(meter),
            registry: self.registry.clone(),
        });
        let service = make_service_fn(move |_conn| {
            let state = state.clone();
            async move { Ok::<_, Infallible>(service_fn(move |request| Metrics::serve_request(request, state.clone()))) }
        });

        let server = Server::bind(&self.config.endpoint.parse()?).serve(service);
        info!(action = "binded", endpoint = ?self.config.endpoint);
        let graceful = server.with_graceful_shutdown(async move {
            let signal_result = terminator.recv().await;
            info!(action = "sigterm_received", received = ?signal_result);
        });

        Ok(async move {
            match graceful.await {
                Ok(_) => info!(action = "terminated"),
                Err(e) => info!(action = "terminated", error = ?e),
            }
            Ok(())
        }
        .instrument(info_span!(parent: None, "lifecycle")))
    }
}