1use crate::client::RestClient;
58use crate::error::Result;
59use futures::stream::Stream;
60use serde::{Deserialize, Deserializer, Serialize};
61use serde_json::Value;
62use std::pin::Pin;
63use std::time::Duration;
64use tokio::time::sleep;
65
66#[derive(Debug, Serialize)]
68pub struct StatsQuery {
69 #[serde(skip_serializing_if = "Option::is_none")]
71 pub interval: Option<String>,
72 #[serde(skip_serializing_if = "Option::is_none")]
74 pub stime: Option<String>,
75 #[serde(skip_serializing_if = "Option::is_none")]
77 pub etime: Option<String>,
78 #[serde(skip_serializing_if = "Option::is_none")]
80 pub metrics: Option<String>,
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct StatsResponse {
86 pub intervals: Vec<StatsInterval>,
88}
89
90#[derive(Debug, Clone, Serialize)]
92pub struct StatsInterval {
93 pub time: String,
95 pub metrics: Value,
97}
98
99impl<'de> Deserialize<'de> for StatsInterval {
100 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
101 where
102 D: Deserializer<'de>,
103 {
104 let value = Value::deserialize(deserializer)?;
105 let object = value.as_object().ok_or_else(|| {
106 <D::Error as serde::de::Error>::custom("expected stats interval object")
107 })?;
108
109 if let (Some(time), Some(metrics)) = (
110 object.get("time").and_then(Value::as_str),
111 object.get("metrics"),
112 ) {
113 return Ok(Self {
114 time: time.to_string(),
115 metrics: metrics.clone(),
116 });
117 }
118
119 let time = object
120 .get("stime")
121 .and_then(Value::as_str)
122 .or_else(|| object.get("etime").and_then(Value::as_str))
123 .ok_or_else(|| {
124 <D::Error as serde::de::Error>::custom(
125 "expected stats interval to contain either time or stime/etime",
126 )
127 })?;
128
129 Ok(Self {
130 time: time.to_string(),
131 metrics: value,
132 })
133 }
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
139pub struct LastStatsResponse {
140 pub stime: Option<String>,
142 pub etime: Option<String>,
144 pub interval: Option<String>,
146 #[serde(flatten)]
148 pub metrics: Value,
149}
150
151#[derive(Debug, Clone, Serialize)]
153pub struct AggregatedStatsResponse {
154 pub stats: Vec<ResourceStats>,
156}
157
158#[derive(Debug, Clone, Deserialize)]
159#[serde(untagged)]
160enum AggregatedStatsResponseWire {
161 Wrapped { stats: Vec<ResourceStats> },
162 Bare(Vec<ResourceStats>),
163}
164
165impl<'de> Deserialize<'de> for AggregatedStatsResponse {
166 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
167 where
168 D: Deserializer<'de>,
169 {
170 match AggregatedStatsResponseWire::deserialize(deserializer)? {
171 AggregatedStatsResponseWire::Wrapped { stats } => Ok(Self { stats }),
172 AggregatedStatsResponseWire::Bare(stats) => Ok(Self { stats }),
173 }
174 }
175}
176
177#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct ResourceStats {
180 pub uid: u32,
182 pub intervals: Vec<StatsInterval>,
184}
185
186pub struct StatsHandler {
188 client: RestClient,
189}
190
191impl StatsHandler {
192 pub fn new(client: RestClient) -> Self {
194 StatsHandler { client }
195 }
196
197 pub async fn cluster(&self, query: Option<StatsQuery>) -> Result<StatsResponse> {
199 if let Some(q) = query {
200 let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
201 self.client
202 .get(&format!("/v1/cluster/stats?{}", query_str))
203 .await
204 } else {
205 self.client.get("/v1/cluster/stats").await
206 }
207 }
208
209 pub async fn cluster_last(&self) -> Result<LastStatsResponse> {
211 self.client.get("/v1/cluster/stats/last").await
212 }
213
214 pub async fn node(&self, uid: u32, query: Option<StatsQuery>) -> Result<StatsResponse> {
218 if let Some(q) = query {
219 let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
220 self.client
221 .get(&format!("/v1/nodes/{}/stats?{}", uid, query_str))
222 .await
223 } else {
224 self.client.get(&format!("/v1/nodes/{}/stats", uid)).await
225 }
226 }
227
228 pub async fn node_last(&self, uid: u32) -> Result<LastStatsResponse> {
230 self.client
231 .get(&format!("/v1/nodes/{}/stats/last", uid))
232 .await
233 }
234
235 pub async fn nodes(&self, query: Option<StatsQuery>) -> Result<AggregatedStatsResponse> {
239 if let Some(q) = query {
240 let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
241 self.client
242 .get(&format!("/v1/nodes/stats?{}", query_str))
243 .await
244 } else {
245 self.client.get("/v1/nodes/stats").await
246 }
247 }
248
249 pub async fn nodes_last(&self) -> Result<AggregatedStatsResponse> {
253 self.client.get("/v1/nodes/stats/last").await
254 }
255
256 pub async fn node_alt(&self, uid: u32) -> Result<StatsResponse> {
260 self.client.get(&format!("/v1/nodes/stats/{}", uid)).await
261 }
262
263 pub async fn node_last_alt(&self, uid: u32) -> Result<LastStatsResponse> {
265 self.client
266 .get(&format!("/v1/nodes/stats/last/{}", uid))
267 .await
268 }
269
270 pub async fn database(&self, uid: u32, query: Option<StatsQuery>) -> Result<StatsResponse> {
272 if let Some(q) = query {
273 let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
274 self.client
275 .get(&format!("/v1/bdbs/{}/stats?{}", uid, query_str))
276 .await
277 } else {
278 self.client.get(&format!("/v1/bdbs/{}/stats", uid)).await
279 }
280 }
281
282 pub async fn database_last(&self, uid: u32) -> Result<LastStatsResponse> {
284 self.client
285 .get(&format!("/v1/bdbs/{}/stats/last", uid))
286 .await
287 }
288
289 pub async fn databases(&self, query: Option<StatsQuery>) -> Result<AggregatedStatsResponse> {
293 if let Some(q) = query {
294 let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
295 self.client
296 .get(&format!("/v1/bdbs/stats?{}", query_str))
297 .await
298 } else {
299 self.client.get("/v1/bdbs/stats").await
300 }
301 }
302
303 pub async fn databases_last(&self) -> Result<AggregatedStatsResponse> {
307 self.client.get("/v1/bdbs/stats/last").await
308 }
309
310 pub async fn database_alt(&self, uid: u32) -> Result<StatsResponse> {
314 self.client.get(&format!("/v1/bdbs/stats/{}", uid)).await
315 }
316
317 pub async fn database_last_alt(&self, uid: u32) -> Result<LastStatsResponse> {
319 self.client
320 .get(&format!("/v1/bdbs/stats/last/{}", uid))
321 .await
322 }
323
324 pub async fn shard(&self, uid: u32, query: Option<StatsQuery>) -> Result<StatsResponse> {
326 if let Some(q) = query {
327 let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
328 self.client
329 .get(&format!("/v1/shards/stats/{}?{}", uid, query_str))
330 .await
331 } else {
332 self.client.get(&format!("/v1/shards/stats/{}", uid)).await
333 }
334 }
335
336 pub async fn shards(&self, query: Option<StatsQuery>) -> Result<AggregatedStatsResponse> {
338 if let Some(q) = query {
339 let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
340 self.client
341 .get(&format!("/v1/shards/stats?{}", query_str))
342 .await
343 } else {
344 self.client.get("/v1/shards/stats").await
345 }
346 }
347
348 pub async fn shards_last(&self) -> Result<Value> {
352 self.client.get("/v1/shards/stats/last").await
353 }
354
355 pub async fn shard_last(&self, uid: u32) -> Result<Value> {
357 self.client
358 .get(&format!("/v1/shards/stats/last/{}", uid))
359 .await
360 }
361
362 pub fn stream_cluster(
370 &self,
371 poll_interval: Duration,
372 ) -> Pin<Box<dyn Stream<Item = Result<LastStatsResponse>> + Send + '_>> {
373 Box::pin(async_stream::stream! {
374 loop {
375 match self.cluster_last().await {
376 Ok(stats) => yield Ok(stats),
377 Err(e) => {
378 yield Err(e);
379 break;
380 }
381 }
382 sleep(poll_interval).await;
383 }
384 })
385 }
386
387 pub fn stream_node(
396 &self,
397 uid: u32,
398 poll_interval: Duration,
399 ) -> Pin<Box<dyn Stream<Item = Result<LastStatsResponse>> + Send + '_>> {
400 Box::pin(async_stream::stream! {
401 loop {
402 match self.node_last(uid).await {
403 Ok(stats) => yield Ok(stats),
404 Err(e) => {
405 yield Err(e);
406 break;
407 }
408 }
409 sleep(poll_interval).await;
410 }
411 })
412 }
413
414 pub fn stream_database(
423 &self,
424 uid: u32,
425 poll_interval: Duration,
426 ) -> Pin<Box<dyn Stream<Item = Result<LastStatsResponse>> + Send + '_>> {
427 Box::pin(async_stream::stream! {
428 loop {
429 match self.database_last(uid).await {
430 Ok(stats) => yield Ok(stats),
431 Err(e) => {
432 yield Err(e);
433 break;
434 }
435 }
436 sleep(poll_interval).await;
437 }
438 })
439 }
440}