use arrow::{
array::{Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray},
datatypes::{DataType, Field, Schema, TimeUnit},
util::display::ArrayFormatter,
};
use datafusion::prelude::*;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use crate::domain::entities::Event;
/// An EventQL query request (deserializable via serde).
#[derive(Debug, Clone, Deserialize)]
pub struct EventQLRequest {
    /// SQL text executed by `execute_eventql` against the in-memory `events` table.
    pub query: String,
    /// When `Some`, only events whose tenant id equals this value are loaded into
    /// the `events` table before the query runs; when `None`, all events are loaded.
    pub tenant_id: Option<String>,
}
/// One result row of an EventQL query.
#[derive(Debug, Clone, Serialize)]
pub struct EventQLRow {
    /// JSON value of each result column for this row, in the same order as
    /// `EventQLResponse::columns`.
    pub columns: Vec<serde_json::Value>,
}
/// Result of an EventQL query, as returned by `execute_eventql`.
#[derive(Debug, Clone, Serialize)]
pub struct EventQLResponse {
    /// Names of the result columns, in output order.
    pub columns: Vec<String>,
    /// Result rows; each row holds one value per entry in `columns`.
    pub rows: Vec<EventQLRow>,
    /// Number of rows in `rows` (`execute_eventql` sets it to `rows.len()`).
    pub row_count: usize,
}
/// Arrow schema of the `events` table that EventQL queries run against.
///
/// Every column is non-nullable except `metadata`. `timestamp` has microsecond
/// precision tagged with the `UTC` timezone, and `version` is a 64-bit integer.
fn events_schema() -> Schema {
    // Most columns are UTF-8 text; only the name and nullability differ.
    let utf8 = |name: &str, nullable: bool| Field::new(name, DataType::Utf8, nullable);
    let timestamp_type = DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()));

    Schema::new(vec![
        utf8("id", false),
        utf8("event_type", false),
        utf8("entity_id", false),
        utf8("tenant_id", false),
        utf8("payload", false),
        utf8("metadata", true),
        Field::new("timestamp", timestamp_type, false),
        Field::new("version", DataType::Int64, false),
    ])
}
/// Builds an Arrow [`RecordBatch`] with one row per event, laid out per `events_schema`.
///
/// `payload` and `metadata` are stored as their `to_string()` text. `timestamp` is
/// stored as microseconds since the Unix epoch, tagged with the `UTC` timezone.
///
/// # Errors
///
/// Returns the [`arrow::error::ArrowError`] produced by [`RecordBatch::try_new`] if
/// the assembled columns do not match the schema.
fn events_to_record_batch(events: &[Event]) -> Result<RecordBatch, arrow::error::ArrowError> {
    let len = events.len();
    let mut id_values: Vec<String> = Vec::with_capacity(len);
    let mut type_values: Vec<String> = Vec::with_capacity(len);
    let mut entity_values: Vec<String> = Vec::with_capacity(len);
    let mut tenant_values: Vec<String> = Vec::with_capacity(len);
    let mut payload_values: Vec<String> = Vec::with_capacity(len);
    let mut metadata_values: Vec<Option<String>> = Vec::with_capacity(len);
    let mut micros_values: Vec<i64> = Vec::with_capacity(len);
    let mut version_values: Vec<i64> = Vec::with_capacity(len);

    // Fill every column in a single pass over the events.
    for event in events {
        id_values.push(event.id.to_string());
        type_values.push(event.event_type_str().to_string());
        entity_values.push(event.entity_id_str().to_string());
        tenant_values.push(event.tenant_id_str().to_string());
        payload_values.push(event.payload.to_string());
        metadata_values.push(event.metadata.as_ref().map(ToString::to_string));
        micros_values.push(event.timestamp.timestamp_micros());
        version_values.push(event.version);
    }

    let timestamp_array = TimestampMicrosecondArray::from(micros_values).with_timezone("UTC");

    RecordBatch::try_new(
        Arc::new(events_schema()),
        vec![
            Arc::new(StringArray::from(id_values)),
            Arc::new(StringArray::from(type_values)),
            Arc::new(StringArray::from(entity_values)),
            Arc::new(StringArray::from(tenant_values)),
            Arc::new(StringArray::from(payload_values)),
            Arc::new(StringArray::from(metadata_values)),
            Arc::new(timestamp_array),
            Arc::new(Int64Array::from(version_values)),
        ],
    )
}
pub async fn execute_eventql(
events: &[Event],
request: &EventQLRequest,
) -> Result<EventQLResponse, String> {
let lower = request.query.trim().to_lowercase();
if lower.starts_with("insert")
|| lower.starts_with("update")
|| lower.starts_with("delete")
|| lower.starts_with("drop")
|| lower.starts_with("alter")
|| lower.starts_with("create")
{
return Err("EventQL is read-only: only SELECT queries are allowed".to_string());
}
let filtered: Vec<&Event> = if let Some(ref tid) = request.tenant_id {
events.iter().filter(|e| e.tenant_id_str() == tid).collect()
} else {
events.iter().collect()
};
let owned: Vec<Event> = filtered.into_iter().cloned().collect();
let batch =
events_to_record_batch(&owned).map_err(|e| format!("Arrow conversion error: {e}"))?;
let ctx = SessionContext::new();
ctx.register_batch("events", batch)
.map_err(|e| format!("Failed to register events table: {e}"))?;
let df = ctx
.sql(&request.query)
.await
.map_err(|e| format!("SQL error: {e}"))?;
let result_batches = df
.collect()
.await
.map_err(|e| format!("Execution error: {e}"))?;
let columns: Vec<String> = if let Some(first) = result_batches.first() {
first
.schema()
.fields()
.iter()
.map(|f| f.name().clone())
.collect()
} else {
return Ok(EventQLResponse {
columns: vec![],
rows: vec![],
row_count: 0,
});
};
let mut rows = Vec::new();
for batch in &result_batches {
for row_idx in 0..batch.num_rows() {
let mut col_values = Vec::new();
for col_idx in 0..batch.num_columns() {
let col = batch.column(col_idx);
let json_val = if col.is_null(row_idx) {
serde_json::Value::Null
} else {
let formatter = ArrayFormatter::try_new(col.as_ref(), &Default::default());
let formatted = match formatter {
Ok(fmt) => format!("{}", fmt.value(row_idx)),
Err(_) => "null".to_string(),
};
if formatted == "null" {
serde_json::Value::Null
} else if let Ok(n) = formatted.parse::<i64>() {
serde_json::Value::Number(n.into())
} else if let Ok(f) = formatted.parse::<f64>() {
serde_json::json!(f)
} else {
serde_json::Value::String(formatted)
}
};
col_values.push(json_val);
}
rows.push(EventQLRow {
columns: col_values,
});
}
}
let row_count = rows.len();
Ok(EventQLResponse {
columns,
rows,
row_count,
})
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use serde_json::json;

    /// Fixture with four events:
    /// - three for `tenant-a`: `user-1` and `user-2` are `user.created`, `order-1`
    ///   is `order.placed`;
    /// - one `user.created` event (`user-3`) for `tenant-b`.
    fn make_test_events() -> Vec<Event> {
        vec![
            Event::from_strings(
                "user.created".to_string(),
                "user-1".to_string(),
                "tenant-a".to_string(),
                serde_json::json!({"name": "Alice", "age": 30}),
                Some(serde_json::json!({"source": "web"})),
            )
            .unwrap(),
            Event::from_strings(
                "user.created".to_string(),
                "user-2".to_string(),
                "tenant-a".to_string(),
                serde_json::json!({"name": "Bob", "age": 25}),
                None,
            )
            .unwrap(),
            Event::from_strings(
                "order.placed".to_string(),
                "order-1".to_string(),
                "tenant-a".to_string(),
                serde_json::json!({"total": 99.99}),
                None,
            )
            .unwrap(),
            Event::from_strings(
                "user.created".to_string(),
                "user-3".to_string(),
                "tenant-b".to_string(),
                serde_json::json!({"name": "Charlie"}),
                None,
            )
            .unwrap(),
        ]
    }

    /// Builds a request for `query` without tenant scoping.
    fn unscoped(query: &str) -> EventQLRequest {
        EventQLRequest {
            query: query.to_string(),
            tenant_id: None,
        }
    }

    /// Returns the values of column `idx` across all result rows, in row order.
    fn column_values(result: &EventQLResponse, idx: usize) -> Vec<serde_json::Value> {
        result.rows.iter().map(|row| row.columns[idx].clone()).collect()
    }

    #[tokio::test]
    async fn test_select_all() {
        let events = make_test_events();
        let req = unscoped("SELECT id, event_type, entity_id FROM events");
        let result = execute_eventql(&events, &req).await.unwrap();
        assert_eq!(result.row_count, 4);
        assert_eq!(result.rows.len(), 4);
        assert_eq!(result.columns, vec!["id", "event_type", "entity_id"]);
    }

    #[tokio::test]
    async fn test_where_filter() {
        let events = make_test_events();
        let req = unscoped("SELECT entity_id FROM events WHERE event_type = 'order.placed'");
        let result = execute_eventql(&events, &req).await.unwrap();
        assert_eq!(result.row_count, 1);
        assert_eq!(column_values(&result, 0), vec![json!("order-1")]);
    }

    #[tokio::test]
    async fn test_group_by_count() {
        let events = make_test_events();
        let req = unscoped(
            "SELECT event_type, COUNT(*) as cnt FROM events GROUP BY event_type ORDER BY cnt DESC",
        );
        let result = execute_eventql(&events, &req).await.unwrap();
        assert_eq!(result.row_count, 2);
        assert_eq!(result.columns, vec!["event_type", "cnt"]);
        // ORDER BY cnt DESC puts the three `user.created` events first.
        assert_eq!(result.rows[0].columns, vec![json!("user.created"), json!(3)]);
        assert_eq!(result.rows[1].columns, vec![json!("order.placed"), json!(1)]);
    }

    #[tokio::test]
    async fn test_tenant_filter() {
        let events = make_test_events();
        let req = EventQLRequest {
            query: "SELECT * FROM events".to_string(),
            tenant_id: Some("tenant-b".to_string()),
        };
        let result = execute_eventql(&events, &req).await.unwrap();
        assert_eq!(result.row_count, 1);
        assert_eq!(
            result.columns,
            vec![
                "id",
                "event_type",
                "entity_id",
                "tenant_id",
                "payload",
                "metadata",
                "timestamp",
                "version"
            ]
        );
        assert_eq!(result.rows[0].columns[2], json!("user-3"));
        assert_eq!(result.rows[0].columns[3], json!("tenant-b"));
    }

    #[tokio::test]
    async fn test_tenant_filter_unknown_tenant() {
        let events = make_test_events();
        let req = EventQLRequest {
            query: "SELECT entity_id FROM events".to_string(),
            tenant_id: Some("tenant-z".to_string()),
        };
        let result = execute_eventql(&events, &req).await.unwrap();
        assert_eq!(result.row_count, 0);
        assert!(result.rows.is_empty());
    }

    #[tokio::test]
    async fn test_limit() {
        let events = make_test_events();
        let req = unscoped("SELECT * FROM events LIMIT 2");
        let result = execute_eventql(&events, &req).await.unwrap();
        assert_eq!(result.row_count, 2);
    }

    #[tokio::test]
    async fn test_reject_insert() {
        let events = make_test_events();
        let req = unscoped("INSERT INTO events VALUES ('a','b','c','d','{}',NULL,0,1)");
        let result = execute_eventql(&events, &req).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("read-only"));
    }

    #[tokio::test]
    async fn test_reject_write_keywords_case_insensitive() {
        let events = make_test_events();
        let req = unscoped("   Delete FROM events WHERE 1 = 1");
        let result = execute_eventql(&events, &req).await;
        assert!(result.unwrap_err().contains("read-only"));
    }

    #[tokio::test]
    async fn test_empty_events() {
        let events: Vec<Event> = vec![];
        let req = unscoped("SELECT * FROM events");
        let result = execute_eventql(&events, &req).await.unwrap();
        assert_eq!(result.row_count, 0);
        assert!(result.rows.is_empty());
    }

    #[tokio::test]
    async fn test_order_by() {
        let events = make_test_events();
        let req = unscoped("SELECT entity_id FROM events ORDER BY entity_id ASC");
        let result = execute_eventql(&events, &req).await.unwrap();
        assert_eq!(result.row_count, 4);
        assert_eq!(
            column_values(&result, 0),
            vec![
                json!("order-1"),
                json!("user-1"),
                json!("user-2"),
                json!("user-3")
            ]
        );
    }

    #[tokio::test]
    async fn test_payload_like() {
        let events = make_test_events();
        let req = unscoped("SELECT entity_id FROM events WHERE payload LIKE '%Alice%'");
        let result = execute_eventql(&events, &req).await.unwrap();
        assert_eq!(result.row_count, 1);
        assert_eq!(column_values(&result, 0), vec![json!("user-1")]);
    }

    #[tokio::test]
    async fn test_reject_drop() {
        let events = make_test_events();
        let req = unscoped("DROP TABLE events");
        let result = execute_eventql(&events, &req).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("read-only"));
    }
}