use aurora_db::parser::executor::ExecutionResult;
use aurora_db::{Aurora, AuroraConfig};
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::time::Duration;
#[tokio::test]
async fn test_subscription_flow() -> aurora_db::error::Result<()> {
let temp_dir = tempfile::tempdir().unwrap();
let db_path = temp_dir.path().join("subscription_test.db");
let config = AuroraConfig {
db_path,
..Default::default()
};
let db = Aurora::with_config(config).await?;
let db = Arc::new(db);
db.execute(
r#"
schema {
define collection messages {
content: String
}
}
"#,
)
.await?;
let sub_result = db
.execute(
r#"
subscription {
messages {
content
}
}
"#,
)
.await?;
let mut stream = if let ExecutionResult::Subscription(res) = sub_result {
res.stream.expect("Stream should be present")
} else {
panic!("Expected Subscription result");
};
let handle = tokio::spawn(async move {
let event = stream.recv().await.expect("Channel closed");
event
});
tokio::time::sleep(Duration::from_millis(50)).await;
let mut vars = HashMap::new();
vars.insert("content".to_string(), json!("Hello World"));
db.execute((
r#"
mutation($content: String) {
insertInto(collection: "messages", data: { content: $content }) {
id
}
}
"#,
json!(vars),
))
.await?;
let event = match tokio::time::timeout(Duration::from_secs(2), handle).await {
Ok(res) => res.unwrap(),
Err(_) => panic!("Timed out waiting for subscription event"),
};
assert_eq!(event.collection, "messages");
assert!(event.document.is_some());
assert_eq!(
event.document.unwrap().data.get("content").unwrap(),
&aurora_db::Value::String("Hello World".to_string())
);
Ok(())
}
#[tokio::test]
async fn test_subscription_filter() -> aurora_db::error::Result<()> {
let temp_dir = tempfile::tempdir().unwrap();
let db_path = temp_dir.path().join("subscription_filter_test.db");
let config = AuroraConfig {
db_path,
..Default::default()
};
let db = Aurora::with_config(config).await?;
let db = Arc::new(db);
db.execute(
r#"
schema {
define collection events {
category: String
score: Int
}
}
"#,
)
.await?;
let sub_result = db
.execute(
r#"
subscription {
events(where: { score: { gt: 10 } }) {
category
score
}
}
"#,
)
.await?;
let mut stream = if let ExecutionResult::Subscription(res) = sub_result {
res.stream.expect("Stream should be present")
} else {
panic!("Expected Subscription result");
};
let handle = tokio::spawn(async move {
let mut events = Vec::new();
loop {
match tokio::time::timeout(Duration::from_millis(500), stream.recv()).await {
Ok(Ok(event)) => events.push(event),
_ => break,
}
}
events
});
tokio::time::sleep(Duration::from_millis(50)).await;
db.execute((r#"mutation { insertInto(collection: "events", data: { category: "A", score: 5 }) { id } }"#, json!({}))).await?;
db.execute((r#"mutation { insertInto(collection: "events", data: { category: "B", score: 15 }) { id } }"#, json!({}))).await?;
db.execute((r#"mutation { insertInto(collection: "events", data: { category: "C", score: 10 }) { id } }"#, json!({}))).await?;
let events = handle.await.unwrap();
assert_eq!(events.len(), 1, "Expected exactly 1 matching event");
let event = &events[0];
assert_eq!(
event.document.as_ref().unwrap().data.get("category"),
Some(&aurora_db::Value::String("B".to_string()))
);
Ok(())
}
#[tokio::test]
async fn test_subscription_string_filter() -> aurora_db::error::Result<()> {
let temp_dir = tempfile::tempdir().unwrap();
let db_path = temp_dir.path().join("subscription_string_filter_test.db");
let config = AuroraConfig {
db_path,
..Default::default()
};
let db = Aurora::with_config(config).await?;
let db = Arc::new(db);
db.execute(
r#"
schema {
define collection logs {
level: String
msg: String
}
}
"#,
)
.await?;
let sub_result = db
.execute(
r#"
subscription {
logs(where: { msg: { startsWith: "Error" } }) {
level
msg
}
}
"#,
)
.await?;
let mut stream = if let ExecutionResult::Subscription(res) = sub_result {
res.stream.expect("Stream should be present")
} else {
panic!("Expected Subscription result");
};
let handle = tokio::spawn(async move {
let mut events = Vec::new();
loop {
match tokio::time::timeout(Duration::from_millis(500), stream.recv()).await {
Ok(Ok(event)) => events.push(event),
_ => break,
}
}
events
});
tokio::time::sleep(Duration::from_millis(50)).await;
db.execute((r#"mutation { insertInto(collection: "logs", data: { level: "INFO", msg: "Server started" }) { id } }"#, json!({}))).await?;
db.execute((r#"mutation { insertInto(collection: "logs", data: { level: "ERROR", msg: "Error: db connection failed" }) { id } }"#, json!({}))).await?;
db.execute((r#"mutation { insertInto(collection: "logs", data: { level: "CRITICAL", msg: "Error: out of memory" }) { id } }"#, json!({}))).await?;
let events = handle.await.unwrap();
assert_eq!(events.len(), 2, "Expected exactly 2 matching events");
assert_eq!(
events[0].document.as_ref().unwrap().data.get("level"),
Some(&aurora_db::Value::String("ERROR".to_string()))
);
Ok(())
}