use super::*;
/// One effect envelope observed by a recording effect-host controller.
///
/// A record is captured for every envelope handed to the controller's
/// `execute_effect`, before the outcome is synthesized.
#[derive(Clone, Debug)]
pub struct RecordingEffectHostRecord {
/// Runtime scope copied from the envelope's invocation.
pub runtime_scope: RuntimeScope,
/// Execution scope the recording controller was created for.
pub execution_scope: ExecutionScope,
/// Effect id reported by the envelope's invocation.
pub effect_id: String,
/// Kind of the command carried by the envelope.
pub effect_kind: RuntimeEffectKind,
/// Replay key of the invocation, when it carries one.
pub replay_key: Option<String>,
/// Value returned by the envelope's `stable_hash()` when it was recorded.
pub envelope_hash: String,
}
/// Controller handed out by `RecordingEffectHost` for a single execution scope.
///
/// It appends to the record list shared with the host that created it.
#[derive(Clone)]
struct RecordingEffectHostController {
// Scope this controller was requested for; copied into every record.
execution_scope: ExecutionScope,
// Record list shared (via `Arc`) with the owning host.
records: Arc<Mutex<Vec<RecordingEffectHostRecord>>>,
}
#[async_trait::async_trait]
impl RuntimeEffectController for RecordingEffectHostController {
    /// Records the incoming envelope, then synthesizes an outcome for it.
    ///
    /// A record is appended for every envelope, including those whose command
    /// is rejected afterwards. `Sleep` resolves to `RuntimeEffectOutcome::Sleep`,
    /// `DurableStep` echoes its input back as the step value, and every other
    /// command fails with `recording_effect_host_unsupported_command`. The
    /// local executor is never invoked.
    ///
    /// # Errors
    /// Propagates the error from `stable_hash()` and rejects unsupported
    /// commands.
    ///
    /// # Panics
    /// Panics when the invocation carries no effect id, or when the shared
    /// record list mutex is poisoned.
    async fn execute_effect(
        &self,
        envelope: RuntimeEffectEnvelope,
        _local_executor: RuntimeEffectLocalExecutor<'_>,
    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
        let envelope_hash = envelope.stable_hash()?;

        // Build the record before taking the lock: a missing effect id panics
        // here, and panicking while the guard is held would poison the mutex
        // shared with the host and mask the real failure behind a later
        // "effect host records" panic.
        let record = RecordingEffectHostRecord {
            runtime_scope: envelope.invocation.scope.clone(),
            execution_scope: self.execution_scope.clone(),
            effect_id: envelope
                .invocation
                .effect_id()
                .expect("effect invocation")
                .to_string(),
            effect_kind: envelope.command.kind(),
            replay_key: envelope.invocation.replay_key().map(ToOwned::to_owned),
            envelope_hash,
        };
        self.records
            .lock()
            .expect("effect host records")
            .push(record);

        match envelope.command {
            RuntimeEffectCommand::Sleep { .. } => Ok(RuntimeEffectOutcome::Sleep),
            RuntimeEffectCommand::DurableStep { input, .. } => {
                Ok(RuntimeEffectOutcome::DurableStep { value: input })
            }
            command => Err(RuntimeEffectControllerError::new(
                "recording_effect_host_unsupported_command",
                format!(
                    "recording effect host cannot synthesize {} outcomes",
                    command.kind().as_str()
                ),
            )),
        }
    }
}
/// Effect host that remembers which scopes were requested and which effect
/// envelopes its controllers received.
///
/// Both lists sit behind `Arc<Mutex<..>>`, so clones of the host observe the
/// same data.
#[derive(Clone, Default)]
pub struct RecordingEffectHost {
// Every scope passed to `scoped` / `scoped_static`, in request order.
selected_scopes: Arc<Mutex<Vec<ExecutionScope>>>,
// Records appended by the controllers this host hands out.
records: Arc<Mutex<Vec<RecordingEffectHostRecord>>>,
}
impl RecordingEffectHost {
    /// Returns a copy of every execution scope requested from this host so far.
    ///
    /// # Panics
    /// Panics if the scope list mutex is poisoned.
    pub fn selected_scopes(&self) -> Vec<ExecutionScope> {
        let requested = self
            .selected_scopes
            .lock()
            .expect("selected execution scopes");
        requested.to_vec()
    }

    /// Returns a copy of every effect record captured so far.
    ///
    /// # Panics
    /// Panics if the record list mutex is poisoned.
    pub fn records(&self) -> Vec<RecordingEffectHostRecord> {
        let captured = self.records.lock().expect("effect host records");
        captured.to_vec()
    }

    /// Notes `scope` as selected and wraps a fresh recording controller for it.
    ///
    /// The scope is noted before `ScopedEffectController::shared` runs, so it
    /// is remembered even when that constructor returns an error.
    fn scoped_for<'run>(
        &self,
        scope: ExecutionScope,
    ) -> Result<ScopedEffectController<'run>, crate::RuntimeError> {
        {
            let mut requested = self
                .selected_scopes
                .lock()
                .expect("selected execution scopes");
            requested.push(scope.clone());
        }
        let recorder = RecordingEffectHostController {
            execution_scope: scope.clone(),
            records: Arc::clone(&self.records),
        };
        ScopedEffectController::shared(Arc::new(recorder), scope)
    }
}
impl EffectHost for RecordingEffectHost {
    /// Hands out a recording controller bound to `scope`.
    fn scoped<'run>(
        &'run self,
        scope: ExecutionScope,
    ) -> Result<ScopedEffectController<'run>, crate::RuntimeError> {
        let scoped = self.scoped_for(scope)?;
        Ok(scoped)
    }

    /// Hands out a `'static` recording controller bound to `scope`.
    ///
    /// On success the result is always `Some`.
    fn scoped_static(
        &self,
        scope: ExecutionScope,
    ) -> Result<Option<ScopedEffectController<'static>>, crate::RuntimeError> {
        self.scoped_for(scope).map(Some)
    }
}
/// Runs the scope-handling conformance checks against hosts built by `make`.
///
/// `make` is called once per check, immediately before that check runs, and
/// the checks run one after another.
pub async fn effect_host<F>(make: F)
where
    F: Fn() -> Arc<dyn EffectHost>,
{
    let metadata_host = make();
    effect_host_preserves_scope_metadata(metadata_host).await;

    let validation_host = make();
    effect_host_rejects_missing_scope_ids(validation_host).await;

    let static_host = make();
    effect_host_static_scope_preserves_metadata_when_available(static_host).await;
}
/// Runs the await-event conformance checks against hosts built by `make`.
///
/// `make` is called once per check, immediately before that check runs, and
/// the checks run one after another.
pub async fn effect_host_await_events<F>(make: F)
where
    F: Fn() -> Arc<dyn EffectHost>,
{
    let host = make();
    effect_host_await_event_key_is_stable(host).await;

    let host = make();
    effect_host_await_event_accepts_early_resolution(host).await;

    let host = make();
    effect_host_await_event_duplicate_resolution_is_terminal(host).await;

    let host = make();
    effect_host_await_event_cancel_and_timeout_are_terminal(host).await;

    let host = make();
    effect_host_await_event_revokes_session_scope(host).await;

    let host = make();
    effect_host_await_event_rejects_tampered_keys(host).await;
}
/// Runs the durable-step controller conformance suite through a host.
///
/// Builds one host with `make`, requires it to advertise durable effects,
/// requests a turn scope from it, and hands that scope's controller to
/// `effect_controller_durable_steps`.
///
/// # Panics
/// Panics when the host does not advertise durable effects, when the scope
/// cannot be created, or when any conformance assertion fails.
#[cfg(any(test, feature = "testing"))]
pub async fn effect_host_durable_steps<F>(make: F)
where
    F: Fn() -> Arc<dyn EffectHost>,
{
    let host = make();
    assert!(
        host.supports_durable_effects(),
        "durable-step conformance requires a host that advertises durable effects"
    );

    let turn_scope = ExecutionScope::turn("durable-step-session", "durable-step-turn");
    let scoped = host.scoped(turn_scope).expect("durable-step scope");
    let controller = scoped.controller();
    effect_controller_durable_steps(controller).await;
}
/// Conformance checks for durable-step handling on a single controller.
///
/// Verifies that:
/// * the controller advertises durable effects;
/// * envelopes built from identical arguments hash identically, carry a
///   replay key, and hash differently once the input changes;
/// * a successful local step returns the value the local closure produced;
/// * a failing local step surfaces as a controller error with the same code.
///
/// # Panics
/// Panics when any of the checks above fails.
#[cfg(any(test, feature = "testing"))]
pub async fn effect_controller_durable_steps(controller: &dyn RuntimeEffectController) {
    assert!(
        controller.supports_durable_effects(),
        "durable-step conformance requires a controller that advertises durable effects"
    );

    // Hash stability: same arguments, same hash.
    let baseline = durable_step_conformance_envelope("create", serde_json::json!({ "x": 1 }));
    let duplicate = durable_step_conformance_envelope("create", serde_json::json!({ "x": 1 }));
    let baseline_hash = baseline.stable_hash().expect("first hash");
    let duplicate_hash = duplicate.stable_hash().expect("second hash");
    assert_eq!(
        baseline_hash, duplicate_hash,
        "same step id/input/replay key must hash stably"
    );
    assert!(
        baseline.invocation.replay_key().is_some(),
        "durable-step envelope must carry replay.key"
    );

    // Hash sensitivity: a different input must change the hash.
    let mutated = durable_step_conformance_envelope("create", serde_json::json!({ "x": 2 }));
    let baseline_hash = baseline.stable_hash().expect("first hash");
    let mutated_hash = mutated.stable_hash().expect("changed hash");
    assert_ne!(
        baseline_hash, mutated_hash,
        "durable-step input must participate in the stable envelope hash"
    );

    // Success path: the local closure's value comes back as the outcome.
    let wrapping_executor = RuntimeEffectLocalExecutor::durable_step(|input| async move {
        Ok(serde_json::json!({ "recorded": input }))
    });
    let outcome = controller
        .execute_effect(baseline, wrapping_executor)
        .await
        .expect("durable-step success");
    let step_value = outcome.into_durable_step().expect("durable-step outcome");
    assert_eq!(step_value, serde_json::json!({ "recorded": { "x": 1 } }));

    // Error path: the local closure's error code is preserved.
    let failing_envelope =
        durable_step_conformance_envelope("error", serde_json::json!({ "x": 1 }));
    let failing_executor = RuntimeEffectLocalExecutor::durable_step(|_| async {
        Err(crate::RuntimeError::new(
            "durable_step_conformance_error",
            "durable step failed",
        ))
    });
    let err = controller
        .execute_effect(failing_envelope, failing_executor)
        .await
        .expect_err("durable-step error must be recorded as controller error");
    assert_eq!(err.code, "durable_step_conformance_error");
}
/// Conformance checks for replaying recorded durable-step outcomes.
///
/// Runs the base durable-step suite, executes one succeeding and one failing
/// step, calls `start_replay`, and then re-submits both envelopes with local
/// executors that record any call and fail. The re-submitted effects must
/// return the first-pass value and error code, and no local executor may run.
///
/// `start_replay` is called exactly once, between the two passes; what it
/// does to the controller is up to the caller.
///
/// # Panics
/// Panics when any of the checks above fails.
#[cfg(any(test, feature = "testing"))]
pub async fn effect_controller_durable_steps_replay(
    controller: &dyn RuntimeEffectController,
    start_replay: impl FnOnce(),
) {
    effect_controller_durable_steps(controller).await;

    let ok_envelope =
        durable_step_conformance_envelope("replay-success", serde_json::json!({ "n": 1 }));
    let err_envelope =
        durable_step_conformance_envelope("replay-error", serde_json::json!({ "n": 2 }));

    // First pass: run both steps for real.
    let live_ok_executor = RuntimeEffectLocalExecutor::durable_step(|input| async move {
        Ok(serde_json::json!({ "first": input }))
    });
    let live_ok = controller
        .execute_effect(ok_envelope.clone(), live_ok_executor)
        .await
        .expect("first durable-step success");
    let live_ok_value = live_ok
        .into_durable_step()
        .expect("first durable-step value");
    assert_eq!(live_ok_value, serde_json::json!({ "first": { "n": 1 } }));

    let live_err_executor = RuntimeEffectLocalExecutor::durable_step(|_| async {
        Err(crate::RuntimeError::new(
            "durable_step_replay_error",
            "recorded error",
        ))
    });
    let live_err = controller
        .execute_effect(err_envelope.clone(), live_err_executor)
        .await
        .expect_err("first durable-step error");
    assert_eq!(live_err.code, "durable_step_replay_error");

    start_replay();

    // Second pass: the executors below fail if they are ever called.
    let local_calls = Arc::new(Mutex::new(Vec::new()));

    let replayed_ok = controller
        .execute_effect(
            ok_envelope,
            durable_step_conformance_failing_executor(Arc::clone(&local_calls)),
        )
        .await
        .expect("replayed durable-step success");
    let replayed_ok_value = replayed_ok
        .into_durable_step()
        .expect("replayed durable-step value");
    assert_eq!(replayed_ok_value, serde_json::json!({ "first": { "n": 1 } }));

    let replayed_err = controller
        .execute_effect(
            err_envelope,
            durable_step_conformance_failing_executor(Arc::clone(&local_calls)),
        )
        .await
        .expect_err("replayed durable-step error");
    assert_eq!(replayed_err.code, "durable_step_replay_error");

    assert!(
        local_calls.lock().expect("local calls").is_empty(),
        "durable-step replay must not invoke local closures"
    );
}
/// Checks that scoped controllers report the scope they were created for.
///
/// A queue-drain scope must expose its drain id as `scope_id` and no turn id;
/// a turn scope must expose its turn id as both `scope_id` and `turn_id`.
async fn effect_host_preserves_scope_metadata(host: Arc<dyn EffectHost>) {
    let drain_scope = ExecutionScope::queue_drain("session-1", "drain-1");
    let drain_scoped = host
        .scoped(drain_scope.clone())
        .expect("queue drain scope");
    assert_eq!(
        drain_scoped.execution_scope(),
        &drain_scope,
        "scoped controller must retain the selected semantic scope"
    );
    assert_eq!(drain_scoped.scope_id(), "drain-1");
    assert_eq!(drain_scoped.turn_id(), None);

    let turn_scope = ExecutionScope::turn("session-1", "turn-1");
    let turn_scoped = host.scoped(turn_scope.clone()).expect("turn scope");
    assert_eq!(turn_scoped.execution_scope(), &turn_scope);
    assert_eq!(turn_scoped.scope_id(), "turn-1");
    assert_eq!(turn_scoped.turn_id(), Some("turn-1"));
}
/// Checks that scopes with an empty id component are rejected.
///
/// Every listed scope must make `EffectHost::scoped` fail with
/// `RuntimeErrorCode::MissingExecutionScopeId`.
async fn effect_host_rejects_missing_scope_ids(host: Arc<dyn EffectHost>) {
    let scopes_with_empty_ids = [
        ExecutionScope::turn("", "turn"),
        ExecutionScope::turn("session", ""),
        ExecutionScope::process(""),
        ExecutionScope::queue_drain("session", ""),
        ExecutionScope::session_delete(""),
        ExecutionScope::runtime_operation(""),
    ];

    for candidate in scopes_with_empty_ids {
        let Err(rejection) = host.scoped(candidate) else {
            panic!("invalid execution scope must be rejected");
        };
        assert_eq!(
            rejection.code,
            crate::RuntimeErrorCode::MissingExecutionScopeId,
            "invalid scope ids must fail with the stable missing-scope code"
        );
    }
}
/// Checks scope metadata on `'static` scoped controllers, when supported.
///
/// A host that returns `Ok(None)` from `scoped_static` passes trivially; an
/// error from `scoped_static` is a failure.
async fn effect_host_static_scope_preserves_metadata_when_available(host: Arc<dyn EffectHost>) {
    let scope = ExecutionScope::runtime_operation("static-runtime-op");
    let maybe_scoped = host
        .scoped_static(scope.clone())
        .expect("static scope factory");

    if let Some(scoped) = maybe_scoped {
        assert_eq!(scoped.execution_scope(), &scope);
        assert_eq!(scoped.scope_id(), "static-runtime-op");
    }
}
/// Checks that requesting an await-event key twice for the same scope and
/// wait identity yields equal keys.
async fn effect_host_await_event_key_is_stable(host: Arc<dyn EffectHost>) {
    let scope = ExecutionScope::turn("await-event-session-stable", "turn-stable");
    let identity = AwaitEventWaitIdentity::tool_completion("call-stable");

    let key_a = host
        .await_event_key(&scope, identity.clone())
        .await
        .expect("first await-event key");
    let key_b = host
        .await_event_key(&scope, identity)
        .await
        .expect("second await-event key");

    assert_eq!(key_a, key_b);
}
/// Checks that an event resolved before anyone awaits it is still delivered.
///
/// The resolve must be `Accepted`, and the later await must return the same
/// resolution.
async fn effect_host_await_event_accepts_early_resolution(host: Arc<dyn EffectHost>) {
    let scope = ExecutionScope::turn("await-event-session-early", "turn-early");
    let identity = AwaitEventWaitIdentity::tool_completion("call-early");
    let key = host
        .await_event_key(&scope, identity)
        .await
        .expect("await-event key");

    let resolution = Resolution::Ok(serde_json::json!({ "ready": true }));
    let resolve_outcome = host
        .resolve_await_event(&key, resolution.clone())
        .await
        .expect("early resolve");
    assert_eq!(resolve_outcome, ResolveOutcome::Accepted);

    let never_cancelled = tokio_util::sync::CancellationToken::new();
    let delivered = host
        .await_await_event(&key, never_cancelled, None)
        .await
        .expect("await early-resolved event");
    assert_eq!(delivered, resolution);
}
/// Checks that the first resolution of an event wins.
///
/// The first resolve must be `Accepted`; a second resolve with a different
/// payload must report `AlreadyResolved` carrying the first resolution.
async fn effect_host_await_event_duplicate_resolution_is_terminal(host: Arc<dyn EffectHost>) {
    let scope = ExecutionScope::turn("await-event-session-dupe", "turn-dupe");
    let identity = AwaitEventWaitIdentity::tool_completion("call-dupe");
    let key = host
        .await_event_key(&scope, identity)
        .await
        .expect("await-event key");

    let winning = Resolution::Ok(serde_json::json!("first"));
    let losing = Resolution::Ok(serde_json::json!("second"));

    // Issue both resolves before asserting on either outcome.
    let first_outcome = host
        .resolve_await_event(&key, winning.clone())
        .await
        .expect("first resolve");
    let second_outcome = host
        .resolve_await_event(&key, losing)
        .await
        .expect("duplicate resolve");

    assert_eq!(first_outcome, ResolveOutcome::Accepted);
    let expected_duplicate = ResolveOutcome::AlreadyResolved { terminal: winning };
    assert_eq!(second_outcome, expected_duplicate);
}
/// Checks how awaiting behaves under cancellation and an elapsed deadline.
///
/// Awaiting with an already-cancelled token must yield
/// `Resolution::Cancelled`, and a later resolve must report `AlreadyResolved`
/// with that cancelled terminal. Awaiting with a deadline of "now" must yield
/// `Resolution::Timeout`.
async fn effect_host_await_event_cancel_and_timeout_are_terminal(host: Arc<dyn EffectHost>) {
    // Cancellation.
    let cancel_scope = ExecutionScope::turn("await-event-session-cancel", "turn-cancel");
    let cancel_identity = AwaitEventWaitIdentity::tool_completion("call-cancel");
    let cancel_key = host
        .await_event_key(&cancel_scope, cancel_identity)
        .await
        .expect("cancel await-event key");

    let token = tokio_util::sync::CancellationToken::new();
    token.cancel();
    let cancelled = host
        .await_await_event(&cancel_key, token, None)
        .await
        .expect("cancelled await-event");
    assert_eq!(cancelled, Resolution::Cancelled);

    let late_outcome = host
        .resolve_await_event(&cancel_key, Resolution::Ok(serde_json::json!("late")))
        .await
        .expect("late cancel resolve");
    assert_eq!(
        late_outcome,
        ResolveOutcome::AlreadyResolved {
            terminal: Resolution::Cancelled
        }
    );

    // Timeout.
    let timeout_scope = ExecutionScope::turn("await-event-session-timeout", "turn-timeout");
    let timeout_identity = AwaitEventWaitIdentity::tool_completion("call-timeout");
    let timeout_key = host
        .await_event_key(&timeout_scope, timeout_identity)
        .await
        .expect("timeout await-event key");

    let live_token = tokio_util::sync::CancellationToken::new();
    let deadline = Some(std::time::Instant::now());
    let timed_out = host
        .await_await_event(&timeout_key, live_token, deadline)
        .await
        .expect("timed-out await-event");
    assert_eq!(timed_out, Resolution::Timeout);
}
/// Checks that revoking a session invalidates its await-event keys.
///
/// After revocation, resolving the key must report `UnknownOrRevoked` and
/// awaiting it must fail with `await_event_unknown_or_revoked`.
async fn effect_host_await_event_revokes_session_scope(host: Arc<dyn EffectHost>) {
    let session_id = "await-event-session-revoke";
    let scope = ExecutionScope::turn(session_id, "turn-revoke");
    let identity = AwaitEventWaitIdentity::tool_completion("call-revoke");
    let key = host
        .await_event_key(&scope, identity)
        .await
        .expect("await-event key");

    host.revoke_await_events_for_session(session_id)
        .await
        .expect("revoke session");

    let resolve_outcome = host
        .resolve_await_event(&key, Resolution::Ok(serde_json::json!("late")))
        .await
        .expect("resolve revoked key");
    assert_eq!(resolve_outcome, ResolveOutcome::UnknownOrRevoked);

    let token = tokio_util::sync::CancellationToken::new();
    let await_error = host
        .await_await_event(&key, token, None)
        .await
        .expect_err("revoked key must not await");
    assert_eq!(await_error.code.as_str(), "await_event_unknown_or_revoked");
}
/// Checks that a key whose signature was modified is not honoured.
///
/// Resolving the altered key must report `UnknownOrRevoked` and awaiting it
/// must fail with `await_event_unknown_or_revoked`.
async fn effect_host_await_event_rejects_tampered_keys(host: Arc<dyn EffectHost>) {
    let scope = ExecutionScope::turn("await-event-session-tamper", "turn-tamper");
    let identity = AwaitEventWaitIdentity::tool_completion("call-tamper");
    let mut forged_key = host
        .await_event_key(&scope, identity)
        .await
        .expect("await-event key");
    // Corrupt the signature so the key no longer matches what the host issued.
    forged_key.signature.push_str("-tampered");

    let resolve_outcome = host
        .resolve_await_event(&forged_key, Resolution::Ok(serde_json::json!("bad")))
        .await
        .expect("resolve tampered key");
    assert_eq!(resolve_outcome, ResolveOutcome::UnknownOrRevoked);

    let token = tokio_util::sync::CancellationToken::new();
    let await_error = host
        .await_await_event(&forged_key, token, None)
        .await
        .expect_err("tampered key must not await");
    assert_eq!(await_error.code.as_str(), "await_event_unknown_or_revoked");
}
/// Conformance check: concurrent effects replay by identity, not by order.
///
/// First pass: the "slow" and "fast" exec-code effects are submitted
/// concurrently (slow first) with local executors arranged so the fast one
/// completes first. After `start_replay` is called, both envelopes are
/// submitted again in the opposite order with executors that record any call
/// and fail. Each replayed outcome must carry the marker of its own effect and
/// no local executor may run. Each pass is bounded by a two-second timeout.
///
/// # Panics
/// Panics when a pass times out, an effect fails, the first-pass completion
/// order is not fast-then-slow, a marker mismatches, or a local executor runs
/// during replay.
#[cfg(any(test, feature = "testing"))]
pub async fn effect_controller_concurrent_replay_deterministic(
    controller: &dyn RuntimeEffectController,
    start_replay: impl FnOnce(),
) {
    let slow_envelope = replay_conformance_exec_envelope("effect-slow");
    let fast_envelope = replay_conformance_exec_envelope("effect-fast");
    let completion_order = Arc::new(Mutex::new(Vec::new()));
    let barrier = Arc::new(tokio::sync::Barrier::new(2));
    let release_slow = Arc::new(tokio::sync::Notify::new());
    let pass_limit = std::time::Duration::from_secs(2);

    // First pass: slow is requested first, fast completes first.
    let (slow_live, fast_live) = tokio::time::timeout(pass_limit, async {
        tokio::join!(
            controller.execute_effect(
                slow_envelope.clone(),
                replay_conformance_recording_executor(
                    "effect-slow",
                    Arc::clone(&barrier),
                    Arc::clone(&release_slow),
                    Arc::clone(&completion_order),
                ),
            ),
            controller.execute_effect(
                fast_envelope.clone(),
                replay_conformance_recording_executor(
                    "effect-fast",
                    Arc::clone(&barrier),
                    Arc::clone(&release_slow),
                    Arc::clone(&completion_order),
                ),
            ),
        )
    })
    .await
    .expect("concurrent first-pass effects must both enter their local executors");
    let slow_live = slow_live.expect("slow first pass");
    let fast_live = fast_live.expect("fast first pass");
    assert_replay_conformance_exec_marker(slow_live, "effect-slow");
    assert_replay_conformance_exec_marker(fast_live, "effect-fast");

    // Copy the order out so no mutex guard outlives this statement.
    let observed_order = completion_order
        .lock()
        .expect("completion order")
        .to_vec();
    assert_eq!(
        observed_order,
        ["effect-fast", "effect-slow"],
        "first pass must prove local completion order can differ from effect request order"
    );

    start_replay();

    // Replay pass: requests arrive in the opposite order; executors must stay idle.
    let replay_local_calls = Arc::new(Mutex::new(Vec::new()));
    let (fast_replayed, slow_replayed) = tokio::time::timeout(pass_limit, async {
        tokio::join!(
            controller.execute_effect(
                fast_envelope,
                replay_conformance_failing_executor(Arc::clone(&replay_local_calls)),
            ),
            controller.execute_effect(
                slow_envelope,
                replay_conformance_failing_executor(Arc::clone(&replay_local_calls)),
            ),
        )
    })
    .await
    .expect("concurrent replay effects must resolve from host history");
    let fast_replayed = fast_replayed.expect("fast replay");
    let slow_replayed = slow_replayed.expect("slow replay");
    assert_replay_conformance_exec_marker(fast_replayed, "effect-fast");
    assert_replay_conformance_exec_marker(slow_replayed, "effect-slow");

    let no_local_calls = replay_local_calls
        .lock()
        .expect("replay local calls")
        .is_empty();
    assert!(
        no_local_calls,
        "replay must return recorded outcomes without invoking local executors"
    );
}
/// Builds the durable-step envelope used by the conformance checks.
///
/// The effect id is `durable-step:{step_id}` and the replay key is
/// `durable-step-replay:{step_id}`; `input` becomes the command's input.
#[cfg(any(test, feature = "testing"))]
fn durable_step_conformance_envelope(
    step_id: &'static str,
    input: serde_json::Value,
) -> RuntimeEffectEnvelope {
    let scope = RuntimeScope::for_turn("durable-step-session", "durable-step-turn", 7, 0);
    let effect_id = format!("durable-step:{step_id}");
    let replay_key = format!("durable-step-replay:{step_id}");
    let invocation =
        RuntimeInvocation::effect(scope, effect_id, RuntimeEffectKind::DurableStep, replay_key);

    let command = RuntimeEffectCommand::DurableStep {
        step_id: step_id.to_string(),
        input,
    };

    RuntimeEffectEnvelope::new(invocation, command)
}
/// Builds a durable-step executor that must never run during replay.
///
/// If it is invoked anyway, it appends the stringified input to
/// `local_calls` and fails with `durable_step_replay_local_executor_called`.
#[cfg(any(test, feature = "testing"))]
fn durable_step_conformance_failing_executor(
    local_calls: Arc<Mutex<Vec<String>>>,
) -> RuntimeEffectLocalExecutor<'static> {
    RuntimeEffectLocalExecutor::durable_step(move |input| async move {
        let rendered_input = input.to_string();
        {
            let mut seen = local_calls.lock().expect("local calls");
            seen.push(rendered_input);
        }
        Err(crate::RuntimeError::new(
            "durable_step_replay_local_executor_called",
            "recorded durable-step replay must not invoke local execution",
        ))
    })
}
/// Builds the exec-code envelope used by the concurrent replay check.
///
/// The replay key is
/// `effect-conformance:effect-conformance-turn:{effect_id}` and the command's
/// code is `emit {effect_id}`.
#[cfg(any(test, feature = "testing"))]
fn replay_conformance_exec_envelope(effect_id: &'static str) -> RuntimeEffectEnvelope {
    let scope = RuntimeScope::for_turn(
        "effect-conformance-session",
        "effect-conformance-turn",
        7,
        0,
    );
    let replay_key = format!("effect-conformance:effect-conformance-turn:{effect_id}");
    let invocation =
        RuntimeInvocation::effect(scope, effect_id, RuntimeEffectKind::ExecCode, replay_key);

    let command = RuntimeEffectCommand::ExecCode {
        code: format!("emit {effect_id}"),
    };

    RuntimeEffectEnvelope::new(invocation, command)
}
/// Builds the first-pass local executor for the concurrent replay check.
///
/// Both executors meet at `barrier` first. The `"effect-slow"` executor then
/// waits on `release_slow` before recording its completion; any other
/// executor records its completion and then signals `release_slow`. Each
/// executor returns the exec outcome tagged with its own `effect_id`.
#[cfg(any(test, feature = "testing"))]
fn replay_conformance_recording_executor(
    effect_id: &'static str,
    barrier: Arc<tokio::sync::Barrier>,
    release_slow: Arc<tokio::sync::Notify>,
    completion_order: Arc<Mutex<Vec<String>>>,
) -> RuntimeEffectLocalExecutor<'static> {
    RuntimeEffectLocalExecutor::testing(move |envelope| async move {
        assert_eq!(envelope.invocation.effect_id(), Some(effect_id));
        barrier.wait().await;

        let is_slow = effect_id == "effect-slow";
        if is_slow {
            // Hold back until the fast effect has recorded its completion.
            release_slow.notified().await;
            completion_order
                .lock()
                .expect("completion order")
                .push(effect_id.to_string());
        } else {
            completion_order
                .lock()
                .expect("completion order")
                .push(effect_id.to_string());
            release_slow.notify_one();
        }

        Ok(replay_conformance_exec_outcome(effect_id))
    })
}
/// Builds a local executor that must never run during replay.
///
/// If it is invoked anyway, it appends the envelope's effect id (or an empty
/// string when there is none) to `replay_local_calls` and fails with
/// `conformance_replay_local_executor_called`.
#[cfg(any(test, feature = "testing"))]
fn replay_conformance_failing_executor(
    replay_local_calls: Arc<Mutex<Vec<String>>>,
) -> RuntimeEffectLocalExecutor<'static> {
    RuntimeEffectLocalExecutor::testing(move |envelope| async move {
        let offending_id = envelope
            .invocation
            .effect_id()
            .unwrap_or_default()
            .to_owned();
        {
            let mut seen = replay_local_calls.lock().expect("replay local calls");
            seen.push(offending_id);
        }
        Err(RuntimeEffectControllerError::new(
            "conformance_replay_local_executor_called",
            "recorded replay must not invoke local effect execution",
        ))
    })
}
/// Builds a successful exec-code outcome tagged with `effect_id`.
///
/// Every response field is empty or zero except `terminal_finish`, which
/// holds `effect_id` as a JSON string so replayed outcomes can be matched
/// back to their effect.
#[cfg(any(test, feature = "testing"))]
fn replay_conformance_exec_outcome(effect_id: &str) -> RuntimeEffectOutcome {
    let marker = serde_json::Value::from(effect_id);
    let response = crate::ExecResponse {
        observations: Vec::new(),
        observation_truncation: Vec::new(),
        tool_calls: Vec::new(),
        images: Vec::new(),
        printed_images: Vec::new(),
        error: None,
        duration_ms: 0,
        terminal_finish: Some(marker),
    };
    RuntimeEffectOutcome::ExecCode {
        result: Ok(response),
    }
}
/// Asserts that `outcome` is a successful exec-code outcome whose
/// `terminal_finish` equals `expected` as a JSON string.
///
/// # Panics
/// Panics when the outcome is not `ExecCode`, when its result is an error, or
/// when the marker differs.
#[cfg(any(test, feature = "testing"))]
fn assert_replay_conformance_exec_marker(outcome: RuntimeEffectOutcome, expected: &str) {
    let response = match outcome {
        RuntimeEffectOutcome::ExecCode { result } => result.expect("exec-code response"),
        _ => panic!("expected exec-code effect outcome"),
    };
    let expected_marker = Some(serde_json::json!(expected));
    assert_eq!(
        response.terminal_finish, expected_marker,
        "replayed outcome must come from the matching replay key"
    );
}