Skip to main content

allsource_core/infrastructure/query/
eventql.rs

//! EventQL — SQL-like query language for AllSource events
//!
//! Uses Apache DataFusion to execute SQL queries over events materialized
//! as an Arrow RecordBatch. Events are registered as a virtual table named
//! `events` with columns: id, event_type, entity_id, tenant_id, payload,
//! metadata, timestamp, version.
//!
//! ## Supported syntax
//!
//! ```sql
//! SELECT * FROM events WHERE event_type = 'user.created' LIMIT 10
//! SELECT entity_id, COUNT(*) as cnt FROM events GROUP BY entity_id ORDER BY cnt DESC
//! SELECT * FROM events WHERE timestamp > '2026-01-01T00:00:00Z' AND entity_id = 'user-123'
//! ```
//!
//! ## Limitations
//!
//! - Queries are read-only: statements beginning with INSERT, UPDATE, DELETE, DROP, ALTER
//!   or CREATE are rejected
//! - `payload` and `metadata` are stored as UTF-8 JSON strings; this module registers no
//!   JSON functions on the query session, so filter them with string predicates such as `LIKE`
//! - No JOINs across other tables (events is the only registered table)
//! - Query operates on an in-memory snapshot of events at execution time
21
22use arrow::{
23    array::{Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray},
24    datatypes::{DataType, Field, Schema, TimeUnit},
25    util::display::ArrayFormatter,
26};
27use datafusion::prelude::*;
28use serde::{Deserialize, Serialize};
29use std::sync::Arc;
30
31use crate::domain::entities::Event;
32
33/// EventQL query request
34#[derive(Debug, Clone, Deserialize)]
35pub struct EventQLRequest {
36    /// SQL query string (table name must be `events`)
37    pub query: String,
38    /// Optional tenant_id filter applied before SQL execution
39    pub tenant_id: Option<String>,
40}
41
42/// A single row in EventQL results
43#[derive(Debug, Clone, Serialize)]
44pub struct EventQLRow {
45    /// Column values as JSON
46    pub columns: Vec<serde_json::Value>,
47}
48
49/// EventQL query response
50#[derive(Debug, Clone, Serialize)]
51pub struct EventQLResponse {
52    /// Column names from the result schema
53    pub columns: Vec<String>,
54    /// Result rows
55    pub rows: Vec<EventQLRow>,
56    /// Number of rows returned
57    pub row_count: usize,
58}
59
60/// Build the Arrow schema for the events table
61fn events_schema() -> Schema {
62    Schema::new(vec![
63        Field::new("id", DataType::Utf8, false),
64        Field::new("event_type", DataType::Utf8, false),
65        Field::new("entity_id", DataType::Utf8, false),
66        Field::new("tenant_id", DataType::Utf8, false),
67        Field::new("payload", DataType::Utf8, false),
68        Field::new("metadata", DataType::Utf8, true),
69        Field::new(
70            "timestamp",
71            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
72            false,
73        ),
74        Field::new("version", DataType::Int64, false),
75    ])
76}
77
78/// Convert a slice of Events into an Arrow RecordBatch
79fn events_to_record_batch(events: &[Event]) -> Result<RecordBatch, arrow::error::ArrowError> {
80    let schema = Arc::new(events_schema());
81
82    let ids: Vec<String> = events.iter().map(|e| e.id.to_string()).collect();
83    let event_types: Vec<String> = events
84        .iter()
85        .map(|e| e.event_type_str().to_string())
86        .collect();
87    let entity_ids: Vec<String> = events
88        .iter()
89        .map(|e| e.entity_id_str().to_string())
90        .collect();
91    let tenant_ids: Vec<String> = events
92        .iter()
93        .map(|e| e.tenant_id_str().to_string())
94        .collect();
95    let payloads: Vec<String> = events.iter().map(|e| e.payload.to_string()).collect();
96    let metadatas: Vec<Option<String>> = events
97        .iter()
98        .map(|e| e.metadata.as_ref().map(|m| m.to_string()))
99        .collect();
100    let timestamps: Vec<i64> = events
101        .iter()
102        .map(|e| e.timestamp.timestamp_micros())
103        .collect();
104    let versions: Vec<i64> = events.iter().map(|e| e.version).collect();
105
106    RecordBatch::try_new(
107        schema,
108        vec![
109            Arc::new(StringArray::from(ids)),
110            Arc::new(StringArray::from(event_types)),
111            Arc::new(StringArray::from(entity_ids)),
112            Arc::new(StringArray::from(tenant_ids)),
113            Arc::new(StringArray::from(payloads)),
114            Arc::new(StringArray::from(metadatas)),
115            Arc::new(TimestampMicrosecondArray::from(timestamps).with_timezone("UTC")),
116            Arc::new(Int64Array::from(versions)),
117        ],
118    )
119}
120
121/// Execute an EventQL query against a set of events
122pub async fn execute_eventql(
123    events: &[Event],
124    request: &EventQLRequest,
125) -> Result<EventQLResponse, String> {
126    // Validate: reject destructive statements
127    let lower = request.query.trim().to_lowercase();
128    if lower.starts_with("insert")
129        || lower.starts_with("update")
130        || lower.starts_with("delete")
131        || lower.starts_with("drop")
132        || lower.starts_with("alter")
133        || lower.starts_with("create")
134    {
135        return Err("EventQL is read-only: only SELECT queries are allowed".to_string());
136    }
137
138    // Apply tenant filter if specified
139    let filtered: Vec<&Event> = if let Some(ref tid) = request.tenant_id {
140        events.iter().filter(|e| e.tenant_id_str() == tid).collect()
141    } else {
142        events.iter().collect()
143    };
144
145    // Convert to Arrow
146    let owned: Vec<Event> = filtered.into_iter().cloned().collect();
147    let batch =
148        events_to_record_batch(&owned).map_err(|e| format!("Arrow conversion error: {e}"))?;
149
150    // Create DataFusion context and register the events table
151    let ctx = SessionContext::new();
152    ctx.register_batch("events", batch)
153        .map_err(|e| format!("Failed to register events table: {e}"))?;
154
155    // Execute the SQL query
156    let df = ctx
157        .sql(&request.query)
158        .await
159        .map_err(|e| format!("SQL error: {e}"))?;
160
161    let result_batches = df
162        .collect()
163        .await
164        .map_err(|e| format!("Execution error: {e}"))?;
165
166    // Extract column names from the result schema
167    let columns: Vec<String> = if let Some(first) = result_batches.first() {
168        first
169            .schema()
170            .fields()
171            .iter()
172            .map(|f| f.name().clone())
173            .collect()
174    } else {
175        return Ok(EventQLResponse {
176            columns: vec![],
177            rows: vec![],
178            row_count: 0,
179        });
180    };
181
182    // Convert result batches to JSON rows
183    let mut rows = Vec::new();
184    for batch in &result_batches {
185        for row_idx in 0..batch.num_rows() {
186            let mut col_values = Vec::new();
187            for col_idx in 0..batch.num_columns() {
188                let col = batch.column(col_idx);
189                let json_val = if col.is_null(row_idx) {
190                    serde_json::Value::Null
191                } else {
192                    // Use arrow's display formatter for generic value extraction
193                    let formatter = ArrayFormatter::try_new(col.as_ref(), &Default::default());
194                    let formatted = match formatter {
195                        Ok(fmt) => format!("{}", fmt.value(row_idx)),
196                        Err(_) => "null".to_string(),
197                    };
198                    if formatted == "null" {
199                        serde_json::Value::Null
200                    } else if let Ok(n) = formatted.parse::<i64>() {
201                        serde_json::Value::Number(n.into())
202                    } else if let Ok(f) = formatted.parse::<f64>() {
203                        serde_json::json!(f)
204                    } else {
205                        serde_json::Value::String(formatted)
206                    }
207                };
208                col_values.push(json_val);
209            }
210            rows.push(EventQLRow {
211                columns: col_values,
212            });
213        }
214    }
215
216    let row_count = rows.len();
217    Ok(EventQLResponse {
218        columns,
219        rows,
220        row_count,
221    })
222}
223
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use serde_json::json;

    /// Fixture events:
    /// - three `user.created`: user-1 and user-2 for tenant-a, user-3 for tenant-b;
    /// - one `order.placed`: order-1 for tenant-a.
    fn make_test_events() -> Vec<Event> {
        vec![
            Event::from_strings(
                "user.created".to_string(),
                "user-1".to_string(),
                "tenant-a".to_string(),
                serde_json::json!({"name": "Alice", "age": 30}),
                Some(serde_json::json!({"source": "web"})),
            )
            .unwrap(),
            Event::from_strings(
                "user.created".to_string(),
                "user-2".to_string(),
                "tenant-a".to_string(),
                serde_json::json!({"name": "Bob", "age": 25}),
                None,
            )
            .unwrap(),
            Event::from_strings(
                "order.placed".to_string(),
                "order-1".to_string(),
                "tenant-a".to_string(),
                serde_json::json!({"total": 99.99}),
                None,
            )
            .unwrap(),
            Event::from_strings(
                "user.created".to_string(),
                "user-3".to_string(),
                "tenant-b".to_string(),
                serde_json::json!({"name": "Charlie"}),
                None,
            )
            .unwrap(),
        ]
    }

    /// Run `query` against the fixture events, optionally scoped to a tenant.
    async fn run(query: &str, tenant_id: Option<&str>) -> Result<EventQLResponse, String> {
        let events = make_test_events();
        let req = EventQLRequest {
            query: query.to_string(),
            tenant_id: tenant_id.map(str::to_string),
        };
        execute_eventql(&events, &req).await
    }

    /// Values of the result column named `name`, top to bottom.
    fn column_values(result: &EventQLResponse, name: &str) -> Vec<serde_json::Value> {
        let idx = result
            .columns
            .iter()
            .position(|c| c == name)
            .unwrap_or_else(|| panic!("column {name} missing from {:?}", result.columns));
        result
            .rows
            .iter()
            .map(|row| row.columns[idx].clone())
            .collect()
    }

    #[tokio::test]
    async fn test_select_all() {
        let result = run("SELECT id, event_type, entity_id FROM events", None)
            .await
            .unwrap();
        assert_eq!(result.row_count, 4);
        assert_eq!(result.columns, vec!["id", "event_type", "entity_id"]);
    }

    #[tokio::test]
    async fn test_where_filter() {
        let result = run(
            "SELECT entity_id FROM events WHERE event_type = 'order.placed'",
            None,
        )
        .await
        .unwrap();
        assert_eq!(result.row_count, 1);
        assert_eq!(column_values(&result, "entity_id"), vec![json!("order-1")]);
    }

    #[tokio::test]
    async fn test_group_by_count() {
        let result = run(
            "SELECT event_type, COUNT(*) as cnt FROM events GROUP BY event_type ORDER BY cnt DESC",
            None,
        )
        .await
        .unwrap();
        assert_eq!(result.row_count, 2); // user.created and order.placed
        assert_eq!(result.columns, vec!["event_type", "cnt"]);
        assert_eq!(result.rows[0].columns, vec![json!("user.created"), json!(3)]);
        assert_eq!(result.rows[1].columns, vec![json!("order.placed"), json!(1)]);
    }

    #[tokio::test]
    async fn test_tenant_filter() {
        let result = run("SELECT * FROM events", Some("tenant-b")).await.unwrap();
        assert_eq!(result.row_count, 1);
        assert_eq!(column_values(&result, "tenant_id"), vec![json!("tenant-b")]);
        assert_eq!(column_values(&result, "entity_id"), vec![json!("user-3")]);
    }

    #[tokio::test]
    async fn test_limit() {
        let result = run("SELECT * FROM events LIMIT 2", None).await.unwrap();
        assert_eq!(result.row_count, 2);
    }

    #[tokio::test]
    async fn test_reject_insert() {
        let result = run(
            "INSERT INTO events VALUES ('a','b','c','d','{}',NULL,0,1)",
            None,
        )
        .await;
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("read-only"));
    }

    #[tokio::test]
    async fn test_empty_events() {
        let events: Vec<Event> = vec![];
        let req = EventQLRequest {
            query: "SELECT * FROM events".to_string(),
            tenant_id: None,
        };
        let result = execute_eventql(&events, &req).await.unwrap();
        assert_eq!(result.row_count, 0);
        assert!(result.rows.is_empty());
    }

    #[tokio::test]
    async fn test_order_by() {
        let result = run("SELECT entity_id FROM events ORDER BY entity_id ASC", None)
            .await
            .unwrap();
        assert_eq!(result.row_count, 4);
        assert_eq!(
            column_values(&result, "entity_id"),
            vec![
                json!("order-1"),
                json!("user-1"),
                json!("user-2"),
                json!("user-3")
            ]
        );
    }

    #[tokio::test]
    async fn test_payload_like() {
        let result = run(
            "SELECT entity_id FROM events WHERE payload LIKE '%Alice%'",
            None,
        )
        .await
        .unwrap();
        assert_eq!(result.row_count, 1);
        assert_eq!(column_values(&result, "entity_id"), vec![json!("user-1")]);
    }

    #[tokio::test]
    async fn test_reject_drop() {
        let result = run("DROP TABLE events", None).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("read-only"));
    }
}