#[cfg(test)]
mod thread_safety_tests {
use datalogic_rs::DataLogic;
use serde_json::json;
use std::sync::Arc;
use std::thread;
#[test]
fn test_compiled_logic_is_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
let engine = DataLogic::new();
let logic = json!({"==": [1, 1]});
let compiled = engine.compile(&logic).unwrap();
assert_send_sync::<datalogic_rs::CompiledLogic>();
let _arc_compiled = compiled;
}
#[test]
fn test_parallel_evaluation_with_threads() {
let engine = Arc::new(DataLogic::new());
let logic = json!({
"if": [
{">": [{"var": "score"}, 90]},
"excellent",
{"if": [
{">": [{"var": "score"}, 70]},
"good",
"needs improvement"
]}
]
});
let compiled = engine.compile(&logic).unwrap();
let test_cases = vec![
(json!({"score": 95}), "excellent"),
(json!({"score": 85}), "good"),
(json!({"score": 65}), "needs improvement"),
(json!({"score": 100}), "excellent"),
(json!({"score": 75}), "good"),
];
let handles: Vec<_> = test_cases
.into_iter()
.map(|(data, expected)| {
let engine = Arc::clone(&engine);
let compiled = Arc::clone(&compiled);
let data = Arc::new(data);
thread::spawn(move || {
let result = engine.evaluate(&compiled, data).unwrap();
assert_eq!(result.as_str().unwrap(), expected);
})
})
.collect();
for handle in handles {
handle.join().unwrap();
}
}
#[test]
fn test_multiple_engines_in_parallel() {
let handles: Vec<_> = (0..5)
.map(|i| {
thread::spawn(move || {
let engine = DataLogic::new();
let logic = json!({"+": [{"var": "a"}, {"var": "b"}]});
let data = json!({"a": i, "b": i * 2});
let data = Arc::new(data);
let compiled = engine.compile(&logic).unwrap();
let result = engine.evaluate(&compiled, data).unwrap();
assert_eq!(result.as_i64().unwrap(), i + i * 2);
})
})
.collect();
for handle in handles {
handle.join().unwrap();
}
}
#[test]
fn test_shared_compiled_logic_across_threads() {
let engine = DataLogic::new();
let logic = json!({
"map": [
{"var": "items"},
{
"*": [
{"var": ""},
2 ]
}
]
});
let compiled = engine.compile(&logic).unwrap();
let datasets = vec![
json!({"items": [1, 2, 3]}),
json!({"items": [4, 5, 6]}),
json!({"items": [7, 8, 9]}),
];
let handles: Vec<_> = datasets
.into_iter()
.enumerate()
.map(|(idx, data)| {
let compiled = Arc::clone(&compiled);
let data = Arc::new(data);
thread::spawn(move || {
let engine = DataLogic::new();
let result = engine.evaluate(&compiled, data).unwrap();
let arr = result.as_array().unwrap();
match idx {
0 => assert_eq!(arr, &vec![json!(2), json!(4), json!(6)]),
1 => assert_eq!(arr, &vec![json!(8), json!(10), json!(12)]),
2 => assert_eq!(arr, &vec![json!(14), json!(16), json!(18)]),
_ => panic!("Unexpected index"),
}
})
})
.collect();
for handle in handles {
handle.join().unwrap();
}
}
}
#[cfg(test)]
mod async_tests {
use datalogic_rs::DataLogic;
use serde_json::json;
use std::sync::Arc;
#[tokio::test]
async fn test_async_evaluation_with_tokio() {
let engine = Arc::new(DataLogic::new());
let logic = json!({
"filter": [
{"var": "users"},
{">": [{"var": "age"}, 18]}
]
});
let compiled = engine.compile(&logic).unwrap();
let mut tasks = vec![];
for i in 0..5 {
let engine = Arc::clone(&engine);
let compiled = Arc::clone(&compiled);
let task = tokio::spawn(async move {
let data = json!({
"users": [
{"name": format!("User{}", i), "age": 20 + i},
{"name": format!("Kid{}", i), "age": 10 + i},
{"name": format!("Adult{}", i), "age": 30 + i},
]
});
let data = Arc::new(data);
let result = engine.evaluate(&compiled, data).unwrap();
let filtered = result.as_array().unwrap();
assert_eq!(filtered.len(), 2);
filtered.len()
});
tasks.push(task);
}
let results = futures::future::join_all(tasks).await;
for result in results {
assert_eq!(result.unwrap(), 2);
}
}
#[tokio::test]
async fn test_concurrent_evaluation_with_shared_engine() {
let engine = Arc::new(DataLogic::new());
let logics = vec![
json!({"==": [1, 1]}),
json!({"+": [2, 3]}),
json!({"*": [4, 5]}),
json!({">": [10, 5]}),
json!({"and": [true, true]}),
];
let mut tasks = vec![];
for logic in logics {
let engine = Arc::clone(&engine);
let data = Arc::new(json!({}));
let task = tokio::spawn(async move {
let compiled = engine.compile(&logic).unwrap();
engine.evaluate(&compiled, data).unwrap()
});
tasks.push(task);
}
let results = futures::future::join_all(tasks).await;
assert_eq!(results[0].as_ref().unwrap(), &json!(true));
assert_eq!(results[1].as_ref().unwrap(), &json!(5));
assert_eq!(results[2].as_ref().unwrap(), &json!(20));
assert_eq!(results[3].as_ref().unwrap(), &json!(true));
assert_eq!(results[4].as_ref().unwrap(), &json!(true));
}
#[tokio::test]
async fn test_blocking_evaluation_in_spawn_blocking() {
let engine = Arc::new(DataLogic::new());
let logic = json!({
"reduce": [
{"var": "numbers"},
{"+": [{"var": "accumulator"}, {"var": "current"}]},
0
]
});
let compiled = engine.compile(&logic).unwrap();
let handle = tokio::task::spawn_blocking(move || {
let data = json!({
"numbers": (1..=1000).collect::<Vec<i32>>()
});
let data = Arc::new(data);
engine.evaluate(&compiled, data).unwrap()
});
let result = handle.await.unwrap();
assert_eq!(result.as_i64().unwrap(), 500500);
}
}