allsource_core/infrastructure/query/
eventql.rs1use arrow::{
23 array::{Array, Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray},
24 datatypes::{DataType, Field, Schema, TimeUnit},
25};
26use datafusion::prelude::*;
27use serde::{Deserialize, Serialize};
28use std::sync::Arc;
29
30use crate::domain::entities::Event;
31
32#[derive(Debug, Clone, Deserialize)]
34pub struct EventQLRequest {
35 pub query: String,
37 pub tenant_id: Option<String>,
39}
40
41#[derive(Debug, Clone, Serialize)]
43pub struct EventQLRow {
44 pub columns: Vec<serde_json::Value>,
46}
47
48#[derive(Debug, Clone, Serialize)]
50pub struct EventQLResponse {
51 pub columns: Vec<String>,
53 pub rows: Vec<EventQLRow>,
55 pub row_count: usize,
57}
58
59fn events_schema() -> Schema {
61 Schema::new(vec![
62 Field::new("id", DataType::Utf8, false),
63 Field::new("event_type", DataType::Utf8, false),
64 Field::new("entity_id", DataType::Utf8, false),
65 Field::new("tenant_id", DataType::Utf8, false),
66 Field::new("payload", DataType::Utf8, false),
67 Field::new("metadata", DataType::Utf8, true),
68 Field::new(
69 "timestamp",
70 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
71 false,
72 ),
73 Field::new("version", DataType::Int64, false),
74 ])
75}
76
77fn events_to_record_batch(events: &[Event]) -> Result<RecordBatch, arrow::error::ArrowError> {
79 let schema = Arc::new(events_schema());
80
81 let ids: Vec<String> = events.iter().map(|e| e.id.to_string()).collect();
82 let event_types: Vec<String> = events
83 .iter()
84 .map(|e| e.event_type_str().to_string())
85 .collect();
86 let entity_ids: Vec<String> = events
87 .iter()
88 .map(|e| e.entity_id_str().to_string())
89 .collect();
90 let tenant_ids: Vec<String> = events
91 .iter()
92 .map(|e| e.tenant_id_str().to_string())
93 .collect();
94 let payloads: Vec<String> = events.iter().map(|e| e.payload.to_string()).collect();
95 let metadatas: Vec<Option<String>> = events
96 .iter()
97 .map(|e| e.metadata.as_ref().map(|m| m.to_string()))
98 .collect();
99 let timestamps: Vec<i64> = events
100 .iter()
101 .map(|e| e.timestamp.timestamp_micros())
102 .collect();
103 let versions: Vec<i64> = events.iter().map(|e| e.version).collect();
104
105 RecordBatch::try_new(
106 schema,
107 vec![
108 Arc::new(StringArray::from(ids)),
109 Arc::new(StringArray::from(event_types)),
110 Arc::new(StringArray::from(entity_ids)),
111 Arc::new(StringArray::from(tenant_ids)),
112 Arc::new(StringArray::from(payloads)),
113 Arc::new(StringArray::from(metadatas)),
114 Arc::new(TimestampMicrosecondArray::from(timestamps).with_timezone("UTC")),
115 Arc::new(Int64Array::from(versions)),
116 ],
117 )
118}
119
120pub async fn execute_eventql(
122 events: &[Event],
123 request: &EventQLRequest,
124) -> Result<EventQLResponse, String> {
125 let lower = request.query.trim().to_lowercase();
127 if lower.starts_with("insert")
128 || lower.starts_with("update")
129 || lower.starts_with("delete")
130 || lower.starts_with("drop")
131 || lower.starts_with("alter")
132 || lower.starts_with("create")
133 {
134 return Err("EventQL is read-only: only SELECT queries are allowed".to_string());
135 }
136
137 let filtered: Vec<&Event> = if let Some(ref tid) = request.tenant_id {
139 events.iter().filter(|e| e.tenant_id_str() == tid).collect()
140 } else {
141 events.iter().collect()
142 };
143
144 let owned: Vec<Event> = filtered.into_iter().cloned().collect();
146 let batch =
147 events_to_record_batch(&owned).map_err(|e| format!("Arrow conversion error: {e}"))?;
148
149 let ctx = SessionContext::new();
151 ctx.register_batch("events", batch)
152 .map_err(|e| format!("Failed to register events table: {e}"))?;
153
154 let df = ctx
156 .sql(&request.query)
157 .await
158 .map_err(|e| format!("SQL error: {e}"))?;
159
160 let result_batches = df
161 .collect()
162 .await
163 .map_err(|e| format!("Execution error: {e}"))?;
164
165 let columns: Vec<String> = if let Some(first) = result_batches.first() {
167 first
168 .schema()
169 .fields()
170 .iter()
171 .map(|f| f.name().clone())
172 .collect()
173 } else {
174 return Ok(EventQLResponse {
175 columns: vec![],
176 rows: vec![],
177 row_count: 0,
178 });
179 };
180
181 let mut rows = Vec::new();
183 for batch in &result_batches {
184 for row_idx in 0..batch.num_rows() {
185 let mut col_values = Vec::new();
186 for col_idx in 0..batch.num_columns() {
187 let col = batch.column(col_idx);
188 let json_val = if col.is_null(row_idx) {
189 serde_json::Value::Null
190 } else {
191 let formatted = arrow::util::display::array_value_to_string(col, row_idx)
193 .unwrap_or_else(|_| "null".to_string());
194 if formatted == "null" {
195 serde_json::Value::Null
196 } else if let Ok(n) = formatted.parse::<i64>() {
197 serde_json::Value::Number(n.into())
198 } else if let Ok(f) = formatted.parse::<f64>() {
199 serde_json::json!(f)
200 } else {
201 serde_json::Value::String(formatted)
202 }
203 };
204 col_values.push(json_val);
205 }
206 rows.push(EventQLRow {
207 columns: col_values,
208 });
209 }
210 }
211
212 let row_count = rows.len();
213 Ok(EventQLResponse {
214 columns,
215 rows,
216 row_count,
217 })
218}
219
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;

    /// Builds one fixture event, panicking if the constructor rejects the input.
    fn event(
        event_type: &str,
        entity_id: &str,
        tenant_id: &str,
        payload: serde_json::Value,
        metadata: Option<serde_json::Value>,
    ) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            tenant_id.to_string(),
            payload,
            metadata,
        )
        .unwrap()
    }

    /// Four events: three for `tenant-a` (two `user.created`, one `order.placed`)
    /// and one `user.created` for `tenant-b`.
    fn make_test_events() -> Vec<Event> {
        vec![
            event(
                "user.created",
                "user-1",
                "tenant-a",
                serde_json::json!({"name": "Alice", "age": 30}),
                Some(serde_json::json!({"source": "web"})),
            ),
            event(
                "user.created",
                "user-2",
                "tenant-a",
                serde_json::json!({"name": "Bob", "age": 25}),
                None,
            ),
            event(
                "order.placed",
                "order-1",
                "tenant-a",
                serde_json::json!({"total": 99.99}),
                None,
            ),
            event(
                "user.created",
                "user-3",
                "tenant-b",
                serde_json::json!({"name": "Charlie"}),
                None,
            ),
        ]
    }

    /// Executes `query` against `events`, optionally scoped to `tenant_id`.
    async fn run(
        events: &[Event],
        query: &str,
        tenant_id: Option<&str>,
    ) -> Result<EventQLResponse, String> {
        let req = EventQLRequest {
            query: query.to_string(),
            tenant_id: tenant_id.map(|t| t.to_string()),
        };
        execute_eventql(events, &req).await
    }

    #[tokio::test]
    async fn test_select_all() {
        let events = make_test_events();
        let result = run(&events, "SELECT id, event_type, entity_id FROM events", None)
            .await
            .unwrap();
        assert_eq!(result.row_count, 4);
        assert_eq!(result.columns, vec!["id", "event_type", "entity_id"]);
    }

    #[tokio::test]
    async fn test_where_filter() {
        let events = make_test_events();
        let result = run(
            &events,
            "SELECT entity_id FROM events WHERE event_type = 'order.placed'",
            None,
        )
        .await
        .unwrap();
        assert_eq!(result.row_count, 1);
    }

    #[tokio::test]
    async fn test_group_by_count() {
        let events = make_test_events();
        let result = run(
            &events,
            "SELECT event_type, COUNT(*) as cnt FROM events GROUP BY event_type ORDER BY cnt DESC",
            None,
        )
        .await
        .unwrap();
        assert_eq!(result.row_count, 2);
        assert_eq!(result.columns, vec!["event_type", "cnt"]);
    }

    #[tokio::test]
    async fn test_tenant_filter() {
        let events = make_test_events();
        let result = run(&events, "SELECT * FROM events", Some("tenant-b"))
            .await
            .unwrap();
        assert_eq!(result.row_count, 1);
    }

    #[tokio::test]
    async fn test_limit() {
        let events = make_test_events();
        let result = run(&events, "SELECT * FROM events LIMIT 2", None)
            .await
            .unwrap();
        assert_eq!(result.row_count, 2);
    }

    #[tokio::test]
    async fn test_reject_insert() {
        let events = make_test_events();
        let result = run(
            &events,
            "INSERT INTO events VALUES ('a','b','c','d','{}',NULL,0,1)",
            None,
        )
        .await;
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("read-only"));
    }

    #[tokio::test]
    async fn test_empty_events() {
        let events: Vec<Event> = Vec::new();
        let result = run(&events, "SELECT * FROM events", None).await.unwrap();
        assert_eq!(result.row_count, 0);
    }

    #[tokio::test]
    async fn test_order_by() {
        let events = make_test_events();
        let result = run(
            &events,
            "SELECT entity_id FROM events ORDER BY entity_id ASC",
            None,
        )
        .await
        .unwrap();
        assert_eq!(result.row_count, 4);
    }

    #[tokio::test]
    async fn test_payload_like() {
        let events = make_test_events();
        let result = run(
            &events,
            "SELECT entity_id FROM events WHERE payload LIKE '%Alice%'",
            None,
        )
        .await
        .unwrap();
        assert_eq!(result.row_count, 1);
    }

    #[tokio::test]
    async fn test_reject_drop() {
        let events = make_test_events();
        let result = run(&events, "DROP TABLE events", None).await;
        assert!(result.is_err());
    }
}