use crate::client::RestClient;
use crate::error::Result;
use futures::stream::Stream;
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value;
use std::pin::Pin;
use std::time::Duration;
use tokio::time::sleep;
/// Optional query parameters accepted by the stats request methods.
///
/// Every field that is `None` is skipped during serialization, so only the
/// parameters that were explicitly set end up in the encoded query string.
#[derive(Debug, Serialize)]
pub struct StatsQuery {
/// Value sent as the `interval` parameter.
/// NOTE(review): presumably the sampling granularity — confirm the accepted values against the API docs.
#[serde(skip_serializing_if = "Option::is_none")]
pub interval: Option<String>,
/// Value sent as the `stime` parameter.
/// NOTE(review): presumably the start of the requested time range — confirm format with the API docs.
#[serde(skip_serializing_if = "Option::is_none")]
pub stime: Option<String>,
/// Value sent as the `etime` parameter.
/// NOTE(review): presumably the end of the requested time range — confirm format with the API docs.
#[serde(skip_serializing_if = "Option::is_none")]
pub etime: Option<String>,
/// Value sent as the `metrics` parameter.
/// NOTE(review): presumably a selection of metric names — confirm the expected syntax with the API docs.
#[serde(skip_serializing_if = "Option::is_none")]
pub metrics: Option<String>,
}
/// Stats response for a single resource: a list of intervals.
///
/// This is the return type of the single-resource stats methods on
/// `StatsHandler` (`cluster`, `node`, `database`, `shard` and the `*_alt` variants).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatsResponse {
/// The intervals contained in the response, in the order they were received.
pub intervals: Vec<StatsInterval>,
}
/// A single stats interval: a time label plus its metrics as raw JSON.
///
/// `Serialize` is derived, but `Deserialize` is implemented by hand in this
/// file because more than one wire shape is accepted for an interval.
#[derive(Debug, Clone, Serialize)]
pub struct StatsInterval {
/// Time label of the interval. When deserialized, taken from the `time` key,
/// or from `stime` / `etime` when the `time` + `metrics` shape is not present.
pub time: String,
/// Metrics payload. When deserialized, either the value of the `metrics` key
/// or, for the fallback shape, the whole interval object.
pub metrics: Value,
}
impl<'de> Deserialize<'de> for StatsInterval {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let value = Value::deserialize(deserializer)?;
let object = value.as_object().ok_or_else(|| {
<D::Error as serde::de::Error>::custom("expected stats interval object")
})?;
if let (Some(time), Some(metrics)) = (
object.get("time").and_then(Value::as_str),
object.get("metrics"),
) {
return Ok(Self {
time: time.to_string(),
metrics: metrics.clone(),
});
}
let time = object
.get("stime")
.and_then(Value::as_str)
.or_else(|| object.get("etime").and_then(Value::as_str))
.ok_or_else(|| {
<D::Error as serde::de::Error>::custom(
"expected stats interval to contain either time or stime/etime",
)
})?;
Ok(Self {
time: time.to_string(),
metrics: value,
})
}
}
/// Response of the `.../stats/last` methods for a single resource.
///
/// A few optional, named string fields are pulled out of the object; the rest
/// of the object is kept as raw JSON in `metrics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LastStatsResponse {
/// The `stime` value, if present. NOTE(review): presumably the interval start — confirm with the API docs.
pub stime: Option<String>,
/// The `etime` value, if present. NOTE(review): presumably the interval end — confirm with the API docs.
pub etime: Option<String>,
/// The `interval` value, if present.
pub interval: Option<String>,
/// Flattened into the parent object: on deserialization the keys not claimed
/// by the named fields above are collected here, and on serialization they
/// are written back at the top level.
#[serde(flatten)]
pub metrics: Value,
}
/// Stats for several resources at once, one `ResourceStats` entry per resource.
///
/// `Serialize` is derived, but `Deserialize` is implemented by hand in this
/// file (via `AggregatedStatsResponseWire`) so that both a `{"stats": [...]}`
/// object and a bare `[...]` array are accepted.
#[derive(Debug, Clone, Serialize)]
pub struct AggregatedStatsResponse {
/// Per-resource stats entries.
pub stats: Vec<ResourceStats>,
}
/// Private wire representation used while deserializing `AggregatedStatsResponse`.
///
/// The enum is `untagged`, so serde tries the variants in declaration order
/// and picks the first one that matches the input.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
enum AggregatedStatsResponseWire {
/// An object with a `stats` key holding the list.
Wrapped { stats: Vec<ResourceStats> },
/// The list itself, without a wrapping object.
Bare(Vec<ResourceStats>),
}
impl<'de> Deserialize<'de> for AggregatedStatsResponse {
    /// Deserializes from either wire shape understood by
    /// `AggregatedStatsResponseWire` and normalizes it into `Self`.
    ///
    /// # Errors
    ///
    /// Propagates the error from deserializing the wire enum when the input
    /// matches neither shape.
    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        let wire = AggregatedStatsResponseWire::deserialize(deserializer)?;
        // Both variants carry the same payload; only the wrapping differs.
        let stats = match wire {
            AggregatedStatsResponseWire::Wrapped { stats }
            | AggregatedStatsResponseWire::Bare(stats) => stats,
        };
        Ok(Self { stats })
    }
}
/// Stats of one resource inside an aggregated response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceStats {
/// Identifier of the resource these intervals belong to.
/// NOTE(review): presumably a node, database or shard uid depending on the endpoint — confirm with callers.
pub uid: u32,
/// The intervals reported for this resource.
pub intervals: Vec<StatsInterval>,
}
/// Handler grouping the stats-related request methods.
///
/// All requests are issued through the wrapped `RestClient`.
pub struct StatsHandler {
// Client used for every request made by this handler.
client: RestClient,
}
impl StatsHandler {
    /// Creates a stats handler that issues its requests through `client`.
    pub fn new(client: RestClient) -> Self {
        Self { client }
    }

    /// Builds the request path for `base`, appending `query` as a URL-encoded
    /// query string.
    ///
    /// The `?` separator is only added when at least one parameter was
    /// actually encoded. A `StatsQuery` whose fields are all `None` therefore
    /// yields `base` unchanged rather than `base?`.
    fn path_with_query(base: &str, query: Option<StatsQuery>) -> String {
        // Best effort: an encoding failure is treated like an empty query
        // instead of failing the whole request.
        let encoded = query
            .map(|q| serde_urlencoded::to_string(&q).unwrap_or_default())
            .unwrap_or_default();

        if encoded.is_empty() {
            base.to_owned()
        } else {
            format!("{}?{}", base, encoded)
        }
    }

    /// Requests `/v1/cluster/stats`, optionally filtered by `query`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client `get` call produces.
    pub async fn cluster(&self, query: Option<StatsQuery>) -> Result<StatsResponse> {
        let path = Self::path_with_query("/v1/cluster/stats", query);
        self.client.get(&path).await
    }

    /// Requests `/v1/cluster/stats/last`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client `get` call produces.
    pub async fn cluster_last(&self) -> Result<LastStatsResponse> {
        self.client.get("/v1/cluster/stats/last").await
    }

    /// Requests `/v1/nodes/{uid}/stats`, optionally filtered by `query`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client `get` call produces.
    pub async fn node(&self, uid: u32, query: Option<StatsQuery>) -> Result<StatsResponse> {
        let path = Self::path_with_query(&format!("/v1/nodes/{}/stats", uid), query);
        self.client.get(&path).await
    }

    /// Requests `/v1/nodes/{uid}/stats/last`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client `get` call produces.
    pub async fn node_last(&self, uid: u32) -> Result<LastStatsResponse> {
        let path = format!("/v1/nodes/{}/stats/last", uid);
        self.client.get(&path).await
    }

    /// Requests `/v1/nodes/stats` (all nodes), optionally filtered by `query`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client `get` call produces.
    pub async fn nodes(&self, query: Option<StatsQuery>) -> Result<AggregatedStatsResponse> {
        let path = Self::path_with_query("/v1/nodes/stats", query);
        self.client.get(&path).await
    }

    /// Requests `/v1/nodes/stats/last` (all nodes).
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client `get` call produces.
    pub async fn nodes_last(&self) -> Result<AggregatedStatsResponse> {
        self.client.get("/v1/nodes/stats/last").await
    }

    /// Requests `/v1/nodes/stats/{uid}`, the alternative path layout for a
    /// single node.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client `get` call produces.
    pub async fn node_alt(&self, uid: u32) -> Result<StatsResponse> {
        let path = format!("/v1/nodes/stats/{}", uid);
        self.client.get(&path).await
    }

    /// Requests `/v1/nodes/stats/last/{uid}`, the alternative path layout for
    /// a single node.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client `get` call produces.
    pub async fn node_last_alt(&self, uid: u32) -> Result<LastStatsResponse> {
        let path = format!("/v1/nodes/stats/last/{}", uid);
        self.client.get(&path).await
    }

    /// Requests `/v1/bdbs/{uid}/stats`, optionally filtered by `query`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client `get` call produces.
    pub async fn database(&self, uid: u32, query: Option<StatsQuery>) -> Result<StatsResponse> {
        let path = Self::path_with_query(&format!("/v1/bdbs/{}/stats", uid), query);
        self.client.get(&path).await
    }

    /// Requests `/v1/bdbs/{uid}/stats/last`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client `get` call produces.
    pub async fn database_last(&self, uid: u32) -> Result<LastStatsResponse> {
        let path = format!("/v1/bdbs/{}/stats/last", uid);
        self.client.get(&path).await
    }

    /// Requests `/v1/bdbs/stats` (all databases), optionally filtered by `query`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client `get` call produces.
    pub async fn databases(&self, query: Option<StatsQuery>) -> Result<AggregatedStatsResponse> {
        let path = Self::path_with_query("/v1/bdbs/stats", query);
        self.client.get(&path).await
    }

    /// Requests `/v1/bdbs/stats/last` (all databases).
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client `get` call produces.
    pub async fn databases_last(&self) -> Result<AggregatedStatsResponse> {
        self.client.get("/v1/bdbs/stats/last").await
    }

    /// Requests `/v1/bdbs/stats/{uid}`, the alternative path layout for a
    /// single database.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client `get` call produces.
    pub async fn database_alt(&self, uid: u32) -> Result<StatsResponse> {
        let path = format!("/v1/bdbs/stats/{}", uid);
        self.client.get(&path).await
    }

    /// Requests `/v1/bdbs/stats/last/{uid}`, the alternative path layout for
    /// a single database.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client `get` call produces.
    pub async fn database_last_alt(&self, uid: u32) -> Result<LastStatsResponse> {
        let path = format!("/v1/bdbs/stats/last/{}", uid);
        self.client.get(&path).await
    }

    /// Requests `/v1/shards/stats/{uid}`, optionally filtered by `query`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client `get` call produces.
    pub async fn shard(&self, uid: u32, query: Option<StatsQuery>) -> Result<StatsResponse> {
        let path = Self::path_with_query(&format!("/v1/shards/stats/{}", uid), query);
        self.client.get(&path).await
    }

    /// Requests `/v1/shards/stats` (all shards), optionally filtered by `query`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client `get` call produces.
    pub async fn shards(&self, query: Option<StatsQuery>) -> Result<AggregatedStatsResponse> {
        let path = Self::path_with_query("/v1/shards/stats", query);
        self.client.get(&path).await
    }

    /// Requests `/v1/shards/stats/last` and returns the body as untyped JSON.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client `get` call produces.
    pub async fn shards_last(&self) -> Result<Value> {
        self.client.get("/v1/shards/stats/last").await
    }

    /// Requests `/v1/shards/stats/last/{uid}` and returns the body as untyped JSON.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client `get` call produces.
    pub async fn shard_last(&self, uid: u32) -> Result<Value> {
        let path = format!("/v1/shards/stats/last/{}", uid);
        self.client.get(&path).await
    }

    /// Returns a stream that repeatedly polls [`Self::cluster_last`].
    ///
    /// Each successful poll is yielded as `Ok`, followed by a sleep of
    /// `poll_interval` before the next poll. The first failed poll is yielded
    /// as `Err` and then the stream ends.
    pub fn stream_cluster(
        &self,
        poll_interval: Duration,
    ) -> Pin<Box<dyn Stream<Item = Result<LastStatsResponse>> + Send + '_>> {
        Box::pin(async_stream::stream! {
            loop {
                match self.cluster_last().await {
                    Ok(stats) => yield Ok(stats),
                    Err(e) => {
                        // Surface the error once, then terminate the stream.
                        yield Err(e);
                        break;
                    }
                }
                sleep(poll_interval).await;
            }
        })
    }

    /// Returns a stream that repeatedly polls [`Self::node_last`] for node `uid`.
    ///
    /// Each successful poll is yielded as `Ok`, followed by a sleep of
    /// `poll_interval` before the next poll. The first failed poll is yielded
    /// as `Err` and then the stream ends.
    pub fn stream_node(
        &self,
        uid: u32,
        poll_interval: Duration,
    ) -> Pin<Box<dyn Stream<Item = Result<LastStatsResponse>> + Send + '_>> {
        Box::pin(async_stream::stream! {
            loop {
                match self.node_last(uid).await {
                    Ok(stats) => yield Ok(stats),
                    Err(e) => {
                        // Surface the error once, then terminate the stream.
                        yield Err(e);
                        break;
                    }
                }
                sleep(poll_interval).await;
            }
        })
    }

    /// Returns a stream that repeatedly polls [`Self::database_last`] for
    /// database `uid`.
    ///
    /// Each successful poll is yielded as `Ok`, followed by a sleep of
    /// `poll_interval` before the next poll. The first failed poll is yielded
    /// as `Err` and then the stream ends.
    pub fn stream_database(
        &self,
        uid: u32,
        poll_interval: Duration,
    ) -> Pin<Box<dyn Stream<Item = Result<LastStatsResponse>> + Send + '_>> {
        Box::pin(async_stream::stream! {
            loop {
                match self.database_last(uid).await {
                    Ok(stats) => yield Ok(stats),
                    Err(e) => {
                        // Surface the error once, then terminate the stream.
                        yield Err(e);
                        break;
                    }
                }
                sleep(poll_interval).await;
            }
        })
    }
}