Skip to main content

redis_enterprise/
stats.rs

1//! Statistics and metrics collection for Redis Enterprise
2//!
3//! ## Overview
4//! - Query cluster, node, database, and shard statistics
5//! - Retrieve time-series metrics with configurable intervals
6//! - Access both current and historical performance data
7//!
8//! ## Return Types
9//!
//! Stats methods return either typed responses (`StatsResponse`, `LastStatsResponse`,
//! `AggregatedStatsResponse`) or a raw `serde_json::Value` for endpoints whose payload
//! is keyed by dynamic metric names. The `Value` returns give access to every metric
//! without compile-time knowledge of the metric names.
14//!
15//! ## Examples
16//!
17//! ### Querying Database Stats
18//! ```no_run
19//! use redis_enterprise::EnterpriseClient;
20//! use redis_enterprise::stats::StatsQuery;
21//!
22//! # async fn example(client: EnterpriseClient) -> Result<(), Box<dyn std::error::Error>> {
23//! // Get last interval stats for a database
24//! let last_stats = client.stats().database_last(1).await?;
25//! println!("Database stats: {:?}", last_stats);
26//!
27//! // Query with specific interval (all metrics by default)
28//! let query = StatsQuery {
29//!     interval: Some("5min".to_string()),
30//!     stime: None,
31//!     etime: None,
32//!     metrics: None,  // None means all metrics
33//! };
34//! let historical = client.stats().database(1, Some(query)).await?;
35//! println!("5-minute intervals: {:?}", historical.intervals);
36//! # Ok(())
37//! # }
38//! ```
39//!
40//! ### Cluster-Wide Statistics
41//! ```no_run
42//! # use redis_enterprise::EnterpriseClient;
43//! # async fn example(client: EnterpriseClient) -> Result<(), Box<dyn std::error::Error>> {
44//! // Get aggregated stats for all nodes
45//! let all_nodes = client.stats().nodes_last().await?;
46//! println!("Total stats across cluster: {:?}", all_nodes.stats);
47//!
48//! // Get aggregated database stats
49//! let all_dbs = client.stats().databases_last().await?;
50//! for resource_stats in &all_dbs.stats {
51//!     println!("Resource {}: {:?}", resource_stats.uid, resource_stats.intervals);
52//! }
53//! # Ok(())
54//! # }
55//! ```
56
57use crate::client::RestClient;
58use crate::error::Result;
59use futures::stream::Stream;
60use serde::{Deserialize, Deserializer, Serialize};
61use serde_json::Value;
62use std::pin::Pin;
63use std::time::Duration;
64use tokio::time::sleep;
65
66/// Stats query parameters
67#[derive(Debug, Serialize)]
68pub struct StatsQuery {
69    /// Time interval for aggregation ("1min", "5min", "1hour", "1day")
70    #[serde(skip_serializing_if = "Option::is_none")]
71    pub interval: Option<String>,
72    /// Start time for the query (ISO 8601 format)
73    #[serde(skip_serializing_if = "Option::is_none")]
74    pub stime: Option<String>,
75    /// End time for the query (ISO 8601 format)
76    #[serde(skip_serializing_if = "Option::is_none")]
77    pub etime: Option<String>,
78    /// Comma-separated list of specific metrics to retrieve
79    #[serde(skip_serializing_if = "Option::is_none")]
80    pub metrics: Option<String>,
81}
82
83/// Generic stats response
84#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct StatsResponse {
86    /// Array of time intervals with their corresponding metrics
87    pub intervals: Vec<StatsInterval>,
88}
89
90/// Stats interval
91#[derive(Debug, Clone, Serialize)]
92pub struct StatsInterval {
93    /// Timestamp for this interval (ISO 8601 format)
94    pub time: String,
95    /// Metrics data for this time interval (dynamic field names)
96    pub metrics: Value,
97}
98
99impl<'de> Deserialize<'de> for StatsInterval {
100    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
101    where
102        D: Deserializer<'de>,
103    {
104        let value = Value::deserialize(deserializer)?;
105        let object = value.as_object().ok_or_else(|| {
106            <D::Error as serde::de::Error>::custom("expected stats interval object")
107        })?;
108
109        if let (Some(time), Some(metrics)) = (
110            object.get("time").and_then(Value::as_str),
111            object.get("metrics"),
112        ) {
113            return Ok(Self {
114                time: time.to_string(),
115                metrics: metrics.clone(),
116            });
117        }
118
119        let time = object
120            .get("stime")
121            .and_then(Value::as_str)
122            .or_else(|| object.get("etime").and_then(Value::as_str))
123            .ok_or_else(|| {
124                <D::Error as serde::de::Error>::custom(
125                    "expected stats interval to contain either time or stime/etime",
126                )
127            })?;
128
129        Ok(Self {
130            time: time.to_string(),
131            metrics: value,
132        })
133    }
134}
135
136/// Last stats response for single resource
137/// Response for last stats endpoint - the API returns metrics directly
138#[derive(Debug, Clone, Serialize, Deserialize)]
139pub struct LastStatsResponse {
140    /// Start time of the stats interval
141    pub stime: Option<String>,
142    /// End time of the stats interval
143    pub etime: Option<String>,
144    /// Interval duration (e.g., "5min", "1hour")
145    pub interval: Option<String>,
146    /// All metric values for the last interval (dynamic field names)
147    #[serde(flatten)]
148    pub metrics: Value,
149}
150
151/// Aggregated stats response for multiple resources
152#[derive(Debug, Clone, Serialize)]
153pub struct AggregatedStatsResponse {
154    /// Array of stats for individual resources (nodes, databases, shards)
155    pub stats: Vec<ResourceStats>,
156}
157
158#[derive(Debug, Clone, Deserialize)]
159#[serde(untagged)]
160enum AggregatedStatsResponseWire {
161    Wrapped { stats: Vec<ResourceStats> },
162    Bare(Vec<ResourceStats>),
163}
164
165impl<'de> Deserialize<'de> for AggregatedStatsResponse {
166    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
167    where
168        D: Deserializer<'de>,
169    {
170        match AggregatedStatsResponseWire::deserialize(deserializer)? {
171            AggregatedStatsResponseWire::Wrapped { stats } => Ok(Self { stats }),
172            AggregatedStatsResponseWire::Bare(stats) => Ok(Self { stats }),
173        }
174    }
175}
176
177/// Stats for a single resource
178#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct ResourceStats {
180    /// Unique identifier of the resource (node UID, database UID, etc.)
181    pub uid: u32,
182    /// Time intervals with metrics for this specific resource
183    pub intervals: Vec<StatsInterval>,
184}
185
186/// Stats handler for retrieving metrics
187pub struct StatsHandler {
188    client: RestClient,
189}
190
191impl StatsHandler {
192    /// Create a new handler bound to the given REST client.
193    pub fn new(client: RestClient) -> Self {
194        StatsHandler { client }
195    }
196
197    /// Get cluster stats
198    pub async fn cluster(&self, query: Option<StatsQuery>) -> Result<StatsResponse> {
199        if let Some(q) = query {
200            let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
201            self.client
202                .get(&format!("/v1/cluster/stats?{}", query_str))
203                .await
204        } else {
205            self.client.get("/v1/cluster/stats").await
206        }
207    }
208
209    /// Get cluster stats for last interval
210    pub async fn cluster_last(&self) -> Result<LastStatsResponse> {
211        self.client.get("/v1/cluster/stats/last").await
212    }
213
214    // raw variant removed: use cluster_last()
215
216    /// Get node stats
217    pub async fn node(&self, uid: u32, query: Option<StatsQuery>) -> Result<StatsResponse> {
218        if let Some(q) = query {
219            let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
220            self.client
221                .get(&format!("/v1/nodes/{}/stats?{}", uid, query_str))
222                .await
223        } else {
224            self.client.get(&format!("/v1/nodes/{}/stats", uid)).await
225        }
226    }
227
228    /// Get node stats for last interval
229    pub async fn node_last(&self, uid: u32) -> Result<LastStatsResponse> {
230        self.client
231            .get(&format!("/v1/nodes/{}/stats/last", uid))
232            .await
233    }
234
235    // raw variant removed: use node_last()
236
237    /// Get all nodes stats
238    pub async fn nodes(&self, query: Option<StatsQuery>) -> Result<AggregatedStatsResponse> {
239        if let Some(q) = query {
240            let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
241            self.client
242                .get(&format!("/v1/nodes/stats?{}", query_str))
243                .await
244        } else {
245            self.client.get("/v1/nodes/stats").await
246        }
247    }
248
249    // raw variant removed: use nodes()
250
251    /// Get all nodes last stats
252    pub async fn nodes_last(&self) -> Result<AggregatedStatsResponse> {
253        self.client.get("/v1/nodes/stats/last").await
254    }
255
256    // raw variant removed: use nodes_last()
257
258    /// Get node stats via alternate path form
259    pub async fn node_alt(&self, uid: u32) -> Result<StatsResponse> {
260        self.client.get(&format!("/v1/nodes/stats/{}", uid)).await
261    }
262
263    /// Get node last stats via alternate path form
264    pub async fn node_last_alt(&self, uid: u32) -> Result<LastStatsResponse> {
265        self.client
266            .get(&format!("/v1/nodes/stats/last/{}", uid))
267            .await
268    }
269
270    /// Get database stats
271    pub async fn database(&self, uid: u32, query: Option<StatsQuery>) -> Result<StatsResponse> {
272        if let Some(q) = query {
273            let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
274            self.client
275                .get(&format!("/v1/bdbs/{}/stats?{}", uid, query_str))
276                .await
277        } else {
278            self.client.get(&format!("/v1/bdbs/{}/stats", uid)).await
279        }
280    }
281
282    /// Get database stats for last interval
283    pub async fn database_last(&self, uid: u32) -> Result<LastStatsResponse> {
284        self.client
285            .get(&format!("/v1/bdbs/{}/stats/last", uid))
286            .await
287    }
288
289    // raw variant removed: use database_last()
290
291    /// Get all databases stats
292    pub async fn databases(&self, query: Option<StatsQuery>) -> Result<AggregatedStatsResponse> {
293        if let Some(q) = query {
294            let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
295            self.client
296                .get(&format!("/v1/bdbs/stats?{}", query_str))
297                .await
298        } else {
299            self.client.get("/v1/bdbs/stats").await
300        }
301    }
302
303    // raw variant removed: use databases()
304
305    /// Get all databases last stats (aggregate)
306    pub async fn databases_last(&self) -> Result<AggregatedStatsResponse> {
307        self.client.get("/v1/bdbs/stats/last").await
308    }
309
310    // raw variant removed: use databases_last()
311
312    /// Get database stats via alternate path form
313    pub async fn database_alt(&self, uid: u32) -> Result<StatsResponse> {
314        self.client.get(&format!("/v1/bdbs/stats/{}", uid)).await
315    }
316
317    /// Get database last stats via alternate path form
318    pub async fn database_last_alt(&self, uid: u32) -> Result<LastStatsResponse> {
319        self.client
320            .get(&format!("/v1/bdbs/stats/last/{}", uid))
321            .await
322    }
323
324    /// Get shard stats
325    pub async fn shard(&self, uid: u32, query: Option<StatsQuery>) -> Result<StatsResponse> {
326        if let Some(q) = query {
327            let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
328            self.client
329                .get(&format!("/v1/shards/stats/{}?{}", uid, query_str))
330                .await
331        } else {
332            self.client.get(&format!("/v1/shards/stats/{}", uid)).await
333        }
334    }
335
336    /// Get all shards stats
337    pub async fn shards(&self, query: Option<StatsQuery>) -> Result<AggregatedStatsResponse> {
338        if let Some(q) = query {
339            let query_str = serde_urlencoded::to_string(&q).unwrap_or_default();
340            self.client
341                .get(&format!("/v1/shards/stats?{}", query_str))
342                .await
343        } else {
344            self.client.get("/v1/shards/stats").await
345        }
346    }
347
348    // raw variant removed: use shards()
349
350    /// Get all shards last stats
351    pub async fn shards_last(&self) -> Result<Value> {
352        self.client.get("/v1/shards/stats/last").await
353    }
354
355    /// Get shard last stats
356    pub async fn shard_last(&self, uid: u32) -> Result<Value> {
357        self.client
358            .get(&format!("/v1/shards/stats/last/{}", uid))
359            .await
360    }
361
362    /// Stream cluster stats in real-time by polling
363    ///
364    /// # Arguments
365    /// * `poll_interval` - Time to wait between polls
366    ///
367    /// # Returns
368    /// A stream of stats responses
369    pub fn stream_cluster(
370        &self,
371        poll_interval: Duration,
372    ) -> Pin<Box<dyn Stream<Item = Result<LastStatsResponse>> + Send + '_>> {
373        Box::pin(async_stream::stream! {
374            loop {
375                match self.cluster_last().await {
376                    Ok(stats) => yield Ok(stats),
377                    Err(e) => {
378                        yield Err(e);
379                        break;
380                    }
381                }
382                sleep(poll_interval).await;
383            }
384        })
385    }
386
387    /// Stream node stats in real-time by polling
388    ///
389    /// # Arguments
390    /// * `uid` - Node ID
391    /// * `poll_interval` - Time to wait between polls
392    ///
393    /// # Returns
394    /// A stream of stats responses
395    pub fn stream_node(
396        &self,
397        uid: u32,
398        poll_interval: Duration,
399    ) -> Pin<Box<dyn Stream<Item = Result<LastStatsResponse>> + Send + '_>> {
400        Box::pin(async_stream::stream! {
401            loop {
402                match self.node_last(uid).await {
403                    Ok(stats) => yield Ok(stats),
404                    Err(e) => {
405                        yield Err(e);
406                        break;
407                    }
408                }
409                sleep(poll_interval).await;
410            }
411        })
412    }
413
414    /// Stream database stats in real-time by polling
415    ///
416    /// # Arguments
417    /// * `uid` - Database ID
418    /// * `poll_interval` - Time to wait between polls
419    ///
420    /// # Returns
421    /// A stream of stats responses
422    pub fn stream_database(
423        &self,
424        uid: u32,
425        poll_interval: Duration,
426    ) -> Pin<Box<dyn Stream<Item = Result<LastStatsResponse>> + Send + '_>> {
427        Box::pin(async_stream::stream! {
428            loop {
429                match self.database_last(uid).await {
430                    Ok(stats) => yield Ok(stats),
431                    Err(e) => {
432                        yield Err(e);
433                        break;
434                    }
435                }
436                sleep(poll_interval).await;
437            }
438        })
439    }
440}