1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
use err_derive::Error; use reqwest::Client; use serde::Deserialize; use std::cmp::{max, min}; use std::collections::HashMap; use std::time::SystemTime; use tokio::time::Duration; #[derive(Debug, Error)] pub enum Error { #[error(display = "Network error: {}", _0)] Network(reqwest::Error), #[error(display = "Malformed json response: {}", _0)] MalformedResponse(reqwest::Error), #[error(display = "Data point is not an interger: {}", _0)] NonNumericDataPoint(String), } #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "lowercase")] pub enum QueryResultStatus { Success, Error, } #[derive(Debug, Clone, Deserialize)] struct QueryResult { status: QueryResultStatus, data: QueryResultData, } #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "lowercase")] enum QueryResultDataType { Matrix, } #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] struct QueryResultData { result_type: QueryResultDataType, result: Vec<QueryResultDataResult>, } #[derive(Debug, Clone, Deserialize)] struct QueryResultDataResult { metric: HashMap<String, String>, values: Vec<QueryResultDataResultValue>, } #[derive(Debug, Clone, Deserialize)] struct QueryResultDataResultValue(u64, String); async fn query_prometheus( client: &Client, base_url: &str, query: &str, start: u64, end: u64, step: usize, ) -> Result<QueryResult, Error> { client .get(base_url) .query(&[ ("query", query), ("start", &format!("{}", start)), ("end", &format!("{}", end)), ("step", &format!("{}", step)), ]) .send() .await .map_err(|err| Error::Network(err))? .json() .await .map_err(|err| Error::MalformedResponse(err)) } #[derive(Debug, Clone)] pub struct EdgeDetector { client: Client, base_url: String, } impl EdgeDetector { pub fn new(prometheus_url: impl std::fmt::Display) -> Self { EdgeDetector { client: Client::new(), base_url: format!("{}/api/v1/query_range", prometheus_url), } } pub async fn get_last_edge( &self, query: &str, from: u64, to: u64, max_age: Duration, ) -> Result<Option<u64>, Error> { let end_time = SystemTime::now(); let start_time = end_time - max_age; let end_time = end_time .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_secs(); let start_time = start_time .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_secs(); self.get_edge_between(query, from, to, start_time, end_time) .await } pub async fn get_edge_between( &self, query: &str, from: u64, to: u64, start_time: u64, end_time: u64, ) -> Result<Option<u64>, Error> { let rising = from < to; let data = query_prometheus( &self.client, &self.base_url, query, start_time, end_time, min( 60usize, max(2usize, (end_time as usize - start_time as usize) / 240), ), ) .await? .data .result; let first_data = match data.into_iter().next() { Some(result) => result, None => return Ok(None), }; let mut last_from_time = 0; let mut last_to_time = 0; for point in first_data.values { let QueryResultDataResultValue(time, value_str) = point; let value: u64 = value_str .parse() .map_err(|_| Error::NonNumericDataPoint(value_str))?; let is_from_value = if rising { value <= from } else { value >= from }; let is_to_value = if rising { value >= from } else { value <= from }; if is_from_value { last_from_time = time; } else if is_to_value { last_to_time = time; } } if last_from_time > 0 && last_to_time > last_from_time { Ok(Some(last_from_time)) } else { Ok(None) } } }