Skip to main content

lash_core/
tool_provider.rs

1use std::collections::HashSet;
2use std::future::Future;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5
6use lash_sansio::llm::types::ProviderReplayMeta;
7use serde::{Deserialize, Serialize};
8
9use crate::plugin::{
10    PluginError, SessionGraphService, SessionLifecycleService, SessionSnapshot, SessionStateService,
11};
12use crate::{AttachmentStore, ToolContract, ToolManifest, ToolResult};
13
14mod attachments;
15mod direct_completion;
16mod dispatch;
17mod process;
18pub(crate) mod process_events;
19mod session;
20mod triggers;
21
22pub use attachments::ToolAttachmentClient;
23pub use direct_completion::ToolDirectCompletionClient;
24pub use dispatch::ToolDispatchClient;
25pub use process::ToolSessionProcessAdmin;
26pub use process_events::ToolProcessEventClient;
27pub use session::{ToolSessionAdmin, ToolSessionModel};
28pub use triggers::ToolTriggerClient;
29
30/// A message sent from the sandbox to the host during execution.
31#[derive(Clone, Debug)]
32pub struct SandboxMessage {
33    pub text: String,
34    /// "tool_output" or another host-rendered progress event kind.
35    pub kind: String,
36}
37
38/// Sender for streaming progress messages from tools (e.g. live bash output).
39pub type ProgressSender = tokio::sync::mpsc::UnboundedSender<SandboxMessage>;
40
41#[derive(Clone, Default)]
42pub(crate) struct ToolCompletionState {
43    key: Arc<Mutex<Option<crate::AwaitEventKey>>>,
44}
45
46impl ToolCompletionState {
47    fn store(
48        &self,
49        key: crate::AwaitEventKey,
50    ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
51        let mut guard = self.key.lock().map_err(|_| {
52            crate::RuntimeError::new(
53                "tool_completion_state_poisoned",
54                "tool completion key state lock poisoned",
55            )
56        })?;
57        if let Some(existing) = guard.as_ref() {
58            return Ok(existing.clone());
59        }
60        *guard = Some(key.clone());
61        Ok(key)
62    }
63
64    pub(crate) fn take(&self) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
65        self.key.lock().map(|mut guard| guard.take()).map_err(|_| {
66            crate::RuntimeError::new(
67                "tool_completion_state_poisoned",
68                "tool completion key state lock poisoned",
69            )
70        })
71    }
72}
73
74#[derive(Clone, Default)]
75pub(crate) struct ToolDurableEffectState {
76    step_ids: Arc<Mutex<HashSet<String>>>,
77    process_event_sequence: Arc<AtomicU64>,
78}
79
80impl ToolDurableEffectState {
81    fn reserve_step(&self, step_id: &str) -> Result<(), crate::RuntimeError> {
82        let mut guard = self.step_ids.lock().map_err(|_| {
83            crate::RuntimeError::new(
84                "durable_effect_state_poisoned",
85                "durable effect step state lock poisoned",
86            )
87        })?;
88        if !guard.insert(step_id.to_string()) {
89            return Err(crate::RuntimeError::new(
90                "durable_effect_duplicate_step_id",
91                format!("durable effect step id `{step_id}` was already used by this tool call"),
92            ));
93        }
94        Ok(())
95    }
96
97    fn next_process_event_sequence(&self) -> u64 {
98        self.process_event_sequence.fetch_add(1, Ordering::Relaxed)
99    }
100}
101
102/// Per-call environment for [`ToolProvider::execute`]. Fields are sealed so
103/// the runtime can add capabilities without breaking tool authors.
104#[derive(Clone)]
105pub struct ToolContext<'run> {
106    pub(crate) session_id: String,
107    pub(crate) agent_frame_id: crate::AgentFrameId,
108    pub(crate) sessions: Arc<dyn SessionStateService>,
109    pub(crate) session_lifecycle: Arc<dyn SessionLifecycleService>,
110    pub(crate) processes: Arc<dyn crate::ProcessService>,
111    pub(crate) process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
112    pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
113    pub(crate) runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
114    pub(crate) cancellation_token: Option<tokio_util::sync::CancellationToken>,
115    pub(crate) async_process_id: Option<String>,
116    pub(crate) runtime_process_id: Option<String>,
117    pub(crate) process_events: Option<ToolProcessEventContext>,
118    pub(crate) attachment_store: Arc<dyn AttachmentStore>,
119    pub(crate) direct_completions: crate::DirectCompletionClient<'run>,
120    pub(crate) prepared_payload: serde_json::Value,
121    /// The id of the in-flight tool call that is invoking this tool.
122    pub(crate) tool_call_id: Option<String>,
123    pub(crate) attempt_number: u32,
124    pub(crate) max_attempts: u32,
125    pub(crate) replay_key: Option<String>,
126    pub(crate) completion: ToolCompletionState,
127    pub(crate) durable_effects: ToolDurableEffectState,
128    pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
129    pub(crate) execution_env_spec: crate::ProcessExecutionEnvSpec,
130    pub(crate) lashlang_execution_call_site: Option<ToolLashlangExecutionCallSite>,
131}
132
133#[derive(Clone)]
134pub struct ToolLashlangExecutionCallSite {
135    sink: Arc<dyn lash_trace::TraceSink>,
136    base_context: lash_trace::TraceContext,
137    identity: lash_trace::TraceLashlangExecutionIdentity,
138    parent_node_id: String,
139    occurrence: u64,
140}
141
142impl ToolLashlangExecutionCallSite {
143    pub fn new(
144        sink: Arc<dyn lash_trace::TraceSink>,
145        base_context: lash_trace::TraceContext,
146        identity: lash_trace::TraceLashlangExecutionIdentity,
147        parent_node_id: impl Into<String>,
148        occurrence: u64,
149    ) -> Self {
150        Self {
151            sink,
152            base_context,
153            identity,
154            parent_node_id: parent_node_id.into(),
155            occurrence,
156        }
157    }
158}
159
160#[derive(Clone)]
161pub(crate) struct ToolProcessEventContext {
162    process_id: String,
163    registry: Arc<dyn crate::ProcessRegistry>,
164    store: Option<Arc<dyn crate::RuntimePersistence>>,
165    session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
166    session_graph: Arc<dyn SessionGraphService>,
167    queued_work_poke: Option<crate::QueuedWorkPoke>,
168}
169
170pub(crate) struct ToolContextBuilder<'run> {
171    session_id: String,
172    agent_frame_id: crate::AgentFrameId,
173    sessions: Arc<dyn SessionStateService>,
174    session_lifecycle: Arc<dyn SessionLifecycleService>,
175    session_graph: Arc<dyn SessionGraphService>,
176    processes: Arc<dyn crate::ProcessService>,
177    process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
178    effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
179    runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
180    cancellation_token: Option<tokio_util::sync::CancellationToken>,
181    async_process_id: Option<String>,
182    runtime_process_id: Option<String>,
183    process_events: Option<ToolProcessEventContext>,
184    attachment_store: Arc<dyn AttachmentStore>,
185    direct_completions: crate::DirectCompletionClient<'run>,
186    prepared_payload: serde_json::Value,
187    tool_call_id: Option<String>,
188    completion: ToolCompletionState,
189    durable_effects: ToolDurableEffectState,
190    parent_invocation: Option<crate::RuntimeInvocation>,
191    execution_env_spec: crate::ProcessExecutionEnvSpec,
192    lashlang_execution_call_site: Option<ToolLashlangExecutionCallSite>,
193}
194
195impl<'run> ToolContextBuilder<'run> {
196    pub(crate) fn from_dispatch(
197        dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
198    ) -> Self {
199        Self {
200            session_id: dispatch.session_id.clone(),
201            agent_frame_id: dispatch.agent_frame_id.clone(),
202            sessions: Arc::clone(&dispatch.sessions),
203            session_lifecycle: Arc::clone(&dispatch.session_lifecycle),
204            session_graph: Arc::clone(&dispatch.session_graph),
205            processes: Arc::clone(&dispatch.processes),
206            process_cancel_ability: Arc::clone(&dispatch.process_cancel_ability),
207            effect_controller: dispatch.effect_controller.clone(),
208            runtime_dispatch: Some(Arc::clone(&dispatch)),
209            cancellation_token: None,
210            async_process_id: None,
211            runtime_process_id: None,
212            process_events: None,
213            attachment_store: Arc::clone(&dispatch.attachment_store),
214            direct_completions: dispatch.direct_completions.clone(),
215            prepared_payload: serde_json::Value::Null,
216            tool_call_id: None,
217            completion: ToolCompletionState::default(),
218            durable_effects: ToolDurableEffectState::default(),
219            parent_invocation: dispatch.parent_invocation.clone(),
220            execution_env_spec: dispatch.execution_env_spec.clone(),
221            lashlang_execution_call_site: None,
222        }
223    }
224
225    #[cfg(any(test, feature = "testing"))]
226    pub(crate) fn tool_call_id(mut self, tool_call_id: impl Into<Option<String>>) -> Self {
227        self.tool_call_id = tool_call_id.into();
228        self
229    }
230
231    pub(crate) fn prepared_call(mut self, call: &PreparedToolCall) -> Self {
232        self.tool_call_id = Some(call.call_id.clone());
233        self.prepared_payload = call.prepared_payload.clone();
234        self
235    }
236
237    pub(crate) fn cancellation_token(
238        mut self,
239        cancellation_token: Option<tokio_util::sync::CancellationToken>,
240    ) -> Self {
241        self.cancellation_token = cancellation_token;
242        self
243    }
244
245    pub(crate) fn runtime_process_id(mut self, process_id: Option<String>) -> Self {
246        self.runtime_process_id = process_id;
247        self
248    }
249
250    pub(crate) fn async_process(
251        mut self,
252        process_id: impl Into<String>,
253        cancellation_token: tokio_util::sync::CancellationToken,
254    ) -> Self {
255        self.async_process_id = Some(process_id.into());
256        self.cancellation_token = Some(cancellation_token);
257        self
258    }
259
260    pub(crate) fn process_events(
261        mut self,
262        process_id: impl Into<String>,
263        registry: Arc<dyn crate::ProcessRegistry>,
264        store: Option<Arc<dyn crate::RuntimePersistence>>,
265        session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
266        queued_work_poke: Option<crate::QueuedWorkPoke>,
267    ) -> Self {
268        self.process_events = Some(ToolProcessEventContext {
269            process_id: process_id.into(),
270            registry,
271            store,
272            session_store_factory,
273            session_graph: Arc::clone(&self.session_graph),
274            queued_work_poke,
275        });
276        self
277    }
278
279    pub(crate) fn parent_invocation(mut self, metadata: Option<crate::RuntimeInvocation>) -> Self {
280        self.parent_invocation = metadata;
281        self
282    }
283
284    pub(crate) fn lashlang_execution_call_site(
285        mut self,
286        call_site: Option<ToolLashlangExecutionCallSite>,
287    ) -> Self {
288        self.lashlang_execution_call_site = call_site;
289        self
290    }
291
292    pub(crate) fn build(self) -> ToolContext<'run> {
293        ToolContext {
294            session_id: self.session_id,
295            agent_frame_id: self.agent_frame_id,
296            sessions: self.sessions,
297            session_lifecycle: self.session_lifecycle,
298            processes: self.processes,
299            process_cancel_ability: self.process_cancel_ability,
300            effect_controller: self.effect_controller,
301            runtime_dispatch: self.runtime_dispatch,
302            cancellation_token: self.cancellation_token,
303            async_process_id: self.async_process_id,
304            runtime_process_id: self.runtime_process_id,
305            process_events: self.process_events,
306            attachment_store: self.attachment_store,
307            direct_completions: self.direct_completions,
308            prepared_payload: self.prepared_payload,
309            tool_call_id: self.tool_call_id,
310            attempt_number: 1,
311            max_attempts: 1,
312            replay_key: None,
313            completion: self.completion,
314            durable_effects: self.durable_effects,
315            parent_invocation: self.parent_invocation,
316            execution_env_spec: self.execution_env_spec,
317            lashlang_execution_call_site: self.lashlang_execution_call_site,
318        }
319    }
320}
321
322impl<'run> ToolContext<'run> {
323    #[cfg(any(test, feature = "testing"))]
324    #[expect(
325        clippy::too_many_arguments,
326        reason = "testing constructor mirrors the sealed runtime tool context dependencies"
327    )]
328    pub(crate) fn builder(
329        session_id: String,
330        sessions: Arc<dyn SessionStateService>,
331        session_lifecycle: Arc<dyn SessionLifecycleService>,
332        session_graph: Arc<dyn SessionGraphService>,
333        processes: Arc<dyn crate::ProcessService>,
334        process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
335        effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
336        attachment_store: Arc<dyn AttachmentStore>,
337        direct_completions: crate::DirectCompletionClient<'run>,
338    ) -> ToolContextBuilder<'run> {
339        ToolContextBuilder {
340            session_id,
341            agent_frame_id: String::new(),
342            sessions,
343            session_lifecycle,
344            session_graph,
345            processes,
346            process_cancel_ability,
347            effect_controller,
348            runtime_dispatch: None,
349            cancellation_token: None,
350            async_process_id: None,
351            runtime_process_id: None,
352            process_events: None,
353            attachment_store,
354            direct_completions,
355            prepared_payload: serde_json::Value::Null,
356            tool_call_id: None,
357            completion: ToolCompletionState::default(),
358            durable_effects: ToolDurableEffectState::default(),
359            parent_invocation: None,
360            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
361                crate::PluginOptions::default(),
362                crate::SessionPolicy::default(),
363            ),
364            lashlang_execution_call_site: None,
365        }
366    }
367
368    pub(crate) fn from_dispatch(
369        dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
370    ) -> ToolContextBuilder<'run> {
371        ToolContextBuilder::from_dispatch(dispatch)
372    }
373
374    pub fn session_id(&self) -> &str {
375        &self.session_id
376    }
377
378    pub fn agent_frame_id(&self) -> &str {
379        &self.agent_frame_id
380    }
381
382    pub fn sessions(&self) -> ToolSessionAdmin<'run> {
383        ToolSessionAdmin {
384            session_id: self.session_id.clone(),
385            sessions: Arc::clone(&self.sessions),
386            session_lifecycle: Arc::clone(&self.session_lifecycle),
387            effect_controller: self.effect_controller.clone(),
388        }
389    }
390
391    pub fn dispatch(&self) -> ToolDispatchClient<'run> {
392        ToolDispatchClient {
393            context: self.clone(),
394        }
395    }
396
397    pub fn triggers(&self) -> ToolTriggerClient<'run> {
398        ToolTriggerClient {
399            context: self.clone(),
400        }
401    }
402
403    pub fn processes(&self) -> ToolSessionProcessAdmin<'run> {
404        ToolSessionProcessAdmin {
405            session_id: self.session_id.clone(),
406            agent_frame_id: self.agent_frame_id.clone(),
407            processes: Arc::clone(&self.processes),
408            process_cancel_ability: Arc::clone(&self.process_cancel_ability),
409            effect_controller: self.effect_controller.clone(),
410            parent_invocation: self.parent_invocation.clone(),
411            tool_call_id: self.tool_call_id.clone(),
412            execution_env_spec: self.execution_env_spec.clone(),
413        }
414    }
415
416    pub fn emit_lashlang_child_process_started(
417        &self,
418        process_id: impl Into<String>,
419        child_entry_name: Option<String>,
420    ) {
421        let Some(call_site) = &self.lashlang_execution_call_site else {
422            return;
423        };
424        let child = lash_trace::TraceLashlangChildExecution {
425            scope: call_site.identity.scope.clone(),
426            subject: lash_trace::TraceRuntimeSubject::Process {
427                process_id: process_id.into(),
428            },
429            module_ref: None,
430            entry_ref: None,
431            entry_name: child_entry_name,
432        };
433        let child_graph_key = child.graph_key();
434        let event = lash_trace::TraceLashlangExecutionEvent::ChildStarted {
435            event_key: format!(
436                "lashlang_execution:{}:child:{}:{}:{}",
437                call_site.identity.graph_key(),
438                call_site.parent_node_id,
439                call_site.occurrence,
440                child_graph_key
441            ),
442            identity: call_site.identity.clone(),
443            parent_node_id: call_site.parent_node_id.clone(),
444            occurrence: call_site.occurrence,
445            child,
446        };
447        let mut context = lash_trace::TraceContext::default()
448            .for_session(call_site.identity.scope.session_id.clone());
449        if let Some(turn_id) = &call_site.identity.scope.turn_id {
450            context = context.for_turn(turn_id.clone());
451        }
452        if let Some(turn_index) = call_site.identity.scope.turn_index {
453            context = context.for_turn_index(turn_index);
454        }
455        if let Some(protocol_iteration) = call_site.identity.scope.protocol_iteration {
456            context = context.for_protocol_iteration(protocol_iteration);
457        }
458        if let lash_trace::TraceRuntimeSubject::Effect { effect_id, .. } =
459            &call_site.identity.subject
460        {
461            context.effect_id = Some(effect_id.clone());
462        }
463        crate::trace::emit_trace(
464            &Some(Arc::clone(&call_site.sink)),
465            &call_site.base_context,
466            context,
467            lash_trace::TraceEvent::LashlangExecution { event },
468        );
469    }
470
471    pub fn direct_completions(&self) -> ToolDirectCompletionClient<'run> {
472        ToolDirectCompletionClient {
473            session_id: self.session_id.clone(),
474            tool_call_id: self.tool_call_id.clone(),
475            direct_completions: self.direct_completions.clone(),
476        }
477    }
478
479    pub fn attachments(&self) -> ToolAttachmentClient {
480        ToolAttachmentClient {
481            store: Arc::clone(&self.attachment_store),
482        }
483    }
484
485    pub fn process_events(&self) -> ToolProcessEventClient {
486        ToolProcessEventClient {
487            context: self.process_events.clone(),
488        }
489    }
490
491    /// Borrow this tool call's durable effect boundary.
492    ///
493    /// This is available only while executing a prepared tool call under a
494    /// controller that explicitly supports durable tool effects. The returned
495    /// facade records JSON steps and await-event waits in the caller's existing
496    /// effect log; it does not expose the underlying workflow engine context.
497    pub fn durable_effects(&self) -> Result<ToolDurableEffects<'_, 'run>, crate::RuntimeError> {
498        let Some(tool_call_id) = self.tool_call_id.as_deref() else {
499            return Err(crate::RuntimeError::new(
500                "durable_effects_missing_call_id",
501                "durable effects require a prepared tool call id",
502            ));
503        };
504        if tool_call_id.trim().is_empty() {
505            return Err(crate::RuntimeError::new(
506                "durable_effects_missing_call_id",
507                "durable effects require a non-empty prepared tool call id",
508            ));
509        }
510        let scoped = self.effect_controller.scoped();
511        if !scoped.controller().supports_durable_effects() {
512            return Err(crate::RuntimeError::new(
513                "durable_effects_unavailable",
514                "this effect controller does not support durable tool effects",
515            ));
516        }
517        Ok(ToolDurableEffects { context: self })
518    }
519
520    pub fn cancellation_token(&self) -> Option<&tokio_util::sync::CancellationToken> {
521        self.cancellation_token.as_ref()
522    }
523
524    pub fn async_process_id(&self) -> Option<&str> {
525        self.async_process_id.as_deref()
526    }
527
528    pub fn runtime_process_id(&self) -> Option<&str> {
529        self.async_process_id
530            .as_deref()
531            .or(self.runtime_process_id.as_deref())
532            .or_else(|| {
533                self.process_events
534                    .as_ref()
535                    .map(|context| context.process_id.as_str())
536            })
537    }
538
539    pub fn tool_call_id(&self) -> Option<&str> {
540        self.tool_call_id.as_deref()
541    }
542
543    pub fn prepared_payload(&self) -> &serde_json::Value {
544        &self.prepared_payload
545    }
546
547    pub fn decode_prepared_payload<T>(&self) -> Result<T, serde_json::Error>
548    where
549        T: serde::de::DeserializeOwned,
550    {
551        serde_json::from_value(self.prepared_payload.clone())
552    }
553
554    pub fn attempt_number(&self) -> u32 {
555        self.attempt_number
556    }
557
558    pub fn max_attempts(&self) -> u32 {
559        self.max_attempts
560    }
561
562    pub fn replay_key(&self) -> Option<&str> {
563        self.replay_key.as_deref()
564    }
565
566    /// Obtain the durable completion key for this call, required before returning
567    /// [`ToolResult::Pending`](crate::ToolResult::Pending).
568    ///
569    /// A tool that defers its outcome (waiting on a webhook, human approval, or another
570    /// service) calls this, hands the returned [`AwaitEventKey`](crate::AwaitEventKey)
571    /// to whatever will complete the work out-of-band, and then returns
572    /// `ToolResult::Pending(..)`. The key names the durable wait the runtime parks the
573    /// call on; the external resolver delivers the result against it later.
574    ///
575    /// The key is stored on the context and consumed by the dispatcher when the tool
576    /// returns `Pending`. Returning `Pending` without first calling this fails the call
577    /// with `pending_tool_missing_completion_key`. Calls made outside a prepared tool
578    /// invocation (no tool call id) fail with `tool_completion_key_missing_call_id`.
579    pub async fn completion_key(&self) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
580        let tool_call_id = self.tool_call_id.clone().ok_or_else(|| {
581            crate::RuntimeError::new(
582                "tool_completion_key_missing_call_id",
583                "completion keys require a prepared tool call id",
584            )
585        })?;
586        let scoped = self.effect_controller.scoped();
587        let key = scoped
588            .controller()
589            .await_event_key(
590                scoped.execution_scope(),
591                crate::AwaitEventWaitIdentity::tool_completion(tool_call_id),
592            )
593            .await?;
594        self.completion.store(key)
595    }
596
597    pub(crate) fn take_completion_key(
598        &self,
599    ) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
600        self.completion.take()
601    }
602
603    pub fn with_async_process(
604        mut self,
605        process_id: impl Into<String>,
606        cancellation_token: tokio_util::sync::CancellationToken,
607    ) -> Self {
608        self.async_process_id = Some(process_id.into());
609        self.runtime_process_id = self.async_process_id.clone();
610        self.cancellation_token = Some(cancellation_token);
611        self
612    }
613
614    #[cfg(any(test, feature = "testing"))]
615    #[doc(hidden)]
616    pub fn with_process_events_for_testing(
617        mut self,
618        process_id: impl Into<String>,
619        registry: Arc<dyn crate::ProcessRegistry>,
620    ) -> Self {
621        self.process_events = Some(ToolProcessEventContext {
622            process_id: process_id.into(),
623            registry,
624            store: None,
625            session_store_factory: None,
626            session_graph: Arc::new(crate::plugin::NoopSessionManager),
627            queued_work_poke: None,
628        });
629        self
630    }
631
632    pub(crate) fn with_retry_context(
633        mut self,
634        tool_name: &str,
635        attempt_number: u32,
636        max_attempts: u32,
637    ) -> Self {
638        self.attempt_number = attempt_number.max(1);
639        self.max_attempts = max_attempts.max(1);
640        self.replay_key = self
641            .tool_call_id
642            .as_ref()
643            .map(|call_id| format!("lash-tool:{}:{call_id}:{tool_name}", self.session_id));
644        self
645    }
646
647    pub(crate) fn with_prepared_payload(mut self, payload: serde_json::Value) -> Self {
648        self.prepared_payload = payload;
649        self
650    }
651
652    /// Constructor reserved for `lash_core::testing` helpers. Do not call directly;
653    /// use [`lash_core::testing::mock_tool_context`] instead.
654    #[cfg(any(test, feature = "testing"))]
655    #[doc(hidden)]
656    #[expect(
657        clippy::too_many_arguments,
658        reason = "test-only constructor mirrors the sealed runtime tool context"
659    )]
660    pub fn __for_testing(
661        session_id: String,
662        sessions: Arc<dyn SessionStateService>,
663        session_lifecycle: Arc<dyn SessionLifecycleService>,
664        session_graph: Arc<dyn SessionGraphService>,
665        processes: Arc<dyn crate::ProcessService>,
666        attachment_store: Arc<dyn AttachmentStore>,
667        direct_completions: crate::DirectCompletionClient<'static>,
668        tool_call_id: Option<String>,
669    ) -> ToolContext<'static> {
670        ToolContext::builder(
671            session_id,
672            sessions,
673            session_lifecycle,
674            session_graph,
675            processes,
676            Arc::new(crate::DefaultProcessCancelAbility),
677            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
678                crate::InlineRuntimeEffectController,
679            )),
680            attachment_store,
681            direct_completions,
682        )
683        .tool_call_id(tool_call_id)
684        .build()
685    }
686
687    /// Constructor reserved for tests that need a custom process-cancel host
688    /// ability. Do not call directly; prefer public testing helpers when they
689    /// cover the case.
690    #[cfg(any(test, feature = "testing"))]
691    #[doc(hidden)]
692    #[expect(
693        clippy::too_many_arguments,
694        reason = "test-only constructor mirrors the sealed runtime context"
695    )]
696    pub fn __for_testing_with_process_cancel_ability(
697        session_id: String,
698        sessions: Arc<dyn SessionStateService>,
699        session_lifecycle: Arc<dyn SessionLifecycleService>,
700        session_graph: Arc<dyn SessionGraphService>,
701        processes: Arc<dyn crate::ProcessService>,
702        process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
703        attachment_store: Arc<dyn AttachmentStore>,
704        direct_completions: crate::DirectCompletionClient<'static>,
705        tool_call_id: Option<String>,
706    ) -> ToolContext<'static> {
707        ToolContext::builder(
708            session_id,
709            sessions,
710            session_lifecycle,
711            session_graph,
712            processes,
713            process_cancel_ability,
714            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
715                crate::InlineRuntimeEffectController,
716            )),
717            attachment_store,
718            direct_completions,
719        )
720        .tool_call_id(tool_call_id)
721        .build()
722    }
723}
724
725/// Durable effect operations available to advanced in-process tools.
726///
727/// The facade borrows the caller's existing runtime effect boundary. It records
728/// JSON-only local steps and awaits host-signed event keys without exposing
729/// Restate, Temporal, or any other workflow-native context.
730pub struct ToolDurableEffects<'ctx, 'run> {
731    context: &'ctx ToolContext<'run>,
732}
733
734impl<'ctx, 'run> ToolDurableEffects<'ctx, 'run> {
735    pub async fn run_json<F, Fut>(
736        &self,
737        step_id: impl Into<String>,
738        input: serde_json::Value,
739        run: F,
740    ) -> Result<serde_json::Value, crate::RuntimeError>
741    where
742        F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
743        Fut: Future<Output = Result<serde_json::Value, crate::RuntimeError>> + Send + 'run,
744    {
745        let step_id = step_id.into();
746        if step_id.trim().is_empty() {
747            return Err(crate::RuntimeError::new(
748                "durable_effect_empty_step_id",
749                "durable effect step id must be non-empty",
750            ));
751        }
752        self.context.durable_effects.reserve_step(&step_id)?;
753        let invocation = self.step_invocation(
754            format!("durable-step:{step_id}"),
755            crate::RuntimeEffectKind::DurableStep,
756            format!("durable-step:{step_id}"),
757        )?;
758        let outcome = self
759            .context
760            .effect_controller
761            .controller()
762            .execute_effect(
763                crate::RuntimeEffectEnvelope::new(
764                    invocation,
765                    crate::RuntimeEffectCommand::DurableStep {
766                        step_id,
767                        input: input.clone(),
768                    },
769                ),
770                crate::RuntimeEffectLocalExecutor::durable_step(run),
771            )
772            .await
773            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
774        outcome
775            .into_durable_step()
776            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)
777    }
778
779    pub async fn external_event_key(
780        &self,
781        key: impl Into<String>,
782    ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
783        let key = key.into();
784        if key.trim().is_empty() {
785            return Err(crate::RuntimeError::new(
786                "durable_effect_empty_event_key",
787                "durable effect external event key must be non-empty",
788            ));
789        }
790        let scoped = self.context.effect_controller.scoped();
791        scoped
792            .controller()
793            .await_event_key(
794                scoped.execution_scope(),
795                crate::AwaitEventWaitIdentity::Custom { key },
796            )
797            .await
798    }
799
800    pub async fn await_event_json(
801        &self,
802        key: crate::AwaitEventKey,
803    ) -> Result<serde_json::Value, crate::RuntimeError> {
804        let invocation = self.step_invocation(
805            format!("await-event:{}", key.key_id),
806            crate::RuntimeEffectKind::AwaitEvent,
807            format!("await-event:{}", key.key_id),
808        )?;
809        let cancellation = self.context.cancellation_token.clone().unwrap_or_default();
810        let outcome = self
811            .context
812            .effect_controller
813            .controller()
814            .execute_effect(
815                crate::RuntimeEffectEnvelope::new(
816                    invocation,
817                    crate::RuntimeEffectCommand::AwaitEvent { key },
818                ),
819                crate::RuntimeEffectLocalExecutor::await_event(cancellation, None),
820            )
821            .await
822            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
823        match outcome
824            .into_await_event()
825            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?
826        {
827            crate::Resolution::Ok(value) => Ok(value),
828            crate::Resolution::Err(err) => Err(crate::RuntimeError::new(err.code, err.message)),
829            crate::Resolution::Timeout => Err(crate::RuntimeError::new(
830                "durable_effect_event_timeout",
831                "durable effect external event wait timed out",
832            )),
833            crate::Resolution::Cancelled => Err(crate::RuntimeError::new(
834                "durable_effect_event_cancelled",
835                "durable effect external event wait was cancelled",
836            )),
837        }
838    }
839
840    pub async fn emit_process_event(
841        &self,
842        event_type: impl Into<String>,
843        payload: serde_json::Value,
844    ) -> Result<crate::ProcessEvent, crate::RuntimeError> {
845        let Some(process) = self.context.process_events.as_ref() else {
846            return Err(crate::RuntimeError::new(
847                "durable_effect_process_event_unavailable",
848                "durable effect process events are unavailable outside a durable process",
849            ));
850        };
851        let event_type = event_type.into();
852        if event_type.trim().is_empty() {
853            return Err(crate::RuntimeError::new(
854                "durable_effect_empty_process_event_type",
855                "durable effect process event type must be non-empty",
856            ));
857        }
858        let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
859            crate::RuntimeError::new(
860                "durable_effects_missing_call_id",
861                "durable effects require a prepared tool call id",
862            )
863        })?;
864        let sequence = self.context.durable_effects.next_process_event_sequence();
865        let request = crate::ProcessEventAppendRequest::new(event_type, payload).with_replay_key(
866            format!("tool:{tool_call_id}:durable-process-event:{sequence}"),
867        );
868        self.context
869            .process_events()
870            .emit_request(request)
871            .await
872            .map_err(|err| {
873                crate::RuntimeError::new(
874                    "durable_effect_process_event_append_failed",
875                    err.to_string(),
876                )
877            })
878            .and_then(|event| {
879                if event.process_id == process.process_id {
880                    Ok(event)
881                } else {
882                    Err(crate::RuntimeError::new(
883                        "durable_effect_process_event_process_mismatch",
884                        "process event append returned an event for a different process",
885                    ))
886                }
887            })
888    }
889
890    fn step_invocation(
891        &self,
892        effect_id_suffix: impl Into<String>,
893        kind: crate::RuntimeEffectKind,
894        replay_suffix: impl AsRef<str>,
895    ) -> Result<crate::RuntimeInvocation, crate::RuntimeError> {
896        let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
897            crate::RuntimeError::new(
898                "durable_effects_missing_call_id",
899                "durable effects require a prepared tool call id",
900            )
901        })?;
902        let effect_id_suffix = effect_id_suffix.into();
903        if let Some(parent) = self.context.parent_invocation.as_ref() {
904            return Ok(crate::runtime::causal::child_effect_invocation(
905                parent,
906                format!("{tool_call_id}:{effect_id_suffix}"),
907                kind,
908                replay_suffix,
909            ));
910        }
911        let scoped = self.context.effect_controller.scoped();
912        let replay_key = format!(
913            "{}:tool:{tool_call_id}:{}",
914            scoped.scope_id(),
915            replay_suffix.as_ref()
916        );
917        Ok(crate::RuntimeInvocation::effect(
918            crate::RuntimeScope::new(self.context.session_id.clone()),
919            format!("{tool_call_id}:{effect_id_suffix}"),
920            kind,
921            replay_key,
922        ))
923    }
924}
925
926/// Runtime-prepared executable tool call.
927///
928/// The raw model/provider identity remains visible, but any argument rewrites
929/// and provider-owned context projections are frozen before the call crosses a
930/// runtime effect or process boundary.
931#[derive(Clone, Debug, Serialize, Deserialize)]
932pub struct PreparedToolCall {
933    pub call_id: String,
934    pub tool_name: String,
935    pub args: serde_json::Value,
936    #[serde(default, skip_serializing_if = "Option::is_none")]
937    pub replay: Option<ProviderReplayMeta>,
938    #[serde(default, skip_serializing_if = "serde_json::Value::is_null")]
939    pub prepared_payload: serde_json::Value,
940}
941
942impl PreparedToolCall {
943    pub fn identity(call: crate::sansio::PendingToolCall) -> Self {
944        Self {
945            call_id: call.call_id,
946            tool_name: call.tool_name,
947            args: call.args,
948            replay: call.replay,
949            prepared_payload: serde_json::Value::Null,
950        }
951    }
952
953    pub fn from_parts(
954        call_id: impl Into<String>,
955        tool_name: impl Into<String>,
956        args: serde_json::Value,
957        replay: Option<ProviderReplayMeta>,
958        prepared_payload: serde_json::Value,
959    ) -> Self {
960        Self {
961            call_id: call_id.into(),
962            tool_name: tool_name.into(),
963            args,
964            replay,
965            prepared_payload,
966        }
967    }
968}
969
970#[derive(Clone)]
971pub struct ToolPrepareContext {
972    session_id: String,
973    sessions: Arc<dyn SessionStateService>,
974    turn_context: crate::TurnContext,
975    tool_call_id: Option<String>,
976}
977
978impl ToolPrepareContext {
979    pub(crate) fn new(
980        session_id: String,
981        sessions: Arc<dyn SessionStateService>,
982        turn_context: crate::TurnContext,
983        tool_call_id: Option<String>,
984    ) -> Self {
985        Self {
986            session_id,
987            sessions,
988            turn_context,
989            tool_call_id,
990        }
991    }
992
993    pub fn session_id(&self) -> &str {
994        &self.session_id
995    }
996
997    pub fn tool_call_id(&self) -> Option<&str> {
998        self.tool_call_id.as_deref()
999    }
1000
1001    pub fn turn_context(&self) -> &crate::TurnContext {
1002        &self.turn_context
1003    }
1004
1005    pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
1006    where
1007        T: 'static,
1008    {
1009        self.turn_context.plugin_input::<T>(plugin_id)
1010    }
1011
1012    pub async fn session_snapshot(&self) -> Result<SessionSnapshot, PluginError> {
1013        self.sessions.snapshot_session(&self.session_id).await
1014    }
1015
1016    pub async fn tool_catalog(&self) -> Result<Vec<serde_json::Value>, PluginError> {
1017        self.sessions.tool_catalog(&self.session_id).await
1018    }
1019
1020    pub async fn shared_tool_catalog(
1021        &self,
1022    ) -> Result<std::sync::Arc<Vec<serde_json::Value>>, PluginError> {
1023        self.sessions.shared_tool_catalog(&self.session_id).await
1024    }
1025}
1026
1027/// Inputs handed to [`ToolProvider::prepare_tool_call`].
1028pub struct ToolPrepareCall<'a> {
1029    pub pending: crate::sansio::PendingToolCall,
1030    pub context: &'a ToolPrepareContext,
1031}
1032
1033/// Per-call inputs handed to [`ToolProvider::execute`].
1034///
1035/// Fields are `pub` because `ToolCall` is a transient borrow; consumers
1036/// typically destructure (`let ToolCall { name, args, .. } = call`). The
1037/// stable surface lives on [`ToolContext`] (sealed) and the runtime's
1038/// dispatcher, which constructs `ToolCall` values.
1039pub struct ToolCall<'a> {
1040    pub name: &'a str,
1041    pub args: &'a serde_json::Value,
1042    pub context: &'a ToolContext<'a>,
1043    pub progress: Option<&'a ProgressSender>,
1044}
1045
1046/// Trait for providing tools to the sandbox. Implement this per-project.
1047///
1048/// Implementations supply cheap [`ToolManifest`]s, lazily resolved
1049/// [`ToolContract`]s, and a single
1050/// [`execute`](Self::execute) method that handles every call. Tools that
1051/// need session state read it from `call.context`; tools that stream
1052/// progress send through `call.progress`.
1053#[async_trait::async_trait]
1054pub trait ToolProvider: Send + Sync + 'static {
1055    fn tool_manifests(&self) -> Vec<ToolManifest>;
1056    fn resolve_manifest(&self, name: &str) -> Option<ToolManifest> {
1057        self.tool_manifests()
1058            .into_iter()
1059            .find(|manifest| manifest.name == name)
1060    }
1061    fn resolve_contract(&self, name: &str) -> Option<Arc<ToolContract>>;
1062    async fn prepare_tool_call(
1063        &self,
1064        call: ToolPrepareCall<'_>,
1065    ) -> Result<PreparedToolCall, ToolResult> {
1066        Ok(PreparedToolCall::identity(call.pending))
1067    }
1068    async fn execute(&self, call: ToolCall<'_>) -> ToolResult;
1069}
1070
1071#[cfg(test)]
1072mod tests {
1073    use super::*;
1074    use crate::ProcessRegistry;
1075    use crate::RuntimeEffectController;
1076    use std::sync::atomic::{AtomicU64, Ordering};
1077
1078    struct NoDurableEffectController;
1079
1080    #[async_trait::async_trait]
1081    impl crate::RuntimeEffectController for NoDurableEffectController {
1082        async fn execute_effect(
1083            &self,
1084            _envelope: crate::RuntimeEffectEnvelope,
1085            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
1086        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
1087            Err(crate::RuntimeEffectControllerError::new(
1088                "unexpected_effect",
1089                "test controller should not execute effects",
1090            ))
1091        }
1092    }
1093
1094    fn test_context_with_controller(
1095        tool_call_id: Option<String>,
1096        controller: Arc<dyn crate::RuntimeEffectController>,
1097    ) -> ToolContext<'static> {
1098        ToolContext::builder(
1099            "session-1".to_string(),
1100            Arc::new(crate::testing::MockSessionManager::default()),
1101            Arc::new(crate::testing::MockSessionManager::default()),
1102            Arc::new(crate::testing::MockSessionManager::default()),
1103            Arc::new(crate::UnavailableProcessService),
1104            Arc::new(crate::DefaultProcessCancelAbility),
1105            crate::runtime::RuntimeEffectControllerHandle::shared(controller),
1106            Arc::new(crate::InMemoryAttachmentStore::new()),
1107            crate::DirectCompletionClient::unavailable(
1108                "direct completions are unavailable in this test context",
1109            ),
1110        )
1111        .tool_call_id(tool_call_id)
1112        .build()
1113    }
1114
1115    #[test]
1116    fn tool_context_builder_carries_call_payload_and_cancellation_state() {
1117        let cancellation = tokio_util::sync::CancellationToken::new();
1118        let prepared = PreparedToolCall::from_parts(
1119            "call-1",
1120            "demo_tool",
1121            serde_json::json!({ "input": true }),
1122            None,
1123            serde_json::json!({ "prepared": true }),
1124        );
1125
1126        let context = ToolContext::builder(
1127            "session-1".to_string(),
1128            Arc::new(crate::testing::MockSessionManager::default()),
1129            Arc::new(crate::testing::MockSessionManager::default()),
1130            Arc::new(crate::testing::MockSessionManager::default()),
1131            Arc::new(crate::UnavailableProcessService),
1132            Arc::new(crate::DefaultProcessCancelAbility),
1133            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
1134                crate::InlineRuntimeEffectController,
1135            )),
1136            Arc::new(crate::InMemoryAttachmentStore::new()),
1137            crate::DirectCompletionClient::unavailable(
1138                "direct completions are unavailable in this test context",
1139            ),
1140        )
1141        .prepared_call(&prepared)
1142        .cancellation_token(Some(cancellation.clone()))
1143        .async_process("process-1", cancellation.clone())
1144        .build();
1145
1146        assert_eq!(context.session_id(), "session-1");
1147        assert_eq!(context.tool_call_id(), Some("call-1"));
1148        assert_eq!(
1149            context.prepared_payload(),
1150            &serde_json::json!({ "prepared": true })
1151        );
1152        assert_eq!(context.async_process_id(), Some("process-1"));
1153        assert!(context.cancellation_token().is_some());
1154    }
1155
1156    #[test]
1157    fn durable_effects_requires_prepared_call_id_and_supporting_controller() {
1158        let missing_call =
1159            test_context_with_controller(None, Arc::new(crate::InlineRuntimeEffectController));
1160        let err = match missing_call.durable_effects() {
1161            Ok(_) => panic!("missing prepared tool call id should fail"),
1162            Err(err) => err,
1163        };
1164        assert_eq!(err.code.as_str(), "durable_effects_missing_call_id");
1165
1166        let unsupported = test_context_with_controller(
1167            Some("call-1".to_string()),
1168            Arc::new(NoDurableEffectController),
1169        );
1170        let err = match unsupported.durable_effects() {
1171            Ok(_) => panic!("unsupported controller should fail"),
1172            Err(err) => err,
1173        };
1174        assert_eq!(err.code.as_str(), "durable_effects_unavailable");
1175    }
1176
1177    #[tokio::test]
1178    async fn durable_run_json_executes_and_maps_closure_errors() {
1179        let context = test_context_with_controller(
1180            Some("call-run-json".to_string()),
1181            Arc::new(crate::InlineRuntimeEffectController),
1182        );
1183        let durable = context.durable_effects().expect("durable effects");
1184        let value = durable
1185            .run_json(
1186                "create",
1187                serde_json::json!({ "x": 1 }),
1188                |input| async move { Ok(serde_json::json!({ "seen": input["x"] })) },
1189            )
1190            .await
1191            .expect("durable step");
1192        assert_eq!(value, serde_json::json!({ "seen": 1 }));
1193
1194        let err = durable
1195            .run_json("fail", serde_json::json!({}), |_| async {
1196                Err(crate::RuntimeError::new(
1197                    "durable_step_failed",
1198                    "step failed",
1199                ))
1200            })
1201            .await
1202            .expect_err("closure error");
1203        assert_eq!(err.code.as_str(), "durable_step_failed");
1204        assert_eq!(err.message, "step failed");
1205    }
1206
1207    #[tokio::test]
1208    async fn durable_run_json_rejects_empty_or_duplicate_step_ids_before_running() {
1209        let context = test_context_with_controller(
1210            Some("call-step-ids".to_string()),
1211            Arc::new(crate::InlineRuntimeEffectController),
1212        );
1213        let durable = context.durable_effects().expect("durable effects");
1214        let runs = Arc::new(AtomicU64::new(0));
1215
1216        let err = durable
1217            .run_json("", serde_json::Value::Null, {
1218                let runs = Arc::clone(&runs);
1219                move |_| async move {
1220                    runs.fetch_add(1, Ordering::Relaxed);
1221                    Ok(serde_json::Value::Null)
1222                }
1223            })
1224            .await
1225            .expect_err("empty step id");
1226        assert_eq!(err.code.as_str(), "durable_effect_empty_step_id");
1227        assert_eq!(runs.load(Ordering::Relaxed), 0);
1228
1229        durable
1230            .run_json("same", serde_json::Value::Null, {
1231                let runs = Arc::clone(&runs);
1232                move |_| async move {
1233                    runs.fetch_add(1, Ordering::Relaxed);
1234                    Ok(serde_json::Value::Null)
1235                }
1236            })
1237            .await
1238            .expect("first step");
1239        let err = durable
1240            .run_json("same", serde_json::Value::Null, {
1241                let runs = Arc::clone(&runs);
1242                move |_| async move {
1243                    runs.fetch_add(1, Ordering::Relaxed);
1244                    Ok(serde_json::Value::Null)
1245                }
1246            })
1247            .await
1248            .expect_err("duplicate step id");
1249        assert_eq!(err.code.as_str(), "durable_effect_duplicate_step_id");
1250        assert_eq!(runs.load(Ordering::Relaxed), 1);
1251    }
1252
1253    #[tokio::test]
1254    async fn durable_external_event_key_is_custom_and_stable() {
1255        let context = test_context_with_controller(
1256            Some("call-event-key".to_string()),
1257            Arc::new(crate::InlineRuntimeEffectController),
1258        );
1259        let durable = context.durable_effects().expect("durable effects");
1260        let first = durable
1261            .external_event_key("tool-event-stable")
1262            .await
1263            .expect("first key");
1264        let second = durable
1265            .external_event_key("tool-event-stable")
1266            .await
1267            .expect("second key");
1268
1269        assert_eq!(first, second);
1270        assert_eq!(
1271            first.wait,
1272            crate::AwaitEventWaitIdentity::Custom {
1273                key: "tool-event-stable".to_string()
1274            }
1275        );
1276    }
1277
1278    #[tokio::test]
1279    async fn durable_await_event_json_maps_terminal_resolutions() {
1280        let controller = Arc::new(crate::InlineRuntimeEffectController);
1281        let context =
1282            test_context_with_controller(Some("call-await-event".to_string()), controller.clone());
1283        let durable = context.durable_effects().expect("durable effects");
1284
1285        let ok_key = durable
1286            .external_event_key("tool-event-ok")
1287            .await
1288            .expect("ok key");
1289        controller
1290            .resolve_await_event(
1291                &ok_key,
1292                crate::Resolution::Ok(serde_json::json!({ "answer": 42 })),
1293            )
1294            .await
1295            .expect("resolve ok");
1296        let value = durable
1297            .await_event_json(ok_key)
1298            .await
1299            .expect("await ok value");
1300        assert_eq!(value, serde_json::json!({ "answer": 42 }));
1301
1302        let err_key = durable
1303            .external_event_key("tool-event-err")
1304            .await
1305            .expect("err key");
1306        controller
1307            .resolve_await_event(
1308                &err_key,
1309                crate::Resolution::Err(crate::ExternalCompletionError::new(
1310                    "external_bad",
1311                    "external failed",
1312                )),
1313            )
1314            .await
1315            .expect("resolve err");
1316        let err = durable
1317            .await_event_json(err_key)
1318            .await
1319            .expect_err("await err value");
1320        assert_eq!(err.code.as_str(), "external_bad");
1321
1322        let cancelled_key = durable
1323            .external_event_key("tool-event-cancelled")
1324            .await
1325            .expect("cancelled key");
1326        controller
1327            .resolve_await_event(&cancelled_key, crate::Resolution::Cancelled)
1328            .await
1329            .expect("resolve cancelled");
1330        let err = durable
1331            .await_event_json(cancelled_key)
1332            .await
1333            .expect_err("await cancelled value");
1334        assert_eq!(err.code.as_str(), "durable_effect_event_cancelled");
1335
1336        let timeout_key = durable
1337            .external_event_key("tool-event-timeout")
1338            .await
1339            .expect("timeout key");
1340        controller
1341            .resolve_await_event(&timeout_key, crate::Resolution::Timeout)
1342            .await
1343            .expect("resolve timeout");
1344        let err = durable
1345            .await_event_json(timeout_key)
1346            .await
1347            .expect_err("await timeout value");
1348        assert_eq!(err.code.as_str(), "durable_effect_event_timeout");
1349    }
1350
1351    #[tokio::test]
1352    async fn durable_emit_process_event_requires_process_and_appends_inside_process() {
1353        let context = test_context_with_controller(
1354            Some("call-no-process".to_string()),
1355            Arc::new(crate::InlineRuntimeEffectController),
1356        );
1357        let err = context
1358            .durable_effects()
1359            .expect("durable effects")
1360            .emit_process_event("tool.event", serde_json::json!({}))
1361            .await
1362            .expect_err("outside process");
1363        assert_eq!(
1364            err.code.as_str(),
1365            "durable_effect_process_event_unavailable"
1366        );
1367
1368        let registry = Arc::new(crate::TestLocalProcessRegistry::default());
1369        let process_id = "process:durable-tool-event";
1370        registry
1371            .register_process(
1372                crate::ProcessRegistration::new(
1373                    process_id,
1374                    crate::ProcessInput::External {
1375                        metadata: serde_json::json!({}),
1376                    },
1377                    crate::ProcessProvenance::host(),
1378                )
1379                .with_extra_event_types([crate::ProcessEventType {
1380                    name: "tool.event".to_string(),
1381                    payload_schema: crate::LashSchema::any(),
1382                    semantics: crate::ProcessEventSemanticsSpec::default(),
1383                }]),
1384            )
1385            .await
1386            .expect("register process");
1387        let registry_dyn: Arc<dyn crate::ProcessRegistry> = registry;
1388        let context = test_context_with_controller(
1389            Some("call-process-event".to_string()),
1390            Arc::new(crate::InlineRuntimeEffectController),
1391        )
1392        .with_process_events_for_testing(process_id, registry_dyn);
1393
1394        let event = context
1395            .durable_effects()
1396            .expect("durable effects")
1397            .emit_process_event("tool.event", serde_json::json!({ "ok": true }))
1398            .await
1399            .expect("process event");
1400        assert_eq!(event.process_id, process_id);
1401        assert_eq!(event.event_type, "tool.event");
1402        assert_eq!(event.payload, serde_json::json!({ "ok": true }));
1403        assert_eq!(
1404            event.invocation.replay_key(),
1405            Some("tool:call-process-event:durable-process-event:0")
1406        );
1407
1408        let append_err = context
1409            .durable_effects()
1410            .expect("durable effects")
1411            .emit_process_event("undeclared.event", serde_json::json!({}))
1412            .await
1413            .expect_err("undeclared event type must fail the append");
1414        assert_eq!(
1415            append_err.code.as_str(),
1416            "durable_effect_process_event_append_failed"
1417        );
1418    }
1419}