// allsource_core/infrastructure/query/eventql.rs
//! EventQL — SQL-like query language for AllSource events
//!
//! Uses Apache DataFusion to execute SQL queries over events materialized
//! as Arrow RecordBatches. Events are registered as a virtual table named
//! `events` with columns: id, event_type, entity_id, tenant_id, payload,
//! metadata, timestamp, version.
//!
//! ## Supported syntax
//!
//! ```sql
//! SELECT * FROM events WHERE event_type = 'user.created' LIMIT 10
//! SELECT entity_id, COUNT(*) as cnt FROM events GROUP BY entity_id ORDER BY cnt DESC
//! SELECT * FROM events WHERE timestamp > '2026-01-01T00:00:00Z' AND entity_id = 'user-123'
//! ```
//!
//! ## Limitations
//!
//! - `payload` and `metadata` are stored as UTF-8 JSON strings (use JSON functions or LIKE for filtering)
//! - No JOINs across other tables (events is the only registered table)
//! - Query operates on an in-memory snapshot of events at execution time
use std::sync::Arc;

use arrow::{
    array::{Array, Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray},
    datatypes::{DataType, Field, Schema, TimeUnit},
};
use datafusion::prelude::*;
use serde::{Deserialize, Serialize};

use crate::domain::entities::Event;
31
32/// EventQL query request
33#[derive(Debug, Clone, Deserialize)]
34pub struct EventQLRequest {
35    /// SQL query string (table name must be `events`)
36    pub query: String,
37    /// Optional tenant_id filter applied before SQL execution
38    pub tenant_id: Option<String>,
39}
40
41/// A single row in EventQL results
42#[derive(Debug, Clone, Serialize)]
43pub struct EventQLRow {
44    /// Column values as JSON
45    pub columns: Vec<serde_json::Value>,
46}
47
48/// EventQL query response
49#[derive(Debug, Clone, Serialize)]
50pub struct EventQLResponse {
51    /// Column names from the result schema
52    pub columns: Vec<String>,
53    /// Result rows
54    pub rows: Vec<EventQLRow>,
55    /// Number of rows returned
56    pub row_count: usize,
57}
58
59/// Build the Arrow schema for the events table
60fn events_schema() -> Schema {
61    Schema::new(vec![
62        Field::new("id", DataType::Utf8, false),
63        Field::new("event_type", DataType::Utf8, false),
64        Field::new("entity_id", DataType::Utf8, false),
65        Field::new("tenant_id", DataType::Utf8, false),
66        Field::new("payload", DataType::Utf8, false),
67        Field::new("metadata", DataType::Utf8, true),
68        Field::new(
69            "timestamp",
70            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
71            false,
72        ),
73        Field::new("version", DataType::Int64, false),
74    ])
75}
76
77/// Convert a slice of Events into an Arrow RecordBatch
78fn events_to_record_batch(events: &[Event]) -> Result<RecordBatch, arrow::error::ArrowError> {
79    let schema = Arc::new(events_schema());
80
81    let ids: Vec<String> = events.iter().map(|e| e.id.to_string()).collect();
82    let event_types: Vec<String> = events
83        .iter()
84        .map(|e| e.event_type_str().to_string())
85        .collect();
86    let entity_ids: Vec<String> = events
87        .iter()
88        .map(|e| e.entity_id_str().to_string())
89        .collect();
90    let tenant_ids: Vec<String> = events
91        .iter()
92        .map(|e| e.tenant_id_str().to_string())
93        .collect();
94    let payloads: Vec<String> = events.iter().map(|e| e.payload.to_string()).collect();
95    let metadatas: Vec<Option<String>> = events
96        .iter()
97        .map(|e| e.metadata.as_ref().map(|m| m.to_string()))
98        .collect();
99    let timestamps: Vec<i64> = events
100        .iter()
101        .map(|e| e.timestamp.timestamp_micros())
102        .collect();
103    let versions: Vec<i64> = events.iter().map(|e| e.version).collect();
104
105    RecordBatch::try_new(
106        schema,
107        vec![
108            Arc::new(StringArray::from(ids)),
109            Arc::new(StringArray::from(event_types)),
110            Arc::new(StringArray::from(entity_ids)),
111            Arc::new(StringArray::from(tenant_ids)),
112            Arc::new(StringArray::from(payloads)),
113            Arc::new(StringArray::from(metadatas)),
114            Arc::new(TimestampMicrosecondArray::from(timestamps).with_timezone("UTC")),
115            Arc::new(Int64Array::from(versions)),
116        ],
117    )
118}
119
120/// Execute an EventQL query against a set of events
121pub async fn execute_eventql(
122    events: &[Event],
123    request: &EventQLRequest,
124) -> Result<EventQLResponse, String> {
125    // Validate: reject destructive statements
126    let lower = request.query.trim().to_lowercase();
127    if lower.starts_with("insert")
128        || lower.starts_with("update")
129        || lower.starts_with("delete")
130        || lower.starts_with("drop")
131        || lower.starts_with("alter")
132        || lower.starts_with("create")
133    {
134        return Err("EventQL is read-only: only SELECT queries are allowed".to_string());
135    }
136
137    // Apply tenant filter if specified
138    let filtered: Vec<&Event> = if let Some(ref tid) = request.tenant_id {
139        events.iter().filter(|e| e.tenant_id_str() == tid).collect()
140    } else {
141        events.iter().collect()
142    };
143
144    // Convert to Arrow
145    let owned: Vec<Event> = filtered.into_iter().cloned().collect();
146    let batch =
147        events_to_record_batch(&owned).map_err(|e| format!("Arrow conversion error: {e}"))?;
148
149    // Create DataFusion context and register the events table
150    let ctx = SessionContext::new();
151    ctx.register_batch("events", batch)
152        .map_err(|e| format!("Failed to register events table: {e}"))?;
153
154    // Execute the SQL query
155    let df = ctx
156        .sql(&request.query)
157        .await
158        .map_err(|e| format!("SQL error: {e}"))?;
159
160    let result_batches = df
161        .collect()
162        .await
163        .map_err(|e| format!("Execution error: {e}"))?;
164
165    // Extract column names from the result schema
166    let columns: Vec<String> = if let Some(first) = result_batches.first() {
167        first
168            .schema()
169            .fields()
170            .iter()
171            .map(|f| f.name().clone())
172            .collect()
173    } else {
174        return Ok(EventQLResponse {
175            columns: vec![],
176            rows: vec![],
177            row_count: 0,
178        });
179    };
180
181    // Convert result batches to JSON rows
182    let mut rows = Vec::new();
183    for batch in &result_batches {
184        for row_idx in 0..batch.num_rows() {
185            let mut col_values = Vec::new();
186            for col_idx in 0..batch.num_columns() {
187                let col = batch.column(col_idx);
188                let json_val = if col.is_null(row_idx) {
189                    serde_json::Value::Null
190                } else {
191                    // Use arrow's display formatter for generic value extraction
192                    let formatted = arrow::util::display::array_value_to_string(col, row_idx)
193                        .unwrap_or_else(|_| "null".to_string());
194                    if formatted == "null" {
195                        serde_json::Value::Null
196                    } else if let Ok(n) = formatted.parse::<i64>() {
197                        serde_json::Value::Number(n.into())
198                    } else if let Ok(f) = formatted.parse::<f64>() {
199                        serde_json::json!(f)
200                    } else {
201                        serde_json::Value::String(formatted)
202                    }
203                };
204                col_values.push(json_val);
205            }
206            rows.push(EventQLRow {
207                columns: col_values,
208            });
209        }
210    }
211
212    let row_count = rows.len();
213    Ok(EventQLResponse {
214        columns,
215        rows,
216        row_count,
217    })
218}
219
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;

    /// Four fixture events: three in tenant-a (two `user.created`, one
    /// `order.placed`) and one `user.created` in tenant-b.
    fn make_test_events() -> Vec<Event> {
        let specs = vec![
            (
                "user.created",
                "user-1",
                "tenant-a",
                serde_json::json!({"name": "Alice", "age": 30}),
                Some(serde_json::json!({"source": "web"})),
            ),
            (
                "user.created",
                "user-2",
                "tenant-a",
                serde_json::json!({"name": "Bob", "age": 25}),
                None,
            ),
            (
                "order.placed",
                "order-1",
                "tenant-a",
                serde_json::json!({"total": 99.99}),
                None,
            ),
            (
                "user.created",
                "user-3",
                "tenant-b",
                serde_json::json!({"name": "Charlie"}),
                None,
            ),
        ];
        specs
            .into_iter()
            .map(|(event_type, entity_id, tenant_id, payload, metadata)| {
                Event::from_strings(
                    event_type.to_string(),
                    entity_id.to_string(),
                    tenant_id.to_string(),
                    payload,
                    metadata,
                )
                .unwrap()
            })
            .collect()
    }

    /// Run `query` against the standard fixture set.
    async fn run(query: &str, tenant_id: Option<&str>) -> Result<EventQLResponse, String> {
        let req = EventQLRequest {
            query: query.to_string(),
            tenant_id: tenant_id.map(String::from),
        };
        execute_eventql(&make_test_events(), &req).await
    }

    #[tokio::test]
    async fn test_select_all() {
        let result = run("SELECT id, event_type, entity_id FROM events", None)
            .await
            .unwrap();
        assert_eq!(result.row_count, 4);
        assert_eq!(result.columns, vec!["id", "event_type", "entity_id"]);
    }

    #[tokio::test]
    async fn test_where_filter() {
        let result = run(
            "SELECT entity_id FROM events WHERE event_type = 'order.placed'",
            None,
        )
        .await
        .unwrap();
        assert_eq!(result.row_count, 1);
    }

    #[tokio::test]
    async fn test_group_by_count() {
        let result = run(
            "SELECT event_type, COUNT(*) as cnt FROM events GROUP BY event_type ORDER BY cnt DESC",
            None,
        )
        .await
        .unwrap();
        // Two distinct event types: user.created and order.placed.
        assert_eq!(result.row_count, 2);
        assert_eq!(result.columns, vec!["event_type", "cnt"]);
    }

    #[tokio::test]
    async fn test_tenant_filter() {
        let result = run("SELECT * FROM events", Some("tenant-b")).await.unwrap();
        assert_eq!(result.row_count, 1);
    }

    #[tokio::test]
    async fn test_limit() {
        let result = run("SELECT * FROM events LIMIT 2", None).await.unwrap();
        assert_eq!(result.row_count, 2);
    }

    #[tokio::test]
    async fn test_reject_insert() {
        let result = run(
            "INSERT INTO events VALUES ('a','b','c','d','{}',NULL,0,1)",
            None,
        )
        .await;
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("read-only"));
    }

    #[tokio::test]
    async fn test_empty_events() {
        // Bypass the fixture helper: this test needs an empty event set.
        let events: Vec<Event> = vec![];
        let req = EventQLRequest {
            query: "SELECT * FROM events".to_string(),
            tenant_id: None,
        };
        let result = execute_eventql(&events, &req).await.unwrap();
        assert_eq!(result.row_count, 0);
    }

    #[tokio::test]
    async fn test_order_by() {
        let result = run("SELECT entity_id FROM events ORDER BY entity_id ASC", None)
            .await
            .unwrap();
        assert_eq!(result.row_count, 4);
    }

    #[tokio::test]
    async fn test_payload_like() {
        let result = run(
            "SELECT entity_id FROM events WHERE payload LIKE '%Alice%'",
            None,
        )
        .await
        .unwrap();
        assert_eq!(result.row_count, 1);
    }

    #[tokio::test]
    async fn test_reject_drop() {
        let result = run("DROP TABLE events", None).await;
        assert!(result.is_err());
    }
}