Skip to main content

lash_core/
tool_provider.rs

1use std::collections::HashSet;
2use std::future::Future;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5
6use lash_sansio::llm::types::ProviderReplayMeta;
7use serde::{Deserialize, Serialize};
8
9use crate::plugin::{
10    PluginError, SessionGraphService, SessionLifecycleService, SessionSnapshot, SessionStateService,
11};
12use crate::{AttachmentStore, ToolContract, ToolManifest, ToolResult};
13
14mod attachments;
15mod direct_completion;
16mod dispatch;
17mod process;
18pub(crate) mod process_events;
19mod session;
20mod triggers;
21
22pub use attachments::ToolAttachmentClient;
23pub use direct_completion::ToolDirectCompletionClient;
24pub use dispatch::ToolDispatchClient;
25pub use process::ToolSessionProcessAdmin;
26pub use process_events::ToolProcessEventClient;
27pub use session::{ToolSessionAdmin, ToolSessionModel};
28pub use triggers::ToolTriggerClient;
29
30/// A message sent from the sandbox to the host during execution.
31#[derive(Clone, Debug)]
32pub struct SandboxMessage {
33    pub text: String,
34    /// "tool_output" or another host-rendered progress event kind.
35    pub kind: String,
36}
37
38/// Sender for streaming progress messages from tools (e.g. live bash output).
39pub type ProgressSender = tokio::sync::mpsc::UnboundedSender<SandboxMessage>;
40
41#[derive(Clone, Default)]
42pub(crate) struct ToolCompletionState {
43    key: Arc<Mutex<Option<crate::AwaitEventKey>>>,
44}
45
46impl ToolCompletionState {
47    fn store(
48        &self,
49        key: crate::AwaitEventKey,
50    ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
51        let mut guard = self.key.lock().map_err(|_| {
52            crate::RuntimeError::new(
53                "tool_completion_state_poisoned",
54                "tool completion key state lock poisoned",
55            )
56        })?;
57        if let Some(existing) = guard.as_ref() {
58            return Ok(existing.clone());
59        }
60        *guard = Some(key.clone());
61        Ok(key)
62    }
63
64    pub(crate) fn take(&self) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
65        self.key.lock().map(|mut guard| guard.take()).map_err(|_| {
66            crate::RuntimeError::new(
67                "tool_completion_state_poisoned",
68                "tool completion key state lock poisoned",
69            )
70        })
71    }
72}
73
74#[derive(Clone, Default)]
75pub(crate) struct ToolDurableEffectState {
76    step_ids: Arc<Mutex<HashSet<String>>>,
77    process_event_sequence: Arc<AtomicU64>,
78}
79
80impl ToolDurableEffectState {
81    fn reserve_step(&self, step_id: &str) -> Result<(), crate::RuntimeError> {
82        let mut guard = self.step_ids.lock().map_err(|_| {
83            crate::RuntimeError::new(
84                "durable_effect_state_poisoned",
85                "durable effect step state lock poisoned",
86            )
87        })?;
88        if !guard.insert(step_id.to_string()) {
89            return Err(crate::RuntimeError::new(
90                "durable_effect_duplicate_step_id",
91                format!("durable effect step id `{step_id}` was already used by this tool call"),
92            ));
93        }
94        Ok(())
95    }
96
97    fn next_process_event_sequence(&self) -> u64 {
98        self.process_event_sequence.fetch_add(1, Ordering::Relaxed)
99    }
100}
101
102/// Per-call environment for [`ToolProvider::execute`]. Fields are sealed so
103/// the runtime can add capabilities without breaking tool authors.
104#[derive(Clone)]
105pub struct ToolContext<'run> {
106    pub(crate) session_id: String,
107    pub(crate) agent_frame_id: crate::AgentFrameId,
108    pub(crate) sessions: Arc<dyn SessionStateService>,
109    pub(crate) session_lifecycle: Arc<dyn SessionLifecycleService>,
110    pub(crate) processes: Arc<dyn crate::ProcessService>,
111    pub(crate) process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
112    pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
113    pub(crate) runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
114    pub(crate) cancellation_token: Option<tokio_util::sync::CancellationToken>,
115    pub(crate) async_process_id: Option<String>,
116    pub(crate) runtime_process_id: Option<String>,
117    pub(crate) process_events: Option<ToolProcessEventContext>,
118    pub(crate) attachment_store: Arc<dyn AttachmentStore>,
119    pub(crate) direct_completions: crate::DirectCompletionClient<'run>,
120    pub(crate) prepared_payload: serde_json::Value,
121    /// The id of the in-flight tool call that is invoking this tool.
122    pub(crate) tool_call_id: Option<String>,
123    pub(crate) attempt_number: u32,
124    pub(crate) max_attempts: u32,
125    pub(crate) replay_key: Option<String>,
126    pub(crate) completion: ToolCompletionState,
127    pub(crate) durable_effects: ToolDurableEffectState,
128    pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
129    pub(crate) execution_env_spec: crate::ProcessExecutionEnvSpec,
130    pub(crate) child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
131}
132
133#[derive(Clone)]
134pub struct ToolChildProcessStarted {
135    pub process_id: String,
136    pub child_entry_name: Option<String>,
137}
138
139#[derive(Clone)]
140pub struct ToolChildExecutionTraceHook {
141    on_child_process_started: Arc<dyn Fn(ToolChildProcessStarted) + Send + Sync>,
142}
143
144impl ToolChildExecutionTraceHook {
145    pub fn new(
146        on_child_process_started: impl Fn(ToolChildProcessStarted) + Send + Sync + 'static,
147    ) -> Self {
148        Self {
149            on_child_process_started: Arc::new(on_child_process_started),
150        }
151    }
152
153    pub fn child_process_started(&self, event: ToolChildProcessStarted) {
154        (self.on_child_process_started)(event);
155    }
156}
157
158#[derive(Clone)]
159pub(crate) struct ToolProcessEventContext {
160    process_id: String,
161    registry: Arc<dyn crate::ProcessRegistry>,
162    store: Option<Arc<dyn crate::RuntimePersistence>>,
163    session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
164    session_graph: Arc<dyn SessionGraphService>,
165    queued_work_poke: Option<crate::QueuedWorkPoke>,
166}
167
168pub(crate) struct ToolContextBuilder<'run> {
169    session_id: String,
170    agent_frame_id: crate::AgentFrameId,
171    sessions: Arc<dyn SessionStateService>,
172    session_lifecycle: Arc<dyn SessionLifecycleService>,
173    session_graph: Arc<dyn SessionGraphService>,
174    processes: Arc<dyn crate::ProcessService>,
175    process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
176    effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
177    runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
178    cancellation_token: Option<tokio_util::sync::CancellationToken>,
179    async_process_id: Option<String>,
180    runtime_process_id: Option<String>,
181    process_events: Option<ToolProcessEventContext>,
182    attachment_store: Arc<dyn AttachmentStore>,
183    direct_completions: crate::DirectCompletionClient<'run>,
184    prepared_payload: serde_json::Value,
185    tool_call_id: Option<String>,
186    completion: ToolCompletionState,
187    durable_effects: ToolDurableEffectState,
188    parent_invocation: Option<crate::RuntimeInvocation>,
189    execution_env_spec: crate::ProcessExecutionEnvSpec,
190    child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
191}
192
193impl<'run> ToolContextBuilder<'run> {
194    pub(crate) fn from_dispatch(
195        dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
196    ) -> Self {
197        Self {
198            session_id: dispatch.session_id.clone(),
199            agent_frame_id: dispatch.agent_frame_id.clone(),
200            sessions: Arc::clone(&dispatch.sessions),
201            session_lifecycle: Arc::clone(&dispatch.session_lifecycle),
202            session_graph: Arc::clone(&dispatch.session_graph),
203            processes: Arc::clone(&dispatch.processes),
204            process_cancel_ability: Arc::clone(&dispatch.process_cancel_ability),
205            effect_controller: dispatch.effect_controller.clone(),
206            runtime_dispatch: Some(Arc::clone(&dispatch)),
207            cancellation_token: None,
208            async_process_id: None,
209            runtime_process_id: None,
210            process_events: None,
211            attachment_store: Arc::clone(&dispatch.attachment_store),
212            direct_completions: dispatch.direct_completions.clone(),
213            prepared_payload: serde_json::Value::Null,
214            tool_call_id: None,
215            completion: ToolCompletionState::default(),
216            durable_effects: ToolDurableEffectState::default(),
217            parent_invocation: dispatch.parent_invocation.clone(),
218            execution_env_spec: dispatch.execution_env_spec.clone(),
219            child_execution_trace_hook: None,
220        }
221    }
222
223    #[cfg(any(test, feature = "testing"))]
224    pub(crate) fn tool_call_id(mut self, tool_call_id: impl Into<Option<String>>) -> Self {
225        self.tool_call_id = tool_call_id.into();
226        self
227    }
228
229    pub(crate) fn prepared_call(mut self, call: &PreparedToolCall) -> Self {
230        self.tool_call_id = Some(call.call_id.clone());
231        self.prepared_payload = call.prepared_payload.clone();
232        self
233    }
234
235    pub(crate) fn cancellation_token(
236        mut self,
237        cancellation_token: Option<tokio_util::sync::CancellationToken>,
238    ) -> Self {
239        self.cancellation_token = cancellation_token;
240        self
241    }
242
243    pub(crate) fn runtime_process_id(mut self, process_id: Option<String>) -> Self {
244        self.runtime_process_id = process_id;
245        self
246    }
247
248    pub(crate) fn async_process(
249        mut self,
250        process_id: impl Into<String>,
251        cancellation_token: tokio_util::sync::CancellationToken,
252    ) -> Self {
253        self.async_process_id = Some(process_id.into());
254        self.cancellation_token = Some(cancellation_token);
255        self
256    }
257
258    pub(crate) fn process_events(
259        mut self,
260        process_id: impl Into<String>,
261        registry: Arc<dyn crate::ProcessRegistry>,
262        store: Option<Arc<dyn crate::RuntimePersistence>>,
263        session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
264        queued_work_poke: Option<crate::QueuedWorkPoke>,
265    ) -> Self {
266        self.process_events = Some(ToolProcessEventContext {
267            process_id: process_id.into(),
268            registry,
269            store,
270            session_store_factory,
271            session_graph: Arc::clone(&self.session_graph),
272            queued_work_poke,
273        });
274        self
275    }
276
277    pub(crate) fn parent_invocation(mut self, metadata: Option<crate::RuntimeInvocation>) -> Self {
278        self.parent_invocation = metadata;
279        self
280    }
281
282    pub(crate) fn child_execution_trace_hook(
283        mut self,
284        hook: Option<ToolChildExecutionTraceHook>,
285    ) -> Self {
286        self.child_execution_trace_hook = hook;
287        self
288    }
289
290    pub(crate) fn build(self) -> ToolContext<'run> {
291        ToolContext {
292            session_id: self.session_id,
293            agent_frame_id: self.agent_frame_id,
294            sessions: self.sessions,
295            session_lifecycle: self.session_lifecycle,
296            processes: self.processes,
297            process_cancel_ability: self.process_cancel_ability,
298            effect_controller: self.effect_controller,
299            runtime_dispatch: self.runtime_dispatch,
300            cancellation_token: self.cancellation_token,
301            async_process_id: self.async_process_id,
302            runtime_process_id: self.runtime_process_id,
303            process_events: self.process_events,
304            attachment_store: self.attachment_store,
305            direct_completions: self.direct_completions,
306            prepared_payload: self.prepared_payload,
307            tool_call_id: self.tool_call_id,
308            attempt_number: 1,
309            max_attempts: 1,
310            replay_key: None,
311            completion: self.completion,
312            durable_effects: self.durable_effects,
313            parent_invocation: self.parent_invocation,
314            execution_env_spec: self.execution_env_spec,
315            child_execution_trace_hook: self.child_execution_trace_hook,
316        }
317    }
318}
319
320impl<'run> ToolContext<'run> {
321    #[cfg(any(test, feature = "testing"))]
322    #[expect(
323        clippy::too_many_arguments,
324        reason = "testing constructor mirrors the sealed runtime tool context dependencies"
325    )]
326    pub(crate) fn builder(
327        session_id: String,
328        sessions: Arc<dyn SessionStateService>,
329        session_lifecycle: Arc<dyn SessionLifecycleService>,
330        session_graph: Arc<dyn SessionGraphService>,
331        processes: Arc<dyn crate::ProcessService>,
332        process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
333        effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
334        attachment_store: Arc<dyn AttachmentStore>,
335        direct_completions: crate::DirectCompletionClient<'run>,
336    ) -> ToolContextBuilder<'run> {
337        ToolContextBuilder {
338            session_id,
339            agent_frame_id: String::new(),
340            sessions,
341            session_lifecycle,
342            session_graph,
343            processes,
344            process_cancel_ability,
345            effect_controller,
346            runtime_dispatch: None,
347            cancellation_token: None,
348            async_process_id: None,
349            runtime_process_id: None,
350            process_events: None,
351            attachment_store,
352            direct_completions,
353            prepared_payload: serde_json::Value::Null,
354            tool_call_id: None,
355            completion: ToolCompletionState::default(),
356            durable_effects: ToolDurableEffectState::default(),
357            parent_invocation: None,
358            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
359                crate::PluginOptions::default(),
360                crate::SessionPolicy::default(),
361            ),
362            child_execution_trace_hook: None,
363        }
364    }
365
366    pub(crate) fn from_dispatch(
367        dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
368    ) -> ToolContextBuilder<'run> {
369        ToolContextBuilder::from_dispatch(dispatch)
370    }
371
372    pub fn session_id(&self) -> &str {
373        &self.session_id
374    }
375
376    pub fn agent_frame_id(&self) -> &str {
377        &self.agent_frame_id
378    }
379
380    pub fn sessions(&self) -> ToolSessionAdmin<'run> {
381        ToolSessionAdmin {
382            session_id: self.session_id.clone(),
383            sessions: Arc::clone(&self.sessions),
384            session_lifecycle: Arc::clone(&self.session_lifecycle),
385            effect_controller: self.effect_controller.clone(),
386        }
387    }
388
389    pub fn dispatch(&self) -> ToolDispatchClient<'run> {
390        ToolDispatchClient {
391            context: self.clone(),
392        }
393    }
394
395    pub fn triggers(&self) -> ToolTriggerClient<'run> {
396        ToolTriggerClient {
397            context: self.clone(),
398        }
399    }
400
401    pub fn processes(&self) -> ToolSessionProcessAdmin<'run> {
402        ToolSessionProcessAdmin {
403            session_id: self.session_id.clone(),
404            agent_frame_id: self.agent_frame_id.clone(),
405            processes: Arc::clone(&self.processes),
406            process_cancel_ability: Arc::clone(&self.process_cancel_ability),
407            effect_controller: self.effect_controller.clone(),
408            parent_invocation: self.parent_invocation.clone(),
409            tool_call_id: self.tool_call_id.clone(),
410            execution_env_spec: self.execution_env_spec.clone(),
411        }
412    }
413
414    pub fn emit_child_process_started(
415        &self,
416        process_id: impl Into<String>,
417        child_entry_name: Option<String>,
418    ) {
419        let Some(hook) = &self.child_execution_trace_hook else {
420            return;
421        };
422        hook.child_process_started(ToolChildProcessStarted {
423            process_id: process_id.into(),
424            child_entry_name,
425        });
426    }
427
428    pub fn direct_completions(&self) -> ToolDirectCompletionClient<'run> {
429        ToolDirectCompletionClient {
430            session_id: self.session_id.clone(),
431            tool_call_id: self.tool_call_id.clone(),
432            direct_completions: self.direct_completions.clone(),
433        }
434    }
435
436    pub fn attachments(&self) -> ToolAttachmentClient {
437        ToolAttachmentClient {
438            store: Arc::clone(&self.attachment_store),
439        }
440    }
441
442    pub fn process_events(&self) -> ToolProcessEventClient {
443        ToolProcessEventClient {
444            context: self.process_events.clone(),
445        }
446    }
447
448    /// Borrow this tool call's durable effect boundary.
449    ///
450    /// This is available only while executing a prepared tool call under a
451    /// controller that explicitly supports durable tool effects. The returned
452    /// facade records JSON steps and await-event waits in the caller's existing
453    /// effect log; it does not expose the underlying workflow engine context.
454    pub fn durable_effects(&self) -> Result<ToolDurableEffects<'_, 'run>, crate::RuntimeError> {
455        let Some(tool_call_id) = self.tool_call_id.as_deref() else {
456            return Err(crate::RuntimeError::new(
457                "durable_effects_missing_call_id",
458                "durable effects require a prepared tool call id",
459            ));
460        };
461        if tool_call_id.trim().is_empty() {
462            return Err(crate::RuntimeError::new(
463                "durable_effects_missing_call_id",
464                "durable effects require a non-empty prepared tool call id",
465            ));
466        }
467        let scoped = self.effect_controller.scoped();
468        if !scoped.controller().supports_durable_effects() {
469            return Err(crate::RuntimeError::new(
470                "durable_effects_unavailable",
471                "this effect controller does not support durable tool effects",
472            ));
473        }
474        Ok(ToolDurableEffects { context: self })
475    }
476
477    pub fn cancellation_token(&self) -> Option<&tokio_util::sync::CancellationToken> {
478        self.cancellation_token.as_ref()
479    }
480
481    pub fn async_process_id(&self) -> Option<&str> {
482        self.async_process_id.as_deref()
483    }
484
485    pub fn runtime_process_id(&self) -> Option<&str> {
486        self.async_process_id
487            .as_deref()
488            .or(self.runtime_process_id.as_deref())
489            .or_else(|| {
490                self.process_events
491                    .as_ref()
492                    .map(|context| context.process_id.as_str())
493            })
494    }
495
496    pub fn tool_call_id(&self) -> Option<&str> {
497        self.tool_call_id.as_deref()
498    }
499
500    pub fn prepared_payload(&self) -> &serde_json::Value {
501        &self.prepared_payload
502    }
503
504    pub fn decode_prepared_payload<T>(&self) -> Result<T, serde_json::Error>
505    where
506        T: serde::de::DeserializeOwned,
507    {
508        serde_json::from_value(self.prepared_payload.clone())
509    }
510
511    pub fn attempt_number(&self) -> u32 {
512        self.attempt_number
513    }
514
515    pub fn max_attempts(&self) -> u32 {
516        self.max_attempts
517    }
518
519    pub fn replay_key(&self) -> Option<&str> {
520        self.replay_key.as_deref()
521    }
522
523    /// Obtain the durable completion key for this call, required before returning
524    /// [`ToolResult::Pending`](crate::ToolResult::Pending).
525    ///
526    /// A tool that defers its outcome (waiting on a webhook, human approval, or another
527    /// service) calls this, hands the returned [`AwaitEventKey`](crate::AwaitEventKey)
528    /// to whatever will complete the work out-of-band, and then returns
529    /// `ToolResult::Pending(..)`. The key names the durable wait the runtime parks the
530    /// call on; the external resolver delivers the result against it later.
531    ///
532    /// The key is stored on the context and consumed by the dispatcher when the tool
533    /// returns `Pending`. Returning `Pending` without first calling this fails the call
534    /// with `pending_tool_missing_completion_key`. Calls made outside a prepared tool
535    /// invocation (no tool call id) fail with `tool_completion_key_missing_call_id`.
536    pub async fn completion_key(&self) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
537        let tool_call_id = self.tool_call_id.clone().ok_or_else(|| {
538            crate::RuntimeError::new(
539                "tool_completion_key_missing_call_id",
540                "completion keys require a prepared tool call id",
541            )
542        })?;
543        let scoped = self.effect_controller.scoped();
544        let key = scoped
545            .controller()
546            .await_event_key(
547                scoped.execution_scope(),
548                crate::AwaitEventWaitIdentity::tool_completion(tool_call_id),
549            )
550            .await?;
551        self.completion.store(key)
552    }
553
554    pub(crate) fn take_completion_key(
555        &self,
556    ) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
557        self.completion.take()
558    }
559
560    pub fn with_async_process(
561        mut self,
562        process_id: impl Into<String>,
563        cancellation_token: tokio_util::sync::CancellationToken,
564    ) -> Self {
565        self.async_process_id = Some(process_id.into());
566        self.runtime_process_id = self.async_process_id.clone();
567        self.cancellation_token = Some(cancellation_token);
568        self
569    }
570
571    #[cfg(any(test, feature = "testing"))]
572    #[doc(hidden)]
573    pub fn with_process_events_for_testing(
574        mut self,
575        process_id: impl Into<String>,
576        registry: Arc<dyn crate::ProcessRegistry>,
577    ) -> Self {
578        self.process_events = Some(ToolProcessEventContext {
579            process_id: process_id.into(),
580            registry,
581            store: None,
582            session_store_factory: None,
583            session_graph: Arc::new(crate::plugin::NoopSessionManager),
584            queued_work_poke: None,
585        });
586        self
587    }
588
589    pub(crate) fn with_retry_context(
590        mut self,
591        tool_name: &str,
592        attempt_number: u32,
593        max_attempts: u32,
594    ) -> Self {
595        self.attempt_number = attempt_number.max(1);
596        self.max_attempts = max_attempts.max(1);
597        self.replay_key = self
598            .tool_call_id
599            .as_ref()
600            .map(|call_id| format!("lash-tool:{}:{call_id}:{tool_name}", self.session_id));
601        self
602    }
603
604    pub(crate) fn with_prepared_payload(mut self, payload: serde_json::Value) -> Self {
605        self.prepared_payload = payload;
606        self
607    }
608
609    /// Constructor reserved for `lash_core::testing` helpers. Do not call directly;
610    /// use [`lash_core::testing::mock_tool_context`] instead.
611    #[cfg(any(test, feature = "testing"))]
612    #[doc(hidden)]
613    #[expect(
614        clippy::too_many_arguments,
615        reason = "test-only constructor mirrors the sealed runtime tool context"
616    )]
617    pub fn __for_testing(
618        session_id: String,
619        sessions: Arc<dyn SessionStateService>,
620        session_lifecycle: Arc<dyn SessionLifecycleService>,
621        session_graph: Arc<dyn SessionGraphService>,
622        processes: Arc<dyn crate::ProcessService>,
623        attachment_store: Arc<dyn AttachmentStore>,
624        direct_completions: crate::DirectCompletionClient<'static>,
625        tool_call_id: Option<String>,
626    ) -> ToolContext<'static> {
627        ToolContext::builder(
628            session_id,
629            sessions,
630            session_lifecycle,
631            session_graph,
632            processes,
633            Arc::new(crate::DefaultProcessCancelAbility),
634            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
635                crate::InlineRuntimeEffectController,
636            )),
637            attachment_store,
638            direct_completions,
639        )
640        .tool_call_id(tool_call_id)
641        .build()
642    }
643
644    /// Constructor reserved for tests that need a custom process-cancel host
645    /// ability. Do not call directly; prefer public testing helpers when they
646    /// cover the case.
647    #[cfg(any(test, feature = "testing"))]
648    #[doc(hidden)]
649    #[expect(
650        clippy::too_many_arguments,
651        reason = "test-only constructor mirrors the sealed runtime context"
652    )]
653    pub fn __for_testing_with_process_cancel_ability(
654        session_id: String,
655        sessions: Arc<dyn SessionStateService>,
656        session_lifecycle: Arc<dyn SessionLifecycleService>,
657        session_graph: Arc<dyn SessionGraphService>,
658        processes: Arc<dyn crate::ProcessService>,
659        process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
660        attachment_store: Arc<dyn AttachmentStore>,
661        direct_completions: crate::DirectCompletionClient<'static>,
662        tool_call_id: Option<String>,
663    ) -> ToolContext<'static> {
664        ToolContext::builder(
665            session_id,
666            sessions,
667            session_lifecycle,
668            session_graph,
669            processes,
670            process_cancel_ability,
671            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
672                crate::InlineRuntimeEffectController,
673            )),
674            attachment_store,
675            direct_completions,
676        )
677        .tool_call_id(tool_call_id)
678        .build()
679    }
680}
681
682/// Durable effect operations available to advanced in-process tools.
683///
684/// The facade borrows the caller's existing runtime effect boundary. It records
685/// JSON-only local steps and awaits host-signed event keys without exposing
686/// Restate, Temporal, or any other workflow-native context.
687pub struct ToolDurableEffects<'ctx, 'run> {
688    context: &'ctx ToolContext<'run>,
689}
690
691impl<'ctx, 'run> ToolDurableEffects<'ctx, 'run> {
692    pub async fn run_json<F, Fut>(
693        &self,
694        step_id: impl Into<String>,
695        input: serde_json::Value,
696        run: F,
697    ) -> Result<serde_json::Value, crate::RuntimeError>
698    where
699        F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
700        Fut: Future<Output = Result<serde_json::Value, crate::RuntimeError>> + Send + 'run,
701    {
702        let step_id = step_id.into();
703        if step_id.trim().is_empty() {
704            return Err(crate::RuntimeError::new(
705                "durable_effect_empty_step_id",
706                "durable effect step id must be non-empty",
707            ));
708        }
709        self.context.durable_effects.reserve_step(&step_id)?;
710        let invocation = self.step_invocation(
711            format!("durable-step:{step_id}"),
712            crate::RuntimeEffectKind::DurableStep,
713            format!("durable-step:{step_id}"),
714        )?;
715        let outcome = self
716            .context
717            .effect_controller
718            .controller()
719            .execute_effect(
720                crate::RuntimeEffectEnvelope::new(
721                    invocation,
722                    crate::RuntimeEffectCommand::DurableStep {
723                        step_id,
724                        input: input.clone(),
725                    },
726                ),
727                crate::RuntimeEffectLocalExecutor::durable_step(run),
728            )
729            .await
730            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
731        outcome
732            .into_durable_step()
733            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)
734    }
735
736    pub async fn external_event_key(
737        &self,
738        key: impl Into<String>,
739    ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
740        let key = key.into();
741        if key.trim().is_empty() {
742            return Err(crate::RuntimeError::new(
743                "durable_effect_empty_event_key",
744                "durable effect external event key must be non-empty",
745            ));
746        }
747        let scoped = self.context.effect_controller.scoped();
748        scoped
749            .controller()
750            .await_event_key(
751                scoped.execution_scope(),
752                crate::AwaitEventWaitIdentity::Custom { key },
753            )
754            .await
755    }
756
757    pub async fn await_event_json(
758        &self,
759        key: crate::AwaitEventKey,
760    ) -> Result<serde_json::Value, crate::RuntimeError> {
761        let invocation = self.step_invocation(
762            format!("await-event:{}", key.key_id),
763            crate::RuntimeEffectKind::AwaitEvent,
764            format!("await-event:{}", key.key_id),
765        )?;
766        let cancellation = self.context.cancellation_token.clone().unwrap_or_default();
767        let outcome = self
768            .context
769            .effect_controller
770            .controller()
771            .execute_effect(
772                crate::RuntimeEffectEnvelope::new(
773                    invocation,
774                    crate::RuntimeEffectCommand::AwaitEvent { key },
775                ),
776                crate::RuntimeEffectLocalExecutor::await_event(cancellation, None),
777            )
778            .await
779            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
780        match outcome
781            .into_await_event()
782            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?
783        {
784            crate::Resolution::Ok(value) => Ok(value),
785            crate::Resolution::Err(err) => Err(crate::RuntimeError::new(err.code, err.message)),
786            crate::Resolution::Timeout => Err(crate::RuntimeError::new(
787                "durable_effect_event_timeout",
788                "durable effect external event wait timed out",
789            )),
790            crate::Resolution::Cancelled => Err(crate::RuntimeError::new(
791                "durable_effect_event_cancelled",
792                "durable effect external event wait was cancelled",
793            )),
794        }
795    }
796
797    pub async fn emit_process_event(
798        &self,
799        event_type: impl Into<String>,
800        payload: serde_json::Value,
801    ) -> Result<crate::ProcessEvent, crate::RuntimeError> {
802        let Some(process) = self.context.process_events.as_ref() else {
803            return Err(crate::RuntimeError::new(
804                "durable_effect_process_event_unavailable",
805                "durable effect process events are unavailable outside a durable process",
806            ));
807        };
808        let event_type = event_type.into();
809        if event_type.trim().is_empty() {
810            return Err(crate::RuntimeError::new(
811                "durable_effect_empty_process_event_type",
812                "durable effect process event type must be non-empty",
813            ));
814        }
815        let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
816            crate::RuntimeError::new(
817                "durable_effects_missing_call_id",
818                "durable effects require a prepared tool call id",
819            )
820        })?;
821        let sequence = self.context.durable_effects.next_process_event_sequence();
822        let request = crate::ProcessEventAppendRequest::new(event_type, payload).with_replay_key(
823            format!("tool:{tool_call_id}:durable-process-event:{sequence}"),
824        );
825        self.context
826            .process_events()
827            .emit_request(request)
828            .await
829            .map_err(|err| {
830                crate::RuntimeError::new(
831                    "durable_effect_process_event_append_failed",
832                    err.to_string(),
833                )
834            })
835            .and_then(|event| {
836                if event.process_id == process.process_id {
837                    Ok(event)
838                } else {
839                    Err(crate::RuntimeError::new(
840                        "durable_effect_process_event_process_mismatch",
841                        "process event append returned an event for a different process",
842                    ))
843                }
844            })
845    }
846
847    fn step_invocation(
848        &self,
849        effect_id_suffix: impl Into<String>,
850        kind: crate::RuntimeEffectKind,
851        replay_suffix: impl AsRef<str>,
852    ) -> Result<crate::RuntimeInvocation, crate::RuntimeError> {
853        let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
854            crate::RuntimeError::new(
855                "durable_effects_missing_call_id",
856                "durable effects require a prepared tool call id",
857            )
858        })?;
859        let effect_id_suffix = effect_id_suffix.into();
860        if let Some(parent) = self.context.parent_invocation.as_ref() {
861            return Ok(crate::runtime::causal::child_effect_invocation(
862                parent,
863                format!("{tool_call_id}:{effect_id_suffix}"),
864                kind,
865                replay_suffix,
866            ));
867        }
868        let scoped = self.context.effect_controller.scoped();
869        let replay_key = format!(
870            "{}:tool:{tool_call_id}:{}",
871            scoped.scope_id(),
872            replay_suffix.as_ref()
873        );
874        Ok(crate::RuntimeInvocation::effect(
875            crate::RuntimeScope::new(self.context.session_id.clone()),
876            format!("{tool_call_id}:{effect_id_suffix}"),
877            kind,
878            replay_key,
879        ))
880    }
881}
882
883/// Runtime-prepared executable tool call.
884///
885/// The raw model/provider identity remains visible, but any argument rewrites
886/// and provider-owned context projections are frozen before the call crosses a
887/// runtime effect or process boundary.
888#[derive(Clone, Debug, Serialize, Deserialize)]
889pub struct PreparedToolCall {
890    pub call_id: String,
891    pub tool_name: String,
892    pub args: serde_json::Value,
893    #[serde(default, skip_serializing_if = "Option::is_none")]
894    pub replay: Option<ProviderReplayMeta>,
895    #[serde(default, skip_serializing_if = "serde_json::Value::is_null")]
896    pub prepared_payload: serde_json::Value,
897}
898
899impl PreparedToolCall {
900    pub fn identity(call: crate::sansio::PendingToolCall) -> Self {
901        Self {
902            call_id: call.call_id,
903            tool_name: call.tool_name,
904            args: call.args,
905            replay: call.replay,
906            prepared_payload: serde_json::Value::Null,
907        }
908    }
909
910    pub fn from_parts(
911        call_id: impl Into<String>,
912        tool_name: impl Into<String>,
913        args: serde_json::Value,
914        replay: Option<ProviderReplayMeta>,
915        prepared_payload: serde_json::Value,
916    ) -> Self {
917        Self {
918            call_id: call_id.into(),
919            tool_name: tool_name.into(),
920            args,
921            replay,
922            prepared_payload,
923        }
924    }
925}
926
927#[derive(Clone)]
928pub struct ToolPrepareContext {
929    session_id: String,
930    sessions: Arc<dyn SessionStateService>,
931    turn_context: crate::TurnContext,
932    tool_call_id: Option<String>,
933}
934
935impl ToolPrepareContext {
936    pub(crate) fn new(
937        session_id: String,
938        sessions: Arc<dyn SessionStateService>,
939        turn_context: crate::TurnContext,
940        tool_call_id: Option<String>,
941    ) -> Self {
942        Self {
943            session_id,
944            sessions,
945            turn_context,
946            tool_call_id,
947        }
948    }
949
950    pub fn session_id(&self) -> &str {
951        &self.session_id
952    }
953
954    pub fn tool_call_id(&self) -> Option<&str> {
955        self.tool_call_id.as_deref()
956    }
957
958    pub fn turn_context(&self) -> &crate::TurnContext {
959        &self.turn_context
960    }
961
962    pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
963    where
964        T: 'static,
965    {
966        self.turn_context.plugin_input::<T>(plugin_id)
967    }
968
969    pub async fn session_snapshot(&self) -> Result<SessionSnapshot, PluginError> {
970        self.sessions.snapshot_session(&self.session_id).await
971    }
972
973    pub async fn tool_catalog(&self) -> Result<Vec<serde_json::Value>, PluginError> {
974        self.sessions.tool_catalog(&self.session_id).await
975    }
976
977    pub async fn shared_tool_catalog(
978        &self,
979    ) -> Result<std::sync::Arc<Vec<serde_json::Value>>, PluginError> {
980        self.sessions.shared_tool_catalog(&self.session_id).await
981    }
982}
983
984/// Inputs handed to [`ToolProvider::prepare_tool_call`].
985pub struct ToolPrepareCall<'a> {
986    pub pending: crate::sansio::PendingToolCall,
987    pub context: &'a ToolPrepareContext,
988}
989
990/// Per-call inputs handed to [`ToolProvider::execute`].
991///
992/// Fields are `pub` because `ToolCall` is a transient borrow; consumers
993/// typically destructure (`let ToolCall { name, args, .. } = call`). The
994/// stable surface lives on [`ToolContext`] (sealed) and the runtime's
995/// dispatcher, which constructs `ToolCall` values.
996pub struct ToolCall<'a> {
997    pub name: &'a str,
998    pub args: &'a serde_json::Value,
999    pub context: &'a ToolContext<'a>,
1000    pub progress: Option<&'a ProgressSender>,
1001}
1002
1003/// Trait for providing tools to the sandbox. Implement this per-project.
1004///
1005/// Implementations supply cheap [`ToolManifest`]s, lazily resolved
1006/// [`ToolContract`]s, and a single
1007/// [`execute`](Self::execute) method that handles every call. Tools that
1008/// need session state read it from `call.context`; tools that stream
1009/// progress send through `call.progress`.
1010#[async_trait::async_trait]
1011pub trait ToolProvider: Send + Sync + 'static {
1012    fn tool_manifests(&self) -> Vec<ToolManifest>;
1013    fn resolve_manifest(&self, name: &str) -> Option<ToolManifest> {
1014        self.tool_manifests()
1015            .into_iter()
1016            .find(|manifest| manifest.name == name)
1017    }
1018    fn resolve_contract(&self, name: &str) -> Option<Arc<ToolContract>>;
1019    async fn prepare_tool_call(
1020        &self,
1021        call: ToolPrepareCall<'_>,
1022    ) -> Result<PreparedToolCall, ToolResult> {
1023        Ok(PreparedToolCall::identity(call.pending))
1024    }
1025    async fn execute(&self, call: ToolCall<'_>) -> ToolResult;
1026}
1027
1028#[cfg(test)]
1029mod tests {
1030    use super::*;
1031    use crate::ProcessRegistry;
1032    use crate::RuntimeEffectController;
1033    use std::sync::atomic::{AtomicU64, Ordering};
1034
1035    struct NoDurableEffectController;
1036
1037    #[async_trait::async_trait]
1038    impl crate::RuntimeEffectController for NoDurableEffectController {
1039        async fn execute_effect(
1040            &self,
1041            _envelope: crate::RuntimeEffectEnvelope,
1042            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
1043        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
1044            Err(crate::RuntimeEffectControllerError::new(
1045                "unexpected_effect",
1046                "test controller should not execute effects",
1047            ))
1048        }
1049    }
1050
1051    fn test_context_with_controller(
1052        tool_call_id: Option<String>,
1053        controller: Arc<dyn crate::RuntimeEffectController>,
1054    ) -> ToolContext<'static> {
1055        ToolContext::builder(
1056            "session-1".to_string(),
1057            Arc::new(crate::testing::MockSessionManager::default()),
1058            Arc::new(crate::testing::MockSessionManager::default()),
1059            Arc::new(crate::testing::MockSessionManager::default()),
1060            Arc::new(crate::UnavailableProcessService),
1061            Arc::new(crate::DefaultProcessCancelAbility),
1062            crate::runtime::RuntimeEffectControllerHandle::shared(controller),
1063            Arc::new(crate::InMemoryAttachmentStore::new()),
1064            crate::DirectCompletionClient::unavailable(
1065                "direct completions are unavailable in this test context",
1066            ),
1067        )
1068        .tool_call_id(tool_call_id)
1069        .build()
1070    }
1071
1072    #[test]
1073    fn tool_context_builder_carries_call_payload_and_cancellation_state() {
1074        let cancellation = tokio_util::sync::CancellationToken::new();
1075        let prepared = PreparedToolCall::from_parts(
1076            "call-1",
1077            "demo_tool",
1078            serde_json::json!({ "input": true }),
1079            None,
1080            serde_json::json!({ "prepared": true }),
1081        );
1082
1083        let context = ToolContext::builder(
1084            "session-1".to_string(),
1085            Arc::new(crate::testing::MockSessionManager::default()),
1086            Arc::new(crate::testing::MockSessionManager::default()),
1087            Arc::new(crate::testing::MockSessionManager::default()),
1088            Arc::new(crate::UnavailableProcessService),
1089            Arc::new(crate::DefaultProcessCancelAbility),
1090            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
1091                crate::InlineRuntimeEffectController,
1092            )),
1093            Arc::new(crate::InMemoryAttachmentStore::new()),
1094            crate::DirectCompletionClient::unavailable(
1095                "direct completions are unavailable in this test context",
1096            ),
1097        )
1098        .prepared_call(&prepared)
1099        .cancellation_token(Some(cancellation.clone()))
1100        .async_process("process-1", cancellation.clone())
1101        .build();
1102
1103        assert_eq!(context.session_id(), "session-1");
1104        assert_eq!(context.tool_call_id(), Some("call-1"));
1105        assert_eq!(
1106            context.prepared_payload(),
1107            &serde_json::json!({ "prepared": true })
1108        );
1109        assert_eq!(context.async_process_id(), Some("process-1"));
1110        assert!(context.cancellation_token().is_some());
1111    }
1112
1113    #[test]
1114    fn durable_effects_requires_prepared_call_id_and_supporting_controller() {
1115        let missing_call =
1116            test_context_with_controller(None, Arc::new(crate::InlineRuntimeEffectController));
1117        let err = match missing_call.durable_effects() {
1118            Ok(_) => panic!("missing prepared tool call id should fail"),
1119            Err(err) => err,
1120        };
1121        assert_eq!(err.code.as_str(), "durable_effects_missing_call_id");
1122
1123        let unsupported = test_context_with_controller(
1124            Some("call-1".to_string()),
1125            Arc::new(NoDurableEffectController),
1126        );
1127        let err = match unsupported.durable_effects() {
1128            Ok(_) => panic!("unsupported controller should fail"),
1129            Err(err) => err,
1130        };
1131        assert_eq!(err.code.as_str(), "durable_effects_unavailable");
1132    }
1133
1134    #[tokio::test]
1135    async fn durable_run_json_executes_and_maps_closure_errors() {
1136        let context = test_context_with_controller(
1137            Some("call-run-json".to_string()),
1138            Arc::new(crate::InlineRuntimeEffectController),
1139        );
1140        let durable = context.durable_effects().expect("durable effects");
1141        let value = durable
1142            .run_json(
1143                "create",
1144                serde_json::json!({ "x": 1 }),
1145                |input| async move { Ok(serde_json::json!({ "seen": input["x"] })) },
1146            )
1147            .await
1148            .expect("durable step");
1149        assert_eq!(value, serde_json::json!({ "seen": 1 }));
1150
1151        let err = durable
1152            .run_json("fail", serde_json::json!({}), |_| async {
1153                Err(crate::RuntimeError::new(
1154                    "durable_step_failed",
1155                    "step failed",
1156                ))
1157            })
1158            .await
1159            .expect_err("closure error");
1160        assert_eq!(err.code.as_str(), "durable_step_failed");
1161        assert_eq!(err.message, "step failed");
1162    }
1163
1164    #[tokio::test]
1165    async fn durable_run_json_rejects_empty_or_duplicate_step_ids_before_running() {
1166        let context = test_context_with_controller(
1167            Some("call-step-ids".to_string()),
1168            Arc::new(crate::InlineRuntimeEffectController),
1169        );
1170        let durable = context.durable_effects().expect("durable effects");
1171        let runs = Arc::new(AtomicU64::new(0));
1172
1173        let err = durable
1174            .run_json("", serde_json::Value::Null, {
1175                let runs = Arc::clone(&runs);
1176                move |_| async move {
1177                    runs.fetch_add(1, Ordering::Relaxed);
1178                    Ok(serde_json::Value::Null)
1179                }
1180            })
1181            .await
1182            .expect_err("empty step id");
1183        assert_eq!(err.code.as_str(), "durable_effect_empty_step_id");
1184        assert_eq!(runs.load(Ordering::Relaxed), 0);
1185
1186        durable
1187            .run_json("same", serde_json::Value::Null, {
1188                let runs = Arc::clone(&runs);
1189                move |_| async move {
1190                    runs.fetch_add(1, Ordering::Relaxed);
1191                    Ok(serde_json::Value::Null)
1192                }
1193            })
1194            .await
1195            .expect("first step");
1196        let err = durable
1197            .run_json("same", serde_json::Value::Null, {
1198                let runs = Arc::clone(&runs);
1199                move |_| async move {
1200                    runs.fetch_add(1, Ordering::Relaxed);
1201                    Ok(serde_json::Value::Null)
1202                }
1203            })
1204            .await
1205            .expect_err("duplicate step id");
1206        assert_eq!(err.code.as_str(), "durable_effect_duplicate_step_id");
1207        assert_eq!(runs.load(Ordering::Relaxed), 1);
1208    }
1209
1210    #[tokio::test]
1211    async fn durable_external_event_key_is_custom_and_stable() {
1212        let context = test_context_with_controller(
1213            Some("call-event-key".to_string()),
1214            Arc::new(crate::InlineRuntimeEffectController),
1215        );
1216        let durable = context.durable_effects().expect("durable effects");
1217        let first = durable
1218            .external_event_key("tool-event-stable")
1219            .await
1220            .expect("first key");
1221        let second = durable
1222            .external_event_key("tool-event-stable")
1223            .await
1224            .expect("second key");
1225
1226        assert_eq!(first, second);
1227        assert_eq!(
1228            first.wait,
1229            crate::AwaitEventWaitIdentity::Custom {
1230                key: "tool-event-stable".to_string()
1231            }
1232        );
1233    }
1234
1235    #[tokio::test]
1236    async fn durable_await_event_json_maps_terminal_resolutions() {
1237        let controller = Arc::new(crate::InlineRuntimeEffectController);
1238        let context =
1239            test_context_with_controller(Some("call-await-event".to_string()), controller.clone());
1240        let durable = context.durable_effects().expect("durable effects");
1241
1242        let ok_key = durable
1243            .external_event_key("tool-event-ok")
1244            .await
1245            .expect("ok key");
1246        controller
1247            .resolve_await_event(
1248                &ok_key,
1249                crate::Resolution::Ok(serde_json::json!({ "answer": 42 })),
1250            )
1251            .await
1252            .expect("resolve ok");
1253        let value = durable
1254            .await_event_json(ok_key)
1255            .await
1256            .expect("await ok value");
1257        assert_eq!(value, serde_json::json!({ "answer": 42 }));
1258
1259        let err_key = durable
1260            .external_event_key("tool-event-err")
1261            .await
1262            .expect("err key");
1263        controller
1264            .resolve_await_event(
1265                &err_key,
1266                crate::Resolution::Err(crate::ExternalCompletionError::new(
1267                    "external_bad",
1268                    "external failed",
1269                )),
1270            )
1271            .await
1272            .expect("resolve err");
1273        let err = durable
1274            .await_event_json(err_key)
1275            .await
1276            .expect_err("await err value");
1277        assert_eq!(err.code.as_str(), "external_bad");
1278
1279        let cancelled_key = durable
1280            .external_event_key("tool-event-cancelled")
1281            .await
1282            .expect("cancelled key");
1283        controller
1284            .resolve_await_event(&cancelled_key, crate::Resolution::Cancelled)
1285            .await
1286            .expect("resolve cancelled");
1287        let err = durable
1288            .await_event_json(cancelled_key)
1289            .await
1290            .expect_err("await cancelled value");
1291        assert_eq!(err.code.as_str(), "durable_effect_event_cancelled");
1292
1293        let timeout_key = durable
1294            .external_event_key("tool-event-timeout")
1295            .await
1296            .expect("timeout key");
1297        controller
1298            .resolve_await_event(&timeout_key, crate::Resolution::Timeout)
1299            .await
1300            .expect("resolve timeout");
1301        let err = durable
1302            .await_event_json(timeout_key)
1303            .await
1304            .expect_err("await timeout value");
1305        assert_eq!(err.code.as_str(), "durable_effect_event_timeout");
1306    }
1307
1308    #[tokio::test]
1309    async fn durable_emit_process_event_requires_process_and_appends_inside_process() {
1310        let context = test_context_with_controller(
1311            Some("call-no-process".to_string()),
1312            Arc::new(crate::InlineRuntimeEffectController),
1313        );
1314        let err = context
1315            .durable_effects()
1316            .expect("durable effects")
1317            .emit_process_event("tool.event", serde_json::json!({}))
1318            .await
1319            .expect_err("outside process");
1320        assert_eq!(
1321            err.code.as_str(),
1322            "durable_effect_process_event_unavailable"
1323        );
1324
1325        let registry = Arc::new(crate::TestLocalProcessRegistry::default());
1326        let process_id = "process:durable-tool-event";
1327        registry
1328            .register_process(
1329                crate::ProcessRegistration::new(
1330                    process_id,
1331                    crate::ProcessInput::External {
1332                        metadata: serde_json::json!({}),
1333                    },
1334                    crate::ProcessProvenance::host(),
1335                )
1336                .with_extra_event_types([crate::ProcessEventType {
1337                    name: "tool.event".to_string(),
1338                    payload_schema: crate::LashSchema::any(),
1339                    semantics: crate::ProcessEventSemanticsSpec::default(),
1340                }]),
1341            )
1342            .await
1343            .expect("register process");
1344        let registry_dyn: Arc<dyn crate::ProcessRegistry> = registry;
1345        let context = test_context_with_controller(
1346            Some("call-process-event".to_string()),
1347            Arc::new(crate::InlineRuntimeEffectController),
1348        )
1349        .with_process_events_for_testing(process_id, registry_dyn);
1350
1351        let event = context
1352            .durable_effects()
1353            .expect("durable effects")
1354            .emit_process_event("tool.event", serde_json::json!({ "ok": true }))
1355            .await
1356            .expect("process event");
1357        assert_eq!(event.process_id, process_id);
1358        assert_eq!(event.event_type, "tool.event");
1359        assert_eq!(event.payload, serde_json::json!({ "ok": true }));
1360        assert_eq!(
1361            event.invocation.replay_key(),
1362            Some("tool:call-process-event:durable-process-event:0")
1363        );
1364
1365        let append_err = context
1366            .durable_effects()
1367            .expect("durable effects")
1368            .emit_process_event("undeclared.event", serde_json::json!({}))
1369            .await
1370            .expect_err("undeclared event type must fail the append");
1371        assert_eq!(
1372            append_err.code.as_str(),
1373            "durable_effect_process_event_append_failed"
1374        );
1375    }
1376}