edgebase_admin/
analytics.rs1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use edgebase_core::error::Error;
6use edgebase_core::generated::api_core::GeneratedDbApi;
7use edgebase_core::http_client::HttpClient;
8use serde_json::{json, Value};
9
10use crate::generated::admin_api_core::GeneratedAdminApi;
11
12pub struct AnalyticsClient {
13 http: Arc<HttpClient>,
14}
15
16impl AnalyticsClient {
17 pub(crate) fn new(http: Arc<HttpClient>) -> Self {
18 Self { http }
19 }
20
21 pub async fn overview(&self, options: Option<HashMap<String, String>>) -> Result<Value, Error> {
22 self.admin_core()
23 .query_analytics(&self.build_query("overview", options))
24 .await
25 }
26
27 pub async fn time_series(&self, options: Option<HashMap<String, String>>) -> Result<Vec<Value>, Error> {
28 let result = self
29 .admin_core()
30 .query_analytics(&self.build_query("timeSeries", options))
31 .await?;
32 Ok(extract_list(&result, "timeSeries"))
33 }
34
35 pub async fn breakdown(&self, options: Option<HashMap<String, String>>) -> Result<Vec<Value>, Error> {
36 let result = self
37 .admin_core()
38 .query_analytics(&self.build_query("breakdown", options))
39 .await?;
40 Ok(extract_list(&result, "breakdown"))
41 }
42
43 pub async fn top_endpoints(&self, options: Option<HashMap<String, String>>) -> Result<Vec<Value>, Error> {
44 let result = self
45 .admin_core()
46 .query_analytics(&self.build_query("topEndpoints", options))
47 .await?;
48 Ok(extract_list(&result, "topItems"))
49 }
50
51 pub async fn track(
52 &self,
53 name: &str,
54 properties: Option<Value>,
55 user_id: Option<&str>,
56 ) -> Result<(), Error> {
57 let mut event = serde_json::Map::new();
58 event.insert("name".to_string(), Value::String(name.to_string()));
59 event.insert("timestamp".to_string(), json!(current_time_millis()));
60 if let Some(props) = properties {
61 event.insert("properties".to_string(), props);
62 }
63 if let Some(user) = user_id {
64 event.insert("userId".to_string(), Value::String(user.to_string()));
65 }
66 self.track_batch(vec![Value::Object(event)]).await
67 }
68
69 pub async fn track_batch(&self, events: Vec<Value>) -> Result<(), Error> {
70 if events.is_empty() {
71 return Ok(());
72 }
73
74 let normalized = events
75 .into_iter()
76 .map(|event| {
77 if let Value::Object(mut map) = event {
78 map.entry("timestamp".to_string())
79 .or_insert_with(|| json!(current_time_millis()));
80 Value::Object(map)
81 } else {
82 event
83 }
84 })
85 .collect::<Vec<_>>();
86
87 self.core()
88 .track_events(&json!({ "events": normalized }))
89 .await?;
90 Ok(())
91 }
92
93 pub async fn query_events(&self, options: Option<HashMap<String, String>>) -> Result<Value, Error> {
94 let params = options.unwrap_or_default();
95 self.admin_core().query_custom_events(¶ms).await
96 }
97
98 fn core(&self) -> GeneratedDbApi<'_> {
99 GeneratedDbApi::new(&self.http)
100 }
101
102 fn admin_core(&self) -> GeneratedAdminApi<'_> {
103 GeneratedAdminApi::new(&self.http)
104 }
105
106 fn build_query(&self, metric: &str, options: Option<HashMap<String, String>>) -> HashMap<String, String> {
107 let mut query = HashMap::from([("metric".to_string(), metric.to_string())]);
108 if let Some(options) = options {
109 query.extend(options);
110 }
111 query
112 }
113}
114
115fn extract_list(result: &Value, field: &str) -> Vec<Value> {
116 result
117 .get(field)
118 .and_then(|value| value.as_array())
119 .cloned()
120 .unwrap_or_default()
121}
122
/// Returns the number of milliseconds elapsed since the Unix epoch, per the system clock.
///
/// Falls back to `0` when the system clock reports a time earlier than the epoch.
fn current_time_millis() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
128}