Skip to main content

moltendb_core/
analytics.rs

1#![allow(dead_code)]
2// ─── analytics.rs ─────────────────────────────────────────────────────────────
3// This file implements the analytics query engine used by the WASM dashboard.
4//
5// What is an "analytics query"?
6//   A regular database query returns raw documents (rows of data).
7//   An analytics query returns a single computed number — a metric — derived
8//   from many documents. Examples:
9//     - "How many events happened in the last minute?" → COUNT
10//     - "What is the average order value?" → AVG
11//     - "What is the highest score?" → MAX
12//
13// How it's used:
14//   The analytics dashboard (analytics_dashboard.html) calls:
15//     client.startAutoRefresh(queries, 500)
16//   Every 500ms, the worker calls:
17//     db.analytics(JSON.stringify(query))
18//   which calls execute_query() here and returns a JSON string with the result.
19//
20// Data flow:
21//   analytics_dashboard.html
22//     → analytics-worker.js (db.analytics)
23//       → worker.rs (WorkerDb::analytics)
24//         → analytics::execute_query()
25//           → db.get_all() → filter → compute metric → return AnalyticsResult
26// ─────────────────────────────────────────────────────────────────────────────
27
28// Deserialize = automatically parse JSON into this struct.
29// Serialize   = automatically convert this struct to JSON.
30use serde::{Deserialize, Serialize};
31// Value = a generic JSON value (can be string, number, object, array, null).
32// json! = a macro that creates a Value from a JSON literal.
33use serde_json::{Value, json};
34// Db = the main database handle (holds in-memory state + storage backend).
35// query = the query module with helper functions like evaluate_where and get_nested_value.
36use crate::{engine::Db, engine::DbError, query};
37
38// ─── AnalyticsQuery ───────────────────────────────────────────────────────────
39
40/// The full analytics query sent from the dashboard.
41///
42/// Example JSON (what the dashboard sends):
43/// ```json
44/// {
45///   "collection": "events",
46///   "metric": { "type": "COUNT" },
47///   "where": { "event_type": "button_click" }
48/// }
49/// ```
50///
51/// The `#[derive(Debug, Deserialize)]` annotation tells Rust to:
52///   - `Debug`: allow printing this struct with `{:?}` for debugging.
53///   - `Deserialize`: automatically parse it from JSON using serde.
54#[derive(Debug, Deserialize)]
55pub struct AnalyticsQuery {
56    /// The name of the collection to query (e.g. "events", "users").
57    pub collection: String,
58
59    /// Which metric to compute (COUNT, SUM, AVG, MIN, MAX).
60    pub metric: MetricSpec,
61
62    /// Optional filter — only documents matching this WHERE clause are included.
63    /// The `#[serde(rename = "where")]` tells serde to look for the JSON key
64    /// "where" (not "filter") when deserializing. "where" is a reserved keyword
65    /// in Rust, so we can't use it as a field name directly.
66    #[serde(rename = "where")]
67    pub filter: Option<Value>,
68}
69
70// ─── MetricSpec ───────────────────────────────────────────────────────────────
71
72/// Which metric to compute and (for aggregations) which field to aggregate over.
73///
74/// This is a "tagged enum" — serde uses the "type" JSON key to decide which
75/// variant to deserialize into. For example:
76///   `{ "type": "COUNT" }` → MetricSpec::Count
77///   `{ "type": "SUM", "field": "price" }` → MetricSpec::Sum { field: "price" }
78///
79/// `#[serde(tag = "type")]` tells serde to use the "type" field as the discriminant.
80#[derive(Debug, Deserialize)]
81#[serde(tag = "type")]
82pub enum MetricSpec {
83    /// Count the number of matching documents. No field needed.
84    /// JSON: `{ "type": "COUNT" }`
85    #[serde(rename = "COUNT")]
86    Count,
87
88    /// Sum the numeric values of a field across all matching documents.
89    /// JSON: `{ "type": "SUM", "field": "price" }`
90    #[serde(rename = "SUM")]
91    Sum { field: String },
92
93    /// Compute the arithmetic mean (average) of a field.
94    /// JSON: `{ "type": "AVG", "field": "score" }`
95    #[serde(rename = "AVG")]
96    Avg { field: String },
97
98    /// Find the smallest value of a field across all matching documents.
99    /// JSON: `{ "type": "MIN", "field": "age" }`
100    #[serde(rename = "MIN")]
101    Min { field: String },
102
103    /// Find the largest value of a field across all matching documents.
104    /// JSON: `{ "type": "MAX", "field": "age" }`
105    #[serde(rename = "MAX")]
106    Max { field: String },
107}
108
109// ─── AnalyticsResult ──────────────────────────────────────────────────────────
110
111/// The result returned to the dashboard after executing an analytics query.
112///
113/// `result` is the computed metric value (a number, or null if no data).
114/// `metadata` contains performance information shown in the dashboard UI.
115///
116/// `#[derive(Serialize)]` lets this struct be converted to JSON automatically.
117#[derive(Debug, Serialize)]
118pub struct AnalyticsResult {
119    /// The computed metric value.
120    /// For COUNT: a JSON integer (e.g. 42).
121    /// For SUM/AVG/MIN/MAX: a JSON float (e.g. 3.14), or null if no documents matched.
122    pub result: Value,
123
124    /// Performance metadata — how long the query took and how many rows were scanned.
125    pub metadata: ResultMetadata,
126}
127
128/// Performance metadata attached to every analytics result.
129///
130/// The dashboard uses `execution_time_ms` to display the "Avg Query Latency" counter.
131/// `rows_scanned` is useful for understanding query cost (before indexes kick in).
132#[derive(Debug, Serialize)]
133pub struct ResultMetadata {
134    /// Wall-clock time from query start to result ready, in milliseconds.
135    /// Measured using `web_time::Instant` which works in both native and WASM.
136    pub execution_time_ms: u64,
137
138    /// Number of documents that passed the WHERE filter and were included in
139    /// the metric computation. For COUNT this equals the result; for others
140    /// it's the number of documents whose field value was extracted.
141    pub rows_scanned: usize,
142}
143
144// ─── execute_query ────────────────────────────────────────────────────────────
145
146/// Execute an analytics query against the database and return the result.
147///
148/// Steps:
149///   1. Record the start time (for execution_time_ms).
150///   2. Fetch all documents from the requested collection (O(n) copy).
151///   3. Filter documents using the WHERE clause (if provided).
152///   4. Compute the requested metric over the filtered documents.
153///   5. Return the result with timing metadata.
154///
155/// # Arguments
156/// * `db`    — The database handle. Used to call `db.get_all(collection)`.
157/// * `query` — The parsed analytics query (collection + metric + optional filter).
158pub fn execute_query(db: &Db, query: &AnalyticsQuery) -> Result<AnalyticsResult, DbError> {
159    // Record the start time. `web_time::Instant` is used instead of
160    // `std::time::Instant` because `std::time` is not available in WASM.
161    let start = web_time::Instant::now();
162
163    // Fetch all documents in the collection as a HashMap<String, Value>.
164    // This is an O(n) operation — it copies every document out of the DashMap.
165    // In the hybrid Bitcask model, this may involve many disk reads if documents are Cold.
166    let all_docs = db.get_all(&query.collection);
167
168    // Apply the WHERE filter (if any) to narrow down the documents.
169    // `filter()` is a lazy iterator adapter — it doesn't allocate until `.collect()`.
170    // `evaluate_where(doc, filter)` returns true if the document matches all conditions.
171    let mut filtered_docs = Vec::new();
172    for doc in all_docs.values() {
173        if let Some(filter) = &query.filter {
174            if query::evaluate_where(doc, filter)? {
175                filtered_docs.push(doc);
176            }
177        } else {
178            filtered_docs.push(doc);
179        }
180    }
181
182    // Record how many documents passed the filter (used in metadata).
183    let rows_scanned = filtered_docs.len();
184
185    // Compute the requested metric over the filtered documents.
186    // Each arm of the match returns a serde_json::Value (a JSON number or null).
187    let result = match &query.metric {
188        // COUNT: just return the number of matching documents.
189        MetricSpec::Count => {
190            json!(filtered_docs.len())
191        }
192
193        // SUM: extract the numeric field from each document and add them up.
194        // `filter_map` skips documents where the field is missing or non-numeric.
195        // `.sum()` works on f64 iterators — returns 0.0 if the iterator is empty.
196        MetricSpec::Sum { field } => {
197            let sum: f64 = filtered_docs
198                .iter()
199                .filter_map(|doc| extract_number(doc, field))
200                .sum();
201            json!(sum)
202        }
203
204        // AVG: collect all numeric values, then divide sum by count.
205        // Returns null (Value::Null) if no documents have the field.
206        MetricSpec::Avg { field } => {
207            let values: Vec<f64> = filtered_docs
208                .iter()
209                .filter_map(|doc| extract_number(doc, field))
210                .collect();
211
212            if values.is_empty() {
213                // No numeric values found — return null instead of NaN or 0.
214                Value::Null
215            } else {
216                // `.iter().sum::<f64>()` sums the Vec<f64>.
217                // `values.len() as f64` converts usize to f64 for division.
218                let avg = values.iter().sum::<f64>() / values.len() as f64;
219                json!(avg)
220            }
221        }
222
223        // MIN: find the smallest numeric value.
224        // `min_by` requires a comparator because f64 doesn't implement Ord
225        // (due to NaN — NaN is not comparable). `partial_cmp` handles this
226        // by returning None for NaN, and `unwrap()` is safe here because
227        // extract_number() never returns NaN.
228        MetricSpec::Min { field } => {
229            let min = filtered_docs
230                .iter()
231                .filter_map(|doc| extract_number(doc, field))
232                .min_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
233
234            // `map(|v| json!(v))` converts Some(f64) to Some(Value::Number).
235            // `.unwrap_or(Value::Null)` returns null if no documents had the field.
236            min.map(|v| json!(v)).unwrap_or(Value::Null)
237        }
238
239        // MAX: find the largest numeric value. Same logic as MIN but reversed.
240        MetricSpec::Max { field } => {
241            let max = filtered_docs
242                .iter()
243                .filter_map(|doc| extract_number(doc, field))
244                .max_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
245
246            max.map(|v| json!(v)).unwrap_or(Value::Null)
247        }
248    };
249
250    // Measure how long the query took from start to now.
251    // `.as_millis()` returns u128; we cast to u64 (safe for any realistic query time).
252    let execution_time_ms = start.elapsed().as_millis() as u64;
253
254    Ok(AnalyticsResult {
255        result,
256        metadata: ResultMetadata {
257            execution_time_ms,
258            rows_scanned,
259        },
260    })
261}
262
263// ─── extract_number ───────────────────────────────────────────────────────────
264
265/// Extract a numeric value from a document, supporting nested dot-notation paths.
266///
267/// Examples:
268///   `extract_number(&doc, "price")`         → reads doc["price"]
269///   `extract_number(&doc, "meta.discount")` → reads doc["meta"]["discount"]
270///
271/// Returns `None` if:
272///   - The field doesn't exist in the document.
273///   - The field exists but is not a number (e.g. it's a string or null).
274///
275/// Tries three numeric types in order:
276///   1. f64 (floating point) — covers most JSON numbers.
277///   2. i64 (signed integer) — for integers that don't fit in f64 exactly.
278///   3. u64 (unsigned integer) — for very large positive integers.
279fn extract_number(doc: &Value, field: &str) -> Option<f64> {
280    // Split "meta.discount" into ["meta", "discount"] for nested traversal.
281    let parts: Vec<&str> = field.split('.').collect();
282
283    // `get_nested_value` traverses the document following the path parts.
284    // Returns None if any part of the path doesn't exist.
285    let value = query::get_nested_value(doc, &parts)?;
286
287    // Try to interpret the JSON value as a number.
288    // `or_else` chains fallbacks — if as_f64() returns None, try as_i64(), etc.
289    value.as_f64()
290        .or_else(|| value.as_i64().map(|i| i as f64))
291        .or_else(|| value.as_u64().map(|u| u as f64))
292}
293
294// ─── Tests ────────────────────────────────────────────────────────────────────
295
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Verify that extract_number correctly reads top-level and nested numeric fields,
    /// and returns None for missing fields.
    #[test]
    fn test_extract_number() {
        let doc = json!({
            "price": 99.99,
            "quantity": 5,
            "meta": {
                "discount": 10.5
            }
        });

        // Top-level float field.
        assert_eq!(extract_number(&doc, "price"), Some(99.99));
        // Top-level integer field — returned as f64.
        assert_eq!(extract_number(&doc, "quantity"), Some(5.0));
        // Nested field via dot notation.
        assert_eq!(extract_number(&doc, "meta.discount"), Some(10.5));
        // Missing field — should return None, not panic.
        assert_eq!(extract_number(&doc, "nonexistent"), None);
    }

    /// Negative and very large unsigned integers must also come back as f64.
    #[test]
    fn test_extract_number_integer_forms() {
        let big = u64::MAX;
        let doc = json!({ "delta": -3, "big": big });

        assert_eq!(extract_number(&doc, "delta"), Some(-3.0));
        assert_eq!(extract_number(&doc, "big"), Some(u64::MAX as f64));
    }

    /// Values that exist but are not numbers must be skipped, not coerced.
    #[test]
    fn test_extract_number_rejects_non_numeric() {
        let doc = json!({
            "name": "widget",
            "nothing": null,
            "active": true,
            "tags": [1, 2, 3],
            "meta": { "discount": 10.5 }
        });

        assert_eq!(extract_number(&doc, "name"), None);
        assert_eq!(extract_number(&doc, "nothing"), None);
        assert_eq!(extract_number(&doc, "active"), None);
        assert_eq!(extract_number(&doc, "tags"), None);
        // An object is not a number even though it contains one.
        assert_eq!(extract_number(&doc, "meta"), None);
    }

    /// The dashboard's JSON must map onto AnalyticsQuery / MetricSpec:
    /// "where" fills `filter`, and "type" selects the metric variant.
    #[test]
    fn test_query_deserialization() {
        let with_filter: AnalyticsQuery = serde_json::from_value(json!({
            "collection": "events",
            "metric": { "type": "SUM", "field": "price" },
            "where": { "event_type": "button_click" }
        }))
        .expect("a well-formed query must deserialize");

        assert_eq!(with_filter.collection, "events");
        assert!(matches!(
            &with_filter.metric,
            MetricSpec::Sum { field } if field == "price"
        ));
        assert_eq!(
            with_filter.filter,
            Some(json!({ "event_type": "button_click" }))
        );

        // The WHERE clause is optional.
        let without_filter: AnalyticsQuery = serde_json::from_value(json!({
            "collection": "events",
            "metric": { "type": "COUNT" }
        }))
        .expect("a query without a filter must deserialize");

        assert!(matches!(without_filter.metric, MetricSpec::Count));
        assert_eq!(without_filter.filter, None);
    }

    /// Every documented tag must be accepted and unknown tags rejected.
    #[test]
    fn test_metric_tags() {
        for tag in ["AVG", "MIN", "MAX", "SUM"] {
            let parsed = serde_json::from_value::<MetricSpec>(json!({ "type": tag, "field": "x" }));
            assert!(parsed.is_ok(), "tag {tag} should be accepted");
        }

        assert!(serde_json::from_value::<MetricSpec>(json!({ "type": "MEDIAN" })).is_err());
        // SUM without its field is incomplete.
        assert!(serde_json::from_value::<MetricSpec>(json!({ "type": "SUM" })).is_err());
    }
}