use std::sync::Arc;
use rustvello::prelude::*;
use rustvello::trigger_builder::TriggerBuilder;
use rustvello_core::trigger::{TriggerManager, TriggerStore};
use rustvello_mem::trigger::MemTriggerStore;
use rustvello_proto::status::InvocationStatus;
use rustvello_proto::trigger::{ExceptionContext, ResultContext, StatusContext, TriggerLogic};
fn mem_store() -> Arc<dyn TriggerStore> {
Arc::new(MemTriggerStore::new())
}
fn task_id(module: &str, name: &str) -> rustvello_proto::identifiers::TaskId {
rustvello_proto::identifiers::TaskId::new(module, name)
}
#[tokio::test]
async fn builder_register_status_trigger() {
let store = mem_store();
let target = task_id("test", "target_task");
let def = TriggerBuilder::new()
.on_status(
&task_id("test", "source_task"),
&[InvocationStatus::Success],
)
.build_and_register(&target, &store)
.await
.unwrap();
let fetched = store.get_trigger(&def.trigger_id).await.unwrap().unwrap();
assert_eq!(fetched.task_id, target);
assert_eq!(fetched.condition_ids.len(), 1);
}
#[tokio::test]
async fn builder_register_multi_condition_trigger() {
let store = mem_store();
let target = task_id("test", "multi_target");
let def = TriggerBuilder::new()
.on_status(&task_id("test", "source_a"), &[InvocationStatus::Success])
.on_result(&task_id("test", "source_b"))
.with_logic(TriggerLogic::And)
.build_and_register(&target, &store)
.await
.unwrap();
assert_eq!(def.condition_ids.len(), 2);
assert_eq!(def.logic, TriggerLogic::And);
}
#[tokio::test]
async fn status_trigger_fires_on_success() {
let store = mem_store();
let source = task_id("test", "source_task");
let target = task_id("test", "target_task");
let tm = TriggerManager::new(Arc::clone(&store));
TriggerBuilder::new()
.on_status(&source, &[InvocationStatus::Success])
.build_and_register(&target, &store)
.await
.unwrap();
let ctx = StatusContext {
invocation_id: rustvello_proto::identifiers::InvocationId::new(),
task_id: source.clone(),
status: InvocationStatus::Success,
arguments: std::collections::BTreeMap::new(),
};
let valid = tm.report_status_change(&ctx).await.unwrap();
assert_eq!(valid.len(), 1);
let to_invoke = tm.evaluate_triggers().await.unwrap();
assert_eq!(to_invoke.len(), 1);
assert_eq!(to_invoke[0].0.task_id, target);
}
#[tokio::test]
async fn status_trigger_does_not_fire_on_wrong_status() {
let store = mem_store();
let source = task_id("test", "source_task");
let target = task_id("test", "target_task");
let tm = TriggerManager::new(Arc::clone(&store));
TriggerBuilder::new()
.on_status(&source, &[InvocationStatus::Success])
.build_and_register(&target, &store)
.await
.unwrap();
let ctx = StatusContext {
invocation_id: rustvello_proto::identifiers::InvocationId::new(),
task_id: source.clone(),
status: InvocationStatus::Failed,
arguments: std::collections::BTreeMap::new(),
};
let valid = tm.report_status_change(&ctx).await.unwrap();
assert!(valid.is_empty());
let to_invoke = tm.evaluate_triggers().await.unwrap();
assert!(to_invoke.is_empty());
}
#[tokio::test]
async fn result_trigger_fires() {
let store = mem_store();
let source = task_id("test", "source_task");
let target = task_id("test", "result_target");
let tm = TriggerManager::new(Arc::clone(&store));
TriggerBuilder::new()
.on_result(&source)
.build_and_register(&target, &store)
.await
.unwrap();
let ctx = ResultContext {
invocation_id: rustvello_proto::identifiers::InvocationId::new(),
task_id: source.clone(),
result: serde_json::json!("42"),
arguments: std::collections::BTreeMap::new(),
};
let valid = tm.report_result(&ctx).await.unwrap();
assert_eq!(valid.len(), 1);
let to_invoke = tm.evaluate_triggers().await.unwrap();
assert_eq!(to_invoke.len(), 1);
assert_eq!(to_invoke[0].0.task_id, target);
}
#[tokio::test]
async fn exception_trigger_fires() {
let store = mem_store();
let source = task_id("test", "source_task");
let target = task_id("test", "exception_target");
let tm = TriggerManager::new(Arc::clone(&store));
TriggerBuilder::new()
.on_exception(&source, &["TaskExecutionError"])
.build_and_register(&target, &store)
.await
.unwrap();
let ctx = ExceptionContext {
invocation_id: rustvello_proto::identifiers::InvocationId::new(),
task_id: source.clone(),
error_type: "TaskExecutionError".to_string(),
error_message: "something failed".to_string(),
arguments: std::collections::BTreeMap::new(),
};
let valid = tm.report_failure(&ctx).await.unwrap();
assert_eq!(valid.len(), 1);
let to_invoke = tm.evaluate_triggers().await.unwrap();
assert_eq!(to_invoke.len(), 1);
assert_eq!(to_invoke[0].0.task_id, target);
}
#[tokio::test]
async fn exception_trigger_does_not_fire_wrong_type() {
let store = mem_store();
let source = task_id("test", "source_task");
let target = task_id("test", "exception_target");
let tm = TriggerManager::new(Arc::clone(&store));
TriggerBuilder::new()
.on_exception(&source, &["SpecificError"])
.build_and_register(&target, &store)
.await
.unwrap();
let ctx = ExceptionContext {
invocation_id: rustvello_proto::identifiers::InvocationId::new(),
task_id: source.clone(),
error_type: "DifferentError".to_string(),
error_message: "something failed".to_string(),
arguments: std::collections::BTreeMap::new(),
};
let valid = tm.report_failure(&ctx).await.unwrap();
assert!(valid.is_empty());
}
#[tokio::test]
async fn event_trigger_fires() {
let store = mem_store();
let target = task_id("test", "event_target");
let tm = TriggerManager::new(Arc::clone(&store));
TriggerBuilder::new()
.on_event("data_ready")
.build_and_register(&target, &store)
.await
.unwrap();
let _event_id = tm
.emit_event("data_ready", serde_json::json!({"key": "value"}))
.await
.unwrap();
let to_invoke = tm.evaluate_triggers().await.unwrap();
assert_eq!(to_invoke.len(), 1);
assert_eq!(to_invoke[0].0.task_id, target);
}
#[tokio::test]
async fn event_trigger_does_not_fire_wrong_code() {
let store = mem_store();
let target = task_id("test", "event_target");
let tm = TriggerManager::new(Arc::clone(&store));
TriggerBuilder::new()
.on_event("data_ready")
.build_and_register(&target, &store)
.await
.unwrap();
let _event_id = tm
.emit_event("other_event", serde_json::json!({}))
.await
.unwrap();
let to_invoke = tm.evaluate_triggers().await.unwrap();
assert!(to_invoke.is_empty());
}
#[tokio::test]
async fn and_trigger_requires_all_conditions() {
let store = mem_store();
let source_a = task_id("test", "source_a");
let source_b = task_id("test", "source_b");
let target = task_id("test", "and_target");
let tm = TriggerManager::new(Arc::clone(&store));
TriggerBuilder::new()
.on_status(&source_a, &[InvocationStatus::Success])
.on_result(&source_b)
.with_logic(TriggerLogic::And)
.build_and_register(&target, &store)
.await
.unwrap();
let ctx_a = StatusContext {
invocation_id: rustvello_proto::identifiers::InvocationId::new(),
task_id: source_a.clone(),
status: InvocationStatus::Success,
arguments: std::collections::BTreeMap::new(),
};
tm.report_status_change(&ctx_a).await.unwrap();
let to_invoke = tm.evaluate_triggers().await.unwrap();
assert!(to_invoke.is_empty());
let ctx_b = ResultContext {
invocation_id: rustvello_proto::identifiers::InvocationId::new(),
task_id: source_b.clone(),
result: serde_json::json!("ok"),
arguments: std::collections::BTreeMap::new(),
};
tm.report_result(&ctx_b).await.unwrap();
let to_invoke = tm.evaluate_triggers().await.unwrap();
assert_eq!(to_invoke.len(), 1);
assert_eq!(to_invoke[0].0.task_id, target);
}
#[tokio::test]
async fn or_trigger_fires_on_any_condition() {
let store = mem_store();
let source_a = task_id("test", "source_a");
let source_b = task_id("test", "source_b");
let target = task_id("test", "or_target");
let tm = TriggerManager::new(Arc::clone(&store));
TriggerBuilder::new()
.on_status(&source_a, &[InvocationStatus::Success])
.on_result(&source_b)
.with_logic(TriggerLogic::Or)
.build_and_register(&target, &store)
.await
.unwrap();
let ctx_a = StatusContext {
invocation_id: rustvello_proto::identifiers::InvocationId::new(),
task_id: source_a.clone(),
status: InvocationStatus::Success,
arguments: std::collections::BTreeMap::new(),
};
tm.report_status_change(&ctx_a).await.unwrap();
let to_invoke = tm.evaluate_triggers().await.unwrap();
assert_eq!(to_invoke.len(), 1);
assert_eq!(to_invoke[0].0.task_id, target);
}
#[tokio::test]
async fn trigger_run_dedup() {
let store = mem_store();
let source = task_id("test", "source");
let target = task_id("test", "dedup_target");
let tm = TriggerManager::new(Arc::clone(&store));
TriggerBuilder::new()
.on_status(&source, &[InvocationStatus::Success])
.build_and_register(&target, &store)
.await
.unwrap();
let ctx = StatusContext {
invocation_id: rustvello_proto::identifiers::InvocationId::new(),
task_id: source.clone(),
status: InvocationStatus::Success,
arguments: std::collections::BTreeMap::new(),
};
tm.report_status_change(&ctx).await.unwrap();
let first = tm.evaluate_triggers().await.unwrap();
assert_eq!(first.len(), 1);
tm.report_status_change(&ctx).await.unwrap();
let second = tm.evaluate_triggers().await.unwrap();
assert!(second.is_empty(), "same run should be deduped");
}
#[tokio::test]
async fn separate_trigger_fires_independently() {
let store = mem_store();
let source = task_id("test", "source");
let target = task_id("test", "fresh_target");
let tm = TriggerManager::new(Arc::clone(&store));
TriggerBuilder::new()
.on_status(&source, &[InvocationStatus::Success])
.build_and_register(&target, &store)
.await
.unwrap();
let ctx1 = StatusContext {
invocation_id: rustvello_proto::identifiers::InvocationId::new(),
task_id: source.clone(),
status: InvocationStatus::Success,
arguments: std::collections::BTreeMap::new(),
};
tm.report_status_change(&ctx1).await.unwrap();
let first = tm.evaluate_triggers().await.unwrap();
assert_eq!(first.len(), 1, "first event should fire");
store.purge().await.unwrap();
TriggerBuilder::new()
.on_status(&source, &[InvocationStatus::Success])
.build_and_register(&target, &store)
.await
.unwrap();
let ctx2 = StatusContext {
invocation_id: rustvello_proto::identifiers::InvocationId::new(),
task_id: source.clone(),
status: InvocationStatus::Success,
arguments: std::collections::BTreeMap::new(),
};
tm.report_status_change(&ctx2).await.unwrap();
let second = tm.evaluate_triggers().await.unwrap();
assert_eq!(second.len(), 1, "second event with fresh state should fire");
}
#[test]
fn builder_empty_conditions_error() {
let target = task_id("test", "target");
let result = TriggerBuilder::new().build(&target);
assert!(result.is_err());
}
#[tokio::test]
async fn trigger_with_static_args() {
let store = mem_store();
let source = task_id("test", "source");
let target = task_id("test", "args_target");
let tm = TriggerManager::new(Arc::clone(&store));
TriggerBuilder::new()
.on_status(&source, &[InvocationStatus::Success])
.with_static_args(serde_json::json!({"x": 42}))
.build_and_register(&target, &store)
.await
.unwrap();
let ctx = StatusContext {
invocation_id: rustvello_proto::identifiers::InvocationId::new(),
task_id: source.clone(),
status: InvocationStatus::Success,
arguments: std::collections::BTreeMap::new(),
};
tm.report_status_change(&ctx).await.unwrap();
let to_invoke = tm.evaluate_triggers().await.unwrap();
assert_eq!(to_invoke.len(), 1);
assert_eq!(to_invoke[0].1, serde_json::json!({"x": 42}));
}
#[tokio::test]
async fn app_builder_memory_includes_trigger_manager() {
let app = Rustvello::builder()
.app_id("test")
.memory()
.build()
.await
.unwrap();
assert!(app.trigger_manager().is_some());
}
#[tokio::test]
async fn app_builder_default_has_trigger_manager() {
let app = Rustvello::builder()
.app_id("test")
.memory()
.build()
.await
.unwrap();
assert!(app.trigger_manager().is_some());
}
#[tokio::test]
async fn purge_clears_triggers() {
let store = mem_store();
let source = task_id("test", "source");
let target = task_id("test", "target");
TriggerBuilder::new()
.on_status(&source, &[InvocationStatus::Success])
.build_and_register(&target, &store)
.await
.unwrap();
let conditions = store.get_conditions_for_task(&source).await.unwrap();
assert!(!conditions.is_empty());
store.purge().await.unwrap();
let conditions = store.get_conditions_for_task(&source).await.unwrap();
assert!(conditions.is_empty());
}