1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
use err_derive::Error;
use reqwest::Client;
use serde::Deserialize;
use std::cmp::{max, min};
use std::collections::HashMap;
use std::time::SystemTime;
use tokio::time::Duration;

#[derive(Debug, Error)]
pub enum Error {
    #[error(display = "Network error: {}", _0)]
    Network(reqwest::Error),
    #[error(display = "Malformed json response: {}", _0)]
    MalformedResponse(reqwest::Error),
    #[error(display = "Data point is not an interger: {}", _0)]
    NonNumericDataPoint(String),
}

/// Value of the `status` field of a query response.
///
/// Variants are deserialized from their lowercase names (`"success"`,
/// `"error"`).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum QueryResultStatus {
    Success,
    Error,
}

/// Top-level JSON body returned by the range-query endpoint.
#[derive(Debug, Clone, Deserialize)]
struct QueryResult {
    // Deserialized but not inspected by the code in this file.
    status: QueryResultStatus,
    // Payload holding the returned series.
    data: QueryResultData,
}

/// Value of the `resultType` field of a query response.
///
/// Only `"matrix"` is accepted; any other value makes deserialization fail.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "lowercase")]
enum QueryResultDataType {
    Matrix,
}

/// The `data` object of a query response.
///
/// Field names are mapped from camelCase JSON keys (`resultType`, `result`).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct QueryResultData {
    result_type: QueryResultDataType,
    // One entry per returned series.
    result: Vec<QueryResultDataResult>,
}

/// A single series within a query response.
#[derive(Debug, Clone, Deserialize)]
struct QueryResultDataResult {
    // String-to-string map taken from the `metric` key (presumably the
    // series labels — confirm against the Prometheus API docs).
    metric: HashMap<String, String>,
    // Samples of this series, in the order they appear in the response.
    values: Vec<QueryResultDataResultValue>,
}

/// One sample: a `(timestamp, value)` pair where the value arrives as a string.
///
/// The timestamp is deserialized as `u64`, so a fractional timestamp in the
/// response would fail to deserialize. NOTE(review): presumably seconds since
/// the Unix epoch, matching the `start`/`end` query parameters — confirm.
#[derive(Debug, Clone, Deserialize)]
struct QueryResultDataResultValue(u64, String);

async fn query_prometheus(
    client: &Client,
    base_url: &str,
    query: &str,
    start: u64,
    end: u64,
    step: usize,
) -> Result<QueryResult, Error> {
    client
        .get(base_url)
        .query(&[
            ("query", query),
            ("start", &format!("{}", start)),
            ("end", &format!("{}", end)),
            ("step", &format!("{}", step)),
        ])
        .send()
        .await
        .map_err(|err| Error::Network(err))?
        .json()
        .await
        .map_err(|err| Error::MalformedResponse(err))
}

/// Finds transitions ("edges") between two values in a series fetched from a
/// Prometheus range query.
#[derive(Debug, Clone)]
pub struct EdgeDetector {
    // HTTP client used for every query issued by this detector.
    client: Client,
    // Full URL of the `/api/v1/query_range` endpoint.
    base_url: String,
}

impl EdgeDetector {
    pub fn new(prometheus_url: impl std::fmt::Display) -> Self {
        EdgeDetector {
            client: Client::new(),
            base_url: format!("{}/api/v1/query_range", prometheus_url),
        }
    }

    pub async fn get_last_edge(
        &self,
        query: &str,
        from: u64,
        to: u64,
        max_age: Duration,
    ) -> Result<Option<u64>, Error> {
        let end_time = SystemTime::now();
        let start_time = end_time - max_age;

        let end_time = end_time
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let start_time = start_time
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        self.get_edge_between(query, from, to, start_time, end_time)
            .await
    }

    pub async fn get_edge_between(
        &self,
        query: &str,
        from: u64,
        to: u64,
        start_time: u64,
        end_time: u64,
    ) -> Result<Option<u64>, Error> {
        let rising = from < to;

        let data = query_prometheus(
            &self.client,
            &self.base_url,
            query,
            start_time,
            end_time,
            min(
                60usize,
                max(2usize, (end_time as usize - start_time as usize) / 240),
            ),
        )
        .await?
        .data
        .result;

        let first_data = match data.into_iter().next() {
            Some(result) => result,
            None => return Ok(None),
        };

        let mut last_from_time = 0;
        let mut last_to_time = 0;

        for point in first_data.values {
            let QueryResultDataResultValue(time, value_str) = point;
            let value: u64 = value_str
                .parse()
                .map_err(|_| Error::NonNumericDataPoint(value_str))?;

            let is_from_value = if rising { value <= from } else { value >= from };

            let is_to_value = if rising { value >= from } else { value <= from };

            if is_from_value {
                last_from_time = time;
            } else if is_to_value {
                last_to_time = time;
            }
        }

        if last_from_time > 0 && last_to_time > last_from_time {
            Ok(Some(last_from_time))
        } else {
            Ok(None)
        }
    }
}