use crate::client::RestClient;
use crate::error::Result;
use futures::stream::Stream;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::pin::Pin;
use std::time::Duration;
use tokio::time::sleep;
#[derive(Debug, Serialize)]
pub struct StatsQuery {
#[serde(skip_serializing_if = "Option::is_none")]
pub interval: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub stime: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub etime: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub metrics: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatsResponse {
pub intervals: Vec<StatsInterval>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatsInterval {
pub time: String,
pub metrics: Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LastStatsResponse {
pub stime: Option<String>,
pub etime: Option<String>,
pub interval: Option<String>,
#[serde(flatten)]
pub metrics: Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggregatedStatsResponse {
pub stats: Vec<ResourceStats>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceStats {
pub uid: u32,
pub intervals: Vec<StatsInterval>,
}
pub struct StatsHandler {
client: RestClient,
}
impl StatsHandler {
pub fn new(client: RestClient) -> Self {
StatsHandler { client }
}
pub async fn cluster(&self, query: Option<StatsQuery>) -> Result<StatsResponse> {
if let Some(q) = query {
let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
self.client
.get(&format!("/v1/cluster/stats?{}", query_str))
.await
} else {
self.client.get("/v1/cluster/stats").await
}
}
pub async fn cluster_last(&self) -> Result<LastStatsResponse> {
self.client.get("/v1/cluster/stats/last").await
}
pub async fn node(&self, uid: u32, query: Option<StatsQuery>) -> Result<StatsResponse> {
if let Some(q) = query {
let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
self.client
.get(&format!("/v1/nodes/{}/stats?{}", uid, query_str))
.await
} else {
self.client.get(&format!("/v1/nodes/{}/stats", uid)).await
}
}
pub async fn node_last(&self, uid: u32) -> Result<LastStatsResponse> {
self.client
.get(&format!("/v1/nodes/{}/stats/last", uid))
.await
}
pub async fn nodes(&self, query: Option<StatsQuery>) -> Result<AggregatedStatsResponse> {
if let Some(q) = query {
let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
self.client
.get(&format!("/v1/nodes/stats?{}", query_str))
.await
} else {
self.client.get("/v1/nodes/stats").await
}
}
pub async fn nodes_last(&self) -> Result<AggregatedStatsResponse> {
self.client.get("/v1/nodes/stats/last").await
}
pub async fn node_alt(&self, uid: u32) -> Result<StatsResponse> {
self.client.get(&format!("/v1/nodes/stats/{}", uid)).await
}
pub async fn node_last_alt(&self, uid: u32) -> Result<LastStatsResponse> {
self.client
.get(&format!("/v1/nodes/stats/last/{}", uid))
.await
}
pub async fn database(&self, uid: u32, query: Option<StatsQuery>) -> Result<StatsResponse> {
if let Some(q) = query {
let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
self.client
.get(&format!("/v1/bdbs/{}/stats?{}", uid, query_str))
.await
} else {
self.client.get(&format!("/v1/bdbs/{}/stats", uid)).await
}
}
pub async fn database_last(&self, uid: u32) -> Result<LastStatsResponse> {
self.client
.get(&format!("/v1/bdbs/{}/stats/last", uid))
.await
}
pub async fn databases(&self, query: Option<StatsQuery>) -> Result<AggregatedStatsResponse> {
if let Some(q) = query {
let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
self.client
.get(&format!("/v1/bdbs/stats?{}", query_str))
.await
} else {
self.client.get("/v1/bdbs/stats").await
}
}
pub async fn databases_last(&self) -> Result<AggregatedStatsResponse> {
self.client.get("/v1/bdbs/stats/last").await
}
pub async fn database_alt(&self, uid: u32) -> Result<StatsResponse> {
self.client.get(&format!("/v1/bdbs/stats/{}", uid)).await
}
pub async fn database_last_alt(&self, uid: u32) -> Result<LastStatsResponse> {
self.client
.get(&format!("/v1/bdbs/stats/last/{}", uid))
.await
}
pub async fn shard(&self, uid: u32, query: Option<StatsQuery>) -> Result<StatsResponse> {
if let Some(q) = query {
let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
self.client
.get(&format!("/v1/shards/{}/stats?{}", uid, query_str))
.await
} else {
self.client.get(&format!("/v1/shards/{}/stats", uid)).await
}
}
pub async fn shards(&self, query: Option<StatsQuery>) -> Result<AggregatedStatsResponse> {
if let Some(q) = query {
let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
self.client
.get(&format!("/v1/shards/stats?{}", query_str))
.await
} else {
self.client.get("/v1/shards/stats").await
}
}
pub fn stream_cluster(
&self,
poll_interval: Duration,
) -> Pin<Box<dyn Stream<Item = Result<LastStatsResponse>> + Send + '_>> {
Box::pin(async_stream::stream! {
loop {
match self.cluster_last().await {
Ok(stats) => yield Ok(stats),
Err(e) => {
yield Err(e);
break;
}
}
sleep(poll_interval).await;
}
})
}
pub fn stream_node(
&self,
uid: u32,
poll_interval: Duration,
) -> Pin<Box<dyn Stream<Item = Result<LastStatsResponse>> + Send + '_>> {
Box::pin(async_stream::stream! {
loop {
match self.node_last(uid).await {
Ok(stats) => yield Ok(stats),
Err(e) => {
yield Err(e);
break;
}
}
sleep(poll_interval).await;
}
})
}
pub fn stream_database(
&self,
uid: u32,
poll_interval: Duration,
) -> Pin<Box<dyn Stream<Item = Result<LastStatsResponse>> + Send + '_>> {
Box::pin(async_stream::stream! {
loop {
match self.database_last(uid).await {
Ok(stats) => yield Ok(stats),
Err(e) => {
yield Err(e);
break;
}
}
sleep(poll_interval).await;
}
})
}
}