use std::sync::Arc;
use rustvello::prelude::*;
use rustvello_core::runner::Runner;
// Task fixture: returns the sum of its two `i32` arguments.
// The tests in this file refer to it through `IntAddTask` / `IntAddParams`,
// which are presumably generated by `#[rustvello::task]` (TODO confirm).
#[rustvello::task]
fn int_add(x: i32, y: i32) -> i32 {
    x + y
}
// Task fixture that never succeeds: every call panics and `_n` is ignored.
// `max_retries = 2` is handed to the task macro; how retries are applied is
// decided by the framework and is not visible in this file.
#[rustvello::task(max_retries = 2)]
fn int_fail(_n: i32) -> i32 {
    panic!("int_fail: boom")
}
// Task fixture: hands back the `String` it was given, unchanged.
// Used by the large-argument round-trip test in this file.
#[rustvello::task]
fn echo_big(data: String) -> String {
    data
}
// Builds a shared `TaskRegistry` that contains exactly one task.
//
// Panics if `register_typed` reports an error.
fn make_registry<T: Task>(task: T) -> Arc<TaskRegistry> {
    let mut registry = TaskRegistry::new();
    registry.register_typed(task).unwrap();
    Arc::new(registry)
}
// Creates a runner wired to `app`'s own broker, orchestrator and state
// backend, using `registry` to look up tasks.
//
// The runner id argument is the app id taken from `app.config`; the final
// constructor argument is left as `None`.
fn make_runner_for(app: &RustvelloApp, registry: Arc<TaskRegistry>) -> PersistentTokioRunner {
    // Locals are bound in the same order the constructor arguments were
    // originally evaluated.
    let app_id = app.config.app_id.clone();
    let config = app.config.clone();
    let broker = app.broker();
    let orchestrator = app.orchestrator();
    let state_backend = app.state_backend();
    TaskRunner::new(app_id, config, broker, orchestrator, state_backend, registry, None)
}
// Two apps with different app ids each submit one `int_add` call and are
// each served by their own runner; every handle must yield the sum of the
// arguments submitted through its own app.
#[tokio::test]
async fn app_id_isolation_independent_execution() {
    let mut app_a = RustvelloApp::new(AppConfig::new("app-a"));
    let mut app_b = RustvelloApp::new(AppConfig::new("app-b"));
    app_a.register(IntAddTask::new()).unwrap();
    app_b.register(IntAddTask::new()).unwrap();

    let handle_a = app_a
        .submit_call(&IntAddTask::new(), IntAddParams { x: 1, y: 2 })
        .await
        .unwrap();
    let handle_b = app_b
        .submit_call(&IntAddTask::new(), IntAddParams { x: 10, y: 20 })
        .await
        .unwrap();

    // One registry is shared by both runners; each runner still talks to
    // its own app's broker / orchestrator / state backend.
    let reg = make_registry(IntAddTask::new());
    let runner_a = make_runner_for(&app_a, Arc::clone(&reg));
    let runner_b = make_runner_for(&app_b, Arc::clone(&reg));
    runner_a.run_one().await.unwrap();
    runner_b.run_one().await.unwrap();

    assert_eq!(handle_a.result().await.unwrap(), 3);
    assert_eq!(handle_b.result().await.unwrap(), 30);
}
// Runs one `int_add` invocation in each of two apps, purges the first app,
// and checks that the second app's result (5 + 5) can still be read.
#[tokio::test]
async fn app_id_isolation_purge_does_not_affect_sibling() {
    let mut app_a = RustvelloApp::new(AppConfig::new("purge-a"));
    let mut app_b = RustvelloApp::new(AppConfig::new("purge-b"));
    app_a.register(IntAddTask::new()).unwrap();
    app_b.register(IntAddTask::new()).unwrap();

    // The handle for app A is never read; only the submission matters.
    let _handle_a = app_a
        .submit_call(&IntAddTask::new(), IntAddParams { x: 1, y: 1 })
        .await
        .unwrap();
    let handle_b = app_b
        .submit_call(&IntAddTask::new(), IntAddParams { x: 5, y: 5 })
        .await
        .unwrap();

    let reg = make_registry(IntAddTask::new());
    make_runner_for(&app_a, Arc::clone(&reg))
        .run_one()
        .await
        .unwrap();
    make_runner_for(&app_b, Arc::clone(&reg))
        .run_one()
        .await
        .unwrap();

    // Purging A happens before B's result is fetched.
    app_a.purge().await.unwrap();

    let result: i32 = handle_b.result().await.unwrap();
    assert_eq!(result, 10);
}
// Registers one invocation as waiting for another, then checks that
// `get_waiters` reports it, `release_waiters` returns it, and a second
// `get_waiters` call comes back empty.
#[tokio::test]
async fn waiter_set_and_release() {
    use rustvello_mem::orchestrator::MemOrchestrator;

    let orch = MemOrchestrator::new();
    let task_id = TaskId::new("mod", "waiter_task");
    let waiter_call = CallDTO::new(task_id.clone(), SerializedArguments::new());
    let blocker_call = CallDTO::new(task_id.clone(), SerializedArguments::new());
    let waiter = orch.register_invocation(&waiter_call).await.unwrap();
    let blocker = orch.register_invocation(&blocker_call).await.unwrap();

    orch.set_waiting_for(&waiter, &blocker).await.unwrap();

    let before_release = orch.get_waiters(&blocker).await.unwrap();
    assert_eq!(before_release.len(), 1);
    assert_eq!(before_release[0], waiter);

    let released = orch.release_waiters(&blocker).await.unwrap();
    assert_eq!(released.len(), 1);
    assert_eq!(released[0], waiter);

    let after_release = orch.get_waiters(&blocker).await.unwrap();
    assert!(after_release.is_empty());
}
// Makes two invocations wait on a third and checks that the third shows up
// in `get_blocking_invocations(10)`.
#[tokio::test]
async fn waiter_get_blocking_invocations() {
    use rustvello_mem::orchestrator::MemOrchestrator;

    let orch = MemOrchestrator::new();
    let task_id = TaskId::new("mod", "blocking_task");
    let first_waiter_call = CallDTO::new(task_id.clone(), SerializedArguments::new());
    let blocker_call = CallDTO::new(task_id.clone(), SerializedArguments::new());
    let second_waiter_call = CallDTO::new(task_id.clone(), SerializedArguments::new());

    // Registration order matches the call construction order above.
    let first_waiter = orch.register_invocation(&first_waiter_call).await.unwrap();
    let blocker = orch.register_invocation(&blocker_call).await.unwrap();
    let second_waiter = orch.register_invocation(&second_waiter_call).await.unwrap();

    orch.set_waiting_for(&first_waiter, &blocker).await.unwrap();
    orch.set_waiting_for(&second_waiter, &blocker).await.unwrap();

    let blocking = orch.get_blocking_invocations(10).await.unwrap();
    assert!(blocking.contains(&blocker));
}
// Creates five blocker invocations, each with one waiter, and checks that
// `get_blocking_invocations(3)` returns at most three entries.
#[tokio::test]
async fn waiter_blocking_max_limit() {
    use rustvello_mem::orchestrator::MemOrchestrator;

    let orch = MemOrchestrator::new();
    let task_id = TaskId::new("mod", "limit_task");

    // All blockers are registered first, before any waiter exists.
    let mut blockers = Vec::new();
    for _ in 0..5 {
        let blocker_call = CallDTO::new(task_id.clone(), SerializedArguments::new());
        blockers.push(orch.register_invocation(&blocker_call).await.unwrap());
    }

    // Then one fresh waiter is attached to each blocker.
    for blocker in &blockers {
        let waiter_call = CallDTO::new(task_id.clone(), SerializedArguments::new());
        let waiter = orch.register_invocation(&waiter_call).await.unwrap();
        orch.set_waiting_for(&waiter, blocker).await.unwrap();
    }

    let limited = orch.get_blocking_invocations(3).await.unwrap();
    assert!(limited.len() <= 3);
}
// With `ConcurrencyControlType::Task` and `running_concurrency = Some(1)`:
// after one invocation is set to Pending and indexed for concurrency
// control, `check_running_concurrency` must return false; once that
// invocation is removed from the index it must return true again.
#[tokio::test]
async fn cc_task_registration_blocks_duplicate() {
    use rustvello_mem::orchestrator::MemOrchestrator;

    let orch = MemOrchestrator::new();
    let task_id = TaskId::new("mod", "cc_reg_task");

    let mut config = TaskConfig::default();
    config.concurrency_control = ConcurrencyControlType::Task;
    config.running_concurrency = Some(1);

    let call = CallDTO::new(task_id.clone(), SerializedArguments::new());
    let first = orch.register_invocation(&call).await.unwrap();
    let runner = RunnerId::from_string("runner-1");
    orch.set_invocation_status(&first, InvocationStatus::Pending, Some(&runner))
        .await
        .unwrap();
    orch.index_for_concurrency_control(&first, &task_id, Some(&SerializedArguments::new()))
        .await
        .unwrap();

    let allowed_while_indexed = orch
        .check_running_concurrency(&task_id, &config, Some(&SerializedArguments::new()))
        .await
        .unwrap();
    assert!(!allowed_while_indexed);

    orch.remove_from_concurrency_index(&first).await.unwrap();

    let allowed_after_removal = orch
        .check_running_concurrency(&task_id, &config, Some(&SerializedArguments::new()))
        .await
        .unwrap();
    assert!(allowed_after_removal);
}
// With `ConcurrencyControlType::Argument` and `running_concurrency = Some(1)`:
// after an invocation with `x = "1"` is set to Pending and indexed, the
// concurrency check must return false for the same arguments and true for
// different arguments (`x = "2"`).
#[tokio::test]
async fn cc_argument_blocks_same_args() {
    use rustvello_mem::orchestrator::MemOrchestrator;

    let orch = MemOrchestrator::new();
    let task_id = TaskId::new("mod", "cc_arg_task");

    let mut config = TaskConfig::default();
    config.concurrency_control = ConcurrencyControlType::Argument;
    config.running_concurrency = Some(1);

    let mut indexed_args = SerializedArguments::new();
    indexed_args.insert("x", "1");
    let mut other_args = SerializedArguments::new();
    other_args.insert("x", "2");

    let call = CallDTO::new(task_id.clone(), indexed_args.clone());
    let first = orch.register_invocation(&call).await.unwrap();
    let runner = RunnerId::from_string("runner-1");
    orch.set_invocation_status(&first, InvocationStatus::Pending, Some(&runner))
        .await
        .unwrap();
    orch.index_for_concurrency_control(&first, &task_id, Some(&indexed_args))
        .await
        .unwrap();

    let same_args_allowed = orch
        .check_running_concurrency(&task_id, &config, Some(&indexed_args))
        .await
        .unwrap();
    assert!(!same_args_allowed);

    let other_args_allowed = orch
        .check_running_concurrency(&task_id, &config, Some(&other_args))
        .await
        .unwrap();
    assert!(other_args_allowed);
}
#[tokio::test]
async fn set_pending_atomicity_concurrent() {
use rustvello_mem::orchestrator::MemOrchestrator;
let orch = Arc::new(MemOrchestrator::new());
let task_id = TaskId::new("mod", "atomic_task");
let call = CallDTO::new(task_id.clone(), SerializedArguments::new());
let inv_id = orch.register_invocation(&call).await.unwrap();
let runner1 = RunnerId::from_string("runner-1");
let runner2 = RunnerId::from_string("runner-2");
let orch1 = Arc::clone(&orch);
let orch2 = Arc::clone(&orch);
let inv1 = inv_id.clone();
let inv2 = inv_id.clone();
let r1 = runner1.clone();
let r2 = runner2.clone();
let (result1, result2) = tokio::join!(
async move {
orch1
.set_invocation_status(&inv1, InvocationStatus::Pending, Some(&r1))
.await
},
async move {
orch2
.set_invocation_status(&inv2, InvocationStatus::Pending, Some(&r2))
.await
}
);
let successes = [result1.is_ok(), result2.is_ok()]
.iter()
.filter(|&&s| s)
.count();
assert_eq!(successes, 1, "Exactly one set_pending should succeed");
}
// Leaves an invocation in Pending under the runner id "dead-runner", waits
// 10 ms, and checks that `get_stale_pending_invocations(0)` reports it.
// The invocation is then moved through PendingRecovery to Rerouted and the
// final stored status is verified.
#[tokio::test]
async fn recovery_stale_pending() {
    use rustvello_mem::orchestrator::MemOrchestrator;

    let orch = MemOrchestrator::new();
    let call = CallDTO::new(
        TaskId::new("mod", "recovery_task"),
        SerializedArguments::new(),
    );
    let inv_id = orch.register_invocation(&call).await.unwrap();

    let dead_runner = RunnerId::from_string("dead-runner");
    orch.set_invocation_status(&inv_id, InvocationStatus::Pending, Some(&dead_runner))
        .await
        .unwrap();

    // Let some time elapse before querying with a threshold argument of 0.
    let pause = std::time::Duration::from_millis(10);
    tokio::time::sleep(pause).await;

    let stale = orch.get_stale_pending_invocations(0).await.unwrap();
    assert!(!stale.is_empty(), "Should detect stale pending invocation");
    assert!(stale.contains(&inv_id));

    for next in [InvocationStatus::PendingRecovery, InvocationStatus::Rerouted] {
        orch.set_invocation_status(&inv_id, next, None)
            .await
            .unwrap();
    }

    let record = orch.get_invocation_status(&inv_id).await.unwrap();
    assert_eq!(record.status, InvocationStatus::Rerouted);
}
// Leaves an invocation in Running under the runner id "dead-runner-2",
// waits 10 ms, and checks that `get_stale_running_invocations(0)` reports
// it. The invocation is then moved through RunningRecovery to Rerouted and
// the final stored status is verified.
#[tokio::test]
async fn recovery_stale_running() {
    use rustvello_mem::orchestrator::MemOrchestrator;

    let orch = MemOrchestrator::new();
    let call = CallDTO::new(
        TaskId::new("mod", "recovery_running_task"),
        SerializedArguments::new(),
    );
    let inv_id = orch.register_invocation(&call).await.unwrap();

    // Pending first, then Running, both under the same runner id.
    let dead_runner = RunnerId::from_string("dead-runner-2");
    for claimed in [InvocationStatus::Pending, InvocationStatus::Running] {
        orch.set_invocation_status(&inv_id, claimed, Some(&dead_runner))
            .await
            .unwrap();
    }

    // Let some time elapse before querying with a threshold argument of 0.
    let pause = std::time::Duration::from_millis(10);
    tokio::time::sleep(pause).await;

    let stale = orch.get_stale_running_invocations(0).await.unwrap();
    assert!(!stale.is_empty(), "Should detect stale running invocation");
    assert!(stale.contains(&inv_id));

    for next in [InvocationStatus::RunningRecovery, InvocationStatus::Rerouted] {
        orch.set_invocation_status(&inv_id, next, None)
            .await
            .unwrap();
    }

    let record = orch.get_invocation_status(&inv_id).await.unwrap();
    assert_eq!(record.status, InvocationStatus::Rerouted);
}
// Sends a 1,000,000-character string through `echo_big` and checks that
// the result has the same length and the same content as the input.
#[tokio::test]
async fn large_argument_round_trip() {
    let mut app = RustvelloApp::new(AppConfig::new("large-arg"));
    app.register(EchoBigTask::new()).unwrap();

    let payload = "A".repeat(1_000_000);
    let params = EchoBigParams {
        data: payload.clone(),
    };
    let handle = app
        .submit_call(&EchoBigTask::new(), params)
        .await
        .unwrap();

    let registry = make_registry(EchoBigTask::new());
    let runner = make_runner_for(&app, registry);
    runner.run_one().await.unwrap();

    let echoed: String = handle.result().await.unwrap();
    // The length is checked first, then the full content.
    assert_eq!(echoed.len(), payload.len());
    assert_eq!(echoed, payload);
}