allsource_core/infrastructure/query/
eventql.rs1use arrow::{
23 array::{Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray},
24 datatypes::{DataType, Field, Schema, TimeUnit},
25 util::display::ArrayFormatter,
26};
27use datafusion::prelude::*;
28use serde::{Deserialize, Serialize};
29use std::sync::Arc;
30
31use crate::domain::entities::Event;
32
/// Incoming EventQL query request.
#[derive(Debug, Clone, Deserialize)]
pub struct EventQLRequest {
    /// SQL text to run against the in-memory `events` table.
    pub query: String,
    /// Optional tenant scope: when set, only events whose tenant id equals this
    /// value are loaded into the `events` table before the query runs.
    pub tenant_id: Option<String>,
}
41
/// A single result row of an EventQL query.
#[derive(Debug, Clone, Serialize)]
pub struct EventQLRow {
    /// Cell values as JSON, one per result column, in result-column order.
    pub columns: Vec<serde_json::Value>,
}
48
/// Result set returned by `execute_eventql`.
#[derive(Debug, Clone, Serialize)]
pub struct EventQLResponse {
    /// Result column names, taken from the schema of the first result batch
    /// (empty when the query produced no batches).
    pub columns: Vec<String>,
    /// Result rows, in the order the query engine produced them.
    pub rows: Vec<EventQLRow>,
    /// Number of entries in `rows`.
    pub row_count: usize,
}
59
60fn events_schema() -> Schema {
62 Schema::new(vec![
63 Field::new("id", DataType::Utf8, false),
64 Field::new("event_type", DataType::Utf8, false),
65 Field::new("entity_id", DataType::Utf8, false),
66 Field::new("tenant_id", DataType::Utf8, false),
67 Field::new("payload", DataType::Utf8, false),
68 Field::new("metadata", DataType::Utf8, true),
69 Field::new(
70 "timestamp",
71 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
72 false,
73 ),
74 Field::new("version", DataType::Int64, false),
75 ])
76}
77
78fn events_to_record_batch(events: &[Event]) -> Result<RecordBatch, arrow::error::ArrowError> {
80 let schema = Arc::new(events_schema());
81
82 let ids: Vec<String> = events.iter().map(|e| e.id.to_string()).collect();
83 let event_types: Vec<String> = events
84 .iter()
85 .map(|e| e.event_type_str().to_string())
86 .collect();
87 let entity_ids: Vec<String> = events
88 .iter()
89 .map(|e| e.entity_id_str().to_string())
90 .collect();
91 let tenant_ids: Vec<String> = events
92 .iter()
93 .map(|e| e.tenant_id_str().to_string())
94 .collect();
95 let payloads: Vec<String> = events.iter().map(|e| e.payload.to_string()).collect();
96 let metadatas: Vec<Option<String>> = events
97 .iter()
98 .map(|e| e.metadata.as_ref().map(|m| m.to_string()))
99 .collect();
100 let timestamps: Vec<i64> = events
101 .iter()
102 .map(|e| e.timestamp.timestamp_micros())
103 .collect();
104 let versions: Vec<i64> = events.iter().map(|e| e.version).collect();
105
106 RecordBatch::try_new(
107 schema,
108 vec![
109 Arc::new(StringArray::from(ids)),
110 Arc::new(StringArray::from(event_types)),
111 Arc::new(StringArray::from(entity_ids)),
112 Arc::new(StringArray::from(tenant_ids)),
113 Arc::new(StringArray::from(payloads)),
114 Arc::new(StringArray::from(metadatas)),
115 Arc::new(TimestampMicrosecondArray::from(timestamps).with_timezone("UTC")),
116 Arc::new(Int64Array::from(versions)),
117 ],
118 )
119}
120
121pub async fn execute_eventql(
123 events: &[Event],
124 request: &EventQLRequest,
125) -> Result<EventQLResponse, String> {
126 let lower = request.query.trim().to_lowercase();
128 if lower.starts_with("insert")
129 || lower.starts_with("update")
130 || lower.starts_with("delete")
131 || lower.starts_with("drop")
132 || lower.starts_with("alter")
133 || lower.starts_with("create")
134 {
135 return Err("EventQL is read-only: only SELECT queries are allowed".to_string());
136 }
137
138 let filtered: Vec<&Event> = if let Some(ref tid) = request.tenant_id {
140 events.iter().filter(|e| e.tenant_id_str() == tid).collect()
141 } else {
142 events.iter().collect()
143 };
144
145 let owned: Vec<Event> = filtered.into_iter().cloned().collect();
147 let batch =
148 events_to_record_batch(&owned).map_err(|e| format!("Arrow conversion error: {e}"))?;
149
150 let ctx = SessionContext::new();
152 ctx.register_batch("events", batch)
153 .map_err(|e| format!("Failed to register events table: {e}"))?;
154
155 let df = ctx
157 .sql(&request.query)
158 .await
159 .map_err(|e| format!("SQL error: {e}"))?;
160
161 let result_batches = df
162 .collect()
163 .await
164 .map_err(|e| format!("Execution error: {e}"))?;
165
166 let columns: Vec<String> = if let Some(first) = result_batches.first() {
168 first
169 .schema()
170 .fields()
171 .iter()
172 .map(|f| f.name().clone())
173 .collect()
174 } else {
175 return Ok(EventQLResponse {
176 columns: vec![],
177 rows: vec![],
178 row_count: 0,
179 });
180 };
181
182 let mut rows = Vec::new();
184 for batch in &result_batches {
185 for row_idx in 0..batch.num_rows() {
186 let mut col_values = Vec::new();
187 for col_idx in 0..batch.num_columns() {
188 let col = batch.column(col_idx);
189 let json_val = if col.is_null(row_idx) {
190 serde_json::Value::Null
191 } else {
192 let formatter = ArrayFormatter::try_new(col.as_ref(), &Default::default());
194 let formatted = match formatter {
195 Ok(fmt) => format!("{}", fmt.value(row_idx)),
196 Err(_) => "null".to_string(),
197 };
198 if formatted == "null" {
199 serde_json::Value::Null
200 } else if let Ok(n) = formatted.parse::<i64>() {
201 serde_json::Value::Number(n.into())
202 } else if let Ok(f) = formatted.parse::<f64>() {
203 serde_json::json!(f)
204 } else {
205 serde_json::Value::String(formatted)
206 }
207 };
208 col_values.push(json_val);
209 }
210 rows.push(EventQLRow {
211 columns: col_values,
212 });
213 }
214 }
215
216 let row_count = rows.len();
217 Ok(EventQLResponse {
218 columns,
219 rows,
220 row_count,
221 })
222}
223
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;

    /// Builds one fixture event from string slices, panicking if the values
    /// are rejected by `Event::from_strings`.
    fn fixture_event(
        event_type: &str,
        entity_id: &str,
        tenant_id: &str,
        payload: serde_json::Value,
        metadata: Option<serde_json::Value>,
    ) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            tenant_id.to_string(),
            payload,
            metadata,
        )
        .unwrap()
    }

    /// Four events: three for `tenant-a` (two `user.created`, one
    /// `order.placed`) and one `user.created` for `tenant-b`.
    fn make_test_events() -> Vec<Event> {
        vec![
            fixture_event(
                "user.created",
                "user-1",
                "tenant-a",
                serde_json::json!({"name": "Alice", "age": 30}),
                Some(serde_json::json!({"source": "web"})),
            ),
            fixture_event(
                "user.created",
                "user-2",
                "tenant-a",
                serde_json::json!({"name": "Bob", "age": 25}),
                None,
            ),
            fixture_event(
                "order.placed",
                "order-1",
                "tenant-a",
                serde_json::json!({"total": 99.99}),
                None,
            ),
            fixture_event(
                "user.created",
                "user-3",
                "tenant-b",
                serde_json::json!({"name": "Charlie"}),
                None,
            ),
        ]
    }

    /// Wraps `query` and `tenant_id` in a request and runs it over `events`.
    async fn run(
        events: &[Event],
        query: &str,
        tenant_id: Option<&str>,
    ) -> Result<EventQLResponse, String> {
        let request = EventQLRequest {
            query: query.to_string(),
            tenant_id: tenant_id.map(String::from),
        };
        execute_eventql(events, &request).await
    }

    #[tokio::test]
    async fn test_select_all() {
        let events = make_test_events();
        let response = run(&events, "SELECT id, event_type, entity_id FROM events", None)
            .await
            .unwrap();
        assert_eq!(response.row_count, 4);
        assert_eq!(response.columns, vec!["id", "event_type", "entity_id"]);
    }

    #[tokio::test]
    async fn test_where_filter() {
        let events = make_test_events();
        let response = run(
            &events,
            "SELECT entity_id FROM events WHERE event_type = 'order.placed'",
            None,
        )
        .await
        .unwrap();
        assert_eq!(response.row_count, 1);
    }

    #[tokio::test]
    async fn test_group_by_count() {
        let events = make_test_events();
        let response = run(
            &events,
            "SELECT event_type, COUNT(*) as cnt FROM events GROUP BY event_type ORDER BY cnt DESC",
            None,
        )
        .await
        .unwrap();
        // Two distinct event types exist in the fixture.
        assert_eq!(response.row_count, 2);
        assert_eq!(response.columns, vec!["event_type", "cnt"]);
    }

    #[tokio::test]
    async fn test_tenant_filter() {
        let events = make_test_events();
        let response = run(&events, "SELECT * FROM events", Some("tenant-b"))
            .await
            .unwrap();
        assert_eq!(response.row_count, 1);
    }

    #[tokio::test]
    async fn test_limit() {
        let events = make_test_events();
        let response = run(&events, "SELECT * FROM events LIMIT 2", None)
            .await
            .unwrap();
        assert_eq!(response.row_count, 2);
    }

    #[tokio::test]
    async fn test_reject_insert() {
        let events = make_test_events();
        let outcome = run(
            &events,
            "INSERT INTO events VALUES ('a','b','c','d','{}',NULL,0,1)",
            None,
        )
        .await;
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().contains("read-only"));
    }

    #[tokio::test]
    async fn test_empty_events() {
        let events: Vec<Event> = Vec::new();
        let response = run(&events, "SELECT * FROM events", None).await.unwrap();
        assert_eq!(response.row_count, 0);
    }

    #[tokio::test]
    async fn test_order_by() {
        let events = make_test_events();
        let response = run(
            &events,
            "SELECT entity_id FROM events ORDER BY entity_id ASC",
            None,
        )
        .await
        .unwrap();
        assert_eq!(response.row_count, 4);
    }

    #[tokio::test]
    async fn test_payload_like() {
        let events = make_test_events();
        let response = run(
            &events,
            "SELECT entity_id FROM events WHERE payload LIKE '%Alice%'",
            None,
        )
        .await
        .unwrap();
        assert_eq!(response.row_count, 1);
    }

    #[tokio::test]
    async fn test_reject_drop() {
        let events = make_test_events();
        let outcome = run(&events, "DROP TABLE events", None).await;
        assert!(outcome.is_err());
    }
}