Skip to main content

edgebase_admin/
analytics.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use edgebase_core::error::Error;
6use edgebase_core::generated::api_core::GeneratedDbApi;
7use edgebase_core::http_client::HttpClient;
8use serde_json::{json, Value};
9
10use crate::generated::admin_api_core::GeneratedAdminApi;
11
12pub struct AnalyticsClient {
13    http: Arc<HttpClient>,
14}
15
16impl AnalyticsClient {
17    pub(crate) fn new(http: Arc<HttpClient>) -> Self {
18        Self { http }
19    }
20
21    pub async fn overview(&self, options: Option<HashMap<String, String>>) -> Result<Value, Error> {
22        self.admin_core()
23            .query_analytics(&self.build_query("overview", options))
24            .await
25    }
26
27    pub async fn time_series(&self, options: Option<HashMap<String, String>>) -> Result<Vec<Value>, Error> {
28        let result = self
29            .admin_core()
30            .query_analytics(&self.build_query("timeSeries", options))
31            .await?;
32        Ok(extract_list(&result, "timeSeries"))
33    }
34
35    pub async fn breakdown(&self, options: Option<HashMap<String, String>>) -> Result<Vec<Value>, Error> {
36        let result = self
37            .admin_core()
38            .query_analytics(&self.build_query("breakdown", options))
39            .await?;
40        Ok(extract_list(&result, "breakdown"))
41    }
42
43    pub async fn top_endpoints(&self, options: Option<HashMap<String, String>>) -> Result<Vec<Value>, Error> {
44        let result = self
45            .admin_core()
46            .query_analytics(&self.build_query("topEndpoints", options))
47            .await?;
48        Ok(extract_list(&result, "topItems"))
49    }
50
51    pub async fn track(
52        &self,
53        name: &str,
54        properties: Option<Value>,
55        user_id: Option<&str>,
56    ) -> Result<(), Error> {
57        let mut event = serde_json::Map::new();
58        event.insert("name".to_string(), Value::String(name.to_string()));
59        event.insert("timestamp".to_string(), json!(current_time_millis()));
60        if let Some(props) = properties {
61            event.insert("properties".to_string(), props);
62        }
63        if let Some(user) = user_id {
64            event.insert("userId".to_string(), Value::String(user.to_string()));
65        }
66        self.track_batch(vec![Value::Object(event)]).await
67    }
68
69    pub async fn track_batch(&self, events: Vec<Value>) -> Result<(), Error> {
70        if events.is_empty() {
71            return Ok(());
72        }
73
74        let normalized = events
75            .into_iter()
76            .map(|event| {
77                if let Value::Object(mut map) = event {
78                    map.entry("timestamp".to_string())
79                        .or_insert_with(|| json!(current_time_millis()));
80                    Value::Object(map)
81                } else {
82                    event
83                }
84            })
85            .collect::<Vec<_>>();
86
87        self.core()
88            .track_events(&json!({ "events": normalized }))
89            .await?;
90        Ok(())
91    }
92
93    pub async fn query_events(&self, options: Option<HashMap<String, String>>) -> Result<Value, Error> {
94        let params = options.unwrap_or_default();
95        self.admin_core().query_custom_events(&params).await
96    }
97
98    fn core(&self) -> GeneratedDbApi<'_> {
99        GeneratedDbApi::new(&self.http)
100    }
101
102    fn admin_core(&self) -> GeneratedAdminApi<'_> {
103        GeneratedAdminApi::new(&self.http)
104    }
105
106    fn build_query(&self, metric: &str, options: Option<HashMap<String, String>>) -> HashMap<String, String> {
107        let mut query = HashMap::from([("metric".to_string(), metric.to_string())]);
108        if let Some(options) = options {
109            query.extend(options);
110        }
111        query
112    }
113}
114
115fn extract_list(result: &Value, field: &str) -> Vec<Value> {
116    result
117        .get(field)
118        .and_then(|value| value.as_array())
119        .cloned()
120        .unwrap_or_default()
121}
122
/// Returns the current system time as whole milliseconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
fn current_time_millis() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
128}